From 58b6a3d2d379d39c531686b42f329c46ac5ee81b Mon Sep 17 00:00:00 2001 From: rofl0r Date: Wed, 8 Jun 2016 18:14:59 +0100 Subject: [PATCH] remove confusing usage of slot list while reading the code, i found the usage of the slot list rather confusing. the intent was to optimize looking up of a free job slot (index into the job array), instead of iterating over the job array to find an unused entry (pid == -1). however this can be entirely left away if we just fill the job array sequentially till it's full, and later by waiting for/reaping a single process and re-use its job slot immediately when we want to add a job. --- jobflow.c | 51 ++++++++++++++++++++++++--------------------------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/jobflow.c b/jobflow.c index 6fdde05..031594b 100644 --- a/jobflow.c +++ b/jobflow.c @@ -37,6 +37,7 @@ along with this program. If not, see . #include #include #include +#include /* process handling */ @@ -72,9 +73,6 @@ typedef struct { struct rlimit rl; } limit_rec; -/* defines how many slots our free_slots struct can take */ -#define MAX_SLOTS 128 - typedef struct { int numthreads; unsigned threads_running; @@ -84,8 +82,6 @@ typedef struct { sblist* subst_entries; sblist* limits; unsigned cmd_startarg; - size_t free_slots[MAX_SLOTS]; - unsigned free_slots_count; char* tempdir; int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time. the top value in ms can be supplied via a command line switch. @@ -99,7 +95,7 @@ typedef struct { this prevents mixing up of the output of multiple tasks. */ int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end. this means faster program execution, but could also be imprecise if the number of - jobs is small or smaller than the available threadcount / MAX_SLOTS. */ + jobs is small or smaller than the available threadcount. */ int join_output:1; /* join stdout and stderr of launched jobs into stdout */ } prog_state_s; @@ -172,13 +168,6 @@ void launch_job(size_t jobindex, char** argv) { } } -static void releaseJobSlot(size_t job_id) { - if(prog_state.free_slots_count < MAX_SLOTS) { - prog_state.free_slots[prog_state.free_slots_count] = job_id; - prog_state.free_slots_count++; - } -} - static void dump_output(size_t job_id, int is_stderr) { char out_filename_buf[256]; char buf[4096]; @@ -198,30 +187,43 @@ static void dump_output(size_t job_id, int is_stderr) { } } -static void reapChilds(void) { +/* wait till a child exits, reap it, and return its job index for slot reuse */ +static size_t reap_child(void) { size_t i; job_info* job; int ret, retval; - ret = waitpid(-1, &retval, 0); - if(ret == -1) return; - if(!WIFEXITED(retval)) return; + do ret = waitpid(-1, &retval, 0); + while(ret == -1 || !WIFEXITED(retval)); + for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) { job = sblist_get(prog_state.job_infos, i); if(job->pid == ret) { job->pid = -1; posix_spawn_file_actions_destroy(&job->fa); - releaseJobSlot(i); prog_state.threads_running--; if(prog_state.buffered) { dump_output(i, 0); if(!prog_state.join_output) dump_output(i, 1); } + return i; } } + assert(0); + return -1; } +static size_t free_slots(void) { + return prog_state.numthreads - prog_state.threads_running; +} + +static void add_job(char **argv) { + if(free_slots()) + launch_job(prog_state.threads_running, argv); + else + launch_job(reap_child(), argv); +} __attribute__((noreturn)) static void die(const char* msg) { @@ -408,10 +410,8 @@ static void init_queue(void) { int i; job_info ji = {.pid = -1}; - for(i = 0; i < prog_state.numthreads; i++) { + for(i = 0; i < prog_state.numthreads; i++) sblist_add(prog_state.job_infos, &ji); - releaseJobSlot(i); - } } static void write_statefile(uint64_t n, const char* tempfile) { @@ -469,7 +469,6 @@ int main(int argc, char** argv) { if(argc > 4096) argc = 4096; prog_state.threads_running = 0; - prog_state.free_slots_count = 0; if(parse_args(argc, argv)) return 1; @@ -540,17 +539,15 @@ int main(int argc, char** argv) { } } - while(!prog_state.free_slots_count) reapChilds(); if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) { msleep(rand() % (prog_state.delayedspinup_interval + 1)); spinup_counter++; } - launch_job(prog_state.free_slots[prog_state.free_slots_count-1], cmd_argv); - prog_state.free_slots_count--; + add_job(cmd_argv); - if(prog_state.statefile && (prog_state.delayedflush == 0 || prog_state.free_slots_count == 0)) { + if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) { write_statefile(n, temp_state); } } @@ -563,7 +560,7 @@ int main(int argc, char** argv) { if(prog_state.delayedflush) write_statefile(n - 1, temp_state); - while(prog_state.threads_running) reapChilds(); + while(prog_state.threads_running) reap_child(); if(prog_state.subst_entries) sblist_free(prog_state.subst_entries); if(prog_state.job_infos) sblist_free(prog_state.job_infos); -- 2.11.4.GIT