ts: disable utf8 warnings
[moreutils.git] / parallel.c
blobaf85a334d4b2e3c5750af05eedb14bba435c67be
1 /*
2 * parallel.c - run commands in parallel until you run out of commands
4 * Copyright © 2008 Tollef Fog Heen <tfheen@err.no>
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * version 2 as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18 * USA
22 #define _GNU_SOURCE
23 #include <unistd.h>
24 #include <string.h>
25 #include <stdio.h>
26 #include <sys/time.h>
27 #include <time.h>
28 #include <stdlib.h>
29 #include <errno.h>
30 #include <sys/select.h>
31 #include <sys/types.h>
32 #include <sys/wait.h>
33 #include <unistd.h>
35 #ifdef __sun
36 # include <sys/loadavg.h> /* getloadavg() */
37 #endif
39 #if !defined(WEXITED)
40 #define WEXITED 0
41 #endif
43 static pid_t pipe_child_stdout = 0;
44 static pid_t pipe_child_stderr = 0;
46 void usage() {
47 printf("parallel [OPTIONS] command -- arguments\n\tfor each argument, "
48 "run command with argument, in parallel\n");
49 printf("parallel [OPTIONS] -- commands\n\trun specified commands in parallel\n");
50 exit(1);
53 static void redirect(int fd, int target_fd, const char *name)
55 if (fd == target_fd)
56 return;
58 if (dup2(fd, target_fd) < 0) {
59 fprintf(stderr, "unable to open %s from internal pipe: %s\n",
60 name, strerror(errno));
61 exit(1);
63 close(fd);
66 void exec_child(char **command, char **arguments, int replace_cb, int nargs,
67 int stdout_fd, int stderr_fd)
69 if (fork() != 0) {
70 return;
73 redirect(stdout_fd, 1, "stdout");
74 redirect(stderr_fd, 2, "stderr");
76 if (command[0]) {
77 char **argv;
78 int argc = 0;
79 int i;
80 char *s;
82 while (command[argc] != 0) {
83 argc++;
85 if (! replace_cb)
86 argc++;
87 argv = calloc(sizeof(char*), argc + nargs);
89 for (i = 0; i < argc; i++) {
90 while (replace_cb && (s=strstr(command[i], "{}"))) {
91 char *buf=malloc(strlen(command[i]) + strlen(arguments[0]));
92 s[0]='\0';
93 sprintf(buf, "%s%s%s", command[i], arguments[0], s+2);
94 command[i]=buf;
96 argv[i] = command[i];
98 if (! replace_cb)
99 memcpy(argv + i - 1, arguments, nargs * sizeof(char *));
100 execvp(argv[0], argv);
101 exit(1);
103 else {
104 int ret=system(arguments[0]);
105 if (WIFEXITED(ret)) {
106 exit(WEXITSTATUS(ret));
108 else {
109 exit(1);
112 return;
115 int wait_for_child(int options) {
116 id_t id_ignored = 0;
117 siginfo_t infop;
119 infop.si_pid = 0;
120 waitid(P_ALL, id_ignored, &infop, WEXITED | options);
121 if (infop.si_pid == 0) {
122 return -1; /* Nothing to wait for */
124 if (infop.si_code == CLD_EXITED) {
125 return infop.si_status;
127 return 1;
130 static int pipe_child(int fd, int orig_fd)
132 const char *fd_info = (orig_fd == 1) ? "out" : "err";
133 char buf[4096];
134 int r;
136 while ((r = read(fd, buf, sizeof(buf))) >= 0) {
137 int w;
138 int len;
140 len = r;
142 do {
143 w = write(orig_fd, buf, len);
144 if (w < 0) {
145 fprintf(stderr, "unable to write to std%s: "
146 "%s\n", fd_info, strerror(errno));
147 exit(1);
150 len -= w;
151 } while (len > 0);
154 fprintf(stderr, "unable to read from std%s: %s\n", fd_info,
155 strerror(errno));
156 exit(1);
159 pid_t create_pipe_child(int *fd, int orig_fd)
161 int fds[2];
162 pid_t pid;
164 if (pipe(fds)) {
165 fprintf(stderr, "unable to create pipe: %s\n",
166 strerror(errno));
167 exit(1);
170 *fd = fds[1];
172 pid = fork();
173 if (pid < 0) {
174 fprintf(stderr, "unable to fork: %s\n", strerror(errno));
175 return pid;
178 if (pid) {
179 close(fds[0]);
180 return pid;
183 close(fds[1]);
185 return pipe_child(fds[0], orig_fd);
188 int main(int argc, char **argv) {
189 int maxjobs = -1;
190 int curjobs = 0;
191 double maxload = -1;
192 int argsatonce = 1;
193 int opt;
194 char **command = calloc(sizeof(char*), argc);
195 char **arguments = NULL;
196 int argidx = 0;
197 int arglen = 0;
198 int cidx = 0;
199 int returncode = 0;
200 int replace_cb = 0;
201 int stdout_fd = 1;
202 int stderr_fd = 2;
203 char *t;
205 while ((argv[optind] && strcmp(argv[optind], "--") != 0) &&
206 (opt = getopt(argc, argv, "+hij:l:n:")) != -1) {
207 switch (opt) {
208 case 'h':
209 usage();
210 break;
211 case 'i':
212 replace_cb = 1;
213 break;
214 case 'j':
215 errno = 0;
216 maxjobs = strtoul(optarg, &t, 0);
217 if (errno != 0 || (t-optarg) != strlen(optarg)) {
218 fprintf(stderr, "option '%s' is not a number\n",
219 optarg);
220 exit(2);
222 break;
223 case 'l':
224 errno = 0;
225 maxload = strtod(optarg, &t);
226 if (errno != 0 || (t-optarg) != strlen(optarg)) {
227 fprintf(stderr, "option '%s' is not a number\n",
228 optarg);
229 exit(2);
231 break;
232 case 'n':
233 errno = 0;
234 argsatonce = strtoul(optarg, &t, 0);
235 if (errno != 0 || argsatonce < 1 || (t-optarg) != strlen(optarg)) {
236 fprintf(stderr, "option '%s' is not a positive number\n",
237 optarg);
238 exit(2);
240 break;
241 default: /* ’?’ */
242 usage();
243 break;
247 if (replace_cb && argsatonce > 1) {
248 fprintf(stderr, "options -i and -n are incomaptible\n");
249 exit(2);
252 if (maxjobs < 0) {
253 #ifdef _SC_NPROCESSORS_ONLN
254 maxjobs = sysconf(_SC_NPROCESSORS_ONLN);
255 #else
256 #warning Cannot autodetect number of CPUS on this system: _SC_NPROCESSORS_ONLN not defined.
257 maxjobs = 1;
258 #endif
261 while (optind < argc) {
262 if (strcmp(argv[optind], "--") == 0) {
263 int i;
265 optind++;
266 arglen = argc - optind;
267 arguments = calloc(sizeof(char *), arglen);
268 if (! arguments) {
269 exit(1);
272 for (i = 0; i < arglen; i++) {
273 arguments[i] = strdup(argv[optind + i]);
275 optind += i;
277 else {
278 command[cidx] = strdup(argv[optind]);
279 cidx++;
281 optind++;
284 if (argsatonce > 1 && ! command[0]) {
285 fprintf(stderr, "option -n cannot be used without a command\n");
286 exit(2);
289 pipe_child_stdout = create_pipe_child(&stdout_fd, 1);
290 pipe_child_stderr = create_pipe_child(&stderr_fd, 2);
292 if ((pipe_child_stdout < 0) || (pipe_child_stderr < 0))
293 exit(1);
295 while (argidx < arglen) {
296 double load;
298 getloadavg(&load, 1);
300 if ((maxjobs == 0 || curjobs < maxjobs) &&
301 (maxload < 0 || load < maxload)) {
303 if (argsatonce > arglen - argidx)
304 argsatonce = arglen - argidx;
305 exec_child(command, arguments + argidx,
306 replace_cb, argsatonce, stdout_fd,
307 stderr_fd);
308 argidx += argsatonce;
309 curjobs++;
312 if (maxjobs == 0 || curjobs == maxjobs) {
313 returncode |= wait_for_child(0);
314 curjobs--;
317 if (maxload > 0 && load >= maxload) {
318 int r;
319 sleep(1); /* XXX We should have a better
320 * heurestic than this */
321 r = wait_for_child(WNOHANG);
322 if (r > 0)
323 returncode |= r;
324 if (r != -1)
325 curjobs--;
328 while (curjobs > 0) {
329 returncode |= wait_for_child(0);
330 curjobs--;
333 if (pipe_child_stdout) {
334 kill(pipe_child_stdout, SIGKILL);
335 wait_for_child(0);
337 if (pipe_child_stderr) {
338 kill(pipe_child_stderr, SIGKILL);
339 wait_for_child(0);
342 return returncode;