cmogstored 1.8.1 - use default system stack size
thrpool.c
/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#include "cmogstored.h"
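
/*
 * cmogstored.h is assumed to declare everything used below: the CHECK()
 * macro (assert that a call returns the expected value), xrealloc(),
 * mog_yield(), mog_alloc_quit(), mog_pthread_create_retryable(), and
 * the struct mog_thrpool / struct mog_thread types.
 */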

static __thread unsigned mog_do_quit;
struct mog_thr_start_arg {
	struct mog_thrpool *tp;
	pthread_mutex_t mtx;
	pthread_cond_t cond;
	unsigned *do_quit;
};

static sigset_t quitset;
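
/*
 * quitset blocks every signal except SIGURG in pool threads; SIGURG is
 * what mog_thrpool_set_size() and poke() use below to knock a quitting
 * thread out of a sleeping syscall.
 */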

__attribute__((constructor)) static void thrpool_init(void)
{
	CHECK(int, 0, sigfillset(&quitset));
	CHECK(int, 0, sigdelset(&quitset, SIGURG));
}

/* child thread notifies the parent about its readiness */
static void *thr_start_wrapper(void *ptr)
{
	struct mog_thr_start_arg *arg = ptr;
	struct mog_thrpool *tp;

	mog_do_quit = 0;
	CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &quitset, NULL));
	CHECK(int, 0, pthread_mutex_lock(&arg->mtx));

	arg->do_quit = &mog_do_quit;
	tp = arg->tp; /* arg becomes invalid once we unlock */

	CHECK(int, 0, pthread_cond_signal(&arg->cond));
	CHECK(int, 0, pthread_mutex_unlock(&arg->mtx));

	return tp->start_fn(tp->start_arg);
}
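
/*
 * A start_fn passed to mog_thrpool_start() is expected to poll
 * mog_thr_test_quit() in its event loop.  A minimal sketch, with a
 * hypothetical my_worker (not a function in this file):
 *
 *	static void *my_worker(void *arg)
 *	{
 *		for (;;) {
 *			mog_thr_test_quit();  (may call pthread_exit)
 *			... sleep in epoll_wait()/kevent(), then do work ...
 *		}
 *	}
 */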

/* child thread tests if its quit flag is set and exits if it is */
void mog_thr_test_quit(void)
{
	/* __sync_add_and_fetch(&x, 0) is an atomic load with a full barrier */
	if (__sync_add_and_fetch(&mog_do_quit, 0) != 0) {
		mog_alloc_quit();
		pthread_exit(NULL);
	}
}

bool mog_thr_prepare_quit(void)
{
	/* no barriers or atomic instructions, this is just a hint */
	return !!mog_do_quit;
}

/*
 * We no longer rely on pthreads cancellation, so our explicit checks
 * for thread quitting require us to continuously signal a thread for
 * death in case it enters a sleeping syscall (epoll_wait/kevent)
 * immediately after checking the mog_do_quit TLS variable.
 */
static void poke(pthread_t thr, int sig)
{
	int err;

	/*
	 * This is an uncommon code path and only triggered when
	 * we lower thread counts or shut down
	 */
	while ((err = pthread_kill(thr, sig)) == 0) {
		/*
		 * sleep for 10 ms; sched_yield still burns too much CPU
		 * on FreeBSD (and likely other OSes) if a thread is waiting
		 * on disk I/O.
		 */
		struct timespec ts = { .tv_sec = 0, .tv_nsec = 10e6 };
		int rc = nanosleep(&ts, NULL);

		if (rc != 0)
			assert(errno != EINVAL && "bug in using nanosleep");
	}

	/* pthread_kill() only fails with ESRCH once the target is gone */
	assert(err == ESRCH && "pthread_kill() usage bug");
}

static bool
thr_create_fail_retry(struct mog_thrpool *tp, unsigned size,
			unsigned long *nr_eagain, int err)
{
	/* do not leave the pool w/o threads at all */
	if (tp->n_threads == 0) {
		if ((++*nr_eagain % 1024) == 0) {
			errno = err;
			syslog(LOG_ERR, "pthread_create: %m (tries: %lu)",
				*nr_eagain);
		}
		mog_yield();
		return true;
	} else {
		errno = err;
		syslog(LOG_ERR,
			"pthread_create: %m, only running %u of %u threads",
			tp->n_threads, size);
		return false;
	}
}
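
/*
 * Starting a thread is a small handshake: the parent holds arg.mtx
 * across pthread_create() and waits on arg.cond until the child has
 * published the address of its thread-local do_quit flag.  arg lives
 * on the parent stack, so the child must not touch it after unlocking.
 */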
static bool
thrpool_add(struct mog_thrpool *tp, unsigned size, unsigned long *nr_eagain)
{
	struct mog_thr_start_arg arg = {
		.mtx = PTHREAD_MUTEX_INITIALIZER,
		.cond = PTHREAD_COND_INITIALIZER,
	};
	pthread_t *thr;
	pthread_attr_t attr;
	size_t bytes = (tp->n_threads + 1) * sizeof(struct mog_thread);
	int rc;

	assert(tp && "tp not defined");
	arg.tp = tp;
	tp->threads = xrealloc(tp->threads, bytes);

	CHECK(int, 0, pthread_attr_init(&attr));

	thr = &tp->threads[tp->n_threads].thr;

	CHECK(int, 0, pthread_mutex_lock(&arg.mtx));
	rc = pthread_create(thr, &attr, thr_start_wrapper, &arg);
	CHECK(int, 0, pthread_attr_destroy(&attr));
	if (rc == 0) {
		CHECK(int, 0, pthread_cond_wait(&arg.cond, &arg.mtx));
		tp->threads[tp->n_threads].do_quit = arg.do_quit;
	}
	CHECK(int, 0, pthread_mutex_unlock(&arg.mtx));

	if (rc == 0) {
		tp->n_threads++;
		*nr_eagain = 0;
	} else if (mog_pthread_create_retryable(rc)) {
		if (!thr_create_fail_retry(tp, size, nr_eagain, rc))
			return false;
	} else {
		assert(rc == 0 && "pthread_create usage error");
	}
	return true;
}

void mog_thrpool_set_size(struct mog_thrpool *tp, unsigned size)
{
	unsigned long nr_eagain = 0;

	CHECK(int, 0, pthread_mutex_lock(&tp->lock));

	while (size > tp->n_threads && thrpool_add(tp, size, &nr_eagain))
		/* nothing */;

	if (tp->n_threads > size) {
		unsigned i;
		int err;

		/* set the do_quit flag for all threads we kill */
		for (i = size; i < tp->n_threads; i++) {
			__sync_add_and_fetch(tp->threads[i].do_quit, 1);
			err = pthread_kill(tp->threads[i].thr, SIGURG);

			switch (err) {
			case 0:
			case ESRCH:
				break;
			default:
				assert(0 && "pthread_kill usage bug" && err);
			}
		}

		/* keep poking them to kick them out of epoll_wait/kevent */
		for (i = size; i < tp->n_threads; i++) {
			poke(tp->threads[i].thr, SIGURG);

			CHECK(int, 0, pthread_join(tp->threads[i].thr, NULL));
		}

		tp->n_threads = size;
	}
	CHECK(int, 0, pthread_mutex_unlock(&tp->lock));
}
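
/*
 * Resizing is symmetric: growing spawns threads via thrpool_add(),
 * shrinking flags, signals, and joins the surplus.  A sketch, assuming
 * a hypothetical pool that is already running:
 *
 *	mog_thrpool_set_size(&pool, 8);   (grow under load)
 *	mog_thrpool_set_size(&pool, 2);   (shrink when idle)
 */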

void
mog_thrpool_start(struct mog_thrpool *tp, unsigned nthr,
			void *(*start_fn)(void *), void *arg)
{
	/* we may be started on a new server before device dirs exist */
	if (nthr == 0)
		nthr = 1;

	tp->threads = NULL;
	tp->n_threads = 0;
	tp->start_fn = start_fn;
	tp->start_arg = arg;
	CHECK(int, 0, pthread_mutex_init(&tp->lock, NULL));
	mog_thrpool_set_size(tp, nthr);
}
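
/*
 * Typical lifecycle, assuming the hypothetical my_worker() above and a
 * caller that does not need the queue at shutdown (q is unused below):
 *
 *	static struct mog_thrpool pool;
 *
 *	mog_thrpool_start(&pool, 4, my_worker, NULL);
 *	...
 *	mog_thrpool_quit(&pool, NULL);
 */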

void mog_thrpool_quit(struct mog_thrpool *tp, struct mog_queue *q)
{
	mog_thrpool_set_size(tp, 0);
	CHECK(int, 0, pthread_mutex_destroy(&tp->lock));
	mog_free_and_null(&tp->threads);
}