add a test for -limit functionality
[rofl0r-jobflow.git] / jobflow.c
blobc090f822ba154cbd34160b58abd0e593e2262eda
/*
MIT License
Copyright (C) 2012-2021 rofl0r

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the “Software”), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
25 #define VERSION "1.3.1"
28 #undef _POSIX_C_SOURCE
29 #define _POSIX_C_SOURCE 200809L
30 #undef _XOPEN_SOURCE
31 #define _XOPEN_SOURCE 700
32 #undef _GNU_SOURCE
33 #define _GNU_SOURCE
35 #include "sblist.h"
37 #define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
39 #include <stdio.h>
40 #include <string.h>
41 #include <stdlib.h>
42 #include <stdbool.h>
43 #include <unistd.h>
44 #include <stdint.h>
45 #include <stddef.h>
46 #include <errno.h>
47 #include <time.h>
48 #include <assert.h>
49 #include <ctype.h>
50 #include <sys/mman.h>
52 /* process handling */
54 #include <fcntl.h>
55 #include <spawn.h>
56 #include <sys/wait.h>
57 #include <sys/stat.h>
59 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
/* Fallback for C libraries that do not ship prlimit() yet. See
 * http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01
 *
 * The stub accepts any arguments, reports the missing feature on fd 2 and
 * always fails with errno set to EINVAL. */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
static int prlimit(int pid, ...)
{
	(void) pid;
	dprintf(2, "prlimit() not implemented on this system\n");
	errno = EINVAL;
	return -1;
}
#endif
73 #include <sys/time.h>
75 /* some small helper funcs from libulz */
/* Sleep for the given number of milliseconds.
 * A sleep interrupted by a signal (EINTR) is resumed with the remaining
 * time. Returns the result of the last nanosleep() call: 0 on success,
 * -1 on any error other than EINTR. */
static int msleep(long millisecs) {
	struct timespec wanted, remaining;
	int rc;

	wanted.tv_sec = millisecs / 1000;
	wanted.tv_nsec = (millisecs % 1000) * 1000 * 1000;

	for(;;) {
		rc = nanosleep(&wanted, &remaining);
		if(rc != -1 || errno != EINTR)
			break;
		/* interrupted: continue with whatever is left */
		wanted = remaining;
	}
	return rc;
}
/* alphabet the random directory suffix is drawn from */
static const char ulz_conv_cypher[] =
	"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
#define ulz_conv_cypher_len (sizeof(ulz_conv_cypher) - 1)

/* Create a directory whose name is templ with its last 6 characters
 * replaced by random characters from ulz_conv_cypher.
 * templ is modified in place. If the generated name already exists, a new
 * one is tried. Returns templ on success; NULL if templ is shorter than
 * 6 characters (errno = EINVAL) or if mkdir() fails for any reason other
 * than EEXIST. */
static char* ulz_mkdtemp(char* templ) {
	size_t total = strlen(templ);
	char *tail;
	size_t k;

	if(total < 6) {
		errno = EINVAL;
		return NULL;
	}
	tail = templ + (total - 6);

	for(;;) {
		for(k = 0; k < 6; k++)
			tail[k] = ulz_conv_cypher[rand() % ulz_conv_cypher_len];
		if(mkdir(templ, S_IRWXU) != -1)
			return templ;
		if(errno != EEXIST)
			return NULL;
		/* name collision: roll a new suffix */
	}
}
/* Assemble "<tmpdir><prefix>XXXXXX" (NUL-terminated) in buf.
 * pl is the length of prefix. The caller must provide a buffer that is
 * large enough; no bounds checking happens here.
 * Returns the length of the resulting string, excluding the terminator. */
static size_t gen_fn(char* buf, const char* prefix, size_t pl, const char* tmpdir) {
	static const char placeholder[] = "XXXXXX";
	const size_t dirlen = strlen(tmpdir);
	char *out = buf;

	memcpy(out, tmpdir, dirlen);
	out += dirlen;
	memcpy(out, prefix, pl);
	out += pl;
	/* sizeof includes the terminating NUL */
	memcpy(out, placeholder, sizeof placeholder);

	return dirlen + pl + (sizeof placeholder - 1);
}
/* Create a temporary directory, trying /dev/shm first and /tmp second so
 * that the fastest available storage is used.
 * The chosen path is written to buffer. Returns the length of that path,
 * or 0 if the buffer is too small or neither directory could be created. */
