/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#define _GNU_SOURCE 1 /* needed for _ATFILE_SOURCE on glibc 2.5 - 2.9 */
#include <dirent.h> /* needed for FreeBSD */
#include "cmogstored.h"

/* same default as MogileFS upstream */
static pthread_mutex_t svc_lock = PTHREAD_MUTEX_INITIALIZER;
static Hash_table *by_docroot; /* enforce one mog_svc per docroot: */
static mode_t mog_umask;

/*
 * maintain an internal queue of requests for the "server aio_threads = N"
 * command in the side channel.  The worker handling the client request must
 * tell the main thread to change thread counts asynchronously (because
 * the worker thread handling the request may die from a thread count
 * reduction), so we have a worker thread make a fire-and-forget request
 * to the notify thread.
 */
static pthread_mutex_t aio_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static SIMPLEQ_HEAD(sq, mog_svc) aio_threads_qhead =
	SIMPLEQ_HEAD_INITIALIZER(aio_threads_qhead);

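/* frees one mog_svc; registered as the data_freer for the by_docroot hash */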
static void svc_free(void *ptr)
{
	struct mog_svc *svc = ptr;

	if (closedir(svc->dir) < 0)
		syslog(LOG_ERR, "closedir(%s) failed with: %m", svc->docroot);
	CHECK(int, 0, pthread_mutex_destroy(&svc->devstats_lock));
	mog_free(svc->docroot);
	if (svc->by_st_dev)
		hash_free(svc->by_st_dev);
	if (svc->by_mog_devid)
		hash_free(svc->by_mog_devid);
	free(svc);
}

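/* svc_hash and svc_cmp key by_docroot entries on the docroot string */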
static size_t svc_hash(const void *x, size_t tablesize)
{
	const struct mog_svc *svc = x;

	return hash_string(svc->docroot, tablesize);
}

static bool svc_cmp(const void *a, const void *b)
{
	const struct mog_svc *svc_a = a;
	const struct mog_svc *svc_b = b;

	return strcmp(svc_a->docroot, svc_b->docroot) == 0;
}

static void svc_atexit(void) /* called atexit */
{
	hash_free(by_docroot);
}

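/* one-time setup: create by_docroot and capture the process umask */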
static void svc_once(void)
{
	by_docroot = hash_initialize(7, NULL, svc_hash, svc_cmp, svc_free);
	mog_oom_if_null(by_docroot);

	/* umask() can only be read by (temporarily) setting it */
	mog_umask = umask(0);
	umask(mog_umask);

	atexit(svc_atexit);
}

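/*
 * runs in the child process after fork(): reopen the docroot so the
 * child gets its own DIR handle and docroot_fd
 */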
bool mog_svc_atfork_child(void *svc_ptr, void *parent)
{
	struct mog_svc *svc = svc_ptr;
	pid_t ppid = *((pid_t *)parent);
	const char *failfn;

	if (closedir(svc->dir) < 0) {
		failfn = "closedir";
		goto err;
	}

	svc->dir = opendir(svc->docroot);
	if (svc->dir == NULL) {
		failfn = "opendir";
		goto err;
	}

	svc->docroot_fd = dirfd(svc->dir);
	if (svc->docroot_fd < 0) {
		failfn = "dirfd";
		goto err;
	}

	return true;
err:
	syslog(LOG_ERR, "%s(%s) failed with: %m", failfn, svc->docroot);
	kill(ppid, SIGTERM); /* notify the parent of the failure */
	return false;
}

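/*
 * creates and registers the mog_svc for a docroot; a NULL docroot falls
 * back to MOG_DEFAULT_DOCROOT and the path is canonicalized before use
 */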
struct mog_svc * mog_svc_new(const char *docroot)
{
	struct mog_svc *svc;
	DIR *dir;
	int fd;

	if (!docroot) docroot = MOG_DEFAULT_DOCROOT;

	docroot = mog_canonpath_die(docroot, CAN_EXISTING);

	dir = opendir(docroot);
	if (dir == NULL) {
		syslog(LOG_ERR, "opendir(%s) failed with: %m", docroot);
		mog_free(docroot);
		return NULL;
	}

	fd = dirfd(dir);
	if (fd < 0) {
		syslog(LOG_ERR, "dirfd(%s) failed with: %m", docroot);
		mog_free(docroot);
		return NULL;
	}

	CHECK(int, 0, pthread_mutex_lock(&svc_lock));

	if (!by_docroot)
		svc_once();

	svc = xzalloc(sizeof(struct mog_svc));
	svc->persist_client = 1;
	svc->docroot = docroot;
	svc->docroot_fd = fd;
	svc->dir = dir;
	svc->put_perms = (~mog_umask) & 0666;
	svc->mkcol_perms = (~mog_umask) & 0777;
	svc->thr_per_dev = 10;
	svc->idle_timeout = 5;
	CHECK(int, 0, pthread_mutex_init(&svc->devstats_lock, NULL));
	CHECK(int, 0, pthread_mutex_init(&svc->by_mog_devid_lock, NULL));
	svc->by_mog_devid = hash_initialize(7, NULL, mog_dev_hash,
					mog_dev_cmp, mog_dev_free);
	mog_oom_if_null(svc->by_mog_devid);

	switch (hash_insert_if_absent(by_docroot, svc, NULL)) {
	case 0: /* another mog_svc already exists for this docroot */
		svc_free(svc);
		svc = NULL;
		break;
	case 1: /* inserted */
		break;
	default:
		mog_oom();
	}

	CHECK(int, 0, pthread_mutex_unlock(&svc_lock));

	return svc;
}

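/* iterates over every registered mog_svc while holding svc_lock */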
size_t mog_svc_each(Hash_processor processor, void *data)
{
	size_t rv;

	CHECK(int, 0, pthread_mutex_lock(&svc_lock));
	rv = hash_do_for_each(by_docroot, processor, data);
	CHECK(int, 0, pthread_mutex_unlock(&svc_lock));

	return rv;
}

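/*
 * listen sockets must survive exec() of the new binary during upgrades,
 * so FD_CLOEXEC gets cleared on them in the upgrade child process
 */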
static bool cloexec_disable(struct mog_fd *mfd) /* vfork-safe */
{
	if (mfd)
		CHECK(int, 0, mog_set_cloexec(mfd->fd, false));
	return true;
}

static bool svc_cloexec_off_i(void *svcptr, void *unused) /* vfork-safe */
{
	struct mog_svc *svc = svcptr;

	return (cloexec_disable(svc->mgmt_mfd)
		&& cloexec_disable(svc->http_mfd)
		&& cloexec_disable(svc->httpget_mfd));
}

/*
 * Only call this from a freshly forked upgrade child process.
 * This holds no locks to avoid potential deadlocks in post-fork mutexes.
 */
void mog_svc_upgrade_prepare(void) /* vfork-safe */
{
	(void)hash_do_for_each(by_docroot, svc_cloexec_off_i, NULL);
}

/* this is only called by the main (notify) thread */
void mog_svc_thrpool_rescale(struct mog_svc *svc, unsigned ndev_new)
{
	unsigned size = ndev_new * svc->thr_per_dev;
	struct mog_thrpool *tp = &svc->queue->thrpool;

	/* respect user-setting */
	if (svc->user_set_aio_threads) {
		mog_svc_dev_user_rescale(svc, ndev_new);
		if (tp->n_threads >= ndev_new)
			return;

		syslog(LOG_WARNING,
		       "server aio_threads=%u is less than devcount=%u",
		       tp->n_threads, ndev_new);

		return;
	}

	if (size < svc->thr_per_dev)
		size = svc->thr_per_dev;

	syslog(LOG_INFO,
	       "devcount(%u->%u), updating server aio_threads=%u",
	       svc->nmogdev, ndev_new, size);
	mog_thrpool_set_size(tp, size);
}

/* Hash iterator function */
bool mog_svc_start_each(void *svc_ptr, void *main_ptr)
{
	struct mog_svc *svc = svc_ptr;
	struct mog_main *mog_main = main_ptr;
	struct mog_accept *ac;
	size_t athr = (size_t)num_processors(NPROC_CURRENT);
	struct mog_queue *q = mog_queue_new();
	size_t nthr = svc->nmogdev * svc->thr_per_dev;

	if (!svc->nmogdev)
		nthr = svc->thr_per_dev;

	/*
	 * try to distribute accept() callers between workers more evenly
	 * with wake-one accept() behavior by trimming down on acceptors.
	 * Having too many acceptor threads does not make sense: these
	 * threads are only bounded by lock contention and local bus speeds.
	 * Increasing threads here just leads to lock contention inside the
	 * kernel (accept/accept4/EPOLL_CTL_ADD).
	 */
	athr = mog_main->worker_processes > 1 ? 1 : MIN(2, athr);

	svc->queue = q;
	mog_thrpool_start(&q->thrpool, nthr, mog_queue_loop, q);

	if (svc->mgmt_mfd) {
		mog_main->have_mgmt = true;
		ac = &svc->mgmt_mfd->as.accept;

		/*
		 * mgmt port is rarely used and always persistent, so it
		 * does not need multiple threads for blocking accept()
		 */
		mog_thrpool_start(&ac->thrpool, 1, mog_accept_loop, ac);
	}

	if (svc->http_mfd) {
		ac = &svc->http_mfd->as.accept;
		mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac);
	}

	if (svc->httpget_mfd) {
		ac = &svc->httpget_mfd->as.accept;
		mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac);
	}

	return true;
}

/*
 * Fire and forget, we must run the actual thread count manipulation
 * in the main notify thread because we may end up terminating the
 * thread which invoked this.
 *
 * Called by threads inside the thrpool to wake-up the main/notify thread.
 */
void mog_svc_aio_threads_enqueue(struct mog_svc *svc, unsigned size)
{
	unsigned prev_enq;

	CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock));

	prev_enq = svc->user_req_aio_threads;
	svc->user_req_aio_threads = size;
	if (!prev_enq) {
		/* put into the queue so main thread can process it */
		SIMPLEQ_INSERT_TAIL(&aio_threads_qhead, svc, qentry);
	}

	CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock));

	/* wake up the main thread so it can process the queue */
	mog_notify(MOG_NOTIFY_AIO_THREADS);
}

/* this runs in the main (notify) thread */
void mog_svc_aio_threads_handler(void)
{
	struct mog_svc *svc;

	/* guard against requests bundled in one wakeup by looping here */
	for (;;) {
		unsigned req_size = 0;

		CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock));
		svc = SIMPLEQ_FIRST(&aio_threads_qhead);
		if (svc) {
			SIMPLEQ_REMOVE_HEAD(&aio_threads_qhead, qentry);
			req_size = svc->user_req_aio_threads;
			svc->user_req_aio_threads = 0;
		}
		CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock));

		/*
		 * spurious wakeup is possible since we loop here
		 * (and we must loop, see above comment)
		 */
		if (svc == NULL || req_size == 0)
			return;

		syslog(LOG_INFO, "server aio_threads=%u", req_size);
		svc->user_set_aio_threads = req_size;
		if (svc->nmogdev)
			mog_svc_dev_user_rescale(svc, svc->nmogdev);
		mog_thrpool_set_size(&svc->queue->thrpool, req_size);
	}
}