/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#include "cmogstored.h"
/*
 * This is a semaphore-like API with explicit queueing and activation,
 * so contended scheduling/wakeups happen via epoll/kqueue and threads
 * never block (other than on the mutex, which only protects small,
 * memory-only operations).
 *
 * The main operations are mog_ioq_ready and mog_ioq_next:
 *
 *   mog_ioq_ready ---> true ---> normal dispatch ---> mog_ioq_next
 *        |                             ^                    |
 *        |           (mog_ioq_unblock) |                    |
 *        |                             |                    V
 *        `--> false --> SIMPLEQ_INSERT_TAIL (push)          |
 *                             |                             |
 *                             V                             |
 *                        (wait for)                         |
 *                             |                             |
 *                             V                             |
 *               SIMPLEQ_{FIRST,REMOVE_HEAD} (pop) <---------'
 *                             |
 *                             V
 *               add to kqueue/epoll ready list
 *                             |
 *                             `---> back to normal dispatch (via mog_ioq_unblock)
 *
 * mog_ioq_next is automatically called when a thread releases a regular file.
 */
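/*
 * A minimal usage sketch of the acquire/release pairing (illustrative only;
 * names other than the mog_ioq_* functions are hypothetical, not the real
 * dispatch code):
 *
 *	if (mog_ioq_ready(&dev->ioq, client_mfd)) {
 *		do_disk_io(client_mfd);		// work while holding the slot
 *		mog_ioq_next(&dev->ioq);	// release; may wake a queued client
 *	} else {
 *		// client_mfd is now queued on dev->ioq; it will be pushed
 *		// back onto the active queue once a slot frees up
 *	}
 */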
__thread struct mog_ioq *mog_ioq_current;
void mog_ioq_init(struct mog_ioq *ioq, struct mog_svc *svc, unsigned val)
{
        ioq->max = val;
        ioq->cur = val;
        ioq->svc = svc;
        ioq->contended = false;
        SIMPLEQ_INIT(&ioq->ioq_head);
        CHECK(int, 0, pthread_mutex_init(&ioq->mtx, NULL));
}
/*
 * we do not need this, yet, but this will allow us to have multi-threaded
 * shutdown in the future (we currently drop into single-threaded mode)
 */
void mog_ioq_requeue_prepare(struct mog_ioq *ioq)
{
        assert(ioq->cur >= ioq->max &&
               "we should only get here when idle before mog_fdmap_requeue");
        SIMPLEQ_INIT(&ioq->ioq_head);
}
/* this is only a hint, so no explicit memory barriers or atomics */
static inline void ioq_set_contended(struct mog_ioq *ioq)
{
        ioq->contended = true;
}
/*
 * This is like sem_trywait.  Each thread is only allowed to acquire
 * one ioq at a time.
 *
 * If this returns false, the caller _must_ return MOG_NEXT_IGNORE to
 * prevent the mfd from being added to an epoll/kqueue watch list.
 * Adding the mfd to an epoll/kqueue watch list in the same thread/context
 * where this function returns false is a guaranteed bug.
 *
 * mfd is the client socket, not the open (regular) file
 */
bool mog_ioq_ready(struct mog_ioq *ioq, struct mog_fd *mfd)
{
        bool good;

        assert(mog_ioq_current == NULL && "already holding mog_ioq_current");
        CHECK(int, 0, pthread_mutex_lock(&ioq->mtx));

        good = ioq->cur > 0;
        if (good) {
                /* take a slot and remember the queue this thread holds */
                ioq->cur--;
                mog_ioq_current = ioq;
        } else {
                /* no slot available: queue the client until a slot frees up */
                TRACE(CMOGSTORED_IOQ_BLOCKED(mfd->fd));
                mfd->ioq_blocked = 1;
                SIMPLEQ_INSERT_TAIL(&ioq->ioq_head, mfd, ioqent);
                ioq_set_contended(ioq);
        }

        CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx));

        return good;
}
/*
 * analogous to sem_post, this wakes up the next waiter.
 * check_ioq may be NULL; if non-NULL, it is validated against mog_ioq_current
 */
void mog_ioq_next(struct mog_ioq *check_ioq)
{
        struct mog_fd *mfd = NULL;

        if (mog_ioq_current == NULL)
                return;

        CHECK(int, 0, pthread_mutex_lock(&mog_ioq_current->mtx));

        assert((check_ioq == NULL) ||
               (check_ioq == mog_ioq_current && "ioq mismatch (tls vs check)"));

        mog_ioq_current->cur++;
        if (mog_ioq_current->cur <= mog_ioq_current->max) {
                /* wake up any waiters */
                mfd = SIMPLEQ_FIRST(&mog_ioq_current->ioq_head);
                if (mfd)
                        SIMPLEQ_REMOVE_HEAD(&mog_ioq_current->ioq_head, ioqent);
        } else {
                /* mog_ioq_adjust was called and lowered our capacity */
                mog_ioq_current->cur--;
                ioq_set_contended(mog_ioq_current);
        }
        CHECK(int, 0, pthread_mutex_unlock(&mog_ioq_current->mtx));

        if (mfd) {
                /* wake up the next sleeper on this queue */
                TRACE(CMOGSTORED_IOQ_RESCHEDULE(mfd->fd));
                mog_activeq_push(mog_ioq_current->svc->queue, mfd);
                /*
                 * We may not touch mfd after mog_activeq_push.  Another
                 * thread may already have it.  In the worst case, it's been
                 * closed due to epoll/kqueue running out-of-space and another
                 * system call (open/accept) may have already reused the FD.
                 */
        }
        mog_ioq_current = NULL;
}
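/*
 * Illustrative calls (not from the real code): the check_ioq argument only
 * adds an assertion against the thread-local mog_ioq_current.
 *
 *	mog_ioq_next(NULL);		// release whatever ioq this thread holds
 *	mog_ioq_next(&dev->ioq);	// same, but assert we really hold dev->ioq
 */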
/*
 * Returns true if the currently held ioq is contended.
 * This releases the contended flag if it is set, so the caller
 * is expected to yield the current thread shortly afterwards.
 * This is only a hint.
 */
bool mog_ioq_contended(void)
{
        struct mog_ioq *cur = mog_ioq_current;

        /* assume contended for non /devXXX* paths */
        if (!cur)
                return true;

        /*
         * we only want to minimize the threads hitting true, so we use
         * an atomic exchange and hope for the best.  This is only a hint.
         */
        return __sync_bool_compare_and_swap(&cur->contended, true, false);
}
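/*
 * Sketch of how a worker might honor the hint (hypothetical, not the real
 * queue loop): after finishing one request on a contended device, yield
 * instead of immediately starting the next request for the same client.
 *
 *	if (mog_ioq_contended())
 *		yield_to_other_clients(mfd);	// hypothetical helper
 */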
/*
 * called by the main/notify thread if the user has ever set
 * "server aio_threads = XX" via sidechannel
 */
void mog_ioq_adjust(struct mog_ioq *ioq, unsigned value)
{
        struct mog_fd *mfd = NULL;
        unsigned prev;

        assert(value > 0 && "mog_ioq_adjust value must be non-zero");
        CHECK(int, 0, pthread_mutex_lock(&ioq->mtx));
        prev = ioq->max;
        ioq->max = value;

        if (ioq->cur > ioq->max) {
                /* capacity reduced, get some threads to yield themselves */
                ioq_set_contended(ioq);
        } else if (value > prev) {
                unsigned diff = value - prev;

                /* grow the available slot count by the added capacity */
                ioq->cur += diff;

                /*
                 * wake up all sleepers we made capacity for.
                 * unlike mog_ioq_next, we do not release ioq->mtx here
                 * to avoid infinite looping
                 */
                while (diff--) {
                        mfd = SIMPLEQ_FIRST(&ioq->ioq_head);
                        if (mfd == NULL)
                                break;

                        SIMPLEQ_REMOVE_HEAD(&ioq->ioq_head, ioqent);
                        TRACE(CMOGSTORED_IOQ_RESCHEDULE(mfd->fd));
                        mog_activeq_push(ioq->svc->queue, mfd);
                }
        }
        CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx));
}
void mog_ioq_destroy(struct mog_ioq *ioq)
{
        CHECK(int, 0, pthread_mutex_destroy(&ioq->mtx));
}
/*
 * If this returns true, the caller must continue processing a request
 * without checking other state associated with the mfd.
 * If this returns false (the common case), the caller continues as
 * usual.
 */
bool mog_ioq_unblock(struct mog_fd *mfd)
{
        if (mfd->ioq_blocked == 0)
                return false;

        TRACE(CMOGSTORED_IOQ_UNBLOCKED(mfd->fd));
        mfd->ioq_blocked = 0;
        return true;
}
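/*
 * Dispatch-side sketch (hypothetical names): when a previously blocked mfd
 * is re-activated by mog_ioq_next/mog_ioq_adjust, the dispatcher resumes the
 * stalled request instead of waiting for socket readability:
 *
 *	if (mog_ioq_unblock(mfd))
 *		return http_run(mfd);	// hypothetical: continue the request
 */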