cmogstored 1.8.1 - use default system stack size
[cmogstored.git] / queue_kqueue.c
/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#include "cmogstored.h"
/*
 * a poll/select/libev/libevent-based implementation would have a hard time
 * migrating clients between threads
 */
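/*
 * with kqueue, every descriptor is armed as a one-shot (EV_ONESHOT)
 * filter in qpush() below, so an event is delivered to exactly one
 * kevent() caller and then disarmed; whichever worker thread dequeues
 * the event owns that client until the descriptor is re-armed
 */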
#ifdef HAVE_KQUEUE
struct mog_queue * mog_queue_new(void)
{
	int kqueue_fd = kqueue();

	if (kqueue_fd < 0)
		die_errno("kqueue() failed");

	return mog_queue_init(kqueue_fd);
}
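/*
 * kqueue() itself can fail with EMFILE, ENFILE or ENOMEM when descriptor
 * or memory limits are hit; each mog_queue wraps exactly one kqueue
 * descriptor
 */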
/*
 * grabs one active event off the event queue
 */
struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
{
	int rc;
	struct mog_fd *mfd;
	struct kevent event;
	struct timespec ts;
	struct timespec *tsp;
	bool cancellable = timeout != 0;
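	/*
	 * timeout is in milliseconds; a negative value means sleep in
	 * kevent() until an event arrives.  For example, timeout == 2500
	 * becomes ts.tv_sec == 2 and ts.tv_nsec == 500000000 below.
	 */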
	if (timeout < 0) {
		tsp = NULL;
	} else {
		ts.tv_sec = timeout / 1000;
		ts.tv_nsec = (timeout % 1000) * 1000000;
		tsp = &ts;
	}
	/*
	 * we enable SIGURG from pthread_kill() in thrpool.c when sleeping
	 * in kevent().  This allows us to wake up and respond to a
	 * cancellation request (since kevent() is not a cancellation point).
	 */
	if (cancellable)
		mog_thr_test_quit();

	rc = kevent(q->queue_fd, NULL, 0, &event, 1, tsp);

	if (rc > 0) {
		mfd = event.udata;
		mog_fd_check_out(mfd);

		/* ignore pending cancel until the next round */
		return mfd;
	}
	if (cancellable)
		mog_thr_test_quit();
	if (rc == 0)
		return NULL;

	if (errno != EINTR)
		die_errno("kevent(wait) failed with (%d)", rc);

	return NULL;
}
struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout)
{
	struct mog_fd *mfd;

	/* this is racy, using a self-pipe covers the race */
	mog_intr_enable();
	mfd = mog_idleq_wait(q, timeout);
	mog_intr_disable();
	return mfd;
}
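/*
 * the race: a signal arriving after mog_intr_enable() but before kevent()
 * starts sleeping cannot interrupt the wait; presumably the self-wake
 * descriptor (MOG_FD_TYPE_SELFWAKE below) registered with the queue is
 * what guarantees such a wakeup is still observed
 */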
MOG_NOINLINE static void
kevent_add_error(struct mog_queue *q, struct mog_fd *mfd)
{
	switch (errno) {
	case ENOMEM:
		syslog(LOG_ERR,
			"kevent(EV_ADD) out-of-space, dropping file descriptor");
		mog_queue_drop(mfd);
		return;
	default:
		syslog(LOG_ERR, "unhandled kevent(EV_ADD) error: %m");
		assert(0 && "BUG in our usage of kevent");
	}
}
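/*
 * ENOMEM from EV_ADD means the kernel could not allocate space to
 * register the event (see the kevent man page), so the descriptor is
 * dropped gracefully above; any other error is treated as a bug
 */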
static int add_event(int kqueue_fd, struct kevent *event)
{
	int rc;

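	/*
	 * with nevents == 0, kevent() only applies the changelist and does
	 * not wait for events, so this call normally returns immediately;
	 * the EINTR retry is purely defensive
	 */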
	do {
		rc = kevent(kqueue_fd, event, 1, NULL, 0, NULL);
	} while (rc < 0 && errno == EINTR);

	return rc;
}
static void qpush(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
{
	struct kevent event;
	u_short flags = EV_ADD | EV_ONESHOT;

	EV_SET(&event, mfd->fd, (short)ev, flags, 0, 0, mfd);

	mog_fd_check_in(mfd);
	if (add_event(q->queue_fd, &event) != 0) {
		mog_fd_check_out(mfd);
		kevent_add_error(q, mfd);
	}
}
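/*
 * EV_ONESHOT makes the kernel delete the filter once the event has been
 * retrieved, so a descriptor must be re-armed via mog_idleq_push() after
 * every wakeup.  The (short)ev cast suggests MOG_QEV_RD/MOG_QEV_WR map
 * directly onto the EVFILT_READ/EVFILT_WRITE constants, though that
 * mapping is defined elsewhere in cmogstored.
 */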
/*
 * Pushes in one mog_fd for kqueue to watch.
 *
 * Only call this with MOG_QEV_RW *or* if EAGAIN/EWOULDBLOCK is
 * encountered in mog_queue_loop.
 */
void mog_idleq_push(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
{
	if (ev == MOG_QEV_RW) {
		switch (mfd->fd_type) {
		case MOG_FD_TYPE_IOSTAT:
		case MOG_FD_TYPE_SELFWAKE:
			ev = MOG_QEV_RD;
			break;
		case MOG_FD_TYPE_UNUSED:
		case MOG_FD_TYPE_ACCEPT:
		case MOG_FD_TYPE_FILE:
		case MOG_FD_TYPE_QUEUE:
		case MOG_FD_TYPE_SVC:
			assert(0 && "invalid fd_type for mog_idleq_push");
		default:
			ev = MOG_QEV_WR;
			break;
		}
	}
	qpush(q, mfd, ev);
}
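/*
 * a kqueue filter watches a single direction, so a MOG_QEV_RW request is
 * collapsed to one of them here: the iostat pipe and self-wake descriptor
 * are only ever read from, while the remaining valid types (presumably
 * client sockets) are armed for writability
 */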
void mog_idleq_add(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
{
	mog_idleq_push(q, mfd, ev);
}
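/*
 * mog_idleq_add() and mog_idleq_push() are the same operation here:
 * EV_ADD both registers a new descriptor and re-arms an existing
 * one-shot filter without creating a duplicate, so no separate "add"
 * path is needed (presumably the distinction matters for other queue
 * backends sharing this interface)
 */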
struct mog_fd *
mog_queue_xchg(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
{
	/*
	 * kqueue() _should_ be able to implement this function with
	 * one syscall, however, we currently rely on mog_idleq_wait()
	 * being a cancellation point.  So we must ensure the mfd is
	 * back in the queue (for other threads to access) before
	 * cancelling this thread...
	 */
	mog_idleq_push(q, mfd, ev);

	return mog_idleq_wait(q, -1);
}
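/*
 * the single-syscall version would pass the re-arm request in the
 * changelist and wait for the next event in the same kevent() call
 * (nchanges == 1, nevents == 1); it is avoided only because of the
 * cancellation ordering described above
 */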
#else /* ! HAVE_KQUEUE */
typedef int avoid_empty_file;
#endif /* ! HAVE_KQUEUE */
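Below is a minimal, self-contained sketch (not part of queue_kqueue.c, with made-up names such as pipefd and client) of the kqueue idiom the file builds on: a one-shot EVFILT_READ filter carrying a user pointer in udata, which must be re-armed after each delivery. It assumes a FreeBSD- or macOS-style kevent() where udata is a pointer.

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

int main(void)
{
	int kq = kqueue();
	int pipefd[2];
	struct kevent change, out;
	struct timespec nowait = { 0, 0 };
	const char *client = "example-client";	/* stands in for a mog_fd */

	if (kq < 0 || pipe(pipefd) != 0) {
		perror("kqueue/pipe");
		return EXIT_FAILURE;
	}

	/* arm a one-shot read filter, stashing our pointer in udata */
	EV_SET(&change, pipefd[0], EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0,
	       (void *)client);
	if (kevent(kq, &change, 1, NULL, 0, NULL) != 0) {
		perror("kevent(EV_ADD)");
		return EXIT_FAILURE;
	}

	/* make the read end of the pipe readable */
	if (write(pipefd[1], "x", 1) != 1) {
		perror("write");
		return EXIT_FAILURE;
	}

	/* first wait: the event fires and our udata pointer comes back */
	if (kevent(kq, NULL, 0, &out, 1, NULL) == 1)
		printf("ready: fd=%d udata=%s\n",
		       (int)out.ident, (const char *)out.udata);

	/* second wait: the one-shot filter was deleted on delivery, so
	 * nothing is pending until it is re-armed with EV_ADD again */
	if (kevent(kq, NULL, 0, &out, 1, &nowait) == 0)
		printf("filter gone; re-arm before waiting again\n");

	close(pipefd[0]);
	close(pipefd[1]);
	close(kq);
	return 0;
}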