3 Copyright (C) 2012-2021 rofl0r
5 Permission is hereby granted, free of charge, to any person obtaining a copy
6 of this software and associated documentation files (the “Software”), to deal
7 in the Software without restriction, including without limitation the rights
8 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 copies of the Software, and to permit persons to whom the Software is
10 furnished to do so, subject to the following conditions:
12 The above copyright notice and this permission notice shall be included in all
13 copies or substantial portions of the Software.
15 THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#define VERSION "1.3.1"

/* request POSIX.1-2008 / XSI interfaces (posix_spawnp, dprintf, nanosleep, ...) */
#undef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#define _XOPEN_SOURCE 700

/* number of elements of a true array; must not be applied to a pointer */
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
52 /* process handling */
59 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ < 13))
/* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* fallback stub for libcs without prlimit(): reports the missing feature and
   fails with -1/ENOSYS, so the caller's "== -1" error path triggers and any
   perror() there prints a meaningful reason. */
static int prlimit(int pid, ...) {
	(void) pid;
	dprintf(2, "prlimit() not implemented on this system\n");
	errno = ENOSYS;
	return -1;
}
#endif
/* some small helper funcs from libulz */

/* sleeps for `millisecs` milliseconds. if a signal interrupts the sleep, it
   resumes with the remaining time. returns 0 on success, or -1 with errno set
   by nanosleep() on any other failure (e.g. EINVAL for a negative value). */
static int msleep(long millisecs) {
	struct timespec req = {
		.tv_sec = millisecs / 1000,
		.tv_nsec = (millisecs % 1000) * 1000 * 1000,
	};
	struct timespec rem;
	int ret;

	/* on EINTR nanosleep stores the unslept remainder in rem */
	while((ret = nanosleep(&req, &rem)) == -1 && errno == EINTR)
		req = rem;
	return ret;
}
/* alphabet used to fill the random suffix of a mkdtemp template */
static const char ulz_conv_cypher[] =
	"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
#define ulz_conv_cypher_len (sizeof(ulz_conv_cypher) - 1)

/* upper bound on name collisions before giving up, so a colliding rand()
   sequence can never make ulz_mkdtemp spin forever */
#define ULZ_MKDTEMP_TRIES 1000

/* replacement for mkdtemp(3): overwrites the last six characters of templ
   with random characters from ulz_conv_cypher and creates that directory with
   mode S_IRWXU, retrying with a new name if it already exists.
   returns templ on success, NULL with errno set on failure:
   EINVAL if templ is shorter than 6 characters, EEXIST if every attempt
   collided, or whatever mkdir() reported. */
static char* ulz_mkdtemp(char* templ) {
	size_t i, l = strlen(templ);
	unsigned tries;

	/* l - 6 below would wrap around (size_t) and write out of bounds */
	if(l < 6) {
		errno = EINVAL;
		return NULL;
	}
	for(tries = 0; tries < ULZ_MKDTEMP_TRIES; tries++) {
		for(i = l - 6; i < l; i++)
			templ[i] = ulz_conv_cypher[rand() % ulz_conv_cypher_len];
		if(mkdir(templ, S_IRWXU) == 0) return templ;
		if(errno != EEXIST) return NULL;
	}
	return NULL; /* errno is still EEXIST from the last attempt */
}
/* writes "<tmpdir><prefix>XXXXXX" (NUL-terminated) into buf and returns its
   length excluding the terminator. pl is strlen(prefix); the caller must make
   sure buf holds at least strlen(tmpdir) + pl + 7 bytes. */
static size_t gen_fn(char* buf, const char* prefix, size_t pl, const char* tmpdir) {
	size_t tl = strlen(tmpdir);
	char *p = buf;

	memcpy(p, tmpdir, tl);
	p += tl;
	memcpy(p, prefix, pl);
	p += pl;
	memcpy(p, "XXXXXX", 7); /* six template characters plus the NUL */
	return (size_t)(p - buf) + 6;
}
/* creates a fresh temporary directory named "<dir><prefix>XXXXXX", trying
 * /dev/shm first and /tmp on failure, to get the fastest possible storage.
 * the resulting path is left in buffer. returns its length, or 0 if buffer
 * is too small or neither directory could be created. */
