2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/threads.h"
25 #include "pthreadpool.h"
26 #include "lib/util/dlinklist.h"
29 struct pthreadpool_job
{
31 void (*fn
)(void *private_data
);
37 * List pthreadpools for fork safety
39 struct pthreadpool
*prev
, *next
;
42 * Control access to this struct
44 pthread_mutex_t mutex
;
47 * Threads waiting for work do so here
49 pthread_cond_t condvar
;
54 size_t jobs_array_len
;
55 struct pthreadpool_job
*jobs
;
66 * indicator to worker threads that they should shut down
71 * maximum number of threads
81 * Number of idle threads
86 * An array of threads that require joining.
89 pthread_t
*exited
; /* We alloc more */
92 static pthread_mutex_t pthreadpools_mutex
= PTHREAD_MUTEX_INITIALIZER
;
93 static struct pthreadpool
*pthreadpools
= NULL
;
94 static pthread_once_t pthreadpool_atfork_initialized
= PTHREAD_ONCE_INIT
;
96 static void pthreadpool_prep_atfork(void);
99 * Initialize a thread pool
102 int pthreadpool_init(unsigned max_threads
, struct pthreadpool
**presult
)
104 struct pthreadpool
*pool
;
107 pool
= (struct pthreadpool
*)malloc(sizeof(struct pthreadpool
));
112 pool
->jobs_array_len
= 4;
114 pool
->jobs_array_len
, sizeof(struct pthreadpool_job
));
116 if (pool
->jobs
== NULL
) {
121 pool
->head
= pool
->num_jobs
= 0;
123 ret
= pipe(pool
->sig_pipe
);
131 ret
= pthread_mutex_init(&pool
->mutex
, NULL
);
133 close(pool
->sig_pipe
[0]);
134 close(pool
->sig_pipe
[1]);
140 ret
= pthread_cond_init(&pool
->condvar
, NULL
);
142 pthread_mutex_destroy(&pool
->mutex
);
143 close(pool
->sig_pipe
[0]);
144 close(pool
->sig_pipe
[1]);
151 pool
->num_threads
= 0;
152 pool
->num_exited
= 0;
154 pool
->max_threads
= max_threads
;
157 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
159 pthread_cond_destroy(&pool
->condvar
);
160 pthread_mutex_destroy(&pool
->mutex
);
161 close(pool
->sig_pipe
[0]);
162 close(pool
->sig_pipe
[1]);
167 DLIST_ADD(pthreadpools
, pool
);
169 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
172 pthread_once(&pthreadpool_atfork_initialized
, pthreadpool_prep_atfork
);
179 static void pthreadpool_prepare(void)
182 struct pthreadpool
*pool
;
184 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
189 while (pool
!= NULL
) {
190 ret
= pthread_mutex_lock(&pool
->mutex
);
196 static void pthreadpool_parent(void)
199 struct pthreadpool
*pool
;
201 for (pool
= DLIST_TAIL(pthreadpools
);
203 pool
= DLIST_PREV(pool
)) {
204 ret
= pthread_mutex_unlock(&pool
->mutex
);
208 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
212 static void pthreadpool_child(void)
215 struct pthreadpool
*pool
;
217 for (pool
= DLIST_TAIL(pthreadpools
);
219 pool
= DLIST_PREV(pool
)) {
221 close(pool
->sig_pipe
[0]);
222 close(pool
->sig_pipe
[1]);
224 ret
= pipe(pool
->sig_pipe
);
227 pool
->num_threads
= 0;
229 pool
->num_exited
= 0;
237 ret
= pthread_mutex_unlock(&pool
->mutex
);
241 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
245 static void pthreadpool_prep_atfork(void)
247 pthread_atfork(pthreadpool_prepare
, pthreadpool_parent
,
252 * Return the file descriptor which becomes readable when a job has
256 int pthreadpool_signal_fd(struct pthreadpool
*pool
)
258 return pool
->sig_pipe
[0];
262 * Do a pthread_join() on all children that have exited, pool->mutex must be
265 static void pthreadpool_join_children(struct pthreadpool
*pool
)
269 for (i
=0; i
<pool
->num_exited
; i
++) {
272 ret
= pthread_join(pool
->exited
[i
], NULL
);
275 * Severe internal error, we can't do much but
281 pool
->num_exited
= 0;
284 * Deliberately not free and NULL pool->exited. That will be
285 * re-used by realloc later.
290 * Fetch a finished job number from the signal pipe
293 int pthreadpool_finished_jobs(struct pthreadpool
*pool
, int *jobids
,
296 ssize_t to_read
, nread
;
301 to_read
= sizeof(int) * num_jobids
;
303 while ((nread
== -1) && (errno
== EINTR
)) {
304 nread
= read(pool
->sig_pipe
[0], jobids
, to_read
);
309 if ((nread
% sizeof(int)) != 0) {
312 return nread
/ sizeof(int);
316 * Destroy a thread pool, finishing all threads working for it
319 int pthreadpool_destroy(struct pthreadpool
*pool
)
323 ret
= pthread_mutex_lock(&pool
->mutex
);
328 if ((pool
->num_jobs
!= 0) || pool
->shutdown
) {
329 ret
= pthread_mutex_unlock(&pool
->mutex
);
334 if (pool
->num_threads
> 0) {
336 * We have active threads, tell them to finish, wait for that.
341 if (pool
->num_idle
> 0) {
343 * Wake the idle threads. They will find
344 * pool->shutdown to be set and exit themselves
346 ret
= pthread_cond_broadcast(&pool
->condvar
);
348 pthread_mutex_unlock(&pool
->mutex
);
353 while ((pool
->num_threads
> 0) || (pool
->num_exited
> 0)) {
355 if (pool
->num_exited
> 0) {
356 pthreadpool_join_children(pool
);
360 * A thread that shuts down will also signal
363 ret
= pthread_cond_wait(&pool
->condvar
, &pool
->mutex
);
365 pthread_mutex_unlock(&pool
->mutex
);
371 ret
= pthread_mutex_unlock(&pool
->mutex
);
375 ret
= pthread_mutex_destroy(&pool
->mutex
);
376 ret1
= pthread_cond_destroy(&pool
->condvar
);
385 ret
= pthread_mutex_lock(&pthreadpools_mutex
);
389 DLIST_REMOVE(pthreadpools
, pool
);
390 ret
= pthread_mutex_unlock(&pthreadpools_mutex
);
393 close(pool
->sig_pipe
[0]);
394 pool
->sig_pipe
[0] = -1;
396 close(pool
->sig_pipe
[1]);
397 pool
->sig_pipe
[1] = -1;
407 * Prepare for pthread_exit(), pool->mutex must be locked
409 static void pthreadpool_server_exit(struct pthreadpool
*pool
)
413 pool
->num_threads
-= 1;
415 exited
= (pthread_t
*)realloc(
416 pool
->exited
, sizeof(pthread_t
) * (pool
->num_exited
+ 1));
418 if (exited
== NULL
) {
419 /* lost a thread status */
422 pool
->exited
= exited
;
424 pool
->exited
[pool
->num_exited
] = pthread_self();
425 pool
->num_exited
+= 1;
428 static bool pthreadpool_get_job(struct pthreadpool
*p
,
429 struct pthreadpool_job
*job
)
431 if (p
->num_jobs
== 0) {
434 *job
= p
->jobs
[p
->head
];
435 p
->head
= (p
->head
+1) % p
->jobs_array_len
;
440 static bool pthreadpool_put_job(struct pthreadpool
*p
,
442 void (*fn
)(void *private_data
),
445 struct pthreadpool_job
*job
;
447 if (p
->num_jobs
== p
->jobs_array_len
) {
448 struct pthreadpool_job
*tmp
;
449 size_t new_len
= p
->jobs_array_len
* 2;
452 p
->jobs
, sizeof(struct pthreadpool_job
) * new_len
);
459 * We just doubled the jobs array. The array implements a FIFO
460 * queue with a modulo-based wraparound, so we have to memcpy
461 * the jobs that are logically at the queue end but physically
462 * before the queue head into the reallocated area. The new
463 * space starts at the current jobs_array_len, and we have to
464 * copy everything before the current head job into the new
467 memcpy(&p
->jobs
[p
->jobs_array_len
], p
->jobs
,
468 sizeof(struct pthreadpool_job
) * p
->head
);
470 p
->jobs_array_len
= new_len
;
473 job
= &p
->jobs
[(p
->head
+ p
->num_jobs
) % p
->jobs_array_len
];
476 job
->private_data
= private_data
;
483 static void *pthreadpool_server(void *arg
)
485 struct pthreadpool
*pool
= (struct pthreadpool
*)arg
;
488 res
= pthread_mutex_lock(&pool
->mutex
);
495 struct pthreadpool_job job
;
498 * idle-wait at most 1 second. If nothing happens in that
499 * time, exit this thread.
502 clock_gettime(CLOCK_REALTIME
, &ts
);
505 while ((pool
->num_jobs
== 0) && (pool
->shutdown
== 0)) {
508 res
= pthread_cond_timedwait(
509 &pool
->condvar
, &pool
->mutex
, &ts
);
512 if (res
== ETIMEDOUT
) {
514 if (pool
->num_jobs
== 0) {
516 * we timed out and still no work for
519 pthreadpool_server_exit(pool
);
520 pthread_mutex_unlock(&pool
->mutex
);
529 if (pthreadpool_get_job(pool
, &job
)) {
531 int sig_pipe
= pool
->sig_pipe
[1];
534 * Do the work with the mutex unlocked
537 res
= pthread_mutex_unlock(&pool
->mutex
);
540 job
.fn(job
.private_data
);
542 res
= pthread_mutex_lock(&pool
->mutex
);
545 written
= write(sig_pipe
, &job
.id
, sizeof(job
.id
));
546 if (written
!= sizeof(int)) {
547 pthreadpool_server_exit(pool
);
548 pthread_mutex_unlock(&pool
->mutex
);
553 if ((pool
->num_jobs
== 0) && (pool
->shutdown
!= 0)) {
555 * No more work to do and we're asked to shut down, so
558 pthreadpool_server_exit(pool
);
560 if (pool
->num_threads
== 0) {
562 * Ping the main thread waiting for all of us
563 * workers to have quit.
565 pthread_cond_broadcast(&pool
->condvar
);
568 pthread_mutex_unlock(&pool
->mutex
);
574 int pthreadpool_add_job(struct pthreadpool
*pool
, int job_id
,
575 void (*fn
)(void *private_data
), void *private_data
)
579 sigset_t mask
, omask
;
581 res
= pthread_mutex_lock(&pool
->mutex
);
586 if (pool
->shutdown
) {
588 * Protect against the pool being shut down while
589 * trying to add a job
591 res
= pthread_mutex_unlock(&pool
->mutex
);
597 * Just some cleanup under the mutex
599 pthreadpool_join_children(pool
);
602 * Add job to the end of the queue
604 if (!pthreadpool_put_job(pool
, job_id
, fn
, private_data
)) {
605 pthread_mutex_unlock(&pool
->mutex
);
609 if (pool
->num_idle
> 0) {
611 * We have idle threads, wake one.
613 res
= pthread_cond_signal(&pool
->condvar
);
614 pthread_mutex_unlock(&pool
->mutex
);
618 if ((pool
->max_threads
!= 0) &&
619 (pool
->num_threads
>= pool
->max_threads
)) {
621 * No more new threads, we just queue the request
623 pthread_mutex_unlock(&pool
->mutex
);
628 * Create a new worker thread. It should not receive any signals.
633 res
= pthread_sigmask(SIG_BLOCK
, &mask
, &omask
);
635 pthread_mutex_unlock(&pool
->mutex
);
639 res
= pthread_create(&thread_id
, NULL
, pthreadpool_server
,
642 pool
->num_threads
+= 1;
645 assert(pthread_sigmask(SIG_SETMASK
, &omask
, NULL
) == 0);
647 pthread_mutex_unlock(&pool
->mutex
);