2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "pthreadpool.h"
25 #include "lib/util/dlinklist.h"
/*
 * One queued unit of work.
 *
 * Only the "fn" callback member (taking a single void pointer argument)
 * is visible in this excerpt. Other code in this file also reads job.id
 * and job.private_data, so those members presumably sit on lines that
 * are missing from this view -- TODO confirm against the full file.
 */
33 struct pthreadpool_job
{
35 void (*fn
)(void *private_data
);
/*
 * Per-pool state (members of struct pthreadpool).
 *
 * Visible members: the prev and next list links, the mutex, the condition
 * variable, the job array with its length, and the completion callback
 * with its private pointer.
 *
 * NOTE(review): the struct header and several member declarations are
 * missing from this excerpt. Code later in the file uses pool->head,
 * pool->num_jobs, pool->shutdown, pool->max_threads, pool->num_threads
 * and pool->num_idle; the three trailing comment fragments below have
 * lost the declarations they described.
 */
41 * List pthreadpools for fork safety
43 struct pthreadpool
*prev
, *next
;
46 * Control access to this struct
48 pthread_mutex_t mutex
;
51 * Threads waiting for work do so here
53 pthread_cond_t condvar
;
/*
 * jobs is used as a circular FIFO: pthreadpool_get_job() and
 * pthreadpool_put_job() index it with head and num_jobs modulo
 * jobs_array_len.
 */
58 size_t jobs_array_len
;
59 struct pthreadpool_job
*jobs
;
65 * Indicate job completion
/*
 * Worker threads call signal_fn after running a job, passing the job id,
 * the job function, the job private data and signal_fn_private_data.
 */
67 int (*signal_fn
)(int jobid
,
68 void (*job_fn
)(void *private_data
),
69 void *job_fn_private_data
,
71 void *signal_fn_private_data
;
74 * indicator to worker threads that they should shut down
79 * maximum number of threads
89 * Number of idle threads
/*
 * Process-wide registry of pools.
 *
 * pthreadpools is the head of a doubly linked list of all pools;
 * pthreadpool_init() adds to it and pthreadpool_free() removes from it,
 * both while holding pthreadpools_mutex. The fork handlers below walk
 * the same list.
 */
94 static pthread_mutex_t pthreadpools_mutex
= PTHREAD_MUTEX_INITIALIZER
;
95 static struct pthreadpool
*pthreadpools
= NULL
;
/*
 * Guards the one-time call of pthreadpool_prep_atfork() made through
 * pthread_once() in pthreadpool_init().
 */
96 static pthread_once_t pthreadpool_atfork_initialized
= PTHREAD_ONCE_INIT
;
/* Forward declaration; defined after the fork handlers. */
98 static void pthreadpool_prep_atfork(void);
101 * Initialize a thread pool
/*
 * Allocate and initialise a thread pool, add it to the global
 * pthreadpools list and make sure the fork handlers are installed
 * (once, via pthread_once).
 *
 * max_threads: stored in pool->max_threads. pthreadpool_add_job() skips
 *              its thread-limit check when this is 0.
 * presult:     out parameter. NOTE(review): the store to *presult is not
 *              visible in this excerpt -- presumably it receives the new
 *              pool on success.
 * signal_fn, signal_fn_private_data: stored in the pool; worker threads
 *              invoke signal_fn after each job they run.
 *
 * NOTE(review): the error checks and return statements are on lines
 * missing from this excerpt; the return convention is presumably 0 on
 * success and an errno-style value otherwise -- TODO confirm.
 */
104 int pthreadpool_init(unsigned max_threads
, struct pthreadpool
**presult
,
105 int (*signal_fn
)(int jobid
,
106 void (*job_fn
)(void *private_data
),
107 void *job_fn_private_data
,
109 void *signal_fn_private_data
)
111 struct pthreadpool
*pool
;
114 pool
= (struct pthreadpool
*)malloc(sizeof(struct pthreadpool
));
118 pool
->signal_fn
= signal_fn
;
119 pool
->signal_fn_private_data
= signal_fn_private_data
;
/* Initial queue capacity; pthreadpool_put_job() doubles it when full. */
121 pool
->jobs_array_len
= 4;
/*
 * NOTE(review): tail of the call that allocates pool->jobs; the line
 * naming the allocator is missing from this excerpt.
 */
123 pool
->jobs_array_len
, sizeof(struct pthreadpool_job
));
125 if (pool
->jobs
== NULL
) {
/* Start with an empty queue. */
130 pool
->head
= pool
->num_jobs
= 0;
132 ret
= pthread_mutex_init(&pool
->mutex
, NULL
);
139 ret
= pthread_cond_init(&pool
->condvar
, NULL
);
/* Presumably the pthread_cond_init() failure path: undo the mutex init. */
141 pthread_mutex_destroy(&pool
->mutex
);
147 pool
->shutdown
= false;
148 pool
->num_threads
= 0;
149 pool
->max_threads
= max_threads
;
/* Register the pool in the global list under pthreadpools_mutex. */
152 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
/* Presumably the lock failure path: tear down condvar and mutex. */
154 pthread_cond_destroy(&pool
->condvar
);
155 pthread_mutex_destroy(&pool
->mutex
);
160 DLIST_ADD(pthreadpools
, pool
);
162 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
/* Install the fork handlers exactly once per process. */
165 pthread_once(&pthreadpool_atfork_initialized
, pthreadpool_prep_atfork
);
/*
 * Fork "prepare" handler (first argument to pthread_atfork() in
 * pthreadpool_prep_atfork()).
 *
 * Takes pthreadpools_mutex and then the mutex of each pool visited by
 * the loop, so these locks are held by the forking thread when fork()
 * happens. The matching unlocks are in pthreadpool_parent() and
 * pthreadpool_child().
 *
 * NOTE(review): the loop initialisation and advance are on lines missing
 * from this excerpt.
 */
172 static void pthreadpool_prepare(void)
175 struct pthreadpool
*pool
;
177 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
182 while (pool
!= NULL
) {
183 ret
= pthread_mutex_lock(&pool
->mutex
);
/*
 * Fork "parent" handler (second argument to pthread_atfork() in
 * pthreadpool_prep_atfork()).
 *
 * Walks the pool list from the tail backwards (DLIST_TAIL, DLIST_PREV),
 * unlocking each pool mutex, and finally unlocks pthreadpools_mutex.
 * This releases the locks taken in pthreadpool_prepare().
 *
 * NOTE(review): the loop condition is on a line missing from this
 * excerpt.
 */
189 static void pthreadpool_parent(void)
192 struct pthreadpool
*pool
;
194 for (pool
= DLIST_TAIL(pthreadpools
);
196 pool
= DLIST_PREV(pool
)) {
197 ret
= pthread_mutex_unlock(&pool
->mutex
);
201 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
/*
 * Fork handler intended for the child side (presumably the third
 * pthread_atfork() argument; that argument is cut off in this excerpt).
 *
 * Walks the pool list from the tail backwards. For each pool it resets
 * num_threads to 0 and unlocks the pool mutex; afterwards it unlocks
 * pthreadpools_mutex. Per POSIX, only the thread that called fork()
 * exists in the child, which is why the thread count is reset.
 *
 * NOTE(review): further per-pool resets may be on lines missing from
 * this excerpt.
 */
205 static void pthreadpool_child(void)
208 struct pthreadpool
*pool
;
210 for (pool
= DLIST_TAIL(pthreadpools
);
212 pool
= DLIST_PREV(pool
)) {
214 pool
->num_threads
= 0;
219 ret
= pthread_mutex_unlock(&pool
->mutex
);
223 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
/*
 * Register the fork handlers with pthread_atfork(). Invoked through
 * pthread_once() from pthreadpool_init(), so it runs at most once.
 *
 * NOTE(review): the third (child) handler argument is cut off in this
 * excerpt -- presumably pthreadpool_child.
 */
227 static void pthreadpool_prep_atfork(void)
229 pthread_atfork(pthreadpool_prepare
, pthreadpool_parent
,
/*
 * Tear down a pool: unlink it from the global pthreadpools list while
 * holding pthreadpools_mutex, then destroy its mutex and condition
 * variable.
 *
 * The two destroy results are kept separately in ret and ret1.
 * NOTE(review): how they are combined into the int return value, and
 * the release of the pool memory, are on lines missing from this
 * excerpt.
 */
233 static int pthreadpool_free(struct pthreadpool
*pool
)
237 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
241 DLIST_REMOVE(pthreadpools
, pool
);
242 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
245 ret
= pthread_mutex_destroy(&pool
->mutex
);
246 ret1
= pthread_cond_destroy(&pool
->condvar
);
262 * Destroy a thread pool. Wake up all idle threads for exit. The last
263 * one will free the pool.
/*
 * Request shutdown of a pool.
 *
 * Under pool->mutex:
 *  - if shutdown is already set, the mutex is released again
 *    (presumably followed by an error return -- not visible here);
 *  - otherwise shutdown is set to true;
 *  - if no worker threads exist, the mutex is released and the pool is
 *    freed right away with pthreadpool_free();
 *  - otherwise the condition variable is broadcast so waiting workers
 *    wake up, and the mutex is released. In that case the pool is freed
 *    later by the last worker leaving (see pthreadpool_server_exit()).
 *
 * NOTE(review): return statements are on lines missing from this
 * excerpt.
 */
266 int pthreadpool_destroy(struct pthreadpool
*pool
)
270 ret
= pthread_mutex_lock(&pool
->mutex
);
275 if (pool
->shutdown
) {
276 ret
= pthread_mutex_unlock(&pool
->mutex
);
281 pool
->shutdown
= true;
283 if (pool
->num_threads
== 0) {
284 ret
= pthread_mutex_unlock(&pool
->mutex
);
287 ret
= pthreadpool_free(pool
);
292 * We have active threads, tell them to finish.
295 ret
= pthread_cond_broadcast(&pool
->condvar
);
297 ret1
= pthread_mutex_unlock(&pool
->mutex
);
304 * Prepare for pthread_exit(), pool->mutex must be locked and will be
305 * unlocked here. This is a bit of a layering violation, but here we
306 * also take care of removing the pool if we're the last thread.
/*
 * Bookkeeping for a worker thread that is about to terminate.
 *
 * Decrements num_threads and evaluates free_it (shutdown requested and
 * this was the last thread) before releasing pool->mutex, so the
 * decision is made from a consistent view of the pool. The pool fields
 * are not read again after the unlock.
 *
 * NOTE(review): the condition guarding the pthreadpool_free() call is on
 * a line missing from this excerpt -- presumably "if (free_it)".
 */
308 static void pthreadpool_server_exit(struct pthreadpool
*pool
)
313 pool
->num_threads
-= 1;
315 free_it
= (pool
->shutdown
&& (pool
->num_threads
== 0));
317 ret
= pthread_mutex_unlock(&pool
->mutex
);
321 pthreadpool_free(pool
);
/*
 * Take the oldest job off the circular queue.
 *
 * p:   pool whose queue is read.
 * job: receives a copy of the entry at p->jobs[p->head].
 *
 * After the copy, head advances by one slot and wraps around at
 * jobs_array_len.
 *
 * NOTE(review): the return statements and the num_jobs update are on
 * lines missing from this excerpt. The caller treats a true result as
 * "a job was stored in *job"; the empty-queue branch presumably returns
 * false.
 */
325 static bool pthreadpool_get_job(struct pthreadpool
*p
,
326 struct pthreadpool_job
*job
)
328 if (p
->num_jobs
== 0) {
331 *job
= p
->jobs
[p
->head
];
332 p
->head
= (p
->head
+1) % p
->jobs_array_len
;
/*
 * Append a job to the tail of the circular queue, growing the array
 * when it is full.
 *
 * p:  pool whose queue is extended.
 * fn: job function to store.
 *
 * NOTE(review): the remaining parameters are on lines missing from this
 * excerpt; the caller passes (pool, job_id, fn, private_data).
 *
 * Growth: when num_jobs equals jobs_array_len the capacity is doubled.
 * The first "head" entries are the logically newest jobs that wrapped
 * around. They are copied to the start of the new upper half, so the
 * queue is again contiguous from head onwards under the new modulus.
 *
 * NOTE(review): neither the doubling of jobs_array_len nor the
 * multiplication by the element size is checked for overflow in the
 * visible code.
 */
337 static bool pthreadpool_put_job(struct pthreadpool
*p
,
339 void (*fn
)(void *private_data
),
342 struct pthreadpool_job
*job
;
344 if (p
->num_jobs
== p
->jobs_array_len
) {
345 struct pthreadpool_job
*tmp
;
346 size_t new_len
= p
->jobs_array_len
* 2;
/*
 * NOTE(review): tail of the resize call on p->jobs; the callee name and
 * the assignment target are on a line missing from this excerpt.
 */
349 p
->jobs
, sizeof(struct pthreadpool_job
) * new_len
);
356 * We just doubled the jobs array. The array implements a FIFO
357 * queue with a modulo-based wraparound, so we have to memcpy
358 * the jobs that are logically at the queue end but physically
359 * before the queue head into the reallocated area. The new
360 * space starts at the current jobs_array_len, and we have to
361 * copy everything before the current head job into the new
/* Source [0, head) and destination [old len, old len + head) do not overlap. */
364 memcpy(&p
->jobs
[p
->jobs_array_len
], p
->jobs
,
365 sizeof(struct pthreadpool_job
) * p
->head
);
367 p
->jobs_array_len
= new_len
;
/* Slot just past the newest queued job, wrapping at the array length. */
370 job
= &p
->jobs
[(p
->head
+ p
->num_jobs
) % p
->jobs_array_len
];
373 job
->private_data
= private_data
;
/*
 * Worker thread entry point (passed to pthread_create() by
 * pthreadpool_add_job()).
 *
 * arg: the struct pthreadpool this worker serves.
 *
 * Visible flow, all under pool->mutex unless noted:
 *  1. Read the current CLOCK_REALTIME time as the basis of the wait
 *     deadline.
 *  2. While there are no jobs and no shutdown request, wait on the
 *     condition variable with that deadline. On ETIMEDOUT with the queue
 *     still empty, leave through pthreadpool_server_exit().
 *  3. If a job can be dequeued: release the mutex, run the job, report
 *     it through pool->signal_fn, then re-take the mutex.
 *  4. If the queue is empty and shutdown was requested, leave through
 *     pthreadpool_server_exit().
 *
 * NOTE(review): the outer loop, the deadline increment, the num_idle
 * accounting, result checks and return statements are on lines missing
 * from this excerpt.
 */
380 static void *pthreadpool_server(void *arg
)
382 struct pthreadpool
*pool
= (struct pthreadpool
*)arg
;
385 res
= pthread_mutex_lock(&pool
->mutex
);
392 struct pthreadpool_job job
;
395 * idle-wait at most 1 second. If nothing happens in that
396 * time, exit this thread.
399 clock_gettime(CLOCK_REALTIME
, &ts
);
402 while ((pool
->num_jobs
== 0) && !pool
->shutdown
) {
/* pthread_cond_timedwait() releases the mutex while blocked and re-takes it before returning. */
405 res
= pthread_cond_timedwait(
406 &pool
->condvar
, &pool
->mutex
, &ts
);
409 if (res
== ETIMEDOUT
) {
/* Re-check the queue: a job may have arrived just as the wait timed out. */
411 if (pool
->num_jobs
== 0) {
413 * we timed out and still no work for
416 pthreadpool_server_exit(pool
);
425 if (pthreadpool_get_job(pool
, &job
)) {
429 * Do the work with the mutex unlocked
432 res
= pthread_mutex_unlock(&pool
->mutex
);
435 job
.fn(job
.private_data
);
/* Report completion to the pool owner through the registered callback. */
437 ret
= pool
->signal_fn(job
.id
,
438 job
.fn
, job
.private_data
,
439 pool
->signal_fn_private_data
);
441 res
= pthread_mutex_lock(&pool
->mutex
);
/*
 * NOTE(review): presumably reached only when signal_fn reported failure;
 * the check of ret is on a line missing from this excerpt.
 */
445 pthreadpool_server_exit(pool
);
450 if ((pool
->num_jobs
== 0) && pool
->shutdown
) {
452 * No more work to do and we're asked to shut down, so
455 pthreadpool_server_exit(pool
);
/*
 * Queue a job and make sure a thread will pick it up.
 *
 * pool:         pool to submit to.
 * job_id:       passed on to pthreadpool_put_job(); workers later hand
 *               the stored id to pool->signal_fn.
 * fn:           job function, called by a worker as fn(private_data).
 * private_data: opaque pointer stored with the job.
 *
 * Visible flow under pool->mutex:
 *  - a pool that is shutting down does not accept the job (mutex is
 *    released; the returned value is not visible here);
 *  - the job is appended with pthreadpool_put_job(); on failure the
 *    mutex is released;
 *  - if idle workers exist, one is woken with pthread_cond_signal();
 *  - else if max_threads is non-zero and already reached, the job
 *    simply stays queued;
 *  - else a new detached worker running pthreadpool_server() is created
 *    with signals blocked, and num_threads is incremented.
 *
 * NOTE(review): result checks and return statements are on lines missing
 * from this excerpt.
 */
461 int pthreadpool_add_job(struct pthreadpool
*pool
, int job_id
,
462 void (*fn
)(void *private_data
), void *private_data
)
464 pthread_attr_t thread_attr
;
467 sigset_t mask
, omask
;
469 res
= pthread_mutex_lock(&pool
->mutex
);
474 if (pool
->shutdown
) {
476 * Protect against the pool being shut down while
477 * trying to add a job
479 res
= pthread_mutex_unlock(&pool
->mutex
);
485 * Add job to the end of the queue
487 if (!pthreadpool_put_job(pool
, job_id
, fn
, private_data
)) {
488 pthread_mutex_unlock(&pool
->mutex
);
492 if (pool
->num_idle
> 0) {
494 * We have idle threads, wake one.
496 res
= pthread_cond_signal(&pool
->condvar
);
497 pthread_mutex_unlock(&pool
->mutex
);
/* max_threads == 0 skips this limit check entirely. */
501 if ((pool
->max_threads
!= 0) &&
502 (pool
->num_threads
>= pool
->max_threads
)) {
504 * No more new threads, we just queue the request
506 pthread_mutex_unlock(&pool
->mutex
);
511 * Create a new worker thread. It should not receive any signals.
516 res
= pthread_attr_init(&thread_attr
);
/* Presumably the pthread_attr_init() failure path. */
518 pthread_mutex_unlock(&pool
->mutex
);
/* Workers are detached: nobody joins them, they clean up on exit. */
522 res
= pthread_attr_setdetachstate(
523 &thread_attr
, PTHREAD_CREATE_DETACHED
);
/* Presumably the setdetachstate failure path. */
525 pthread_attr_destroy(&thread_attr
);
526 pthread_mutex_unlock(&pool
->mutex
);
/*
 * Block the signals in mask for the creating thread and save the old
 * mask in omask. Per POSIX a new thread inherits its creator's signal
 * mask. NOTE(review): the initialisation of mask is on a line missing
 * from this excerpt.
 */
530 res
= pthread_sigmask(SIG_BLOCK
, &mask
, &omask
);
/* Presumably the pthread_sigmask() failure path. */
532 pthread_attr_destroy(&thread_attr
);
533 pthread_mutex_unlock(&pool
->mutex
);
537 res
= pthread_create(&thread_id
, &thread_attr
, pthreadpool_server
,
540 pool
->num_threads
+= 1;
/*
 * NOTE(review): the call that restores the caller's signal mask is the
 * argument of assert(). assert() expands to nothing when NDEBUG is
 * defined, so in such a build the mask would stay blocked. Consider
 * calling pthread_sigmask() outside the assert and asserting on its
 * result -- confirm how this file is built.
 */
543 assert(pthread_sigmask(SIG_SETMASK
, &omask
, NULL
) == 0);
545 pthread_attr_destroy(&thread_attr
);
547 pthread_mutex_unlock(&pool
->mutex
);