/*
Copyright (C) 2012-2021 rofl0r

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the “Software”), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#define VERSION "1.3.1"

/* Request POSIX.1-2008 / XSI (SUSv4) interfaces. Both macros are #undef'd
 * first so that a value passed on the compiler command line is replaced
 * instead of triggering a macro-redefinition diagnostic. */
#undef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#undef _XOPEN_SOURCE
#define _XOPEN_SOURCE 700

/* Number of elements of a true array. Does NOT work on pointers or on
 * array-typed function parameters (those decay to pointers). */
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
/* process handling */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ < 13))
/* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* Fallback for C libraries that lack prlimit(): reports the missing feature
 * on stderr and fails the way the real call would (-1, errno set), so callers
 * that check for -1 report an error instead of silently ignoring -limits. */
static int prlimit(int pid, ...) {
	(void) pid;
	dprintf(2, "prlimit() not implemented on this system\n");
	errno = ENOSYS;
	return -1;
}
#endif
/* some small helper functions taken from libulz */
/* Sleep for millisecs milliseconds.
 * If a signal interrupts nanosleep() (EINTR), the sleep is resumed with the
 * remaining time, so the full delay is honoured.
 * Returns 0 on success, or -1 with errno set by nanosleep() on any other
 * failure (e.g. EINVAL for a negative delay). */
static int msleep(long millisecs) {
	struct timespec req, rem;
	int ret;

	req.tv_sec = millisecs / 1000;
	req.tv_nsec = (millisecs % 1000) * 1000 * 1000;
	while((ret = nanosleep(&req, &rem)) == -1 && errno == EINTR)
		req = rem;
	return ret;
}
static const char ulz_conv_cypher[] =
	"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
#define ulz_conv_cypher_len (sizeof(ulz_conv_cypher) - 1)

/* Minimal mkdtemp(3) replacement.
 * templ must end in "XXXXXX". Those six characters are overwritten with
 * random characters from ulz_conv_cypher, and a directory with mode S_IRWXU
 * (0700) is created under the resulting name. While that name already exists
 * (EEXIST), new random characters are tried.
 * Returns templ on success. Returns NULL with errno set on failure: EINVAL if
 * the template is too short or does not end in "XXXXXX", otherwise the
 * mkdir() error.
 * Note: the characters come from rand(), so the names are predictable if
 * srand() has not been called. Uniqueness relies on the EEXIST retry. */
static char* ulz_mkdtemp(char* templ) {
	size_t i, l = strlen(templ);

	if(l < 6 || strcmp(templ + l - 6, "XXXXXX") != 0) {
		errno = EINVAL;
		return NULL;
	}
	for(;;) {
		for(i = l - 6; i < l; i++)
			templ[i] = ulz_conv_cypher[rand() % ulz_conv_cypher_len];
		if(mkdir(templ, S_IRWXU) == 0)
			return templ;
		if(errno != EEXIST)
			return NULL;
	}
}
/* Write the NUL-terminated template "<tmpdir><prefix>XXXXXX" into buf and
 * return its length (without the NUL). Exactly pl bytes of prefix are used.
 * buf must provide strlen(tmpdir) + pl + 7 bytes; the caller (mktempdir)
 * checks the buffer size before calling. */
static size_t gen_fn(char* buf, const char* prefix, size_t pl, const char* tmpdir) {
	size_t tl = strlen(tmpdir);
	size_t a = 0;

	memcpy(buf + a, tmpdir, tl);
	a += tl;
	memcpy(buf + a, prefix, pl);
	a += pl;
	memcpy(buf + a, "XXXXXX", 7); /* 7 bytes: includes the terminating NUL */
	return a + 6;
}
/* calls mkdtemp on /dev/shm and on failure on /tmp, to get the fastest possible
 * storage. returns size of the string returned in buffer, or 0 if buffer is
 * too small for every candidate or no directory could be created. */
static size_t mktempdir(const char* prefix, char* buffer, size_t bufsize) {
	static const char *const dirs[] = { "/dev/shm/", "/tmp/" };
	size_t i, ret, pl = strlen(prefix);

	for(i = 0; i < sizeof(dirs) / sizeof(dirs[0]); i++) {
		/* room for dir + prefix + "XXXXXX" + NUL; checked per directory so the
		   shorter /tmp/ path is still tried when /dev/shm/ would not fit. */
		if(bufsize < strlen(dirs[i]) + pl + sizeof("XXXXXX")) continue;
		ret = gen_fn(buffer, prefix, pl, dirs[i]);
		if(ulz_mkdtemp(buffer)) return ret;
	}
	return 0;
}
/* NOTE(review): this region is fragmentary in this extract -- the struct
 * headers and closing braces are not present. From their use elsewhere in
 * the file, fa belongs to the per-slot job_info record (job->fa), and the
 * following members belong to prog_state_s, whose single instance is the
 * global prog_state declared below. */
