fix typo in --help output
[rofl0r-jobflow.git] / jobflow.c
blobdb75fcef3ee00f592e62e9d45bf9088955924b92
1 /*
2 Copyright (C) 2012,2014,2016,2017,2018 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.3.0"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
24 #undef _XOPEN_SOURCE
25 #define _XOPEN_SOURCE 700
26 #undef _GNU_SOURCE
27 #define _GNU_SOURCE
29 #include "sblist.h"
31 #define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
33 #include <stdio.h>
34 #include <string.h>
35 #include <stdlib.h>
36 #include <stdbool.h>
37 #include <unistd.h>
38 #include <stdint.h>
39 #include <stddef.h>
40 #include <errno.h>
41 #include <time.h>
42 #include <assert.h>
43 #include <ctype.h>
44 #include <sys/mman.h>
46 /* process handling */
48 #include <fcntl.h>
49 #include <spawn.h>
50 #include <sys/wait.h>
51 #include <sys/stat.h>
53 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
/* Fallback for C libraries that do not provide prlimit(); background:
 * http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* Stand-in that never applies a limit: it prints a note on fd 2 and fails
 * with errno set to EINVAL. All arguments are ignored. */
static int prlimit(int pid, ...)
{
	(void) pid;
	dprintf(2, "prlimit() not implemented on this system\n");
	errno = EINVAL;
	return -1;
}
#endif
67 #include <sys/time.h>
69 /* some small helper funcs from libulz */
/* Sleep for `millisecs` milliseconds. When nanosleep() is interrupted by a
 * signal (EINTR) the sleep is resumed with the time that was still left.
 * Returns the result of the last nanosleep() call. */
static int msleep(long millisecs) {
	struct timespec remaining;
	struct timespec wanted = {
		.tv_sec = millisecs / 1000,
		.tv_nsec = (millisecs % 1000) * 1000 * 1000,
	};
	for(;;) {
		int rc = nanosleep(&wanted, &remaining);
		if(rc != -1 || errno != EINTR) return rc;
		wanted = remaining;
	}
}
/* alphabet the random directory suffix is drawn from */
static const char ulz_conv_cypher[] =
	"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
#define ulz_conv_cypher_len (sizeof(ulz_conv_cypher) - 1)

/* Overwrite the last six characters of `templ` with random characters and
 * create a directory of that name (mode S_IRWXU). A name that already exists
 * is retried with a new random suffix.
 * Returns `templ` on success. Returns NULL if `templ` is shorter than six
 * characters (errno = EINVAL) or if mkdir() fails for another reason. */
static char* ulz_mkdtemp(char* templ) {
	const size_t len = strlen(templ);
	char *suffix;

	if(len < 6) {
		errno = EINVAL;
		return NULL;
	}
	suffix = templ + (len - 6);
	for(;;) {
		size_t k;
		for(k = 0; k < 6; k++)
			suffix[k] = ulz_conv_cypher[rand() % ulz_conv_cypher_len];
		if(mkdir(templ, S_IRWXU) != -1) return templ;
		if(errno != EEXIST) return NULL;
	}
}
/* Build "<tmpdir><prefix>XXXXXX" (NUL-terminated) in `buf`.
 * `pl` is the length of `prefix`. The caller must supply a large enough
 * buffer. Returns the length of the resulting string. */
static size_t gen_fn(char* buf, const char* prefix, size_t pl, const char* tmpdir) {
	static const char placeholder[] = "XXXXXX";
	const size_t dirlen = strlen(tmpdir);
	char *out = buf;

	memcpy(out, tmpdir, dirlen);
	out += dirlen;
	memcpy(out, prefix, pl);
	out += pl;
	/* sizeof includes the terminating NUL */
	memcpy(out, placeholder, sizeof placeholder);
	return dirlen + pl + (sizeof placeholder - 1);
}
/* Create a temporary directory named "<prefix>XXXXXX", trying /dev/shm first
 * and /tmp second, to get the fastest possible storage. The path is written
 * to `buffer`. Returns the length of that path, or 0 if the buffer is too
 * small or neither directory could be created. */
