lib/util: inline lib/util/util_runcmd.h again
[Samba.git] / lib / pthreadpool / pthreadpool.c
blobb6dad310b0da5160f6ce094aa6b0c319345a9bae
/*
 * Unix SMB/CIFS implementation.
 * thread pool implementation
 * Copyright (C) Volker Lendecke 2009
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "system/filesys.h"
25 #include "pthreadpool.h"
26 #include "lib/util/dlinklist.h"
28 #ifdef NDEBUG
29 #undef NDEBUG
30 #endif
32 #include <assert.h>
/*
 * One queued unit of work: a caller-supplied id together with the
 * function to run and the argument handed to it.
 */
struct pthreadpool_job {
	int id;
	void (*fn)(void *private_data);
	void *private_data;
};
/*
 * State of one thread pool. Apart from the list linkage, the fields
 * are accessed with "mutex" held by the functions in this file.
 */
struct pthreadpool {
	/*
	 * Linkage in the global list of pools that the fork handlers
	 * walk.
	 */
	struct pthreadpool *prev, *next;

	/*
	 * Serializes access to this struct.
	 */
	pthread_mutex_t mutex;

	/*
	 * Idle worker threads block here until work arrives.
	 */
	pthread_cond_t condvar;

	/*
	 * Job queue: "jobs" is an array of jobs_array_len slots used
	 * as a ring buffer. "head" indexes the oldest queued job and
	 * "num_jobs" counts the queued entries.
	 */
	size_t jobs_array_len;
	struct pthreadpool_job *jobs;

	size_t head;
	size_t num_jobs;

	/*
	 * Callback invoked after a job function has returned, to
	 * report job completion.
	 */
	int (*signal_fn)(int jobid,
			 void (*job_fn)(void *private_data),
			 void *job_fn_private_data,
			 void *private_data);
	void *signal_fn_private_data;

	/*
	 * Tells worker threads to stop taking further jobs and to
	 * exit.
	 */
	bool stopped;

	/*
	 * Tells the last exiting worker thread that it has to free
	 * the pool resources.
	 */
	bool destroyed;

	/*
	 * Upper bound on worker threads. 0 means no real thread is
	 * used and jobs are processed strictly synchronously.
	 */
	unsigned max_threads;

	/*
	 * Current number of worker threads.
	 */
	unsigned num_threads;

	/*
	 * Number of worker threads currently idle.
	 */
	unsigned num_idle;

	/*
	 * Non-NULL while a fork is being prepared: idle helper
	 * threads signal it and quickly get out of the way so that
	 * fork() can proceed with nobody waiting on pool->condvar.
	 */
	pthread_cond_t *prefork_cond;

	/*
	 * Parking spot for helper threads while fork is running. The
	 * forking thread holds it locked, idle helpers block on it
	 * and are released when the forking thread unlocks it after
	 * the fork.
	 */
	pthread_mutex_t fork_mutex;
};
/* Registers the fork handlers; run once through pthread_once(). */
static void pthreadpool_prep_atfork(void);

/* One-time guard for the fork handler registration. */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

