/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 *
 * common headers, macros, and static inline functions for the entire project
 *
 * Internal APIs are very much in flux and subject to change frequently
 */
#include "queue_kqueue.h"

#define _XOPEN_SOURCE 700

#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/statvfs.h>
#include <error.h> /* GNU */
#include "bsd/queue_safe.h"
#include "bsd/simpleq.h"
#include "xvasprintf.h"
#include "canonicalize.h"
#include "mountlist.h"
#define MOG_WR_ERROR ((void *)-1)
#define MOG_IOSTAT (MAP_FAILED)
#define MOG_FD_MAX (INT_MAX-1)
enum mog_write_state {

enum mog_parser_state {
	MOG_PARSER_ERROR = -1,
	MOG_PARSER_CONTINUE = 1

	MOG_NEXT_IGNORE /* for iostat and fsck MD5 */
	pthread_mutex_t mtx; /* protects cur, max, ioq_head */
	SIMPLEQ_HEAD(ioq_head, mog_fd) ioq_head;
	bool contended; /* hint, not protected */
	struct mog_svc *svc; /* initialized once at creation */

	unsigned no_me_warned:1;
	pthread_mutex_t usage_lock; /* protects usage_txt */
	struct mog_ioq ioq; /* normal requests */
	struct mog_ioq fsckq; /* low-priority for MogileFS fsck */
	size_t rsize; /* only set on stash */
	char rptr[FLEXIBLE_ARRAY_MEMBER];
#define MOG_RBUF_OVERHEAD (sizeof(struct mog_rbuf))
#define MOG_RBUF_BASE_SIZE (512 - MOG_RBUF_OVERHEAD)
#define MOG_RBUF_MAX_SIZE (UINT16_MAX)
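
/*
 * Sizing sketch (illustrative, not part of the API): MOG_RBUF_BASE_SIZE
 * is chosen so the struct overhead plus the flexible rptr[] payload fit
 * one 512-byte allocation:
 *
 *	struct mog_rbuf *rbuf = mog_rbuf_new(MOG_RBUF_BASE_SIZE);
 *	// total allocation: MOG_RBUF_OVERHEAD + MOG_RBUF_BASE_SIZE == 512
 *
 * MOG_RBUF_MAX_SIZE fits in uint16_t, so a buffer length can live in a
 * 16-bit field.
 */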
enum mog_mgmt_method {
	MOG_MGMT_METHOD_NONE = 0,
	MOG_MGMT_METHOD_SIZE,
	struct mog_fd *forward;
	struct mog_rbuf *rbuf;
	struct mog_wbuf *wbuf; /* uncommonly needed */
	enum mog_mgmt_method mgmt_method;
	LIST_ENTRY(mog_mgmt) subscribed;
	unsigned persist_client;
	unsigned user_set_aio_threads; /* only touched by main/notify thread */
	unsigned user_req_aio_threads; /* protected by aio_threads_lock */
	unsigned thr_per_dev;
	pthread_mutex_t by_mog_devid_lock;
	Hash_table *by_mog_devid;
	Hash_table *by_st_dev;
	pthread_mutex_t devstats_lock;
	struct mog_queue *queue;
	LIST_HEAD(mgmt_head, mog_mgmt) devstats_subscribers;
	SIMPLEQ_ENTRY(mog_svc) qentry;
	struct mog_fd *http_mfd;
	struct mog_fd *httpget_mfd;
	struct mog_fd *mgmt_mfd;
	uint32_t idle_timeout;
enum mog_http_method {
	MOG_HTTP_METHOD_NONE = 0,
	MOG_HTTP_METHOD_HEAD,
	MOG_HTTP_METHOD_DELETE,
	MOG_HTTP_METHOD_MKCOL
enum mog_chunk_state {
	MOG_CHUNK_STATE_SIZE = 0,
	MOG_CHUNK_STATE_DATA,
	MOG_CHUNK_STATE_TRAILER,
	enum mog_http_method http_method:4;
	unsigned persistent:1;
	unsigned persist_client_at_start:1;
	unsigned has_content_range:1; /* for PUT */
	unsigned has_range:1; /* for GET */
	unsigned bad_range:1;
	unsigned skip_rbuf_defer:1;
	unsigned usage_txt:1;
	enum mog_chunk_state chunk_state:2;
	unsigned unused_padding:1;

	struct mog_fd *forward;
	struct mog_rbuf *rbuf;
	struct mog_wbuf *wbuf; /* uncommonly needed */
	uint8_t expect_md5[16];
	struct mog_packaddr mpa;
} __attribute__((packed));
	pthread_mutex_t lock;
	unsigned want_threads;
	struct mog_thread *threads;
	void *(*start_fn)(void *);
/*
 * this is a queue: epoll or kqueue return events in the order they occur
 * mog_queue objects can be shared by any number of mog_svcs
 */
struct mog_queue {
	int queue_fd; /* epoll or kqueue */
	struct mog_thrpool thrpool;
	LIST_ENTRY(mog_queue) qbuddies;
};
typedef void (*mog_post_accept_fn)(int fd, struct mog_accept *,
				union mog_sockaddr *, socklen_t);
	mog_post_accept_fn post_accept_fn;
	struct mog_addrinfo *addrinfo; /* shared with cfg */
	struct mog_thrpool thrpool;
struct mog_fd *mog_accept_init(int fd, struct mog_svc *,
			struct mog_addrinfo *, mog_post_accept_fn);
void * mog_accept_loop(void *ac);
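
/*
 * Wiring sketch (hypothetical; "listen_fd", "svc", "ai", and "arg" are
 * made-up names for illustration): a bound, listening socket is given a
 * post-accept callback, then the accept loop runs on its own thread:
 *
 *	struct mog_fd *mfd = mog_accept_init(listen_fd, svc, ai,
 *						mog_http_post_accept);
 *	pthread_create(&thr, NULL, mog_accept_loop, arg);
 *
 * (what "arg" must point to is an assumption; only the void * signature
 * of mog_accept_loop is declared above)
 */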
	char *tmppath; /* NULL-ed if rename()-ed away */
	struct mog_digest digest;
#include "queue_epoll.h"

extern sigset_t mog_emptyset;
void mog_intr_disable(void);
void mog_intr_enable(void);
int mog_sleep(long seconds);

#include "selfwake.h"
	MOG_FD_TYPE_UNUSED = 0,
	MOG_FD_TYPE_SELFWAKE,
	MOG_FD_TYPE_SELFPIPE,
	MOG_FD_TYPE_SVC /* for docroot_fd */
	enum mog_fd_type fd_type:16;
	uint16_t ioq_blocked;
	pthread_spinlock_t expiring;

	struct mog_accept accept;
	struct mog_mgmt mgmt;
	struct mog_http http;
	struct mog_iostat iostat;
	struct mog_selfwake selfwake;
	struct mog_selfpipe selfpipe;
	struct mog_file file;
	struct mog_queue queue;

	SIMPLEQ_ENTRY(mog_fd) ioqent;
void mog_fd_put(struct mog_fd *mfd);
void mog_fdmap_requeue(struct mog_queue *quit_queue);
size_t mog_fdmap_expire(uint32_t sec);
extern size_t mog_nr_active_at_quit;
void mog_free_and_null(void *ptrptr);
_Noreturn void mog_oom(void);
void *mog_cachealign(size_t size) __attribute__((malloc));
struct mog_rbuf *mog_rbuf_new(size_t size);
struct mog_rbuf *mog_rbuf_get(size_t size);
struct mog_rbuf *mog_rbuf_detach(struct mog_rbuf *rbuf);
struct mog_rbuf *mog_rbuf_grow(struct mog_rbuf *);
void mog_rbuf_free(struct mog_rbuf *);
void mog_rbuf_reattach_and_null(struct mog_rbuf **);
void *mog_fsbuf_get(size_t *size);
void mog_alloc_quit(void);
void mog_oom_if_null(const void *);
#define die_errno(...) do { \
	error(EXIT_FAILURE, errno, __VA_ARGS__); \
} while (0)

#define die(...) do { \
	error(EXIT_FAILURE, 0, __VA_ARGS__); \
} while (0)

#define warn(...) error(0, 0, __VA_ARGS__)
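
/*
 * Usage sketch for the wrappers above (hypothetical call sites):
 *
 *	int fd = open(path, O_RDONLY);
 *	if (fd < 0)
 *		die_errno("open(%s) failed", path); // exits, reports errno
 *	if (nthr == 0)
 *		die("thread count must be non-zero"); // exits, no errno
 *	if (unlink(tmp) != 0)
 *		warn("could not unlink %s", tmp); // logs and continues
 */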
/*
 * vfork is poorly-specified, but at least on Linux it improves
 * performance when used for spawning iostat processes
 */
#if defined(HAVE_VFORK) && defined(__linux__)
# define mog_fork_for_exec() vfork()
#else
# define mog_fork_for_exec() fork()
#endif
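
/*
 * Spawn sketch (the iostat path and argv/envp below are illustrative):
 * vfork() requires the child to do nothing but exec or _exit(), which
 * the exec-only pattern satisfies:
 *
 *	pid_t pid = mog_fork_for_exec();
 *	if (pid == 0) { // child: exec immediately
 *		execve("/usr/bin/iostat", argv, envp);
 *		_exit(127); // exec failed
 *	}
 */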
void mog_set_maxconns(unsigned long);
struct mog_svc *mog_svc_new(const char *docroot);
typedef int (*mog_scandev_cb)(struct mog_dev *, struct mog_svc *);
size_t mog_svc_each(Hash_processor processor, void *data);
void mog_svc_upgrade_prepare(void);
bool mog_svc_start_each(void *svc_ptr, void *have_mgmt_ptr);
void mog_svc_thrpool_rescale(struct mog_svc *, unsigned ndev_new);
void mog_svc_aio_threads_enqueue(struct mog_svc *, unsigned nr);
void mog_svc_aio_threads_handler(void);
bool mog_svc_atfork_child(void *svc_ptr, void *parent);
struct mog_dev *mog_dev_for(struct mog_svc *, uint32_t mog_devid, bool update);
int mog_dev_mkusage(struct mog_dev *, struct mog_svc *);
size_t mog_dev_hash(const void *, size_t tablesize);
bool mog_dev_cmp(const void *a, const void *b);
void mog_dev_free(void *devptr);
bool mog_dev_user_rescale_i(void *devp, void *svcp);
bool mog_dev_requeue_prepare(void *devp, void *ign);
void mog_dev_usage_update(struct mog_dev *, struct mog_svc *);
int mog_valid_path(const char *buf, size_t len);
void * mog_trywritev(int fd, struct iovec *iov, int iovcnt);
enum mog_write_state mog_tryflush(int fd, struct mog_wbuf **);
void * mog_trysend(int fd, void *buf, size_t len, off_t more);
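
/*
 * Return-value sketch (semantics inferred from the MOG_WR_ERROR sentinel
 * above; treat as an assumption, not a documented contract):
 *
 *	void *wbuf = mog_trysend(fd, buf, len, 0);
 *	if (wbuf == MOG_WR_ERROR)
 *		... // fatal socket error, drop the client
 *	else if (wbuf)
 *		... // partial write: stash wbuf, flush via mog_tryflush()
 *	// NULL: everything was written
 */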
int mog_pidfile_prepare(const char *path);
int mog_pidfile_commit(int fd);
bool mog_pidfile_upgrade_prepare(void);
void mog_pidfile_upgrade_abort(void);
bool mog_svc_devstats_broadcast(void *svc, void *ignored);
void mog_svc_devstats_subscribe(struct mog_mgmt *);
void mog_svc_dev_shutdown(void);
void mog_mkusage_all(void);
void mog_svc_dev_user_rescale(struct mog_svc *, size_t ndev_new);
void mog_svc_dev_quit_prepare(struct mog_svc *);
void mog_svc_dev_requeue_prepare(struct mog_svc *svc);
/* cloexec_detect.c */
extern bool mog_cloexec_atomic;

void mog_cloexec_from(int lowfd);
/* iostat_process.c */
bool mog_iostat_respawn(int oldstatus) MOG_CHECK;
int mog_cfg_parse(struct mog_cfg *, char *buf, size_t len);
struct mog_cfg * mog_cfg_new(const char *configfile);
int mog_cfg_load(struct mog_cfg *);
void mog_cfg_svc_start_or_die(struct mog_cfg *cli);
extern struct mog_cfg mog_cli;
extern bool mog_cfg_multi;
/* listen_parser.rl */
struct mog_addrinfo *mog_listen_parse(const char *host_with_port);
char *mog_canonpath(const char *path, enum canonicalize_mode_t canon_mode);
char *mog_canonpath_die(const char *path, enum canonicalize_mode_t canon_mode);
void mog_thr_test_quit(void);
bool mog_thr_prepare_quit(void) MOG_CHECK;
void mog_thrpool_start(struct mog_thrpool *, unsigned n,
			void *(*start_fn)(void *), void *arg);
void mog_thrpool_quit(struct mog_thrpool *, struct mog_queue *);
void mog_thrpool_process_queue(void);
void mog_thrpool_set_size(struct mog_thrpool *, unsigned size);
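
/*
 * Pool lifecycle sketch (hypothetical start_fn; only the signatures
 * above are guaranteed):
 *
 *	static void *my_worker(void *arg) { ... }
 *
 *	mog_thrpool_start(&q->thrpool, 4, my_worker, q); // spawn 4 threads
 *	mog_thrpool_set_size(&q->thrpool, 8);            // grow to 8
 *	mog_thrpool_quit(&q->thrpool, q);                // stop and join
 */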
void mog_mgmt_writev(struct mog_mgmt *, struct iovec *, int iovcnt);
void mog_mgmt_post_accept(int fd, struct mog_accept *,
			union mog_sockaddr *, socklen_t);
enum mog_next mog_mgmt_queue_step(struct mog_fd *) MOG_CHECK;
void mog_mgmt_quit_step(struct mog_fd *);
void mog_mgmt_drop(struct mog_fd *);
struct mog_queue * mog_queue_new(void);
void mog_idleq_add(struct mog_queue *, struct mog_fd *, enum mog_qev);
void mog_idleq_push(struct mog_queue *, struct mog_fd *, enum mog_qev);
struct mog_fd * mog_idleq_wait(struct mog_queue *, int timeout);
struct mog_fd *
mog_queue_xchg(struct mog_queue *, struct mog_fd *, enum mog_qev);
struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout);
struct mog_addrinfo {
	struct addrinfo *addr;
};
void mog_addrinfo_free(struct mog_addrinfo **);
int mog_bind_listen(struct addrinfo *);

void mog_close(int fd);
/* mog_queue_loop.c */
void * mog_queue_loop(void *arg);
void mog_queue_quit_loop(struct mog_queue *queue);

enum mog_next mog_queue_step(struct mog_fd *mfd) MOG_CHECK;
struct mog_fd * mog_file_open_read(struct mog_svc *, char *path);
struct mog_fd * mog_file_open_put(struct mog_svc *, char *path, int flags);
void mog_file_close(struct mog_fd *);
bool mog_open_expire_retry(struct mog_svc *);

void mog_notify_init(void);
void mog_notify(enum mog_notification);
void mog_notify_wait(bool need_usage_file);
void mog_http_reset_parser(struct mog_http *);
void mog_http_init(struct mog_http *, struct mog_svc *);
enum mog_parser_state mog_http_parse(struct mog_http *, char *buf, size_t len);

void mog_http_get_open(struct mog_fd *, char *buf);
enum mog_next mog_http_get_in_progress(struct mog_fd *);
void mog_http_post_accept(int fd, struct mog_accept *,
			union mog_sockaddr *, socklen_t);
void mog_httpget_post_accept(int fd, struct mog_accept *,
			union mog_sockaddr *, socklen_t);
enum mog_next mog_http_queue_step(struct mog_fd *) MOG_CHECK;
void mog_http_quit_step(struct mog_fd *);
char *mog_http_path(struct mog_http *, char *buf);
void mog_http_reset(struct mog_fd *);
void mog_http_unlink_ftmp(struct mog_http *);
void mog_http_drop(struct mog_fd *);
void mog_http_delete(struct mog_fd *, char *buf);
void mog_http_mkcol(struct mog_fd *, char *buf);

void mog_http_put(struct mog_fd *, char *buf, size_t buf_len);
enum mog_next mog_http_put_in_progress(struct mog_fd *);
bool mog_http_write_full(struct mog_fd *file_mfd, char *buf, size_t buf_len);
/* chunk_parser.rl */
void mog_chunk_init(struct mog_http *);
enum mog_parser_state mog_chunk_parse(struct mog_http *, char *buf, size_t len);
#define MOG_HTTPDATE_CAPA (sizeof("Thu, 01 Jan 1970 00:00:00 GMT"))

	char httpdate[MOG_HTTPDATE_CAPA];

char *mog_http_date(char *dst, size_t len, const time_t *timep);
struct mog_now *mog_now(void);
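
/*
 * Example (sketch): format the current time as an HTTP-date, exactly
 * the shape MOG_HTTPDATE_CAPA is sized for:
 *
 *	char buf[MOG_HTTPDATE_CAPA];
 *	time_t now = time(NULL);
 *	mog_http_date(buf, sizeof(buf), &now);
 *	// e.g. "Thu, 01 Jan 1970 00:00:00 GMT" (29 chars + NUL == 30)
 */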
int mog_mkpath_for(struct mog_svc *svc, char *path);
struct mog_queue *mog_queue_init(int queue_fd);
void mog_queue_stop(struct mog_queue *keep);
void mog_queue_drop(struct mog_fd *);
/* valid_put_path.rl */
bool mog_valid_put_path(const char *buf, size_t len);
void mog_iou_cleanup_begin(void);
void mog_iou_cleanup_finish(void);
void mog_iou_read(dev_t, char buf[MOG_IOUTIL_LEN]);
void mog_iou_write(dev_t, const char buf[MOG_IOUTIL_LEN]);
void mog_iou_active(dev_t);
/*
 * non-Linux may not allow MSG_MORE on stream sockets,
 * so limit MSG_MORE usage to Linux for now
 */
#if defined(MSG_MORE) && defined(__linux__)
# define MOG_MSG_MORE (MSG_MORE)
#else
# define MOG_MSG_MORE (0)
#endif
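
/*
 * Usage sketch: MOG_MSG_MORE is OR-ed into send(2) flags when more data
 * will follow, and compiles to zero where unsupported:
 *
 *	send(fd, hdr, hdrlen, MOG_MSG_MORE); // hint: body follows
 *	send(fd, body, bodylen, 0);
 */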
#if defined(TCP_NOPUSH) /* FreeBSD */
/*
 * TCP_NOPUSH in modern versions of FreeBSD behaves identically to
 * TCP_CORK under Linux (which we used before we switched Linux to MSG_MORE)
 */
# define MOG_TCP_NOPUSH TCP_NOPUSH
#else
# define MOG_TCP_NOPUSH (0)
#endif
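
/*
 * Corking sketch (illustrative): on FreeBSD the option is toggled via
 * setsockopt(2); elsewhere MOG_TCP_NOPUSH is zero, so the branch is
 * compiled away:
 *
 *	int on = 1;
 *	if (MOG_TCP_NOPUSH)
 *		setsockopt(fd, IPPROTO_TCP, MOG_TCP_NOPUSH, &on, sizeof(on));
 */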
/* publicly visible attributes of the current process */
	unsigned long worker_processes;

void cmogstored_quit(void);
void mog_inherit_init(void);
int mog_inherit_get(struct sockaddr *addr, socklen_t len);
void mog_inherit_cleanup(void);

#define MOG_PROC_UNKNOWN (UINT_MAX)
#define MOG_PROC_IOSTAT (UINT_MAX-1)
#define MOG_PROC_UPGRADE (UINT_MAX-2)
void mog_process_init(size_t nr);
void mog_process_reset(void);
char *mog_process_name(unsigned id);
bool mog_process_is_worker(unsigned id);
size_t mog_kill_each_worker(int signo);
void mog_process_register(pid_t, unsigned id);
unsigned mog_process_reaped(pid_t);
void mog_upgrade_prepare(int argc, char *argv[], char *envp[]);
pid_t mog_upgrade_spawn(void);

_Noreturn void cmogstored_exit(void);

verify(sizeof(in_port_t) <= sizeof(uint16_t));
/*
 * We only deal with IPv4 and IPv6 addresses (and no human-friendly
 * hostnames/service names), so we can use smaller constants than the
 * standard NI_MAXHOST/NI_MAXSERV values (1025 and 32, respectively).
 * This reduces our per-thread stack usage and keeps caches hotter.
 */
	char ni_host[INET6_ADDRSTRLEN + sizeof("[]") - 1];

	/*
	 * we may not always be serving protocols with port numbers in them,
	 * so we embed space for the ":" in the ni_serv field to make managing
	 * syslog(3) format strings more flexible.
	 */
	char ni_serv[sizeof(":65536")];
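
/*
 * Sizing arithmetic for the fields above: INET6_ADDRSTRLEN already
 * includes the terminating NUL, and sizeof("[]") - 1 adds two bytes for
 * brackets around an IPv6 address; ni_serv holds at most ":65535" plus
 * NUL.  A worst-case rendering (illustrative):
 *
 *	struct mog_ni ni;
 *	// ni.ni_host: "[1234:5678:90ab:cdef:1234:5678:90ab:cdef]"
 *	// ni.ni_serv: ":65535"
 *	syslog(LOG_INFO, "client %s%s", ni.ni_host, ni.ni_serv);
 */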
void mog_nameinfo(struct mog_packaddr *, struct mog_ni *);

void mog_yield(void);
extern __thread struct mog_ioq *mog_ioq_current;
void mog_ioq_init(struct mog_ioq *, struct mog_svc *, unsigned val);
bool mog_ioq_ready(struct mog_ioq *, struct mog_fd *) MOG_CHECK;
bool mog_ioq_contended(void) MOG_CHECK;
void mog_ioq_next(struct mog_ioq *);
void mog_ioq_adjust(struct mog_ioq *, unsigned value);
void mog_ioq_destroy(struct mog_ioq *);
bool mog_ioq_unblock(struct mog_fd *);
void mog_ioq_requeue_prepare(struct mog_ioq *);
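
/*
 * Admission sketch (semantics inferred from the field comments in the
 * mog_ioq fields above, so this is an assumption, not a documented
 * contract): a thread asks the per-device queue for a slot before
 * blocking I/O and releases it when done:
 *
 *	if (mog_ioq_ready(&dev_ioq, mfd)) {
 *		... // perform disk I/O
 *		mog_ioq_next(&dev_ioq); // let the next queued mog_fd run
 *	}
 *	// else: mfd was queued on ioq_head and is retried later
 */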
/* systemtap stuff */