style cleanup
[moreutils.git] / parallel.c
blob851eed3f3118e8e0fa966f265a482d8368bfef92
1 /*
2 * parallel.c - run commands in parallel until you run out of commands
4 * Copyright © 2008 Tollef Fog Heen <tfheen@err.no>
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * version 2 as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18 * USA
22 #define _GNU_SOURCE
23 #include <unistd.h>
24 #include <string.h>
25 #include <stdio.h>
26 #include <sys/time.h>
27 #include <time.h>
28 #include <stdlib.h>
29 #include <errno.h>
30 #include <sys/select.h>
31 #include <sys/types.h>
32 #include <sys/wait.h>
33 #include <unistd.h>
34 #include <signal.h>
36 #ifdef __sun
37 # include <sys/loadavg.h> /* getloadavg() */
38 #endif
40 #if !defined(WEXITED)
41 #define WEXITED 0
42 #endif
/*
 * PIDs of the two helper processes started through create_pipe_child()
 * for file descriptors 1 and 2.  Objects with static storage duration
 * start out as zero, which main() treats as "no helper".
 */
static pid_t pipe_child_stdout, pipe_child_stderr;
/*
 * Write the two supported invocation forms to standard output and
 * terminate the process with exit status 1.  Never returns.
 */
void usage()
{
	fputs("parallel [OPTIONS] command -- arguments\n\tfor each argument, "
	      "run command with argument, in parallel\n"
	      "parallel [OPTIONS] -- commands\n\trun specified commands in parallel\n",
	      stdout);
	exit(1);
}
/*
 * Make target_fd refer to the same open file as fd and then close fd.
 * Nothing happens when both descriptors are already the same number.
 * If dup2() fails, a message mentioning `name` is printed on stderr and
 * the process exits with status 1.
 */
static void redirect(int fd, int target_fd, const char *name)
{
	if (fd != target_fd) {
		if (dup2(fd, target_fd) < 0) {
			fprintf(stderr, "unable to open %s from internal pipe: %s\n",
				name, strerror(errno));
			exit(1);
		}
		/* The original descriptor is no longer needed. */
		close(fd);
	}
}
/*
 * Return a freshly allocated copy of `word` in which every "{}" has been
 * replaced by `arg`.  Scanning resumes after each "{}" of the original
 * word, so an argument that itself contains "{}" is inserted literally
 * and cannot trigger endless re-expansion.  Exits with status 1 when
 * memory cannot be allocated (only ever called in the forked child).
 */
static char *expand_placeholders(const char *word, const char *arg)
{
	size_t arg_len = strlen(arg);
	size_t hits = 0;
	const char *scan;
	const char *hit;
	char *out;
	char *dst;

	for (scan = word; (hit = strstr(scan, "{}")) != NULL; scan = hit + 2)
		hits++;

	/* Each hit removes the two bytes of "{}" and adds arg_len bytes. */
	out = malloc(strlen(word) - 2 * hits + hits * arg_len + 1);
	if (!out) {
		fprintf(stderr, "out of memory\n");
		exit(1);
	}

	dst = out;
	for (scan = word; (hit = strstr(scan, "{}")) != NULL; scan = hit + 2) {
		memcpy(dst, scan, (size_t)(hit - scan));
		dst += hit - scan;
		memcpy(dst, arg, arg_len);
		dst += arg_len;
	}
	strcpy(dst, scan);

	return out;
}

/*
 * Fork a child that runs one job; the parent returns immediately.
 *
 * command    - NULL-terminated argument vector of the command to run.
 *              When command[0] is NULL, arguments[0] is instead handed
 *              to system() as a complete shell command line.
 * arguments  - the argument(s) for this job; at least one entry.
 * replace_cb - non-zero: substitute arguments[0] for every "{}" in the
 *              command words; zero: append `nargs` arguments after the
 *              command words.
 * nargs      - number of entries of `arguments` to append.
 * stdout_fd, stderr_fd - descriptors that become fd 1 and fd 2 of the
 *              child.
 *
 * The child never returns from this function: it either execs the
 * command or exits (status 1 on failure, or the status reported by
 * system()).  A failed fork() is reported on stderr; no child exists in
 * that case.
 */