/* Global list of all pools, protected by pthreadpools_mutex. */
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct pthreadpool *pthreadpools = NULL;
125 * Initialize a thread pool
128 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
129 int (*signal_fn)(int jobid,
130 void (*job_fn)(void *private_data),
131 void *job_fn_private_data,
132 void *private_data),
133 void *signal_fn_private_data)
135 struct pthreadpool *pool;
136 int ret;
138 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
139 if (pool == NULL) {
140 return ENOMEM;
142 pool->signal_fn = signal_fn;
143 pool->signal_fn_private_data = signal_fn_private_data;
145 pool->jobs_array_len = 4;
146 pool->jobs = calloc(
147 pool->jobs_array_len, sizeof(struct pthreadpool_job));
149 if (pool->jobs == NULL) {
150 free(pool);
151 return ENOMEM;
154 pool->head = pool->num_jobs = 0;
156 ret = pthread_mutex_init(&pool->mutex, NULL);
157 if (ret != 0) {
158 free(pool->jobs);
159 free(pool);
160 return ret;
163 ret = pthread_cond_init(&pool->condvar, NULL);
164 if (ret != 0) {
165 pthread_mutex_destroy(&pool->mutex);
166 free(pool->jobs);
167 free(pool);
168 return ret;
171 ret = pthread_mutex_init(&pool->fork_mutex, NULL);
172 if (ret != 0) {
173 pthread_cond_destroy(&pool->condvar);
174 pthread_mutex_destroy(&pool->mutex);
175 free(pool->jobs);
176 free(pool);
177 return ret;
180 pool->stopped = false;
181 pool->destroyed = false;
182 pool->num_threads = 0;
183 pool->max_threads = max_threads;
184 pool->num_idle = 0;
185 pool->prefork_cond = NULL;
187 ret = pthread_mutex_lock(&pthreadpools_mutex);
188 if (ret != 0) {
189 pthread_mutex_destroy(&pool->fork_mutex);
190 pthread_cond_destroy(&pool->condvar);
191 pthread_mutex_destroy(&pool->mutex);
192 free(pool->jobs);
193 free(pool);
194 return ret;
196 DLIST_ADD(pthreadpools, pool);
198 ret = pthread_mutex_unlock(&pthreadpools_mutex);
199 assert(ret == 0);
201 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
203 *presult = pool;
205 return 0;
208 size_t pthreadpool_max_threads(struct pthreadpool *pool)
210 if (pool->stopped) {
211 return 0;
214 return pool->max_threads;
217 size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
219 int res;
220 int unlock_res;
221 size_t ret;
223 if (pool->stopped) {
224 return 0;
227 res = pthread_mutex_lock(&pool->mutex);
228 if (res != 0) {
229 return res;
232 if (pool->stopped) {
233 unlock_res = pthread_mutex_unlock(&pool->mutex);
234 assert(unlock_res == 0);
235 return 0;
238 ret = pool->num_jobs;
240 unlock_res = pthread_mutex_unlock(&pool->mutex);
241 assert(unlock_res == 0);
242 return ret;
245 static void pthreadpool_prepare_pool(struct pthreadpool *pool)
247 int ret;
249 ret = pthread_mutex_lock(&pool->fork_mutex);
250 assert(ret == 0);
252 ret = pthread_mutex_lock(&pool->mutex);
253 assert(ret == 0);
255 while (pool->num_idle != 0) {
256 unsigned num_idle = pool->num_idle;
257 pthread_cond_t prefork_cond;
259 ret = pthread_cond_init(&prefork_cond, NULL);
260 assert(ret == 0);
263 * Push all idle threads off pool->condvar. In the
264 * child we can destroy the pool, which would result
265 * in undefined behaviour in the
266 * pthread_cond_destroy(pool->condvar). glibc just
267 * blocks here.
269 pool->prefork_cond = &prefork_cond;
271 ret = pthread_cond_signal(&pool->condvar);
272 assert(ret == 0);
274 while (pool->num_idle == num_idle) {
275 ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
276 assert(ret == 0);
279 pool->prefork_cond = NULL;
281 ret = pthread_cond_destroy(&prefork_cond);
282 assert(ret == 0);
286 * Probably it's well-defined somewhere: What happens to
287 * condvars after a fork? The rationale of pthread_atfork only
288 * writes about mutexes. So better be safe than sorry and
289 * destroy/reinit pool->condvar across a fork.
292 ret = pthread_cond_destroy(&pool->condvar);
293 assert(ret == 0);
296 static void pthreadpool_prepare(void)
298 int ret;
299 struct pthreadpool *pool;
301 ret = pthread_mutex_lock(&pthreadpools_mutex);
302 assert(ret == 0);
304 pool = pthreadpools;
306 while (pool != NULL) {
307 pthreadpool_prepare_pool(pool);
308 pool = pool->next;
312 static void pthreadpool_parent(void)
314 int ret;
315 struct pthreadpool *pool;
317 for (pool = DLIST_TAIL(pthreadpools);
318 pool != NULL;
319 pool = DLIST_PREV(pool)) {
320 ret = pthread_cond_init(&pool->condvar, NULL);
321 assert(ret == 0);
322 ret = pthread_mutex_unlock(&pool->mutex);
323 assert(ret == 0);
324 ret = pthread_mutex_unlock(&pool->fork_mutex);
325 assert(ret == 0);
328 ret = pthread_mutex_unlock(&pthreadpools_mutex);
329 assert(ret == 0);
332 static void pthreadpool_child(void)
334 int ret;
335 struct pthreadpool *pool;
337 for (pool = DLIST_TAIL(pthreadpools);
338 pool != NULL;
339 pool = DLIST_PREV(pool)) {
341 pool->num_threads = 0;
342 pool->num_idle = 0;
343 pool->head = 0;
344 pool->num_jobs = 0;
345 pool->stopped = true;
347 ret = pthread_cond_init(&pool->condvar, NULL);
348 assert(ret == 0);
350 ret = pthread_mutex_unlock(&pool->mutex);
351 assert(ret == 0);
353 ret = pthread_mutex_unlock(&pool->fork_mutex);
354 assert(ret == 0);
357 ret = pthread_mutex_unlock(&pthreadpools_mutex);
358 assert(ret == 0);
361 static void pthreadpool_prep_atfork(void)
363 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
364 pthreadpool_child);
367 static int pthreadpool_free(struct pthreadpool *pool)
369 int ret, ret1, ret2;
371 ret = pthread_mutex_lock(&pthreadpools_mutex);
372 if (ret != 0) {
373 return ret;
375 DLIST_REMOVE(pthreadpools, pool);
376 ret = pthread_mutex_unlock(&pthreadpools_mutex);
377 assert(ret == 0);
379 ret = pthread_mutex_lock(&pool->mutex);
380 assert(ret == 0);
381 ret = pthread_mutex_unlock(&pool->mutex);
382 assert(ret == 0);
384 ret = pthread_mutex_destroy(&pool->mutex);
385 ret1 = pthread_cond_destroy(&pool->condvar);
386 ret2 = pthread_mutex_destroy(&pool->fork_mutex);
388 if (ret != 0) {
389 return ret;
391 if (ret1 != 0) {
392 return ret1;
394 if (ret2 != 0) {
395 return ret2;
398 free(pool->jobs);
399 free(pool);
401 return 0;
405 * Stop a thread pool. Wake up all idle threads for exit.
408 static int pthreadpool_stop_locked(struct pthreadpool *pool)
410 int ret;
412 pool->stopped = true;
414 if (pool->num_threads == 0) {
415 return 0;
419 * We have active threads, tell them to finish.
422 ret = pthread_cond_broadcast(&pool->condvar);
424 return ret;
428 * Stop a thread pool. Wake up all idle threads for exit.
431 int pthreadpool_stop(struct pthreadpool *pool)
433 int ret, ret1;
435 ret = pthread_mutex_lock(&pool->mutex);
436 if (ret != 0) {
437 return ret;
440 if (!pool->stopped) {
441 ret = pthreadpool_stop_locked(pool);
444 ret1 = pthread_mutex_unlock(&pool->mutex);
445 assert(ret1 == 0);
447 return ret;
451 * Destroy a thread pool. Wake up all idle threads for exit. The last
452 * one will free the pool.
455 int pthreadpool_destroy(struct pthreadpool *pool)
457 int ret, ret1;
458 bool free_it;
460 assert(!pool->destroyed);
462 ret = pthread_mutex_lock(&pool->mutex);
463 if (ret != 0) {
464 return ret;
467 pool->destroyed = true;
469 if (!pool->stopped) {
470 ret = pthreadpool_stop_locked(pool);
473 free_it = (pool->num_threads == 0);
475 ret1 = pthread_mutex_unlock(&pool->mutex);
476 assert(ret1 == 0);
478 if (free_it) {
479 pthreadpool_free(pool);
482 return ret;
485 * Prepare for pthread_exit(), pool->mutex must be locked and will be
486 * unlocked here. This is a bit of a layering violation, but here we
487 * also take care of removing the pool if we're the last thread.
489 static void pthreadpool_server_exit(struct pthreadpool *pool)
491 int ret;
492 bool free_it;
494 pool->num_threads -= 1;
496 free_it = (pool->destroyed && (pool->num_threads == 0));
498 ret = pthread_mutex_unlock(&pool->mutex);
499 assert(ret == 0);
501 if (free_it) {
502 pthreadpool_free(pool);
506 static bool pthreadpool_get_job(struct pthreadpool *p,
507 struct pthreadpool_job *job)
509 if (p->stopped) {
510 return false;
513 if (p->num_jobs == 0) {
514 return false;
516 *job = p->jobs[p->head];
517 p->head = (p->head+1) % p->jobs_array_len;
518 p->num_jobs -= 1;
519 return true;
522 static bool pthreadpool_put_job(struct pthreadpool *p,
523 int id,
524 void (*fn)(void *private_data),
525 void *private_data)
527 struct pthreadpool_job *job;
529 if (p->num_jobs == p->jobs_array_len) {
530 struct pthreadpool_job *tmp;
531 size_t new_len = p->jobs_array_len * 2;
533 tmp = realloc(
534 p->jobs, sizeof(struct pthreadpool_job) * new_len);
535 if (tmp == NULL) {
536 return false;
538 p->jobs = tmp;
541 * We just doubled the jobs array. The array implements a FIFO
542 * queue with a modulo-based wraparound, so we have to memcpy
543 * the jobs that are logically at the queue end but physically
544 * before the queue head into the reallocated area. The new
545 * space starts at the current jobs_array_len, and we have to
546 * copy everything before the current head job into the new
547 * area.
549 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
550 sizeof(struct pthreadpool_job) * p->head);
552 p->jobs_array_len = new_len;
555 job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
556 job->id = id;
557 job->fn = fn;
558 job->private_data = private_data;
560 p->num_jobs += 1;
562 return true;
565 static void pthreadpool_undo_put_job(struct pthreadpool *p)
567 p->num_jobs -= 1;
570 static void *pthreadpool_server(void *arg)
572 struct pthreadpool *pool = (struct pthreadpool *)arg;
573 int res;
575 res = pthread_mutex_lock(&pool->mutex);
576 if (res != 0) {
577 return NULL;
580 while (1) {
581 struct timespec ts;
582 struct pthreadpool_job job;
585 * idle-wait at most 1 second. If nothing happens in that
586 * time, exit this thread.
589 clock_gettime(CLOCK_REALTIME, &ts);
590 ts.tv_sec += 1;
592 while ((pool->num_jobs == 0) && !pool->stopped) {
594 pool->num_idle += 1;
595 res = pthread_cond_timedwait(
596 &pool->condvar, &pool->mutex, &ts);
597 pool->num_idle -= 1;
599 if (pool->prefork_cond != NULL) {
601 * Me must allow fork() to continue
602 * without anybody waiting on
603 * &pool->condvar. Tell
604 * pthreadpool_prepare_pool that we
605 * got that message.
608 res = pthread_cond_signal(pool->prefork_cond);
609 assert(res == 0);
611 res = pthread_mutex_unlock(&pool->mutex);
612 assert(res == 0);
615 * pthreadpool_prepare_pool has
616 * already locked this mutex across
617 * the fork. This makes us wait
618 * without sitting in a condvar.
620 res = pthread_mutex_lock(&pool->fork_mutex);
621 assert(res == 0);
622 res = pthread_mutex_unlock(&pool->fork_mutex);
623 assert(res == 0);
625 res = pthread_mutex_lock(&pool->mutex);
626 assert(res == 0);
629 if (res == ETIMEDOUT) {
631 if (pool->num_jobs == 0) {
633 * we timed out and still no work for
634 * us. Exit.
636 pthreadpool_server_exit(pool);
637 return NULL;
640 break;
642 assert(res == 0);
645 if (pthreadpool_get_job(pool, &job)) {
646 int ret;
649 * Do the work with the mutex unlocked
652 res = pthread_mutex_unlock(&pool->mutex);
653 assert(res == 0);
655 job.fn(job.private_data);
657 ret = pool->signal_fn(job.id,
658 job.fn, job.private_data,
659 pool->signal_fn_private_data);
661 res = pthread_mutex_lock(&pool->mutex);
662 assert(res == 0);
664 if (ret != 0) {
665 pthreadpool_server_exit(pool);
666 return NULL;
670 if (pool->stopped) {
672 * we're asked to stop processing jobs, so exit
674 pthreadpool_server_exit(pool);
675 return NULL;
680 static int pthreadpool_create_thread(struct pthreadpool *pool)
682 pthread_attr_t thread_attr;
683 pthread_t thread_id;
684 int res;
685 sigset_t mask, omask;
688 * Create a new worker thread. It should not receive any signals.
691 sigfillset(&mask);
693 res = pthread_attr_init(&thread_attr);
694 if (res != 0) {
695 return res;
698 res = pthread_attr_setdetachstate(
699 &thread_attr, PTHREAD_CREATE_DETACHED);
700 if (res != 0) {
701 pthread_attr_destroy(&thread_attr);
702 return res;
705 res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
706 if (res != 0) {
707 pthread_attr_destroy(&thread_attr);
708 return res;
711 res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
712 (void *)pool);
714 assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
716 pthread_attr_destroy(&thread_attr);
718 if (res == 0) {
719 pool->num_threads += 1;
722 return res;
725 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
726 void (*fn)(void *private_data), void *private_data)
728 int res;
729 int unlock_res;
731 assert(!pool->destroyed);
733 res = pthread_mutex_lock(&pool->mutex);
734 if (res != 0) {
735 return res;
738 if (pool->stopped) {
740 * Protect against the pool being shut down while
741 * trying to add a job
743 unlock_res = pthread_mutex_unlock(&pool->mutex);
744 assert(unlock_res == 0);
745 return EINVAL;
748 if (pool->max_threads == 0) {
749 unlock_res = pthread_mutex_unlock(&pool->mutex);
750 assert(unlock_res == 0);
753 * If no thread are allowed we do strict sync processing.
755 fn(private_data);
756 res = pool->signal_fn(job_id, fn, private_data,
757 pool->signal_fn_private_data);
758 return res;
762 * Add job to the end of the queue
764 if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
765 unlock_res = pthread_mutex_unlock(&pool->mutex);
766 assert(unlock_res == 0);
767 return ENOMEM;
770 if (pool->num_idle > 0) {
772 * We have idle threads, wake one.
774 res = pthread_cond_signal(&pool->condvar);
775 if (res != 0) {
776 pthreadpool_undo_put_job(pool);
778 unlock_res = pthread_mutex_unlock(&pool->mutex);
779 assert(unlock_res == 0);
780 return res;
783 if (pool->num_threads >= pool->max_threads) {
785 * No more new threads, we just queue the request
787 unlock_res = pthread_mutex_unlock(&pool->mutex);
788 assert(unlock_res == 0);
789 return 0;
792 res = pthreadpool_create_thread(pool);
793 if (res == 0) {
794 unlock_res = pthread_mutex_unlock(&pool->mutex);
795 assert(unlock_res == 0);
796 return 0;
799 if (pool->num_threads != 0) {
801 * At least one thread is still available, let
802 * that one run the queued job.
804 unlock_res = pthread_mutex_unlock(&pool->mutex);
805 assert(unlock_res == 0);
806 return 0;
810 * No thread could be created to run job, fallback to sync
811 * call.
813 pthreadpool_undo_put_job(pool);
815 unlock_res = pthread_mutex_unlock(&pool->mutex);
816 assert(unlock_res == 0);
818 return res;
821 size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
822 void (*fn)(void *private_data), void *private_data)
824 int res;
825 size_t i, j;
826 size_t num = 0;
828 assert(!pool->destroyed);
830 res = pthread_mutex_lock(&pool->mutex);
831 if (res != 0) {
832 return res;
835 for (i = 0, j = 0; i < pool->num_jobs; i++) {
836 size_t idx = (pool->head + i) % pool->jobs_array_len;
837 size_t new_idx = (pool->head + j) % pool->jobs_array_len;
838 struct pthreadpool_job *job = &pool->jobs[idx];
840 if ((job->private_data == private_data) &&
841 (job->id == job_id) &&
842 (job->fn == fn))
845 * Just skip the entry.
847 num++;
848 continue;
852 * If we already removed one or more jobs (so j will be smaller
853 * then i), we need to fill possible gaps in the logical list.
855 if (j < i) {
856 pool->jobs[new_idx] = *job;
858 j++;
861 pool->num_jobs -= num;
863 res = pthread_mutex_unlock(&pool->mutex);
864 assert(res == 0);
866 return num;