From 77b7dea2d988a9944f795e2efae24aeb16009f5f Mon Sep 17 00:00:00 2001 From: Volker Lendecke Date: Fri, 9 Sep 2016 13:07:57 +0200 Subject: [PATCH] pthreadpool: Use detached threads So far we used joinable threads. This prevented pthreadpool_destroy with blocked threads. This patch converts pthreadpool to detached threads. Now pthreadpool_destroy does not have to wait for the idle threads to finish, it can immediately return. pthreadpool_destroy will tell all threads to exit, and the last active thread will actually free(pthreadpool). Signed-off-by: Volker Lendecke Reviewed-by: Jeremy Allison --- source3/lib/pthreadpool/pthreadpool.c | 191 ++++++++++++---------------------- source3/lib/pthreadpool/pthreadpool.h | 9 +- 2 files changed, 74 insertions(+), 126 deletions(-) diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c index a306c88fdcf..a1eb9241a4d 100644 --- a/source3/lib/pthreadpool/pthreadpool.c +++ b/source3/lib/pthreadpool/pthreadpool.c @@ -89,12 +89,6 @@ struct pthreadpool { * Number of idle threads */ int num_idle; - - /* - * An array of threads that require joining. - */ - int num_exited; - pthread_t *exited; /* We alloc more */ }; static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -152,8 +146,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, pool->shutdown = false; pool->num_threads = 0; - pool->num_exited = 0; - pool->exited = NULL; pool->max_threads = max_threads; pool->num_idle = 0; @@ -220,11 +212,6 @@ static void pthreadpool_child(void) pool = DLIST_PREV(pool)) { pool->num_threads = 0; - - pool->num_exited = 0; - free(pool->exited); - pool->exited = NULL; - pool->num_idle = 0; pool->head = 0; pool->num_jobs = 0; @@ -243,36 +230,40 @@ static void pthreadpool_prep_atfork(void) pthreadpool_child); } -/* - * Do a pthread_join() on all children that have exited, pool->mutex must be - * locked - */ -static void pthreadpool_join_children(struct pthreadpool *pool) +static int pthreadpool_free(struct pthreadpool *pool) { - int i; + int ret, ret1; - for (i=0; inum_exited; i++) { - int ret; + ret = pthread_mutex_unlock(&pool->mutex); + assert(ret == 0); - ret = pthread_join(pool->exited[i], NULL); - if (ret != 0) { - /* - * Severe internal error, we can't do much but - * abort here. - */ - abort(); - } + ret = pthread_mutex_destroy(&pool->mutex); + ret1 = pthread_cond_destroy(&pool->condvar); + + if (ret != 0) { + return ret; + } + if (ret1 != 0) { + return ret1; } - pool->num_exited = 0; - /* - * Deliberately not free and NULL pool->exited. That will be - * re-used by realloc later. - */ + ret = pthread_mutex_lock(&pthreadpools_mutex); + if (ret != 0) { + return ret; + } + DLIST_REMOVE(pthreadpools, pool); + ret = pthread_mutex_unlock(&pthreadpools_mutex); + assert(ret == 0); + + free(pool->jobs); + free(pool); + + return 0; } /* - * Destroy a thread pool, finishing all threads working for it + * Destroy a thread pool. Wake up all idle threads for exit. The last + * one will free the pool. */ int pthreadpool_destroy(struct pthreadpool *pool) @@ -284,98 +275,49 @@ int pthreadpool_destroy(struct pthreadpool *pool) return ret; } - if ((pool->num_jobs != 0) || pool->shutdown) { + if (pool->num_threads == 0) { + ret = pthreadpool_free(pool); + return ret; + } + + if (pool->shutdown) { ret = pthread_mutex_unlock(&pool->mutex); assert(ret == 0); return EBUSY; } - if (pool->num_threads > 0) { - /* - * We have active threads, tell them to finish, wait for that. - */ - - pool->shutdown = true; - - if (pool->num_idle > 0) { - /* - * Wake the idle threads. They will find - * pool->shutdown to be set and exit themselves - */ - ret = pthread_cond_broadcast(&pool->condvar); - if (ret != 0) { - pthread_mutex_unlock(&pool->mutex); - return ret; - } - } - - while ((pool->num_threads > 0) || (pool->num_exited > 0)) { - - if (pool->num_exited > 0) { - pthreadpool_join_children(pool); - continue; - } - /* - * A thread that shuts down will also signal - * pool->condvar - */ - ret = pthread_cond_wait(&pool->condvar, &pool->mutex); - if (ret != 0) { - pthread_mutex_unlock(&pool->mutex); - return ret; - } - } - } + /* + * We have active threads, tell them to finish. + */ - ret = pthread_mutex_unlock(&pool->mutex); - if (ret != 0) { - return ret; - } - ret = pthread_mutex_destroy(&pool->mutex); - ret1 = pthread_cond_destroy(&pool->condvar); + pool->shutdown = true; - if (ret != 0) { - return ret; - } - if (ret1 != 0) { - return ret1; - } + ret = pthread_cond_broadcast(&pool->condvar); - ret = pthread_mutex_lock(&pthreadpools_mutex); - if (ret != 0) { - return ret; - } - DLIST_REMOVE(pthreadpools, pool); - ret = pthread_mutex_unlock(&pthreadpools_mutex); - assert(ret == 0); + ret1 = pthread_mutex_unlock(&pool->mutex); + assert(ret1 == 0); - free(pool->exited); - free(pool->jobs); - free(pool); - - return 0; + return ret; } /* - * Prepare for pthread_exit(), pool->mutex must be locked + * Prepare for pthread_exit(), pool->mutex must be locked and will be + * unlocked here. This is a bit of a layering violation, but here we + * also take care of removing the pool if we're the last thread. */ static void pthreadpool_server_exit(struct pthreadpool *pool) { - pthread_t *exited; + int ret; pool->num_threads -= 1; - exited = (pthread_t *)realloc( - pool->exited, sizeof(pthread_t) * (pool->num_exited + 1)); - - if (exited == NULL) { - /* lost a thread status */ + if (pool->shutdown && (pool->num_threads == 0)) { + pthreadpool_free(pool); return; } - pool->exited = exited; - pool->exited[pool->num_exited] = pthread_self(); - pool->num_exited += 1; + ret = pthread_mutex_unlock(&pool->mutex); + assert(ret == 0); } static bool pthreadpool_get_job(struct pthreadpool *p, @@ -470,7 +412,6 @@ static void *pthreadpool_server(void *arg) * us. Exit. */ pthreadpool_server_exit(pool); - pthread_mutex_unlock(&pool->mutex); return NULL; } @@ -500,7 +441,6 @@ static void *pthreadpool_server(void *arg) if (ret != 0) { pthreadpool_server_exit(pool); - pthread_mutex_unlock(&pool->mutex); return NULL; } } @@ -511,16 +451,6 @@ static void *pthreadpool_server(void *arg) * exit */ pthreadpool_server_exit(pool); - - if (pool->num_threads == 0) { - /* - * Ping the main thread waiting for all of us - * workers to have quit. - */ - pthread_cond_broadcast(&pool->condvar); - } - - pthread_mutex_unlock(&pool->mutex); return NULL; } } @@ -529,6 +459,7 @@ static void *pthreadpool_server(void *arg) int pthreadpool_add_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data) { + pthread_attr_t thread_attr; pthread_t thread_id; int res; sigset_t mask, omask; @@ -549,11 +480,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, } /* - * Just some cleanup under the mutex - */ - pthreadpool_join_children(pool); - - /* * Add job to the end of the queue */ if (!pthreadpool_put_job(pool, job_id, fn, private_data)) { @@ -585,20 +511,37 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, sigfillset(&mask); + res = pthread_attr_init(&thread_attr); + if (res != 0) { + pthread_mutex_unlock(&pool->mutex); + return res; + } + + res = pthread_attr_setdetachstate( + &thread_attr, PTHREAD_CREATE_DETACHED); + if (res != 0) { + pthread_attr_destroy(&thread_attr); + pthread_mutex_unlock(&pool->mutex); + return res; + } + res = pthread_sigmask(SIG_BLOCK, &mask, &omask); if (res != 0) { + pthread_attr_destroy(&thread_attr); pthread_mutex_unlock(&pool->mutex); return res; } res = pthread_create(&thread_id, NULL, pthreadpool_server, - (void *)pool); + (void *)pool); if (res == 0) { pool->num_threads += 1; } assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0); + pthread_attr_destroy(&thread_attr); + pthread_mutex_unlock(&pool->mutex); return res; } diff --git a/source3/lib/pthreadpool/pthreadpool.h b/source3/lib/pthreadpool/pthreadpool.h index ee9d9578050..defbe5a9f62 100644 --- a/source3/lib/pthreadpool/pthreadpool.h +++ b/source3/lib/pthreadpool/pthreadpool.h @@ -53,8 +53,13 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, /** * @brief Destroy a pthreadpool * - * Destroy a pthreadpool. If jobs are still active, this returns - * EBUSY. + * Destroy a pthreadpool. If jobs are submitted, but not yet active in + * a thread, they won't get executed. If a job has already been + * submitted to a thread, the job function will continue running, and + * the signal function might still be called. The caller of + * pthreadpool_init must make sure the required resources are still + * around when the pool is destroyed with pending jobs. The last + * thread to exit will finally free() the pool memory. * * @param[in] pool The pool to destroy * @return success: 0, failure: errno -- 2.11.4.GIT