From f658c6a9c30e052aee38a8cea53d494d9170deba Mon Sep 17 00:00:00 2001 From: Volker Lendecke Date: Fri, 22 Apr 2011 11:47:11 +0200 Subject: [PATCH] s3: Many pthreadpool fixes In particular, this makes it fork-safe (cherry picked from commit 62689d8166b8e070f855e6910470796dd7e1b2c8) (cherry picked from commit 2caf8e097cd8f724c7cd93c3f8e1fc3cd095d8ff) --- source3/configure.in | 2 +- source3/lib/fncall.c | 2 +- source3/lib/pthreadpool/Makefile | 9 + source3/lib/{ => pthreadpool}/pthreadpool.c | 259 ++++++++++----- source3/{include => lib/pthreadpool}/pthreadpool.h | 0 source3/lib/pthreadpool/tests.c | 362 +++++++++++++++++++++ 6 files changed, 546 insertions(+), 88 deletions(-) create mode 100644 source3/lib/pthreadpool/Makefile rename source3/lib/{ => pthreadpool}/pthreadpool.c (70%) rename source3/{include => lib/pthreadpool}/pthreadpool.h (100%) create mode 100644 source3/lib/pthreadpool/tests.c diff --git a/source3/configure.in b/source3/configure.in index c9518280c75..7f65b39be22 100644 --- a/source3/configure.in +++ b/source3/configure.in @@ -6646,7 +6646,7 @@ if test x"$enable_pthreadpool" = x"yes" -a x"$samba_cv_HAVE_PTHREAD" = x"yes"; t LIBS="$LIBS $PTHREAD_LDFLAGS" CFLAGS="$CFLAGS $PTHREAD_CFLAGS" AC_DEFINE(WITH_PTHREADPOOL, 1, [Whether to include pthreadpool helpers]) - AC_SUBST(PTHREADPOOL_OBJ, "lib/pthreadpool.o") + AC_SUBST(PTHREADPOOL_OBJ, "lib/pthreadpool/pthreadpool.o") fi ################################################# diff --git a/source3/lib/fncall.c b/source3/lib/fncall.c index e810b6814ed..bd06be2cd8c 100644 --- a/source3/lib/fncall.c +++ b/source3/lib/fncall.c @@ -21,7 +21,7 @@ #if WITH_PTHREADPOOL -#include "pthreadpool.h" +#include "lib/pthreadpool/pthreadpool.h" struct fncall_state { struct fncall_context *ctx; diff --git a/source3/lib/pthreadpool/Makefile b/source3/lib/pthreadpool/Makefile new file mode 100644 index 00000000000..48626bd2c0a --- /dev/null +++ b/source3/lib/pthreadpool/Makefile @@ -0,0 +1,9 @@ +all: tests + +CFLAGS=-O3 -g -Wall + +pthreadpool.o: pthreadpool.c pthreadpool.h + gcc -c -O3 -o pthreadpool.o pthreadpool.c -I../../.. + +tests: tests.o pthreadpool.o + gcc -o tests tests.o pthreadpool.o -lpthread \ No newline at end of file diff --git a/source3/lib/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c similarity index 70% rename from source3/lib/pthreadpool.c rename to source3/lib/pthreadpool/pthreadpool.c index b62bab0a2ee..4605538cf28 100644 --- a/source3/lib/pthreadpool.c +++ b/source3/lib/pthreadpool/pthreadpool.c @@ -26,8 +26,10 @@ #include #include #include +#include #include "pthreadpool.h" +#include "lib/util/dlinklist.h" struct pthreadpool_job { struct pthreadpool_job *next; @@ -38,6 +40,11 @@ struct pthreadpool_job { struct pthreadpool { /* + * List pthreadpools for fork safety + */ + struct pthreadpool *prev, *next; + + /* * Control access to this struct */ pthread_mutex_t mutex; @@ -85,6 +92,12 @@ struct pthreadpool { pthread_t exited[1]; /* We alloc more */ }; +static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER; +static struct pthreadpool *pthreadpools = NULL; +static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT; + +static void pthreadpool_prep_atfork(void); + /* * Initialize a thread pool */ @@ -95,13 +108,21 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult) size_t size; int ret; - size = sizeof(struct pthreadpool) + max_threads * sizeof(pthread_t); + size = sizeof(struct pthreadpool) + + (max_threads-1) * sizeof(pthread_t); pool = (struct pthreadpool *)malloc(size); if (pool == NULL) { return ENOMEM; } + ret = pipe(pool->sig_pipe); + if (ret == -1) { + int err = errno; + free(pool); + return err; + } + ret = pthread_mutex_init(&pool->mutex, NULL); if (ret != 0) { free(pool); @@ -121,44 +142,117 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult) pool->num_exited = 0; pool->max_threads = max_threads; pool->num_idle = 0; - pool->sig_pipe[0] = -1; - pool->sig_pipe[1] = -1; + + ret = pthread_mutex_lock(&pthreadpools_mutex); + if (ret != 0) { + pthread_cond_destroy(&pool->condvar); + pthread_mutex_destroy(&pool->mutex); + free(pool); + return ret; + } + DLIST_ADD(pthreadpools, pool); + + ret = pthread_mutex_unlock(&pthreadpools_mutex); + assert(ret == 0); + + pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork); *presult = pool; + return 0; } -/* - * Create and return a file descriptor which becomes readable when a job has - * finished - */ - -int pthreadpool_sig_fd(struct pthreadpool *pool) +static void pthreadpool_prepare(void) { - int result, ret; + int ret; + struct pthreadpool *pool; - ret = pthread_mutex_lock(&pool->mutex); - if (ret != 0) { - errno = ret; - return -1; + ret = pthread_mutex_lock(&pthreadpools_mutex); + assert(ret == 0); + + pool = pthreadpools; + + while (pool != NULL) { + ret = pthread_mutex_lock(&pool->mutex); + assert(ret == 0); + pool = pool->next; } +} + +static void pthreadpool_parent(void) +{ + int ret; + struct pthreadpool *pool; + + pool = DLIST_TAIL(pthreadpools); + + while (1) { + ret = pthread_mutex_unlock(&pool->mutex); + assert(ret == 0); - if (pool->sig_pipe[0] != -1) { - result = pool->sig_pipe[0]; - goto done; + if (pool == pthreadpools) { + break; + } + pool = pool->prev; } - ret = pipe(pool->sig_pipe); - if (ret == -1) { - result = -1; - goto done; + ret = pthread_mutex_unlock(&pthreadpools_mutex); + assert(ret == 0); +} + +static void pthreadpool_child(void) +{ + int ret; + struct pthreadpool *pool; + + pool = DLIST_TAIL(pthreadpools); + + while (1) { + close(pool->sig_pipe[0]); + close(pool->sig_pipe[1]); + + ret = pipe(pool->sig_pipe); + assert(ret == 0); + + pool->num_threads = 0; + pool->num_exited = 0; + pool->num_idle = 0; + + while (pool->jobs != NULL) { + struct pthreadpool_job *job; + job = pool->jobs; + pool->jobs = job->next; + free(job); + } + pool->last_job = NULL; + + ret = pthread_mutex_unlock(&pool->mutex); + assert(ret == 0); + + if (pool == pthreadpools) { + break; + } + pool = pool->prev; } - result = pool->sig_pipe[0]; -done: - ret = pthread_mutex_unlock(&pool->mutex); + ret = pthread_mutex_unlock(&pthreadpools_mutex); assert(ret == 0); - return result; +} + +static void pthreadpool_prep_atfork(void) +{ + pthread_atfork(pthreadpool_prepare, pthreadpool_parent, + pthreadpool_child); +} + +/* + * Return the file descriptor which becomes readable when a job has + * finished + */ + +int pthreadpool_sig_fd(struct pthreadpool *pool) +{ + return pool->sig_pipe[0]; } /* @@ -181,59 +275,21 @@ static void pthreadpool_join_children(struct pthreadpool *pool) int pthreadpool_finished_job(struct pthreadpool *pool) { - int result, ret, fd; + int result; ssize_t nread; - ret = pthread_mutex_lock(&pool->mutex); - if (ret != 0) { - errno = ret; - return -1; - } - - /* - * Just some cleanup under the mutex - */ - pthreadpool_join_children(pool); - - fd = pool->sig_pipe[0]; - - ret = pthread_mutex_unlock(&pool->mutex); - assert(ret == 0); - - if (fd == -1) { - errno = EINVAL; - return -1; - } - nread = -1; errno = EINTR; while ((nread == -1) && (errno == EINTR)) { - nread = read(fd, &result, sizeof(int)); + nread = read(pool->sig_pipe[0], &result, sizeof(int)); } - - /* - * TODO: handle nread > 0 && nread < sizeof(int) - */ - - /* - * Lock the mutex to provide a memory barrier for data from the worker - * thread to the main thread. The pipe access itself does not have to - * be locked, for sizeof(int) the write to a pipe is atomic, and only - * one thread reads from it. But we need to lock the mutex briefly - * even if we don't do anything under the lock, to make sure we can - * see all memory the helper thread has written. - */ - - ret = pthread_mutex_lock(&pool->mutex); - if (ret == -1) { - errno = ret; - return -1; + if (nread == -1) { + return errno; + } + if (nread != sizeof(int)) { + return EINVAL; } - - ret = pthread_mutex_unlock(&pool->mutex); - assert(ret == 0); - return result; } @@ -250,6 +306,12 @@ int pthreadpool_destroy(struct pthreadpool *pool) return ret; } + if ((pool->jobs != NULL) || pool->shutdown) { + ret = pthread_mutex_unlock(&pool->mutex); + assert(ret == 0); + return EBUSY; + } + if (pool->num_threads > 0) { /* * We have active threads, tell them to finish, wait for that. @@ -294,14 +356,30 @@ int pthreadpool_destroy(struct pthreadpool *pool) ret = pthread_mutex_destroy(&pool->mutex); ret1 = pthread_cond_destroy(&pool->condvar); - if ((ret == 0) && (ret1 == 0)) { - free(pool); + if (ret != 0) { + return ret; + } + if (ret1 != 0) { + return ret1; } + ret = pthread_mutex_lock(&pthreadpools_mutex); if (ret != 0) { return ret; } - return ret1; + DLIST_REMOVE(pthreadpools, pool); + ret = pthread_mutex_unlock(&pthreadpools_mutex); + assert(ret == 0); + + close(pool->sig_pipe[0]); + pool->sig_pipe[0] = -1; + + close(pool->sig_pipe[1]); + pool->sig_pipe[1] = -1; + + free(pool); + + return 0; } /* @@ -325,7 +403,8 @@ static void *pthreadpool_server(void *arg) } while (1) { - struct timespec timeout; + struct timeval tv; + struct timespec ts; struct pthreadpool_job *job; /* @@ -333,14 +412,15 @@ static void *pthreadpool_server(void *arg) * time, exit this thread. */ - timeout.tv_sec = time(NULL) + 1; - timeout.tv_nsec = 0; + gettimeofday(&tv, NULL); + ts.tv_sec = tv.tv_sec + 1; + ts.tv_nsec = tv.tv_usec*1000; while ((pool->jobs == NULL) && (pool->shutdown == 0)) { pool->num_idle += 1; res = pthread_cond_timedwait( - &pool->condvar, &pool->mutex, &timeout); + &pool->condvar, &pool->mutex, &ts); pool->num_idle -= 1; if (res == ETIMEDOUT) { @@ -363,7 +443,6 @@ static void *pthreadpool_server(void *arg) job = pool->jobs; if (job != NULL) { - int fd = pool->sig_pipe[1]; ssize_t written; /* @@ -376,7 +455,7 @@ static void *pthreadpool_server(void *arg) } /* - * Do the work with the mutex unlocked :-) + * Do the work with the mutex unlocked */ res = pthread_mutex_unlock(&pool->mutex); @@ -384,17 +463,14 @@ static void *pthreadpool_server(void *arg) job->fn(job->private_data); - written = sizeof(int); + written = write(pool->sig_pipe[1], &job->id, + sizeof(int)); + + free(job); res = pthread_mutex_lock(&pool->mutex); assert(res == 0); - if (fd != -1) { - written = write(fd, &job->id, sizeof(int)); - } - - free(job); - if (written != sizeof(int)) { pthreadpool_server_exit(pool); pthread_mutex_unlock(&pool->mutex); @@ -447,6 +523,17 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, return res; } + if (pool->shutdown) { + /* + * Protect against the pool being shut down while + * trying to add a job + */ + res = pthread_mutex_unlock(&pool->mutex); + assert(res == 0); + free(job); + return EINVAL; + } + /* * Just some cleanup under the mutex */ diff --git a/source3/include/pthreadpool.h b/source3/lib/pthreadpool/pthreadpool.h similarity index 100% rename from source3/include/pthreadpool.h rename to source3/lib/pthreadpool/pthreadpool.h diff --git a/source3/lib/pthreadpool/tests.c b/source3/lib/pthreadpool/tests.c new file mode 100644 index 00000000000..d365fbd5b67 --- /dev/null +++ b/source3/lib/pthreadpool/tests.c @@ -0,0 +1,362 @@ +#include +#include +#include +#include +#include +#include +#include +#include "pthreadpool.h" + +static int test_init(void) +{ + struct pthreadpool *p; + int ret; + + ret = pthreadpool_init(1, &p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_init failed: %s\n", + strerror(ret)); + return -1; + } + ret = pthreadpool_destroy(p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_init failed: %s\n", + strerror(ret)); + return -1; + } + return 0; +} + +static void test_sleep(void *ptr) +{ + int *ptimeout = (int *)ptr; + int ret; + ret = poll(NULL, 0, *ptimeout); + if (ret != 0) { + fprintf(stderr, "poll returned %d (%s)\n", + ret, strerror(errno)); + } +} + +static int test_jobs(int num_threads, int num_jobs) +{ + char *finished; + struct pthreadpool *p; + int timeout = 1; + int i, ret; + + finished = (char *)calloc(1, num_jobs); + if (finished == NULL) { + fprintf(stderr, "calloc failed\n"); + return -1; + } + + ret = pthreadpool_init(num_threads, &p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_init failed: %s\n", + strerror(ret)); + return -1; + } + + for (i=0; i= num_jobs)) { + fprintf(stderr, "invalid job number %d\n", ret); + return -1; + } + finished[ret] += 1; + } + + for (i=0; inum_jobs; i++) { + int ret = pthreadpool_add_job(state->p, state->start_job + i, + test_sleep, &state->timeout); + if (ret != 0) { + fprintf(stderr, "pthreadpool_add_job failed: %s\n", + strerror(ret)); + return NULL; + } + } + return NULL; +} + +static int test_threaded_addjob(int num_pools, int num_threads, int poolsize, + int num_jobs) +{ + struct pthreadpool **pools; + struct threaded_state *states; + struct threaded_state *state; + struct pollfd *pfds; + char *finished; + pid_t child; + int i, ret, poolnum; + int received; + + states = calloc(num_threads, sizeof(struct threaded_state)); + if (states == NULL) { + fprintf(stderr, "calloc failed\n"); + return -1; + } + + finished = calloc(num_threads * num_jobs, 1); + if (finished == NULL) { + fprintf(stderr, "calloc failed\n"); + return -1; + } + + pools = calloc(num_pools, sizeof(struct pthreadpool *)); + if (pools == NULL) { + fprintf(stderr, "calloc failed\n"); + return -1; + } + + pfds = calloc(num_pools, sizeof(struct pollfd)); + if (pfds == NULL) { + fprintf(stderr, "calloc failed\n"); + return -1; + } + + for (i=0; ip = pools[poolnum]; + poolnum = (poolnum + 1) % num_pools; + + state->num_jobs = num_jobs; + state->timeout = 1; + state->start_job = i * num_jobs; + + ret = pthread_create(&state->tid, NULL, test_threaded_worker, + state); + if (ret != 0) { + fprintf(stderr, "pthread_create failed: %s\n", + strerror(ret)); + return -1; + } + } + + if (random() % 1) { + poll(NULL, 0, 1); + } + + child = fork(); + if (child < 0) { + fprintf(stderr, "fork failed: %s\n", strerror(errno)); + return -1; + } + if (child == 0) { + for (i=0; i= num_jobs * num_threads)) { + fprintf(stderr, "invalid job number %d\n", + ret); + return -1; + } + finished[ret] += 1; + received += 1; + } + } + + for (i=0; i