2 * parallel.c - run commands in parallel until you run out of commands
4 * Copyright © 2008 Tollef Fog Heen <tfheen@err.no>
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * version 2 as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <sys/select.h>
32 #include <sys/types.h>
38 # include <sys/loadavg.h> /* getloadavg() */
45 static pid_t pipe_child_stdout
= 0;
46 static pid_t pipe_child_stderr
= 0;
49 printf("parallel [OPTIONS] command -- arguments\n\tfor each argument, "
50 "run command with argument, in parallel\n");
51 printf("parallel [OPTIONS] -- commands\n\trun specified commands in parallel\n");
55 static void redirect(int fd
, int target_fd
, const char *name
)
60 if (dup2(fd
, target_fd
) < 0) {
61 fprintf(stderr
, "unable to open %s from internal pipe: %s\n",
62 name
, strerror(errno
));
68 void exec_child(char **command
, char **arguments
, int replace_cb
, int nargs
,
69 int stdout_fd
, int stderr_fd
)
75 redirect(stdout_fd
, 1, "stdout");
76 redirect(stderr_fd
, 2, "stderr");
84 while (command
[argc
] != 0) {
89 argv
= calloc(sizeof(char*), argc
+ nargs
);
91 for (i
= 0; i
< argc
; i
++) {
92 while (replace_cb
&& (s
=strstr(command
[i
], "{}"))) {
93 char *buf
=malloc(strlen(command
[i
]) + strlen(arguments
[0]));
95 sprintf(buf
, "%s%s%s", command
[i
], arguments
[0], s
+2);
101 memcpy(argv
+ i
- 1, arguments
, nargs
* sizeof(char *));
102 execvp(argv
[0], argv
);
106 int ret
=system(arguments
[0]);
107 if (WIFEXITED(ret
)) {
108 exit(WEXITSTATUS(ret
));
117 #if defined(__CYGWIN__)
123 int waitid(idtype_t idtype
, id_t id
, siginfo_t
*infop
, int options
) {
140 pid
= waitpid(pid
, &status
, WEXITED
| options
);
145 infop
->si_signo
= SIGCHLD
;
146 if (WIFEXITED(status
)) {
147 infop
->si_code
= CLD_EXITED
;
148 infop
->si_status
= WEXITSTATUS(status
);
150 else if (WIFSIGNALED(status
)) {
151 infop
->si_code
= CLD_KILLED
;
152 infop
->si_status
= WTERMSIG(status
);
154 if (WCOREDUMP(status
)) {
155 infop
->si_code
= CLD_DUMPED
;
159 else if (WIFSTOPPED(status
)) {
160 infop
->si_code
= CLD_STOPPED
;
161 infop
->si_status
= WSTOPSIG(status
);
163 else if (WIFCONTINUED(status
)) {
164 infop
->si_code
= CLD_CONTINUED
;
165 infop
->si_status
= SIGCONT
;
171 int wait_for_child(int options
) {
176 waitid(P_ALL
, id_ignored
, &infop
, WEXITED
| options
);
177 if (infop
.si_pid
== 0) {
178 return -1; /* Nothing to wait for */
180 if (infop
.si_code
== CLD_EXITED
) {
181 return infop
.si_status
;
186 static int pipe_child(int fd
, int orig_fd
)
188 const char *fd_info
= (orig_fd
== 1) ? "out" : "err";
192 while ((r
= read(fd
, buf
, sizeof(buf
))) >= 0) {
199 w
= write(orig_fd
, buf
, len
);
201 fprintf(stderr
, "unable to write to std%s: "
202 "%s\n", fd_info
, strerror(errno
));
210 fprintf(stderr
, "unable to read from std%s: %s\n", fd_info
,
215 pid_t
create_pipe_child(int *fd
, int orig_fd
)
221 fprintf(stderr
, "unable to create pipe: %s\n",
230 fprintf(stderr
, "unable to fork: %s\n", strerror(errno
));
241 return pipe_child(fds
[0], orig_fd
);
244 #if defined(__CYGWIN__)
245 int getloadavg(double loadavg
[], int nelem
) {
249 fd
= open("/proc/loadavg", O_RDONLY
);
253 n
= read(fd
, buf
, sizeof(buf
)-1);
254 if (close(fd
) == -1 || n
== -1) {
258 for (elem
= 0; elem
< nelem
; elem
++) {
260 double d
= strtod(p
, &end
);
267 return elem
== 0 ? -1 : elem
;
271 int main(int argc
, char **argv
) {
277 char **command
= calloc(sizeof(char*), argc
);
278 char **arguments
= NULL
;
288 while ((argv
[optind
] && strcmp(argv
[optind
], "--") != 0) &&
289 (opt
= getopt(argc
, argv
, "+hij:l:n:")) != -1) {
299 maxjobs
= strtoul(optarg
, &t
, 0);
300 if (errno
!= 0 || (t
-optarg
) != strlen(optarg
)) {
301 fprintf(stderr
, "option '%s' is not a number\n",
308 maxload
= strtod(optarg
, &t
);
309 if (errno
!= 0 || (t
-optarg
) != strlen(optarg
)) {
310 fprintf(stderr
, "option '%s' is not a number\n",
317 argsatonce
= strtoul(optarg
, &t
, 0);
318 if (errno
!= 0 || argsatonce
< 1 || (t
-optarg
) != strlen(optarg
)) {
319 fprintf(stderr
, "option '%s' is not a positive number\n",
330 if (replace_cb
&& argsatonce
> 1) {
331 fprintf(stderr
, "options -i and -n are incomaptible\n");
336 #ifdef _SC_NPROCESSORS_ONLN
337 maxjobs
= sysconf(_SC_NPROCESSORS_ONLN
);
339 #warning Cannot autodetect number of CPUS on this system: _SC_NPROCESSORS_ONLN not defined.
344 while (optind
< argc
) {
345 if (strcmp(argv
[optind
], "--") == 0) {
349 arglen
= argc
- optind
;
350 arguments
= calloc(sizeof(char *), arglen
);
355 for (i
= 0; i
< arglen
; i
++) {
356 arguments
[i
] = strdup(argv
[optind
+ i
]);
361 command
[cidx
] = strdup(argv
[optind
]);
367 if (argsatonce
> 1 && ! command
[0]) {
368 fprintf(stderr
, "option -n cannot be used without a command\n");
372 pipe_child_stdout
= create_pipe_child(&stdout_fd
, 1);
373 pipe_child_stderr
= create_pipe_child(&stderr_fd
, 2);
375 if ((pipe_child_stdout
< 0) || (pipe_child_stderr
< 0))
378 while (argidx
< arglen
) {
381 getloadavg(&load
, 1);
383 if ((maxjobs
== 0 || curjobs
< maxjobs
) &&
384 (maxload
< 0 || load
< maxload
)) {
386 if (argsatonce
> arglen
- argidx
)
387 argsatonce
= arglen
- argidx
;
388 exec_child(command
, arguments
+ argidx
,
389 replace_cb
, argsatonce
, stdout_fd
,
391 argidx
+= argsatonce
;
395 if (maxjobs
== 0 || curjobs
== maxjobs
) {
396 returncode
|= wait_for_child(0);
400 if (maxload
> 0 && load
>= maxload
) {
402 sleep(1); /* XXX We should have a better
403 * heurestic than this */
404 r
= wait_for_child(WNOHANG
);
411 while (curjobs
> 0) {
412 returncode
|= wait_for_child(0);
416 if (pipe_child_stdout
) {
417 kill(pipe_child_stdout
, SIGKILL
);
420 if (pipe_child_stderr
) {
421 kill(pipe_child_stderr
, SIGKILL
);