pthreadpool: We always want asserts to abort()
[Samba.git] / source3 / lib / pthreadpool / pthreadpool.c
blob2e9f42c258508953c36fc996aa13896a5b7d3ad8
1 /*
2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "pthreadpool.h"
25 #include "lib/util/dlinklist.h"
27 #ifdef NDEBUG
28 #undef NDEBUG
29 #endif
31 #include <assert.h>
/*
 * One queued unit of work.
 *
 * "fn" is invoked with "private_data" by a worker thread; afterwards
 * "id", "fn" and "private_data" are handed to the pool's signal_fn.
 */
struct pthreadpool_job {
	int id;				/* caller-chosen job identifier */
	void (*fn)(void *private_data);	/* the work to run */
	void *private_data;		/* argument passed to fn */
};
/*
 * State of one thread pool. Apart from the list linkage (which is
 * protected by pthreadpools_mutex) the members are accessed with
 * "mutex" held.
 */
struct pthreadpool {
	/*
	 * Linkage in the global list of pools, kept for fork safety
	 */
	struct pthreadpool *prev, *next;

	/*
	 * Control access to this struct
	 */
	pthread_mutex_t mutex;

	/*
	 * Worker threads waiting for work sleep on this condition
	 * variable
	 */
	pthread_cond_t condvar;

	/*
	 * Job queue: a ring buffer of jobs_array_len slots. The oldest
	 * queued job sits at index "head", num_jobs slots are in use.
	 */
	size_t jobs_array_len;
	struct pthreadpool_job *jobs;

	size_t head;
	size_t num_jobs;

	/*
	 * Callback used to indicate job completion, together with the
	 * opaque pointer passed as its last argument
	 */
	int (*signal_fn)(int jobid,
			 void (*job_fn)(void *private_data),
			 void *job_fn_private_data,
			 void *private_data);
	void *signal_fn_private_data;

	/*
	 * Non-zero tells the worker threads that they should shut down
	 */
	int shutdown;

	/*
	 * Maximum number of threads, 0 means no limit
	 */
	int max_threads;

	/*
	 * Number of threads currently belonging to the pool
	 */
	int num_threads;

	/*
	 * Number of threads currently waiting for work
	 */
	int num_idle;

	/*
	 * Threads that have exited and still require joining. The array
	 * is grown with realloc as threads exit.
	 */
	int num_exited;
	pthread_t *exited;
};
/* Protects the global list of pools below */
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;

/* All pools created by pthreadpool_init(), walked by the atfork handlers */
static struct pthreadpool *pthreadpools = NULL;

