From 84aa2ddd861549d6ec8d1ef15f4fd518e03f449b Mon Sep 17 00:00:00 2001 From: Volker Lendecke Date: Fri, 21 Mar 2014 17:53:26 +0100 Subject: [PATCH] pthreadpool: Avoid a malloc/free per job pthreadpool_add_job is in our hottest code path for r/w intensive workloads, so we should avoid anything CPU-intensive. pthreadpool used to malloc each job and free it in the worker thread. This patch adds a FIFO queue for jobs that helper threads copy from, avoiding constant malloc/free. This cuts user space CPU in the local-bench-pthreadpool benchmark by roughly 10% on my system. Signed-off-by: Volker Lendecke Reviewed-by: Jeremy Allison --- source3/lib/pthreadpool/pthreadpool.c | 145 +++++++++++++++++++++------------- 1 file changed, 91 insertions(+), 54 deletions(-) diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c index 654d420732f..d51e8083601 100644 --- a/source3/lib/pthreadpool/pthreadpool.c +++ b/source3/lib/pthreadpool/pthreadpool.c @@ -34,7 +34,6 @@ #include "lib/util/dlinklist.h" struct pthreadpool_job { - struct pthreadpool_job *next; int id; void (*fn)(void *private_data); void *private_data; @@ -57,9 +56,13 @@ struct pthreadpool { pthread_cond_t condvar; /* - * List of work jobs + * Array of jobs */ - struct pthreadpool_job *jobs, *last_job; + size_t jobs_array_len; + struct pthreadpool_job *jobs; + + size_t head; + size_t num_jobs; /* * pipe for signalling @@ -113,9 +116,21 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult) return ENOMEM; } + pool->jobs_array_len = 4; + pool->jobs = calloc( + pool->jobs_array_len, sizeof(struct pthreadpool_job)); + + if (pool->jobs == NULL) { + free(pool); + return ENOMEM; + } + + pool->head = pool->num_jobs = 0; + ret = pipe(pool->sig_pipe); if (ret == -1) { int err = errno; + free(pool->jobs); free(pool); return err; } @@ -124,6 +139,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult) if (ret != 0) { close(pool->sig_pipe[0]); close(pool->sig_pipe[1]); + free(pool->jobs); free(pool); return ret; } @@ -133,12 +149,12 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult) pthread_mutex_destroy(&pool->mutex); close(pool->sig_pipe[0]); close(pool->sig_pipe[1]); + free(pool->jobs); free(pool); return ret; } pool->shutdown = 0; - pool->jobs = pool->last_job = NULL; pool->num_threads = 0; pool->num_exited = 0; pool->exited = NULL; @@ -151,6 +167,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult) pthread_mutex_destroy(&pool->mutex); close(pool->sig_pipe[0]); close(pool->sig_pipe[1]); + free(pool->jobs); free(pool); return ret; } @@ -221,14 +238,8 @@ static void pthreadpool_child(void) pool->exited = NULL; pool->num_idle = 0; - - while (pool->jobs != NULL) { - struct pthreadpool_job *job; - job = pool->jobs; - pool->jobs = job->next; - free(job); - } - pool->last_job = NULL; + pool->head = 0; + pool->num_jobs = 0; ret = pthread_mutex_unlock(&pool->mutex); assert(ret == 0); @@ -311,7 +322,7 @@ int pthreadpool_destroy(struct pthreadpool *pool) return ret; } - if ((pool->jobs != NULL) || pool->shutdown) { + if ((pool->num_jobs != 0) || pool->shutdown) { ret = pthread_mutex_unlock(&pool->mutex); assert(ret == 0); return EBUSY; @@ -383,6 +394,7 @@ int pthreadpool_destroy(struct pthreadpool *pool) pool->sig_pipe[1] = -1; free(pool->exited); + free(pool->jobs); free(pool); return 0; @@ -410,6 +422,61 @@ static void pthreadpool_server_exit(struct pthreadpool *pool) pool->num_exited += 1; } +static bool pthreadpool_get_job(struct pthreadpool *p, + struct pthreadpool_job *job) +{ + if (p->num_jobs == 0) { + return false; + } + *job = p->jobs[p->head]; + p->head = (p->head+1) % p->jobs_array_len; + p->num_jobs -= 1; + return true; +} + +static bool pthreadpool_put_job(struct pthreadpool *p, + int id, + void (*fn)(void *private_data), + void *private_data) +{ + struct pthreadpool_job *job; + + if (p->num_jobs == p->jobs_array_len) { + struct pthreadpool_job *tmp; + size_t new_len = p->jobs_array_len * 2; + + tmp = realloc( + p->jobs, sizeof(struct pthreadpool_job) * new_len); + if (tmp == NULL) { + return false; + } + p->jobs = tmp; + + /* + * We just doubled the jobs array. The array implements a FIFO + * queue with a modulo-based wraparound, so we have to memcpy + * the jobs that are logically at the queue end but physically + * before the queue head into the reallocated area. The new + * space starts at the current jobs_array_len, and we have to + * copy everything before the current head job into the new + * area. + */ + memcpy(&p->jobs[p->jobs_array_len], p->jobs, + sizeof(struct pthreadpool_job) * p->head); + + p->jobs_array_len = new_len; + } + + job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len]; + job->id = id; + job->fn = fn; + job->private_data = private_data; + + p->num_jobs += 1; + + return true; +} + static void *pthreadpool_server(void *arg) { struct pthreadpool *pool = (struct pthreadpool *)arg; @@ -422,7 +489,7 @@ static void *pthreadpool_server(void *arg) while (1) { struct timespec ts; - struct pthreadpool_job *job; + struct pthreadpool_job job; /* * idle-wait at most 1 second. If nothing happens in that @@ -432,7 +499,7 @@ static void *pthreadpool_server(void *arg) clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += 1; - while ((pool->jobs == NULL) && (pool->shutdown == 0)) { + while ((pool->num_jobs == 0) && (pool->shutdown == 0)) { pool->num_idle += 1; res = pthread_cond_timedwait( @@ -441,7 +508,7 @@ static void *pthreadpool_server(void *arg) if (res == ETIMEDOUT) { - if (pool->jobs == NULL) { + if (pool->num_jobs == 0) { /* * we timed out and still no work for * us. Exit. @@ -456,19 +523,9 @@ static void *pthreadpool_server(void *arg) assert(res == 0); } - job = pool->jobs; - - if (job != NULL) { + if (pthreadpool_get_job(pool, &job)) { ssize_t written; - - /* - * Ok, there's work for us to do, remove the job from - * the pthreadpool list - */ - pool->jobs = job->next; - if (pool->last_job == job) { - pool->last_job = NULL; - } + int sig_pipe = pool->sig_pipe[1]; /* * Do the work with the mutex unlocked @@ -477,12 +534,8 @@ static void *pthreadpool_server(void *arg) res = pthread_mutex_unlock(&pool->mutex); assert(res == 0); - job->fn(job->private_data); - - written = write(pool->sig_pipe[1], &job->id, - sizeof(int)); - - free(job); + job.fn(job.private_data); + written = write(sig_pipe, &job.id, sizeof(job.id)); res = pthread_mutex_lock(&pool->mutex); assert(res == 0); @@ -494,7 +547,7 @@ static void *pthreadpool_server(void *arg) } } - if ((pool->jobs == NULL) && (pool->shutdown != 0)) { + if ((pool->num_jobs == 0) && (pool->shutdown != 0)) { /* * No more work to do and we're asked to shut down, so * exit @@ -518,24 +571,12 @@ static void *pthreadpool_server(void *arg) int pthreadpool_add_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data) { - struct pthreadpool_job *job; pthread_t thread_id; int res; sigset_t mask, omask; - job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job)); - if (job == NULL) { - return ENOMEM; - } - - job->fn = fn; - job->private_data = private_data; - job->id = job_id; - job->next = NULL; - res = pthread_mutex_lock(&pool->mutex); if (res != 0) { - free(job); return res; } @@ -546,7 +587,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, */ res = pthread_mutex_unlock(&pool->mutex); assert(res == 0); - free(job); return EINVAL; } @@ -558,13 +598,10 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, /* * Add job to the end of the queue */ - if (pool->jobs == NULL) { - pool->jobs = job; - } - else { - pool->last_job->next = job; + if (!pthreadpool_put_job(pool, job_id, fn, private_data)) { + pthread_mutex_unlock(&pool->mutex); + return ENOMEM; } - pool->last_job = job; if (pool->num_idle > 0) { /* -- 2.11.4.GIT