remove use of strtoint64()
[rofl0r-jobflow.git] / jobflow.c
blob892253b2aef7d7c794124e4ef932911c36acf4d8
1 /*
2 Copyright (C) 2012,2014,2016,2017,2018 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.2.4"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
24 #undef _XOPEN_SOURCE
25 #define _XOPEN_SOURCE 700
26 #undef _GNU_SOURCE
27 #define _GNU_SOURCE
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/sblist.h"
32 #include "../lib/include/strlib.h"
33 #include "../lib/include/macros.h"
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <stdint.h>
39 #include <stddef.h>
40 #include <errno.h>
41 #include <time.h>
42 #include <assert.h>
43 #include <ctype.h>
44 #include <sys/mman.h>
46 /* process handling */
48 #include <fcntl.h>
49 #include <spawn.h>
50 #include <sys/wait.h>
51 #include <sys/stat.h>
53 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
/* old glibc has no prlimit() wrapper, see
 * http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* stand-in that always fails: prints a notice to fd 2 and returns -1 with
 * errno = EINVAL, so callers go down their normal prlimit() error path. */
static int prlimit(int pid, ...) {
	(void) pid; /* unused */
	dprintf(2, "prlimit() not implemented on this system\n");
	return (errno = EINVAL), -1;
}
#endif
67 #include <sys/time.h>
69 /* some small helper funcs from libulz */
/* sleeps for millisecs milliseconds via nanosleep(). when a signal
 * interrupts the sleep, it resumes with the remaining time.
 * returns nanosleep()'s final result (0, or -1 for non-EINTR errors). */
