2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
31 #include "pthreadpool.h"
32 #include "lib/util/dlinklist.h"
34 struct pthreadpool_job
{
35 struct pthreadpool_job
*next
;
37 void (*fn
)(void *private_data
);
43 * List pthreadpools for fork safety
45 struct pthreadpool
*prev
, *next
;
48 * Control access to this struct
50 pthread_mutex_t mutex
;
53 * Threads waiting for work do so here
55 pthread_cond_t condvar
;
60 struct pthreadpool_job
*jobs
, *last_job
;
68 * indicator to worker threads that they should shut down
73 * maximum number of threads
83 * Number of idle threads
88 * An array of threads that require joining.
91 pthread_t
*exited
; /* We alloc more */
94 static pthread_mutex_t pthreadpools_mutex
= PTHREAD_MUTEX_INITIALIZER
;
95 static struct pthreadpool
*pthreadpools
= NULL
;
96 static pthread_once_t pthreadpool_atfork_initialized
= PTHREAD_ONCE_INIT
;
98 static void pthreadpool_prep_atfork(void);
101 * Initialize a thread pool
104 int pthreadpool_init(unsigned max_threads
, struct pthreadpool
**presult
)
106 struct pthreadpool
*pool
;
109 pool
= (struct pthreadpool
*)malloc(sizeof(struct pthreadpool
));
114 ret
= pipe(pool
->sig_pipe
);
121 ret
= pthread_mutex_init(&pool
->mutex
, NULL
);
123 close(pool
->sig_pipe
[0]);
124 close(pool
->sig_pipe
[1]);
129 ret
= pthread_cond_init(&pool
->condvar
, NULL
);
131 pthread_mutex_destroy(&pool
->mutex
);
132 close(pool
->sig_pipe
[0]);
133 close(pool
->sig_pipe
[1]);
139 pool
->jobs
= pool
->last_job
= NULL
;
140 pool
->num_threads
= 0;
141 pool
->num_exited
= 0;
143 pool
->max_threads
= max_threads
;
146 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
148 pthread_cond_destroy(&pool
->condvar
);
149 pthread_mutex_destroy(&pool
->mutex
);
150 close(pool
->sig_pipe
[0]);
151 close(pool
->sig_pipe
[1]);
155 DLIST_ADD(pthreadpools
, pool
);
157 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
160 pthread_once(&pthreadpool_atfork_initialized
, pthreadpool_prep_atfork
);
167 static void pthreadpool_prepare(void)
170 struct pthreadpool
*pool
;
172 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
177 while (pool
!= NULL
) {
178 ret
= pthread_mutex_lock(&pool
->mutex
);
184 static void pthreadpool_parent(void)
187 struct pthreadpool
*pool
;
189 pool
= DLIST_TAIL(pthreadpools
);
192 ret
= pthread_mutex_unlock(&pool
->mutex
);
195 if (pool
== pthreadpools
) {
201 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
205 static void pthreadpool_child(void)
208 struct pthreadpool
*pool
;
210 pool
= DLIST_TAIL(pthreadpools
);
213 close(pool
->sig_pipe
[0]);
214 close(pool
->sig_pipe
[1]);
216 ret
= pipe(pool
->sig_pipe
);
219 pool
->num_threads
= 0;
221 pool
->num_exited
= 0;
227 while (pool
->jobs
!= NULL
) {
228 struct pthreadpool_job
*job
;
230 pool
->jobs
= job
->next
;
233 pool
->last_job
= NULL
;
235 ret
= pthread_mutex_unlock(&pool
->mutex
);
238 if (pool
== pthreadpools
) {
244 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
248 static void pthreadpool_prep_atfork(void)
250 pthread_atfork(pthreadpool_prepare
, pthreadpool_parent
,
255 * Return the file descriptor which becomes readable when a job has
259 int pthreadpool_signal_fd(struct pthreadpool
*pool
)
261 return pool
->sig_pipe
[0];
265 * Do a pthread_join() on all children that have exited, pool->mutex must be
268 static void pthreadpool_join_children(struct pthreadpool
*pool
)
272 for (i
=0; i
<pool
->num_exited
; i
++) {
273 pthread_join(pool
->exited
[i
], NULL
);
275 pool
->num_exited
= 0;
278 * Deliberately not free and NULL pool->exited. That will be
279 * re-used by realloc later.
284 * Fetch a finished job number from the signal pipe
287 int pthreadpool_finished_job(struct pthreadpool
*pool
)
295 while ((nread
== -1) && (errno
== EINTR
)) {
296 nread
= read(pool
->sig_pipe
[0], &result
, sizeof(int));
301 if (nread
!= sizeof(int)) {
308 * Destroy a thread pool, finishing all threads working for it
311 int pthreadpool_destroy(struct pthreadpool
*pool
)
315 ret
= pthread_mutex_lock(&pool
->mutex
);
320 if ((pool
->jobs
!= NULL
) || pool
->shutdown
) {
321 ret
= pthread_mutex_unlock(&pool
->mutex
);
326 if (pool
->num_threads
> 0) {
328 * We have active threads, tell them to finish, wait for that.
333 if (pool
->num_idle
> 0) {
335 * Wake the idle threads. They will find pool->quit to
336 * be set and exit themselves
338 ret
= pthread_cond_broadcast(&pool
->condvar
);
340 pthread_mutex_unlock(&pool
->mutex
);
345 while ((pool
->num_threads
> 0) || (pool
->num_exited
> 0)) {
347 if (pool
->num_exited
> 0) {
348 pthreadpool_join_children(pool
);
352 * A thread that shuts down will also signal
355 ret
= pthread_cond_wait(&pool
->condvar
, &pool
->mutex
);
357 pthread_mutex_unlock(&pool
->mutex
);
363 ret
= pthread_mutex_unlock(&pool
->mutex
);
367 ret
= pthread_mutex_destroy(&pool
->mutex
);
368 ret1
= pthread_cond_destroy(&pool
->condvar
);
377 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
381 DLIST_REMOVE(pthreadpools
, pool
);
382 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
385 close(pool
->sig_pipe
[0]);
386 pool
->sig_pipe
[0] = -1;
388 close(pool
->sig_pipe
[1]);
389 pool
->sig_pipe
[1] = -1;
398 * Prepare for pthread_exit(), pool->mutex must be locked
400 static void pthreadpool_server_exit(struct pthreadpool
*pool
)
404 pool
->num_threads
-= 1;
406 exited
= (pthread_t
*)realloc(
407 pool
->exited
, sizeof(pthread_t
*) * (pool
->num_exited
+ 1));
409 if (exited
== NULL
) {
410 /* lost a thread status */
413 pool
->exited
= exited
;
415 pool
->exited
[pool
->num_exited
] = pthread_self();
416 pool
->num_exited
+= 1;
419 static void *pthreadpool_server(void *arg
)
421 struct pthreadpool
*pool
= (struct pthreadpool
*)arg
;
424 res
= pthread_mutex_lock(&pool
->mutex
);
432 struct pthreadpool_job
*job
;
435 * idle-wait at most 1 second. If nothing happens in that
436 * time, exit this thread.
439 gettimeofday(&tv
, NULL
);
440 ts
.tv_sec
= tv
.tv_sec
+ 1;
441 ts
.tv_nsec
= tv
.tv_usec
*1000;
443 while ((pool
->jobs
== NULL
) && (pool
->shutdown
== 0)) {
446 res
= pthread_cond_timedwait(
447 &pool
->condvar
, &pool
->mutex
, &ts
);
450 if (res
== ETIMEDOUT
) {
452 if (pool
->jobs
== NULL
) {
454 * we timed out and still no work for
457 pthreadpool_server_exit(pool
);
458 pthread_mutex_unlock(&pool
->mutex
);
473 * Ok, there's work for us to do, remove the job from
474 * the pthreadpool list
476 pool
->jobs
= job
->next
;
477 if (pool
->last_job
== job
) {
478 pool
->last_job
= NULL
;
482 * Do the work with the mutex unlocked
485 res
= pthread_mutex_unlock(&pool
->mutex
);
488 job
->fn(job
->private_data
);
490 written
= write(pool
->sig_pipe
[1], &job
->id
,
495 res
= pthread_mutex_lock(&pool
->mutex
);
498 if (written
!= sizeof(int)) {
499 pthreadpool_server_exit(pool
);
500 pthread_mutex_unlock(&pool
->mutex
);
505 if ((pool
->jobs
== NULL
) && (pool
->shutdown
!= 0)) {
507 * No more work to do and we're asked to shut down, so
510 pthreadpool_server_exit(pool
);
512 if (pool
->num_threads
== 0) {
514 * Ping the main thread waiting for all of us
515 * workers to have quit.
517 pthread_cond_broadcast(&pool
->condvar
);
520 pthread_mutex_unlock(&pool
->mutex
);
526 int pthreadpool_add_job(struct pthreadpool
*pool
, int job_id
,
527 void (*fn
)(void *private_data
), void *private_data
)
529 struct pthreadpool_job
*job
;
532 sigset_t mask
, omask
;
534 job
= (struct pthreadpool_job
*)malloc(sizeof(struct pthreadpool_job
));
540 job
->private_data
= private_data
;
544 res
= pthread_mutex_lock(&pool
->mutex
);
550 if (pool
->shutdown
) {
552 * Protect against the pool being shut down while
553 * trying to add a job
555 res
= pthread_mutex_unlock(&pool
->mutex
);
562 * Just some cleanup under the mutex
564 pthreadpool_join_children(pool
);
567 * Add job to the end of the queue
569 if (pool
->jobs
== NULL
) {
573 pool
->last_job
->next
= job
;
575 pool
->last_job
= job
;
577 if (pool
->num_idle
> 0) {
579 * We have idle threads, wake one.
581 res
= pthread_cond_signal(&pool
->condvar
);
582 pthread_mutex_unlock(&pool
->mutex
);
586 if ((pool
->max_threads
!= 0) &&
587 (pool
->num_threads
>= pool
->max_threads
)) {
589 * No more new threads, we just queue the request
591 pthread_mutex_unlock(&pool
->mutex
);
596 * Create a new worker thread. It should not receive any signals.
601 res
= pthread_sigmask(SIG_BLOCK
, &mask
, &omask
);
603 pthread_mutex_unlock(&pool
->mutex
);
607 res
= pthread_create(&thread_id
, NULL
, pthreadpool_server
,
610 pool
->num_threads
+= 1;
613 assert(pthread_sigmask(SIG_SETMASK
, &omask
, NULL
) == 0);
615 pthread_mutex_unlock(&pool
->mutex
);