s3: Allow unlimited parallelism in pthreadpool
[Samba.git] / source3 / lib / pthreadpool / pthreadpool.c
blob3cf6cb7045835b78c027c729b621e0e00ce2b1a7
1 /*
2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include <errno.h>
21 #include <stdio.h>
22 #include <unistd.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <assert.h>
28 #include <fcntl.h>
29 #include <sys/time.h>
31 #include "pthreadpool.h"
32 #include "lib/util/dlinklist.h"
/*
 * One queued unit of work. Jobs are chained through "next" into the
 * singly linked queue that struct pthreadpool keeps in jobs/last_job.
 */
struct pthreadpool_job {
	/* Next job in the queue, NULL for the last one */
	struct pthreadpool_job *next;

	/* Caller-chosen id, written to the signal pipe once fn returned */
	int id;

	/* The work itself, run by a worker thread with private_data */
	void (*fn)(void *private_data);
	void *private_data;
};
/*
 * State of one thread pool. Everything except the list linkage is
 * protected by "mutex"; the list linkage is protected by the file-level
 * pthreadpools_mutex.
 */
struct pthreadpool {
	/*
	 * Linkage in the global list of pools, walked by the fork
	 * handlers
	 */
	struct pthreadpool *prev;
	struct pthreadpool *next;

	/*
	 * Serializes access to this struct
	 */
	pthread_mutex_t mutex;

	/*
	 * Idle workers sleep on this condition variable; a thread
	 * destroying the pool also waits here for the workers to leave
	 */
	pthread_cond_t condvar;

	/*
	 * FIFO of pending jobs: head and tail
	 */
	struct pthreadpool_job *jobs;
	struct pthreadpool_job *last_job;

	/*
	 * Pipe used for signalling: workers write the id of a finished
	 * job to [1], the pool user reads it from [0]
	 */
	int sig_pipe[2];

	/*
	 * Non-zero tells the worker threads that they should shut down
	 */
	int shutdown;

	/*
	 * Maximum number of threads, 0 means no limit
	 */
	int max_threads;

	/*
	 * Number of threads currently alive
	 */
	int num_threads;

	/*
	 * Number of threads currently waiting for work
	 */
	int num_idle;

	/*
	 * Threads that have exited and still require joining:
	 * "exited" holds num_exited valid entries
	 */
	int num_exited;
	pthread_t *exited; /* We alloc more */
};
/*
 * Global list of all pools, consulted by the fork handlers.
 * pthreadpools_mutex protects the list itself.
 */
static struct pthreadpool *pthreadpools = NULL;
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;

/*
 * The fork handlers are installed exactly once, via pthread_once()
 */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);