void exec_child(char **command, char **arguments, int replace_cb, int nargs,
		int stdout_fd, int stderr_fd)
{
	pid_t pid = fork();

	if (pid < 0) {
		fprintf(stderr, "unable to fork: %s\n", strerror(errno));
		return;
	}
	if (pid != 0)
		return;

	/* Only the child gets here. */
	redirect(stdout_fd, 1, "stdout");
	redirect(stderr_fd, 2, "stderr");

	if (command[0]) {
		char **argv;
		size_t ncmd = 0;
		size_t i;

		while (command[ncmd] != NULL)
			ncmd++;

		/* Command words, the appended arguments and the final NULL. */
		argv = calloc(ncmd + (size_t)nargs + 1, sizeof(char *));
		if (!argv) {
			fprintf(stderr, "out of memory\n");
			exit(1);
		}

		for (i = 0; i < ncmd; i++) {
			argv[i] = replace_cb
				? expand_placeholders(command[i], arguments[0])
				: command[i];
		}
		if (!replace_cb)
			memcpy(argv + ncmd, arguments,
			       (size_t)nargs * sizeof(char *));

		execvp(argv[0], argv);
		/* execvp() only returns on failure. */
		fprintf(stderr, "unable to execute %s: %s\n", argv[0],
			strerror(errno));
		exit(1);
	} else {
		int ret = system(arguments[0]);

		if (WIFEXITED(ret))
			exit(WEXITSTATUS(ret));
		exit(1);
	}
}
/*
 * Collect one child via waitid(P_ALL, ...).  `options` is OR-ed with
 * WEXITED before being passed on (callers in this file pass 0 or
 * WNOHANG).
 *
 * Returns -1 when waitid() left si_pid at 0 (nothing was collected),
 * the child's exit status when si_code is CLD_EXITED, and 1 for every
 * other kind of termination.
 */
int wait_for_child(int options)
{
	siginfo_t info;
	id_t unused_id = 0;

	/* Sentinel: still 0 afterwards unless waitid() filled the struct in. */
	info.si_pid = 0;
	waitid(P_ALL, unused_id, &info, WEXITED | options);

	if (info.si_pid == 0)
		return -1; /* Nothing to wait for */

	return (info.si_code == CLD_EXITED) ? info.si_status : 1;
}
/*
 * Body of a relay helper process: copy everything that can be read from
 * `fd` to `orig_fd`.  Never returns to the caller:
 *   - exits with status 0 once read() reports end of input (all write
 *     ends of the pipe are closed);
 *   - exits with status 1 after printing a message on stderr when
 *     read() or write() fails.
 * `orig_fd` == 1 selects "stdout" in the messages, anything else
 * "stderr".
 */
static int pipe_child(int fd, int orig_fd)
{
	const char *fd_info = (orig_fd == 1) ? "out" : "err";
	char buf[4096];

	for (;;) {
		ssize_t got = read(fd, buf, sizeof(buf));
		ssize_t done = 0;

		if (got < 0) {
			if (errno == EINTR)
				continue;
			fprintf(stderr, "unable to read from std%s: %s\n",
				fd_info, strerror(errno));
			exit(1);
		}
		if (got == 0)
			exit(0);	/* EOF: nothing more will ever arrive */

		/*
		 * write() may accept only part of the data; continue with
		 * the unwritten tail, not with the start of the buffer.
		 */
		while (done < got) {
			ssize_t w = write(orig_fd, buf + done,
					  (size_t)(got - done));

			if (w < 0) {
				if (errno == EINTR)
					continue;
				fprintf(stderr, "unable to write to std%s: "
					"%s\n", fd_info, strerror(errno));
				exit(1);
			}
			done += w;
		}
	}
}
/*
 * Create a pipe and fork a helper that runs pipe_child() on its read
 * end, forwarding to `orig_fd`.
 *
 * The write end of the pipe is stored in *fd before forking.  In the
 * parent the read end is closed and the helper's PID is returned.  If
 * fork() fails, a message is printed and the negative value is
 * returned.  Failure to create the pipe prints a message and exits with
 * status 1.  In the helper, the write end is closed and the result of
 * pipe_child() is returned.
 */
