/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#define _GNU_SOURCE 1 /* needed for _ATFILE_SOURCE on glibc 2.5 - 2.9 */
#include <dirent.h> /* needed for FreeBSD */
#include "cmogstored.h"

/* same default as MogileFS upstream */
static pthread_mutex_t svc_lock = PTHREAD_MUTEX_INITIALIZER;
static Hash_table *by_docroot; /* enforce one mog_svc per docroot: */
static mode_t mog_umask;

/*
 * Maintain an internal queue of requests for the "server aio_threads = N"
 * command in the side channel.  The worker handling the client request must
 * tell the main thread to change thread counts asynchronously (because
 * the worker thread handling the request may die from a thread count
 * reduction), so we have a worker thread make a fire-and-forget request
 * to the notify thread.
 */
static pthread_mutex_t aio_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static SIMPLEQ_HEAD(sq, mog_svc) aio_threads_qhead =
	SIMPLEQ_HEAD_INITIALIZER(aio_threads_qhead);

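/*
 * Illustrative sketch (not part of this file): the BSD <sys/queue.h>-style
 * SIMPLEQ macros above implement a singly-linked tail queue.  Each queued
 * struct carries a SIMPLEQ_ENTRY field ("qentry" in struct mog_svc), and
 * the producer/consumer pattern used later in this file looks like:
 *
 *	struct item {
 *		unsigned value;
 *		SIMPLEQ_ENTRY(item) qentry;
 *	};
 *	static SIMPLEQ_HEAD(ihead, item) q = SIMPLEQ_HEAD_INITIALIZER(q);
 *
 *	// producer (worker thread, mutex held in real code):
 *	SIMPLEQ_INSERT_TAIL(&q, new_item, qentry);
 *
 *	// consumer (notify thread):
 *	struct item *i = SIMPLEQ_FIRST(&q);
 *	if (i)
 *		SIMPLEQ_REMOVE_HEAD(&q, qentry);
 */
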
static void svc_free(void *ptr)
{
	struct mog_svc *svc = ptr;

	if (closedir(svc->dir) < 0)
		syslog(LOG_ERR, "closedir(%s) failed with: %m", svc->docroot);
	CHECK(int, 0, pthread_mutex_destroy(&svc->devstats_lock));
	mog_free(svc->docroot);
	if (svc->by_st_dev)
		hash_free(svc->by_st_dev);
	if (svc->by_mog_devid)
		hash_free(svc->by_mog_devid);
	free(svc);
}

static size_t svc_hash(const void *x, size_t tablesize)
{
	const struct mog_svc *svc = x;

	return hash_string(svc->docroot, tablesize);
}

static bool svc_cmp(const void *a, const void *b)
{
	const struct mog_svc *svc_a = a;
	const struct mog_svc *svc_b = b;

	return strcmp(svc_a->docroot, svc_b->docroot) == 0;
}

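/*
 * Illustrative sketch: svc_hash/svc_cmp are the hasher/comparator pair
 * handed to gnulib's hash_initialize() below.  Since svc_cmp only
 * examines ->docroot, a lookup can use a stack dummy with just the key
 * field filled in:
 *
 *	struct mog_svc key = { .docroot = "/var/mogdata" };
 *	struct mog_svc *found = hash_lookup(by_docroot, &key);
 *
 * (hash_lookup() is from the same gnulib hash module.)
 */
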
static void svc_atexit(void) /* called atexit */
{
	hash_free(by_docroot);
}

static void svc_once(void)
{
	by_docroot = hash_initialize(7, NULL, svc_hash, svc_cmp, svc_free);
	mog_oom_if_null(by_docroot);

	mog_umask = umask(0);
	umask(mog_umask);

	atexit(svc_atexit);
}

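/*
 * Note on the double umask() above: POSIX offers no way to read the
 * process umask without setting it, so the save-and-restore pair is the
 * standard idiom:
 *
 *	mode_t cur = umask(0);	// clobber the umask, learn the old value
 *	umask(cur);		// immediately restore it
 *
 * With a typical umask of 022, put_perms below works out to
 * (~022) & 0666 == 0644 and mkcol_perms to (~022) & 0777 == 0755.
 */
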
bool mog_svc_atfork_child(void *svc_ptr, void *parent)
{
	struct mog_svc *svc = svc_ptr;
	pid_t ppid = *((pid_t *)parent);
	const char *failfn;

	if (closedir(svc->dir) < 0) {
		failfn = "closedir";
		goto err;
	}

	svc->dir = opendir(svc->docroot);
	if (svc->dir == NULL) {
		failfn = "opendir";
		goto err;
	}

	svc->docroot_fd = dirfd(svc->dir);
	if (svc->docroot_fd < 0) {
		failfn = "dirfd";
		goto err;
	}
	return true;
err:
	syslog(LOG_ERR, "%s(%s) failed with: %m", failfn, svc->docroot);
	kill(ppid, SIGTERM);
	return false;
}

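/*
 * Illustrative sketch (assumed caller, not shown in this file): the
 * signature matches a gnulib Hash_processor, so a post-fork hook in the
 * child could re-open every docroot with something like:
 *
 *	pid_t ppid = getppid();
 *	(void)hash_do_for_each(by_docroot, mog_svc_atfork_child, &ppid);
 *
 * Re-opening matters because a DIR stream (and its offset) is shared
 * with the parent across fork; a fresh opendir() gives the child an
 * independent stream and directory fd.
 */
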
struct mog_svc * mog_svc_new(const char *docroot)
{
	struct mog_svc *svc;
	DIR *dir;
	int fd;

	if (!docroot) docroot = MOG_DEFAULT_DOCROOT;

	docroot = mog_canonpath_die(docroot, CAN_EXISTING);

	dir = opendir(docroot);
	if (dir == NULL) {
		syslog(LOG_ERR, "opendir(%s) failed with: %m", docroot);
		mog_free(docroot);
		return NULL;
	}

	fd = dirfd(dir);
	if (fd < 0) {
		syslog(LOG_ERR, "dirfd(%s) failed with: %m", docroot);
		mog_free(docroot);
		return NULL;
	}

	CHECK(int, 0, pthread_mutex_lock(&svc_lock));

	if (!by_docroot)
		svc_once();

	svc = xzalloc(sizeof(struct mog_svc));
	svc->persist_client = 1;
	svc->docroot = docroot;
	svc->docroot_fd = fd;
	svc->dir = dir;
	svc->put_perms = (~mog_umask) & 0666;
	svc->mkcol_perms = (~mog_umask) & 0777;
	svc->thr_per_dev = 10;
	svc->idle_timeout = 5;
	CHECK(int, 0, pthread_mutex_init(&svc->devstats_lock, NULL));
	CHECK(int, 0, pthread_mutex_init(&svc->by_mog_devid_lock, NULL));
	svc->by_mog_devid = hash_initialize(7, NULL, mog_dev_hash,
					mog_dev_cmp, mog_dev_free);
	mog_oom_if_null(svc->by_mog_devid);

	switch (hash_insert_if_absent(by_docroot, svc, NULL)) {
	case 0: /* a svc for this docroot already exists */
		svc_free(svc);
		svc = NULL;
		/* fallthrough */
	case 1: break;
	default: mog_oom();
	}

	CHECK(int, 0, pthread_mutex_unlock(&svc_lock));

	return svc;
}

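/*
 * Illustrative usage sketch (hypothetical caller):
 *
 *	struct mog_svc *svc = mog_svc_new("/var/mogdata");
 *	if (!svc)
 *		syslog(LOG_ERR, "docroot unusable or already registered");
 *
 * Passing NULL falls back to MOG_DEFAULT_DOCROOT.  Note that a second
 * call with the same canonicalized docroot also returns NULL, since
 * by_docroot enforces one mog_svc per docroot (the "case 0" above).
 */
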
size_t mog_svc_each(Hash_processor processor, void *data)
{
	size_t rv;

	CHECK(int, 0, pthread_mutex_lock(&svc_lock));
	rv = hash_do_for_each(by_docroot, processor, data);
	CHECK(int, 0, pthread_mutex_unlock(&svc_lock));

	return rv;
}

static bool cloexec_disable(struct mog_fd *mfd) /* vfork-safe */
{
	if (mfd)
		CHECK(int, 0, mog_set_cloexec(mfd->fd, false));
	return true;
}

static bool svc_cloexec_off_i(void *svcptr, void *unused) /* vfork-safe */
{
	struct mog_svc *svc = svcptr;

	return (cloexec_disable(svc->mgmt_mfd)
	        && cloexec_disable(svc->http_mfd)
	        && cloexec_disable(svc->httpget_mfd));
}

/*
 * Only call this from a freshly forked upgrade child process.
 * This holds no locks to avoid potential deadlocks in post-fork mutexes.
 */
void mog_svc_upgrade_prepare(void) /* vfork-safe */
{
	(void)hash_do_for_each(by_docroot, svc_cloexec_off_i, NULL);
}

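/*
 * Illustrative sketch: mog_set_cloexec() is presumably a thin wrapper
 * around fcntl(), which is async-signal-safe and thus usable between
 * vfork() and exec().  Clearing FD_CLOEXEC is what lets the re-exec'ed
 * upgrade binary inherit the bound listen sockets:
 *
 *	int flags = fcntl(fd, F_GETFD);
 *	if (flags >= 0)
 *		fcntl(fd, F_SETFD, on ? (flags | FD_CLOEXEC)
 *		                      : (flags & ~FD_CLOEXEC));
 */
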
/* this is only called by the main (notify) thread */
void mog_svc_thrpool_rescale(struct mog_svc *svc, unsigned ndev_new)
{
	unsigned size = ndev_new * svc->thr_per_dev;
	struct mog_thrpool *tp = &svc->queue->thrpool;

	/* respect the user setting */
	if (svc->user_set_aio_threads) {
		mog_svc_dev_user_rescale(svc, ndev_new);
		if (tp->n_threads >= ndev_new)
			return;

		syslog(LOG_WARNING,
		       "server aio_threads=%u is less than devcount=%u",
		       tp->n_threads, ndev_new);

		return;
	}

	if (size < svc->thr_per_dev)
		size = svc->thr_per_dev;

	if (svc->nmogdev)
		syslog(LOG_INFO,
		       "devcount(%u->%u), updating server aio_threads=%u",
		       svc->nmogdev, ndev_new, size);
	mog_thrpool_set_size(tp, size);
}

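/*
 * Worked example: with the default thr_per_dev of 10, a devcount change
 * from 2 to 3 resizes the pool to 3 * 10 == 30 threads; with no devices
 * yet, the "size < svc->thr_per_dev" floor still keeps 10 threads.
 * Once a user issues "server aio_threads = N", automatic scaling is
 * skipped entirely (the first branch above only warns if N looks low).
 */
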
/* Hash iterator function */
bool mog_svc_start_each(void *svc_ptr, void *main_ptr)
{
	struct mog_svc *svc = svc_ptr;
	struct mog_main *mog_main = main_ptr;
	struct mog_accept *ac;
	size_t athr = (size_t)num_processors(NPROC_CURRENT);
	struct mog_queue *q = mog_queue_new();
	size_t nthr = svc->nmogdev * svc->thr_per_dev;

	if (!nthr)
		nthr = svc->thr_per_dev;

	/*
	 * Try to distribute accept() callers between workers more evenly
	 * with wake-one accept() behavior by trimming down on acceptors.
	 * Having too many acceptor threads makes no sense: these threads
	 * are only bounded by lock contention and local bus speeds, so
	 * increasing them just leads to lock contention inside the kernel
	 * (accept/accept4/EPOLL_CTL_ADD).
	 */
	athr = mog_main->worker_processes > 1 ? 1 : MIN(2, athr);

	svc->queue = q;
	mog_thrpool_start(&q->thrpool, nthr, mog_queue_loop, q);

	if (svc->mgmt_mfd) {
		mog_main->have_mgmt = true;
		ac = &svc->mgmt_mfd->as.accept;

		/*
		 * The mgmt port is rarely used and always persistent, so it
		 * does not need multiple threads for blocking accept().
		 */
		mog_thrpool_start(&ac->thrpool, 1, mog_accept_loop, ac);
	}

	if (svc->http_mfd) {
		ac = &svc->http_mfd->as.accept;
		mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac);
	}

	if (svc->httpget_mfd) {
		ac = &svc->httpget_mfd->as.accept;
		mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac);
	}

	return true;
}

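/*
 * Worked example: on an 8-core host with a single worker process,
 * athr == MIN(2, 8) == 2 acceptor threads per HTTP listen socket; with
 * worker_processes > 1 each process runs one acceptor, relying on the
 * kernel's wake-one accept() behavior to spread connections between
 * processes.
 */
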
/*
 * Fire-and-forget: we must run the actual thread count manipulation
 * in the main notify thread because we may end up terminating the
 * thread which invoked this.
 *
 * Called by threads inside the thrpool to wake up the main/notify thread.
 */
void mog_svc_aio_threads_enqueue(struct mog_svc *svc, unsigned size)
{
	unsigned prev_enq;

	CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock));

	prev_enq = svc->user_req_aio_threads;
	svc->user_req_aio_threads = size;
	if (!prev_enq)
		/* put into the queue so the main thread can process it */
		SIMPLEQ_INSERT_TAIL(&aio_threads_qhead, svc, qentry);

	CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock));

	/* wake up the main thread so it can process the queue */
	mog_notify(MOG_NOTIFY_AIO_THREADS);
}

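/*
 * Note: user_req_aio_threads doubles as an "already enqueued" flag:
 * if another "server aio_threads = N" request arrives before the notify
 * thread runs, only the requested size is overwritten and the svc is
 * not queued twice, so the most recent request wins.
 */
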
/* this runs in the main (notify) thread */
void mog_svc_aio_threads_handler(void)
{
	struct mog_svc *svc;

	/* guard against requests bundled into one wakeup by looping here */
	for (;;) {
		unsigned req_size = 0;

		CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock));
		svc = SIMPLEQ_FIRST(&aio_threads_qhead);
		if (svc) {
			SIMPLEQ_REMOVE_HEAD(&aio_threads_qhead, qentry);
			req_size = svc->user_req_aio_threads;
			svc->user_req_aio_threads = 0;
		}
		CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock));

		/*
		 * spurious wakeup is possible since we loop here
		 * (and we must loop, see the comment above)
		 */
		if (svc == NULL || req_size == 0)
			return;

		syslog(LOG_INFO, "server aio_threads=%u", req_size);
		svc->user_set_aio_threads = req_size;
		if (svc->nmogdev)
			mog_svc_dev_user_rescale(svc, svc->nmogdev);
		mog_thrpool_set_size(&svc->queue->thrpool, req_size);
	}
}

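/*
 * Illustrative end-to-end flow (assuming the sidechannel parser is the
 * caller): a "server aio_threads = 20" command on the mgmt port runs
 * mog_svc_aio_threads_enqueue(svc, 20) in some worker thread; the main
 * thread wakes via MOG_NOTIFY_AIO_THREADS and this handler applies
 * mog_thrpool_set_size(&svc->queue->thrpool, 20); this is safe even
 * though the requesting worker may be one of the threads being reaped.
 */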