static size_t mktempdir(const char* prefix, char* buffer, size_t bufsize) {
	static const char *const candidates[] = { "/dev/shm/", "/tmp/" };
	const size_t pl = strlen(prefix);
	size_t k;

	/* sized for the longer of the two base directories */
	if(bufsize < sizeof("/dev/shm/") -1 + pl + sizeof("XXXXXX")) return 0;

	for(k = 0; k < sizeof candidates / sizeof candidates[0]; k++) {
		size_t n = gen_fn(buffer, prefix, pl, candidates[k]);
		if(ulz_mkdtemp(buffer)) return n;
	}
	return 0;
}
/* bookkeeping for one job slot */
typedef struct {
	pid_t pid;                     /* pid of the running child, -1 when the slot is free */
	int pipe;                      /* write end of the pipe feeding the child's stdin (pipe mode) */
	posix_spawn_file_actions_t fa; /* file actions handed to posix_spawnp for this job */
} job_info;
/* one resource limit to apply to every spawned job via prlimit() */
typedef struct {
	int limit;        /* RLIMIT_* resource identifier */
	struct rlimit rl; /* limit values passed to prlimit() */
} limit_rec;
135 typedef struct {
136 char temp_state[256];
137 char* cmd_argv[4096];
138 sblist* job_infos;
139 sblist* subst_entries;
140 sblist* limits;
141 char* tempdir;
142 unsigned long long lineno;
144 char* statefile;
145 char* eof_marker;
146 unsigned long numthreads;
147 unsigned long threads_running;
148 unsigned long skip;
149 unsigned long delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
150 the top value in ms can be supplied via a command line switch.
151 this option makes only sense if the interval is somewhat smaller than the
152 expected runtime of the average job.
153 this option is useful to not overload a network app due to hundreds of
154 parallel connection tries on startup.
156 unsigned long bulk_bytes;
158 bool pipe_mode;
159 bool buffered; /* write stdout and stderr of each task into a file,
160 and print it to stdout once the process ends.
161 this prevents mixing up of the output of multiple tasks. */
162 bool delayedflush; /* only write to statefile whenever all processes are busy, and at program end.
163 this means faster program execution, but could also be imprecise if the number of
164 jobs is small or smaller than the available threadcount. */
165 bool join_output; /* join stdout and stderr of launched jobs into stdout */
167 unsigned cmd_startarg;
168 } prog_state_s;
170 static prog_state_s prog_state;
173 extern char** environ;
175 static int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
176 int ret = snprintf(buf, bufsize, "%s/jd_proc_%.5lu_std%s.log",
177 prog_state.tempdir, (unsigned long) jobindex, is_stderr ? "err" : "out");
178 return ret > 0 && (size_t) ret < bufsize;
181 static void launch_job(size_t jobindex, char** argv) {
182 char stdout_filename_buf[256];
183 char stderr_filename_buf[256];
184 job_info* job = sblist_get(prog_state.job_infos, jobindex);
186 if(job->pid != -1) return;
188 if(prog_state.buffered) {
189 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
190 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
191 dprintf(2, "temp filename too long!\n");
192 return;
196 errno = posix_spawn_file_actions_init(&job->fa);
197 if(errno) goto spawn_error;
199 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
200 if(errno) goto spawn_error;
202 int pipes[2];
203 if(prog_state.pipe_mode) {
204 if(pipe(pipes)) {
205 perror("pipe");
206 goto spawn_error;
208 job->pipe = pipes[1];
209 errno = posix_spawn_file_actions_adddup2(&job->fa, pipes[0], 0);
210 if(errno) goto spawn_error;
211 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[0]);
212 if(errno) goto spawn_error;
213 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[1]);
214 if(errno) goto spawn_error;
217 if(prog_state.buffered) {
218 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
219 if(errno) goto spawn_error;
220 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
221 if(errno) goto spawn_error;
224 if(!prog_state.pipe_mode) {
225 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
226 if(errno) goto spawn_error;
229 if(prog_state.buffered) {
230 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
231 if(errno) goto spawn_error;
232 if(prog_state.join_output)
233 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
234 else
235 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
236 if(errno) goto spawn_error;
239 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
240 if(errno) {
241 spawn_error:
242 job->pid = -1;
243 perror("posix_spawn");
244 } else {
245 prog_state.threads_running++;
246 if(prog_state.limits) {
247 limit_rec* limit;
248 sblist_iter(prog_state.limits, limit) {
249 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
250 perror("prlimit");
254 if(prog_state.pipe_mode)
255 close(pipes[0]);
/* Copy the buffered log file of job slot `job_id` to stdout (or to stderr if
 * `is_stderr` is non-zero), flush that stream and delete the log file.
 * Does nothing if the log file cannot be opened. */
static void dump_output(size_t job_id, int is_stderr) {
	char path[256];
	char chunk[4096];
	FILE *sink = is_stderr ? stderr : stdout;
	FILE *log;
	size_t got;

	makeLogfilename(path, sizeof(path), job_id, is_stderr);

	log = fopen(path, "r");
	if(!log) return;

	/* a short read means end of file (or an error): stop after writing it */
	do {
		got = fread(chunk, 1, sizeof(chunk), log);
		if(got) fwrite(chunk, 1, got, sink);
	} while(got == sizeof(chunk));

	fclose(log);
	fflush(sink);
	unlink(path);
}
/* Write all `size` bytes of `buf` to `fd`, continuing after short writes and
 * retrying when write() is interrupted (EINTR). Any other error is reported
 * with perror() and the remaining data is dropped. */
static void write_all(int fd, void* buf, size_t size) {
	const char *cursor = buf;
	size_t remaining = size;

	while(remaining != 0) {
		ssize_t written = write(fd, cursor, remaining);
		if(written == -1) {
			if(errno == EINTR) continue;
			perror("write");
			return;
		}
		cursor += written;
		remaining -= written;
	}
}
298 static void pass_stdin(char *line, size_t len) {
299 static size_t next_child = 0;
300 if(next_child >= sblist_getsize(prog_state.job_infos))
301 next_child = 0;
302 job_info *job = sblist_get(prog_state.job_infos, next_child);
303 write_all(job->pipe, line, len);
304 next_child++;
307 static void close_pipes(void) {
308 size_t i;
309 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
310 job_info *job = sblist_get(prog_state.job_infos, i);
311 close(job->pipe);
315 /* wait till a child exits, reap it, and return its job index for slot reuse */
316 static size_t reap_child(void) {
317 size_t i;
318 job_info* job;
319 int ret, retval;
321 do ret = waitpid(-1, &retval, 0);
322 while(ret == -1 || !WIFEXITED(retval));
324 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
325 job = sblist_get(prog_state.job_infos, i);
326 if(job->pid == ret) {
327 job->pid = -1;
328 posix_spawn_file_actions_destroy(&job->fa);
329 prog_state.threads_running--;
330 if(prog_state.buffered) {
331 dump_output(i, 0);
332 if(!prog_state.join_output)
333 dump_output(i, 1);
335 return i;
338 assert(0);
339 return -1;
342 static size_t free_slots(void) {
343 return prog_state.numthreads - prog_state.threads_running;
/* Print "error: " followed by a printf-style message to fd 2, then exit
 * with status 1. The first argument must be a string literal. */
#define die(...) \
	do { \
		dprintf(2, "error: " __VA_ARGS__); \
		exit(1); \
	} while(0)
/* Parse an unsigned decimal number with an optional K, M or G suffix
 * (powers of 1024), e.g. "16M". Parsing stops at the first character that is
 * neither part of the number nor a suffix, so "16M,cpu=10" yields 16 MiB.
 *
 * Leading whitespace and a '+' sign are skipped. If no digit follows
 * (including the empty string) the result is 0. Values that do not fit into
 * an unsigned long saturate at the maximum value instead of wrapping. */
static unsigned long parse_human_number(const char* num) {
	static const unsigned long mul[] = {1024UL, 1024UL * 1024, 1024UL * 1024 * 1024};
	static const char kmg[] = "KMG";
	const unsigned long ulmax = ~0UL;
	const char *kmgind, *p = num;
	char *end;
	unsigned long ret;

	while(isspace((unsigned char) *p)) p++;
	if(*p == '+') p++;
	/* never step past the terminator, and refuse negative numbers */
	if(!isdigit((unsigned char) *p)) return 0;

	/* strtoul yields the maximum value on overflow */
	ret = strtoul(p, &end, 10);
	if(*end && (kmgind = strchr(kmg, *end))) {
		unsigned long m = mul[kmgind - kmg];
		ret = ret > ulmax / m ? ulmax : ret * m;
	}
	return ret;
}
361 static int syntax(void) {
362 dprintf(2,
363 "jobflow " VERSION " (C) rofl0r\n"
364 "------------------------\n"
365 "this program is intended to be used as a recipient of another programs output.\n"
366 "it launches processes to which the current line can be passed as an argument\n"
367 "using {} for substitution (as in find -exec).\n"
368 "if no substitution argument ({} or {.}) is provided, input is piped into\n"
369 "stdin of child processes. input will be then evenly distributed to jobs,\n"
370 "until EOF is received. we call this 'pipe mode'.\n"
371 "\n"
372 "available options:\n\n"
373 "-skip N -threads N -resume -statefile=/tmp/state -delayedflush\n"
374 "-delayedspinup N -buffered -joinoutput -limits mem=16M,cpu=10\n"
375 "-eof=XXX\n"
376 "-exec ./mycommand {}\n"
377 "\n"
378 "-skip N\n"
379 " N=number of entries to skip\n"
380 "-threads N (alternative: -j N)\n"
381 " N=number of parallel processes to spawn\n"
382 "-resume\n"
383 " resume from last jobnumber stored in statefile\n"
384 "-eof XXX\n"
385 " use XXX as the EOF marker on stdin\n"
386 " if the marker is encountered, behave as if stdin was closed\n"
387 " not compatible with pipe/bulk mode\n"
388 "-statefile XXX\n"
389 " XXX=filename\n"
390 " saves last launched jobnumber into a file\n"
391 "-delayedflush\n"
392 " only write to statefile whenever all processes are busy,\n"
393 " and at program end\n"
394 "-delayedspinup N\n"
395 " N=maximum amount of milliseconds\n"
396 " ...to wait when spinning up a fresh set of processes\n"
397 " a random value between 0 and the chosen amount is used to delay initial\n"
398 " spinup.\n"
399 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
400 " activity on program startup\n"
401 "-buffered\n"
402 " store the stdout and stderr of launched processes into a temporary file\n"
403 " which will be printed after a process has finished.\n"
404 " this prevents mixing up of output of different processes.\n"
405 "-joinoutput\n"
406 " if -buffered, write both stdout and stderr into the same file.\n"
407 " this saves the chronological order of the output, and the combined output\n"
408 " will only be printed to stdout.\n"
409 "-bulk N\n"
410 " do bulk copies with a buffer of N bytes. only usable in pipe mode.\n"
411 " this passes (almost) the entire buffer to the next scheduled job.\n"
412 " the passed buffer will be truncated to the last line break boundary,\n"
413 " so jobs always get entire lines to work with.\n"
414 " this option is useful when you have huge input files and relatively short\n"
415 " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n"
416 " N must be a multiple of 4KB. the suffixes G/M/K are detected.\n"
417 " actual memory allocation will be twice the amount passed.\n"
418 " note that pipe buffer size is limited to 64K on linux, so anything higher\n"
419 " than that probably doesn't make sense.\n"
420 "-limits [mem=N,cpu=N,stack=N,fsize=N,nofiles=N]\n"
421 " sets the rlimit of the new created processes.\n"
422 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
423 "-exec command with args\n"
424 " everything past -exec is treated as the command to execute on each line of\n"
425 " stdin received. the line can be passed as an argument using {}.\n"
426 " {.} passes everything before the last dot in a line as an argument.\n"
427 " it is possible to use multiple substitutions inside a single argument,\n"
428 " but currently only of one type.\n"
429 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
430 "\n"
432 return 1;
435 static int parse_args(unsigned argc, char** argv) {
436 unsigned i, j, r = 0;
437 static bool resume = 0;
438 static char *limits = 0;
439 static const struct {
440 const char lname[14];
441 const char sname;
442 const char flag;
443 union {
444 bool *b;
445 unsigned long *i;
446 char **s;
447 } dest;
448 } opt_tab[] = {
449 {"threads", 'j', 'i', .dest.i = &prog_state.numthreads },
450 {"statefile", 0, 's', .dest.s = &prog_state.statefile },
451 {"eof", 0, 's', .dest.s = &prog_state.eof_marker },
452 {"skip", 0, 'i', .dest.i = &prog_state.skip },
453 {"resume", 0, 'b', .dest.b = &resume },
454 {"delayedflush", 0, 'b', .dest.b = &prog_state.delayedflush },
455 {"delayedspinup", 0, 'i', .dest.i = &prog_state.delayedspinup_interval },
456 {"buffered", 0, 'b', .dest.b =&prog_state.buffered},
457 {"joinoutput", 0, 'b', .dest.b =&prog_state.join_output},
458 {"bulk", 0, 'i', .dest.i = &prog_state.bulk_bytes},
459 {"limits", 0, 's', .dest.s = &limits},
462 prog_state.numthreads = 1;
464 for(i=1; i<argc; ++i) {
465 char *p = argv[i], *q = 0;
466 if(*(p++) != '-') die("expected option instead of %s\n", argv[i]);
467 if(*p == '-') p++;
468 if(!*p) die("invalid option %s\n", argv[i]);
469 for(j=0;j<ARRAY_SIZE(opt_tab);++j,q=0) {
470 if((!p[1] && *p == opt_tab[j].sname) ||
471 (!strcmp(p, opt_tab[j].lname)) ||
472 ((q = strchr(p, '=')) && !strncmp(p, opt_tab[j].lname, q-p))) {
473 switch(opt_tab[j].flag) {
474 case 'b': *opt_tab[j].dest.b=1; break;
475 case 'i': case 's':
476 if(!q) {
477 if(argc <= i+1 || argv[i+1][0] == '-') {
478 e_expect_op:;
479 die("option %s requires operand\n", argv[i]);
481 q = argv[++i];
482 } else {
483 if(*(++q) == 0) goto e_expect_op;
485 if(opt_tab[j].flag == 'i') {
486 if(!isdigit(*q))
487 die("expected numeric operand for %s at %s\n", p, q);
488 *opt_tab[j].dest.i=parse_human_number(q);
489 } else
490 *opt_tab[j].dest.s=q;
491 break;
493 break;
496 if(j>=ARRAY_SIZE(opt_tab)) {
497 if(!strcmp(p, "exec")) {
498 r = i+1;
499 break;
500 } else if(!strcmp(p, "help")) {
501 return syntax();
502 } else die("unknown option %s\n", argv[i]);
506 if((long)prog_state.numthreads <= 0) die("threadcount must be >= 1\n");
508 if(resume) {
509 if(!prog_state.statefile) die("-resume needs -statefile\n");
510 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
511 FILE *f = fopen(prog_state.statefile, "r");
512 if(f) {
513 char nb[64];
514 if(fgets(nb, sizeof nb, f)) prog_state.skip = strtoll(nb,0,10);
515 fclose(f);
520 if(prog_state.delayedflush && !prog_state.statefile)
521 die("-delayedflush needs -statefile\n");
523 prog_state.pipe_mode = 0;
524 prog_state.cmd_startarg = r;
525 prog_state.subst_entries = NULL;
527 if(r) {
528 uint32_t subst_ent;
529 if(r < (unsigned) argc) {
530 prog_state.cmd_startarg = r;
531 } else die("-exec without arguments\n");
533 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
535 // save entries which must be substituted, to save some cycles.
536 for(i = r; i < (unsigned) argc; i++) {
537 subst_ent = i - r;
538 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) {
539 sblist_add(prog_state.subst_entries, &subst_ent);
542 if(sblist_getsize(prog_state.subst_entries) == 0) {
543 prog_state.pipe_mode = 1;
544 sblist_free(prog_state.subst_entries);
545 prog_state.subst_entries = 0;
549 if(prog_state.join_output && !prog_state.buffered)
550 die("-joinoutput needs -buffered\n");
552 if(prog_state.bulk_bytes % 4096)
553 die("bulk size must be a multiple of 4096\n");
555 if(limits) {
556 unsigned i;
557 while(1) {
558 limits += strspn(limits, ",");
559 size_t l = strcspn(limits, ",");
560 if(!l) break;
561 size_t l2 = strcspn(limits, "=");
562 if(l2 >= l) die("syntax error in limits argument\n");
563 limit_rec lim;
564 if(!prog_state.limits)
565 prog_state.limits = sblist_new(sizeof(limit_rec), 4);
566 static const struct { int lim_val; const char lim_name[8]; } lim_tab[] = {
567 { RLIMIT_AS, "mem" },
568 { RLIMIT_CPU, "cpu" },
569 { RLIMIT_STACK, "stack" },
570 { RLIMIT_FSIZE, "fsize" },
571 { RLIMIT_NOFILE, "nofiles" },
573 for(i=0; i<ARRAY_SIZE(lim_tab);++i)
574 if(!strncmp(limits, lim_tab[i].lim_name, l2)) {
575 lim.limit = lim_tab[i].lim_val;
576 break;
578 if(i >= ARRAY_SIZE(lim_tab))
579 die("unknown option passed to -limits\n");
580 if(getrlimit(lim.limit, &lim.rl) == -1) {
581 perror("getrlimit");
582 die("could not query rlimits\n");
584 lim.rl.rlim_cur = parse_human_number(limits+l2+1);
585 sblist_add(prog_state.limits, &lim);
586 limits += l;
589 return 0;
592 static void init_queue(void) {
593 unsigned i;
594 job_info ji = {.pid = -1};
596 for(i = 0; i < prog_state.numthreads; i++)
597 sblist_add(prog_state.job_infos, &ji);
600 static void write_statefile(unsigned long long n, const char* tempfile) {
601 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
602 if(fd != -1) {
603 dprintf(fd, "%llu\n", n + 1ULL);
604 close(fd);
605 if(rename(tempfile, prog_state.statefile) == -1)
606 perror("rename");
607 } else
608 perror("open");
/* Return 1 if the `needle_size` bytes of `needle` occur in `haystack`
 * (`hay_size` bytes long) exactly at offset `bufpos`, 0 otherwise.
 * `bufpos` must not exceed `hay_size`. */
static int str_here(char* haystack, size_t hay_size, size_t bufpos,
		    char* needle, size_t needle_size) {
	const size_t avail = hay_size - bufpos;

	if(needle_size > avail) return 0;
	return memcmp(needle, haystack + bufpos, needle_size) == 0;
}
// copies src to dest, replacing every occurrence of `what` with `whit`.
// returns the number of substitutions done, -1 if dest is too small.
// dest is always overwritten. if no substitutions were done, it contains a copy of source.
static int substitute_all(char *dest, ssize_t dest_size,
			  char *src, size_t src_size,
			  char *what, size_t what_size,
			  char *whit, size_t whit_size) {
	char *out = dest;
	ssize_t room = dest_size;
	size_t pos = 0;
	int hits = 0;

	while(room > 0 && pos < src_size) {
		/* does `what` start at the current source position? */
		int found = what_size <= src_size - pos &&
		            memcmp(what, src + pos, what_size) == 0;
		if(!found) {
			*out++ = src[pos++];
			room--;
			continue;
		}
		if(room < (ssize_t) whit_size) return -1;
		memcpy(out, whit, whit_size);
		out += whit_size;
		room -= whit_size;
		hits++;
		pos += what_size;
	}
	/* no space left for the terminating NUL */
	if(room == 0) return -1;
	*out = 0;
	return hits;
}
/* Find the first occurrence of `ch` within the first `end` bytes of `in`.
 * Returns a pointer to it, or 0 if there is none. */
static char* mystrnchr(const char *in, int ch, size_t end) {
	size_t k;

	for(k = 0; k != end; k++)
		if(in[k] == ch) return (char*)(in + k);
	return 0;
}
/* Find the last occurrence of `ch` within the first `end` bytes of `in`.
 * Returns a pointer to it, or 0 if there is none.
 * `end` must be at least 1; use mystrnrchr_chk() when it may be 0. */
static char* mystrnrchr(const char *in, int ch, size_t end) {
	const char *cur;

	/* scan backwards down to, but not including, the first byte */
	for(cur = in + end - 1; cur != in; cur--)
		if(*cur == ch) return (char*)cur;
	return *cur == ch ? (char*)cur : 0;
}
/* Like mystrnrchr(), but also safe for an empty range: returns 0 when `end`
 * is 0. */
static char* mystrnrchr_chk(const char *in, int ch, size_t end) {
	return end ? mystrnrchr(in, ch, end) : 0;
}
666 static int need_linecounter(void) {
667 return !!prog_state.skip || prog_state.statefile;
/* Count the '\n' bytes among the first `len` bytes of `buf`. */
static size_t count_linefeeds(const char *buf, size_t len) {
	size_t total = 0;
	size_t k;

	for(k = 0; k < len; k++)
		total += (buf[k] == '\n');
	return total;
}
679 static int match_eof(char* inbuf, size_t len) {
680 if(!prog_state.eof_marker) return 0;
681 size_t l = strlen(prog_state.eof_marker);
682 return l == len-1 && !memcmp(prog_state.eof_marker, inbuf, l);
/* is `p` a line break character (LF or CR)? */
static inline int islb(int p) {
	switch(p) {
	case '\n':
	case '\r':
		return 1;
	default:
		return 0;
	}
}

/* Strip all trailing line break characters from the `*len` bytes at `s` by
 * overwriting them with NUL, and store the shortened length in `*len`. */
static void chomp(char *s, size_t *len) {
	size_t n = *len;

	while(n != 0 && islb(s[n-1])) {
		n--;
		s[n] = 0;
	}
	*len = n;
}
690 #define MAX_SUBSTS 16
691 static int dispatch_line(char* inbuf, size_t len, char** argv) {
692 char subst_buf[MAX_SUBSTS][4096];
693 static unsigned spinup_counter = 0;
695 if(!prog_state.bulk_bytes)
696 prog_state.lineno++;
697 else if(need_linecounter()) {
698 prog_state.lineno += count_linefeeds(inbuf, len);
701 if(prog_state.skip) {
702 if(!prog_state.bulk_bytes) {
703 prog_state.skip--;
704 return 1;
705 } else {
706 while(len && prog_state.skip) {
707 char *q = mystrnchr(inbuf, '\n', len);
708 if(q) {
709 ptrdiff_t diff = (q - inbuf) + 1;
710 inbuf += diff;
711 len -= diff;
712 prog_state.skip--;
713 } else {
714 return 1;
717 if(!len) return 1;
720 if(!prog_state.cmd_startarg) {
721 write_all(1, inbuf, len);
722 return 1;
725 if(!prog_state.pipe_mode)
726 chomp(inbuf, &len);
728 char *line = inbuf;
729 size_t line_size = len;
731 if(prog_state.subst_entries) {
732 unsigned max_subst = 0;
733 uint32_t* index;
734 sblist_iter(prog_state.subst_entries, index) {
735 if(max_subst >= MAX_SUBSTS) break;
736 char *source = argv[*index + prog_state.cmd_startarg];
737 size_t source_len = strlen(source);
738 int ret;
739 ret = substitute_all(subst_buf[max_subst], 4096,
740 source, source_len,
741 "{}", 2,
742 line, line_size);
743 if(ret == -1) {
744 too_long:
745 dprintf(2, "fatal: line too long for substitution: %s\n", line);
746 return 0;
747 } else if(!ret) {
748 char* lastdot = mystrnrchr_chk(line, '.', line_size);
749 size_t tilLastDot = line_size;
750 if(lastdot) tilLastDot = lastdot - line;
751 ret = substitute_all(subst_buf[max_subst], 4096,
752 source, source_len,
753 "{.}", 3,
754 line, tilLastDot);
755 if(ret == -1) goto too_long;
757 if(ret) {
758 prog_state.cmd_argv[*index] = subst_buf[max_subst];
759 max_subst++;
765 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
766 msleep(rand() % (prog_state.delayedspinup_interval + 1));
767 spinup_counter++;
770 if(free_slots())
771 launch_job(prog_state.threads_running, prog_state.cmd_argv);
772 else if(!prog_state.pipe_mode)
773 launch_job(reap_child(), prog_state.cmd_argv);
775 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
776 write_statefile(prog_state.lineno, prog_state.temp_state);
779 if(prog_state.pipe_mode)
780 pass_stdin(line, line_size);
782 return 1;
785 int main(int argc, char** argv) {
786 unsigned i;
788 char tempdir_buf[256];
790 srand(time(NULL));
792 if(argc > 4096) argc = 4096;
794 prog_state.threads_running = 0;
796 if(parse_args(argc, argv)) return 1;
798 if(prog_state.statefile)
799 snprintf(prog_state.temp_state, sizeof(prog_state.temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
801 prog_state.tempdir = NULL;
803 if(prog_state.buffered) {
804 prog_state.tempdir = tempdir_buf;
805 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
806 perror("mkdtemp");
807 die("could not create tempdir\n");
809 } else {
810 /* if the stdout/stderr fds are not in O_APPEND mode,
811 the dup()'s of the fds in posix_spawn can cause different
812 file positions, causing the different processes to overwrite each others output.
813 testcase:
814 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
816 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
817 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
820 if(prog_state.cmd_startarg) {
821 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
822 prog_state.cmd_argv[i - prog_state.cmd_startarg] = argv[i];
824 prog_state.cmd_argv[argc - prog_state.cmd_startarg] = NULL;
827 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
828 init_queue();
830 prog_state.lineno = 0;
832 size_t left = 0, bytes_read = 0;
833 const size_t chunksize = prog_state.bulk_bytes ? prog_state.bulk_bytes : 16*1024;
835 char *mem = mmap(NULL, chunksize*2, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
836 char *buf1 = mem;
837 char *buf2 = mem+chunksize;
838 char *in, *inbuf;
840 int exitcode = 1;
842 while(1) {
843 inbuf = buf1+chunksize-left;
844 memcpy(inbuf, buf2+bytes_read-left, left);
845 ssize_t n = read(0, buf2, chunksize);
846 if(n == -1) {
847 perror("read");
848 goto out;
850 bytes_read = n;
851 left += n;
852 in = inbuf;
853 while(left) {
854 char *p;
855 if(prog_state.pipe_mode && prog_state.bulk_bytes)
856 p = mystrnrchr(in, '\n', left);
857 else
858 p = mystrnchr (in, '\n', left);
860 if(!p) break;
861 ptrdiff_t diff = (p - in) + 1;
862 if(match_eof(in, diff)) {
863 exitcode = 0;
864 goto out;
866 if(!dispatch_line(in, diff, argv))
867 goto out;
868 left -= diff;
869 in += diff;
871 if(!n) {
872 if(left && !match_eof(in, left)) dispatch_line(in, left, argv);
873 break;
875 if(left > chunksize) {
876 dprintf(2, "error: input line length exceeds buffer size\n");
877 goto out;
881 exitcode = 0;
883 out:
885 if(prog_state.pipe_mode) {
886 close_pipes();
889 if(prog_state.delayedflush)
890 write_statefile(prog_state.lineno - 1, prog_state.temp_state);
892 while(prog_state.threads_running) reap_child();
894 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
895 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
896 if(prog_state.limits) sblist_free(prog_state.limits);
898 if(prog_state.tempdir)
899 rmdir(prog_state.tempdir);
902 fflush(stdout);
903 fflush(stderr);
906 return exitcode;