replace: Add don't include unistd.h directly and add uid_wrapper.
[Samba/gebeck_regimport.git] / source3 / lib / pthreadpool / pthreadpool.c
blobc916dc0f8d5ca1dc0f112c310a1f8d676386e5fd
1 /*
2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "config.h"
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <assert.h>
28 #include <fcntl.h>
29 #include "system/time.h"
30 #include "system/filesys.h"
32 #include "pthreadpool.h"
33 #include "lib/util/dlinklist.h"
35 struct pthreadpool_job {
36 struct pthreadpool_job *next;
37 int id;
38 void (*fn)(void *private_data);
39 void *private_data;
42 struct pthreadpool {
44 * List pthreadpools for fork safety
46 struct pthreadpool *prev, *next;
49 * Control access to this struct
51 pthread_mutex_t mutex;
54 * Threads waiting for work do so here
56 pthread_cond_t condvar;
59 * List of work jobs
61 struct pthreadpool_job *jobs, *last_job;
64 * pipe for signalling
66 int sig_pipe[2];
69 * indicator to worker threads that they should shut down
71 int shutdown;
74 * maximum number of threads
76 int max_threads;
79 * Number of threads
81 int num_threads;
84 * Number of idle threads
86 int num_idle;
89 * An array of threads that require joining.
91 int num_exited;
92 pthread_t *exited; /* We alloc more */
95 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
96 static struct pthreadpool *pthreadpools = NULL;
97 static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
99 static void pthreadpool_prep_atfork(void);
102 * Initialize a thread pool
105 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
107 struct pthreadpool *pool;
108 int ret;
110 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
111 if (pool == NULL) {
112 return ENOMEM;
115 ret = pipe(pool->sig_pipe);
116 if (ret == -1) {
117 int err = errno;
118 free(pool);
119 return err;
122 ret = pthread_mutex_init(&pool->mutex, NULL);
123 if (ret != 0) {
124 close(pool->sig_pipe[0]);
125 close(pool->sig_pipe[1]);
126 free(pool);
127 return ret;
130 ret = pthread_cond_init(&pool->condvar, NULL);
131 if (ret != 0) {
132 pthread_mutex_destroy(&pool->mutex);
133 close(pool->sig_pipe[0]);
134 close(pool->sig_pipe[1]);
135 free(pool);
136 return ret;
139 pool->shutdown = 0;
140 pool->jobs = pool->last_job = NULL;
141 pool->num_threads = 0;
142 pool->num_exited = 0;
143 pool->exited = NULL;
144 pool->max_threads = max_threads;
145 pool->num_idle = 0;
147 ret = pthread_mutex_lock(&pthreadpools_mutex);
148 if (ret != 0) {
149 pthread_cond_destroy(&pool->condvar);
150 pthread_mutex_destroy(&pool->mutex);
151 close(pool->sig_pipe[0]);
152 close(pool->sig_pipe[1]);
153 free(pool);
154 return ret;
156 DLIST_ADD(pthreadpools, pool);
158 ret = pthread_mutex_unlock(&pthreadpools_mutex);
159 assert(ret == 0);
161 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
163 *presult = pool;
165 return 0;
168 static void pthreadpool_prepare(void)
170 int ret;
171 struct pthreadpool *pool;
173 ret = pthread_mutex_lock(&pthreadpools_mutex);
174 assert(ret == 0);
176 pool = pthreadpools;
178 while (pool != NULL) {
179 ret = pthread_mutex_lock(&pool->mutex);
180 assert(ret == 0);
181 pool = pool->next;
185 static void pthreadpool_parent(void)
187 int ret;
188 struct pthreadpool *pool;
190 pool = DLIST_TAIL(pthreadpools);
192 while (1) {
193 ret = pthread_mutex_unlock(&pool->mutex);
194 assert(ret == 0);
196 if (pool == pthreadpools) {
197 break;
199 pool = pool->prev;
202 ret = pthread_mutex_unlock(&pthreadpools_mutex);
203 assert(ret == 0);
206 static void pthreadpool_child(void)
208 int ret;
209 struct pthreadpool *pool;
211 pool = DLIST_TAIL(pthreadpools);
213 while (1) {
214 close(pool->sig_pipe[0]);
215 close(pool->sig_pipe[1]);
217 ret = pipe(pool->sig_pipe);
218 assert(ret == 0);
220 pool->num_threads = 0;
222 pool->num_exited = 0;
223 free(pool->exited);
224 pool->exited = NULL;
226 pool->num_idle = 0;
228 while (pool->jobs != NULL) {
229 struct pthreadpool_job *job;
230 job = pool->jobs;
231 pool->jobs = job->next;
232 free(job);
234 pool->last_job = NULL;
236 ret = pthread_mutex_unlock(&pool->mutex);
237 assert(ret == 0);
239 if (pool == pthreadpools) {
240 break;
242 pool = pool->prev;
245 ret = pthread_mutex_unlock(&pthreadpools_mutex);
246 assert(ret == 0);
249 static void pthreadpool_prep_atfork(void)
251 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
252 pthreadpool_child);
256 * Return the file descriptor which becomes readable when a job has
257 * finished
260 int pthreadpool_signal_fd(struct pthreadpool *pool)
262 return pool->sig_pipe[0];
266 * Do a pthread_join() on all children that have exited, pool->mutex must be
267 * locked
269 static void pthreadpool_join_children(struct pthreadpool *pool)
271 int i;
273 for (i=0; i<pool->num_exited; i++) {
274 pthread_join(pool->exited[i], NULL);
276 pool->num_exited = 0;
279 * Deliberately not free and NULL pool->exited. That will be
280 * re-used by realloc later.
285 * Fetch a finished job number from the signal pipe
288 int pthreadpool_finished_job(struct pthreadpool *pool)
290 int result;
291 ssize_t nread;
293 nread = -1;
294 errno = EINTR;
296 while ((nread == -1) && (errno == EINTR)) {
297 nread = read(pool->sig_pipe[0], &result, sizeof(int));
299 if (nread == -1) {
300 return errno;
302 if (nread != sizeof(int)) {
303 return EINVAL;
305 return result;
309 * Destroy a thread pool, finishing all threads working for it
312 int pthreadpool_destroy(struct pthreadpool *pool)
314 int ret, ret1;
316 ret = pthread_mutex_lock(&pool->mutex);
317 if (ret != 0) {
318 return ret;
321 if ((pool->jobs != NULL) || pool->shutdown) {
322 ret = pthread_mutex_unlock(&pool->mutex);
323 assert(ret == 0);
324 return EBUSY;
327 if (pool->num_threads > 0) {
329 * We have active threads, tell them to finish, wait for that.
332 pool->shutdown = 1;
334 if (pool->num_idle > 0) {
336 * Wake the idle threads. They will find pool->quit to
337 * be set and exit themselves
339 ret = pthread_cond_broadcast(&pool->condvar);
340 if (ret != 0) {
341 pthread_mutex_unlock(&pool->mutex);
342 return ret;
346 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
348 if (pool->num_exited > 0) {
349 pthreadpool_join_children(pool);
350 continue;
353 * A thread that shuts down will also signal
354 * pool->condvar
356 ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
357 if (ret != 0) {
358 pthread_mutex_unlock(&pool->mutex);
359 return ret;
364 ret = pthread_mutex_unlock(&pool->mutex);
365 if (ret != 0) {
366 return ret;
368 ret = pthread_mutex_destroy(&pool->mutex);
369 ret1 = pthread_cond_destroy(&pool->condvar);
371 if (ret != 0) {
372 return ret;
374 if (ret1 != 0) {
375 return ret1;
378 ret = pthread_mutex_lock(&pthreadpools_mutex);
379 if (ret != 0) {
380 return ret;
382 DLIST_REMOVE(pthreadpools, pool);
383 ret = pthread_mutex_unlock(&pthreadpools_mutex);
384 assert(ret == 0);
386 close(pool->sig_pipe[0]);
387 pool->sig_pipe[0] = -1;
389 close(pool->sig_pipe[1]);
390 pool->sig_pipe[1] = -1;
392 free(pool->exited);
393 free(pool);
395 return 0;
399 * Prepare for pthread_exit(), pool->mutex must be locked
401 static void pthreadpool_server_exit(struct pthreadpool *pool)
403 pthread_t *exited;
405 pool->num_threads -= 1;
407 exited = (pthread_t *)realloc(
408 pool->exited, sizeof(pthread_t *) * (pool->num_exited + 1));
410 if (exited == NULL) {
411 /* lost a thread status */
412 return;
414 pool->exited = exited;
416 pool->exited[pool->num_exited] = pthread_self();
417 pool->num_exited += 1;
420 static void *pthreadpool_server(void *arg)
422 struct pthreadpool *pool = (struct pthreadpool *)arg;
423 int res;
425 res = pthread_mutex_lock(&pool->mutex);
426 if (res != 0) {
427 return NULL;
430 while (1) {
431 struct timespec ts;
432 struct pthreadpool_job *job;
435 * idle-wait at most 1 second. If nothing happens in that
436 * time, exit this thread.
439 clock_gettime(CLOCK_REALTIME, &ts);
440 ts.tv_sec += 1;
442 while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
444 pool->num_idle += 1;
445 res = pthread_cond_timedwait(
446 &pool->condvar, &pool->mutex, &ts);
447 pool->num_idle -= 1;
449 if (res == ETIMEDOUT) {
451 if (pool->jobs == NULL) {
453 * we timed out and still no work for
454 * us. Exit.
456 pthreadpool_server_exit(pool);
457 pthread_mutex_unlock(&pool->mutex);
458 return NULL;
461 break;
463 assert(res == 0);
466 job = pool->jobs;
468 if (job != NULL) {
469 ssize_t written;
472 * Ok, there's work for us to do, remove the job from
473 * the pthreadpool list
475 pool->jobs = job->next;
476 if (pool->last_job == job) {
477 pool->last_job = NULL;
481 * Do the work with the mutex unlocked
484 res = pthread_mutex_unlock(&pool->mutex);
485 assert(res == 0);
487 job->fn(job->private_data);
489 written = write(pool->sig_pipe[1], &job->id,
490 sizeof(int));
492 free(job);
494 res = pthread_mutex_lock(&pool->mutex);
495 assert(res == 0);
497 if (written != sizeof(int)) {
498 pthreadpool_server_exit(pool);
499 pthread_mutex_unlock(&pool->mutex);
500 return NULL;
504 if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
506 * No more work to do and we're asked to shut down, so
507 * exit
509 pthreadpool_server_exit(pool);
511 if (pool->num_threads == 0) {
513 * Ping the main thread waiting for all of us
514 * workers to have quit.
516 pthread_cond_broadcast(&pool->condvar);
519 pthread_mutex_unlock(&pool->mutex);
520 return NULL;
525 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
526 void (*fn)(void *private_data), void *private_data)
528 struct pthreadpool_job *job;
529 pthread_t thread_id;
530 int res;
531 sigset_t mask, omask;
533 job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
534 if (job == NULL) {
535 return ENOMEM;
538 job->fn = fn;
539 job->private_data = private_data;
540 job->id = job_id;
541 job->next = NULL;
543 res = pthread_mutex_lock(&pool->mutex);
544 if (res != 0) {
545 free(job);
546 return res;
549 if (pool->shutdown) {
551 * Protect against the pool being shut down while
552 * trying to add a job
554 res = pthread_mutex_unlock(&pool->mutex);
555 assert(res == 0);
556 free(job);
557 return EINVAL;
561 * Just some cleanup under the mutex
563 pthreadpool_join_children(pool);
566 * Add job to the end of the queue
568 if (pool->jobs == NULL) {
569 pool->jobs = job;
571 else {
572 pool->last_job->next = job;
574 pool->last_job = job;
576 if (pool->num_idle > 0) {
578 * We have idle threads, wake one.
580 res = pthread_cond_signal(&pool->condvar);
581 pthread_mutex_unlock(&pool->mutex);
582 return res;
585 if ((pool->max_threads != 0) &&
586 (pool->num_threads >= pool->max_threads)) {
588 * No more new threads, we just queue the request
590 pthread_mutex_unlock(&pool->mutex);
591 return 0;
595 * Create a new worker thread. It should not receive any signals.
598 sigfillset(&mask);
600 res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
601 if (res != 0) {
602 pthread_mutex_unlock(&pool->mutex);
603 return res;
606 res = pthread_create(&thread_id, NULL, pthreadpool_server,
607 (void *)pool);
608 if (res == 0) {
609 pool->num_threads += 1;
612 assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
614 pthread_mutex_unlock(&pool->mutex);
615 return res;