pid_t create_pipe_child(int *fd, int orig_fd)
{
	int pipe_ends[2];	/* [0] is read from, [1] is written to */
	pid_t child;

	if (pipe(pipe_ends) != 0) {
		fprintf(stderr, "unable to create pipe: %s\n",
			strerror(errno));
		exit(1);
	}
	*fd = pipe_ends[1];

	child = fork();
	if (child == 0) {
		/* Helper process: it only reads from the pipe. */
		close(pipe_ends[1]);
		return pipe_child(pipe_ends[0], orig_fd);
	}
	if (child > 0) {
		/* Parent: it only writes to the pipe. */
		close(pipe_ends[0]);
		return child;
	}

	fprintf(stderr, "unable to fork: %s\n", strerror(errno));
	return child;
}
189 int main(int argc, char **argv) {
190 int maxjobs = -1;
191 int curjobs = 0;
192 double maxload = -1;
193 int argsatonce = 1;
194 int opt;
195 char **command = calloc(sizeof(char*), argc);
196 char **arguments = NULL;
197 int argidx = 0;
198 int arglen = 0;
199 int cidx = 0;
200 int returncode = 0;
201 int replace_cb = 0;
202 int stdout_fd = 1;
203 int stderr_fd = 2;
204 char *t;
206 while ((argv[optind] && strcmp(argv[optind], "--") != 0) &&
207 (opt = getopt(argc, argv, "+hij:l:n:")) != -1) {
208 switch (opt) {
209 case 'h':
210 usage();
211 break;
212 case 'i':
213 replace_cb = 1;
214 break;
215 case 'j':
216 errno = 0;
217 maxjobs = strtoul(optarg, &t, 0);
218 if (errno != 0 || (t-optarg) != strlen(optarg)) {
219 fprintf(stderr, "option '%s' is not a number\n",
220 optarg);
221 exit(2);
223 break;
224 case 'l':
225 errno = 0;
226 maxload = strtod(optarg, &t);
227 if (errno != 0 || (t-optarg) != strlen(optarg)) {
228 fprintf(stderr, "option '%s' is not a number\n",
229 optarg);
230 exit(2);
232 break;
233 case 'n':
234 errno = 0;
235 argsatonce = strtoul(optarg, &t, 0);
236 if (errno != 0 || argsatonce < 1 || (t-optarg) != strlen(optarg)) {
237 fprintf(stderr, "option '%s' is not a positive number\n",
238 optarg);
239 exit(2);
241 break;
242 default: /* ’?’ */
243 usage();
244 break;
248 if (replace_cb && argsatonce > 1) {
249 fprintf(stderr, "options -i and -n are incomaptible\n");
250 exit(2);
253 if (maxjobs < 0) {
254 #ifdef _SC_NPROCESSORS_ONLN
255 maxjobs = sysconf(_SC_NPROCESSORS_ONLN);
256 #else
257 #warning Cannot autodetect number of CPUS on this system: _SC_NPROCESSORS_ONLN not defined.
258 maxjobs = 1;
259 #endif
262 while (optind < argc) {
263 if (strcmp(argv[optind], "--") == 0) {
264 int i;
266 optind++;
267 arglen = argc - optind;
268 arguments = calloc(sizeof(char *), arglen);
269 if (! arguments) {
270 exit(1);
273 for (i = 0; i < arglen; i++) {
274 arguments[i] = strdup(argv[optind + i]);
276 optind += i;
278 else {
279 command[cidx] = strdup(argv[optind]);
280 cidx++;
282 optind++;
285 if (argsatonce > 1 && ! command[0]) {
286 fprintf(stderr, "option -n cannot be used without a command\n");
287 exit(2);
290 pipe_child_stdout = create_pipe_child(&stdout_fd, 1);
291 pipe_child_stderr = create_pipe_child(&stderr_fd, 2);
293 if ((pipe_child_stdout < 0) || (pipe_child_stderr < 0))
294 exit(1);
296 while (argidx < arglen) {
297 double load;
299 getloadavg(&load, 1);
301 if ((maxjobs == 0 || curjobs < maxjobs) &&
302 (maxload < 0 || load < maxload)) {
304 if (argsatonce > arglen - argidx)
305 argsatonce = arglen - argidx;
306 exec_child(command, arguments + argidx,
307 replace_cb, argsatonce, stdout_fd,
308 stderr_fd);
309 argidx += argsatonce;
310 curjobs++;
313 if (maxjobs == 0 || curjobs == maxjobs) {
314 returncode |= wait_for_child(0);
315 curjobs--;
318 if (maxload > 0 && load >= maxload) {
319 int r;
320 sleep(1); /* XXX We should have a better
321 * heurestic than this */
322 r = wait_for_child(WNOHANG);
323 if (r > 0)
324 returncode |= r;
325 if (r != -1)
326 curjobs--;
329 while (curjobs > 0) {
330 returncode |= wait_for_child(0);
331 curjobs--;
334 if (pipe_child_stdout) {
335 kill(pipe_child_stdout, SIGKILL);
336 wait_for_child(0);
338 if (pipe_child_stderr) {
339 kill(pipe_child_stderr, SIGKILL);
340 wait_for_child(0);
343 return returncode;