static size_t mktempdir(const char* prefix, char* buffer, size_t bufsize) {
	static const char *const bases[] = { "/dev/shm/", "/tmp/" };
	const size_t pl = strlen(prefix);
	size_t n;

	/* sized for the longer of the two base directories */
	if(bufsize < sizeof("/dev/shm/") -1 + pl + sizeof("XXXXXX")) return 0;

	for(n = 0; n < sizeof(bases) / sizeof(bases[0]); n++) {
		size_t ret = gen_fn(buffer, prefix, pl, bases[n]);
		if(ulz_mkdtemp(buffer))
			return ret;
	}
	return 0;
}
/* bookkeeping for one job slot */
typedef struct {
	/* pid of the running child, or -1 while the slot is unused */
	pid_t pid;
	/* pipe mode: write end of the pipe feeding the child's stdin */
	int pipe;
	/* file actions handed to posix_spawnp() for this job */
	posix_spawn_file_actions_t fa;
} job_info;
/* one resource limit requested via -limits */
typedef struct {
	/* resource identifier (one of the RLIMIT_* constants) */
	int limit;
	/* limit values applied to each spawned job via prlimit() */
	struct rlimit rl;
} limit_rec;
141 typedef struct {
142 char temp_state[256];
143 char* cmd_argv[4096];
144 sblist* job_infos;
145 sblist* subst_entries;
146 sblist* limits;
147 char* tempdir;
148 unsigned long long lineno;
150 char* statefile;
151 char* eof_marker;
152 unsigned long numthreads;
153 unsigned long threads_running;
154 unsigned long skip;
155 unsigned long count;
156 unsigned long delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
157 the top value in ms can be supplied via a command line switch.
158 this option makes only sense if the interval is somewhat smaller than the
159 expected runtime of the average job.
160 this option is useful to not overload a network app due to hundreds of
161 parallel connection tries on startup.
163 unsigned long bulk_bytes;
165 bool pipe_mode;
166 bool use_seqnr;
167 bool buffered; /* write stdout and stderr of each task into a file,
168 and print it to stdout once the process ends.
169 this prevents mixing up of the output of multiple tasks. */
170 bool delayedflush; /* only write to statefile whenever all processes are busy, and at program end.
171 this means faster program execution, but could also be imprecise if the number of
172 jobs is small or smaller than the available threadcount. */
173 bool join_output; /* join stdout and stderr of launched jobs into stdout */
175 unsigned cmd_startarg;
176 } prog_state_s;
178 static prog_state_s prog_state;
181 extern char** environ;
183 static int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
184 int ret = snprintf(buf, bufsize, "%s/jd_proc_%.5lu_std%s.log",
185 prog_state.tempdir, (unsigned long) jobindex, is_stderr ? "err" : "out");
186 return ret > 0 && (size_t) ret < bufsize;
189 static void launch_job(size_t jobindex, char** argv) {
190 char stdout_filename_buf[256];
191 char stderr_filename_buf[256];
192 job_info* job = sblist_get(prog_state.job_infos, jobindex);
194 if(job->pid != -1) return;
196 if(prog_state.buffered) {
197 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
198 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
199 dprintf(2, "temp filename too long!\n");
200 return;
204 errno = posix_spawn_file_actions_init(&job->fa);
205 if(errno) goto spawn_error;
207 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
208 if(errno) goto spawn_error;
210 int pipes[2];
211 if(prog_state.pipe_mode) {
212 if(pipe(pipes)) {
213 perror("pipe");
214 goto spawn_error;
216 job->pipe = pipes[1];
217 errno = posix_spawn_file_actions_adddup2(&job->fa, pipes[0], 0);
218 if(errno) goto spawn_error;
219 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[0]);
220 if(errno) goto spawn_error;
221 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[1]);
222 if(errno) goto spawn_error;
225 if(prog_state.buffered) {
226 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
227 if(errno) goto spawn_error;
228 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
229 if(errno) goto spawn_error;
232 if(!prog_state.pipe_mode) {
233 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
234 if(errno) goto spawn_error;
237 if(prog_state.buffered) {
238 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
239 if(errno) goto spawn_error;
240 if(prog_state.join_output)
241 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
242 else
243 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
244 if(errno) goto spawn_error;
247 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
248 if(errno) {
249 spawn_error:
250 job->pid = -1;
251 perror("posix_spawn");
252 } else {
253 prog_state.threads_running++;
254 if(prog_state.limits) {
255 limit_rec* limit;
256 sblist_iter(prog_state.limits, limit) {
257 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
258 perror("prlimit");
262 if(prog_state.pipe_mode)
263 close(pipes[0]);
/* Copy the buffered log file of job job_id to stdout (or to stderr if
 * is_stderr is set), flush the stream and delete the log file.
 * Nothing happens if the log file cannot be opened. */
static void dump_output(size_t job_id, int is_stderr) {
	char out_filename_buf[256];
	char chunk[4096];
	FILE *sink = is_stderr ? stderr : stdout;
	FILE *log;
	size_t got;

	makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr);

	log = fopen(out_filename_buf, "r");
	if(!log)
		return;

	/* a short read marks the end of the file */
	do {
		got = fread(chunk, 1, sizeof(chunk), log);
		if(got)
			fwrite(chunk, 1, got, sink);
	} while(got == sizeof(chunk));

	fclose(log);
	fflush(sink);
	unlink(out_filename_buf);
}
/* Write all size bytes of buf to fd.
 * Short writes are continued and writes interrupted by a signal (EINTR)
 * are retried. Any other error is reported with perror() and the rest of
 * the data is dropped. */
static void write_all(int fd, void* buf, size_t size) {
	const char *cursor = buf;
	size_t remaining = size;

	while(remaining) {
		ssize_t n = write(fd, cursor, remaining);
		if(n == -1) {
			if(errno == EINTR)
				continue;
			perror("write");
			return;
		}
		cursor += n;
		remaining -= n;
	}
}
306 static void pass_stdin(char *line, size_t len) {
307 static size_t next_child = 0;
308 if(next_child >= sblist_getsize(prog_state.job_infos))
309 next_child = 0;
310 job_info *job = sblist_get(prog_state.job_infos, next_child);
311 write_all(job->pipe, line, len);
312 next_child++;
315 static void close_pipes(void) {
316 size_t i;
317 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
318 job_info *job = sblist_get(prog_state.job_infos, i);
319 close(job->pipe);
323 /* wait till a child exits, reap it, and return its job index for slot reuse */
324 static size_t reap_child(int *retval) {
325 size_t i;
326 job_info* job;
327 int ret;
329 do ret = waitpid(-1, retval, 0);
330 while(ret == -1 && errno == EINTR);
331 if(ret == -1) abort();
333 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
334 job = sblist_get(prog_state.job_infos, i);
335 if(job->pid == ret) {
336 job->pid = -1;
337 posix_spawn_file_actions_destroy(&job->fa);
338 prog_state.threads_running--;
339 if(prog_state.buffered) {
340 dump_output(i, 0);
341 if(!prog_state.join_output)
342 dump_output(i, 1);
344 return i;
347 assert(0);
348 return -1;
351 static size_t free_slots(void) {
352 return prog_state.numthreads - prog_state.threads_running;
/* print "error: " followed by the printf-style message to fd 2, then
 * terminate the program with exit status 1.
 * the first argument must be a string literal. */
#define die(...) \
	do { \
		dprintf(2, "error: " __VA_ARGS__); \
		exit(1); \
	} while(0)
/* Parse a decimal number with an optional binary size suffix.
 *
 * The suffix must directly follow the digits: K multiplies by 1024,
 * M by 1024^2 and G by 1024^3. Anything after the number (or after the
 * suffix) is ignored, so the function can be used on the value part of a
 * comma separated list. A string without leading digits yields 0.
 *
 * strtoul() reports where the number ends, so the string is never read
 * past its terminator, not even when it is empty. */
static unsigned long parse_human_number(const char* num) {
	static const unsigned long mul[] = {
		1024UL,
		1024UL * 1024UL,
		1024UL * 1024UL * 1024UL,
	};
	static const char kmg[] = "KMG";
	const char *kmgind;
	char *end;
	unsigned long ret;

	ret = strtoul(num, &end, 10);
	/* strchr() would also match the terminating NUL, so test *end first */
	if(*end && (kmgind = strchr(kmg, *end)))
		ret *= mul[kmgind - kmg];
	return ret;
}
370 static int syntax(void) {
371 dprintf(2,
372 "jobflow " VERSION " (C) rofl0r\n"
373 "------------------------\n"
374 "this program is intended to be used as a recipient of another programs output.\n"
375 "it launches processes to which the current line can be passed as an argument\n"
376 "using {} for substitution (as in find -exec).\n"
377 "if no input substitution argument ({} or {.}) is provided, input is piped into\n"
378 "stdin of child processes. input will be then evenly distributed to jobs,\n"
379 "until EOF is received. we call this 'pipe mode'.\n"
380 "\n"
381 "available options:\n\n"
382 "-skip N -count N -threads N -resume -statefile=/tmp/state -delayedflush\n"
383 "-delayedspinup N -buffered -joinoutput -limits mem=16M,cpu=10\n"
384 "-eof=XXX\n"
385 "-exec ./mycommand {}\n"
386 "\n"
387 "-skip N\n"
388 " N=number of entries to skip\n"
389 "-count N\n"
390 " N=only process count lines (after skipping)\n"
391 "-threads N (alternative: -j N)\n"
392 " N=number of parallel processes to spawn\n"
393 "-resume\n"
394 " resume from last jobnumber stored in statefile\n"
395 "-eof XXX\n"
396 " use XXX as the EOF marker on stdin\n"
397 " if the marker is encountered, behave as if stdin was closed\n"
398 " not compatible with pipe/bulk mode\n"
399 "-statefile XXX\n"
400 " XXX=filename\n"
401 " saves last launched jobnumber into a file\n"
402 "-delayedflush\n"
403 " only write to statefile whenever all processes are busy,\n"
404 " and at program end\n"
405 "-delayedspinup N\n"
406 " N=maximum amount of milliseconds\n"
407 " ...to wait when spinning up a fresh set of processes\n"
408 " a random value between 0 and the chosen amount is used to delay initial\n"
409 " spinup.\n"
410 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
411 " activity on program startup\n"
412 "-buffered\n"
413 " store the stdout and stderr of launched processes into a temporary file\n"
414 " which will be printed after a process has finished.\n"
415 " this prevents mixing up of output of different processes.\n"
416 "-joinoutput\n"
417 " if -buffered, write both stdout and stderr into the same file.\n"
418 " this saves the chronological order of the output, and the combined output\n"
419 " will only be printed to stdout.\n"
420 "-bulk N\n"
421 " do bulk copies with a buffer of N bytes. only usable in pipe mode.\n"
422 " this passes (almost) the entire buffer to the next scheduled job.\n"
423 " the passed buffer will be truncated to the last line break boundary,\n"
424 " so jobs always get entire lines to work with.\n"
425 " this option is useful when you have huge input files and relatively short\n"
426 " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n"
427 " N must be a multiple of 4KB. the suffixes G/M/K are detected.\n"
428 " actual memory allocation will be twice the amount passed.\n"
429 " note that pipe buffer size is limited to 64K on linux, so anything higher\n"
430 " than that probably doesn't make sense.\n"
431 "-limits [mem=N,cpu=N,stack=N,fsize=N,nofiles=N]\n"
432 " sets the rlimit of the new created processes.\n"
433 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
434 "-exec command with args\n"
435 " everything past -exec is treated as the command to execute on each line of\n"
436 " stdin received. the line can be passed as an argument using {}.\n"
437 " {.} passes everything before the last dot in a line as an argument.\n"
438 " {#} will be replaced with the sequence (aka line) number.\n"
439 " usage of {#} does not affect the decision whether pipe mode is used.\n"
440 " it is possible to use multiple substitutions inside a single argument,\n"
441 " but only of one type.\n"
442 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
443 "\n"
445 return 1;
448 static int parse_args(unsigned argc, char** argv) {
449 unsigned i, j, r = 0;
450 static bool resume = 0;
451 static char *limits = 0;
452 static const struct {
453 const char lname[14];
454 const char sname;
455 const char flag;
456 union {
457 bool *b;
458 unsigned long *i;
459 char **s;
460 } dest;
461 } opt_tab[] = {
462 {"threads", 'j', 'i', .dest.i = &prog_state.numthreads },
463 {"statefile", 0, 's', .dest.s = &prog_state.statefile },
464 {"eof", 0, 's', .dest.s = &prog_state.eof_marker },
465 {"skip", 0, 'i', .dest.i = &prog_state.skip },
466 {"count", 0, 'i', .dest.i = &prog_state.count },
467 {"resume", 0, 'b', .dest.b = &resume },
468 {"delayedflush", 0, 'b', .dest.b = &prog_state.delayedflush },
469 {"delayedspinup", 0, 'i', .dest.i = &prog_state.delayedspinup_interval },
470 {"buffered", 0, 'b', .dest.b =&prog_state.buffered},
471 {"joinoutput", 0, 'b', .dest.b =&prog_state.join_output},
472 {"bulk", 0, 'i', .dest.i = &prog_state.bulk_bytes},
473 {"limits", 0, 's', .dest.s = &limits},
476 prog_state.numthreads = 1;
477 prog_state.count = -1UL;
479 for(i=1; i<argc; ++i) {
480 char *p = argv[i], *q = strchr(p, '=');
481 if(*(p++) != '-') die("expected option instead of %s\n", argv[i]);
482 if(*p == '-') p++;
483 if(!*p) die("invalid option %s\n", argv[i]);
484 for(j=0;j<ARRAY_SIZE(opt_tab);++j) {
485 if(((!p[1] || p[1] == '=') && *p == opt_tab[j].sname) ||
486 (!strcmp(p, opt_tab[j].lname)) ||
487 (q && strlen(opt_tab[j].lname) == q-p && !strncmp(p, opt_tab[j].lname, q-p))) {
488 switch(opt_tab[j].flag) {
489 case 'b': *opt_tab[j].dest.b=1; break;
490 case 'i': case 's':
491 if(!q) {
492 if(argc <= i+1 || argv[i+1][0] == '-') {
493 e_expect_op:;
494 die("option %s requires operand\n", argv[i]);
496 q = argv[++i];
497 } else {
498 if(*(++q) == 0) goto e_expect_op;
500 if(opt_tab[j].flag == 'i') {
501 if(!isdigit(*q))
502 die("expected numeric operand for %s at %s\n", p, q);
503 *opt_tab[j].dest.i=parse_human_number(q);
504 } else
505 *opt_tab[j].dest.s=q;
506 break;
508 break;
511 if(j>=ARRAY_SIZE(opt_tab)) {
512 if(!strcmp(p, "exec")) {
513 r = i+1;
514 break;
515 } else if(!strcmp(p, "help")) {
516 return syntax();
517 } else die("unknown option %s\n", argv[i]);
521 if((long)prog_state.numthreads <= 0) die("threadcount must be >= 1\n");
523 if(resume) {
524 if(!prog_state.statefile) die("-resume needs -statefile\n");
525 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
526 FILE *f = fopen(prog_state.statefile, "r");
527 if(f) {
528 char nb[64];
529 if(fgets(nb, sizeof nb, f)) prog_state.skip = strtoll(nb,0,10);
530 fclose(f);
535 if(prog_state.delayedflush && !prog_state.statefile)
536 die("-delayedflush needs -statefile\n");
538 prog_state.pipe_mode = 1;
539 prog_state.cmd_startarg = r;
540 prog_state.subst_entries = NULL;
542 if(r) {
543 uint32_t subst_ent;
544 if(r < (unsigned) argc) {
545 prog_state.cmd_startarg = r;
546 } else die("-exec without arguments\n");
548 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
550 // save entries which must be substituted, to save some cycles.
551 for(i = r; i < (unsigned) argc; i++) {
552 subst_ent = i - r;
553 char *a, *b, *c=0;
554 if((a = strstr(argv[i], "{}")) ||
555 (b = strstr(argv[i], "{.}")) ||
556 (c = strstr(argv[i], "{#}"))) {
557 if(!c) prog_state.pipe_mode = 0;
558 else prog_state.use_seqnr = 1;
559 sblist_add(prog_state.subst_entries, &subst_ent);
562 if(sblist_getsize(prog_state.subst_entries) == 0) {
563 sblist_free(prog_state.subst_entries);
564 prog_state.subst_entries = 0;
568 if(prog_state.join_output && !prog_state.buffered)
569 die("-joinoutput needs -buffered\n");
571 if(prog_state.bulk_bytes % 4096)
572 die("bulk size must be a multiple of 4096\n");
574 if(limits) {
575 unsigned i;
576 while(1) {
577 limits += strspn(limits, ",");
578 size_t l = strcspn(limits, ",");
579 if(!l) break;
580 size_t l2 = strcspn(limits, "=");
581 if(l2 >= l) die("syntax error in limits argument\n");
582 limit_rec lim;
583 if(!prog_state.limits)
584 prog_state.limits = sblist_new(sizeof(limit_rec), 4);
585 static const struct { int lim_val; const char lim_name[8]; } lim_tab[] = {
586 { RLIMIT_AS, "mem" },
587 { RLIMIT_CPU, "cpu" },
588 { RLIMIT_STACK, "stack" },
589 { RLIMIT_FSIZE, "fsize" },
590 { RLIMIT_NOFILE, "nofiles" },
592 for(i=0; i<ARRAY_SIZE(lim_tab);++i)
593 if(!strncmp(limits, lim_tab[i].lim_name, l2)) {
594 lim.limit = lim_tab[i].lim_val;
595 break;
597 if(i >= ARRAY_SIZE(lim_tab))
598 die("unknown option passed to -limits\n");
599 if(getrlimit(lim.limit, &lim.rl) == -1) {
600 perror("getrlimit");
601 die("could not query rlimits\n");
603 lim.rl.rlim_cur = parse_human_number(limits+l2+1);
604 sblist_add(prog_state.limits, &lim);
605 limits += l;
608 return 0;
611 static void init_queue(void) {
612 unsigned i;
613 job_info ji = {.pid = -1};
615 for(i = 0; i < prog_state.numthreads; i++)
616 sblist_add(prog_state.job_infos, &ji);
619 static void write_statefile(unsigned long long n, const char* tempfile) {
620 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
621 if(fd != -1) {
622 dprintf(fd, "%llu\n", n + 1ULL);
623 close(fd);
624 if(rename(tempfile, prog_state.statefile) == -1)
625 perror("rename");
626 } else
627 perror("open");
/* Check whether needle occurs in haystack exactly at offset bufpos.
 * hay_size and needle_size are the lengths of the two buffers; bufpos
 * must not be larger than hay_size.
 * Returns 1 on a match, 0 otherwise. */
static int str_here(char* haystack, size_t hay_size, size_t bufpos,
                    char* needle, size_t needle_size) {
	const size_t available = hay_size - bufpos;

	if(needle_size > available)
		return 0;
	return memcmp(needle, haystack + bufpos, needle_size) == 0;
}
// Copies src to dest, replacing every occurrence of "what" by "whit".
// returns the number of substitutions done, or -1 if dest is too small.
// dest is always overwritten. on success it is NUL-terminated; if no
// substitutions were done, it contains a copy of source.
static int substitute_all(char *dest, ssize_t dest_size,
                          char *src, size_t src_size,
                          char *what, size_t what_size,
                          char *whit, size_t whit_size) {
	size_t pos = 0;
	int hits = 0;

	while(dest_size > 0 && pos < src_size) {
		if(!str_here(src, src_size, pos, what, what_size)) {
			/* ordinary character: copy it over */
			*dest++ = src[pos++];
			dest_size--;
			continue;
		}
		if(dest_size < (ssize_t) whit_size) return -1;
		memcpy(dest, whit, whit_size);
		dest += whit_size;
		dest_size -= whit_size;
		pos += what_size;
		hits++;
	}

	/* no room left for the terminator */
	if(!dest_size) return -1;
	*dest = 0;
	return hits;
}
/* Find the first occurrence of ch within the first end bytes of in.
 * Returns a pointer to it, or 0 if there is none. NUL bytes do not stop
 * the search. */
static char* mystrnchr(const char *in, int ch, size_t end) {
	size_t off;

	for(off = 0; off < end; off++)
		if(in[off] == ch)
			return (char*)(in + off);
	return 0;
}
/* Find the last occurrence of ch within the first end bytes of in.
 * Returns a pointer to it, or 0 if there is none.
 * end must be at least 1; use mystrnrchr_chk() if it may be 0. */
static char* mystrnrchr(const char *in, int ch, size_t end) {
	const char *cur;

	/* walk backwards; the first byte is handled after the loop */
	for(cur = in + end - 1; cur != in; cur--)
		if(*cur == ch)
			return (char*)cur;
	return *cur == ch ? (char*)cur : 0;
}
/* Like mystrnrchr(), but also accepts an empty range: returns 0 if end
 * is 0. */
static char* mystrnrchr_chk(const char *in, int ch, size_t end) {
	return end ? mystrnrchr(in, ch, end) : 0;
}
685 static int need_linecounter(void) {
686 return !!prog_state.skip || prog_state.statefile ||
687 prog_state.use_seqnr || prog_state.count != -1UL;
/* Count the '\n' bytes within the first len bytes of buf. */
static size_t count_linefeeds(const char *buf, size_t len) {
	size_t off, total = 0;

	for(off = 0; off < len; off++)
		total += (buf[off] == '\n');
	return total;
}
699 static int match_eof(char* inbuf, size_t len) {
700 if(!prog_state.eof_marker) return 0;
701 size_t l = strlen(prog_state.eof_marker);
702 return l == len-1 && !memcmp(prog_state.eof_marker, inbuf, l);
/* is p a line break character (LF or CR)? */
static inline int islb(int p) {
	switch(p) {
	case '\n':
	case '\r':
		return 1;
	default:
		return 0;
	}
}

/* Strip trailing line break characters from the buffer s of length *len.
 * Every removed byte is overwritten with NUL and *len is reduced
 * accordingly. */
static void chomp(char *s, size_t *len) {
	size_t n = *len;

	while(n > 0 && islb(s[n - 1]))
		s[--n] = 0;
	*len = n;
}
/* Interpret a wait status as delivered by waitpid().
 * Returns 1 if the process was killed by a signal or exited with a
 * non-zero status, 0 otherwise. */
static int process_failed(int retval) {
	if(WIFSIGNALED(retval))
		return 1;
	if(WIFEXITED(retval))
		return WEXITSTATUS(retval) != 0;
	return 0;
}
715 #define MAX_SUBSTS 16
716 static int dispatch_line(char* inbuf, size_t len, char** argv) {
717 char subst_buf[MAX_SUBSTS][4096];
718 static unsigned spinup_counter = 0;
720 if(!prog_state.bulk_bytes)
721 prog_state.lineno++;
722 else if(need_linecounter()) {
723 prog_state.lineno += count_linefeeds(inbuf, len);
726 if(prog_state.skip) {
727 if(!prog_state.bulk_bytes) {
728 prog_state.skip--;
729 return 1;
730 } else {
731 while(len && prog_state.skip) {
732 char *q = mystrnchr(inbuf, '\n', len);
733 if(q) {
734 ptrdiff_t diff = (q - inbuf) + 1;
735 inbuf += diff;
736 len -= diff;
737 prog_state.skip--;
738 } else {
739 return 1;
742 if(!len) return 1;
744 } else if(prog_state.count != -1UL) {
745 if(!prog_state.count) return -1;
746 --prog_state.count;
749 if(!prog_state.cmd_startarg) {
750 write_all(1, inbuf, len);
751 return 1;
754 if(!prog_state.pipe_mode)
755 chomp(inbuf, &len);
757 char *line = inbuf;
758 size_t line_size = len;
759 int ret;
761 if(prog_state.subst_entries) {
762 unsigned max_subst = 0;
763 uint32_t* index;
764 sblist_iter(prog_state.subst_entries, index) {
765 if(max_subst >= MAX_SUBSTS) break;
766 char *source = argv[*index + prog_state.cmd_startarg];
767 size_t source_len = strlen(source);
768 ret = substitute_all(subst_buf[max_subst], 4096,
769 source, source_len,
770 "{}", 2,
771 line, line_size);
772 if(ret == -1) {
773 too_long:
774 dprintf(2, "fatal: line too long for substitution: %s\n", line);
775 return 0;
776 } else if(!ret) {
777 char* lastdot = mystrnrchr_chk(line, '.', line_size);
778 size_t tilLastDot = line_size;
779 if(lastdot) tilLastDot = lastdot - line;
780 ret = substitute_all(subst_buf[max_subst], 4096,
781 source, source_len,
782 "{.}", 3,
783 line, tilLastDot);
784 if(ret == -1) goto too_long;
786 if(!ret) {
787 char lineno[32];
788 sprintf(lineno, "%llu", prog_state.lineno);
789 ret = substitute_all(subst_buf[max_subst], 4096,
790 source, source_len,
791 "{#}", 3,
792 lineno, strlen(lineno));
793 if(ret == -1) goto too_long;
795 if(ret) {
796 prog_state.cmd_argv[*index] = subst_buf[max_subst];
797 max_subst++;
803 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
804 msleep(rand() % (prog_state.delayedspinup_interval + 1));
805 spinup_counter++;
808 ret = 1;
809 if(free_slots())
810 launch_job(prog_state.threads_running, prog_state.cmd_argv);
811 else if(!prog_state.pipe_mode) {
812 int retval;
813 launch_job(reap_child(&retval), prog_state.cmd_argv);
814 ret = !process_failed(retval);
817 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
818 write_statefile(prog_state.lineno, prog_state.temp_state);
821 if(prog_state.pipe_mode)
822 pass_stdin(line, line_size);
824 return ret;
827 int main(int argc, char** argv) {
828 unsigned i;
830 char tempdir_buf[256];
832 srand(time(NULL));
834 if(argc > 4096) argc = 4096;
836 prog_state.threads_running = 0;
838 if(parse_args(argc, argv)) return 1;
840 if(prog_state.statefile)
841 snprintf(prog_state.temp_state, sizeof(prog_state.temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
843 prog_state.tempdir = NULL;
845 if(prog_state.buffered) {
846 prog_state.tempdir = tempdir_buf;
847 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
848 perror("mkdtemp");
849 die("could not create tempdir\n");
851 } else {
852 /* if the stdout/stderr fds are not in O_APPEND mode,
853 the dup()'s of the fds in posix_spawn can cause different
854 file positions, causing the different processes to overwrite each others output.
855 testcase:
856 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
858 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
859 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
862 if(prog_state.cmd_startarg) {
863 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
864 prog_state.cmd_argv[i - prog_state.cmd_startarg] = argv[i];
866 prog_state.cmd_argv[argc - prog_state.cmd_startarg] = NULL;
869 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
870 init_queue();
872 prog_state.lineno = 0;
874 size_t left = 0, bytes_read = 0;
875 const size_t chunksize = prog_state.bulk_bytes ? prog_state.bulk_bytes : 16*1024;
877 char *mem = mmap(NULL, chunksize*2, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
878 char *buf1 = mem;
879 char *buf2 = mem+chunksize;
880 char *in, *inbuf;
882 int exitcode = 1;
884 while(1) {
885 inbuf = buf1+chunksize-left;
886 memcpy(inbuf, buf2+bytes_read-left, left);
887 ssize_t n = read(0, buf2, chunksize);
888 if(n == -1) {
889 perror("read");
890 goto out;
892 bytes_read = n;
893 left += n;
894 in = inbuf;
895 while(left) {
896 char *p;
897 if(prog_state.pipe_mode && prog_state.bulk_bytes)
898 p = mystrnrchr(in, '\n', left);
899 else
900 p = mystrnchr (in, '\n', left);
902 if(!p) break;
903 ptrdiff_t diff = (p - in) + 1;
904 if(match_eof(in, diff)) {
905 exitcode = 0;
906 goto out;
908 if(!dispatch_line(in, diff, argv))
909 goto out;
910 left -= diff;
911 in += diff;
913 if(!n) {
914 if(left && !match_eof(in, left)) dispatch_line(in, left, argv);
915 break;
917 if(left > chunksize) {
918 dprintf(2, "error: input line length exceeds buffer size\n");
919 goto out;
923 exitcode = 0;
925 out:
927 if(prog_state.pipe_mode) {
928 close_pipes();
931 if(prog_state.delayedflush)
932 write_statefile(prog_state.lineno - 1, prog_state.temp_state);
934 int retval = 0;
935 while(prog_state.threads_running) {
936 reap_child(&retval);
937 if(!exitcode) exitcode = process_failed(retval);
940 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
941 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
942 if(prog_state.limits) sblist_free(prog_state.limits);
944 if(prog_state.tempdir)
945 rmdir(prog_state.tempdir);
948 fflush(stdout);
949 fflush(stderr);
952 return exitcode;