From de802d6cc267b72735dac60169bc68a564247303 Mon Sep 17 00:00:00 2001 From: rofl0r Date: Tue, 3 Jan 2012 17:31:45 +0100 Subject: [PATCH] initial commit --- jobflow.c | 269 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 269 insertions(+) create mode 100644 jobflow.c diff --git a/jobflow.c b/jobflow.c new file mode 100644 index 0000000..d9ecb9a --- /dev/null +++ b/jobflow.c @@ -0,0 +1,269 @@ +#include "../lib/include/optparser.h" +#include "../lib/include/stringptr.h" +#include "../lib/include/stringptrlist.h" +#include "../lib/include/sblist.h" +#include "../lib/include/strlib.h" +#include "../lib/include/timelib.h" + +#include +#include +#include +#include +#include +#include + +#define SLEEP_MS 12 + +typedef struct { + unsigned numthreads; + char* statefile; + unsigned skip; + int buffered; + sblist* job_infos; + unsigned cmd_startarg; +} prog_state_s; + +prog_state_s prog_state; + +/* process handling */ + +#include +#include +#include +#include + +typedef struct { + pid_t pid; +} job_info; + +extern char** environ; + +int makeLogfilename(char* buf, ...) { + return 0; +} + +void launch_job(job_info* job, char** argv) { + posix_spawn_file_actions_t fa; + char buf[256]; + + if(job->pid != -1) return; + + if(prog_state.buffered) + if(!makeLogfilename(buf, sizeof(buf), argv[0])) { + fprintf(stderr, " filename too long: %s\n", argv[0]); + return; + } + + + errno = posix_spawn_file_actions_init(&fa); + if(errno) goto spawn_error; + errno = posix_spawn_file_actions_addclose(&fa, 0); + if(errno) goto spawn_error; + + if(prog_state.buffered) { + errno = posix_spawn_file_actions_addclose(&fa, 1); + if(errno) goto spawn_error; + errno = posix_spawn_file_actions_addclose(&fa, 2); + if(errno) goto spawn_error; + } + + errno = posix_spawn_file_actions_addopen(&fa, 0, "/dev/null", O_RDONLY, 0); + if(errno) goto spawn_error; + + if(prog_state.buffered) { + errno = posix_spawn_file_actions_addopen(&fa, 1, buf, O_WRONLY | O_CREAT | O_APPEND, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if(errno) goto spawn_error; + errno = posix_spawn_file_actions_adddup2(&fa, 1, 2); + if(errno) goto spawn_error; + } + + errno = posix_spawnp(&job->pid, argv[0], &fa, NULL, argv, environ); + if(errno) { + spawn_error: + job->pid = -1; + perror("posix_spawn"); + } +} + +typedef struct { + job_info* empty_slot; + unsigned threads_running; +} reap_info; + +/* reap childs and return pointer to a free "slot" or NULL */ +reap_info reapChilds(void) { + size_t i; + job_info* job; + int ret, retval; + + reap_info result; + + result.empty_slot = NULL; + result.threads_running = 0; + + for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) { + job = sblist_get(prog_state.job_infos, i); + if(job->pid != -1) { + ret = waitpid(job->pid, &retval, WNOHANG); + if(ret != 0) { + // error or changed state. + if(ret == -1) { + perror("waitpid"); + continue; + } + if(!retval) { + //log_put(js->log_fd, VARISL(" job finished: "), VARIS(job->prog), NULL); + } + else { + //log_put(js->log_fd, VARISL(" got error "), VARII(WEXITSTATUS(retval)), VARISL(" from "), VARIS(job->prog), NULL); + } + job->pid = -1; + //job->passed = 0; + result.empty_slot = job; + } else + result.threads_running++; + } else + result.empty_slot = job; + } + + return result; +} + + +__attribute__((noreturn)) +void die(const char* msg) { + fprintf(stderr, msg); + exit(1); +} + +void parse_args(int argc, char** argv) { + op_state op_b, *op = &op_b; + op_init(op, argc, argv); + char *op_temp; + op_temp = op_get(op, SPL("threads")); + prog_state.numthreads = op_temp ? atoi(op_temp) : 1; + op_temp = op_get(op, SPL("statefile")); + prog_state.statefile = op_temp; + + op_temp = op_get(op, SPL("skip")); + prog_state.skip = op_temp ? atoi(op_temp) : 0; + if(op_hasflag(op, SPL("resume"))) { + if(!prog_state.statefile) die("-resume needs -statefile\n"); + if(access(prog_state.statefile, W_OK | R_OK) != -1) { + stringptr* fc = stringptr_fromfile(prog_state.statefile); + prog_state.skip = atoi(fc->ptr); + } + } + + prog_state.cmd_startarg = 0; + if(op_hasflag(op, SPL("exec"))) { + unsigned i, r = 0; + for(i = 1; i < (unsigned) argc; i++) { + if(str_equal(argv[i], "-exec")) { + r = i + 1; + break; + } + } + if(r && r < argc) { + prog_state.cmd_startarg = r; + } + } + prog_state.buffered = 0; +} + +void init_queue(void) { + unsigned i; + job_info ji; + + ji.pid = -1; + + for(i = 0; i < prog_state.numthreads; i++) { + sblist_add(prog_state.job_infos, &ji); + } +} + +int main(int argc, char** argv) { + char inbuf[4096]; char* fgets_result, *strstr_result, *p; + stringptr line_b, *line = &line_b; + char* cmd_argv[4096]; + char subst_buf[4096][16]; + unsigned max_subst; + + char numbuf[64]; + stringptr num_b, *num = &num_b; + + reap_info ri; + + uint64_t n = 0; + unsigned i, j; + + if(argc > 4096) argc = 4096; + parse_args(argc, argv); + if(prog_state.cmd_startarg) { + for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) { + cmd_argv[i - prog_state.cmd_startarg] = argv[i]; + } + cmd_argv[argc - prog_state.cmd_startarg] = NULL; + } + + prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads); + init_queue(); + + while((fgets_result = fgets(inbuf, sizeof(inbuf), stdin))) { + if(prog_state.skip) + prog_state.skip--; + else { + if(!prog_state.cmd_startarg) + printf(fgets_result); + else { + stringptr_fromchar(fgets_result, line); + stringptr_chomp(line); + + max_subst = 0; + for(i = 1; i < argc - prog_state.cmd_startarg; i++) { + p = argv[i + prog_state.cmd_startarg]; + if((strstr_result = strstr(p, "{}"))) { + j = 0; + j = strstr_result - p; + if(j) memcpy(subst_buf[max_subst], p, j); + strncpy(&subst_buf[max_subst][j], line->ptr, 4096 - j); + j += line->size; + if(j > 4096) { + fprintf(stderr, "fatal: line to long for substitution: %s\n", line->ptr); + goto out; + } + strncpy(&subst_buf[max_subst][j], strstr_result + 2, 4096 - j); + + cmd_argv[i] = subst_buf[max_subst]; + max_subst++; + if(max_subst >= 16) die("too many substitutions!\n"); + } + } + + do { + ri = reapChilds(); + if(!ri.empty_slot) msleep(SLEEP_MS); + + } while(!ri.empty_slot); + + launch_job(ri.empty_slot, cmd_argv); + + if(prog_state.statefile) { + num_b.ptr = uint64ToString(n + 1, numbuf); + num_b.size = strlen(numbuf); + stringptr_tofile(prog_state.statefile, num); + } + } + } + n++; + } + + out: + do { + ri = reapChilds(); + if(ri.threads_running) msleep(SLEEP_MS); + + } while(ri.threads_running); + + return 0; +} -- 2.11.4.GIT