2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "pthreadpool.h"
25 #include "lib/util/dlinklist.h"
33 struct pthreadpool_job
{
35 void (*fn
)(void *private_data
);
41 * List pthreadpools for fork safety
43 struct pthreadpool
*prev
, *next
;
46 * Control access to this struct
48 pthread_mutex_t mutex
;
51 * Threads waiting for work do so here
53 pthread_cond_t condvar
;
58 size_t jobs_array_len
;
59 struct pthreadpool_job
*jobs
;
65 * Indicate job completion
67 int (*signal_fn
)(int jobid
,
68 void (*job_fn
)(void *private_data
),
69 void *job_fn_private_data
,
71 void *signal_fn_private_data
;
74 * indicator to worker threads that they should shut down
79 * maximum number of threads
89 * Number of idle threads
94 * An array of threads that require joining.
97 pthread_t
*exited
; /* We alloc more */
100 static pthread_mutex_t pthreadpools_mutex
= PTHREAD_MUTEX_INITIALIZER
;
101 static struct pthreadpool
*pthreadpools
= NULL
;
102 static pthread_once_t pthreadpool_atfork_initialized
= PTHREAD_ONCE_INIT
;
104 static void pthreadpool_prep_atfork(void);
107 * Initialize a thread pool
110 int pthreadpool_init(unsigned max_threads
, struct pthreadpool
**presult
,
111 int (*signal_fn
)(int jobid
,
112 void (*job_fn
)(void *private_data
),
113 void *job_fn_private_data
,
115 void *signal_fn_private_data
)
117 struct pthreadpool
*pool
;
120 pool
= (struct pthreadpool
*)malloc(sizeof(struct pthreadpool
));
124 pool
->signal_fn
= signal_fn
;
125 pool
->signal_fn_private_data
= signal_fn_private_data
;
127 pool
->jobs_array_len
= 4;
129 pool
->jobs_array_len
, sizeof(struct pthreadpool_job
));
131 if (pool
->jobs
== NULL
) {
136 pool
->head
= pool
->num_jobs
= 0;
138 ret
= pthread_mutex_init(&pool
->mutex
, NULL
);
145 ret
= pthread_cond_init(&pool
->condvar
, NULL
);
147 pthread_mutex_destroy(&pool
->mutex
);
154 pool
->num_threads
= 0;
155 pool
->num_exited
= 0;
157 pool
->max_threads
= max_threads
;
160 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
162 pthread_cond_destroy(&pool
->condvar
);
163 pthread_mutex_destroy(&pool
->mutex
);
168 DLIST_ADD(pthreadpools
, pool
);
170 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
173 pthread_once(&pthreadpool_atfork_initialized
, pthreadpool_prep_atfork
);
180 static void pthreadpool_prepare(void)
183 struct pthreadpool
*pool
;
185 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
190 while (pool
!= NULL
) {
191 ret
= pthread_mutex_lock(&pool
->mutex
);
197 static void pthreadpool_parent(void)
200 struct pthreadpool
*pool
;
202 for (pool
= DLIST_TAIL(pthreadpools
);
204 pool
= DLIST_PREV(pool
)) {
205 ret
= pthread_mutex_unlock(&pool
->mutex
);
209 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
213 static void pthreadpool_child(void)
216 struct pthreadpool
*pool
;
218 for (pool
= DLIST_TAIL(pthreadpools
);
220 pool
= DLIST_PREV(pool
)) {
222 pool
->num_threads
= 0;
224 pool
->num_exited
= 0;
232 ret
= pthread_mutex_unlock(&pool
->mutex
);
236 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
240 static void pthreadpool_prep_atfork(void)
242 pthread_atfork(pthreadpool_prepare
, pthreadpool_parent
,
247 * Do a pthread_join() on all children that have exited, pool->mutex must be
250 static void pthreadpool_join_children(struct pthreadpool
*pool
)
254 for (i
=0; i
<pool
->num_exited
; i
++) {
257 ret
= pthread_join(pool
->exited
[i
], NULL
);
260 * Severe internal error, we can't do much but
266 pool
->num_exited
= 0;
269 * Deliberately not free and NULL pool->exited. That will be
270 * re-used by realloc later.
275 * Destroy a thread pool, finishing all threads working for it
278 int pthreadpool_destroy(struct pthreadpool
*pool
)
282 ret
= pthread_mutex_lock(&pool
->mutex
);
287 if ((pool
->num_jobs
!= 0) || pool
->shutdown
) {
288 ret
= pthread_mutex_unlock(&pool
->mutex
);
293 if (pool
->num_threads
> 0) {
295 * We have active threads, tell them to finish, wait for that.
300 if (pool
->num_idle
> 0) {
302 * Wake the idle threads. They will find
303 * pool->shutdown to be set and exit themselves
305 ret
= pthread_cond_broadcast(&pool
->condvar
);
307 pthread_mutex_unlock(&pool
->mutex
);
312 while ((pool
->num_threads
> 0) || (pool
->num_exited
> 0)) {
314 if (pool
->num_exited
> 0) {
315 pthreadpool_join_children(pool
);
319 * A thread that shuts down will also signal
322 ret
= pthread_cond_wait(&pool
->condvar
, &pool
->mutex
);
324 pthread_mutex_unlock(&pool
->mutex
);
330 ret
= pthread_mutex_unlock(&pool
->mutex
);
334 ret
= pthread_mutex_destroy(&pool
->mutex
);
335 ret1
= pthread_cond_destroy(&pool
->condvar
);
344 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
348 DLIST_REMOVE(pthreadpools
, pool
);
349 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
360 * Prepare for pthread_exit(), pool->mutex must be locked
362 static void pthreadpool_server_exit(struct pthreadpool
*pool
)
366 pool
->num_threads
-= 1;
368 exited
= (pthread_t
*)realloc(
369 pool
->exited
, sizeof(pthread_t
) * (pool
->num_exited
+ 1));
371 if (exited
== NULL
) {
372 /* lost a thread status */
375 pool
->exited
= exited
;
377 pool
->exited
[pool
->num_exited
] = pthread_self();
378 pool
->num_exited
+= 1;
381 static bool pthreadpool_get_job(struct pthreadpool
*p
,
382 struct pthreadpool_job
*job
)
384 if (p
->num_jobs
== 0) {
387 *job
= p
->jobs
[p
->head
];
388 p
->head
= (p
->head
+1) % p
->jobs_array_len
;
393 static bool pthreadpool_put_job(struct pthreadpool
*p
,
395 void (*fn
)(void *private_data
),
398 struct pthreadpool_job
*job
;
400 if (p
->num_jobs
== p
->jobs_array_len
) {
401 struct pthreadpool_job
*tmp
;
402 size_t new_len
= p
->jobs_array_len
* 2;
405 p
->jobs
, sizeof(struct pthreadpool_job
) * new_len
);
412 * We just doubled the jobs array. The array implements a FIFO
413 * queue with a modulo-based wraparound, so we have to memcpy
414 * the jobs that are logically at the queue end but physically
415 * before the queue head into the reallocated area. The new
416 * space starts at the current jobs_array_len, and we have to
417 * copy everything before the current head job into the new
420 memcpy(&p
->jobs
[p
->jobs_array_len
], p
->jobs
,
421 sizeof(struct pthreadpool_job
) * p
->head
);
423 p
->jobs_array_len
= new_len
;
426 job
= &p
->jobs
[(p
->head
+ p
->num_jobs
) % p
->jobs_array_len
];
429 job
->private_data
= private_data
;
436 static void *pthreadpool_server(void *arg
)
438 struct pthreadpool
*pool
= (struct pthreadpool
*)arg
;
441 res
= pthread_mutex_lock(&pool
->mutex
);
448 struct pthreadpool_job job
;
451 * idle-wait at most 1 second. If nothing happens in that
452 * time, exit this thread.
455 clock_gettime(CLOCK_REALTIME
, &ts
);
458 while ((pool
->num_jobs
== 0) && (pool
->shutdown
== 0)) {
461 res
= pthread_cond_timedwait(
462 &pool
->condvar
, &pool
->mutex
, &ts
);
465 if (res
== ETIMEDOUT
) {
467 if (pool
->num_jobs
== 0) {
469 * we timed out and still no work for
472 pthreadpool_server_exit(pool
);
473 pthread_mutex_unlock(&pool
->mutex
);
482 if (pthreadpool_get_job(pool
, &job
)) {
486 * Do the work with the mutex unlocked
489 res
= pthread_mutex_unlock(&pool
->mutex
);
492 job
.fn(job
.private_data
);
494 res
= pthread_mutex_lock(&pool
->mutex
);
497 ret
= pool
->signal_fn(job
.id
,
498 job
.fn
, job
.private_data
,
499 pool
->signal_fn_private_data
);
501 pthreadpool_server_exit(pool
);
502 pthread_mutex_unlock(&pool
->mutex
);
507 if ((pool
->num_jobs
== 0) && (pool
->shutdown
!= 0)) {
509 * No more work to do and we're asked to shut down, so
512 pthreadpool_server_exit(pool
);
514 if (pool
->num_threads
== 0) {
516 * Ping the main thread waiting for all of us
517 * workers to have quit.
519 pthread_cond_broadcast(&pool
->condvar
);
522 pthread_mutex_unlock(&pool
->mutex
);
528 int pthreadpool_add_job(struct pthreadpool
*pool
, int job_id
,
529 void (*fn
)(void *private_data
), void *private_data
)
533 sigset_t mask
, omask
;
535 res
= pthread_mutex_lock(&pool
->mutex
);
540 if (pool
->shutdown
) {
542 * Protect against the pool being shut down while
543 * trying to add a job
545 res
= pthread_mutex_unlock(&pool
->mutex
);
551 * Just some cleanup under the mutex
553 pthreadpool_join_children(pool
);
556 * Add job to the end of the queue
558 if (!pthreadpool_put_job(pool
, job_id
, fn
, private_data
)) {
559 pthread_mutex_unlock(&pool
->mutex
);
563 if (pool
->num_idle
> 0) {
565 * We have idle threads, wake one.
567 res
= pthread_cond_signal(&pool
->condvar
);
568 pthread_mutex_unlock(&pool
->mutex
);
572 if ((pool
->max_threads
!= 0) &&
573 (pool
->num_threads
>= pool
->max_threads
)) {
575 * No more new threads, we just queue the request
577 pthread_mutex_unlock(&pool
->mutex
);
582 * Create a new worker thread. It should not receive any signals.
587 res
= pthread_sigmask(SIG_BLOCK
, &mask
, &omask
);
589 pthread_mutex_unlock(&pool
->mutex
);
593 res
= pthread_create(&thread_id
, NULL
, pthreadpool_server
,
596 pool
->num_threads
+= 1;
599 assert(pthread_sigmask(SIG_SETMASK
, &omask
, NULL
) == 0);
601 pthread_mutex_unlock(&pool
->mutex
);