2 * parallel.c - run commands in parallel until you run out of commands
4 * Copyright © 2008 Tollef Fog Heen <tfheen@err.no>
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * version 2 as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/select.h>
31 #include <sys/types.h>
36 # include <sys/loadavg.h> /* getloadavg() */
/* Values returned by create_pipe_child() for stdout (orig_fd 1) and stderr
 * (orig_fd 2); they are assigned in main(). Both start at 0, and main()
 * only sends SIGKILL to the ones that are non-zero. */
43 static pid_t pipe_child_stdout
= 0;
44 static pid_t pipe_child_stderr
= 0;
/* Help text describing the two invocation forms: "command -- arguments"
 * (run command once per argument) and "-- commands" (run each command).
 * NOTE(review): the enclosing function header is not visible in this
 * excerpt. */
47 printf("parallel [OPTIONS] command -- arguments\n\tfor each argument, "
48 "run command with argument, in parallel\n");
49 printf("parallel [OPTIONS] -- commands\n\trun specified commands in parallel\n");
/*
 * Duplicate fd onto target_fd with dup2().
 *
 * fd:        descriptor to duplicate.
 * target_fd: descriptor number that should refer to the same file afterwards
 *            (exec_child() passes 1 and 2).
 * name:      label used only in the error message ("stdout" / "stderr" at
 *            the visible call sites).
 *
 * When dup2() returns a negative value, the failure and strerror(errno) are
 * printed to stderr.
 * NOTE(review): the statements after the fprintf() are not visible in this
 * excerpt, so what happens after a failure (and any cleanup of fd on
 * success) cannot be confirmed from here.
 */
