/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
5 #include "cmogstored.h"
7 static __thread
unsigned mog_do_quit
;
8 struct mog_thr_start_arg
{
9 struct mog_thrpool
*tp
;
15 static sigset_t quitset
;
17 __attribute__((constructor
)) static void thrpool_init(void)
19 CHECK(int, 0, sigfillset(&quitset
));
20 CHECK(int, 0, sigdelset(&quitset
, SIGURG
));
23 /* child thread notifies the parent about its readiness */
24 static void *thr_start_wrapper(void *ptr
)
26 struct mog_thr_start_arg
*arg
= ptr
;
27 struct mog_thrpool
*tp
;
30 CHECK(int, 0, pthread_sigmask(SIG_SETMASK
, &quitset
, NULL
));
31 CHECK(int, 0, pthread_mutex_lock(&arg
->mtx
));
33 arg
->do_quit
= &mog_do_quit
;
34 tp
= arg
->tp
; /* arg becomes invalid once we unlock */
36 CHECK(int, 0, pthread_cond_signal(&arg
->cond
));
37 CHECK(int, 0, pthread_mutex_unlock(&arg
->mtx
));
39 return tp
->start_fn(tp
->start_arg
);
42 /* child thread tests if its quit flag is set and exits if it is */
43 void mog_thr_test_quit(void)
45 if (__sync_add_and_fetch(&mog_do_quit
, 0) != 0) {
51 bool mog_thr_prepare_quit(void)
53 /* no barriers or atomic instructions, this is just a hint */
/*
 * we no longer rely on pthreads cancellation, so our explicit checks for
 * thread quitting requires us to continuously signal a thread for death
 * in case it enters a sleeping syscall (epoll_wait/kevent) immediately
 * after checking the mog_do_quit TLS variable
 */
static void poke(pthread_t thr, int sig)
{
	int err;

	/*
	 * This is an uncommon code path and only triggered when
	 * we lower thread counts or shut down
	 */
	while ((err = pthread_kill(thr, sig)) == 0) {
		/*
		 * sleep for 10 ms, sched_yield still burns too much CPU
		 * on FreeBSD (and likely other OSes) if a thread is waiting
		 * on disk I/O
		 */
		struct timespec ts = { .tv_sec = 0, .tv_nsec = 10e6 };
		int rc = nanosleep(&ts, NULL);

		/* EINTR is fine here; only a bad timespec would be a bug */
		if (rc != 0)
			assert(errno != EINVAL && "bug in using nanosleep");
	}
	/* pthread_kill only fails with ESRCH once the target has exited */
	assert(err == ESRCH && "pthread_kill() usage bug");
}
87 thr_create_fail_retry(struct mog_thrpool
*tp
, unsigned size
,
88 unsigned long *nr_eagain
, int err
)
90 /* do not leave the pool w/o threads at all */
91 if (tp
->n_threads
== 0) {
92 if ((++*nr_eagain
% 1024) == 0) {
94 syslog(LOG_ERR
, "pthread_create: %m (tries: %lu)",
102 "pthread_create: %m, only running %u of %u threads",
103 tp
->n_threads
, size
);
109 thrpool_add(struct mog_thrpool
*tp
, unsigned size
, unsigned long *nr_eagain
)
111 struct mog_thr_start_arg arg
= {
112 .mtx
= PTHREAD_MUTEX_INITIALIZER
,
113 .cond
= PTHREAD_COND_INITIALIZER
,
117 size_t bytes
= (tp
->n_threads
+ 1) * sizeof(struct mog_thread
);
120 assert(tp
&& "tp no defined");
122 tp
->threads
= xrealloc(tp
->threads
, bytes
);
124 CHECK(int, 0, pthread_attr_init(&attr
));
126 thr
= &tp
->threads
[tp
->n_threads
].thr
;
128 CHECK(int, 0, pthread_mutex_lock(&arg
.mtx
));
129 rc
= pthread_create(thr
, &attr
, thr_start_wrapper
, &arg
);
130 CHECK(int, 0, pthread_attr_destroy(&attr
));
132 CHECK(int, 0, pthread_cond_wait(&arg
.cond
, &arg
.mtx
));
133 tp
->threads
[tp
->n_threads
].do_quit
= arg
.do_quit
;
135 CHECK(int, 0, pthread_mutex_unlock(&arg
.mtx
));
140 } else if (mog_pthread_create_retryable(rc
)) {
141 if (!thr_create_fail_retry(tp
, size
, nr_eagain
, rc
))
144 assert(rc
== 0 && "pthread_create usage error");
149 void mog_thrpool_set_size(struct mog_thrpool
*tp
, unsigned size
)
151 unsigned long nr_eagain
= 0;
153 CHECK(int, 0, pthread_mutex_lock(&tp
->lock
));
155 while (size
> tp
->n_threads
&& thrpool_add(tp
, size
, &nr_eagain
))
158 if (tp
->n_threads
> size
) {
162 /* set the do_quit flag for all threads we kill */
163 for (i
= size
; i
< tp
->n_threads
; i
++) {
164 __sync_add_and_fetch(tp
->threads
[i
].do_quit
, 1);
165 err
= pthread_kill(tp
->threads
[i
].thr
, SIGURG
);
172 assert(0 && "pthread_kill usage bug" && err
);
176 /* keep poking them to kick them out out epoll_wait/kevent */
177 for (i
= size
; i
< tp
->n_threads
; i
++) {
178 poke(tp
->threads
[i
].thr
, SIGURG
);
180 CHECK(int, 0, pthread_join(tp
->threads
[i
].thr
, NULL
));
182 tp
->n_threads
= size
;
184 CHECK(int, 0, pthread_mutex_unlock(&tp
->lock
));
188 mog_thrpool_start(struct mog_thrpool
*tp
, unsigned nthr
,
189 void *(*start_fn
)(void *), void *arg
)
191 /* we may be started on a new server before device dirs exist */
197 tp
->start_fn
= start_fn
;
199 CHECK(int, 0, pthread_mutex_init(&tp
->lock
, NULL
));
200 mog_thrpool_set_size(tp
, nthr
);
203 void mog_thrpool_quit(struct mog_thrpool
*tp
, struct mog_queue
*q
)
205 mog_thrpool_set_size(tp
, 0);
206 CHECK(int, 0, pthread_mutex_destroy(&tp
->lock
));
207 mog_free_and_null(&tp
->threads
);