static int msleep(long millisecs) {
	struct timespec remaining, wanted = {
		.tv_sec = millisecs / 1000,
		.tv_nsec = (millisecs % 1000) * 1000 * 1000,
	};
	int rc;
	for(;;) {
		rc = nanosleep(&wanted, &remaining);
		if(rc != -1 || errno != EINTR) break;
		wanted = remaining;
	}
	return rc;
}
static const char ulz_conv_cypher[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
static const size_t ulz_conv_cypher_len = sizeof(ulz_conv_cypher) - 1;

/* mkdtemp() replacement. overwrites the last six characters of templ with
 * random alphanumerics and calls mkdir(templ, 0700). it retries with a new
 * name as long as the directory already exists.
 * returns templ on success, or NULL with errno set: EINVAL if templ is
 * shorter than six characters, otherwise mkdir()'s error. */
static char* ulz_mkdtemp(char* templ) {
	size_t len = strlen(templ);
	if(len < 6) {
		errno = EINVAL;
		return NULL;
	}
	char *suffix = templ + len - 6;
	for(;;) {
		size_t k;
		for(k = 0; k < 6; k++)
			suffix[k] = ulz_conv_cypher[rand() % ulz_conv_cypher_len];
		if(mkdir(templ, S_IRWXU) != -1) return templ;
		if(errno != EEXIST) return NULL;
	}
}
/* writes "<tmpdir><prefix>XXXXXX" into buf, NUL-terminated, and returns its
 * length without the terminator. pl is the length of prefix; buf must have
 * room for strlen(tmpdir) + pl + 7 bytes. */
static size_t gen_fn(char* buf, const char* prefix, size_t pl, const char* tmpdir) {
	size_t dirlen = strlen(tmpdir);
	char *cursor = buf;
	memcpy(cursor, tmpdir, dirlen);
	cursor += dirlen;
	memcpy(cursor, prefix, pl);
	cursor += pl;
	memcpy(cursor, "XXXXXX", sizeof "XXXXXX");
	return (size_t)(cursor - buf) + 6;
}
/* creates a fresh directory "<dir><prefix>XXXXXX". it tries /dev/shm/ first
 * (to get the fastest possible storage) and falls back to /tmp/.
 * the resulting path is left in buffer. returns the path length, or 0 if
 * bufsize is too small or neither directory could be created (errno is
 * left from the last failed attempt). */
static size_t mktempdir(const char* prefix, char* buffer, size_t bufsize) {
	static const char *const dirs[] = { "/dev/shm/", "/tmp/" };
	size_t plen = strlen(prefix), k;
	if(bufsize < sizeof("/dev/shm/") -1 + plen + sizeof("XXXXXX")) return 0;
	for(k = 0; k < 2; k++) {
		size_t len = gen_fn(buffer, prefix, plen, dirs[k]);
		if(ulz_mkdtemp(buffer)) return len;
	}
	return 0;
}
/* bookkeeping for one worker slot */
typedef struct job_info {
	pid_t pid;                     /* child pid, or -1 while the slot is idle */
	int pipe;                      /* pipe mode: write end feeding the child's stdin */
	posix_spawn_file_actions_t fa; /* spawn actions, destroyed once the child is reaped */
} job_info;

/* one entry of -limits=, applied to each spawned child via prlimit() */
typedef struct limit_rec {
	int limit;        /* RLIMIT_* resource id */
	struct rlimit rl; /* soft limit from the command line, hard limit as returned by getrlimit() */
} limit_rec;
133 typedef struct {
134 char temp_state[256];
135 char* cmd_argv[4096];
136 unsigned long long lineno;
137 unsigned numthreads;
138 unsigned threads_running;
139 char* statefile;
140 char* eof_marker;
141 unsigned long long skip;
142 sblist* job_infos;
143 sblist* subst_entries;
144 sblist* limits;
145 unsigned cmd_startarg;
146 char* tempdir;
147 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
148 the top value in ms can be supplied via a command line switch.
149 this option makes only sense if the interval is somewhat smaller than the
150 expected runtime of the average job.
151 this option is useful to not overload a network app due to hundreds of
152 parallel connection tries on startup.
154 int buffered:1; /* write stdout and stderr of each task into a file,
155 and print it to stdout once the process ends.
156 this prevents mixing up of the output of multiple tasks. */
157 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
158 this means faster program execution, but could also be imprecise if the number of
159 jobs is small or smaller than the available threadcount. */
160 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
161 int pipe_mode:1;
162 size_t bulk_bytes;
163 } prog_state_s;
165 prog_state_s prog_state;
168 extern char** environ;
170 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
171 int ret = snprintf(buf, bufsize, "%s/jd_proc_%.5lu_std%s.log",
172 prog_state.tempdir, (unsigned long) jobindex, is_stderr ? "err" : "out");
173 return ret > 0 && (size_t) ret < bufsize;
176 void launch_job(size_t jobindex, char** argv) {
177 char stdout_filename_buf[256];
178 char stderr_filename_buf[256];
179 job_info* job = sblist_get(prog_state.job_infos, jobindex);
181 if(job->pid != -1) return;
183 if(prog_state.buffered) {
184 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
185 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
186 dprintf(2, "temp filename too long!\n");
187 return;
191 errno = posix_spawn_file_actions_init(&job->fa);
192 if(errno) goto spawn_error;
194 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
195 if(errno) goto spawn_error;
197 int pipes[2];
198 if(prog_state.pipe_mode) {
199 if(pipe(pipes)) {
200 perror("pipe");
201 goto spawn_error;
203 job->pipe = pipes[1];
204 errno = posix_spawn_file_actions_adddup2(&job->fa, pipes[0], 0);
205 if(errno) goto spawn_error;
206 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[0]);
207 if(errno) goto spawn_error;
208 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[1]);
209 if(errno) goto spawn_error;
212 if(prog_state.buffered) {
213 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
214 if(errno) goto spawn_error;
215 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
216 if(errno) goto spawn_error;
219 if(!prog_state.pipe_mode) {
220 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
221 if(errno) goto spawn_error;
224 if(prog_state.buffered) {
225 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
226 if(errno) goto spawn_error;
227 if(prog_state.join_output)
228 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
229 else
230 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
231 if(errno) goto spawn_error;
234 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
235 if(errno) {
236 spawn_error:
237 job->pid = -1;
238 perror("posix_spawn");
239 } else {
240 prog_state.threads_running++;
241 if(prog_state.limits) {
242 limit_rec* limit;
243 sblist_iter(prog_state.limits, limit) {
244 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
245 perror("prlimit");
249 if(prog_state.pipe_mode)
250 close(pipes[0]);
/* copies the -buffered log of job job_id (its stdout log, or its stderr log
 * if is_stderr) to our own stdout (resp. stderr), flushes that stream, and
 * deletes the log. nothing happens if the log cannot be opened. */
static void dump_output(size_t job_id, int is_stderr) {
	char logname[256];
	char chunk[4096];
	FILE *target = is_stderr ? stderr : stdout;
	FILE *log;
	size_t got;

	makeLogfilename(logname, sizeof(logname), job_id, is_stderr);

	log = fopen(logname, "r");
	if(!log) return;

	do {
		got = fread(chunk, 1, sizeof(chunk), log);
		if(!got) break;
		fwrite(chunk, 1, got, target);
	} while(got == sizeof(chunk));

	fclose(log);
	fflush(target);
	unlink(logname);
}
/* writes all size bytes of buf to fd. it keeps going after short writes and
 * EINTR. on any other write() error it prints perror("write") and gives up,
 * leaving the rest unwritten. */
static void write_all(int fd, void* buf, size_t size) {
	const char *cur = buf;
	const char *end = cur + size;
	while(cur != end) {
		ssize_t done = write(fd, cur, (size_t)(end - cur));
		if(done == -1) {
			if(errno == EINTR) continue;
			perror("write");
			return;
		}
		cur += done;
	}
}
293 static void pass_stdin(char *line, size_t len) {
294 static size_t next_child = 0;
295 if(next_child >= sblist_getsize(prog_state.job_infos))
296 next_child = 0;
297 job_info *job = sblist_get(prog_state.job_infos, next_child);
298 write_all(job->pipe, line, len);
299 next_child++;
302 static void close_pipes(void) {
303 size_t i;
304 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
305 job_info *job = sblist_get(prog_state.job_infos, i);
306 close(job->pipe);
310 /* wait till a child exits, reap it, and return its job index for slot reuse */
311 static size_t reap_child(void) {
312 size_t i;
313 job_info* job;
314 int ret, retval;
316 do ret = waitpid(-1, &retval, 0);
317 while(ret == -1 || !WIFEXITED(retval));
319 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
320 job = sblist_get(prog_state.job_infos, i);
321 if(job->pid == ret) {
322 job->pid = -1;
323 posix_spawn_file_actions_destroy(&job->fa);
324 prog_state.threads_running--;
325 if(prog_state.buffered) {
326 dump_output(i, 0);
327 if(!prog_state.join_output)
328 dump_output(i, 1);
330 return i;
333 assert(0);
334 return -1;
337 static size_t free_slots(void) {
338 return prog_state.numthreads - prog_state.threads_running;
/* prints msg verbatim to fd 2 and terminates with exit status 1 */
__attribute__((noreturn))
static void die(const char* msg) {
	/* msg is data, not a format string: a '%' in it must not be interpreted */
	dprintf(2, "%s", msg);
	exit(1);
}
/* parses a decimal number with an optional binary-multiple suffix
 * (K = 1024, M = 1024^2, G = 1024^3, any case), e.g. "16M".
 * anything after the number and suffix is ignored, so "10,cpu=5" yields 10.
 * input without leading digits yields 0. results that do not fit an
 * unsigned long saturate to its maximum. */
static unsigned long parse_human_number(const char* num) {
	static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
	static const char kmg[] = "KMG";
	const unsigned long max = (unsigned long) -1;
	const char *kmgind;
	char *end;
	unsigned long ret = strtoul(num, &end, 10);
	/* unsigned char cast: <ctype.h> functions are undefined for negative chars */
	int suffix = toupper((unsigned char) *end);

	if(suffix && (kmgind = strchr(kmg, suffix))) {
		unsigned long m = mul[kmgind - kmg];
		ret = ret > max / m ? max : ret * m;
	}
	return ret;
}
360 static int syntax(void) {
361 dprintf(2,
362 "jobflow " VERSION " (C) rofl0r\n"
363 "------------------------\n"
364 "this program is intended to be used as a recipient of another programs output\n"
365 "it launches processes to which the current line can be passed as an argument\n"
366 "using {} for substitution (as in find -exec).\n"
367 "if no substitution argument ({} or {.}) is provided, input is piped into\n"
368 "stdin of child processes. input will be then evenly distributed to jobs,\n"
369 "until EOF is received. we call this 'pipe mode'.\n"
370 "\n"
371 "available options:\n\n"
372 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
373 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
374 "-eof=XXX\n"
375 "-exec ./mycommand {}\n"
376 "\n"
377 "-skip=XXX\n"
378 " XXX=number of entries to skip\n"
379 "-threads=XXX (alternative: -j=XXX)\n"
380 " XXX=number of parallel processes to spawn\n"
381 "-resume\n"
382 " resume from last jobnumber stored in statefile\n"
383 "-eof=XXX\n"
384 " use XXX as the EOF marker on stdin\n"
385 " if the marker is encountered, behave as if stdin was closed\n"
386 " not compatible with pipe/bulk mode\n"
387 "-statefile=XXX\n"
388 " XXX=filename\n"
389 " saves last launched jobnumber into a file\n"
390 "-delayedflush\n"
391 " only write to statefile whenever all processes are busy,\n"
392 " and at program end\n"
393 "-delayedspinup=XXX\n"
394 " XXX=maximum amount of milliseconds\n"
395 " ...to wait when spinning up a fresh set of processes\n"
396 " a random value between 0 and the chosen amount is used to delay initial\n"
397 " spinup.\n"
398 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
399 " activity on program startup\n"
400 "-buffered\n"
401 " store the stdout and stderr of launched processes into a temporary file\n"
402 " which will be printed after a process has finished.\n"
403 " this prevents mixing up of output of different processes.\n"
404 "-joinoutput\n"
405 " if -buffered, write both stdout and stderr into the same file.\n"
406 " this saves the chronological order of the output, and the combined output\n"
407 " will only be printed to stdout.\n"
408 "-bulk=XXX\n"
409 " do bulk copies with a buffer of XXX bytes. only usable in pipe mode.\n"
410 " this passes (almost) the entire buffer to the next scheduled job.\n"
411 " the passed buffer will be truncated to the last line break boundary,\n"
412 " so jobs always get entire lines to work with.\n"
413 " this option is useful when you have huge input files and relatively short\n"
414 " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n"
415 " XXX must be a multiple of 4KB. the suffixes G/M/K are detected.\n"
416 " actual memory allocation will be twice the amount passed.\n"
417 " note that pipe buffer size is limited to 64K on linux, so anything higher\n"
418 " than that probably doesn't make sense.\n"
419 " if no size is passed (i.e. only -bulk), a default of 4K will be used.\n"
420 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
421 " sets the rlimit of the new created processes.\n"
422 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
423 "-exec command with args\n"
424 " everything past -exec is treated as the command to execute on each line of\n"
425 " stdin received. the line can be passed as an argument using {}.\n"
426 " {.} passes everything before the last dot in a line as an argument.\n"
427 " it is possible to use multiple substitutions inside a single argument,\n"
428 " but currently only of one type.\n"
429 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
430 "\n"
432 return 1;
435 static int parse_args(int argc, char** argv) {
436 op_state op_b, *op = &op_b;
437 op_init(op, argc, argv);
438 char *op_temp;
439 if(op_hasflag(op, SPL("help")))
440 return syntax();
442 op_temp = op_get(op, SPL("threads"));
443 if(!op_temp) op_temp = op_get(op, SPL("j"));
444 long long x = op_temp ? strtoll(op_temp,0,10) : 1;
445 if(x <= 0) die("threadcount must be >= 1\n");
446 prog_state.numthreads = x;
448 op_temp = op_get(op, SPL("statefile"));
449 prog_state.statefile = op_temp;
451 op_temp = op_get(op, SPL("eof"));
452 prog_state.eof_marker = op_temp;
454 op_temp = op_get(op, SPL("skip"));
455 prog_state.skip = op_temp ? strtoll(op_temp,0,10) : 0;
456 if(op_hasflag(op, SPL("resume"))) {
457 if(!prog_state.statefile) die("-resume needs -statefile\n");
458 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
459 FILE *f = fopen(prog_state.statefile, "r");
460 if(f) {
461 char nb[64];
462 if(fgets(nb, sizeof nb, f)) prog_state.skip = strtoll(nb,0,10);
463 fclose(f);
468 prog_state.delayedflush = 0;
469 if(op_hasflag(op, SPL("delayedflush"))) {
470 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
471 prog_state.delayedflush = 1;
474 prog_state.pipe_mode = 0;
476 op_temp = op_get(op, SPL("delayedspinup"));
477 prog_state.delayedspinup_interval = op_temp ? strtoll(op_temp,0,10) : 0;
479 prog_state.cmd_startarg = 0;
480 prog_state.subst_entries = NULL;
482 if(op_hasflag(op, SPL("exec"))) {
483 uint32_t subst_ent;
484 unsigned i, r = 0;
485 for(i = 1; i < (unsigned) argc; i++) {
486 if(str_equal(argv[i], "-exec") || str_equal(argv[i], "--exec")) {
487 r = i + 1;
488 break;
491 if(r && r < (unsigned) argc) {
492 prog_state.cmd_startarg = r;
495 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
497 // save entries which must be substituted, to save some cycles.
498 for(i = r; i < (unsigned) argc; i++) {
499 subst_ent = i - r;
500 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) {
501 sblist_add(prog_state.subst_entries, &subst_ent);
504 if(sblist_getsize(prog_state.subst_entries) == 0) {
505 prog_state.pipe_mode = 1;
506 sblist_free(prog_state.subst_entries);
507 prog_state.subst_entries = 0;
511 prog_state.buffered = 0;
512 if(op_hasflag(op, SPL("buffered"))) {
513 prog_state.buffered = 1;
516 prog_state.join_output = 0;
517 if(op_hasflag(op, SPL("joinoutput"))) {
518 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
519 prog_state.join_output = 1;
522 prog_state.bulk_bytes = 0;
523 op_temp = op_get(op, SPL("bulk"));
524 if(op_temp) {
525 prog_state.bulk_bytes = parse_human_number(op_temp);
526 if(prog_state.bulk_bytes % 4096)
527 die("bulk size must be a multiple of 4096\n");
528 } else if(op_hasflag(op, SPL("bulk")))
529 prog_state.bulk_bytes = 4096;
531 prog_state.limits = NULL;
532 op_temp = op_get(op, SPL("limits"));
533 if(op_temp) {
534 unsigned i;
535 char *limits = op_temp;
536 while(1) {
537 limits += strspn(limits, ",");
538 size_t l = strcspn(limits, ",");
539 if(!l) break;
540 size_t l2 = strcspn(limits, "=");
541 if(l2 >= l) die("syntax error in limits argument");
542 limit_rec lim;
543 if(!prog_state.limits)
544 prog_state.limits = sblist_new(sizeof(limit_rec), 4);
545 static const struct { int lim_val; const char lim_name[8]; } lim_tab[] = {
546 { RLIMIT_AS, "mem" },
547 { RLIMIT_CPU, "cpu" },
548 { RLIMIT_STACK, "stack" },
549 { RLIMIT_FSIZE, "fsize" },
550 { RLIMIT_NOFILE, "nofiles" },
552 for(i=0; i<ARRAY_SIZE(lim_tab);++i)
553 if(!strncmp(limits, lim_tab[i].lim_name, l2)) {
554 lim.limit = lim_tab[i].lim_val;
555 break;
557 if(i >= ARRAY_SIZE(lim_tab))
558 die("unknown option passed to -limits");
559 if(getrlimit(lim.limit, &lim.rl) == -1) {
560 perror("getrlimit");
561 die("could not query rlimits");
563 lim.rl.rlim_cur = parse_human_number(limits+l2+1);
564 sblist_add(prog_state.limits, &lim);
565 limits += l;
568 return 0;
571 static void init_queue(void) {
572 unsigned i;
573 job_info ji = {.pid = -1};
575 for(i = 0; i < prog_state.numthreads; i++)
576 sblist_add(prog_state.job_infos, &ji);
579 static void write_statefile(unsigned long long n, const char* tempfile) {
580 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
581 if(fd != -1) {
582 dprintf(fd, "%llu\n", n + 1ULL);
583 close(fd);
584 if(rename(tempfile, prog_state.statefile) == -1)
585 perror("rename");
586 } else
587 perror("open");
/* nonzero if the needle_size bytes of needle occur in haystack at offset
 * bufpos, without reading past hay_size. expects bufpos <= hay_size. */
static int str_here(char* haystack, size_t hay_size, size_t bufpos,
                    char* needle, size_t needle_size) {
	size_t avail = hay_size - bufpos;
	return needle_size <= avail && memcmp(needle, haystack + bufpos, needle_size) == 0;
}

// replaces every occurrence of what (what_size bytes) in src (src_size bytes)
// with whit (whit_size bytes). the result is written NUL-terminated to dest,
// which has room for dest_size bytes including the terminator.
// returns the number of substitutions done, or -1 if dest is too small.
// dest is always overwritten; if no substitutions were done, it contains a
// copy of src.
int substitute_all(char *dest, ssize_t dest_size,
                   char *src, size_t src_size,
                   char *what, size_t what_size,
                   char *whit, size_t whit_size) {
	size_t pos = 0;
	int count = 0;
	while(dest_size > 0 && pos < src_size) {
		if(!str_here(src, src_size, pos, what, what_size)) {
			*dest++ = src[pos++];
			dest_size--;
			continue;
		}
		if(dest_size < (ssize_t) whit_size) return -1;
		memcpy(dest, whit, whit_size);
		dest += whit_size;
		dest_size -= whit_size;
		pos += what_size;
		count++;
	}
	if(!dest_size) return -1;
	*dest = 0;
	return count;
}
/* first occurrence of ch within in[0..end), or NULL */
static char* mystrnchr(const char *in, int ch, size_t end) {
	size_t i;
	for(i = 0; i < end; i++)
		if(in[i] == ch) return (char*) (in + i);
	return 0;
}
/* last occurrence of ch within in[0..end), or NULL. end must be >= 1. */
static char* mystrnrchr(const char *in, int ch, size_t end) {
	size_t i = end - 1;
	for(;;) {
		if(in[i] == ch) return (char*) (in + i);
		if(i == 0) return 0;
		i--;
	}
}
/* like mystrnrchr(), but also accepts end == 0 (returns NULL) */
static char* mystrnrchr_chk(const char *in, int ch, size_t end) {
	return end ? mystrnrchr(in, ch, end) : 0;
}
645 static int need_linecounter(void) {
646 return !!prog_state.skip || prog_state.statefile;
/* number of '\n' bytes among the first len bytes of buf */
static size_t count_linefeeds(const char *buf, size_t len) {
	size_t i, n = 0;
	for(i = 0; i < len; i++)
		n += buf[i] == '\n';
	return n;
}
658 static int match_eof(char* inbuf, size_t len) {
659 if(!prog_state.eof_marker) return 0;
660 size_t l = strlen(prog_state.eof_marker);
661 return l == len-1 && !memcmp(prog_state.eof_marker, inbuf, l);
/* nonzero for the line-break characters '\n' and '\r' */
static inline int islb(int p) { return p == '\r' || p == '\n'; }
/* strips all trailing line breaks from s in place. each one is overwritten
 * with a NUL byte and *len shrinks accordingly. */
static void chomp(char *s, size_t *len) {
	size_t n = *len;
	while(n > 0 && islb(s[n - 1])) {
		n--;
		s[n] = 0;
	}
	*len = n;
}
669 #define MAX_SUBSTS 16
670 static int dispatch_line(char* inbuf, size_t len, char** argv) {
671 char subst_buf[MAX_SUBSTS][4096];
672 static unsigned spinup_counter = 0;
674 stringptr line_b, *line = &line_b;
676 if(!prog_state.bulk_bytes)
677 prog_state.lineno++;
678 else if(need_linecounter()) {
679 prog_state.lineno += count_linefeeds(inbuf, len);
682 if(prog_state.skip) {
683 if(!prog_state.bulk_bytes) {
684 prog_state.skip--;
685 return 1;
686 } else {
687 while(len && prog_state.skip) {
688 char *q = mystrnchr(inbuf, '\n', len);
689 if(q) {
690 ptrdiff_t diff = (q - inbuf) + 1;
691 inbuf += diff;
692 len -= diff;
693 prog_state.skip--;
694 } else {
695 return 1;
698 if(!len) return 1;
701 if(!prog_state.cmd_startarg) {
702 write_all(1, inbuf, len);
703 return 1;
706 if(!prog_state.pipe_mode)
707 chomp(inbuf, &len);
709 line->ptr = inbuf; line->size = len;
711 if(prog_state.subst_entries) {
712 unsigned max_subst = 0;
713 uint32_t* index;
714 sblist_iter(prog_state.subst_entries, index) {
715 if(max_subst >= MAX_SUBSTS) break;
716 char *source = argv[*index + prog_state.cmd_startarg];
717 size_t source_len = strlen(source);
718 int ret;
719 ret = substitute_all(subst_buf[max_subst], 4096,
720 source, source_len,
721 "{}", 2,
722 line->ptr, line->size);
723 if(ret == -1) {
724 too_long:
725 dprintf(2, "fatal: line too long for substitution: %s\n", line->ptr);
726 return 0;
727 } else if(!ret) {
728 char* lastdot = mystrnrchr_chk(line->ptr, '.', line->size);
729 stringptr tilLastDot = *line;
730 if(lastdot) tilLastDot.size = lastdot - line->ptr;
731 ret = substitute_all(subst_buf[max_subst], 4096,
732 source, source_len,
733 "{.}", 3,
734 tilLastDot.ptr, tilLastDot.size);
735 if(ret == -1) goto too_long;
737 if(ret) {
738 prog_state.cmd_argv[*index] = subst_buf[max_subst];
739 max_subst++;
745 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
746 msleep(rand() % (prog_state.delayedspinup_interval + 1));
747 spinup_counter++;
750 if(free_slots())
751 launch_job(prog_state.threads_running, prog_state.cmd_argv);
752 else if(!prog_state.pipe_mode)
753 launch_job(reap_child(), prog_state.cmd_argv);
755 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
756 write_statefile(prog_state.lineno, prog_state.temp_state);
759 if(prog_state.pipe_mode)
760 pass_stdin(line->ptr, line->size);
762 return 1;
765 int main(int argc, char** argv) {
766 unsigned i;
768 char tempdir_buf[256];
770 srand(time(NULL));
772 if(argc > 4096) argc = 4096;
774 prog_state.threads_running = 0;
776 if(parse_args(argc, argv)) return 1;
778 if(prog_state.statefile)
779 snprintf(prog_state.temp_state, sizeof(prog_state.temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
781 prog_state.tempdir = NULL;
783 if(prog_state.buffered) {
784 prog_state.tempdir = tempdir_buf;
785 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
786 perror("mkdtemp");
787 die("could not create tempdir\n");
789 } else {
790 /* if the stdout/stderr fds are not in O_APPEND mode,
791 the dup()'s of the fds in posix_spawn can cause different
792 file positions, causing the different processes to overwrite each others output.
793 testcase:
794 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
796 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
797 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
800 if(prog_state.cmd_startarg) {
801 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
802 prog_state.cmd_argv[i - prog_state.cmd_startarg] = argv[i];
804 prog_state.cmd_argv[argc - prog_state.cmd_startarg] = NULL;
807 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
808 init_queue();
810 prog_state.lineno = 0;
812 size_t left = 0, bytes_read = 0;
813 const size_t chunksize = prog_state.bulk_bytes ? prog_state.bulk_bytes : 16*1024;
815 char *mem = mmap(NULL, chunksize*2, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
816 char *buf1 = mem;
817 char *buf2 = mem+chunksize;
818 char *in, *inbuf;
820 int exitcode = 1;
822 while(1) {
823 inbuf = buf1+chunksize-left;
824 memcpy(inbuf, buf2+bytes_read-left, left);
825 ssize_t n = read(0, buf2, chunksize);
826 if(n == -1) {
827 perror("read");
828 goto out;
830 bytes_read = n;
831 left += n;
832 in = inbuf;
833 while(left) {
834 char *p;
835 if(prog_state.pipe_mode && prog_state.bulk_bytes)
836 p = mystrnrchr(in, '\n', left);
837 else
838 p = mystrnchr (in, '\n', left);
840 if(!p) break;
841 ptrdiff_t diff = (p - in) + 1;
842 if(match_eof(in, diff)) {
843 exitcode = 0;
844 goto out;
846 if(!dispatch_line(in, diff, argv))
847 goto out;
848 left -= diff;
849 in += diff;
851 if(!n) {
852 if(left && !match_eof(in, left)) dispatch_line(in, left, argv);
853 break;
855 if(left > chunksize) {
856 dprintf(2, "error: input line length exceeds buffer size\n");
857 goto out;
861 exitcode = 0;
863 out:
865 if(prog_state.pipe_mode) {
866 close_pipes();
869 if(prog_state.delayedflush)
870 write_statefile(prog_state.lineno - 1, prog_state.temp_state);
872 while(prog_state.threads_running) reap_child();
874 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
875 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
876 if(prog_state.limits) sblist_free(prog_state.limits);
878 if(prog_state.tempdir)
879 rmdir(prog_state.tempdir);
882 fflush(stdout);
883 fflush(stderr);
886 return exitcode;