53 static void redirect(int fd
, int target_fd
, const char *name
)
58 if (dup2(fd
, target_fd
) < 0) {
59 fprintf(stderr
, "unable to open %s from internal pipe: %s\n",
60 name
, strerror(errno
));
/*
 * Run one job.
 *
 * command:    0-terminated vector of command words (its length is counted
 *             below by scanning for the first 0 entry).
 * arguments:  pointer to the first argument for this job; nargs entries are
 *             copied from it, and arguments[0] is used for "{}" substitution
 *             and for the system() path.
 * replace_cb: when non-zero, "{}" inside a command word is replaced by
 *             arguments[0].
 * nargs:      number of entries of `arguments` that belong to this job.
 * stdout_fd,
 * stderr_fd:  descriptors that are redirected onto fd 1 and fd 2.
 *
 * Two ways of running the job are visible: building an argv and calling
 * execvp(), or passing arguments[0] to system() and exiting with its exit
 * status.
 * NOTE(review): the condition that selects between them, and any
 * fork()/bookkeeping around them, is not visible in this excerpt.
 */
66 void exec_child(char **command
, char **arguments
, int replace_cb
, int nargs
,
67 int stdout_fd
, int stderr_fd
)
/* Point descriptors 1 and 2 at the descriptors supplied by the caller. */
73 redirect(stdout_fd
, 1, "stdout");
74 redirect(stderr_fd
, 2, "stderr");
/* Count the command words: stop at the first 0 entry. */
82 while (command
[argc
] != 0) {
/* Zero-filled pointer array with room for argc + nargs entries. */
87 argv
= calloc(sizeof(char*), argc
+ nargs
);
89 for (i
= 0; i
< argc
; i
++) {
/* With replace_cb set, look for "{}" in this command word; s points at the
 * match. */
90 while (replace_cb
&& (s
=strstr(command
[i
], "{}"))) {
/* The result drops the 2 bytes of "{}", so strlen(word) + strlen(arg)
 * leaves room for the terminating NUL. */
91 char *buf
=malloc(strlen(command
[i
]) + strlen(arguments
[0]));
/* Concatenate: command[i], then arguments[0], then the text after "{}".
 * NOTE(review): the statement between the malloc() and this sprintf() is
 * not visible here; for the first %s to stop at the match, command[i]
 * presumably gets terminated at s - confirm. */
93 sprintf(buf
, "%s%s%s", command
[i
], arguments
[0], s
+2);
/* Copy the nargs argument pointers into argv starting at slot i - 1, then
 * replace the process image. NOTE(review): any guard around this memcpy()
 * is not visible here. */
99 memcpy(argv
+ i
- 1, arguments
, nargs
* sizeof(char *));
100 execvp(argv
[0], argv
);
/* Other path: pass arguments[0] to system() and, if it exited normally,
 * exit with the same status. */
104 int ret
=system(arguments
[0]);
105 if (WIFEXITED(ret
)) {
106 exit(WEXITSTATUS(ret
));
/*
 * Collect the status of any one child with waitid(P_ALL, ...).
 *
 * options: extra waitid() flags OR-ed with WEXITED; main() passes 0 or
 *          WNOHANG.
 *
 * Returns -1 when infop.si_pid is 0 after the call (nothing to wait for),
 * and the child's si_status when si_code is CLD_EXITED.
 * NOTE(review): the declarations, any job-count bookkeeping, and the return
 * value for children that did not exit normally are not visible in this
 * excerpt.
 */
115 int wait_for_child(int options
) {
120 waitid(P_ALL
, id_ignored
, &infop
, WEXITED
| options
);
121 if (infop
.si_pid
== 0) {
122 return -1; /* Nothing to wait for */
124 if (infop
.si_code
== CLD_EXITED
) {
125 return infop
.si_status
;
/*
 * Copy data from fd to orig_fd.
 *
 * fd:      descriptor that is read() from.
 * orig_fd: descriptor that is write()n to; also selects the label used in
 *          error messages ("out" when it is 1, otherwise "err").
 *
 * NOTE(review): the declarations, the handling of short writes / end of
 * file, and the return statements are not visible in this excerpt.
 */
130 static int pipe_child(int fd
, int orig_fd
)
/* Suffix for "std%s" in the diagnostics below. */
132 const char *fd_info
= (orig_fd
== 1) ? "out" : "err";
/* Keep reading while read() does not report an error (r >= 0). */
136 while ((r
= read(fd
, buf
, sizeof(buf
))) >= 0) {
143 w
= write(orig_fd
, buf
, len
);
145 fprintf(stderr
, "unable to write to std%s: "
146 "%s\n", fd_info
, strerror(errno
));
154 fprintf(stderr
, "unable to read from std%s: %s\n", fd_info
,
/*
 * Set up a pipe and a forked helper for one output stream.
 *
 * fd:      out-parameter supplied by the caller (main() passes &stdout_fd
 *          and &stderr_fd). NOTE(review): the assignment to *fd is not
 *          visible in this excerpt - presumably the pipe's write end.
 * orig_fd: the descriptor the helper forwards to (1 or 2 in main()).
 *
 * Failures of pipe creation and of fork() are reported on stderr. One
 * visible path returns the result of pipe_child(fds[0], orig_fd), i.e. it
 * runs the copy loop on the pipe's read end. main() treats a negative
 * return value as failure.
 * NOTE(review): which process takes which path, and the other return
 * statements, are not visible here.
 */
159 pid_t
create_pipe_child(int *fd
, int orig_fd
)
165 fprintf(stderr
, "unable to create pipe: %s\n",
174 fprintf(stderr
, "unable to fork: %s\n", strerror(errno
));
185 return pipe_child(fds
[0], orig_fd
);
/*
 * Entry point: parse options, split the remaining argv into a command and
 * a list of arguments at "--", then run the jobs while respecting the job
 * and load limits, and finally wait for all children and stop the output
 * helpers.
 * NOTE(review): many lines of this function (declarations, case labels,
 * exit paths, job counting) are not visible in this excerpt; the comments
 * below describe only the visible statements.
 */
188 int main(int argc
, char **argv
) {
/* Zero-filled vector with argc slots for the command words. */
194 char **command
= calloc(sizeof(char*), argc
);
195 char **arguments
= NULL
;
/* Parse options until argv[optind] is missing or is "--", or getopt()
 * returns -1. */
205 while ((argv
[optind
] && strcmp(argv
[optind
], "--") != 0) &&
206 (opt
= getopt(argc
, argv
, "+hij:l:n:")) != -1) {
/* Numeric option stored in maxjobs (presumably -j - the case label is not
 * visible). The value is rejected when errno is set or when strtoul() did
 * not consume the whole string. */
216 maxjobs
= strtoul(optarg
, &t
, 0);
217 if (errno
!= 0 || (t
-optarg
) != strlen(optarg
)) {
218 fprintf(stderr
, "option '%s' is not a number\n",
/* Numeric option stored in maxload (presumably -l), validated the same
 * way. */
225 maxload
= strtod(optarg
, &t
);
226 if (errno
!= 0 || (t
-optarg
) != strlen(optarg
)) {
227 fprintf(stderr
, "option '%s' is not a number\n",
/* Number of arguments per job (the -n option, per the messages below);
 * must additionally be at least 1. */
234 argsatonce
= strtoul(optarg
, &t
, 0);
235 if (errno
!= 0 || argsatonce
< 1 || (t
-optarg
) != strlen(optarg
)) {
236 fprintf(stderr
, "option '%s' is not a positive number\n",
/* "{}" replacement (-i) uses only arguments[0], so it cannot be combined
 * with more than one argument per job. */
247 if (replace_cb
&& argsatonce
> 1) {
248 fprintf(stderr
, "options -i and -n are incomaptible\n");
/* maxjobs can be taken from the number of online processors where
 * _SC_NPROCESSORS_ONLN exists. NOTE(review): the condition guarding this
 * assignment is not visible here. */
253 #ifdef _SC_NPROCESSORS_ONLN
254 maxjobs
= sysconf(_SC_NPROCESSORS_ONLN
);
256 #warning Cannot autodetect number of CPUS on this system: _SC_NPROCESSORS_ONLN not defined.
/* Walk the remaining argv. At "--" the remaining argc - optind entries are
 * duplicated into arguments[]; other words are duplicated into command[].
 * NOTE(review): the optind/cidx updates between these statements are not
 * visible here. */
261 while (optind
< argc
) {
262 if (strcmp(argv
[optind
], "--") == 0) {
266 arglen
= argc
- optind
;
267 arguments
= calloc(sizeof(char *), arglen
);
272 for (i
= 0; i
< arglen
; i
++) {
273 arguments
[i
] = strdup(argv
[optind
+ i
]);
278 command
[cidx
] = strdup(argv
[optind
]);
/* Grouping several arguments per job only makes sense when there is a
 * command to append them to. */
284 if (argsatonce
> 1 && ! command
[0]) {
285 fprintf(stderr
, "option -n cannot be used without a command\n");
/* Create the helpers for stdout (fd 1) and stderr (fd 2); a negative
 * return value from either call is treated as failure (the action taken is
 * not visible here). */
289 pipe_child_stdout
= create_pipe_child(&stdout_fd
, 1);
290 pipe_child_stderr
= create_pipe_child(&stderr_fd
, 2);
292 if ((pipe_child_stdout
< 0) || (pipe_child_stderr
< 0))
/* Dispatch loop: runs until every argument has been handed to a job. */
295 while (argidx
< arglen
) {
/* Fetch one load-average sample into load. */
298 getloadavg(&load
, 1);
/* Start a job only when the job-count test passes (maxjobs == 0 or curjobs
 * below maxjobs) and the load test passes (maxload negative, or load below
 * maxload). */
300 if ((maxjobs
== 0 || curjobs
< maxjobs
) &&
301 (maxload
< 0 || load
< maxload
)) {
/* Do not take more arguments than are left. */
303 if (argsatonce
> arglen
- argidx
)
304 argsatonce
= arglen
- argidx
;
305 exec_child(command
, arguments
+ argidx
,
306 replace_cb
, argsatonce
, stdout_fd
,
308 argidx
+= argsatonce
;
/* When maxjobs is 0 or the job limit is reached, block until one child is
 * collected and OR its status into the overall return code. */
312 if (maxjobs
== 0 || curjobs
== maxjobs
) {
313 returncode
|= wait_for_child(0);
/* When a load limit is set and currently exceeded, pause for a second and
 * collect a finished child without blocking. */
317 if (maxload
> 0 && load
>= maxload
) {
319 sleep(1); /* XXX We should have a better
320 * heuristic than this */
321 r
= wait_for_child(WNOHANG
);
/* All arguments dispatched: wait for the jobs that are still running. */
328 while (curjobs
> 0) {
329 returncode
|= wait_for_child(0);
/* Stop the stdout/stderr helpers, if their recorded PIDs are non-zero. */
333 if (pipe_child_stdout
) {
334 kill(pipe_child_stdout
, SIGKILL
);
337 if (pipe_child_stderr
) {
338 kill(pipe_child_stderr
, SIGKILL
);