/*
 * Unix SMB/CIFS implementation.
 * thread pool implementation
 * Copyright (C) Volker Lendecke 2009
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "replace.h"
#include "system/time.h"
#include "system/wait.h"
#include "system/threads.h"
#include "system/filesys.h"
#include "pthreadpool.h"
#include "lib/util/dlinklist.h"

#ifdef NDEBUG
#undef NDEBUG
#endif

#include <assert.h>
struct pthreadpool_job {
	int id;
	void (*fn)(void *private_data);
	void *private_data;
};
struct pthreadpool {
	/*
	 * List pthreadpools for fork safety
	 */
	struct pthreadpool *prev, *next;

	/*
	 * Control access to this struct
	 */
	pthread_mutex_t mutex;

	/*
	 * Threads waiting for work do so here
	 */
	pthread_cond_t condvar;

	/*
	 * Array of jobs
	 */
	size_t jobs_array_len;
	struct pthreadpool_job *jobs;

	size_t head;
	size_t num_jobs;

	/*
	 * Indicate job completion
	 */
	int (*signal_fn)(int jobid,
			 void (*job_fn)(void *private_data),
			 void *job_fn_private_data,
			 void *signal_fn_private_data);
	void *signal_fn_private_data;

	/*
	 * indicator to worker threads to stop processing further jobs
	 */
	bool stopped;

	/*
	 * indicator to the last worker thread to free the pool
	 */
	bool destroyed;

	/*
	 * maximum number of threads
	 * 0 means no real thread, only strict sync processing
	 */
	unsigned max_threads;

	/*
	 * Number of threads
	 */
	unsigned num_threads;

	/*
	 * Number of idle threads
	 */
	unsigned num_idle;

	/*
	 * Condition variable indicating that helper threads should
	 * quickly go away making way for fork() without anybody
	 * waiting on pool->condvar.
	 */
	pthread_cond_t *prefork_cond;

	/*
	 * Waiting position for helper threads while fork is
	 * running. The forking thread will have locked it, and all
	 * idle helper threads will sit here until after the fork,
	 * where the forking thread will unlock it again.
	 */
	pthread_mutex_t fork_mutex;
};
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct pthreadpool *pthreadpools = NULL;
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);
/*
 * Initialize a thread pool
 */

int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
		     int (*signal_fn)(int jobid,
				      void (*job_fn)(void *private_data),
				      void *job_fn_private_data,
				      void *signal_fn_private_data),
		     void *signal_fn_private_data)
{
	struct pthreadpool *pool;
	int ret;

	pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
	if (pool == NULL) {
		return ENOMEM;
	}
	pool->signal_fn = signal_fn;
	pool->signal_fn_private_data = signal_fn_private_data;

	pool->jobs_array_len = 4;
	pool->jobs = calloc(
		pool->jobs_array_len, sizeof(struct pthreadpool_job));

	if (pool->jobs == NULL) {
		free(pool);
		return ENOMEM;
	}

	pool->head = pool->num_jobs = 0;

	ret = pthread_mutex_init(&pool->mutex, NULL);
	if (ret != 0) {
		free(pool->jobs);
		free(pool);
		return ret;
	}

	ret = pthread_cond_init(&pool->condvar, NULL);
	if (ret != 0) {
		pthread_mutex_destroy(&pool->mutex);
		free(pool->jobs);
		free(pool);
		return ret;
	}

	ret = pthread_mutex_init(&pool->fork_mutex, NULL);
	if (ret != 0) {
		pthread_cond_destroy(&pool->condvar);
		pthread_mutex_destroy(&pool->mutex);
		free(pool->jobs);
		free(pool);
		return ret;
	}

	pool->stopped = false;
	pool->destroyed = false;
	pool->num_threads = 0;
	pool->max_threads = max_threads;
	pool->num_idle = 0;
	pool->prefork_cond = NULL;

	ret = pthread_mutex_lock(&pthreadpools_mutex);
	if (ret != 0) {
		pthread_mutex_destroy(&pool->fork_mutex);
		pthread_cond_destroy(&pool->condvar);
		pthread_mutex_destroy(&pool->mutex);
		free(pool->jobs);
		free(pool);
		return ret;
	}
	DLIST_ADD(pthreadpools, pool);

	ret = pthread_mutex_unlock(&pthreadpools_mutex);
	assert(ret == 0);

	pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);

	*presult = pool;

	return 0;
}
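/*
 * Usage sketch (illustrative only, not part of the original code;
 * my_signal_fn, my_notify_main_thread and my_event_context are
 * hypothetical names). The signal_fn runs in the worker thread right
 * after the job function returns; a typical implementation forwards
 * the finished jobid to the main thread, e.g. through a pipe. A
 * nonzero return from signal_fn makes the worker thread exit.
 *
 *	static int my_signal_fn(int jobid,
 *				void (*job_fn)(void *private_data),
 *				void *job_fn_private_data,
 *				void *signal_fn_private_data)
 *	{
 *		struct my_event_context *ev = signal_fn_private_data;
 *		return my_notify_main_thread(ev, jobid);
 *	}
 *
 *	struct pthreadpool *pool = NULL;
 *	int ret = pthreadpool_init(4, &pool, my_signal_fn, ev);
 *	if (ret != 0) {
 *		... ret is an errno-style value, e.g. ENOMEM ...
 *	}
 */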
size_t pthreadpool_max_threads(struct pthreadpool *pool)
{
	if (pool->stopped) {
		return 0;
	}

	return pool->max_threads;
}
size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
{
	int res;
	int unlock_res;
	size_t ret;

	if (pool->stopped) {
		return 0;
	}

	res = pthread_mutex_lock(&pool->mutex);
	if (res != 0) {
		return res;
	}

	if (pool->stopped) {
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return 0;
	}

	ret = pool->num_jobs;

	unlock_res = pthread_mutex_unlock(&pool->mutex);
	assert(unlock_res == 0);

	return ret;
}
static void pthreadpool_prepare_pool(struct pthreadpool *pool)
{
	int ret;

	ret = pthread_mutex_lock(&pool->fork_mutex);
	assert(ret == 0);

	ret = pthread_mutex_lock(&pool->mutex);
	assert(ret == 0);

	while (pool->num_idle != 0) {
		unsigned num_idle = pool->num_idle;
		pthread_cond_t prefork_cond;

		ret = pthread_cond_init(&prefork_cond, NULL);
		assert(ret == 0);

		/*
		 * Push all idle threads off pool->condvar. In the
		 * child we can destroy the pool, which would result
		 * in undefined behaviour in the
		 * pthread_cond_destroy(pool->condvar). glibc just
		 * blocks here.
		 */
		pool->prefork_cond = &prefork_cond;

		ret = pthread_cond_signal(&pool->condvar);
		assert(ret == 0);

		while (pool->num_idle == num_idle) {
			ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
			assert(ret == 0);
		}

		pool->prefork_cond = NULL;

		ret = pthread_cond_destroy(&prefork_cond);
		assert(ret == 0);
	}

	/*
	 * Probably it's well-defined somewhere: What happens to
	 * condvars after a fork? The rationale of pthread_atfork only
	 * writes about mutexes. So better be safe than sorry and
	 * destroy/reinit pool->condvar across a fork.
	 */

	ret = pthread_cond_destroy(&pool->condvar);
	assert(ret == 0);
}
static void pthreadpool_prepare(void)
{
	int ret;
	struct pthreadpool *pool;

	ret = pthread_mutex_lock(&pthreadpools_mutex);
	assert(ret == 0);

	pool = pthreadpools;

	while (pool != NULL) {
		pthreadpool_prepare_pool(pool);
		pool = pool->next;
	}
}
static void pthreadpool_parent(void)
{
	int ret;
	struct pthreadpool *pool;

	for (pool = DLIST_TAIL(pthreadpools);
	     pool != NULL;
	     pool = DLIST_PREV(pool)) {
		ret = pthread_cond_init(&pool->condvar, NULL);
		assert(ret == 0);
		ret = pthread_mutex_unlock(&pool->mutex);
		assert(ret == 0);
		ret = pthread_mutex_unlock(&pool->fork_mutex);
		assert(ret == 0);
	}

	ret = pthread_mutex_unlock(&pthreadpools_mutex);
	assert(ret == 0);
}
static void pthreadpool_child(void)
{
	int ret;
	struct pthreadpool *pool;

	for (pool = DLIST_TAIL(pthreadpools);
	     pool != NULL;
	     pool = DLIST_PREV(pool)) {

		pool->num_threads = 0;
		pool->num_idle = 0;
		pool->head = 0;
		pool->num_jobs = 0;
		pool->stopped = true;

		ret = pthread_cond_init(&pool->condvar, NULL);
		assert(ret == 0);

		ret = pthread_mutex_unlock(&pool->mutex);
		assert(ret == 0);

		ret = pthread_mutex_unlock(&pool->fork_mutex);
		assert(ret == 0);
	}

	ret = pthread_mutex_unlock(&pthreadpools_mutex);
	assert(ret == 0);
}
static void pthreadpool_prep_atfork(void)
{
	pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
		       pthreadpool_child);
}
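/*
 * Summary of the fork protocol implemented by the three handlers
 * registered above (a restatement of the code, kept here for
 * overview):
 *
 *	pthreadpool_prepare (before fork): lock pthreadpools_mutex,
 *	    then per pool lock fork_mutex and mutex, chase every idle
 *	    worker off pool->condvar via prefork_cond, destroy the
 *	    condvar. All locks are held across the fork.
 *	pthreadpool_parent (after fork, parent): reinit each condvar,
 *	    unlock the per-pool mutexes and pthreadpools_mutex; the
 *	    workers parked on fork_mutex resume.
 *	pthreadpool_child (after fork, child): worker threads do not
 *	    survive fork, so reset the thread and job counters, mark
 *	    each pool stopped, reinit the condvar and unlock.
 */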
static int pthreadpool_free(struct pthreadpool *pool)
{
	int ret, ret1, ret2;

	ret = pthread_mutex_lock(&pthreadpools_mutex);
	if (ret != 0) {
		return ret;
	}
	DLIST_REMOVE(pthreadpools, pool);
	ret = pthread_mutex_unlock(&pthreadpools_mutex);
	assert(ret == 0);

	ret = pthread_mutex_lock(&pool->mutex);
	assert(ret == 0);
	ret = pthread_mutex_unlock(&pool->mutex);
	assert(ret == 0);

	ret = pthread_mutex_destroy(&pool->mutex);
	ret1 = pthread_cond_destroy(&pool->condvar);
	ret2 = pthread_mutex_destroy(&pool->fork_mutex);

	if (ret != 0) {
		return ret;
	}
	if (ret1 != 0) {
		return ret1;
	}
	if (ret2 != 0) {
		return ret2;
	}

	free(pool->jobs);
	free(pool);

	return 0;
}
/*
 * Stop a thread pool. Wake up all idle threads for exit.
 */

static int pthreadpool_stop_locked(struct pthreadpool *pool)
{
	int ret;

	pool->stopped = true;

	if (pool->num_threads == 0) {
		return 0;
	}

	/*
	 * We have active threads, tell them to finish.
	 */

	ret = pthread_cond_broadcast(&pool->condvar);

	return ret;
}
/*
 * Stop a thread pool. Wake up all idle threads for exit.
 */

int pthreadpool_stop(struct pthreadpool *pool)
{
	int ret, ret1;

	ret = pthread_mutex_lock(&pool->mutex);
	if (ret != 0) {
		return ret;
	}

	if (!pool->stopped) {
		ret = pthreadpool_stop_locked(pool);
	}

	ret1 = pthread_mutex_unlock(&pool->mutex);
	assert(ret1 == 0);

	return ret;
}
/*
 * Destroy a thread pool. Wake up all idle threads for exit. The last
 * one will free the pool.
 */

int pthreadpool_destroy(struct pthreadpool *pool)
{
	int ret, ret1;
	bool free_it;

	assert(!pool->destroyed);

	ret = pthread_mutex_lock(&pool->mutex);
	if (ret != 0) {
		return ret;
	}

	pool->destroyed = true;

	if (!pool->stopped) {
		ret = pthreadpool_stop_locked(pool);
	}

	free_it = (pool->num_threads == 0);

	ret1 = pthread_mutex_unlock(&pool->mutex);
	assert(ret1 == 0);

	if (free_it) {
		pthreadpool_free(pool);
	}

	return ret;
}
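/*
 * Note on lifetime (restating the behaviour above): after
 * pthreadpool_destroy() returns, the caller must not touch the pool
 * again. If worker threads are still running, the memory is released
 * later by the last worker via pthreadpool_server_exit().
 */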
/*
 * Prepare for pthread_exit(), pool->mutex must be locked and will be
 * unlocked here. This is a bit of a layering violation, but here we
 * also take care of removing the pool if we're the last thread.
 */
static void pthreadpool_server_exit(struct pthreadpool *pool)
{
	int ret;
	bool free_it;

	pool->num_threads -= 1;

	free_it = (pool->destroyed && (pool->num_threads == 0));

	ret = pthread_mutex_unlock(&pool->mutex);
	assert(ret == 0);

	if (free_it) {
		pthreadpool_free(pool);
	}
}
static bool pthreadpool_get_job(struct pthreadpool *p,
				struct pthreadpool_job *job)
{
	if (p->stopped) {
		return false;
	}

	if (p->num_jobs == 0) {
		return false;
	}
	*job = p->jobs[p->head];
	p->head = (p->head+1) % p->jobs_array_len;
	p->num_jobs -= 1;
	return true;
}
static bool pthreadpool_put_job(struct pthreadpool *p,
				int id,
				void (*fn)(void *private_data),
				void *private_data)
{
	struct pthreadpool_job *job;

	if (p->num_jobs == p->jobs_array_len) {
		struct pthreadpool_job *tmp;
		size_t new_len = p->jobs_array_len * 2;

		tmp = realloc(
			p->jobs, sizeof(struct pthreadpool_job) * new_len);
		if (tmp == NULL) {
			return false;
		}
		p->jobs = tmp;

		/*
		 * We just doubled the jobs array. The array implements a FIFO
		 * queue with a modulo-based wraparound, so we have to memcpy
		 * the jobs that are logically at the queue end but physically
		 * before the queue head into the reallocated area. The new
		 * space starts at the current jobs_array_len, and we have to
		 * copy everything before the current head job into the new
		 * area.
		 */
		memcpy(&p->jobs[p->jobs_array_len], p->jobs,
		       sizeof(struct pthreadpool_job) * p->head);

		p->jobs_array_len = new_len;
	}

	job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
	job->id = id;
	job->fn = fn;
	job->private_data = private_data;

	p->num_jobs += 1;

	return true;
}
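/*
 * Worked example for the memcpy above (illustrative numbers): with
 * jobs_array_len = 4, head = 2 and num_jobs = 4, the queue A B C D is
 * stored as [C D A B]: A sits at index 2, the logically last jobs C
 * and D wrapped around to indices 0 and 1. After doubling to length 8,
 * copying head = 2 entries from the array start to index 4 yields
 * [C D A B C D - -]; reading 4 jobs from head = 2 modulo 8 now gives
 * A B C D again, the stale C D at indices 0 and 1 simply unused.
 */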
static void pthreadpool_undo_put_job(struct pthreadpool *p)
{
	p->num_jobs -= 1;
}
static void *pthreadpool_server(void *arg)
{
	struct pthreadpool *pool = (struct pthreadpool *)arg;
	int res;

	res = pthread_mutex_lock(&pool->mutex);
	if (res != 0) {
		return NULL;
	}

	while (1) {
		struct timespec ts;
		struct pthreadpool_job job;

		/*
		 * idle-wait at most 1 second. If nothing happens in that
		 * time, exit this thread.
		 */

		clock_gettime(CLOCK_REALTIME, &ts);
		ts.tv_sec += 1;

		while ((pool->num_jobs == 0) && !pool->stopped) {

			pool->num_idle += 1;
			res = pthread_cond_timedwait(
				&pool->condvar, &pool->mutex, &ts);
			pool->num_idle -= 1;

			if (pool->prefork_cond != NULL) {
				/*
				 * We must allow fork() to continue
				 * without anybody waiting on
				 * &pool->condvar. Tell
				 * pthreadpool_prepare_pool that we
				 * got that message.
				 */

				res = pthread_cond_signal(pool->prefork_cond);
				assert(res == 0);

				res = pthread_mutex_unlock(&pool->mutex);
				assert(res == 0);

				/*
				 * pthreadpool_prepare_pool has
				 * already locked this mutex across
				 * the fork. This makes us wait
				 * without sitting in a condvar.
				 */
				res = pthread_mutex_lock(&pool->fork_mutex);
				assert(res == 0);
				res = pthread_mutex_unlock(&pool->fork_mutex);
				assert(res == 0);

				res = pthread_mutex_lock(&pool->mutex);
				assert(res == 0);
			}

			if (res == ETIMEDOUT) {

				if (pool->num_jobs == 0) {
					/*
					 * we timed out and still no work for
					 * us. Exit.
					 */
					pthreadpool_server_exit(pool);
					return NULL;
				}

				break;
			}
			assert(res == 0);
		}

		if (pthreadpool_get_job(pool, &job)) {
			int ret;

			/*
			 * Do the work with the mutex unlocked
			 */

			res = pthread_mutex_unlock(&pool->mutex);
			assert(res == 0);

			job.fn(job.private_data);

			ret = pool->signal_fn(job.id,
					      job.fn, job.private_data,
					      pool->signal_fn_private_data);

			res = pthread_mutex_lock(&pool->mutex);
			assert(res == 0);

			if (ret != 0) {
				pthreadpool_server_exit(pool);
				return NULL;
			}
		}

		if (pool->stopped) {
			/*
			 * we're asked to stop processing jobs, so exit
			 */
			pthreadpool_server_exit(pool);
			return NULL;
		}
	}
}
static int pthreadpool_create_thread(struct pthreadpool *pool)
{
	pthread_attr_t thread_attr;
	pthread_t thread_id;
	int res;
	sigset_t mask, omask;

	/*
	 * Create a new worker thread. It should not receive any signals.
	 */

	sigfillset(&mask);

	res = pthread_attr_init(&thread_attr);
	if (res != 0) {
		return res;
	}

	res = pthread_attr_setdetachstate(
		&thread_attr, PTHREAD_CREATE_DETACHED);
	if (res != 0) {
		pthread_attr_destroy(&thread_attr);
		return res;
	}

	res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
	if (res != 0) {
		pthread_attr_destroy(&thread_attr);
		return res;
	}

	res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
			     (void *)pool);

	assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);

	pthread_attr_destroy(&thread_attr);

	if (res == 0) {
		pool->num_threads += 1;
	}

	return res;
}
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
			void (*fn)(void *private_data), void *private_data)
{
	int res;
	int unlock_res;

	assert(!pool->destroyed);

	res = pthread_mutex_lock(&pool->mutex);
	if (res != 0) {
		return res;
	}

	if (pool->stopped) {
		/*
		 * Protect against the pool being shut down while
		 * trying to add a job
		 */
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return EINVAL;
	}

	if (pool->max_threads == 0) {
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);

		/*
		 * If no threads are allowed we do strict sync processing.
		 */
		fn(private_data);

		res = pool->signal_fn(job_id, fn, private_data,
				      pool->signal_fn_private_data);
		return res;
	}

	/*
	 * Add job to the end of the queue
	 */
	if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return ENOMEM;
	}

	if (pool->num_idle > 0) {
		/*
		 * We have idle threads, wake one.
		 */
		res = pthread_cond_signal(&pool->condvar);
		if (res != 0) {
			pthreadpool_undo_put_job(pool);
		}
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return res;
	}

	if (pool->num_threads >= pool->max_threads) {
		/*
		 * No more new threads, we just queue the request
		 */
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return 0;
	}

	res = pthreadpool_create_thread(pool);
	if (res == 0) {
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return 0;
	}

	if (pool->num_threads != 0) {
		/*
		 * At least one thread is still available, let
		 * that one run the queued job.
		 */
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return 0;
	}

	pthreadpool_undo_put_job(pool);

	unlock_res = pthread_mutex_unlock(&pool->mutex);
	assert(unlock_res == 0);

	return res;
}
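/*
 * Usage sketch (illustrative, hypothetical names): fn runs in a
 * worker thread; pool->signal_fn runs in the same worker right after
 * fn returns.
 *
 *	static void my_job(void *private_data)
 *	{
 *		struct my_state *state = private_data;
 *		state->result = do_blocking_io(state);
 *	}
 *
 *	ret = pthreadpool_add_job(pool, 42, my_job, state);
 *	if (ret != 0) {
 *		... errno-style failure: EINVAL after
 *		    pthreadpool_stop(), ENOMEM if queueing failed ...
 *	}
 */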
size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
			      void (*fn)(void *private_data),
			      void *private_data)
{
	int res;
	size_t i, j;
	size_t num = 0;

	assert(!pool->destroyed);

	res = pthread_mutex_lock(&pool->mutex);
	if (res != 0) {
		return res;
	}

	for (i = 0, j = 0; i < pool->num_jobs; i++) {
		size_t idx = (pool->head + i) % pool->jobs_array_len;
		size_t new_idx = (pool->head + j) % pool->jobs_array_len;
		struct pthreadpool_job *job = &pool->jobs[idx];

		if ((job->private_data == private_data) &&
		    (job->id == job_id) &&
		    (job->fn == fn)) {
			/*
			 * Just skip the entry.
			 */
			num += 1;
			continue;
		}

		/*
		 * If we already removed one or more jobs (so j will be
		 * smaller than i), we need to fill possible gaps in the
		 * logical list.
		 */
		if (j < i) {
			pool->jobs[new_idx] = *job;
		}
		j += 1;
	}

	pool->num_jobs -= num;

	res = pthread_mutex_unlock(&pool->mutex);
	assert(res == 0);

	return num;
}