/* Makes sure the atfork handlers are registered only once */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);
107 * Initialize a thread pool
110 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
111 int (*signal_fn)(int jobid,
112 void (*job_fn)(void *private_data),
113 void *job_fn_private_data,
114 void *private_data),
115 void *signal_fn_private_data)
117 struct pthreadpool *pool;
118 int ret;
120 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
121 if (pool == NULL) {
122 return ENOMEM;
124 pool->signal_fn = signal_fn;
125 pool->signal_fn_private_data = signal_fn_private_data;
127 pool->jobs_array_len = 4;
128 pool->jobs = calloc(
129 pool->jobs_array_len, sizeof(struct pthreadpool_job));
131 if (pool->jobs == NULL) {
132 free(pool);
133 return ENOMEM;
136 pool->head = pool->num_jobs = 0;
138 ret = pthread_mutex_init(&pool->mutex, NULL);
139 if (ret != 0) {
140 free(pool->jobs);
141 free(pool);
142 return ret;
145 ret = pthread_cond_init(&pool->condvar, NULL);
146 if (ret != 0) {
147 pthread_mutex_destroy(&pool->mutex);
148 free(pool->jobs);
149 free(pool);
150 return ret;
153 pool->shutdown = 0;
154 pool->num_threads = 0;
155 pool->num_exited = 0;
156 pool->exited = NULL;
157 pool->max_threads = max_threads;
158 pool->num_idle = 0;
160 ret = pthread_mutex_lock(&pthreadpools_mutex);
161 if (ret != 0) {
162 pthread_cond_destroy(&pool->condvar);
163 pthread_mutex_destroy(&pool->mutex);
164 free(pool->jobs);
165 free(pool);
166 return ret;
168 DLIST_ADD(pthreadpools, pool);
170 ret = pthread_mutex_unlock(&pthreadpools_mutex);
171 assert(ret == 0);
173 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
175 *presult = pool;
177 return 0;
180 static void pthreadpool_prepare(void)
182 int ret;
183 struct pthreadpool *pool;
185 ret = pthread_mutex_lock(&pthreadpools_mutex);
186 assert(ret == 0);
188 pool = pthreadpools;
190 while (pool != NULL) {
191 ret = pthread_mutex_lock(&pool->mutex);
192 assert(ret == 0);
193 pool = pool->next;
197 static void pthreadpool_parent(void)
199 int ret;
200 struct pthreadpool *pool;
202 for (pool = DLIST_TAIL(pthreadpools);
203 pool != NULL;
204 pool = DLIST_PREV(pool)) {
205 ret = pthread_mutex_unlock(&pool->mutex);
206 assert(ret == 0);
209 ret = pthread_mutex_unlock(&pthreadpools_mutex);
210 assert(ret == 0);
213 static void pthreadpool_child(void)
215 int ret;
216 struct pthreadpool *pool;
218 for (pool = DLIST_TAIL(pthreadpools);
219 pool != NULL;
220 pool = DLIST_PREV(pool)) {
222 pool->num_threads = 0;
224 pool->num_exited = 0;
225 free(pool->exited);
226 pool->exited = NULL;
228 pool->num_idle = 0;
229 pool->head = 0;
230 pool->num_jobs = 0;
232 ret = pthread_mutex_unlock(&pool->mutex);
233 assert(ret == 0);
236 ret = pthread_mutex_unlock(&pthreadpools_mutex);
237 assert(ret == 0);
240 static void pthreadpool_prep_atfork(void)
242 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
243 pthreadpool_child);
247 * Do a pthread_join() on all children that have exited, pool->mutex must be
248 * locked
250 static void pthreadpool_join_children(struct pthreadpool *pool)
252 int i;
254 for (i=0; i<pool->num_exited; i++) {
255 int ret;
257 ret = pthread_join(pool->exited[i], NULL);
258 if (ret != 0) {
260 * Severe internal error, we can't do much but
261 * abort here.
263 abort();
266 pool->num_exited = 0;
269 * Deliberately not free and NULL pool->exited. That will be
270 * re-used by realloc later.
275 * Destroy a thread pool, finishing all threads working for it
278 int pthreadpool_destroy(struct pthreadpool *pool)
280 int ret, ret1;
282 ret = pthread_mutex_lock(&pool->mutex);
283 if (ret != 0) {
284 return ret;
287 if ((pool->num_jobs != 0) || pool->shutdown) {
288 ret = pthread_mutex_unlock(&pool->mutex);
289 assert(ret == 0);
290 return EBUSY;
293 if (pool->num_threads > 0) {
295 * We have active threads, tell them to finish, wait for that.
298 pool->shutdown = 1;
300 if (pool->num_idle > 0) {
302 * Wake the idle threads. They will find
303 * pool->shutdown to be set and exit themselves
305 ret = pthread_cond_broadcast(&pool->condvar);
306 if (ret != 0) {
307 pthread_mutex_unlock(&pool->mutex);
308 return ret;
312 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
314 if (pool->num_exited > 0) {
315 pthreadpool_join_children(pool);
316 continue;
319 * A thread that shuts down will also signal
320 * pool->condvar
322 ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
323 if (ret != 0) {
324 pthread_mutex_unlock(&pool->mutex);
325 return ret;
330 ret = pthread_mutex_unlock(&pool->mutex);
331 if (ret != 0) {
332 return ret;
334 ret = pthread_mutex_destroy(&pool->mutex);
335 ret1 = pthread_cond_destroy(&pool->condvar);
337 if (ret != 0) {
338 return ret;
340 if (ret1 != 0) {
341 return ret1;
344 ret = pthread_mutex_lock(&pthreadpools_mutex);
345 if (ret != 0) {
346 return ret;
348 DLIST_REMOVE(pthreadpools, pool);
349 ret = pthread_mutex_unlock(&pthreadpools_mutex);
350 assert(ret == 0);
352 free(pool->exited);
353 free(pool->jobs);
354 free(pool);
356 return 0;
360 * Prepare for pthread_exit(), pool->mutex must be locked
362 static void pthreadpool_server_exit(struct pthreadpool *pool)
364 pthread_t *exited;
366 pool->num_threads -= 1;
368 exited = (pthread_t *)realloc(
369 pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
371 if (exited == NULL) {
372 /* lost a thread status */
373 return;
375 pool->exited = exited;
377 pool->exited[pool->num_exited] = pthread_self();
378 pool->num_exited += 1;
381 static bool pthreadpool_get_job(struct pthreadpool *p,
382 struct pthreadpool_job *job)
384 if (p->num_jobs == 0) {
385 return false;
387 *job = p->jobs[p->head];
388 p->head = (p->head+1) % p->jobs_array_len;
389 p->num_jobs -= 1;
390 return true;
393 static bool pthreadpool_put_job(struct pthreadpool *p,
394 int id,
395 void (*fn)(void *private_data),
396 void *private_data)
398 struct pthreadpool_job *job;
400 if (p->num_jobs == p->jobs_array_len) {
401 struct pthreadpool_job *tmp;
402 size_t new_len = p->jobs_array_len * 2;
404 tmp = realloc(
405 p->jobs, sizeof(struct pthreadpool_job) * new_len);
406 if (tmp == NULL) {
407 return false;
409 p->jobs = tmp;
412 * We just doubled the jobs array. The array implements a FIFO
413 * queue with a modulo-based wraparound, so we have to memcpy
414 * the jobs that are logically at the queue end but physically
415 * before the queue head into the reallocated area. The new
416 * space starts at the current jobs_array_len, and we have to
417 * copy everything before the current head job into the new
418 * area.
420 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
421 sizeof(struct pthreadpool_job) * p->head);
423 p->jobs_array_len = new_len;
426 job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
427 job->id = id;
428 job->fn = fn;
429 job->private_data = private_data;
431 p->num_jobs += 1;
433 return true;
436 static void *pthreadpool_server(void *arg)
438 struct pthreadpool *pool = (struct pthreadpool *)arg;
439 int res;
441 res = pthread_mutex_lock(&pool->mutex);
442 if (res != 0) {
443 return NULL;
446 while (1) {
447 struct timespec ts;
448 struct pthreadpool_job job;
451 * idle-wait at most 1 second. If nothing happens in that
452 * time, exit this thread.
455 clock_gettime(CLOCK_REALTIME, &ts);
456 ts.tv_sec += 1;
458 while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
460 pool->num_idle += 1;
461 res = pthread_cond_timedwait(
462 &pool->condvar, &pool->mutex, &ts);
463 pool->num_idle -= 1;
465 if (res == ETIMEDOUT) {
467 if (pool->num_jobs == 0) {
469 * we timed out and still no work for
470 * us. Exit.
472 pthreadpool_server_exit(pool);
473 pthread_mutex_unlock(&pool->mutex);
474 return NULL;
477 break;
479 assert(res == 0);
482 if (pthreadpool_get_job(pool, &job)) {
483 int ret;
486 * Do the work with the mutex unlocked
489 res = pthread_mutex_unlock(&pool->mutex);
490 assert(res == 0);
492 job.fn(job.private_data);
494 res = pthread_mutex_lock(&pool->mutex);
495 assert(res == 0);
497 ret = pool->signal_fn(job.id,
498 job.fn, job.private_data,
499 pool->signal_fn_private_data);
500 if (ret != 0) {
501 pthreadpool_server_exit(pool);
502 pthread_mutex_unlock(&pool->mutex);
503 return NULL;
507 if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
509 * No more work to do and we're asked to shut down, so
510 * exit
512 pthreadpool_server_exit(pool);
514 if (pool->num_threads == 0) {
516 * Ping the main thread waiting for all of us
517 * workers to have quit.
519 pthread_cond_broadcast(&pool->condvar);
522 pthread_mutex_unlock(&pool->mutex);
523 return NULL;
528 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
529 void (*fn)(void *private_data), void *private_data)
531 pthread_t thread_id;
532 int res;
533 sigset_t mask, omask;
535 res = pthread_mutex_lock(&pool->mutex);
536 if (res != 0) {
537 return res;
540 if (pool->shutdown) {
542 * Protect against the pool being shut down while
543 * trying to add a job
545 res = pthread_mutex_unlock(&pool->mutex);
546 assert(res == 0);
547 return EINVAL;
551 * Just some cleanup under the mutex
553 pthreadpool_join_children(pool);
556 * Add job to the end of the queue
558 if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
559 pthread_mutex_unlock(&pool->mutex);
560 return ENOMEM;
563 if (pool->num_idle > 0) {
565 * We have idle threads, wake one.
567 res = pthread_cond_signal(&pool->condvar);
568 pthread_mutex_unlock(&pool->mutex);
569 return res;
572 if ((pool->max_threads != 0) &&
573 (pool->num_threads >= pool->max_threads)) {
575 * No more new threads, we just queue the request
577 pthread_mutex_unlock(&pool->mutex);
578 return 0;
582 * Create a new worker thread. It should not receive any signals.
585 sigfillset(&mask);
587 res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
588 if (res != 0) {
589 pthread_mutex_unlock(&pool->mutex);
590 return res;
593 res = pthread_create(&thread_id, NULL, pthreadpool_server,
594 (void *)pool);
595 if (res == 0) {
596 pool->num_threads += 1;
599 assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
601 pthread_mutex_unlock(&pool->mutex);
602 return res;