relicense as MIT
[rofl0r-jobflow.git] / jobflow.c
blob9abec59cc5b65f55ea17bb5655e5540a0fe7b2c9
1 /*
2 MIT License
3 Copyright (C) 2012-2021 rofl0r
5 Permission is hereby granted, free of charge, to any person obtaining a copy
6 of this software and associated documentation files (the “Software”), to deal
7 in the Software without restriction, including without limitation the rights
8 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 copies of the Software, and to permit persons to whom the Software is
10 furnished to do so, subject to the following conditions:
12 The above copyright notice and this permission notice shall be included in all
13 copies or substantial portions of the Software.
15 THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 SOFTWARE.
#define VERSION "1.3.1"

/* Request GNU extensions (prlimit() is used below) together with
   POSIX.1-2008 / X/Open 7 interfaces such as dprintf(). These must be
   in effect before the first system header is included. */
#undef _GNU_SOURCE
#define _GNU_SOURCE
#undef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#undef _XOPEN_SOURCE
#define _XOPEN_SOURCE 700
35 #include "sblist.h"
/* element count of a real array object (not valid on pointers) */
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof(*(arr)))
39 #include <stdio.h>
40 #include <string.h>
41 #include <stdlib.h>
42 #include <stdbool.h>
43 #include <unistd.h>
44 #include <stdint.h>
45 #include <stddef.h>
46 #include <errno.h>
47 #include <time.h>
48 #include <assert.h>
49 #include <ctype.h>
50 #include <sys/mman.h>
52 /* process handling */
54 #include <fcntl.h>
55 #include <spawn.h>
56 #include <sys/wait.h>
57 #include <sys/stat.h>
59 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
/* prlimit() is unavailable in this older glibc, see
 * http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01
 * This stub keeps the program building there: every call prints a notice
 * and fails with -1 / errno = EINVAL. */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
static int prlimit(int pid, ...) {
	const int failure_code = EINVAL;

	(void) pid;
	dprintf(2, "prlimit() not implemented on this system\n");
	errno = failure_code;
	return -1;
}
#endif
73 #include <sys/time.h>
75 /* some small helper funcs from libulz */
/* Sleep for `millisecs` milliseconds. If a signal interrupts the sleep
   (EINTR), sleep again for the remaining time. Returns nanosleep()'s final
   result: 0 on success, -1 with errno set on any other error. */
static int msleep(long millisecs) {
	struct timespec remaining;
	struct timespec todo = {
		.tv_sec = millisecs / 1000,
		.tv_nsec = (millisecs % 1000) * 1000000L,
	};

	for(;;) {
		int rc = nanosleep(&todo, &remaining);
		if(rc != -1 || errno != EINTR)
			return rc;
		todo = remaining;
	}
}
static const char ulz_conv_cypher[] =
	"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
#define ulz_conv_cypher_len (sizeof(ulz_conv_cypher) - 1)

/* Minimal mkdtemp(): overwrite the last 6 characters of `templ` with random
   alphanumerics (drawn with rand()) and create that directory with mode
   S_IRWXU. Picks a new name for as long as mkdir() reports EEXIST.
   Returns templ on success. Returns NULL with errno set on failure
   (EINVAL when templ is shorter than 6 characters). */
static char* ulz_mkdtemp(char* templ) {
	size_t len = strlen(templ);
	char *suffix;

	if(len < 6) {
		errno = EINVAL;
		return NULL;
	}
	suffix = templ + len - 6;
	for(;;) {
		size_t k;
		for(k = 0; k < 6; k++)
			suffix[k] = ulz_conv_cypher[rand() % ulz_conv_cypher_len];
		if(mkdir(templ, S_IRWXU) == 0)
			return templ;
		if(errno != EEXIST)
			return NULL;
	}
}
/* Build the NUL-terminated template "<tmpdir><prefix>XXXXXX" in buf, using
   the first `pl` bytes of prefix. buf must hold strlen(tmpdir) + pl + 7
   bytes. Returns the string length, excluding the terminator. */
