fix cleanup on error
[moreutils.git] / parallel.c
blobd283b96101fbc89ef214436bab316cf1092536f9
1 /*
2 * parallel.c - run commands in parallel until you run out of commands
4 * Copyright © 2008 Tollef Fog Heen <tfheen@err.no>
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * version 2 as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18 * USA
22 #define _GNU_SOURCE
23 #include <unistd.h>
24 #include <string.h>
25 #include <stdio.h>
26 #include <sys/time.h>
27 #include <time.h>
28 #include <stdlib.h>
29 #include <errno.h>
30 #include <sys/select.h>
31 #include <sys/types.h>
32 #include <sys/wait.h>
33 #include <unistd.h>
35 #if defined(__FreeBSD_kernel__)
36 #define WEXITED 0
37 #endif
39 void usage() {
40 printf("parallel [OPTIONS] command -- arguments\n\tfor each argument, "
41 "run command with argument, in parallel\n");
42 printf("parallel [OPTIONS] -- commands\n\trun specified commands in parallel\n");
43 exit(1);
46 void exec_child(char **command, char **arguments, int replace_cb, int nargs) {
47 if (fork() != 0) {
48 return;
51 if (command[0]) {
52 char **argv;
53 int argc = 0;
54 int i;
55 char *s;
57 while (command[argc] != 0) {
58 argc++;
60 if (! replace_cb)
61 argc++;
62 argv = calloc(sizeof(char*), argc + nargs);
64 for (i = 0; i < argc; i++) {
65 while (replace_cb && (s=strstr(command[i], "{}"))) {
66 char *buf=malloc(strlen(command[i]) + strlen(arguments[0]));
67 s[0]='\0';
68 sprintf(buf, "%s%s%s", command[i], arguments[0], s+2);
69 command[i]=buf;
71 argv[i] = command[i];
73 if (! replace_cb)
74 memcpy(argv + i - 1, arguments, nargs * sizeof(char *));
75 execvp(argv[0], argv);
76 exit(1);
78 else {
79 int ret=system(arguments[0]);
80 if (WIFEXITED(ret)) {
81 exit(WEXITSTATUS(ret));
83 else {
84 exit(1);
87 return;
90 int wait_for_child(int options) {
91 id_t id_ignored = 0;
92 siginfo_t infop;
94 infop.si_pid = 0;
95 waitid(P_ALL, id_ignored, &infop, WEXITED | options);
96 if (infop.si_pid == 0) {
97 return -1; /* Nothing to wait for */
99 if (infop.si_code == CLD_EXITED) {
100 return infop.si_status;
102 return 1;
105 int main(int argc, char **argv) {
106 int maxjobs = -1;
107 int curjobs = 0;
108 double maxload = -1;
109 int argsatonce = 1;
110 int opt;
111 char **command = calloc(sizeof(char*), argc);
112 char **arguments = NULL;
113 int argidx = 0;
114 int arglen = 0;
115 int cidx = 0;
116 int returncode = 0;
117 int replace_cb = 0;
118 char *t;
120 while ((argv[optind] && strcmp(argv[optind], "--") != 0) &&
121 (opt = getopt(argc, argv, "+hij:l:n:")) != -1) {
122 switch (opt) {
123 case 'h':
124 usage();
125 break;
126 case 'i':
127 replace_cb = 1;
128 break;
129 case 'j':
130 errno = 0;
131 maxjobs = strtoul(optarg, &t, 0);
132 if (errno != 0 || (t-optarg) != strlen(optarg)) {
133 fprintf(stderr, "option '%s' is not a number\n",
134 optarg);
135 exit(2);
137 break;
138 case 'l':
139 errno = 0;
140 maxload = strtod(optarg, &t);
141 if (errno != 0 || (t-optarg) != strlen(optarg)) {
142 fprintf(stderr, "option '%s' is not a number\n",
143 optarg);
144 exit(2);
146 break;
147 case 'n':
148 errno = 0;
149 argsatonce = strtoul(optarg, &t, 0);
150 if (errno != 0 || argsatonce < 1 || (t-optarg) != strlen(optarg)) {
151 fprintf(stderr, "option '%s' is not a positive number\n",
152 optarg);
153 exit(2);
155 break;
156 default: /* ’?’ */
157 usage();
158 break;
162 if (replace_cb && argsatonce > 1) {
163 fprintf(stderr, "options -i and -n are incomaptible\n");
164 exit(2);
167 if (maxjobs < 0) {
168 #ifdef _SC_NPROCESSORS_ONLN
169 maxjobs = sysconf(_SC_NPROCESSORS_ONLN);
170 #else
171 #warning Cannot autodetect number of CPUS on this system: _SC_NPROCESSORS_ONLN not defined.
172 maxjobs = 1;
173 #endif
176 while (optind < argc) {
177 if (strcmp(argv[optind], "--") == 0) {
178 int i;
180 optind++;
181 arglen = argc - optind;
182 arguments = calloc(sizeof(char *), arglen);
183 if (! arguments) {
184 exit(1);
187 for (i = 0; i < arglen; i++) {
188 arguments[i] = strdup(argv[optind + i]);
190 optind += i;
192 else {
193 command[cidx] = strdup(argv[optind]);
194 cidx++;
196 optind++;
199 if (argsatonce > 1 && ! command[0]) {
200 fprintf(stderr, "option -n cannot be used without a command\n");
201 exit(2);
204 while (argidx < arglen) {
205 double load;
207 getloadavg(&load, 1);
209 if ((maxjobs == 0 || curjobs < maxjobs) &&
210 (maxload < 0 || load < maxload)) {
212 if (argsatonce > arglen - argidx)
213 argsatonce = arglen - argidx;
214 exec_child(command, arguments + argidx,
215 replace_cb, argsatonce);
216 argidx += argsatonce;
217 curjobs++;
220 if (maxjobs == 0 || curjobs == maxjobs) {
221 returncode |= wait_for_child(0);
222 curjobs--;
225 if (maxload > 0 && load >= maxload) {
226 int r;
227 sleep(1); /* XXX We should have a better
228 * heurestic than this */
229 r = wait_for_child(WNOHANG);
230 if (r > 0)
231 returncode |= r;
232 if (r != -1)
233 curjobs--;
236 while (curjobs > 0) {
237 returncode |= wait_for_child(0);
238 curjobs--;
241 return returncode;