2 * parallel.c - run commands in parallel until you run out of commands
4 * Copyright © 2008 Tollef Fog Heen <tfheen@err.no>
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * version 2 as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/select.h>
31 #include <sys/types.h>
37 # include <sys/loadavg.h> /* getloadavg() */
44 static pid_t pipe_child_stdout
= 0;
45 static pid_t pipe_child_stderr
= 0;
48 printf("parallel [OPTIONS] command -- arguments\n\tfor each argument, "
49 "run command with argument, in parallel\n");
50 printf("parallel [OPTIONS] -- commands\n\trun specified commands in parallel\n");
54 static void redirect(int fd
, int target_fd
, const char *name
)
59 if (dup2(fd
, target_fd
) < 0) {
60 fprintf(stderr
, "unable to open %s from internal pipe: %s\n",
61 name
, strerror(errno
));
67 void exec_child(char **command
, char **arguments
, int replace_cb
, int nargs
,
68 int stdout_fd
, int stderr_fd
)
74 redirect(stdout_fd
, 1, "stdout");
75 redirect(stderr_fd
, 2, "stderr");
83 while (command
[argc
] != 0) {
88 argv
= calloc(sizeof(char*), argc
+ nargs
);
90 for (i
= 0; i
< argc
; i
++) {
91 while (replace_cb
&& (s
=strstr(command
[i
], "{}"))) {
92 char *buf
=malloc(strlen(command
[i
]) + strlen(arguments
[0]));
94 sprintf(buf
, "%s%s%s", command
[i
], arguments
[0], s
+2);
100 memcpy(argv
+ i
- 1, arguments
, nargs
* sizeof(char *));
101 execvp(argv
[0], argv
);
105 int ret
=system(arguments
[0]);
106 if (WIFEXITED(ret
)) {
107 exit(WEXITSTATUS(ret
));
116 int wait_for_child(int options
) {
121 waitid(P_ALL
, id_ignored
, &infop
, WEXITED
| options
);
122 if (infop
.si_pid
== 0) {
123 return -1; /* Nothing to wait for */
125 if (infop
.si_code
== CLD_EXITED
) {
126 return infop
.si_status
;
131 static int pipe_child(int fd
, int orig_fd
)
133 const char *fd_info
= (orig_fd
== 1) ? "out" : "err";
137 while ((r
= read(fd
, buf
, sizeof(buf
))) >= 0) {
144 w
= write(orig_fd
, buf
, len
);
146 fprintf(stderr
, "unable to write to std%s: "
147 "%s\n", fd_info
, strerror(errno
));
155 fprintf(stderr
, "unable to read from std%s: %s\n", fd_info
,
160 pid_t
create_pipe_child(int *fd
, int orig_fd
)
166 fprintf(stderr
, "unable to create pipe: %s\n",
175 fprintf(stderr
, "unable to fork: %s\n", strerror(errno
));
186 return pipe_child(fds
[0], orig_fd
);
189 int main(int argc
, char **argv
) {
195 char **command
= calloc(sizeof(char*), argc
);
196 char **arguments
= NULL
;
206 while ((argv
[optind
] && strcmp(argv
[optind
], "--") != 0) &&
207 (opt
= getopt(argc
, argv
, "+hij:l:n:")) != -1) {
217 maxjobs
= strtoul(optarg
, &t
, 0);
218 if (errno
!= 0 || (t
-optarg
) != strlen(optarg
)) {
219 fprintf(stderr
, "option '%s' is not a number\n",
226 maxload
= strtod(optarg
, &t
);
227 if (errno
!= 0 || (t
-optarg
) != strlen(optarg
)) {
228 fprintf(stderr
, "option '%s' is not a number\n",
235 argsatonce
= strtoul(optarg
, &t
, 0);
236 if (errno
!= 0 || argsatonce
< 1 || (t
-optarg
) != strlen(optarg
)) {
237 fprintf(stderr
, "option '%s' is not a positive number\n",
248 if (replace_cb
&& argsatonce
> 1) {
249 fprintf(stderr
, "options -i and -n are incomaptible\n");
254 #ifdef _SC_NPROCESSORS_ONLN
255 maxjobs
= sysconf(_SC_NPROCESSORS_ONLN
);
257 #warning Cannot autodetect number of CPUS on this system: _SC_NPROCESSORS_ONLN not defined.
262 while (optind
< argc
) {
263 if (strcmp(argv
[optind
], "--") == 0) {
267 arglen
= argc
- optind
;
268 arguments
= calloc(sizeof(char *), arglen
);
273 for (i
= 0; i
< arglen
; i
++) {
274 arguments
[i
] = strdup(argv
[optind
+ i
]);
279 command
[cidx
] = strdup(argv
[optind
]);
285 if (argsatonce
> 1 && ! command
[0]) {
286 fprintf(stderr
, "option -n cannot be used without a command\n");
290 pipe_child_stdout
= create_pipe_child(&stdout_fd
, 1);
291 pipe_child_stderr
= create_pipe_child(&stderr_fd
, 2);
293 if ((pipe_child_stdout
< 0) || (pipe_child_stderr
< 0))
296 while (argidx
< arglen
) {
299 getloadavg(&load
, 1);
301 if ((maxjobs
== 0 || curjobs
< maxjobs
) &&
302 (maxload
< 0 || load
< maxload
)) {
304 if (argsatonce
> arglen
- argidx
)
305 argsatonce
= arglen
- argidx
;
306 exec_child(command
, arguments
+ argidx
,
307 replace_cb
, argsatonce
, stdout_fd
,
309 argidx
+= argsatonce
;
313 if (maxjobs
== 0 || curjobs
== maxjobs
) {
314 returncode
|= wait_for_child(0);
318 if (maxload
> 0 && load
>= maxload
) {
320 sleep(1); /* XXX We should have a better
321 * heurestic than this */
322 r
= wait_for_child(WNOHANG
);
329 while (curjobs
> 0) {
330 returncode
|= wait_for_child(0);
334 if (pipe_child_stdout
) {
335 kill(pipe_child_stdout
, SIGKILL
);
338 if (pipe_child_stderr
) {
339 kill(pipe_child_stderr
, SIGKILL
);