static size_t gen_fn(char* buf, const char* prefix, size_t pl, const char* tmpdir) {
	size_t dirlen = strlen(tmpdir);
	char *cursor = buf;

	memcpy(cursor, tmpdir, dirlen);
	cursor += dirlen;
	memcpy(cursor, prefix, pl);
	cursor += pl;
	memcpy(cursor, "XXXXXX", sizeof("XXXXXX"));
	return (size_t)(cursor - buf) + (sizeof("XXXXXX") - 1);
}
/* Create a private directory "<prefix>XXXXXX" (random suffix), trying
 * /dev/shm/ first to get the fastest possible storage and /tmp/ second.
 * The resulting path is written to buffer.
 * Returns its length, or 0 when bufsize is too small for the longer
 * /dev/shm/ form or when neither location could be used. */
static size_t mktempdir(const char* prefix, char* buffer, size_t bufsize) {
	static const char * const bases[] = { "/dev/shm/", "/tmp/" };
	size_t pl = strlen(prefix);
	size_t k;

	if(bufsize < sizeof("/dev/shm/") - 1 + pl + sizeof("XXXXXX"))
		return 0;
	for(k = 0; k < sizeof(bases) / sizeof(bases[0]); k++) {
		size_t len = gen_fn(buffer, prefix, pl, bases[k]);
		if(ulz_mkdtemp(buffer))
			return len;
	}
	return 0;
}
/* Bookkeeping for one job slot. */
typedef struct job_info {
	pid_t pid;                     /* pid of the running child, -1 if the slot is free */
	int pipe;                      /* pipe mode: write end of the child's stdin pipe */
	posix_spawn_file_actions_t fa; /* fd setup passed to posix_spawnp() */
} job_info;

