allow -bulk argument without size specification
[rofl0r-jobflow.git] / jobflow.c
blob23021df10d06fe683e557e94af77818ea439bc53
1 /*
2 Copyright (C) 2012,2014,2016 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.1.1"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
24 #undef _XOPEN_SOURCE
25 #define _XOPEN_SOURCE 700
26 #undef _GNU_SOURCE
27 #define _GNU_SOURCE
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/stringptrlist.h"
32 #include "../lib/include/sblist.h"
33 #include "../lib/include/strlib.h"
34 #include "../lib/include/timelib.h"
35 #include "../lib/include/filelib.h"
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <stdint.h>
41 #include <stddef.h>
42 #include <errno.h>
43 #include <time.h>
44 #include <assert.h>
45 #include <sys/mman.h>
47 /* process handling */
49 #include <fcntl.h>
50 #include <spawn.h>
51 #include <sys/wait.h>
52 #include <sys/stat.h>
54 #include <sys/resource.h>
56 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
57 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
58 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
59 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
60 static int prlimit(int pid, ...) {
61 (void) pid;
62 dprintf(2, "prlimit() not implemented on this system\n");
63 errno = EINVAL;
64 return -1;
66 #endif
69 #include <sys/time.h>
71 typedef struct {
72 pid_t pid;
73 int pipe;
74 posix_spawn_file_actions_t fa;
75 } job_info;
77 typedef struct {
78 int limit;
79 struct rlimit rl;
80 } limit_rec;
82 typedef struct {
83 char temp_state[256];
84 char* cmd_argv[4096];
85 unsigned long long lineno;
86 unsigned numthreads;
87 unsigned threads_running;
88 char* statefile;
89 unsigned long long skip;
90 sblist* job_infos;
91 sblist* subst_entries;
92 sblist* limits;
93 unsigned cmd_startarg;
94 char* tempdir;
95 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
96 the top value in ms can be supplied via a command line switch.
97 this option makes only sense if the interval is somewhat smaller than the
98 expected runtime of the average job.
99 this option is useful to not overload a network app due to hundreds of
100 parallel connection tries on startup.
102 int buffered:1; /* write stdout and stderr of each task into a file,
103 and print it to stdout once the process ends.
104 this prevents mixing up of the output of multiple tasks. */
105 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
106 this means faster program execution, but could also be imprecise if the number of
107 jobs is small or smaller than the available threadcount. */
108 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
109 int pipe_mode:1;
110 size_t bulk_bytes;
111 } prog_state_s;
113 prog_state_s prog_state;
116 extern char** environ;
118 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
119 int ret = snprintf(buf, bufsize, "%s/jd_proc_%.5lu_std%s.log",
120 prog_state.tempdir, (unsigned long) jobindex, is_stderr ? "err" : "out");
121 return ret > 0 && (size_t) ret < bufsize;
124 void launch_job(size_t jobindex, char** argv) {
125 char stdout_filename_buf[256];
126 char stderr_filename_buf[256];
127 job_info* job = sblist_get(prog_state.job_infos, jobindex);
129 if(job->pid != -1) return;
131 if(prog_state.buffered) {
132 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
133 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
134 dprintf(2, "temp filename too long!\n");
135 return;
139 errno = posix_spawn_file_actions_init(&job->fa);
140 if(errno) goto spawn_error;
142 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
143 if(errno) goto spawn_error;
145 int pipes[2];
146 if(prog_state.pipe_mode) {
147 if(pipe(pipes)) {
148 perror("pipe");
149 goto spawn_error;
151 job->pipe = pipes[1];
152 errno = posix_spawn_file_actions_adddup2(&job->fa, pipes[0], 0);
153 if(errno) goto spawn_error;
154 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[0]);
155 if(errno) goto spawn_error;
156 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[1]);
157 if(errno) goto spawn_error;
160 if(prog_state.buffered) {
161 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
162 if(errno) goto spawn_error;
163 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
164 if(errno) goto spawn_error;
167 if(!prog_state.pipe_mode) {
168 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
169 if(errno) goto spawn_error;
172 if(prog_state.buffered) {
173 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
174 if(errno) goto spawn_error;
175 if(prog_state.join_output)
176 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
177 else
178 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
179 if(errno) goto spawn_error;
182 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
183 if(errno) {
184 spawn_error:
185 job->pid = -1;
186 perror("posix_spawn");
187 } else {
188 prog_state.threads_running++;
189 if(prog_state.limits) {
190 limit_rec* limit;
191 sblist_iter(prog_state.limits, limit) {
192 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
193 perror("prlimit");
197 if(prog_state.pipe_mode)
198 close(pipes[0]);
201 static void dump_output(size_t job_id, int is_stderr) {
202 char out_filename_buf[256];
203 char buf[4096];
204 FILE* dst, *out_stream = is_stderr ? stderr : stdout;
205 size_t nread;
207 makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr);
209 dst = fopen(out_filename_buf, "r");
210 if(dst) {
211 while((nread = fread(buf, 1, sizeof(buf), dst))) {
212 fwrite(buf, 1, nread, out_stream);
213 if(nread < sizeof(buf)) break;
215 fclose(dst);
216 fflush(out_stream);
220 static void write_all(int fd, void* buf, size_t size) {
221 size_t left = size;
222 const char *p = buf;
223 while(1) {
224 if(left == 0) break;
225 ssize_t n = write(fd, p, left);
226 switch(n) {
227 case -1:
228 if(errno == EINTR) continue;
229 else {
230 perror("write");
231 return;
233 default:
234 p += n;
235 left -= n;
240 static void pass_stdin(stringptr *line) {
241 static size_t next_child = 0;
242 if(next_child >= sblist_getsize(prog_state.job_infos))
243 next_child = 0;
244 job_info *job = sblist_get(prog_state.job_infos, next_child);
245 write_all(job->pipe, line->ptr, line->size);
246 next_child++;
249 static void close_pipes(void) {
250 size_t i;
251 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
252 job_info *job = sblist_get(prog_state.job_infos, i);
253 close(job->pipe);
257 /* wait till a child exits, reap it, and return its job index for slot reuse */
258 static size_t reap_child(void) {
259 size_t i;
260 job_info* job;
261 int ret, retval;
263 do ret = waitpid(-1, &retval, 0);
264 while(ret == -1 || !WIFEXITED(retval));
266 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
267 job = sblist_get(prog_state.job_infos, i);
268 if(job->pid == ret) {
269 job->pid = -1;
270 posix_spawn_file_actions_destroy(&job->fa);
271 prog_state.threads_running--;
272 if(prog_state.buffered) {
273 dump_output(i, 0);
274 if(!prog_state.join_output)
275 dump_output(i, 1);
277 return i;
280 assert(0);
281 return -1;
284 static size_t free_slots(void) {
285 return prog_state.numthreads - prog_state.threads_running;
288 __attribute__((noreturn))
289 static void die(const char* msg) {
290 dprintf(2, msg);
291 exit(1);
294 static unsigned long parse_human_number(stringptr* num) {
295 unsigned long ret = 0;
296 static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
297 const char* kmg = "KMG";
298 char* kmgind;
299 if(num && num->size) {
300 ret = atol(num->ptr);
301 if((kmgind = strchr(kmg, num->ptr[num->size -1])))
302 ret *= mul[kmgind - kmg];
304 return ret;
307 static int syntax(void) {
308 dprintf(2,
309 "jobflow " VERSION " (C) rofl0r\n"
310 "------------------\n"
311 "this program is intended to be used as a recipient of another programs output\n"
312 "it launches processes to which the current line can be passed as an argument\n"
313 "using {} for substitution (as in find -exec).\n"
314 "if no substitution argument ({} or {.}) is provided, input is piped into\n"
315 "stdin of child processes. input will be then evenly distributed to jobs,\n"
316 "until EOF is received.\n"
317 "\n"
318 "available options:\n\n"
319 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
320 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
321 "-exec ./mycommand {}\n"
322 "\n"
323 "-skip=XXX\n"
324 " XXX=number of entries to skip\n"
325 "-threads=XXX\n"
326 " XXX=number of parallel processes to spawn\n"
327 "-resume\n"
328 " resume from last jobnumber stored in statefile\n"
329 "-statefile=XXX\n"
330 " XXX=filename\n"
331 " saves last launched jobnumber into a file\n"
332 "-delayedflush\n"
333 " only write to statefile whenever all processes are busy,\n"
334 " and at program end\n"
335 "-delayedspinup=XXX\n"
336 " XXX=maximum amount of milliseconds\n"
337 " ...to wait when spinning up a fresh set of processes\n"
338 " a random value between 0 and the chosen amount is used to delay initial\n"
339 " spinup.\n"
340 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
341 " activity on program startup\n"
342 "-buffered\n"
343 " store the stdout and stderr of launched processes into a temporary file\n"
344 " which will be printed after a process has finished.\n"
345 " this prevents mixing up of output of different processes.\n"
346 "-joinoutput\n"
347 " if -buffered, write both stdout and stderr into the same file.\n"
348 " this saves the chronological order of the output, and the combined output\n"
349 " will only be printed to stdout.\n"
350 "-bulk=XXX\n"
351 " do bulk copies with a buffer of XXX bytes. only usable in pipe mode.\n"
352 " this passes (almost) the entire buffer to the next scheduled job.\n"
353 " the passed buffer will be truncated to the last line break boundary,\n"
354 " so jobs always get entire lines to work with.\n"
355 " this option is useful when you have huge input files and relatively short\n"
356 " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n"
357 " XXX must be a multiple of 4KB. the suffixes G/M/K are detected.\n"
358 " actual memory allocation will be twice the amount passed.\n"
359 " note that pipe buffer size is limited to 64K on linux, so anything higher\n"
360 " than that probably doesn't make sense.\n"
361 " if no size is passed (i.e. only -bulk), a default of 4K will be used.\n"
362 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
363 " sets the rlimit of the new created processes.\n"
364 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
365 "-exec command with args\n"
366 " everything past -exec is treated as the command to execute on each line of\n"
367 " stdin received. the line can be passed as an argument using {}.\n"
368 " {.} passes everything before the last dot in a line as an argument.\n"
369 " it is possible to use multiple substitutions inside a single argument,\n"
370 " but currently only of one type.\n"
371 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
372 "\n"
374 return 1;
377 #undef strtoll
378 #define strtoll(a,b,c) strtoint64(a, strlen(a))
379 static int parse_args(int argc, char** argv) {
380 op_state op_b, *op = &op_b;
381 op_init(op, argc, argv);
382 char *op_temp;
383 if(op_hasflag(op, SPL("-help")))
384 return syntax();
386 op_temp = op_get(op, SPL("threads"));
387 long long x = op_temp ? strtoll(op_temp,0,10) : 1;
388 if(x <= 0) die("threadcount must be >= 1\n");
389 prog_state.numthreads = x;
391 op_temp = op_get(op, SPL("statefile"));
392 prog_state.statefile = op_temp;
394 op_temp = op_get(op, SPL("skip"));
395 prog_state.skip = op_temp ? strtoll(op_temp,0,10) : 0;
396 if(op_hasflag(op, SPL("resume"))) {
397 if(!prog_state.statefile) die("-resume needs -statefile\n");
398 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
399 FILE *f = fopen(prog_state.statefile, "r");
400 if(f) {
401 char nb[64];
402 if(fgets(nb, sizeof nb, f)) prog_state.skip = strtoll(nb,0,10);
403 fclose(f);
408 prog_state.delayedflush = 0;
409 if(op_hasflag(op, SPL("delayedflush"))) {
410 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
411 prog_state.delayedflush = 1;
414 prog_state.pipe_mode = 0;
416 op_temp = op_get(op, SPL("delayedspinup"));
417 prog_state.delayedspinup_interval = op_temp ? strtoll(op_temp,0,10) : 0;
419 prog_state.cmd_startarg = 0;
420 prog_state.subst_entries = NULL;
422 if(op_hasflag(op, SPL("exec"))) {
423 uint32_t subst_ent;
424 unsigned i, r = 0;
425 for(i = 1; i < (unsigned) argc; i++) {
426 if(str_equal(argv[i], "-exec")) {
427 r = i + 1;
428 break;
431 if(r && r < (unsigned) argc) {
432 prog_state.cmd_startarg = r;
435 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
437 // save entries which must be substituted, to save some cycles.
438 for(i = r; i < (unsigned) argc; i++) {
439 subst_ent = i - r;
440 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) {
441 sblist_add(prog_state.subst_entries, &subst_ent);
444 if(sblist_getsize(prog_state.subst_entries) == 0) {
445 prog_state.pipe_mode = 1;
446 sblist_free(prog_state.subst_entries);
447 prog_state.subst_entries = 0;
451 prog_state.buffered = 0;
452 if(op_hasflag(op, SPL("buffered"))) {
453 prog_state.buffered = 1;
456 prog_state.join_output = 0;
457 if(op_hasflag(op, SPL("joinoutput"))) {
458 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
459 prog_state.join_output = 1;
462 prog_state.bulk_bytes = 0;
463 op_temp = op_get(op, SPL("bulk"));
464 if(op_temp) {
465 SPDECLAREC(value, op_temp);
466 prog_state.bulk_bytes = parse_human_number(value);
467 if(prog_state.bulk_bytes % 4096)
468 die("bulk size must be a multiple of 4096\n");
469 } else if(op_hasflag(op, SPL("bulk")))
470 prog_state.bulk_bytes = 4096;
472 prog_state.limits = NULL;
473 op_temp = op_get(op, SPL("limits"));
474 if(op_temp) {
475 unsigned i;
476 SPDECLAREC(limits, op_temp);
477 stringptrlist* limit_list = stringptr_splitc(limits, ',');
478 stringptrlist* kv;
479 stringptr* key, *value;
480 limit_rec lim;
481 if(stringptrlist_getsize(limit_list)) {
482 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
483 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
484 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
485 if(stringptrlist_getsize(kv) != 2) continue;
486 key = stringptrlist_get(kv, 0);
487 value = stringptrlist_get(kv, 1);
488 if(EQ(key, SPL("mem")))
489 lim.limit = RLIMIT_AS;
490 else if(EQ(key, SPL("cpu")))
491 lim.limit = RLIMIT_CPU;
492 else if(EQ(key, SPL("stack")))
493 lim.limit = RLIMIT_STACK;
494 else if(EQ(key, SPL("fsize")))
495 lim.limit = RLIMIT_FSIZE;
496 else if(EQ(key, SPL("nofiles")))
497 lim.limit = RLIMIT_NOFILE;
498 else
499 die("unknown option passed to -limits");
501 if(getrlimit(lim.limit, &lim.rl) == -1) {
502 perror("getrlimit");
503 die("could not query rlimits");
505 lim.rl.rlim_cur = parse_human_number(value);
506 sblist_add(prog_state.limits, &lim);
507 stringptrlist_free(kv);
509 stringptrlist_free(limit_list);
512 return 0;
515 static void init_queue(void) {
516 unsigned i;
517 job_info ji = {.pid = -1};
519 for(i = 0; i < prog_state.numthreads; i++)
520 sblist_add(prog_state.job_infos, &ji);
523 static void write_statefile(unsigned long long n, const char* tempfile) {
524 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
525 if(fd != -1) {
526 dprintf(fd, "%llu\n", n + 1ULL);
527 close(fd);
528 if(rename(tempfile, prog_state.statefile) == -1)
529 perror("rename");
530 } else
531 perror("open");
534 // returns numbers of substitutions done, -1 on out of buffer.
535 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
536 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
537 size_t i;
538 int ret = 0;
539 for(i = 0; dest_size > 0 && i < source->size; ) {
540 if(stringptr_here(source, i, what)) {
541 if(dest_size < (ssize_t) whit->size) return -1;
542 memcpy(dest, whit->ptr, whit->size);
543 dest += whit->size;
544 dest_size -= whit->size;
545 ret++;
546 i += what->size;
547 } else {
548 *dest = source->ptr[i];
549 dest++;
550 dest_size--;
551 i++;
554 if(!dest_size) return -1;
555 *dest = 0;
556 return ret;
559 static int dispatch_line(char* inbuf, size_t len, char** argv) {
560 char subst_buf[16][4096];
562 stringptr line_b, *line = &line_b;
564 prog_state.lineno++;
565 static unsigned spinup_counter = 0;
568 if(prog_state.skip) {
569 prog_state.skip--;
570 return 1;
572 if(!prog_state.cmd_startarg) {
573 write_all(1, inbuf, len);
574 return 1;
577 line->ptr = inbuf; line->size = len;
579 if(!prog_state.pipe_mode)
580 stringptr_chomp(line);
582 if(prog_state.subst_entries) {
583 unsigned max_subst = 0;
584 uint32_t* index;
585 sblist_iter(prog_state.subst_entries, index) {
586 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
587 int ret;
588 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
589 if(ret == -1) {
590 too_long:
591 dprintf(2, "fatal: line too long for substitution: %s\n", line->ptr);
592 return 0;
593 } else if(!ret) {
594 char* lastdot = stringptr_rchr(line, '.');
595 stringptr tilLastDot = *line;
596 if(lastdot) tilLastDot.size = lastdot - line->ptr;
597 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
598 if(ret == -1) goto too_long;
600 if(ret) {
601 prog_state.cmd_argv[*index] = subst_buf[max_subst];
602 max_subst++;
608 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
609 msleep(rand() % (prog_state.delayedspinup_interval + 1));
610 spinup_counter++;
613 if(free_slots())
614 launch_job(prog_state.threads_running, prog_state.cmd_argv);
615 else if(!prog_state.pipe_mode)
616 launch_job(reap_child(), prog_state.cmd_argv);
618 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
619 write_statefile(prog_state.lineno, prog_state.temp_state);
622 if(prog_state.pipe_mode)
623 pass_stdin(line);
625 return 1;
628 static char* mystrnchr(const char *in, int ch, size_t end) {
629 const char *e = in+end;
630 const char *p = in;
631 while(p != e && *p != ch) p++;
632 if(*p == ch) return (char*)p;
633 return 0;
635 static char* mystrnrchr(const char *in, int ch, size_t end) {
636 const char *e = in+end-1;
637 const char *p = in;
638 while(p != e && *e != ch) e--;
639 if(*e == ch) return (char*)e;
640 return 0;
643 int main(int argc, char** argv) {
644 unsigned i;
646 char tempdir_buf[256];
648 srand(time(NULL));
650 if(argc > 4096) argc = 4096;
652 prog_state.threads_running = 0;
654 if(parse_args(argc, argv)) return 1;
656 if(prog_state.statefile)
657 snprintf(prog_state.temp_state, sizeof(prog_state.temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
659 prog_state.tempdir = NULL;
661 if(prog_state.buffered) {
662 prog_state.tempdir = tempdir_buf;
663 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
664 perror("mkdtemp");
665 die("could not create tempdir\n");
667 } else {
668 /* if the stdout/stderr fds are not in O_APPEND mode,
669 the dup()'s of the fds in posix_spawn can cause different
670 file positions, causing the different processes to overwrite each others output.
671 testcase:
672 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
674 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
675 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
678 if(prog_state.cmd_startarg) {
679 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
680 prog_state.cmd_argv[i - prog_state.cmd_startarg] = argv[i];
682 prog_state.cmd_argv[argc - prog_state.cmd_startarg] = NULL;
685 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
686 init_queue();
688 prog_state.lineno = 0;
690 size_t left = 0;
691 const size_t chunksize = prog_state.bulk_bytes ? prog_state.bulk_bytes : 16*1024;
693 char *mem = mmap(NULL, chunksize*2, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
694 char *buf1 = mem;
695 char *buf2 = mem+chunksize;
696 char *in, *inbuf;
698 int exitcode = 1;
700 while(1) {
701 inbuf = buf1+chunksize-left;
702 memcpy(inbuf, buf2+chunksize-left, left);
703 ssize_t n = read(0, buf2, chunksize);
704 if(n == -1) {
705 perror("read");
706 goto out;
708 left += n;
709 in = inbuf;
710 while(left) {
711 char *p;
712 if(prog_state.pipe_mode && prog_state.bulk_bytes)
713 p = mystrnrchr(in, '\n', left);
714 else
715 p = mystrnchr (in, '\n', left);
717 if(!p) break;
718 ptrdiff_t diff = (p - in) + 1;
719 if(!dispatch_line(in, diff, argv))
720 goto out;
721 left -= diff;
722 in += diff;
724 if(!n) {
725 if(left) dispatch_line(in, left, argv);
726 break;
728 if(left > chunksize) {
729 dprintf(2, "error: input line length exceeds buffer size\n");
730 goto out;
734 exitcode = 0;
736 out:
738 if(prog_state.pipe_mode) {
739 close_pipes();
742 if(prog_state.delayedflush)
743 write_statefile(prog_state.lineno - 1, prog_state.temp_state);
745 while(prog_state.threads_running) reap_child();
747 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
748 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
749 if(prog_state.limits) sblist_free(prog_state.limits);
751 if(prog_state.tempdir)
752 rmdir(prog_state.tempdir);
755 fflush(stdout);
756 fflush(stderr);
759 return exitcode;