add EOF marker functionality
[rofl0r-jobflow.git] / jobflow.c
blob7f3a233a80ca83303e1566cb4ab29955d5040b0e
1 /*
2 Copyright (C) 2012,2014,2016,2017,2018 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.2.3"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
24 #undef _XOPEN_SOURCE
25 #define _XOPEN_SOURCE 700
26 #undef _GNU_SOURCE
27 #define _GNU_SOURCE
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/stringptrlist.h"
32 #include "../lib/include/sblist.h"
33 #include "../lib/include/strlib.h"
34 #include "../lib/include/timelib.h"
35 #include "../lib/include/filelib.h"
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <stdint.h>
41 #include <stddef.h>
42 #include <errno.h>
43 #include <time.h>
44 #include <assert.h>
45 #include <sys/mman.h>
47 /* process handling */
49 #include <fcntl.h>
50 #include <spawn.h>
51 #include <sys/wait.h>
52 #include <sys/stat.h>
54 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
/* References for the glibc commits that introduced prlimit():
 * http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* Stand-in used when the C library is too old to ship prlimit():
 * reports the problem on fd 2 and always fails with EINVAL. */
static int prlimit(int pid, ...)
{
	(void) pid;
	dprintf(2, "prlimit() not implemented on this system\n");
	errno = EINVAL;
	return -1;
}
#endif
69 #include <sys/time.h>
/* Bookkeeping for one job slot. */
typedef struct job_info {
	pid_t pid;                      /* pid of the child occupying the slot; -1 marks the slot as free */
	int pipe;                       /* pipe mode only: write end of the pipe connected to the child's stdin */
	posix_spawn_file_actions_t fa;  /* descriptor setup passed to posix_spawnp() for this child */
} job_info;
/* One resource limit requested through -limits. */
typedef struct limit_rec {
	int limit;         /* RLIMIT_* selector of the resource */
	struct rlimit rl;  /* limit values applied to each spawned child via prlimit() */
} limit_rec;
82 typedef struct {
83 char temp_state[256];
84 char* cmd_argv[4096];
85 unsigned long long lineno;
86 unsigned numthreads;
87 unsigned threads_running;
88 char* statefile;
89 char* eof_marker;
90 unsigned long long skip;
91 sblist* job_infos;
92 sblist* subst_entries;
93 sblist* limits;
94 unsigned cmd_startarg;
95 char* tempdir;
96 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
97 the top value in ms can be supplied via a command line switch.
98 this option makes only sense if the interval is somewhat smaller than the
99 expected runtime of the average job.
100 this option is useful to not overload a network app due to hundreds of
101 parallel connection tries on startup.
103 int buffered:1; /* write stdout and stderr of each task into a file,
104 and print it to stdout once the process ends.
105 this prevents mixing up of the output of multiple tasks. */
106 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
107 this means faster program execution, but could also be imprecise if the number of
108 jobs is small or smaller than the available threadcount. */
109 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
110 int pipe_mode:1;
111 size_t bulk_bytes;
112 } prog_state_s;
114 prog_state_s prog_state;
117 extern char** environ;
119 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
120 int ret = snprintf(buf, bufsize, "%s/jd_proc_%.5lu_std%s.log",
121 prog_state.tempdir, (unsigned long) jobindex, is_stderr ? "err" : "out");
122 return ret > 0 && (size_t) ret < bufsize;
125 void launch_job(size_t jobindex, char** argv) {
126 char stdout_filename_buf[256];
127 char stderr_filename_buf[256];
128 job_info* job = sblist_get(prog_state.job_infos, jobindex);
130 if(job->pid != -1) return;
132 if(prog_state.buffered) {
133 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
134 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
135 dprintf(2, "temp filename too long!\n");
136 return;
140 errno = posix_spawn_file_actions_init(&job->fa);
141 if(errno) goto spawn_error;
143 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
144 if(errno) goto spawn_error;
146 int pipes[2];
147 if(prog_state.pipe_mode) {
148 if(pipe(pipes)) {
149 perror("pipe");
150 goto spawn_error;
152 job->pipe = pipes[1];
153 errno = posix_spawn_file_actions_adddup2(&job->fa, pipes[0], 0);
154 if(errno) goto spawn_error;
155 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[0]);
156 if(errno) goto spawn_error;
157 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[1]);
158 if(errno) goto spawn_error;
161 if(prog_state.buffered) {
162 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
163 if(errno) goto spawn_error;
164 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
165 if(errno) goto spawn_error;
168 if(!prog_state.pipe_mode) {
169 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
170 if(errno) goto spawn_error;
173 if(prog_state.buffered) {
174 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
175 if(errno) goto spawn_error;
176 if(prog_state.join_output)
177 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
178 else
179 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
180 if(errno) goto spawn_error;
183 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
184 if(errno) {
185 spawn_error:
186 job->pid = -1;
187 perror("posix_spawn");
188 } else {
189 prog_state.threads_running++;
190 if(prog_state.limits) {
191 limit_rec* limit;
192 sblist_iter(prog_state.limits, limit) {
193 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
194 perror("prlimit");
198 if(prog_state.pipe_mode)
199 close(pipes[0]);
/* Copies the buffered output file of job slot `job_id` to stderr
 * (is_stderr != 0) or stdout, flushes that stream and removes the file.
 * Does nothing when the file cannot be opened. */
static void dump_output(size_t job_id, int is_stderr) {
	char path[256];
	char chunk[4096];
	FILE *sink = is_stderr ? stderr : stdout;
	FILE *log;
	size_t got;

	makeLogfilename(path, sizeof(path), job_id, is_stderr);

	log = fopen(path, "r");
	if(!log)
		return;

	for(;;) {
		got = fread(chunk, 1, sizeof(chunk), log);
		if(!got)
			break;
		fwrite(chunk, 1, got, sink);
		/* a short read marks the end of the file */
		if(got < sizeof(chunk))
			break;
	}

	fclose(log);
	fflush(sink);
	unlink(path);
}
/* Writes all `size` bytes of `buf` to `fd`, continuing after short writes
 * and retrying when write() is interrupted (EINTR). Any other error is
 * reported with perror() and the remaining data is dropped. */
static void write_all(int fd, void* buf, size_t size) {
	const char *cursor = buf;
	size_t remaining = size;

	while(remaining != 0) {
		ssize_t written = write(fd, cursor, remaining);
		if(written == -1) {
			if(errno == EINTR)
				continue;
			perror("write");
			return;
		}
		cursor += written;
		remaining -= written;
	}
}
242 static void pass_stdin(stringptr *line) {
243 static size_t next_child = 0;
244 if(next_child >= sblist_getsize(prog_state.job_infos))
245 next_child = 0;
246 job_info *job = sblist_get(prog_state.job_infos, next_child);
247 write_all(job->pipe, line->ptr, line->size);
248 next_child++;
251 static void close_pipes(void) {
252 size_t i;
253 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
254 job_info *job = sblist_get(prog_state.job_infos, i);
255 close(job->pipe);
259 /* wait till a child exits, reap it, and return its job index for slot reuse */
260 static size_t reap_child(void) {
261 size_t i;
262 job_info* job;
263 int ret, retval;
265 do ret = waitpid(-1, &retval, 0);
266 while(ret == -1 || !WIFEXITED(retval));
268 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
269 job = sblist_get(prog_state.job_infos, i);
270 if(job->pid == ret) {
271 job->pid = -1;
272 posix_spawn_file_actions_destroy(&job->fa);
273 prog_state.threads_running--;
274 if(prog_state.buffered) {
275 dump_output(i, 0);
276 if(!prog_state.join_output)
277 dump_output(i, 1);
279 return i;
282 assert(0);
283 return -1;
286 static size_t free_slots(void) {
287 return prog_state.numthreads - prog_state.threads_running;
/* Prints `msg` verbatim to fd 2 and terminates the program with exit
 * status 1. The message is passed as an argument to a fixed "%s" format,
 * so '%' characters in it are never interpreted as conversions. */
__attribute__((noreturn))
static void die(const char* msg) {
	dprintf(2, "%s", msg);
	exit(1);
}
296 static unsigned long parse_human_number(stringptr* num) {
297 unsigned long ret = 0;
298 static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
299 const char* kmg = "KMG";
300 char* kmgind;
301 if(num && num->size) {
302 ret = atol(num->ptr);
303 if((kmgind = strchr(kmg, num->ptr[num->size -1])))
304 ret *= mul[kmgind - kmg];
306 return ret;
309 static int syntax(void) {
310 dprintf(2,
311 "jobflow " VERSION " (C) rofl0r\n"
312 "------------------------\n"
313 "this program is intended to be used as a recipient of another programs output\n"
314 "it launches processes to which the current line can be passed as an argument\n"
315 "using {} for substitution (as in find -exec).\n"
316 "if no substitution argument ({} or {.}) is provided, input is piped into\n"
317 "stdin of child processes. input will be then evenly distributed to jobs,\n"
318 "until EOF is received.\n"
319 "\n"
320 "available options:\n\n"
321 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
322 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
323 "-eof=XXX\n"
324 "-exec ./mycommand {}\n"
325 "\n"
326 "-skip=XXX\n"
327 " XXX=number of entries to skip\n"
328 "-threads=XXX (alternative: -j=XXX)\n"
329 " XXX=number of parallel processes to spawn\n"
330 "-resume\n"
331 " resume from last jobnumber stored in statefile\n"
332 "-eof=XXX\n"
333 " use XXX as the EOF marker on stdin\n"
334 " if the marker is encountered, behave as if stdin was closed\n"
335 " not compatible with pipe/bulk mode\n"
336 "-statefile=XXX\n"
337 " XXX=filename\n"
338 " saves last launched jobnumber into a file\n"
339 "-delayedflush\n"
340 " only write to statefile whenever all processes are busy,\n"
341 " and at program end\n"
342 "-delayedspinup=XXX\n"
343 " XXX=maximum amount of milliseconds\n"
344 " ...to wait when spinning up a fresh set of processes\n"
345 " a random value between 0 and the chosen amount is used to delay initial\n"
346 " spinup.\n"
347 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
348 " activity on program startup\n"
349 "-buffered\n"
350 " store the stdout and stderr of launched processes into a temporary file\n"
351 " which will be printed after a process has finished.\n"
352 " this prevents mixing up of output of different processes.\n"
353 "-joinoutput\n"
354 " if -buffered, write both stdout and stderr into the same file.\n"
355 " this saves the chronological order of the output, and the combined output\n"
356 " will only be printed to stdout.\n"
357 "-bulk=XXX\n"
358 " do bulk copies with a buffer of XXX bytes. only usable in pipe mode.\n"
359 " this passes (almost) the entire buffer to the next scheduled job.\n"
360 " the passed buffer will be truncated to the last line break boundary,\n"
361 " so jobs always get entire lines to work with.\n"
362 " this option is useful when you have huge input files and relatively short\n"
363 " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n"
364 " XXX must be a multiple of 4KB. the suffixes G/M/K are detected.\n"
365 " actual memory allocation will be twice the amount passed.\n"
366 " note that pipe buffer size is limited to 64K on linux, so anything higher\n"
367 " than that probably doesn't make sense.\n"
368 " if no size is passed (i.e. only -bulk), a default of 4K will be used.\n"
369 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
370 " sets the rlimit of the new created processes.\n"
371 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
372 "-exec command with args\n"
373 " everything past -exec is treated as the command to execute on each line of\n"
374 " stdin received. the line can be passed as an argument using {}.\n"
375 " {.} passes everything before the last dot in a line as an argument.\n"
376 " it is possible to use multiple substitutions inside a single argument,\n"
377 " but currently only of one type.\n"
378 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
379 "\n"
381 return 1;
384 #undef strtoll
385 #define strtoll(a,b,c) strtoint64(a, strlen(a))
386 static int parse_args(int argc, char** argv) {
387 op_state op_b, *op = &op_b;
388 op_init(op, argc, argv);
389 char *op_temp;
390 if(op_hasflag(op, SPL("help")))
391 return syntax();
393 op_temp = op_get(op, SPL("threads"));
394 if(!op_temp) op_temp = op_get(op, SPL("j"));
395 long long x = op_temp ? strtoll(op_temp,0,10) : 1;
396 if(x <= 0) die("threadcount must be >= 1\n");
397 prog_state.numthreads = x;
399 op_temp = op_get(op, SPL("statefile"));
400 prog_state.statefile = op_temp;
402 op_temp = op_get(op, SPL("eof"));
403 prog_state.eof_marker = op_temp;
405 op_temp = op_get(op, SPL("skip"));
406 prog_state.skip = op_temp ? strtoll(op_temp,0,10) : 0;
407 if(op_hasflag(op, SPL("resume"))) {
408 if(!prog_state.statefile) die("-resume needs -statefile\n");
409 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
410 FILE *f = fopen(prog_state.statefile, "r");
411 if(f) {
412 char nb[64];
413 if(fgets(nb, sizeof nb, f)) prog_state.skip = strtoll(nb,0,10);
414 fclose(f);
419 prog_state.delayedflush = 0;
420 if(op_hasflag(op, SPL("delayedflush"))) {
421 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
422 prog_state.delayedflush = 1;
425 prog_state.pipe_mode = 0;
427 op_temp = op_get(op, SPL("delayedspinup"));
428 prog_state.delayedspinup_interval = op_temp ? strtoll(op_temp,0,10) : 0;
430 prog_state.cmd_startarg = 0;
431 prog_state.subst_entries = NULL;
433 if(op_hasflag(op, SPL("exec"))) {
434 uint32_t subst_ent;
435 unsigned i, r = 0;
436 for(i = 1; i < (unsigned) argc; i++) {
437 if(str_equal(argv[i], "-exec")) {
438 r = i + 1;
439 break;
442 if(r && r < (unsigned) argc) {
443 prog_state.cmd_startarg = r;
446 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
448 // save entries which must be substituted, to save some cycles.
449 for(i = r; i < (unsigned) argc; i++) {
450 subst_ent = i - r;
451 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) {
452 sblist_add(prog_state.subst_entries, &subst_ent);
455 if(sblist_getsize(prog_state.subst_entries) == 0) {
456 prog_state.pipe_mode = 1;
457 sblist_free(prog_state.subst_entries);
458 prog_state.subst_entries = 0;
462 prog_state.buffered = 0;
463 if(op_hasflag(op, SPL("buffered"))) {
464 prog_state.buffered = 1;
467 prog_state.join_output = 0;
468 if(op_hasflag(op, SPL("joinoutput"))) {
469 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
470 prog_state.join_output = 1;
473 prog_state.bulk_bytes = 0;
474 op_temp = op_get(op, SPL("bulk"));
475 if(op_temp) {
476 SPDECLAREC(value, op_temp);
477 prog_state.bulk_bytes = parse_human_number(value);
478 if(prog_state.bulk_bytes % 4096)
479 die("bulk size must be a multiple of 4096\n");
480 } else if(op_hasflag(op, SPL("bulk")))
481 prog_state.bulk_bytes = 4096;
483 prog_state.limits = NULL;
484 op_temp = op_get(op, SPL("limits"));
485 if(op_temp) {
486 unsigned i;
487 SPDECLAREC(limits, op_temp);
488 stringptrlist* limit_list = stringptr_splitc(limits, ',');
489 stringptrlist* kv;
490 stringptr* key, *value;
491 limit_rec lim;
492 if(stringptrlist_getsize(limit_list)) {
493 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
494 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
495 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
496 if(stringptrlist_getsize(kv) != 2) continue;
497 key = stringptrlist_get(kv, 0);
498 value = stringptrlist_get(kv, 1);
499 if(EQ(key, SPL("mem")))
500 lim.limit = RLIMIT_AS;
501 else if(EQ(key, SPL("cpu")))
502 lim.limit = RLIMIT_CPU;
503 else if(EQ(key, SPL("stack")))
504 lim.limit = RLIMIT_STACK;
505 else if(EQ(key, SPL("fsize")))
506 lim.limit = RLIMIT_FSIZE;
507 else if(EQ(key, SPL("nofiles")))
508 lim.limit = RLIMIT_NOFILE;
509 else
510 die("unknown option passed to -limits");
512 if(getrlimit(lim.limit, &lim.rl) == -1) {
513 perror("getrlimit");
514 die("could not query rlimits");
516 lim.rl.rlim_cur = parse_human_number(value);
517 sblist_add(prog_state.limits, &lim);
518 stringptrlist_free(kv);
520 stringptrlist_free(limit_list);
523 return 0;
526 static void init_queue(void) {
527 unsigned i;
528 job_info ji = {.pid = -1};
530 for(i = 0; i < prog_state.numthreads; i++)
531 sblist_add(prog_state.job_infos, &ji);
534 static void write_statefile(unsigned long long n, const char* tempfile) {
535 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
536 if(fd != -1) {
537 dprintf(fd, "%llu\n", n + 1ULL);
538 close(fd);
539 if(rename(tempfile, prog_state.statefile) == -1)
540 perror("rename");
541 } else
542 perror("open");
545 // returns numbers of substitutions done, -1 on out of buffer.
546 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
547 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
548 size_t i;
549 int ret = 0;
550 for(i = 0; dest_size > 0 && i < source->size; ) {
551 if(stringptr_here(source, i, what)) {
552 if(dest_size < (ssize_t) whit->size) return -1;
553 memcpy(dest, whit->ptr, whit->size);
554 dest += whit->size;
555 dest_size -= whit->size;
556 ret++;
557 i += what->size;
558 } else {
559 *dest = source->ptr[i];
560 dest++;
561 dest_size--;
562 i++;
565 if(!dest_size) return -1;
566 *dest = 0;
567 return ret;
/* Returns a pointer to the first occurrence of `ch` within the first `end`
 * bytes of `in`, or 0 if there is none. The search does not stop at NUL
 * bytes. */
static char* mystrnchr(const char *in, int ch, size_t end) {
	size_t i;

	for(i = 0; i < end; i++) {
		if(in[i] == ch)
			return (char*) (in + i);
	}
	return 0;
}
/* Returns a pointer to the last occurrence of `ch` within the first `end`
 * bytes of `in`, or 0 if there is none. The search does not stop at NUL
 * bytes. An empty range (end == 0) is never dereferenced and yields 0. */
static char* mystrnrchr(const char *in, int ch, size_t end) {
	while(end > 0) {
		end--;
		if(in[end] == ch)
			return (char*) (in + end);
	}
	return 0;
}
585 static int need_linecounter(void) {
586 return !!prog_state.skip || prog_state.statefile;
/* Counts the '\n' bytes among the first `len` bytes of `buf`. */
static size_t count_linefeeds(const char *buf, size_t len) {
	size_t total = 0;
	size_t i;

	for(i = 0; i < len; i++)
		total += (buf[i] == '\n');
	return total;
}
598 static int match_eof(char* inbuf, size_t len) {
599 if(!prog_state.eof_marker) return 0;
600 size_t l = strlen(prog_state.eof_marker);
601 return l == len-1 && !memcmp(prog_state.eof_marker, inbuf, l);
604 #define MAX_SUBSTS 16
605 static int dispatch_line(char* inbuf, size_t len, char** argv) {
606 char subst_buf[MAX_SUBSTS][4096];
607 static unsigned spinup_counter = 0;
609 stringptr line_b, *line = &line_b;
611 if(!prog_state.bulk_bytes)
612 prog_state.lineno++;
613 else if(need_linecounter()) {
614 prog_state.lineno += count_linefeeds(inbuf, len);
617 if(prog_state.skip) {
618 if(!prog_state.bulk_bytes) {
619 prog_state.skip--;
620 return 1;
621 } else {
622 while(len && prog_state.skip) {
623 char *q = mystrnchr(inbuf, '\n', len);
624 if(q) {
625 ptrdiff_t diff = (q - inbuf) + 1;
626 inbuf += diff;
627 len -= diff;
628 prog_state.skip--;
629 } else {
630 return 1;
633 if(!len) return 1;
636 if(!prog_state.cmd_startarg) {
637 write_all(1, inbuf, len);
638 return 1;
641 line->ptr = inbuf; line->size = len;
643 if(!prog_state.pipe_mode)
644 stringptr_chomp(line);
646 if(prog_state.subst_entries) {
647 unsigned max_subst = 0;
648 uint32_t* index;
649 sblist_iter(prog_state.subst_entries, index) {
650 if(max_subst >= MAX_SUBSTS) break;
651 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
652 int ret;
653 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
654 if(ret == -1) {
655 too_long:
656 dprintf(2, "fatal: line too long for substitution: %s\n", line->ptr);
657 return 0;
658 } else if(!ret) {
659 char* lastdot = stringptr_rchr(line, '.');
660 stringptr tilLastDot = *line;
661 if(lastdot) tilLastDot.size = lastdot - line->ptr;
662 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
663 if(ret == -1) goto too_long;
665 if(ret) {
666 prog_state.cmd_argv[*index] = subst_buf[max_subst];
667 max_subst++;
673 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
674 msleep(rand() % (prog_state.delayedspinup_interval + 1));
675 spinup_counter++;
678 if(free_slots())
679 launch_job(prog_state.threads_running, prog_state.cmd_argv);
680 else if(!prog_state.pipe_mode)
681 launch_job(reap_child(), prog_state.cmd_argv);
683 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
684 write_statefile(prog_state.lineno, prog_state.temp_state);
687 if(prog_state.pipe_mode)
688 pass_stdin(line);
690 return 1;
693 int main(int argc, char** argv) {
694 unsigned i;
696 char tempdir_buf[256];
698 srand(time(NULL));
700 if(argc > 4096) argc = 4096;
702 prog_state.threads_running = 0;
704 if(parse_args(argc, argv)) return 1;
706 if(prog_state.statefile)
707 snprintf(prog_state.temp_state, sizeof(prog_state.temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
709 prog_state.tempdir = NULL;
711 if(prog_state.buffered) {
712 prog_state.tempdir = tempdir_buf;
713 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
714 perror("mkdtemp");
715 die("could not create tempdir\n");
717 } else {
718 /* if the stdout/stderr fds are not in O_APPEND mode,
719 the dup()'s of the fds in posix_spawn can cause different
720 file positions, causing the different processes to overwrite each others output.
721 testcase:
722 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
724 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
725 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
728 if(prog_state.cmd_startarg) {
729 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
730 prog_state.cmd_argv[i - prog_state.cmd_startarg] = argv[i];
732 prog_state.cmd_argv[argc - prog_state.cmd_startarg] = NULL;
735 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
736 init_queue();
738 prog_state.lineno = 0;
740 size_t left = 0, bytes_read = 0;
741 const size_t chunksize = prog_state.bulk_bytes ? prog_state.bulk_bytes : 16*1024;
743 char *mem = mmap(NULL, chunksize*2, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
744 char *buf1 = mem;
745 char *buf2 = mem+chunksize;
746 char *in, *inbuf;
748 int exitcode = 1;
750 while(1) {
751 inbuf = buf1+chunksize-left;
752 memcpy(inbuf, buf2+bytes_read-left, left);
753 ssize_t n = read(0, buf2, chunksize);
754 if(n == -1) {
755 perror("read");
756 goto out;
758 bytes_read = n;
759 left += n;
760 in = inbuf;
761 while(left) {
762 char *p;
763 if(prog_state.pipe_mode && prog_state.bulk_bytes)
764 p = mystrnrchr(in, '\n', left);
765 else
766 p = mystrnchr (in, '\n', left);
768 if(!p) break;
769 ptrdiff_t diff = (p - in) + 1;
770 if(match_eof(in, diff)) {
771 exitcode = 0;
772 goto out;
774 if(!dispatch_line(in, diff, argv))
775 goto out;
776 left -= diff;
777 in += diff;
779 if(!n) {
780 if(left && !match_eof(in, left)) dispatch_line(in, left, argv);
781 break;
783 if(left > chunksize) {
784 dprintf(2, "error: input line length exceeds buffer size\n");
785 goto out;
789 exitcode = 0;
791 out:
793 if(prog_state.pipe_mode) {
794 close_pipes();
797 if(prog_state.delayedflush)
798 write_statefile(prog_state.lineno - 1, prog_state.temp_state);
800 while(prog_state.threads_running) reap_child();
802 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
803 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
804 if(prog_state.limits) sblist_free(prog_state.limits);
806 if(prog_state.tempdir)
807 rmdir(prog_state.tempdir);
810 fflush(stdout);
811 fflush(stderr);
814 return exitcode;