/* One parsed -limits entry. */
typedef struct limit_rec {
	int limit;        /* RLIMIT_* resource number */
	struct rlimit rl; /* limits handed to prlimit() for each new child */
} limit_rec;
141 typedef struct {
142 char temp_state[256];
143 char* cmd_argv[4096];
144 sblist* job_infos;
145 sblist* subst_entries;
146 sblist* limits;
147 char* tempdir;
148 unsigned long long lineno;
150 char* statefile;
151 char* eof_marker;
152 unsigned long numthreads;
153 unsigned long threads_running;
154 unsigned long skip;
155 unsigned long count;
156 unsigned long delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
157 the top value in ms can be supplied via a command line switch.
158 this option makes only sense if the interval is somewhat smaller than the
159 expected runtime of the average job.
160 this option is useful to not overload a network app due to hundreds of
161 parallel connection tries on startup.
163 unsigned long bulk_bytes;
165 bool pipe_mode;
166 bool use_seqnr;
167 bool buffered; /* write stdout and stderr of each task into a file,
168 and print it to stdout once the process ends.
169 this prevents mixing up of the output of multiple tasks. */
170 bool delayedflush; /* only write to statefile whenever all processes are busy, and at program end.
171 this means faster program execution, but could also be imprecise if the number of
172 jobs is small or smaller than the available threadcount. */
173 bool join_output; /* join stdout and stderr of launched jobs into stdout */
175 unsigned cmd_startarg;
176 } prog_state_s;
178 static prog_state_s prog_state;
181 extern char** environ;
183 static int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
184 int ret = snprintf(buf, bufsize, "%s/jd_proc_%.5lu_std%s.log",
185 prog_state.tempdir, (unsigned long) jobindex, is_stderr ? "err" : "out");
186 return ret > 0 && (size_t) ret < bufsize;
189 static void launch_job(size_t jobindex, char** argv) {
190 char stdout_filename_buf[256];
191 char stderr_filename_buf[256];
192 job_info* job = sblist_get(prog_state.job_infos, jobindex);
194 if(job->pid != -1) return;
196 if(prog_state.buffered) {
197 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
198 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
199 dprintf(2, "temp filename too long!\n");
200 return;
204 errno = posix_spawn_file_actions_init(&job->fa);
205 if(errno) goto spawn_error;
207 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
208 if(errno) goto spawn_error;
210 int pipes[2];
211 if(prog_state.pipe_mode) {
212 if(pipe(pipes)) {
213 perror("pipe");
214 goto spawn_error;
216 job->pipe = pipes[1];
217 errno = posix_spawn_file_actions_adddup2(&job->fa, pipes[0], 0);
218 if(errno) goto spawn_error;
219 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[0]);
220 if(errno) goto spawn_error;
221 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[1]);
222 if(errno) goto spawn_error;
225 if(prog_state.buffered) {
226 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
227 if(errno) goto spawn_error;
228 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
229 if(errno) goto spawn_error;
232 if(!prog_state.pipe_mode) {
233 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
234 if(errno) goto spawn_error;
237 if(prog_state.buffered) {
238 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
239 if(errno) goto spawn_error;
240 if(prog_state.join_output)
241 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
242 else
243 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
244 if(errno) goto spawn_error;
247 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
248 if(errno) {
249 spawn_error:
250 job->pid = -1;
251 perror("posix_spawn");
252 } else {
253 prog_state.threads_running++;
254 if(prog_state.limits) {
255 limit_rec* limit;
256 sblist_iter(prog_state.limits, limit) {
257 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
258 perror("prlimit");
262 if(prog_state.pipe_mode)
263 close(pipes[0]);
/* Copy the buffered stdout (or stderr) log of job `job_id` to our own
   stdout (or stderr), flush it, and delete the log file.
   Does nothing if the log file cannot be opened. */
static void dump_output(size_t job_id, int is_stderr) {
	char path[256];
	char chunk[4096];
	FILE *target = is_stderr ? stderr : stdout;
	FILE *log;
	size_t got;

	makeLogfilename(path, sizeof(path), job_id, is_stderr);
	log = fopen(path, "r");
	if(!log)
		return;
	/* a short read means EOF (or an error): stop after writing it */
	do {
		got = fread(chunk, 1, sizeof(chunk), log);
		if(got)
			fwrite(chunk, 1, got, target);
	} while(got == sizeof(chunk));
	fclose(log);
	fflush(target);
	unlink(path);
}
/* Write all `size` bytes of buf to fd. Retries after EINTR and continues
   after partial writes. Any other write error is reported with perror()
   and the remaining data is dropped. */
static void write_all(int fd, void* buf, size_t size) {
	const char *cur = buf;
	size_t remaining = size;

	while(remaining) {
		ssize_t done = write(fd, cur, remaining);
		if(done == -1) {
			if(errno == EINTR)
				continue;
			perror("write");
			return;
		}
		cur += done;
		remaining -= (size_t) done;
	}
}
306 static void pass_stdin(char *line, size_t len) {
307 static size_t next_child = 0;
308 if(next_child >= sblist_getsize(prog_state.job_infos))
309 next_child = 0;
310 job_info *job = sblist_get(prog_state.job_infos, next_child);
311 write_all(job->pipe, line, len);
312 next_child++;
315 static void close_pipes(void) {
316 size_t i;
317 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
318 job_info *job = sblist_get(prog_state.job_infos, i);
319 close(job->pipe);
323 /* wait till a child exits, reap it, and return its job index for slot reuse */
324 static size_t reap_child(void) {
325 size_t i;
326 job_info* job;
327 int ret, retval;
329 do ret = waitpid(-1, &retval, 0);
330 while(ret == -1 || !WIFEXITED(retval));
332 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
333 job = sblist_get(prog_state.job_infos, i);
334 if(job->pid == ret) {
335 job->pid = -1;
336 posix_spawn_file_actions_destroy(&job->fa);
337 prog_state.threads_running--;
338 if(prog_state.buffered) {
339 dump_output(i, 0);
340 if(!prog_state.join_output)
341 dump_output(i, 1);
343 return i;
346 assert(0);
347 return -1;
350 static size_t free_slots(void) {
351 return prog_state.numthreads - prog_state.threads_running;
/* Print "error: " plus a printf-style message to fd 2, then exit(1). */
#define die(...) do { \
	dprintf(2, "error: " __VA_ARGS__); \
	exit(1); \
} while(0)
/* Parse a decimal number with an optional binary-unit suffix directly after
 * the digits: "K" (x1024), "M" (x1024^2) or "G" (x1024^3). For example,
 * "16M" gives 16777216. Only upper-case K/M/G are recognised; any other
 * trailing text is ignored. An empty or non-numeric string yields 0.
 * Results that do not fit into unsigned long saturate at ULONG_MAX instead
 * of wrapping around.
 * Uses strtoul's end pointer rather than scanning from num+1, which read
 * past the terminator of an empty string, and avoids isdigit() on a plain
 * (possibly negative) char. */
static unsigned long parse_human_number(const char* num) {
	static const char suffixes[] = "KMG";
	static const unsigned long mul[] = {1024UL, 1024UL * 1024, 1024UL * 1024 * 1024};
	char *end;
	const char *unit;
	unsigned long ret = strtoul(num, &end, 10);

	if(*end && (unit = strchr(suffixes, *end))) {
		unsigned long m = mul[unit - suffixes];
		ret = ret > -1UL / m ? -1UL : ret * m;
	}
	return ret;
}
369 static int syntax(void) {
370 dprintf(2,
371 "jobflow " VERSION " (C) rofl0r\n"
372 "------------------------\n"
373 "this program is intended to be used as a recipient of another programs output.\n"
374 "it launches processes to which the current line can be passed as an argument\n"
375 "using {} for substitution (as in find -exec).\n"
376 "if no input substitution argument ({} or {.}) is provided, input is piped into\n"
377 "stdin of child processes. input will be then evenly distributed to jobs,\n"
378 "until EOF is received. we call this 'pipe mode'.\n"
379 "\n"
380 "available options:\n\n"
381 "-skip N -count N -threads N -resume -statefile=/tmp/state -delayedflush\n"
382 "-delayedspinup N -buffered -joinoutput -limits mem=16M,cpu=10\n"
383 "-eof=XXX\n"
384 "-exec ./mycommand {}\n"
385 "\n"
386 "-skip N\n"
387 " N=number of entries to skip\n"
388 "-count N\n"
389 " N=only process count lines (after skipping)\n"
390 "-threads N (alternative: -j N)\n"
391 " N=number of parallel processes to spawn\n"
392 "-resume\n"
393 " resume from last jobnumber stored in statefile\n"
394 "-eof XXX\n"
395 " use XXX as the EOF marker on stdin\n"
396 " if the marker is encountered, behave as if stdin was closed\n"
397 " not compatible with pipe/bulk mode\n"
398 "-statefile XXX\n"
399 " XXX=filename\n"
400 " saves last launched jobnumber into a file\n"
401 "-delayedflush\n"
402 " only write to statefile whenever all processes are busy,\n"
403 " and at program end\n"
404 "-delayedspinup N\n"
405 " N=maximum amount of milliseconds\n"
406 " ...to wait when spinning up a fresh set of processes\n"
407 " a random value between 0 and the chosen amount is used to delay initial\n"
408 " spinup.\n"
409 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
410 " activity on program startup\n"
411 "-buffered\n"
412 " store the stdout and stderr of launched processes into a temporary file\n"
413 " which will be printed after a process has finished.\n"
414 " this prevents mixing up of output of different processes.\n"
415 "-joinoutput\n"
416 " if -buffered, write both stdout and stderr into the same file.\n"
417 " this saves the chronological order of the output, and the combined output\n"
418 " will only be printed to stdout.\n"
419 "-bulk N\n"
420 " do bulk copies with a buffer of N bytes. only usable in pipe mode.\n"
421 " this passes (almost) the entire buffer to the next scheduled job.\n"
422 " the passed buffer will be truncated to the last line break boundary,\n"
423 " so jobs always get entire lines to work with.\n"
424 " this option is useful when you have huge input files and relatively short\n"
425 " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n"
426 " N must be a multiple of 4KB. the suffixes G/M/K are detected.\n"
427 " actual memory allocation will be twice the amount passed.\n"
428 " note that pipe buffer size is limited to 64K on linux, so anything higher\n"
429 " than that probably doesn't make sense.\n"
430 "-limits [mem=N,cpu=N,stack=N,fsize=N,nofiles=N]\n"
431 " sets the rlimit of the new created processes.\n"
432 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
433 "-exec command with args\n"
434 " everything past -exec is treated as the command to execute on each line of\n"
435 " stdin received. the line can be passed as an argument using {}.\n"
436 " {.} passes everything before the last dot in a line as an argument.\n"
437 " {#} will be replaced with the sequence (aka line) number.\n"
438 " usage of {#} does not affect the decision whether pipe mode is used.\n"
439 " it is possible to use multiple substitutions inside a single argument,\n"
440 " but only of one type.\n"
441 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
442 "\n"
444 return 1;
447 static int parse_args(unsigned argc, char** argv) {
448 unsigned i, j, r = 0;
449 static bool resume = 0;
450 static char *limits = 0;
451 static const struct {
452 const char lname[14];
453 const char sname;
454 const char flag;
455 union {
456 bool *b;
457 unsigned long *i;
458 char **s;
459 } dest;
460 } opt_tab[] = {
461 {"threads", 'j', 'i', .dest.i = &prog_state.numthreads },
462 {"statefile", 0, 's', .dest.s = &prog_state.statefile },
463 {"eof", 0, 's', .dest.s = &prog_state.eof_marker },
464 {"skip", 0, 'i', .dest.i = &prog_state.skip },
465 {"count", 0, 'i', .dest.i = &prog_state.count },
466 {"resume", 0, 'b', .dest.b = &resume },
467 {"delayedflush", 0, 'b', .dest.b = &prog_state.delayedflush },
468 {"delayedspinup", 0, 'i', .dest.i = &prog_state.delayedspinup_interval },
469 {"buffered", 0, 'b', .dest.b =&prog_state.buffered},
470 {"joinoutput", 0, 'b', .dest.b =&prog_state.join_output},
471 {"bulk", 0, 'i', .dest.i = &prog_state.bulk_bytes},
472 {"limits", 0, 's', .dest.s = &limits},
475 prog_state.numthreads = 1;
476 prog_state.count = -1UL;
478 for(i=1; i<argc; ++i) {
479 char *p = argv[i], *q = strchr(p, '=');
480 if(*(p++) != '-') die("expected option instead of %s\n", argv[i]);
481 if(*p == '-') p++;
482 if(!*p) die("invalid option %s\n", argv[i]);
483 for(j=0;j<ARRAY_SIZE(opt_tab);++j) {
484 if(((!p[1] || p[1] == '=') && *p == opt_tab[j].sname) ||
485 (!strcmp(p, opt_tab[j].lname)) ||
486 (q && strlen(opt_tab[j].lname) == q-p && !strncmp(p, opt_tab[j].lname, q-p))) {
487 switch(opt_tab[j].flag) {
488 case 'b': *opt_tab[j].dest.b=1; break;
489 case 'i': case 's':
490 if(!q) {
491 if(argc <= i+1 || argv[i+1][0] == '-') {
492 e_expect_op:;
493 die("option %s requires operand\n", argv[i]);
495 q = argv[++i];
496 } else {
497 if(*(++q) == 0) goto e_expect_op;
499 if(opt_tab[j].flag == 'i') {
500 if(!isdigit(*q))
501 die("expected numeric operand for %s at %s\n", p, q);
502 *opt_tab[j].dest.i=parse_human_number(q);
503 } else
504 *opt_tab[j].dest.s=q;
505 break;
507 break;
510 if(j>=ARRAY_SIZE(opt_tab)) {
511 if(!strcmp(p, "exec")) {
512 r = i+1;
513 break;
514 } else if(!strcmp(p, "help")) {
515 return syntax();
516 } else die("unknown option %s\n", argv[i]);
520 if((long)prog_state.numthreads <= 0) die("threadcount must be >= 1\n");
522 if(resume) {
523 if(!prog_state.statefile) die("-resume needs -statefile\n");
524 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
525 FILE *f = fopen(prog_state.statefile, "r");
526 if(f) {
527 char nb[64];
528 if(fgets(nb, sizeof nb, f)) prog_state.skip = strtoll(nb,0,10);
529 fclose(f);
534 if(prog_state.delayedflush && !prog_state.statefile)
535 die("-delayedflush needs -statefile\n");
537 prog_state.pipe_mode = 1;
538 prog_state.cmd_startarg = r;
539 prog_state.subst_entries = NULL;
541 if(r) {
542 uint32_t subst_ent;
543 if(r < (unsigned) argc) {
544 prog_state.cmd_startarg = r;
545 } else die("-exec without arguments\n");
547 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
549 // save entries which must be substituted, to save some cycles.
550 for(i = r; i < (unsigned) argc; i++) {
551 subst_ent = i - r;
552 char *a, *b, *c=0;
553 if((a = strstr(argv[i], "{}")) ||
554 (b = strstr(argv[i], "{.}")) ||
555 (c = strstr(argv[i], "{#}"))) {
556 if(!c) prog_state.pipe_mode = 0;
557 else prog_state.use_seqnr = 1;
558 sblist_add(prog_state.subst_entries, &subst_ent);
561 if(sblist_getsize(prog_state.subst_entries) == 0) {
562 sblist_free(prog_state.subst_entries);
563 prog_state.subst_entries = 0;
567 if(prog_state.join_output && !prog_state.buffered)
568 die("-joinoutput needs -buffered\n");
570 if(prog_state.bulk_bytes % 4096)
571 die("bulk size must be a multiple of 4096\n");
573 if(limits) {
574 unsigned i;
575 while(1) {
576 limits += strspn(limits, ",");
577 size_t l = strcspn(limits, ",");
578 if(!l) break;
579 size_t l2 = strcspn(limits, "=");
580 if(l2 >= l) die("syntax error in limits argument\n");
581 limit_rec lim;
582 if(!prog_state.limits)
583 prog_state.limits = sblist_new(sizeof(limit_rec), 4);
584 static const struct { int lim_val; const char lim_name[8]; } lim_tab[] = {
585 { RLIMIT_AS, "mem" },
586 { RLIMIT_CPU, "cpu" },
587 { RLIMIT_STACK, "stack" },
588 { RLIMIT_FSIZE, "fsize" },
589 { RLIMIT_NOFILE, "nofiles" },
591 for(i=0; i<ARRAY_SIZE(lim_tab);++i)
592 if(!strncmp(limits, lim_tab[i].lim_name, l2)) {
593 lim.limit = lim_tab[i].lim_val;
594 break;
596 if(i >= ARRAY_SIZE(lim_tab))
597 die("unknown option passed to -limits\n");
598 if(getrlimit(lim.limit, &lim.rl) == -1) {
599 perror("getrlimit");
600 die("could not query rlimits\n");
602 lim.rl.rlim_cur = parse_human_number(limits+l2+1);
603 sblist_add(prog_state.limits, &lim);
604 limits += l;
607 return 0;
610 static void init_queue(void) {
611 unsigned i;
612 job_info ji = {.pid = -1};
614 for(i = 0; i < prog_state.numthreads; i++)
615 sblist_add(prog_state.job_infos, &ji);
618 static void write_statefile(unsigned long long n, const char* tempfile) {
619 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
620 if(fd != -1) {
621 dprintf(fd, "%llu\n", n + 1ULL);
622 close(fd);
623 if(rename(tempfile, prog_state.statefile) == -1)
624 perror("rename");
625 } else
626 perror("open");
/* Nonzero when the needle_size bytes of needle occur in haystack at offset
   bufpos without extending past hay_size. Requires bufpos <= hay_size. */
static int str_here(char* haystack, size_t hay_size, size_t bufpos,
                    char* needle, size_t needle_size) {
	size_t available = hay_size - bufpos;
	return needle_size <= available &&
	       memcmp(needle, haystack + bufpos, needle_size) == 0;
}

/* Copy src (src_size bytes) into dest, replacing every occurrence of
 * `what` with `whit`, and NUL-terminate the result.
 * Returns the number of replacements made, or -1 if dest_size bytes are not
 * enough for the result plus its terminator. dest is always overwritten;
 * with no matches it holds a copy of src. */
static int substitute_all(char *dest, ssize_t dest_size,
                          char *src, size_t src_size,
                          char *what, size_t what_size,
                          char *whit, size_t whit_size) {
	ssize_t room = dest_size;
	size_t pos = 0;
	int hits = 0;
	char *out = dest;

	while(room > 0 && pos < src_size) {
		if(!str_here(src, src_size, pos, what, what_size)) {
			*out++ = src[pos++];
			room--;
			continue;
		}
		if(room < (ssize_t) whit_size)
			return -1;
		memcpy(out, whit, whit_size);
		out += whit_size;
		room -= (ssize_t) whit_size;
		pos += what_size;
		hits++;
	}
	if(room == 0)
		return -1; /* no space left for the terminator */
	*out = '\0';
	return hits;
}
/* Return a pointer to the first byte equal to ch in in[0..end), or NULL. */
static char* mystrnchr(const char *in, int ch, size_t end) {
	size_t k;
	for(k = 0; k < end; k++) {
		if(in[k] == ch)
			return (char*)(in + k);
	}
	return 0;
}
/* Return a pointer to the last byte equal to ch in in[0..end), or NULL.
   The old version formed in+end-1 and dereferenced it even for end == 0
   (undefined behaviour). This one simply returns NULL for an empty range. */
static char* mystrnrchr(const char *in, int ch, size_t end) {
	size_t k = end;
	while(k > 0) {
		k--;
		if(in[k] == ch)
			return (char*)(in + k);
	}
	return 0;
}

/* Same as mystrnrchr(); kept as the explicitly length-checked entry point. */
static char* mystrnrchr_chk(const char *in, int ch, size_t end) {
	if(!end) return 0;
	return mystrnrchr(in, ch, end);
}
684 static int need_linecounter(void) {
685 return !!prog_state.skip || prog_state.statefile ||
686 prog_state.use_seqnr || prog_state.count != -1UL;
/* Count the '\n' bytes in buf[0..len). */
static size_t count_linefeeds(const char *buf, size_t len) {
	size_t found = 0;
	const char *hit;

	while(len && (hit = memchr(buf, '\n', len)) != NULL) {
		found++;
		len -= (size_t)(hit - buf) + 1;
		buf = hit + 1;
	}
	return found;
}
698 static int match_eof(char* inbuf, size_t len) {
699 if(!prog_state.eof_marker) return 0;
700 size_t l = strlen(prog_state.eof_marker);
701 return l == len-1 && !memcmp(prog_state.eof_marker, inbuf, l);
/* true for the line-break characters '\n' and '\r' */
static inline int islb(int p) { return p == '\r' || p == '\n'; }

/* Strip every trailing '\n' / '\r' from s (current length *len), replacing
   them with NUL bytes and storing the shortened length back into *len. */
static void chomp(char *s, size_t *len) {
	size_t n = *len;
	while(n > 0 && islb(s[n-1])) {
		s[n-1] = 0;
		n--;
	}
	*len = n;
}
709 #define MAX_SUBSTS 16
710 static int dispatch_line(char* inbuf, size_t len, char** argv) {
711 char subst_buf[MAX_SUBSTS][4096];
712 static unsigned spinup_counter = 0;
714 if(!prog_state.bulk_bytes)
715 prog_state.lineno++;
716 else if(need_linecounter()) {
717 prog_state.lineno += count_linefeeds(inbuf, len);
720 if(prog_state.skip) {
721 if(!prog_state.bulk_bytes) {
722 prog_state.skip--;
723 return 1;
724 } else {
725 while(len && prog_state.skip) {
726 char *q = mystrnchr(inbuf, '\n', len);
727 if(q) {
728 ptrdiff_t diff = (q - inbuf) + 1;
729 inbuf += diff;
730 len -= diff;
731 prog_state.skip--;
732 } else {
733 return 1;
736 if(!len) return 1;
738 } else if(prog_state.count != -1UL) {
739 if(!prog_state.count) return -1;
740 --prog_state.count;
743 if(!prog_state.cmd_startarg) {
744 write_all(1, inbuf, len);
745 return 1;
748 if(!prog_state.pipe_mode)
749 chomp(inbuf, &len);
751 char *line = inbuf;
752 size_t line_size = len;
754 if(prog_state.subst_entries) {
755 unsigned max_subst = 0;
756 uint32_t* index;
757 sblist_iter(prog_state.subst_entries, index) {
758 if(max_subst >= MAX_SUBSTS) break;
759 char *source = argv[*index + prog_state.cmd_startarg];
760 size_t source_len = strlen(source);
761 int ret;
762 ret = substitute_all(subst_buf[max_subst], 4096,
763 source, source_len,
764 "{}", 2,
765 line, line_size);
766 if(ret == -1) {
767 too_long:
768 dprintf(2, "fatal: line too long for substitution: %s\n", line);
769 return 0;
770 } else if(!ret) {
771 char* lastdot = mystrnrchr_chk(line, '.', line_size);
772 size_t tilLastDot = line_size;
773 if(lastdot) tilLastDot = lastdot - line;
774 ret = substitute_all(subst_buf[max_subst], 4096,
775 source, source_len,
776 "{.}", 3,
777 line, tilLastDot);
778 if(ret == -1) goto too_long;
780 if(!ret) {
781 char lineno[32];
782 sprintf(lineno, "%llu", prog_state.lineno);
783 ret = substitute_all(subst_buf[max_subst], 4096,
784 source, source_len,
785 "{#}", 3,
786 lineno, strlen(lineno));
787 if(ret == -1) goto too_long;
789 if(ret) {
790 prog_state.cmd_argv[*index] = subst_buf[max_subst];
791 max_subst++;
797 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
798 msleep(rand() % (prog_state.delayedspinup_interval + 1));
799 spinup_counter++;
802 if(free_slots())
803 launch_job(prog_state.threads_running, prog_state.cmd_argv);
804 else if(!prog_state.pipe_mode)
805 launch_job(reap_child(), prog_state.cmd_argv);
807 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
808 write_statefile(prog_state.lineno, prog_state.temp_state);
811 if(prog_state.pipe_mode)
812 pass_stdin(line, line_size);
814 return 1;
817 int main(int argc, char** argv) {
818 unsigned i;
820 char tempdir_buf[256];
822 srand(time(NULL));
824 if(argc > 4096) argc = 4096;
826 prog_state.threads_running = 0;
828 if(parse_args(argc, argv)) return 1;
830 if(prog_state.statefile)
831 snprintf(prog_state.temp_state, sizeof(prog_state.temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
833 prog_state.tempdir = NULL;
835 if(prog_state.buffered) {
836 prog_state.tempdir = tempdir_buf;
837 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
838 perror("mkdtemp");
839 die("could not create tempdir\n");
841 } else {
842 /* if the stdout/stderr fds are not in O_APPEND mode,
843 the dup()'s of the fds in posix_spawn can cause different
844 file positions, causing the different processes to overwrite each others output.
845 testcase:
846 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
848 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
849 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
852 if(prog_state.cmd_startarg) {
853 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
854 prog_state.cmd_argv[i - prog_state.cmd_startarg] = argv[i];
856 prog_state.cmd_argv[argc - prog_state.cmd_startarg] = NULL;
859 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
860 init_queue();
862 prog_state.lineno = 0;
864 size_t left = 0, bytes_read = 0;
865 const size_t chunksize = prog_state.bulk_bytes ? prog_state.bulk_bytes : 16*1024;
867 char *mem = mmap(NULL, chunksize*2, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
868 char *buf1 = mem;
869 char *buf2 = mem+chunksize;
870 char *in, *inbuf;
872 int exitcode = 1;
874 while(1) {
875 inbuf = buf1+chunksize-left;
876 memcpy(inbuf, buf2+bytes_read-left, left);
877 ssize_t n = read(0, buf2, chunksize);
878 if(n == -1) {
879 perror("read");
880 goto out;
882 bytes_read = n;
883 left += n;
884 in = inbuf;
885 while(left) {
886 char *p;
887 if(prog_state.pipe_mode && prog_state.bulk_bytes)
888 p = mystrnrchr(in, '\n', left);
889 else
890 p = mystrnchr (in, '\n', left);
892 if(!p) break;
893 ptrdiff_t diff = (p - in) + 1;
894 if(match_eof(in, diff)) {
895 exitcode = 0;
896 goto out;
898 if(!dispatch_line(in, diff, argv))
899 goto out;
900 left -= diff;
901 in += diff;
903 if(!n) {
904 if(left && !match_eof(in, left)) dispatch_line(in, left, argv);
905 break;
907 if(left > chunksize) {
908 dprintf(2, "error: input line length exceeds buffer size\n");
909 goto out;
913 exitcode = 0;
915 out:
917 if(prog_state.pipe_mode) {
918 close_pipes();
921 if(prog_state.delayedflush)
922 write_statefile(prog_state.lineno - 1, prog_state.temp_state);
924 while(prog_state.threads_running) reap_child();
926 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
927 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
928 if(prog_state.limits) sblist_free(prog_state.limits);
930 if(prog_state.tempdir)
931 rmdir(prog_state.tempdir);
934 fflush(stdout);
935 fflush(stderr);
938 return exitcode;