133 posix_spawn_file_actions_t fa
;
/* temporary statefile path: main() fills it with "<statefile>.<pid>";
 * write_statefile() writes it and rename()s it over the statefile. */
142 char temp_state
[256];
/* argv of the command to spawn; main() clamps argc to 4096 before filling it. */
143 char* cmd_argv
[4096];
145 sblist
* subst_entries
;
148 unsigned long long lineno
;
152 unsigned long numthreads
;
153 unsigned long threads_running
;
/* NOTE(review): the comment that follows delayedspinup_interval has no
 * terminator in this extract, so as shown it runs on until the terminator
 * after the "buffered" member and comments out the bulk_bytes and buffered
 * declarations. Both members are used elsewhere (prog_state.bulk_bytes,
 * prog_state.buffered), so the terminator has to be restored. */
156 unsigned long delayedspinup_interval
; /* use a random delay until the queue gets filled for the first time.
157 the top value in ms can be supplied via a command line switch.
158 this option makes only sense if the interval is somewhat smaller than the
159 expected runtime of the average job.
160 this option is useful to not overload a network app due to hundreds of
161 parallel connection tries on startup.
163 unsigned long bulk_bytes
;
167 bool buffered
; /* write stdout and stderr of each task into a file,
168 and print it to stdout once the process ends.
169 this prevents mixing up of the output of multiple tasks. */
170 bool delayedflush
; /* only write to statefile whenever all processes are busy, and at program end.
171 this means faster program execution, but could also be imprecise if the number of
172 jobs is small or smaller than the available threadcount. */
173 bool join_output
; /* join stdout and stderr of launched jobs into stdout */
175 unsigned cmd_startarg
;
178 static prog_state_s prog_state
;
/* process environment; passed unchanged to every job via posix_spawnp(). */
181 extern char** environ
;
183 static int makeLogfilename(char* buf
, size_t bufsize
, size_t jobindex
, int is_stderr
) {
184 int ret
= snprintf(buf
, bufsize
, "%s/jd_proc_%.5lu_std%s.log",
185 prog_state
.tempdir
, (unsigned long) jobindex
, is_stderr
? "err" : "out");
186 return ret
> 0 && (size_t) ret
< bufsize
;
/* Spawn argv (looked up in PATH by posix_spawnp) as the job for slot
 * jobindex of prog_state.job_infos; does nothing if that slot's pid is not -1.
 * Visible file actions: fd 0 is closed and then connected either to a pipe
 * (pipe mode; the pipe's write end is stored in job->pipe) or to /dev/null.
 * With -buffered, fds 1 and 2 are closed and reopened on per-job log files
 * named by makeLogfilename() (fd 2 duplicated from fd 1 with -joinoutput).
 * After the spawn call the visible code increments threads_running and
 * applies each -limits entry to the child with prlimit().
 * NOTE(review): several lines are missing from this extract (the declaration
 * of pipes and the pipe() call, the return after "temp filename too long",
 * the spawn_error label and the branches around posix_spawnp, the prlimit
 * error handling), so this description covers only the visible statements. */
189 static void launch_job(size_t jobindex
, char** argv
) {
190 char stdout_filename_buf
[256];
191 char stderr_filename_buf
[256];
192 job_info
* job
= sblist_get(prog_state
.job_infos
, jobindex
);
194 if(job
->pid
!= -1) return;
196 if(prog_state
.buffered
) {
197 if((!makeLogfilename(stdout_filename_buf
, sizeof(stdout_filename_buf
), jobindex
, 0)) ||
198 ((!prog_state
.join_output
) && !makeLogfilename(stderr_filename_buf
, sizeof(stderr_filename_buf
), jobindex
, 1)) ) {
199 dprintf(2, "temp filename too long!\n");
204 errno
= posix_spawn_file_actions_init(&job
->fa
);
205 if(errno
) goto spawn_error
;
207 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 0);
208 if(errno
) goto spawn_error
;
211 if(prog_state
.pipe_mode
) {
216 job
->pipe
= pipes
[1];
217 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, pipes
[0], 0);
218 if(errno
) goto spawn_error
;
219 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[0]);
220 if(errno
) goto spawn_error
;
221 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[1]);
222 if(errno
) goto spawn_error
;
225 if(prog_state
.buffered
) {
226 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 1);
227 if(errno
) goto spawn_error
;
228 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 2);
229 if(errno
) goto spawn_error
;
232 if(!prog_state
.pipe_mode
) {
233 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 0, "/dev/null", O_RDONLY
, 0);
234 if(errno
) goto spawn_error
;
237 if(prog_state
.buffered
) {
238 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 1, stdout_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
239 if(errno
) goto spawn_error
;
240 if(prog_state
.join_output
)
241 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, 1, 2);
/* NOTE(review): as shown there is no `else` between the join_output dup2
 * above and the addopen below. fd 2 is therefore always re-opened on
 * stderr_filename_buf, even with -joinoutput, where the check at the top of
 * this function never fills that buffer (read of an uninitialized array).
 * The dup2 result in errno is also overwritten unchecked. Confirm the
 * else-branch against the full source. */
