/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#include "cmogstored.h"
/*
 * This is a semaphore-like API with explicit queueing and activation,
 * so contended scheduling/wakeups happen via epoll/kqueue and threads
 * never block (aside from the mutex, which only protects small,
 * memory-only operations).
 *
 * The main operations are mog_ioq_ready and mog_ioq_next.
 *
 * Flow:
 *
 *   mog_ioq_ready ---> true --> normal dispatch --> mog_ioq_next
 *       |                ^  (mog_ioq_unblock)             |
 *       |                |                                |
 *       |                `-------<--------\               |
 *     false                               |               |
 *       |                                 |               V
 *       |                                 |               |
 *       V                                 |               |
 *  SIMPLEQ_INSERT_TAIL(push)              ^               |
 *      ||                                 |               V
 *      VV                                 |              /
 *       \\                                |             /
 *        \\                               |            /
 *         \\                              |           V
 *          `===(wait for)==========>===> SIMPLEQ_{FIRST,REMOVE_HEAD}(pop)
 *                                         |               |
 *                                         |               V
 *                                         |               |
 *                                         ^ add to kqueue/epoll ready list
 *                                         |               |
 *                                         |               V
 *                                         |              /
 *                                         `---------<----'
 *
 * mog_ioq_next is automatically called when a thread releases a regular file.
 */
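
/*
 * Usage sketch (illustrative only, not cmogstored's actual dispatch
 * code): how the operations above pair up from a worker thread's
 * perspective.  device_ioq_of() and serve_from_disk() are hypothetical
 * helpers invented for this sketch:
 *
 *	if (!mog_ioq_ready(device_ioq_of(mfd), mfd))
 *		return MOG_NEXT_IGNORE;    (queued; do not watch mfd)
 *	serve_from_disk(mfd);              (normal dispatch)
 *	mog_ioq_next(NULL);                (release slot, wake next waiter)
 *
 * On the false path, the mfd is queued and later pushed to the
 * epoll/kqueue ready list; mog_ioq_unblock lets the protocol handler
 * detect the reactivation and resume the saved request.
 */
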
__thread struct mog_ioq *mog_ioq_current;

void mog_ioq_init(struct mog_ioq *ioq, struct mog_svc *svc, unsigned val)
{
	ioq->cur = val;
	ioq->max = val;
	ioq->svc = svc;
	ioq->contended = false;
	SIMPLEQ_INIT(&ioq->ioq_head);
	CHECK(int, 0, pthread_mutex_init(&ioq->mtx, NULL));
}

/*
 * We do not need this yet, but it will allow us to have multi-threaded
 * shutdown in the future (we currently drop into single-threaded mode).
 */
void mog_ioq_requeue_prepare(struct mog_ioq *ioq)
{
	assert(ioq->cur >= ioq->max &&
	       "we should only get here when idle before mog_fdmap_requeue");

	SIMPLEQ_INIT(&ioq->ioq_head);
}

/* this is only a hint, so no explicit memory barriers or atomics are needed */
static inline void ioq_set_contended(struct mog_ioq *ioq)
{
	ioq->contended = true;
}

/*
 * This is like sem_trywait.  Each thread is only allowed to acquire
 * one ioq at once.
 *
 * If this returns false, the caller _must_ return MOG_NEXT_IGNORE to
 * prevent the mfd from being added to an epoll/kqueue watch list.
 * Adding the mfd to an epoll/kqueue watch list in the same thread/context
 * where this function returns false is a guaranteed bug.
 *
 * mfd is the client socket, not the open (regular) file.
 */
bool mog_ioq_ready(struct mog_ioq *ioq, struct mog_fd *mfd)
{
	bool good;

	assert(mog_ioq_current == NULL && "already holding mog_ioq_current");
	CHECK(int, 0, pthread_mutex_lock(&ioq->mtx));

	good = ioq->cur > 0;
	if (good) {
		--ioq->cur;
		mog_ioq_current = ioq;
	} else {
		TRACE(CMOGSTORED_IOQ_BLOCKED(mfd->fd));
		mfd->ioq_blocked = 1;
		SIMPLEQ_INSERT_TAIL(&ioq->ioq_head, mfd, ioqent);
		ioq_set_contended(ioq);
	}

	CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx));

	return good;
}
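
/*
 * Sketch of the one-ioq-per-thread invariant enforced by the assertion
 * above (ioq_a and ioq_b are hypothetical queues for illustration):
 *
 *	mog_ioq_ready(ioq_a, mfd);   (on success, sets mog_ioq_current)
 *	mog_ioq_ready(ioq_b, mfd);   (would trip the assertion)
 *
 * A thread must release its slot via mog_ioq_next before it may call
 * mog_ioq_ready again.
 */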

/*
 * Analogous to sem_post, this wakes up the next waiter.
 * check_ioq may be NULL; if non-NULL, it is validated against
 * mog_ioq_current.
 */
void mog_ioq_next(struct mog_ioq *check_ioq)
{
	struct mog_fd *mfd = NULL;

	if (mog_ioq_current == NULL)
		return;

	CHECK(int, 0, pthread_mutex_lock(&mog_ioq_current->mtx));

	assert((check_ioq == NULL) ||
	       (check_ioq == mog_ioq_current && "ioq mismatch (tls vs check)"));

	mog_ioq_current->cur++;
	if (mog_ioq_current->cur <= mog_ioq_current->max) {
		/* wake up any waiters */
		mfd = SIMPLEQ_FIRST(&mog_ioq_current->ioq_head);
		if (mfd)
			SIMPLEQ_REMOVE_HEAD(&mog_ioq_current->ioq_head, ioqent);
	} else {
		/* mog_ioq_adjust was called and lowered our capacity */
		mog_ioq_current->cur--;
		ioq_set_contended(mog_ioq_current);
	}
	CHECK(int, 0, pthread_mutex_unlock(&mog_ioq_current->mtx));

	/* wake up the next sleeper on this queue */
	if (mfd) {
		TRACE(CMOGSTORED_IOQ_RESCHEDULE(mfd->fd));
		mog_activeq_push(mog_ioq_current->svc->queue, mfd);
		/*
		 * We may not touch mfd after mog_activeq_push.  Another
		 * thread may already have it.  In the worst case, it has
		 * been closed due to epoll/kqueue running out of space
		 * and another system call (open/accept) may have already
		 * reused the FD.
		 */
	}
	mog_ioq_current = NULL;
}
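
/*
 * Example call sites (hypothetical): a release path which knows which
 * device it was serving can pass its ioq as a consistency check, while
 * generic callers just pass NULL:
 *
 *	mog_ioq_next(&dev->ioq);   (asserts it matches mog_ioq_current)
 *	mog_ioq_next(NULL);        (no cross-check, the common case)
 *
 * "dev" is an assumed per-device structure, not defined in this file.
 */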

/*
 * Returns true if the currently-held ioq is contended.
 * This clears the contended flag if it is set, so the caller is
 * expected to yield the current thread shortly afterwards.
 * This is only a hint.
 */
bool mog_ioq_contended(void)
{
	struct mog_ioq *cur = mog_ioq_current;

	/* assume contended for non-/devXXX* paths */
	if (!cur)
		return true;

	/*
	 * We only want to minimize the number of threads seeing true,
	 * so we use an atomic compare-and-swap and hope for the best.
	 * This is only a hint.
	 */
	return __sync_bool_compare_and_swap(&cur->contended, true, false);
}
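
/*
 * Usage sketch (hypothetical streaming loop): a thread sending a large
 * file can poll this hint between chunks and yield its slot early so
 * queued requests get a turn sooner.  send_next_chunk() and
 * yield_self() are invented names for this sketch:
 *
 *	while (send_next_chunk(mfd)) {
 *		if (mog_ioq_contended())
 *			return yield_self(mfd);
 *	}
 */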

/*
 * Called by the main/notify thread if the user has ever set
 * "server aio_threads = XX" via the sidechannel.
 */
void mog_ioq_adjust(struct mog_ioq *ioq, unsigned value)
{
	struct mog_fd *mfd = NULL;
	unsigned prev;

	assert(value > 0 && "mog_ioq_adjust value must be non-zero");
	CHECK(int, 0, pthread_mutex_lock(&ioq->mtx));
	prev = ioq->max;
	ioq->max = value;

	if (ioq->cur > ioq->max) {
		/* capacity reduced, get some threads to yield themselves */
		ioq_set_contended(ioq);
	} else if (value > prev) {
		/* capacity grew (this guard also avoids unsigned underflow) */
		unsigned diff = value - prev;

		ioq->cur += diff;

		/*
		 * Wake up all the sleepers we made capacity for.
		 * Unlike mog_ioq_next, we do not release ioq->mtx here,
		 * to avoid infinite looping.
		 */
		while (diff--) {
			mfd = SIMPLEQ_FIRST(&ioq->ioq_head);
			if (!mfd)
				break;

			SIMPLEQ_REMOVE_HEAD(&ioq->ioq_head, ioqent);
			TRACE(CMOGSTORED_IOQ_RESCHEDULE(mfd->fd));
			mog_activeq_push(ioq->svc->queue, mfd);
		}
	}
	CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx));
}
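
/*
 * Worked example: with max=10, all slots in use (cur=0), and waiters
 * queued, "server aio_threads = 20" gives prev=10, diff=10, cur=10,
 * and up to ten queued mfds are pushed to the active queue right away.
 * Lowering the value only sets the contended hint immediately when
 * cur > max (e.g. on an idle queue); otherwise releases in
 * mog_ioq_next hit its cur > max branch, which caps ioq->cur at the
 * new max and sets the contended hint.
 */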

void mog_ioq_destroy(struct mog_ioq *ioq)
{
	CHECK(int, 0, pthread_mutex_destroy(&ioq->mtx));
}

/*
 * If this returns true, the caller must continue processing a request
 * without checking other state associated with the mfd.
 * If this returns false (the common case), the caller continues as
 * usual.
 */
bool mog_ioq_unblock(struct mog_fd *mfd)
{
	if (mfd->ioq_blocked == 0)
		return false;

	TRACE(CMOGSTORED_IOQ_UNBLOCKED(mfd->fd));
	mfd->ioq_blocked = 0;
	return true;
}
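
/*
 * Usage sketch (hypothetical handler entry point): when a queued mfd is
 * reactivated via mog_activeq_push, the protocol handler checks this
 * flag first and resumes the saved request rather than expecting new
 * bytes on the socket.  resume_saved_request() is an invented name:
 *
 *	if (mog_ioq_unblock(mfd))
 *		return resume_saved_request(mfd);
 */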