2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
30 #include "pthreadpool.h"
32 struct pthreadpool_job
{
33 struct pthreadpool_job
*next
;
35 void (*fn
)(void *private_data
);
41 * Control access to this struct
43 pthread_mutex_t mutex
;
46 * Threads waiting for work do so here
48 pthread_cond_t condvar
;
53 struct pthreadpool_job
*jobs
, *last_job
;
61 * indicator to worker threads that they should shut down
66 * maximum number of threads
76 * Number of idle threads
81 * An array of threads that require joining, the array has
82 * "max_threads" elements. It contains "num_exited" ids.
85 pthread_t exited
[1]; /* We alloc more */
89 * Initialize a thread pool
92 int pthreadpool_init(unsigned max_threads
, struct pthreadpool
**presult
)
94 struct pthreadpool
*pool
;
98 size
= sizeof(struct pthreadpool
) + max_threads
* sizeof(pthread_t
);
100 pool
= (struct pthreadpool
*)malloc(size
);
105 ret
= pthread_mutex_init(&pool
->mutex
, NULL
);
111 ret
= pthread_cond_init(&pool
->condvar
, NULL
);
113 pthread_mutex_destroy(&pool
->mutex
);
119 pool
->jobs
= pool
->last_job
= NULL
;
120 pool
->num_threads
= 0;
121 pool
->num_exited
= 0;
122 pool
->max_threads
= max_threads
;
124 pool
->sig_pipe
[0] = -1;
125 pool
->sig_pipe
[1] = -1;
132 * Create and return a file descriptor which becomes readable when a job has
136 int pthreadpool_sig_fd(struct pthreadpool
*pool
)
140 ret
= pthread_mutex_lock(&pool
->mutex
);
146 if (pool
->sig_pipe
[0] != -1) {
147 result
= pool
->sig_pipe
[0];
151 ret
= pipe(pool
->sig_pipe
);
157 result
= pool
->sig_pipe
[0];
159 ret
= pthread_mutex_unlock(&pool
->mutex
);
165 * Do a pthread_join() on all children that have exited, pool->mutex must be
168 static void pthreadpool_join_children(struct pthreadpool
*pool
)
172 for (i
=0; i
<pool
->num_exited
; i
++) {
173 pthread_join(pool
->exited
[i
], NULL
);
175 pool
->num_exited
= 0;
179 * Fetch a finished job number from the signal pipe
182 int pthreadpool_finished_job(struct pthreadpool
*pool
)
187 ret
= pthread_mutex_lock(&pool
->mutex
);
194 * Just some cleanup under the mutex
196 pthreadpool_join_children(pool
);
198 fd
= pool
->sig_pipe
[0];
200 ret
= pthread_mutex_unlock(&pool
->mutex
);
211 while ((nread
== -1) && (errno
== EINTR
)) {
212 nread
= read(fd
, &result
, sizeof(int));
216 * TODO: handle nread > 0 && nread < sizeof(int)
220 * Lock the mutex to provide a memory barrier for data from the worker
221 * thread to the main thread. The pipe access itself does not have to
222 * be locked, for sizeof(int) the write to a pipe is atomic, and only
223 * one thread reads from it. But we need to lock the mutex briefly
224 * even if we don't do anything under the lock, to make sure we can
225 * see all memory the helper thread has written.
228 ret
= pthread_mutex_lock(&pool
->mutex
);
234 ret
= pthread_mutex_unlock(&pool
->mutex
);
241 * Destroy a thread pool, finishing all threads working for it
244 int pthreadpool_destroy(struct pthreadpool
*pool
)
248 ret
= pthread_mutex_lock(&pool
->mutex
);
253 if (pool
->num_threads
> 0) {
255 * We have active threads, tell them to finish, wait for that.
260 if (pool
->num_idle
> 0) {
262 * Wake the idle threads. They will find pool->quit to
263 * be set and exit themselves
265 ret
= pthread_cond_broadcast(&pool
->condvar
);
267 pthread_mutex_unlock(&pool
->mutex
);
272 while ((pool
->num_threads
> 0) || (pool
->num_exited
> 0)) {
274 if (pool
->num_exited
> 0) {
275 pthreadpool_join_children(pool
);
279 * A thread that shuts down will also signal
282 ret
= pthread_cond_wait(&pool
->condvar
, &pool
->mutex
);
284 pthread_mutex_unlock(&pool
->mutex
);
290 ret
= pthread_mutex_unlock(&pool
->mutex
);
294 ret
= pthread_mutex_destroy(&pool
->mutex
);
295 ret1
= pthread_cond_destroy(&pool
->condvar
);
297 if ((ret
== 0) && (ret1
== 0)) {
308 * Prepare for pthread_exit(), pool->mutex must be locked
310 static void pthreadpool_server_exit(struct pthreadpool
*pool
)
312 pool
->num_threads
-= 1;
313 pool
->exited
[pool
->num_exited
] = pthread_self();
314 pool
->num_exited
+= 1;
317 static void *pthreadpool_server(void *arg
)
319 struct pthreadpool
*pool
= (struct pthreadpool
*)arg
;
322 res
= pthread_mutex_lock(&pool
->mutex
);
328 struct timespec timeout
;
329 struct pthreadpool_job
*job
;
332 * idle-wait at most 1 second. If nothing happens in that
333 * time, exit this thread.
336 timeout
.tv_sec
= time(NULL
) + 1;
339 while ((pool
->jobs
== NULL
) && (pool
->shutdown
== 0)) {
342 res
= pthread_cond_timedwait(
343 &pool
->condvar
, &pool
->mutex
, &timeout
);
346 if (res
== ETIMEDOUT
) {
348 if (pool
->jobs
== NULL
) {
350 * we timed out and still no work for
353 pthreadpool_server_exit(pool
);
354 pthread_mutex_unlock(&pool
->mutex
);
366 int fd
= pool
->sig_pipe
[1];
370 * Ok, there's work for us to do, remove the job from
371 * the pthreadpool list
373 pool
->jobs
= job
->next
;
374 if (pool
->last_job
== job
) {
375 pool
->last_job
= NULL
;
379 * Do the work with the mutex unlocked :-)
382 res
= pthread_mutex_unlock(&pool
->mutex
);
385 job
->fn(job
->private_data
);
387 written
= sizeof(int);
389 res
= pthread_mutex_lock(&pool
->mutex
);
393 written
= write(fd
, &job
->id
, sizeof(int));
398 if (written
!= sizeof(int)) {
399 pthreadpool_server_exit(pool
);
400 pthread_mutex_unlock(&pool
->mutex
);
405 if ((pool
->jobs
== NULL
) && (pool
->shutdown
!= 0)) {
407 * No more work to do and we're asked to shut down, so
410 pthreadpool_server_exit(pool
);
412 if (pool
->num_threads
== 0) {
414 * Ping the main thread waiting for all of us
415 * workers to have quit.
417 pthread_cond_broadcast(&pool
->condvar
);
420 pthread_mutex_unlock(&pool
->mutex
);
426 int pthreadpool_add_job(struct pthreadpool
*pool
, int job_id
,
427 void (*fn
)(void *private_data
), void *private_data
)
429 struct pthreadpool_job
*job
;
432 sigset_t mask
, omask
;
434 job
= (struct pthreadpool_job
*)malloc(sizeof(struct pthreadpool_job
));
440 job
->private_data
= private_data
;
444 res
= pthread_mutex_lock(&pool
->mutex
);
451 * Just some cleanup under the mutex
453 pthreadpool_join_children(pool
);
456 * Add job to the end of the queue
458 if (pool
->jobs
== NULL
) {
462 pool
->last_job
->next
= job
;
464 pool
->last_job
= job
;
466 if (pool
->num_idle
> 0) {
468 * We have idle threads, wake one.
470 res
= pthread_cond_signal(&pool
->condvar
);
471 pthread_mutex_unlock(&pool
->mutex
);
475 if (pool
->num_threads
>= pool
->max_threads
) {
477 * No more new threads, we just queue the request
479 pthread_mutex_unlock(&pool
->mutex
);
484 * Create a new worker thread. It should not receive any signals.
489 res
= pthread_sigmask(SIG_BLOCK
, &mask
, &omask
);
491 pthread_mutex_unlock(&pool
->mutex
);
495 res
= pthread_create(&thread_id
, NULL
, pthreadpool_server
,
498 pool
->num_threads
+= 1;
501 assert(pthread_sigmask(SIG_SETMASK
, &omask
, NULL
) == 0);
503 pthread_mutex_unlock(&pool
->mutex
);