243 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 2, stderr_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
244 if(errno
) goto spawn_error
;
247 errno
= posix_spawnp(&job
->pid
, argv
[0], &job
->fa
, NULL
, argv
, environ
);
251 perror("posix_spawn");
253 prog_state
.threads_running
++;
254 if(prog_state
.limits
) {
256 sblist_iter(prog_state
.limits
, limit
) {
257 if(prlimit(job
->pid
, limit
->limit
, &limit
->rl
, NULL
) == -1)
262 if(prog_state
.pipe_mode
)
/* Copy the captured log of job job_id to jobflow's own output stream
 * (stdout log -> stdout, or stderr log -> stderr when is_stderr is nonzero),
 * then delete the log file.
 * A missing or unreadable log file is skipped, and the file is still unlinked.
 * If the log path cannot be built, nothing is touched. */
static void dump_output(size_t job_id, int is_stderr) {
	char out_filename_buf[256];
	char buf[4096];
	FILE *dst, *out_stream = is_stderr ? stderr : stdout;
	size_t nread;

	if(!makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr))
		return; /* never unlink a truncated path */

	dst = fopen(out_filename_buf, "r");
	if(dst) {
		while((nread = fread(buf, 1, sizeof(buf), dst))) {
			fwrite(buf, 1, nread, out_stream);
			/* a short read means EOF or error; no need for another fread */
			if(nread < sizeof(buf)) break;
		}
		fclose(dst);
		fflush(out_stream);
	}
	unlink(out_filename_buf);
}
/* Write all size bytes of buf to fd, continuing after short writes and
 * retrying writes interrupted by a signal (EINTR). On any other write error
 * the error is reported with perror() and the remaining data is dropped. */
static void write_all(int fd, void* buf, size_t size) {
	const char *p = buf;
	size_t left = size;

	while(left) {
		ssize_t n = write(fd, p, left);
		if(n == -1) {
			if(errno == EINTR) continue;
			perror("write");
			return;
		}
		p += n;
		left -= (size_t) n;
	}
}
306 static void pass_stdin(char *line
, size_t len
) {
307 static size_t next_child
= 0;
308 if(next_child
>= sblist_getsize(prog_state
.job_infos
))
310 job_info
*job
= sblist_get(prog_state
.job_infos
, next_child
);
311 write_all(job
->pipe
, line
, len
);
315 static void close_pipes(void) {
317 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
318 job_info
*job
= sblist_get(prog_state
.job_infos
, i
);
323 /* wait till a child exits, reap it, and return its job index for slot reuse */
324 static size_t reap_child(void) {
329 do ret
= waitpid(-1, &retval
, 0);
330 while(ret
== -1 || !WIFEXITED(retval
));
332 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
333 job
= sblist_get(prog_state
.job_infos
, i
);
334 if(job
->pid
== ret
) {
336 posix_spawn_file_actions_destroy(&job
->fa
);
337 prog_state
.threads_running
--;
338 if(prog_state
.buffered
) {
340 if(!prog_state
.join_output
)
350 static size_t free_slots(void) {
351 return prog_state
.numthreads
- prog_state
.threads_running
;
/* Print "error: " plus a printf-style message to fd 2 and exit(1).
 * The first argument must be a string literal, because it is pasted onto
 * "error: " by string-literal concatenation. */
#define die(...) do { \
	dprintf(2, "error: " __VA_ARGS__); \
	exit(1); \
} while(0)
/* Parse a decimal number with an optional binary-unit suffix directly after
 * the digits: 'K' (x1024), 'M' (x1024^2) or 'G' (x1024^3), e.g. "16M".
 * Any other trailing text is ignored.
 * Returns 0 for NULL, "" or input without leading digits. A product that
 * would overflow saturates to ULONG_MAX (-1UL). */
static unsigned long parse_human_number(const char* num) {
	static const unsigned long mul[] = {1024UL, 1024UL * 1024, 1024UL * 1024 * 1024};
	static const char kmg[] = "KMG";
	const char *kmgind;
	char *end;
	unsigned long ret;

	if(!num || !*num) return 0;
	ret = strtoul(num, &end, 10);
	/* only look for a suffix if digits were actually consumed; *end guards
	   against strchr() matching the terminating NUL of kmg. */
	if(end != num && *end && (kmgind = strchr(kmg, *end))) {
		unsigned long m = mul[kmgind - kmg];
		ret = (ret > -1UL / m) ? -1UL : ret * m;
	}
	return ret;
}
/* Print the usage text: version banner, a description of argument
 * substitution and pipe mode, and the option reference.
 * NOTE(review): the output call that consumes these string literals, several
 * option headings (e.g. for -skip, -count, -resume, -eof, -statefile,
 * -delayedflush, -delayedspinup, -buffered, -joinoutput, -bulk) and the return
 * statement are missing from this extract. The phrase "another programs
 * output" should read "another program's output". */
369 static int syntax(void) {
371 "jobflow " VERSION
" (C) rofl0r\n"
372 "------------------------\n"
373 "this program is intended to be used as a recipient of another programs output.\n"
374 "it launches processes to which the current line can be passed as an argument\n"
375 "using {} for substitution (as in find -exec).\n"
376 "if no input substitution argument ({} or {.}) is provided, input is piped into\n"
377 "stdin of child processes. input will be then evenly distributed to jobs,\n"
378 "until EOF is received. we call this 'pipe mode'.\n"
380 "available options:\n\n"
381 "-skip N -count N -threads N -resume -statefile=/tmp/state -delayedflush\n"
382 "-delayedspinup N -buffered -joinoutput -limits mem=16M,cpu=10\n"
384 "-exec ./mycommand {}\n"
387 " N=number of entries to skip\n"
389 " N=only process count lines (after skipping)\n"
390 "-threads N (alternative: -j N)\n"
391 " N=number of parallel processes to spawn\n"
393 " resume from last jobnumber stored in statefile\n"
395 " use XXX as the EOF marker on stdin\n"
396 " if the marker is encountered, behave as if stdin was closed\n"
397 " not compatible with pipe/bulk mode\n"
400 " saves last launched jobnumber into a file\n"
402 " only write to statefile whenever all processes are busy,\n"
403 " and at program end\n"
405 " N=maximum amount of milliseconds\n"
406 " ...to wait when spinning up a fresh set of processes\n"
407 " a random value between 0 and the chosen amount is used to delay initial\n"
409 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
410 " activity on program startup\n"
412 " store the stdout and stderr of launched processes into a temporary file\n"
413 " which will be printed after a process has finished.\n"
414 " this prevents mixing up of output of different processes.\n"
416 " if -buffered, write both stdout and stderr into the same file.\n"
417 " this saves the chronological order of the output, and the combined output\n"
418 " will only be printed to stdout.\n"
420 " do bulk copies with a buffer of N bytes. only usable in pipe mode.\n"
421 " this passes (almost) the entire buffer to the next scheduled job.\n"
422 " the passed buffer will be truncated to the last line break boundary,\n"
423 " so jobs always get entire lines to work with.\n"
424 " this option is useful when you have huge input files and relatively short\n"
425 " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n"
426 " N must be a multiple of 4KB. the suffixes G/M/K are detected.\n"
427 " actual memory allocation will be twice the amount passed.\n"
428 " note that pipe buffer size is limited to 64K on linux, so anything higher\n"
429 " than that probably doesn't make sense.\n"
430 "-limits [mem=N,cpu=N,stack=N,fsize=N,nofiles=N]\n"
431 " sets the rlimit of the new created processes.\n"
432 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
433 "-exec command with args\n"
434 " everything past -exec is treated as the command to execute on each line of\n"
435 " stdin received. the line can be passed as an argument using {}.\n"
436 " {.} passes everything before the last dot in a line as an argument.\n"
437 " {#} will be replaced with the sequence (aka line) number.\n"
438 " usage of {#} does not affect the decision whether pipe mode is used.\n"
439 " it is possible to use multiple substitutions inside a single argument,\n"
440 " but only of one type.\n"
441 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
/* Parse the command line into prog_state.
 * Every argument before the command must start with '-'. Each one is matched
 * against opt_tab by its one-letter short name ("-j", "-j=4"), its full long
 * name ("-threads") or "long name=value". Flag options ('b') are set to 1.
 * The other options take their operand either after '=' or from the next
 * argument, which must not start with '-'. 'i' operands go through
 * parse_human_number(); 's' operands are stored as pointers into argv.
 * "-exec" and "-help" are handled specially; any other unknown option is fatal.
 * Afterwards the option combination is validated (every error exits via
 * die()), and the arguments of the command that contain {} / {.} / {#} are
 * recorded in prog_state.subst_entries. Pipe mode stays on unless {} or {.}
 * occurs. Finally -limits is parsed into prog_state.limits.
 * NOTE(review): many lines are missing from this extract (e.g. the rest of
 * the opt_tab member list, the operand-taking case labels, the -exec/-help
 * bodies and the return statements), so the return value is not visible
 * here; main() treats a nonzero return as "exit with status 1". */
447 static int parse_args(unsigned argc
, char** argv
) {
448 unsigned i
, j
, r
= 0;
449 static bool resume
= 0;
450 static char *limits
= 0;
451 static const struct {
452 const char lname
[14];
461 {"threads", 'j', 'i', .dest
.i
= &prog_state
.numthreads
},
462 {"statefile", 0, 's', .dest
.s
= &prog_state
.statefile
},
463 {"eof", 0, 's', .dest
.s
= &prog_state
.eof_marker
},
464 {"skip", 0, 'i', .dest
.i
= &prog_state
.skip
},
465 {"count", 0, 'i', .dest
.i
= &prog_state
.count
},
466 {"resume", 0, 'b', .dest
.b
= &resume
},
467 {"delayedflush", 0, 'b', .dest
.b
= &prog_state
.delayedflush
},
468 {"delayedspinup", 0, 'i', .dest
.i
= &prog_state
.delayedspinup_interval
},
469 {"buffered", 0, 'b', .dest
.b
=&prog_state
.buffered
},
470 {"joinoutput", 0, 'b', .dest
.b
=&prog_state
.join_output
},
471 {"bulk", 0, 'i', .dest
.i
= &prog_state
.bulk_bytes
},
472 {"limits", 0, 's', .dest
.s
= &limits
},
475 prog_state
.numthreads
= 1;
476 prog_state
.count
= -1UL;
478 for(i
=1; i
<argc
; ++i
) {
479 char *p
= argv
[i
], *q
= strchr(p
, '=');
480 if(*(p
++) != '-') die("expected option instead of %s\n", argv
[i
]);
482 if(!*p
) die("invalid option %s\n", argv
[i
]);
483 for(j
=0;j
<ARRAY_SIZE(opt_tab
);++j
) {
484 if(((!p
[1] || p
[1] == '=') && *p
== opt_tab
[j
].sname
) ||
485 (!strcmp(p
, opt_tab
[j
].lname
)) ||
486 (q
&& strlen(opt_tab
[j
].lname
) == q
-p
&& !strncmp(p
, opt_tab
[j
].lname
, q
-p
))) {
487 switch(opt_tab
[j
].flag
) {
488 case 'b': *opt_tab
[j
].dest
.b
=1; break;
491 if(argc
<= i
+1 || argv
[i
+1][0] == '-') {
493 die("option %s requires operand\n", argv
[i
]);
497 if(*(++q
) == 0) goto e_expect_op
;
499 if(opt_tab
[j
].flag
== 'i') {
501 die("expected numeric operand for %s at %s\n", p
, q
);
502 *opt_tab
[j
].dest
.i
=parse_human_number(q
);
504 *opt_tab
[j
].dest
.s
=q
;
510 if(j
>=ARRAY_SIZE(opt_tab
)) {
511 if(!strcmp(p
, "exec")) {
514 } else if(!strcmp(p
, "help")) {
516 } else die("unknown option %s\n", argv
[i
]);
520 if((long)prog_state
.numthreads
<= 0) die("threadcount must be >= 1\n");
523 if(!prog_state
.statefile
) die("-resume needs -statefile\n");
524 if(access(prog_state
.statefile
, W_OK
| R_OK
) != -1) {
525 FILE *f
= fopen(prog_state
.statefile
, "r");
/* NOTE(review): strtoll's signed result is stored in the unsigned skip
 * counter, so a negative or corrupted statefile value becomes a huge skip. */
528 if(fgets(nb
, sizeof nb
, f
)) prog_state
.skip
= strtoll(nb
,0,10);
534 if(prog_state
.delayedflush
&& !prog_state
.statefile
)
535 die("-delayedflush needs -statefile\n");
537 prog_state
.pipe_mode
= 1;
538 prog_state
.cmd_startarg
= r
;
539 prog_state
.subst_entries
= NULL
;
543 if(r
< (unsigned) argc
) {
544 prog_state
.cmd_startarg
= r
;
545 } else die("-exec without arguments\n");
547 prog_state
.subst_entries
= sblist_new(sizeof(uint32_t), 16);
549 // save entries which must be substituted, to save some cycles.
550 for(i
= r
; i
< (unsigned) argc
; i
++) {
553 if((a
= strstr(argv
[i
], "{}")) ||
554 (b
= strstr(argv
[i
], "{.}")) ||
555 (c
= strstr(argv
[i
], "{#}"))) {
556 if(!c
) prog_state
.pipe_mode
= 0;
557 else prog_state
.use_seqnr
= 1;
558 sblist_add(prog_state
.subst_entries
, &subst_ent
);
561 if(sblist_getsize(prog_state
.subst_entries
) == 0) {
562 sblist_free(prog_state
.subst_entries
);
563 prog_state
.subst_entries
= 0;
567 if(prog_state
.join_output
&& !prog_state
.buffered
)
568 die("-joinoutput needs -buffered\n");
570 if(prog_state
.bulk_bytes
% 4096)
571 die("bulk size must be a multiple of 4096\n");
576 limits
+= strspn(limits
, ",");
577 size_t l
= strcspn(limits
, ",");
579 size_t l2
= strcspn(limits
, "=");
580 if(l2
>= l
) die("syntax error in limits argument\n");
582 if(!prog_state
.limits
)
583 prog_state
.limits
= sblist_new(sizeof(limit_rec
), 4);
584 static const struct { int lim_val
; const char lim_name
[8]; } lim_tab
[] = {
585 { RLIMIT_AS
, "mem" },
586 { RLIMIT_CPU
, "cpu" },
587 { RLIMIT_STACK
, "stack" },
588 { RLIMIT_FSIZE
, "fsize" },
589 { RLIMIT_NOFILE
, "nofiles" },
/* NOTE(review): strncmp() over only the l2 characters before '=' accepts any
 * prefix of a limit name (e.g. "m=16M" selects mem), and an empty name such
 * as "=5" matches the first entry (mem). */
591 for(i
=0; i
<ARRAY_SIZE(lim_tab
);++i
)
592 if(!strncmp(limits
, lim_tab
[i
].lim_name
, l2
)) {
593 lim
.limit
= lim_tab
[i
].lim_val
;
596 if(i
>= ARRAY_SIZE(lim_tab
))
597 die("unknown option passed to -limits\n");
598 if(getrlimit(lim
.limit
, &lim
.rl
) == -1) {
600 die("could not query rlimits\n");
/* only the soft limit is replaced; rlim_max keeps the value from getrlimit().
 * NOTE(review): a value above the current hard limit makes the later
 * prlimit() call fail (EINVAL) instead of being rejected here. */
602 lim
.rl
.rlim_cur
= parse_human_number(limits
+l2
+1);
603 sblist_add(prog_state
.limits
, &lim
);
610 static void init_queue(void) {
612 job_info ji
= {.pid
= -1};
614 for(i
= 0; i
< prog_state
.numthreads
; i
++)
615 sblist_add(prog_state
.job_infos
, &ji
);
618 static void write_statefile(unsigned long long n
, const char* tempfile
) {
619 int fd
= open(tempfile
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
621 dprintf(fd
, "%llu\n", n
+ 1ULL);
623 if(rename(tempfile
, prog_state
.statefile
) == -1)
/* Return 1 if the needle_size bytes of needle occur in haystack at offset
 * bufpos (within hay_size bytes), else 0. An offset past the end of the
 * haystack never matches; it is rejected before hay_size - bufpos could wrap
 * around and let memcmp() read out of bounds. */
static int str_here(const char* haystack, size_t hay_size, size_t bufpos,
                    const char* needle, size_t needle_size) {
	if(bufpos > hay_size) return 0;
	if(needle_size > hay_size - bufpos) return 0;
	return memcmp(needle, haystack + bufpos, needle_size) == 0;
}
// Copy src (src_size bytes) to dest, replacing every occurrence of `what`
// with `whit`, and NUL-terminate the result.
// returns numbers of substitutions done, -1 on out of buffer (dest_size must
// also leave room for the terminating NUL; on -1 dest is not terminated).
// dest is always overwritten. if no substitutions were done, it contains a copy of source.
// An empty `what` (what_size == 0) never matches, which avoids looping
// without consuming input.
static int substitute_all(char *dest, ssize_t dest_size,
                          char *src, size_t src_size,
                          char *what, size_t what_size,
                          char *whit, size_t whit_size) {
	int ret = 0;
	size_t i;

	for(i = 0; dest_size > 0 && i < src_size; ) {
		if(what_size && str_here(src, src_size, i, what, what_size)) {
			if(dest_size < (ssize_t) whit_size) return -1;
			memcpy(dest, whit, whit_size);
			dest += whit_size;
			dest_size -= whit_size;
			i += what_size;
			ret++;
		} else {
			*dest++ = src[i++];
			dest_size--;
		}
	}
	/* one byte must remain for the terminator (also catches dest_size <= 0 input) */
	if(dest_size <= 0) return -1;
	*dest = 0;
	return ret;
}
/* Return a pointer to the first occurrence of ch within in[0..end), or NULL.
 * Bounded like memchr() (which it uses, so ch is compared as unsigned char);
 * the buffer does not need to be NUL-terminated. */
static char* mystrnchr(const char *in, int ch, size_t end) {
	if(!end) return NULL;
	return memchr(in, ch, end);
}
672 static char* mystrnrchr(const char *in
, int ch
, size_t end
) {
673 const char *e
= in
+end
-1;
675 while(p
!= e
&& *e
!= ch
) e
--;
676 if(*e
== ch
) return (char*)e
;
679 static char* mystrnrchr_chk(const char *in
, int ch
, size_t end
) {
681 return mystrnrchr(in
, ch
, end
);
684 static int need_linecounter(void) {
685 return !!prog_state
.skip
|| prog_state
.statefile
||
686 prog_state
.use_seqnr
|| prog_state
.count
!= -1UL;
/* Count the '\n' characters in buf[0..len). */
static size_t count_linefeeds(const char *buf, size_t len) {
	const char *p = buf, *e = buf + len;
	size_t cnt = 0;

	while(p < e && (p = memchr(p, '\n', (size_t)(e - p)))) {
		cnt++;
		p++;
	}
	return cnt;
}
698 static int match_eof(char* inbuf
, size_t len
) {
699 if(!prog_state
.eof_marker
) return 0;
700 size_t l
= strlen(prog_state
.eof_marker
);
701 return l
== len
-1 && !memcmp(prog_state
.eof_marker
, inbuf
, l
);
/* Nonzero (1) if p is a line-break character: LF or CR; otherwise 0. */
static inline int islb(int p) {
	switch(p) {
	case '\n':
	case '\r':
		return 1;
	default:
		return 0;
	}
}
705 static void chomp(char *s
, size_t *len
) {
706 while(*len
&& islb(s
[*len
-1])) s
[--(*len
)] = 0;
/* At most MAX_SUBSTS command arguments receive placeholder substitution;
 * each substituted argument is built in its own 4096-byte row of subst_buf
 * inside dispatch_line(). */
709 #define MAX_SUBSTS 16
/* Handle one unit of input: a single line, or with -bulk a chunk of whole
 * lines. Visible steps:
 *  - in bulk mode, advance prog_state.lineno by the chunk's newlines when
 *    need_linecounter() says it is needed;
 *  - consume lines while -skip is pending;
 *  - return -1 once -count is exhausted;
 *  - without -exec, just copy the data to stdout;
 *  - otherwise substitute {}, {.} and {#} into prog_state.cmd_argv;
 *  - during spin-up sleep a random 0..delayedspinup_interval ms
 *    (-delayedspinup);
 *  - launch the job in slot threads_running, or outside pipe mode in the
 *    slot freed by reap_child();
 *  - update the statefile (every time, or only when no slot is free with
 *    -delayedflush);
 *  - in pipe mode, forward the data to a child's stdin via pass_stdin().
 * NOTE(review): many lines are missing from this extract (including the
 * declarations of line, ret and lineno, the too_long label, the bodies of the
 * skip/count branches and the final return), so details beyond the visible
 * statements cannot be confirmed here. */
710 static int dispatch_line(char* inbuf
, size_t len
, char** argv
) {
711 char subst_buf
[MAX_SUBSTS
][4096];
712 static unsigned spinup_counter
= 0;
714 if(!prog_state
.bulk_bytes
)
716 else if(need_linecounter()) {
717 prog_state
.lineno
+= count_linefeeds(inbuf
, len
);
720 if(prog_state
.skip
) {
721 if(!prog_state
.bulk_bytes
) {
725 while(len
&& prog_state
.skip
) {
726 char *q
= mystrnchr(inbuf
, '\n', len
);
728 ptrdiff_t diff
= (q
- inbuf
) + 1;
738 } else if(prog_state
.count
!= -1UL) {
739 if(!prog_state
.count
) return -1;
743 if(!prog_state
.cmd_startarg
) {
744 write_all(1, inbuf
, len
);
748 if(!prog_state
.pipe_mode
)
752 size_t line_size
= len
;
754 if(prog_state
.subst_entries
) {
755 unsigned max_subst
= 0;
757 sblist_iter(prog_state
.subst_entries
, index
) {
758 if(max_subst
>= MAX_SUBSTS
) break;
759 char *source
= argv
[*index
+ prog_state
.cmd_startarg
];
760 size_t source_len
= strlen(source
);
/* NOTE(review): the literal 4096 passed to substitute_all() repeats the
 * subst_buf row size; sizeof subst_buf[0] would keep the two in sync. */
762 ret
= substitute_all(subst_buf
[max_subst
], 4096,
768 dprintf(2, "fatal: line too long for substitution: %s\n", line
);
771 char* lastdot
= mystrnrchr_chk(line
, '.', line_size
);
772 size_t tilLastDot
= line_size
;
773 if(lastdot
) tilLastDot
= lastdot
- line
;
774 ret
= substitute_all(subst_buf
[max_subst
], 4096,
778 if(ret
== -1) goto too_long
;
/* NOTE(review): the declaration of lineno is not visible in this extract; it
 * needs at least 21 bytes for any unsigned long long, and snprintf with
 * sizeof lineno would make that explicit. */
782 sprintf(lineno
, "%llu", prog_state
.lineno
);
783 ret
= substitute_all(subst_buf
[max_subst
], 4096,
786 lineno
, strlen(lineno
));
787 if(ret
== -1) goto too_long
;
790 prog_state
.cmd_argv
[*index
] = subst_buf
[max_subst
];
797 if(prog_state
.delayedspinup_interval
&& spinup_counter
< (prog_state
.numthreads
* 2)) {
798 msleep(rand() % (prog_state
.delayedspinup_interval
+ 1));
803 launch_job(prog_state
.threads_running
, prog_state
.cmd_argv
);
804 else if(!prog_state
.pipe_mode
)
805 launch_job(reap_child(), prog_state
.cmd_argv
);
807 if(prog_state
.statefile
&& (prog_state
.delayedflush
== 0 || free_slots() == 0)) {
808 write_statefile(prog_state
.lineno
, prog_state
.temp_state
);
811 if(prog_state
.pipe_mode
)
812 pass_stdin(line
, line_size
);
/* Program entry point.
 * Steps, in order:
 *  - parse the options;
 *  - derive the temporary statefile name "<statefile>.<pid>";
 *  - with -buffered, create a temp dir for the per-job logs;
 *  - switch stdout/stderr to O_APPEND;
 *  - copy the command after -exec into prog_state.cmd_argv;
 *  - read stdin in chunks of 16 KiB (or the -bulk size) through an mmap'ed
 *    double buffer of twice that size, and hand each line (in bulk pipe mode,
 *    each run of whole lines) to dispatch_line() until the -eof marker or
 *    end of input;
 *  - with -delayedflush, write the statefile;
 *  - wait for all running jobs and free the lists and the temp dir.
 * NOTE(review): findings on the code as shown in this extract:
 *  - the block comment about O_APPEND further down has no terminator here,
 *    so as shown everything after it, to the end of the file, is commented
 *    out;
 *  - fcntl(fd, F_SETFL, O_APPEND) replaces all file status flags (it clears
 *    e.g. O_NONBLOCK); OR-ing O_APPEND into the result of F_GETFL would
 *    preserve them;
 *  - argc is silently clamped to 4096 to fit cmd_argv, which truncates very
 *    long command lines instead of reporting an error;
 *  - many lines (declarations of i, in, inbuf, buf1 and p, the read loop and
 *    its error branches) are missing, so they cannot be reviewed here. */
817 int main(int argc
, char** argv
) {
820 char tempdir_buf
[256];
824 if(argc
> 4096) argc
= 4096;
826 prog_state
.threads_running
= 0;
828 if(parse_args(argc
, argv
)) return 1;
830 if(prog_state
.statefile
)
831 snprintf(prog_state
.temp_state
, sizeof(prog_state
.temp_state
), "%s.%u", prog_state
.statefile
, (unsigned) getpid());
833 prog_state
.tempdir
= NULL
;
835 if(prog_state
.buffered
) {
836 prog_state
.tempdir
= tempdir_buf
;
837 if(mktempdir("jobflow", tempdir_buf
, sizeof(tempdir_buf
)) == 0) {
839 die("could not create tempdir\n");
842 /* if the stdout/stderr fds are not in O_APPEND mode,
843 the dup()'s of the fds in posix_spawn can cause different
844 file positions, causing the different processes to overwrite each others output.
846 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
848 if(fcntl(1, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
849 if(fcntl(2, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
852 if(prog_state
.cmd_startarg
) {
853 for(i
= prog_state
.cmd_startarg
; i
< (unsigned) argc
; i
++) {
854 prog_state
.cmd_argv
[i
- prog_state
.cmd_startarg
] = argv
[i
];
856 prog_state
.cmd_argv
[argc
- prog_state
.cmd_startarg
] = NULL
;
859 prog_state
.job_infos
= sblist_new(sizeof(job_info
), prog_state
.numthreads
);
862 prog_state
.lineno
= 0;
864 size_t left
= 0, bytes_read
= 0;
865 const size_t chunksize
= prog_state
.bulk_bytes
? prog_state
.bulk_bytes
: 16*1024;
867 char *mem
= mmap(NULL
, chunksize
*2, PROT_READ
| PROT_WRITE
, MAP_PRIVATE
| MAP_ANON
, -1, 0);
869 char *buf2
= mem
+chunksize
;
875 inbuf
= buf1
+chunksize
-left
;
876 memcpy(inbuf
, buf2
+bytes_read
-left
, left
);
877 ssize_t n
= read(0, buf2
, chunksize
);
887 if(prog_state
.pipe_mode
&& prog_state
.bulk_bytes
)
888 p
= mystrnrchr(in
, '\n', left
);
890 p
= mystrnchr (in
, '\n', left
);
893 ptrdiff_t diff
= (p
- in
) + 1;
894 if(match_eof(in
, diff
)) {
898 if(!dispatch_line(in
, diff
, argv
))
904 if(left
&& !match_eof(in
, left
)) dispatch_line(in
, left
, argv
);
907 if(left
> chunksize
) {
908 dprintf(2, "error: input line length exceeds buffer size\n");
917 if(prog_state
.pipe_mode
) {
921 if(prog_state
.delayedflush
)
922 write_statefile(prog_state
.lineno
- 1, prog_state
.temp_state
);
924 while(prog_state
.threads_running
) reap_child();
926 if(prog_state
.subst_entries
) sblist_free(prog_state
.subst_entries
);
927 if(prog_state
.job_infos
) sblist_free(prog_state
.job_infos
);
928 if(prog_state
.limits
) sblist_free(prog_state
.limits
);
930 if(prog_state
.tempdir
)
931 rmdir(prog_state
.tempdir
);