static size_t mktempdir(const char* prefix, char* buffer, size_t bufsize) {
	size_t ret, pl = strlen(prefix);

	/* "/dev/shm/" is the longer candidate; sizeof("XXXXXX") includes the NUL */
	if(bufsize < sizeof("/dev/shm/") - 1 + pl + sizeof("XXXXXX")) return 0;

	ret = gen_fn(buffer, prefix, pl, "/dev/shm/");
	if(ulz_mkdtemp(buffer)) return ret;

	ret = gen_fn(buffer, prefix, pl, "/tmp/");
	if(ulz_mkdtemp(buffer)) return ret;

	return 0;
}
133 posix_spawn_file_actions_t fa
;
142 char temp_state
[256];
143 char* cmd_argv
[4096];
145 sblist
* subst_entries
;
148 unsigned long long lineno
;
152 unsigned long numthreads
;
153 unsigned long threads_running
;
156 unsigned long delayedspinup_interval
; /* use a random delay until the queue gets filled for the first time.
157 the top value in ms can be supplied via a command line switch.
158 this option makes only sense if the interval is somewhat smaller than the
159 expected runtime of the average job.
160 this option is useful to not overload a network app due to hundreds of
161 parallel connection tries on startup.
163 unsigned long bulk_bytes
;
167 bool buffered
; /* write stdout and stderr of each task into a file,
168 and print it to stdout once the process ends.
169 this prevents mixing up of the output of multiple tasks. */
170 bool delayedflush
; /* only write to statefile whenever all processes are busy, and at program end.
171 this means faster program execution, but could also be imprecise if the number of
172 jobs is small or smaller than the available threadcount. */
173 bool join_output
; /* join stdout and stderr of launched jobs into stdout */
175 unsigned cmd_startarg
;
178 static prog_state_s prog_state
;
181 extern char** environ
;
183 static int makeLogfilename(char* buf
, size_t bufsize
, size_t jobindex
, int is_stderr
) {
184 int ret
= snprintf(buf
, bufsize
, "%s/jd_proc_%.5lu_std%s.log",
185 prog_state
.tempdir
, (unsigned long) jobindex
, is_stderr
? "err" : "out");
186 return ret
> 0 && (size_t) ret
< bufsize
;
189 static void launch_job(size_t jobindex
, char** argv
) {
190 char stdout_filename_buf
[256];
191 char stderr_filename_buf
[256];
192 job_info
* job
= sblist_get(prog_state
.job_infos
, jobindex
);
194 if(job
->pid
!= -1) return;
196 if(prog_state
.buffered
) {
197 if((!makeLogfilename(stdout_filename_buf
, sizeof(stdout_filename_buf
), jobindex
, 0)) ||
198 ((!prog_state
.join_output
) && !makeLogfilename(stderr_filename_buf
, sizeof(stderr_filename_buf
), jobindex
, 1)) ) {
199 dprintf(2, "temp filename too long!\n");
204 errno
= posix_spawn_file_actions_init(&job
->fa
);
205 if(errno
) goto spawn_error
;
207 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 0);
208 if(errno
) goto spawn_error
;
211 if(prog_state
.pipe_mode
) {
216 job
->pipe
= pipes
[1];
217 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, pipes
[0], 0);
218 if(errno
) goto spawn_error
;
219 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[0]);
220 if(errno
) goto spawn_error
;
221 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[1]);
222 if(errno
) goto spawn_error
;
225 if(prog_state
.buffered
) {
226 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 1);
227 if(errno
) goto spawn_error
;
228 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 2);
229 if(errno
) goto spawn_error
;
232 if(!prog_state
.pipe_mode
) {
233 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 0, "/dev/null", O_RDONLY
, 0);
234 if(errno
) goto spawn_error
;
237 if(prog_state
.buffered
) {
238 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 1, stdout_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
239 if(errno
) goto spawn_error
;
240 if(prog_state
.join_output
)
241 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, 1, 2);
243 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 2, stderr_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
244 if(errno
) goto spawn_error
;
247 errno
= posix_spawnp(&job
->pid
, argv
[0], &job
->fa
, NULL
, argv
, environ
);
251 perror("posix_spawn");
253 prog_state
.threads_running
++;
254 if(prog_state
.limits
) {
256 sblist_iter(prog_state
.limits
, limit
) {
257 if(prlimit(job
->pid
, limit
->limit
, &limit
->rl
, NULL
) == -1)
262 if(prog_state
.pipe_mode
)
/* copies the buffered log file of job slot job_id to our stdout
   (is_stderr == 0) or stderr (is_stderr != 0), then deletes the file. */
static void dump_output(size_t job_id, int is_stderr) {
	char out_filename_buf[256];
	char buf[4096];
	size_t nread;
	FILE* dst, *out_stream = is_stderr ? stderr : stdout;

	/* never unlink() a truncated path: it could name an unrelated file */
	if(!makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr)) {
		dprintf(2, "temp filename too long!\n");
		return;
	}

	dst = fopen(out_filename_buf, "r");
	if(dst) {
		while((nread = fread(buf, 1, sizeof(buf), dst))) {
			fwrite(buf, 1, nread, out_stream);
			/* a short fread() count means EOF or a read error */
			if(nread < sizeof(buf)) break;
		}
		fclose(dst);
		/* emit this job's output now, before the next job's */
		fflush(out_stream);
	} else
		perror("fopen");

	unlink(out_filename_buf);
}
/* writes all size bytes of buf to fd. short writes are continued and EINTR
   is retried. any other write error is reported via perror() and the rest
   of the data is dropped. */
static void write_all(int fd, void* buf, size_t size) {
	const char *p = buf;
	size_t left = size;

	while(left) {
		ssize_t n = write(fd, p, left);
		if(n == -1) {
			if(errno == EINTR) continue;
			perror("write");
			return;
		}
		p += n;
		left -= (size_t) n;
	}
}
306 static void pass_stdin(char *line
, size_t len
) {
307 static size_t next_child
= 0;
308 if(next_child
>= sblist_getsize(prog_state
.job_infos
))
310 job_info
*job
= sblist_get(prog_state
.job_infos
, next_child
);
311 write_all(job
->pipe
, line
, len
);
315 static void close_pipes(void) {
317 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
318 job_info
*job
= sblist_get(prog_state
.job_infos
, i
);
323 /* wait till a child exits, reap it, and return its job index for slot reuse */
324 static size_t reap_child(int *retval
) {
329 do ret
= waitpid(-1, retval
, 0);
330 while(ret
== -1 && errno
== EINTR
);
331 if(ret
== -1) abort();
333 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
334 job
= sblist_get(prog_state
.job_infos
, i
);
335 if(job
->pid
== ret
) {
337 posix_spawn_file_actions_destroy(&job
->fa
);
338 prog_state
.threads_running
--;
339 if(prog_state
.buffered
) {
341 if(!prog_state
.join_output
)
351 static size_t free_slots(void) {
352 return prog_state
.numthreads
- prog_state
.threads_running
;
/* prints "error: " plus a printf-style message to stderr and exits with
   status 1. the first argument must be a string literal, since it is pasted
   onto "error: ". */
#define die(...) do { dprintf(2, "error: " __VA_ARGS__); exit(1); } while(0)
/* parses an unsigned decimal number, optionally followed directly by a
   binary size suffix: K (x1024), M (x1024^2) or G (x1024^3), e.g. "16M".
   any other trailing characters are ignored; an empty or non-numeric string
   yields 0. results that do not fit an unsigned long saturate to -1UL. */
static unsigned long parse_human_number(const char* num) {
	static const unsigned long mul[] = {1024UL, 1024UL * 1024, 1024UL * 1024 * 1024};
	static const char kmg[] = "KMG";
	const char *kmgind;
	char *end;
	unsigned long ret = strtoul(num, &end, 10);

	/* *end must be non-NUL: strchr() would otherwise match kmg's terminator */
	if(*end && (kmgind = strchr(kmg, *end))) {
		unsigned long m = mul[kmgind - kmg];
		ret = ret > -1UL / m ? -1UL : ret * m;
	}
	return ret;
}
370 static int syntax(void) {
372 "jobflow " VERSION
" (C) rofl0r\n"
373 "------------------------\n"
374 "this program is intended to be used as a recipient of another programs output.\n"
375 "it launches processes to which the current line can be passed as an argument\n"
376 "using {} for substitution (as in find -exec).\n"
377 "if no input substitution argument ({} or {.}) is provided, input is piped into\n"
378 "stdin of child processes. input will be then evenly distributed to jobs,\n"
379 "until EOF is received. we call this 'pipe mode'.\n"
381 "available options:\n\n"
382 "-skip N -count N -threads N -resume -statefile=/tmp/state -delayedflush\n"
383 "-delayedspinup N -buffered -joinoutput -limits mem=16M,cpu=10\n"
385 "-exec ./mycommand {}\n"
388 " N=number of entries to skip\n"
390 " N=only process count lines (after skipping)\n"
391 "-threads N (alternative: -j N)\n"
392 " N=number of parallel processes to spawn\n"
394 " resume from last jobnumber stored in statefile\n"
396 " use XXX as the EOF marker on stdin\n"
397 " if the marker is encountered, behave as if stdin was closed\n"
398 " not compatible with pipe/bulk mode\n"
401 " saves last launched jobnumber into a file\n"
403 " only write to statefile whenever all processes are busy,\n"
404 " and at program end\n"
406 " N=maximum amount of milliseconds\n"
407 " ...to wait when spinning up a fresh set of processes\n"
408 " a random value between 0 and the chosen amount is used to delay initial\n"
410 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
411 " activity on program startup\n"
413 " store the stdout and stderr of launched processes into a temporary file\n"
414 " which will be printed after a process has finished.\n"
415 " this prevents mixing up of output of different processes.\n"
417 " if -buffered, write both stdout and stderr into the same file.\n"
418 " this saves the chronological order of the output, and the combined output\n"
419 " will only be printed to stdout.\n"
421 " do bulk copies with a buffer of N bytes. only usable in pipe mode.\n"
422 " this passes (almost) the entire buffer to the next scheduled job.\n"
423 " the passed buffer will be truncated to the last line break boundary,\n"
424 " so jobs always get entire lines to work with.\n"
425 " this option is useful when you have huge input files and relatively short\n"
426 " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n"
427 " N must be a multiple of 4KB. the suffixes G/M/K are detected.\n"
428 " actual memory allocation will be twice the amount passed.\n"
429 " note that pipe buffer size is limited to 64K on linux, so anything higher\n"
430 " than that probably doesn't make sense.\n"
431 "-limits [mem=N,cpu=N,stack=N,fsize=N,nofiles=N]\n"
432 " sets the rlimit of the new created processes.\n"
433 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
434 "-exec command with args\n"
435 " everything past -exec is treated as the command to execute on each line of\n"
436 " stdin received. the line can be passed as an argument using {}.\n"
437 " {.} passes everything before the last dot in a line as an argument.\n"
438 " {#} will be replaced with the sequence (aka line) number.\n"
439 " usage of {#} does not affect the decision whether pipe mode is used.\n"
440 " it is possible to use multiple substitutions inside a single argument,\n"
441 " but only of one type.\n"
442 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
448 static int parse_args(unsigned argc
, char** argv
) {
449 unsigned i
, j
, r
= 0;
450 static bool resume
= 0;
451 static char *limits
= 0;
452 static const struct {
453 const char lname
[14];
462 {"threads", 'j', 'i', .dest
.i
= &prog_state
.numthreads
},
463 {"statefile", 0, 's', .dest
.s
= &prog_state
.statefile
},
464 {"eof", 0, 's', .dest
.s
= &prog_state
.eof_marker
},
465 {"skip", 0, 'i', .dest
.i
= &prog_state
.skip
},
466 {"count", 0, 'i', .dest
.i
= &prog_state
.count
},
467 {"resume", 0, 'b', .dest
.b
= &resume
},
468 {"delayedflush", 0, 'b', .dest
.b
= &prog_state
.delayedflush
},
469 {"delayedspinup", 0, 'i', .dest
.i
= &prog_state
.delayedspinup_interval
},
470 {"buffered", 0, 'b', .dest
.b
=&prog_state
.buffered
},
471 {"joinoutput", 0, 'b', .dest
.b
=&prog_state
.join_output
},
472 {"bulk", 0, 'i', .dest
.i
= &prog_state
.bulk_bytes
},
473 {"limits", 0, 's', .dest
.s
= &limits
},
476 prog_state
.numthreads
= 1;
477 prog_state
.count
= -1UL;
479 for(i
=1; i
<argc
; ++i
) {
480 char *p
= argv
[i
], *q
= strchr(p
, '=');
481 if(*(p
++) != '-') die("expected option instead of %s\n", argv
[i
]);
483 if(!*p
) die("invalid option %s\n", argv
[i
]);
484 for(j
=0;j
<ARRAY_SIZE(opt_tab
);++j
) {
485 if(((!p
[1] || p
[1] == '=') && *p
== opt_tab
[j
].sname
) ||
486 (!strcmp(p
, opt_tab
[j
].lname
)) ||
487 (q
&& strlen(opt_tab
[j
].lname
) == q
-p
&& !strncmp(p
, opt_tab
[j
].lname
, q
-p
))) {
488 switch(opt_tab
[j
].flag
) {
489 case 'b': *opt_tab
[j
].dest
.b
=1; break;
492 if(argc
<= i
+1 || argv
[i
+1][0] == '-') {
494 die("option %s requires operand\n", argv
[i
]);
498 if(*(++q
) == 0) goto e_expect_op
;
500 if(opt_tab
[j
].flag
== 'i') {
502 die("expected numeric operand for %s at %s\n", p
, q
);
503 *opt_tab
[j
].dest
.i
=parse_human_number(q
);
505 *opt_tab
[j
].dest
.s
=q
;
511 if(j
>=ARRAY_SIZE(opt_tab
)) {
512 if(!strcmp(p
, "exec")) {
515 } else if(!strcmp(p
, "help")) {
517 } else die("unknown option %s\n", argv
[i
]);
521 if((long)prog_state
.numthreads
<= 0) die("threadcount must be >= 1\n");
524 if(!prog_state
.statefile
) die("-resume needs -statefile\n");
525 if(access(prog_state
.statefile
, W_OK
| R_OK
) != -1) {
526 FILE *f
= fopen(prog_state
.statefile
, "r");
529 if(fgets(nb
, sizeof nb
, f
)) prog_state
.skip
= strtoll(nb
,0,10);
535 if(prog_state
.delayedflush
&& !prog_state
.statefile
)
536 die("-delayedflush needs -statefile\n");
538 prog_state
.pipe_mode
= 1;
539 prog_state
.cmd_startarg
= r
;
540 prog_state
.subst_entries
= NULL
;
544 if(r
< (unsigned) argc
) {
545 prog_state
.cmd_startarg
= r
;
546 } else die("-exec without arguments\n");
548 prog_state
.subst_entries
= sblist_new(sizeof(uint32_t), 16);
550 // save entries which must be substituted, to save some cycles.
551 for(i
= r
; i
< (unsigned) argc
; i
++) {
554 if((a
= strstr(argv
[i
], "{}")) ||
555 (b
= strstr(argv
[i
], "{.}")) ||
556 (c
= strstr(argv
[i
], "{#}"))) {
557 if(!c
) prog_state
.pipe_mode
= 0;
558 else prog_state
.use_seqnr
= 1;
559 sblist_add(prog_state
.subst_entries
, &subst_ent
);
562 if(sblist_getsize(prog_state
.subst_entries
) == 0) {
563 sblist_free(prog_state
.subst_entries
);
564 prog_state
.subst_entries
= 0;
568 if(prog_state
.join_output
&& !prog_state
.buffered
)
569 die("-joinoutput needs -buffered\n");
571 if(prog_state
.bulk_bytes
% 4096)
572 die("bulk size must be a multiple of 4096\n");
577 limits
+= strspn(limits
, ",");
578 size_t l
= strcspn(limits
, ",");
580 size_t l2
= strcspn(limits
, "=");
581 if(l2
>= l
) die("syntax error in limits argument\n");
583 if(!prog_state
.limits
)
584 prog_state
.limits
= sblist_new(sizeof(limit_rec
), 4);
585 static const struct { int lim_val
; const char lim_name
[8]; } lim_tab
[] = {
586 { RLIMIT_AS
, "mem" },
587 { RLIMIT_CPU
, "cpu" },
588 { RLIMIT_STACK
, "stack" },
589 { RLIMIT_FSIZE
, "fsize" },
590 { RLIMIT_NOFILE
, "nofiles" },
592 for(i
=0; i
<ARRAY_SIZE(lim_tab
);++i
)
593 if(!strncmp(limits
, lim_tab
[i
].lim_name
, l2
)) {
594 lim
.limit
= lim_tab
[i
].lim_val
;
597 if(i
>= ARRAY_SIZE(lim_tab
))
598 die("unknown option passed to -limits\n");
599 if(getrlimit(lim
.limit
, &lim
.rl
) == -1) {
601 die("could not query rlimits\n");
603 lim
.rl
.rlim_cur
= parse_human_number(limits
+l2
+1);
604 sblist_add(prog_state
.limits
, &lim
);
611 static void init_queue(void) {
613 job_info ji
= {.pid
= -1};
615 for(i
= 0; i
< prog_state
.numthreads
; i
++)
616 sblist_add(prog_state
.job_infos
, &ji
);
619 static void write_statefile(unsigned long long n
, const char* tempfile
) {
620 int fd
= open(tempfile
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
622 dprintf(fd
, "%llu\n", n
+ 1ULL);
624 if(rename(tempfile
, prog_state
.statefile
) == -1)
/* returns 1 if the needle_size bytes of needle occur in haystack at offset
   bufpos without running past hay_size, else 0. */
static int str_here(char* haystack, size_t hay_size, size_t bufpos,
                    char* needle, size_t needle_size) {
	/* bufpos > hay_size would make the size_t subtraction wrap around */
	if(bufpos > hay_size || needle_size > hay_size - bufpos) return 0;
	return !memcmp(needle, haystack + bufpos, needle_size);
}

// returns numbers of substitutions done, -1 on out of buffer.
// dest is always overwritten. if no substitutions were done, it contains a copy of source.
// on success dest is NUL-terminated. an empty `what` never matches.
static int substitute_all(char *dest, ssize_t dest_size,
                          char *src, size_t src_size,
                          char *what, size_t what_size,
                          char *whit, size_t whit_size) {
	size_t i = 0;
	int ret = 0;

	while(i < src_size) {
		if(dest_size <= 0) return -1;
		/* what_size == 0 would "match" everywhere without advancing i */
		if(what_size && str_here(src, src_size, i, what, what_size)) {
			if(dest_size < (ssize_t) whit_size) return -1;
			memcpy(dest, whit, whit_size);
			dest += whit_size;
			dest_size -= whit_size;
			i += what_size;
			ret++;
		} else {
			*dest++ = src[i++];
			dest_size--;
		}
	}
	/* one byte is still needed for the terminator */
	if(dest_size <= 0) return -1;
	*dest = 0;
	return ret;
}
/* returns a pointer to the first occurrence of (char) ch within the first
   end bytes of in, or NULL if there is none. */
static char* mystrnchr(const char *in, int ch, size_t end) {
	return memchr(in, ch, end);
}

/* returns a pointer to the last occurrence of (char) ch within the first
   end bytes of in, or NULL if there is none (including when end is 0). */
static char* mystrnrchr(const char *in, int ch, size_t end) {
	const char *p = in + end;

	/* walk backwards without ever forming in - 1 */
	while(p != in) {
		--p;
		if(*p == (char) ch) return (char*)p;
	}
	return NULL;
}

/* kept for existing callers; mystrnrchr() itself already returns NULL for
   end == 0. */
static char* mystrnrchr_chk(const char *in, int ch, size_t end) {
	return mystrnrchr(in, ch, end);
}
685 static int need_linecounter(void) {
686 return !!prog_state
.skip
|| prog_state
.statefile
||
687 prog_state
.use_seqnr
|| prog_state
.count
!= -1UL;
/* returns the number of '\n' bytes within the first len bytes of buf */
static size_t count_linefeeds(const char *buf, size_t len) {
	const char *p = buf, *e = buf + len;
	size_t cnt = 0;

	/* let memchr jump straight to the next newline */
	while(p < e && (p = memchr(p, '\n', (size_t)(e - p)))) {
		cnt++;
		p++;
	}
	return cnt;
}
699 static int match_eof(char* inbuf
, size_t len
) {
700 if(!prog_state
.eof_marker
) return 0;
701 size_t l
= strlen(prog_state
.eof_marker
);
702 return l
== len
-1 && !memcmp(prog_state
.eof_marker
, inbuf
, l
);
/* true for the line-break characters '\n' and '\r' */
static inline int islb(int p) { return p == '\n' || p == '\r'; }

/* strips all trailing '\n' / '\r' characters from s in place (overwriting
   them with NUL) and reduces *len accordingly. */
static void chomp(char *s, size_t *len) {
	while(*len && islb(s[*len - 1]))
		s[--(*len)] = 0;
}
/* true if the wait status retval describes a job that was killed by a
   signal or exited with a non-zero status */
static int process_failed(int retval) {
	return WIFSIGNALED(retval) ||
	       (WIFEXITED(retval) && WEXITSTATUS(retval));
}
715 #define MAX_SUBSTS 16
716 static int dispatch_line(char* inbuf
, size_t len
, char** argv
) {
717 char subst_buf
[MAX_SUBSTS
][4096];
718 static unsigned spinup_counter
= 0;
720 if(!prog_state
.bulk_bytes
)
722 else if(need_linecounter()) {
723 prog_state
.lineno
+= count_linefeeds(inbuf
, len
);
726 if(prog_state
.skip
) {
727 if(!prog_state
.bulk_bytes
) {
731 while(len
&& prog_state
.skip
) {
732 char *q
= mystrnchr(inbuf
, '\n', len
);
734 ptrdiff_t diff
= (q
- inbuf
) + 1;
744 } else if(prog_state
.count
!= -1UL) {
745 if(!prog_state
.count
) return -1;
749 if(!prog_state
.cmd_startarg
) {
750 write_all(1, inbuf
, len
);
754 if(!prog_state
.pipe_mode
)
758 size_t line_size
= len
;
761 if(prog_state
.subst_entries
) {
762 unsigned max_subst
= 0;
764 sblist_iter(prog_state
.subst_entries
, index
) {
765 if(max_subst
>= MAX_SUBSTS
) break;
766 char *source
= argv
[*index
+ prog_state
.cmd_startarg
];
767 size_t source_len
= strlen(source
);
768 ret
= substitute_all(subst_buf
[max_subst
], 4096,
774 dprintf(2, "fatal: line too long for substitution: %s\n", line
);
777 char* lastdot
= mystrnrchr_chk(line
, '.', line_size
);
778 size_t tilLastDot
= line_size
;
779 if(lastdot
) tilLastDot
= lastdot
- line
;
780 ret
= substitute_all(subst_buf
[max_subst
], 4096,
784 if(ret
== -1) goto too_long
;
788 sprintf(lineno
, "%llu", prog_state
.lineno
);
789 ret
= substitute_all(subst_buf
[max_subst
], 4096,
792 lineno
, strlen(lineno
));
793 if(ret
== -1) goto too_long
;
796 prog_state
.cmd_argv
[*index
] = subst_buf
[max_subst
];
803 if(prog_state
.delayedspinup_interval
&& spinup_counter
< (prog_state
.numthreads
* 2)) {
804 msleep(rand() % (prog_state
.delayedspinup_interval
+ 1));
810 launch_job(prog_state
.threads_running
, prog_state
.cmd_argv
);
811 else if(!prog_state
.pipe_mode
) {
813 launch_job(reap_child(&retval
), prog_state
.cmd_argv
);
814 ret
= !process_failed(retval
);
817 if(prog_state
.statefile
&& (prog_state
.delayedflush
== 0 || free_slots() == 0)) {
818 write_statefile(prog_state
.lineno
, prog_state
.temp_state
);
821 if(prog_state
.pipe_mode
)
822 pass_stdin(line
, line_size
);
827 int main(int argc
, char** argv
) {
830 char tempdir_buf
[256];
834 if(argc
> 4096) argc
= 4096;
836 prog_state
.threads_running
= 0;
838 if(parse_args(argc
, argv
)) return 1;
840 if(prog_state
.statefile
)
841 snprintf(prog_state
.temp_state
, sizeof(prog_state
.temp_state
), "%s.%u", prog_state
.statefile
, (unsigned) getpid());
843 prog_state
.tempdir
= NULL
;
845 if(prog_state
.buffered
) {
846 prog_state
.tempdir
= tempdir_buf
;
847 if(mktempdir("jobflow", tempdir_buf
, sizeof(tempdir_buf
)) == 0) {
849 die("could not create tempdir\n");
852 /* if the stdout/stderr fds are not in O_APPEND mode,
853 the dup()'s of the fds in posix_spawn can cause different
854 file positions, causing the different processes to overwrite each others output.
856 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
858 if(fcntl(1, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
859 if(fcntl(2, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
862 if(prog_state
.cmd_startarg
) {
863 for(i
= prog_state
.cmd_startarg
; i
< (unsigned) argc
; i
++) {
864 prog_state
.cmd_argv
[i
- prog_state
.cmd_startarg
] = argv
[i
];
866 prog_state
.cmd_argv
[argc
- prog_state
.cmd_startarg
] = NULL
;
869 prog_state
.job_infos
= sblist_new(sizeof(job_info
), prog_state
.numthreads
);
872 prog_state
.lineno
= 0;
874 size_t left
= 0, bytes_read
= 0;
875 const size_t chunksize
= prog_state
.bulk_bytes
? prog_state
.bulk_bytes
: 16*1024;
877 char *mem
= mmap(NULL
, chunksize
*2, PROT_READ
| PROT_WRITE
, MAP_PRIVATE
| MAP_ANON
, -1, 0);
879 char *buf2
= mem
+chunksize
;
885 inbuf
= buf1
+chunksize
-left
;
886 memcpy(inbuf
, buf2
+bytes_read
-left
, left
);
887 ssize_t n
= read(0, buf2
, chunksize
);
897 if(prog_state
.pipe_mode
&& prog_state
.bulk_bytes
)
898 p
= mystrnrchr(in
, '\n', left
);
900 p
= mystrnchr (in
, '\n', left
);
903 ptrdiff_t diff
= (p
- in
) + 1;
904 if(match_eof(in
, diff
)) {
908 if(!dispatch_line(in
, diff
, argv
))
914 if(left
&& !match_eof(in
, left
)) dispatch_line(in
, left
, argv
);
917 if(left
> chunksize
) {
918 dprintf(2, "error: input line length exceeds buffer size\n");
927 if(prog_state
.pipe_mode
) {
931 if(prog_state
.delayedflush
)
932 write_statefile(prog_state
.lineno
- 1, prog_state
.temp_state
);
935 while(prog_state
.threads_running
) {
937 if(!exitcode
) exitcode
= process_failed(retval
);
940 if(prog_state
.subst_entries
) sblist_free(prog_state
.subst_entries
);
941 if(prog_state
.job_infos
) sblist_free(prog_state
.job_infos
);
942 if(prog_state
.limits
) sblist_free(prog_state
.limits
);
944 if(prog_state
.tempdir
)
945 rmdir(prog_state
.tempdir
);