2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
29 #include "system/time.h"
30 #include "system/filesys.h"
33 #include "pthreadpool.h"
34 #include "lib/util/dlinklist.h"
36 struct pthreadpool_job
{
37 struct pthreadpool_job
*next
;
39 void (*fn
)(void *private_data
);
45 * List pthreadpools for fork safety
47 struct pthreadpool
*prev
, *next
;
50 * Control access to this struct
52 pthread_mutex_t mutex
;
55 * Threads waiting for work do so here
57 pthread_cond_t condvar
;
62 struct pthreadpool_job
*jobs
, *last_job
;
70 * indicator to worker threads that they should shut down
75 * maximum number of threads
85 * Number of idle threads
90 * An array of threads that require joining.
93 pthread_t
*exited
; /* We alloc more */
96 static pthread_mutex_t pthreadpools_mutex
= PTHREAD_MUTEX_INITIALIZER
;
97 static struct pthreadpool
*pthreadpools
= NULL
;
98 static pthread_once_t pthreadpool_atfork_initialized
= PTHREAD_ONCE_INIT
;
100 static void pthreadpool_prep_atfork(void);
103 * Initialize a thread pool
106 int pthreadpool_init(unsigned max_threads
, struct pthreadpool
**presult
)
108 struct pthreadpool
*pool
;
111 pool
= (struct pthreadpool
*)malloc(sizeof(struct pthreadpool
));
116 ret
= pipe(pool
->sig_pipe
);
123 ret
= pthread_mutex_init(&pool
->mutex
, NULL
);
125 close(pool
->sig_pipe
[0]);
126 close(pool
->sig_pipe
[1]);
131 ret
= pthread_cond_init(&pool
->condvar
, NULL
);
133 pthread_mutex_destroy(&pool
->mutex
);
134 close(pool
->sig_pipe
[0]);
135 close(pool
->sig_pipe
[1]);
141 pool
->jobs
= pool
->last_job
= NULL
;
142 pool
->num_threads
= 0;
143 pool
->num_exited
= 0;
145 pool
->max_threads
= max_threads
;
148 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
150 pthread_cond_destroy(&pool
->condvar
);
151 pthread_mutex_destroy(&pool
->mutex
);
152 close(pool
->sig_pipe
[0]);
153 close(pool
->sig_pipe
[1]);
157 DLIST_ADD(pthreadpools
, pool
);
159 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
162 pthread_once(&pthreadpool_atfork_initialized
, pthreadpool_prep_atfork
);
169 static void pthreadpool_prepare(void)
172 struct pthreadpool
*pool
;
174 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
179 while (pool
!= NULL
) {
180 ret
= pthread_mutex_lock(&pool
->mutex
);
186 static void pthreadpool_parent(void)
189 struct pthreadpool
*pool
;
191 pool
= DLIST_TAIL(pthreadpools
);
194 ret
= pthread_mutex_unlock(&pool
->mutex
);
197 if (pool
== pthreadpools
) {
203 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
207 static void pthreadpool_child(void)
210 struct pthreadpool
*pool
;
212 pool
= DLIST_TAIL(pthreadpools
);
215 close(pool
->sig_pipe
[0]);
216 close(pool
->sig_pipe
[1]);
218 ret
= pipe(pool
->sig_pipe
);
221 pool
->num_threads
= 0;
223 pool
->num_exited
= 0;
229 while (pool
->jobs
!= NULL
) {
230 struct pthreadpool_job
*job
;
232 pool
->jobs
= job
->next
;
235 pool
->last_job
= NULL
;
237 ret
= pthread_mutex_unlock(&pool
->mutex
);
240 if (pool
== pthreadpools
) {
246 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
250 static void pthreadpool_prep_atfork(void)
252 pthread_atfork(pthreadpool_prepare
, pthreadpool_parent
,
257 * Return the file descriptor which becomes readable when a job has
261 int pthreadpool_signal_fd(struct pthreadpool
*pool
)
263 return pool
->sig_pipe
[0];
267 * Do a pthread_join() on all children that have exited, pool->mutex must be
270 static void pthreadpool_join_children(struct pthreadpool
*pool
)
274 for (i
=0; i
<pool
->num_exited
; i
++) {
275 pthread_join(pool
->exited
[i
], NULL
);
277 pool
->num_exited
= 0;
280 * Deliberately not free and NULL pool->exited. That will be
281 * re-used by realloc later.
286 * Fetch a finished job number from the signal pipe
289 int pthreadpool_finished_job(struct pthreadpool
*pool
, int *jobid
)
297 while ((nread
== -1) && (errno
== EINTR
)) {
298 nread
= read(pool
->sig_pipe
[0], &ret_jobid
, sizeof(int));
303 if (nread
!= sizeof(int)) {
311 * Destroy a thread pool, finishing all threads working for it
314 int pthreadpool_destroy(struct pthreadpool
*pool
)
318 ret
= pthread_mutex_lock(&pool
->mutex
);
323 if ((pool
->jobs
!= NULL
) || pool
->shutdown
) {
324 ret
= pthread_mutex_unlock(&pool
->mutex
);
329 if (pool
->num_threads
> 0) {
331 * We have active threads, tell them to finish, wait for that.
336 if (pool
->num_idle
> 0) {
338 * Wake the idle threads. They will find
339 * pool->shutdown to be set and exit themselves
341 ret
= pthread_cond_broadcast(&pool
->condvar
);
343 pthread_mutex_unlock(&pool
->mutex
);
348 while ((pool
->num_threads
> 0) || (pool
->num_exited
> 0)) {
350 if (pool
->num_exited
> 0) {
351 pthreadpool_join_children(pool
);
355 * A thread that shuts down will also signal
358 ret
= pthread_cond_wait(&pool
->condvar
, &pool
->mutex
);
360 pthread_mutex_unlock(&pool
->mutex
);
366 ret
= pthread_mutex_unlock(&pool
->mutex
);
370 ret
= pthread_mutex_destroy(&pool
->mutex
);
371 ret1
= pthread_cond_destroy(&pool
->condvar
);
380 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
384 DLIST_REMOVE(pthreadpools
, pool
);
385 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
388 close(pool
->sig_pipe
[0]);
389 pool
->sig_pipe
[0] = -1;
391 close(pool
->sig_pipe
[1]);
392 pool
->sig_pipe
[1] = -1;
401 * Prepare for pthread_exit(), pool->mutex must be locked
403 static void pthreadpool_server_exit(struct pthreadpool
*pool
)
407 pool
->num_threads
-= 1;
409 exited
= (pthread_t
*)realloc(
410 pool
->exited
, sizeof(pthread_t
) * (pool
->num_exited
+ 1));
412 if (exited
== NULL
) {
413 /* lost a thread status */
416 pool
->exited
= exited
;
418 pool
->exited
[pool
->num_exited
] = pthread_self();
419 pool
->num_exited
+= 1;
422 static void *pthreadpool_server(void *arg
)
424 struct pthreadpool
*pool
= (struct pthreadpool
*)arg
;
427 res
= pthread_mutex_lock(&pool
->mutex
);
434 struct pthreadpool_job
*job
;
437 * idle-wait at most 1 second. If nothing happens in that
438 * time, exit this thread.
441 clock_gettime(CLOCK_REALTIME
, &ts
);
444 while ((pool
->jobs
== NULL
) && (pool
->shutdown
== 0)) {
447 res
= pthread_cond_timedwait(
448 &pool
->condvar
, &pool
->mutex
, &ts
);
451 if (res
== ETIMEDOUT
) {
453 if (pool
->jobs
== NULL
) {
455 * we timed out and still no work for
458 pthreadpool_server_exit(pool
);
459 pthread_mutex_unlock(&pool
->mutex
);
474 * Ok, there's work for us to do, remove the job from
475 * the pthreadpool list
477 pool
->jobs
= job
->next
;
478 if (pool
->last_job
== job
) {
479 pool
->last_job
= NULL
;
483 * Do the work with the mutex unlocked
486 res
= pthread_mutex_unlock(&pool
->mutex
);
489 job
->fn(job
->private_data
);
491 written
= write(pool
->sig_pipe
[1], &job
->id
,
496 res
= pthread_mutex_lock(&pool
->mutex
);
499 if (written
!= sizeof(int)) {
500 pthreadpool_server_exit(pool
);
501 pthread_mutex_unlock(&pool
->mutex
);
506 if ((pool
->jobs
== NULL
) && (pool
->shutdown
!= 0)) {
508 * No more work to do and we're asked to shut down, so
511 pthreadpool_server_exit(pool
);
513 if (pool
->num_threads
== 0) {
515 * Ping the main thread waiting for all of us
516 * workers to have quit.
518 pthread_cond_broadcast(&pool
->condvar
);
521 pthread_mutex_unlock(&pool
->mutex
);
527 int pthreadpool_add_job(struct pthreadpool
*pool
, int job_id
,
528 void (*fn
)(void *private_data
), void *private_data
)
530 struct pthreadpool_job
*job
;
533 sigset_t mask
, omask
;
535 job
= (struct pthreadpool_job
*)malloc(sizeof(struct pthreadpool_job
));
541 job
->private_data
= private_data
;
545 res
= pthread_mutex_lock(&pool
->mutex
);
551 if (pool
->shutdown
) {
553 * Protect against the pool being shut down while
554 * trying to add a job
556 res
= pthread_mutex_unlock(&pool
->mutex
);
563 * Just some cleanup under the mutex
565 pthreadpool_join_children(pool
);
568 * Add job to the end of the queue
570 if (pool
->jobs
== NULL
) {
574 pool
->last_job
->next
= job
;
576 pool
->last_job
= job
;
578 if (pool
->num_idle
> 0) {
580 * We have idle threads, wake one.
582 res
= pthread_cond_signal(&pool
->condvar
);
583 pthread_mutex_unlock(&pool
->mutex
);
587 if ((pool
->max_threads
!= 0) &&
588 (pool
->num_threads
>= pool
->max_threads
)) {
590 * No more new threads, we just queue the request
592 pthread_mutex_unlock(&pool
->mutex
);
597 * Create a new worker thread. It should not receive any signals.
602 res
= pthread_sigmask(SIG_BLOCK
, &mask
, &omask
);
604 pthread_mutex_unlock(&pool
->mutex
);
608 res
= pthread_create(&thread_id
, NULL
, pthreadpool_server
,
611 pool
->num_threads
+= 1;
614 assert(pthread_sigmask(SIG_SETMASK
, &omask
, NULL
) == 0);
616 pthread_mutex_unlock(&pool
->mutex
);