101 * Initialize a thread pool
104 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
106 struct pthreadpool *pool;
107 int ret;
109 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
110 if (pool == NULL) {
111 return ENOMEM;
114 ret = pipe(pool->sig_pipe);
115 if (ret == -1) {
116 int err = errno;
117 free(pool);
118 return err;
121 ret = pthread_mutex_init(&pool->mutex, NULL);
122 if (ret != 0) {
123 free(pool);
124 return ret;
127 ret = pthread_cond_init(&pool->condvar, NULL);
128 if (ret != 0) {
129 pthread_mutex_destroy(&pool->mutex);
130 free(pool);
131 return ret;
134 pool->shutdown = 0;
135 pool->jobs = pool->last_job = NULL;
136 pool->num_threads = 0;
137 pool->num_exited = 0;
138 pool->exited = NULL;
139 pool->max_threads = max_threads;
140 pool->num_idle = 0;
142 ret = pthread_mutex_lock(&pthreadpools_mutex);
143 if (ret != 0) {
144 pthread_cond_destroy(&pool->condvar);
145 pthread_mutex_destroy(&pool->mutex);
146 free(pool);
147 return ret;
149 DLIST_ADD(pthreadpools, pool);
151 ret = pthread_mutex_unlock(&pthreadpools_mutex);
152 assert(ret == 0);
154 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
156 *presult = pool;
158 return 0;
161 static void pthreadpool_prepare(void)
163 int ret;
164 struct pthreadpool *pool;
166 ret = pthread_mutex_lock(&pthreadpools_mutex);
167 assert(ret == 0);
169 pool = pthreadpools;
171 while (pool != NULL) {
172 ret = pthread_mutex_lock(&pool->mutex);
173 assert(ret == 0);
174 pool = pool->next;
178 static void pthreadpool_parent(void)
180 int ret;
181 struct pthreadpool *pool;
183 pool = DLIST_TAIL(pthreadpools);
185 while (1) {
186 ret = pthread_mutex_unlock(&pool->mutex);
187 assert(ret == 0);
189 if (pool == pthreadpools) {
190 break;
192 pool = pool->prev;
195 ret = pthread_mutex_unlock(&pthreadpools_mutex);
196 assert(ret == 0);
199 static void pthreadpool_child(void)
201 int ret;
202 struct pthreadpool *pool;
204 pool = DLIST_TAIL(pthreadpools);
206 while (1) {
207 close(pool->sig_pipe[0]);
208 close(pool->sig_pipe[1]);
210 ret = pipe(pool->sig_pipe);
211 assert(ret == 0);
213 pool->num_threads = 0;
215 pool->num_exited = 0;
216 free(pool->exited);
217 pool->exited = NULL;
219 pool->num_idle = 0;
221 while (pool->jobs != NULL) {
222 struct pthreadpool_job *job;
223 job = pool->jobs;
224 pool->jobs = job->next;
225 free(job);
227 pool->last_job = NULL;
229 ret = pthread_mutex_unlock(&pool->mutex);
230 assert(ret == 0);
232 if (pool == pthreadpools) {
233 break;
235 pool = pool->prev;
238 ret = pthread_mutex_unlock(&pthreadpools_mutex);
239 assert(ret == 0);
242 static void pthreadpool_prep_atfork(void)
244 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
245 pthreadpool_child);
249 * Return the file descriptor which becomes readable when a job has
250 * finished
253 int pthreadpool_signal_fd(struct pthreadpool *pool)
255 return pool->sig_pipe[0];
259 * Do a pthread_join() on all children that have exited, pool->mutex must be
260 * locked
262 static void pthreadpool_join_children(struct pthreadpool *pool)
264 int i;
266 for (i=0; i<pool->num_exited; i++) {
267 pthread_join(pool->exited[i], NULL);
269 pool->num_exited = 0;
272 * Deliberately not free and NULL pool->exited. That will be
273 * re-used by realloc later.
278 * Fetch a finished job number from the signal pipe
281 int pthreadpool_finished_job(struct pthreadpool *pool)
283 int result;
284 ssize_t nread;
286 nread = -1;
287 errno = EINTR;
289 while ((nread == -1) && (errno == EINTR)) {
290 nread = read(pool->sig_pipe[0], &result, sizeof(int));
292 if (nread == -1) {
293 return errno;
295 if (nread != sizeof(int)) {
296 return EINVAL;
298 return result;
302 * Destroy a thread pool, finishing all threads working for it
305 int pthreadpool_destroy(struct pthreadpool *pool)
307 int ret, ret1;
309 ret = pthread_mutex_lock(&pool->mutex);
310 if (ret != 0) {
311 return ret;
314 if ((pool->jobs != NULL) || pool->shutdown) {
315 ret = pthread_mutex_unlock(&pool->mutex);
316 assert(ret == 0);
317 return EBUSY;
320 if (pool->num_threads > 0) {
322 * We have active threads, tell them to finish, wait for that.
325 pool->shutdown = 1;
327 if (pool->num_idle > 0) {
329 * Wake the idle threads. They will find pool->quit to
330 * be set and exit themselves
332 ret = pthread_cond_broadcast(&pool->condvar);
333 if (ret != 0) {
334 pthread_mutex_unlock(&pool->mutex);
335 return ret;
339 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
341 if (pool->num_exited > 0) {
342 pthreadpool_join_children(pool);
343 continue;
346 * A thread that shuts down will also signal
347 * pool->condvar
349 ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
350 if (ret != 0) {
351 pthread_mutex_unlock(&pool->mutex);
352 return ret;
357 ret = pthread_mutex_unlock(&pool->mutex);
358 if (ret != 0) {
359 return ret;
361 ret = pthread_mutex_destroy(&pool->mutex);
362 ret1 = pthread_cond_destroy(&pool->condvar);
364 if (ret != 0) {
365 return ret;
367 if (ret1 != 0) {
368 return ret1;
371 ret = pthread_mutex_lock(&pthreadpools_mutex);
372 if (ret != 0) {
373 return ret;
375 DLIST_REMOVE(pthreadpools, pool);
376 ret = pthread_mutex_unlock(&pthreadpools_mutex);
377 assert(ret == 0);
379 close(pool->sig_pipe[0]);
380 pool->sig_pipe[0] = -1;
382 close(pool->sig_pipe[1]);
383 pool->sig_pipe[1] = -1;
385 free(pool->exited);
386 free(pool);
388 return 0;
392 * Prepare for pthread_exit(), pool->mutex must be locked
394 static void pthreadpool_server_exit(struct pthreadpool *pool)
396 pthread_t *exited;
398 pool->num_threads -= 1;
400 exited = (pthread_t *)realloc(
401 pool->exited, sizeof(pthread_t *) * (pool->num_exited + 1));
403 if (exited == NULL) {
404 /* lost a thread status */
405 return;
407 pool->exited = exited;
409 pool->exited[pool->num_exited] = pthread_self();
410 pool->num_exited += 1;
413 static void *pthreadpool_server(void *arg)
415 struct pthreadpool *pool = (struct pthreadpool *)arg;
416 int res;
418 res = pthread_mutex_lock(&pool->mutex);
419 if (res != 0) {
420 return NULL;
423 while (1) {
424 struct timeval tv;
425 struct timespec ts;
426 struct pthreadpool_job *job;
429 * idle-wait at most 1 second. If nothing happens in that
430 * time, exit this thread.
433 gettimeofday(&tv, NULL);
434 ts.tv_sec = tv.tv_sec + 1;
435 ts.tv_nsec = tv.tv_usec*1000;
437 while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
439 pool->num_idle += 1;
440 res = pthread_cond_timedwait(
441 &pool->condvar, &pool->mutex, &ts);
442 pool->num_idle -= 1;
444 if (res == ETIMEDOUT) {
446 if (pool->jobs == NULL) {
448 * we timed out and still no work for
449 * us. Exit.
451 pthreadpool_server_exit(pool);
452 pthread_mutex_unlock(&pool->mutex);
453 return NULL;
456 break;
458 assert(res == 0);
461 job = pool->jobs;
463 if (job != NULL) {
464 ssize_t written;
467 * Ok, there's work for us to do, remove the job from
468 * the pthreadpool list
470 pool->jobs = job->next;
471 if (pool->last_job == job) {
472 pool->last_job = NULL;
476 * Do the work with the mutex unlocked
479 res = pthread_mutex_unlock(&pool->mutex);
480 assert(res == 0);
482 job->fn(job->private_data);
484 written = write(pool->sig_pipe[1], &job->id,
485 sizeof(int));
487 free(job);
489 res = pthread_mutex_lock(&pool->mutex);
490 assert(res == 0);
492 if (written != sizeof(int)) {
493 pthreadpool_server_exit(pool);
494 pthread_mutex_unlock(&pool->mutex);
495 return NULL;
499 if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
501 * No more work to do and we're asked to shut down, so
502 * exit
504 pthreadpool_server_exit(pool);
506 if (pool->num_threads == 0) {
508 * Ping the main thread waiting for all of us
509 * workers to have quit.
511 pthread_cond_broadcast(&pool->condvar);
514 pthread_mutex_unlock(&pool->mutex);
515 return NULL;
520 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
521 void (*fn)(void *private_data), void *private_data)
523 struct pthreadpool_job *job;
524 pthread_t thread_id;
525 int res;
526 sigset_t mask, omask;
528 job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
529 if (job == NULL) {
530 return ENOMEM;
533 job->fn = fn;
534 job->private_data = private_data;
535 job->id = job_id;
536 job->next = NULL;
538 res = pthread_mutex_lock(&pool->mutex);
539 if (res != 0) {
540 free(job);
541 return res;
544 if (pool->shutdown) {
546 * Protect against the pool being shut down while
547 * trying to add a job
549 res = pthread_mutex_unlock(&pool->mutex);
550 assert(res == 0);
551 free(job);
552 return EINVAL;
556 * Just some cleanup under the mutex
558 pthreadpool_join_children(pool);
561 * Add job to the end of the queue
563 if (pool->jobs == NULL) {
564 pool->jobs = job;
566 else {
567 pool->last_job->next = job;
569 pool->last_job = job;
571 if (pool->num_idle > 0) {
573 * We have idle threads, wake one.
575 res = pthread_cond_signal(&pool->condvar);
576 pthread_mutex_unlock(&pool->mutex);
577 return res;
580 if ((pool->max_threads != 0) &&
581 (pool->num_threads >= pool->max_threads)) {
583 * No more new threads, we just queue the request
585 pthread_mutex_unlock(&pool->mutex);
586 return 0;
590 * Create a new worker thread. It should not receive any signals.
593 sigfillset(&mask);
595 res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
596 if (res != 0) {
597 pthread_mutex_unlock(&pool->mutex);
598 return res;
601 res = pthread_create(&thread_id, NULL, pthreadpool_server,
602 (void *)pool);
603 if (res == 0) {
604 pool->num_threads += 1;
607 assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
609 pthread_mutex_unlock(&pool->mutex);
610 return res;