cmogstored.git / mgmt.c (blob 9746952faa0d0c260d2b6792a6481b595440afeb)

/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#include "cmogstored.h"
#include "mgmt.h"
#include "digest.h"
#include "ioprio.h"

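/*
 * runs one step of an in-progress MD5 digest request on the file
 * we are forwarding (mgmt->forward).  With MOG_PRIO_FSCK we drop
 * I/O priority and loop until the digest yields, hits EOF or fails;
 * otherwise only one read is done per step so a single client cannot
 * monopolize the thread.  On EOF or error the result is emitted to
 * the client and the forwarded file is closed.
 */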
static void mgmt_digest_step(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_fd *fmfd = mgmt->forward;
	enum mog_digest_next next;

	/*
	 * MOG_PRIO_FSCK means we're likely the _only_ thread handling
	 * MD5, so run it as fast as possible.
	 */
	if (mgmt->prio == MOG_PRIO_FSCK) {
		int ioprio = mog_ioprio_drop();

		do {
			next = mog_digest_read(&fmfd->as.file.digest, fmfd->fd);
		} while (next == MOG_DIGEST_CONTINUE);

		if (ioprio != -1)
			mog_ioprio_restore(ioprio);
	} else {
		next = mog_digest_read(&fmfd->as.file.digest, fmfd->fd);
	}

	assert(mgmt->wbuf == NULL && "wbuf should be NULL here");

	switch (next) {
	case MOG_DIGEST_CONTINUE:
	case MOG_DIGEST_YIELD:
		return;
	case MOG_DIGEST_EOF:
		mog_mgmt_fn_digest_emit(mfd);
		break;
	case MOG_DIGEST_ERROR:
		mog_mgmt_fn_digest_err(mfd);
	}

	mog_file_close(mgmt->forward);
	mgmt->prio = MOG_PRIO_NONE;
	mgmt->forward = NULL;
}

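/*
 * resumes a digest request already in progress on this connection.
 * If the per-device I/O queue is contended we stay parked on it and
 * return MOG_NEXT_IGNORE; otherwise we run one digest step and pick
 * the next queue transition based on the write buffer state.
 */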
static enum mog_next mgmt_digest_in_progress(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_file *file;

	assert(mgmt->forward && mgmt->forward != MOG_IOSTAT && "bad forward");
	file = &mgmt->forward->as.file;

	if (file->ioq && !mog_ioq_ready(file->ioq, mfd))
		return MOG_NEXT_IGNORE;

	mgmt_digest_step(mfd);

	if (mgmt->wbuf == MOG_WR_ERROR) return MOG_NEXT_CLOSE;
	if (mgmt->wbuf) return MOG_NEXT_WAIT_WR;

	/*
	 * we can error on the MD5 but continue if we didn't
	 * have a socket error (from wbuf == MOG_WR_ERROR)
	 */
	return MOG_NEXT_ACTIVE;
}

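/*
 * releases the read buffer and returns the mfd to the fd allocator.
 * The write buffer must already be flushed (NULL) or marked
 * MOG_WR_ERROR, otherwise we would leak it.
 */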
MOG_NOINLINE static void mgmt_close(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	mog_rbuf_reattach_and_null(&mgmt->rbuf);
	assert((mgmt->wbuf == NULL || mgmt->wbuf == MOG_WR_ERROR) &&
	       "would leak mgmt->wbuf on close");

	mog_fd_put(mfd);
}

/* called only if epoll/kevent is out-of-space (see mog_http_drop) */
void mog_mgmt_drop(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	if (mgmt->forward && mgmt->forward != MOG_IOSTAT)
		mog_file_close(mgmt->forward);
	mgmt_close(mfd);
}

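/*
 * queues a response for a mgmt client: attempt a non-blocking writev()
 * right away and stash whatever could not be written in mgmt->wbuf
 * (NULL means everything was written, MOG_WR_ERROR means the socket
 * failed, anything else is buffered for mgmt_wbuf_in_progress).
 * The previous response must be fully flushed before calling this.
 */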
void mog_mgmt_writev(struct mog_mgmt *mgmt, struct iovec *iov, int iovcnt)
{
	struct mog_fd *mfd = mog_fd_of(mgmt);

	assert(mgmt->wbuf == NULL && "tried to write while busy");
	mgmt->wbuf = mog_trywritev(mfd->fd, iov, iovcnt);
}

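/*
 * turns the connection into a permanent iostat/devstats subscriber:
 * the read buffer is released (no more requests will be parsed),
 * a device refresh is requested, and the client will receive periodic
 * device stats until it disconnects.
 */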
static enum mog_next mgmt_iostat_forever(struct mog_mgmt *mgmt)
{
	mog_rbuf_reattach_and_null(&mgmt->rbuf); /* no coming back from this */
	mog_notify(MOG_NOTIFY_DEVICE_REFRESH);
	mog_svc_devstats_subscribe(mgmt);

	return MOG_NEXT_IGNORE;
}

/* flushes a partially-written response; returns the next queue action */
static enum mog_next mgmt_wbuf_in_progress(struct mog_mgmt *mgmt)
{
	assert(mgmt->wbuf != MOG_WR_ERROR && "still active after write error");
	switch (mog_tryflush(mog_fd_of(mgmt)->fd, &mgmt->wbuf)) {
	case MOG_WRSTATE_ERR: return MOG_NEXT_CLOSE;
	case MOG_WRSTATE_DONE:
		if (mgmt->forward == MOG_IOSTAT)
			return mgmt_iostat_forever(mgmt);
		return MOG_NEXT_ACTIVE;
	case MOG_WRSTATE_BUSY:
		/* unlikely, we never put anything big in wbuf */
		return MOG_NEXT_WAIT_WR;
	}
	assert(0 && "compiler bug?");
	return MOG_NEXT_CLOSE;
}

/* stash any pipelined data for the next round */
static void
mgmt_defer_rbuf(struct mog_mgmt *mgmt, struct mog_rbuf *rbuf, size_t buf_len)
{
	struct mog_rbuf *old = mgmt->rbuf;
	size_t defer_bytes = buf_len - mgmt->buf_off;
	char *src = rbuf->rptr + mgmt->buf_off;

	assert(mgmt->buf_off >= 0 && "mgmt->buf_off negative");
	assert(defer_bytes <= MOG_RBUF_MAX_SIZE && "defer bytes overflow");

	if (defer_bytes == 0) {
		mog_rbuf_reattach_and_null(&mgmt->rbuf);
	} else if (old) { /* no allocation needed, reuse existing */
		assert(old == rbuf && "mgmt->rbuf not reused properly");
		memmove(old->rptr, src, defer_bytes);
		old->rsize = defer_bytes;
	} else {
		mgmt->rbuf = mog_rbuf_new(defer_bytes);
		memcpy(mgmt->rbuf->rptr, src, defer_bytes);
		mgmt->rbuf->rsize = defer_bytes;
	}
	mgmt->buf_off = 0;
}

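/*
 * dispatches one fully-parsed request.  Commands which hit the
 * filesystem (size and digest requests) must first get a slot on the
 * per-device I/O queue (or the fsck queue for MOG_PRIO_FSCK); if none
 * is available, the parsed request is stashed in mgmt->rbuf and we
 * return false so the caller leaves the fd parked on the ioq.
 * Returns true once the request is dispatched (or there was nothing
 * to do).
 */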
static bool
mgmt_process_client(struct mog_fd *mfd, struct mog_rbuf *rbuf,
		char *buf, size_t buf_len)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_dev *dev;
	struct mog_ioq *ioq;

	/* we handle non-filesystem-using commands inline in the parser */
	if (mgmt->mgmt_method == MOG_MGMT_METHOD_NONE)
		return true;

	dev = mog_dev_for(mgmt->svc, mgmt->mog_devid, false);

	if (dev) {
		ioq = mgmt->prio == MOG_PRIO_NONE ? &dev->ioq : &dev->fsckq;
		if (!mog_ioq_ready(ioq, mfd)) {
			if (!mgmt->rbuf)
				mgmt->rbuf = mog_rbuf_detach(rbuf);
			mgmt->rbuf->rsize = buf_len;
			return false;
		}
	}

	switch (mgmt->mgmt_method) {
	case MOG_MGMT_METHOD_NONE:
		assert(0 && "we should never get here: MOG_MGMT_METHOD_NONE");
	case MOG_MGMT_METHOD_SIZE:
		mog_mgmt_fn_size(mgmt, buf);
		break;
	case MOG_MGMT_METHOD_DIG:
		mog_mgmt_fn_digest(mfd, buf);

		if (dev && mgmt->forward)
			assert(mgmt->forward->as.file.ioq
			       && "ioq not stashed");
		break;
	}
	mgmt->mgmt_method = MOG_MGMT_METHOD_NONE;
	return true;
}

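/*
 * runs a request once the parser has seen a complete command, then
 * prepares the connection for the next one: leftover pipelined bytes
 * are deferred, the parser is reset, and the return value tells the
 * caller whether to wait for writability or stay active.
 */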
static enum mog_next mgmt_run(struct mog_fd *mfd, struct mog_rbuf *rbuf,
			char *buf, size_t buf_len)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	if (!mgmt_process_client(mfd, rbuf, buf, buf_len))
		return MOG_NEXT_IGNORE; /* in ioq */
	if (mgmt->wbuf == MOG_WR_ERROR)
		return MOG_NEXT_CLOSE;
	if (mgmt->forward == MOG_IOSTAT)
		return mgmt_iostat_forever(mgmt);

	/* stash any unread, pipelined portion for the next request */
	mgmt_defer_rbuf(mgmt, rbuf, buf_len);
	mog_mgmt_reset_parser(mgmt);
	assert(mgmt->wbuf != MOG_WR_ERROR);
	return mgmt->wbuf ? MOG_NEXT_WAIT_WR : MOG_NEXT_ACTIVE;
}

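/*
 * grows the connection read buffer so an oversized request can keep
 * being buffered.  Returns the (possibly relocated) buffer pointer,
 * or NULL if the buffer is already at its maximum size.
 */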
static char *
mgmt_rbuf_grow(struct mog_fd *mfd, struct mog_rbuf **rbuf, size_t buf_len)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	TRACE(CMOGSTORED_MGMT_RBUF_GROW(mfd->fd, buf_len));
	(*rbuf)->rsize = buf_len;
	mgmt->rbuf = *rbuf = mog_rbuf_grow(*rbuf);
	return *rbuf ? (*rbuf)->rptr : NULL;
}

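/*
 * called when the parser needs more data for a partial request:
 * grows the read buffer if the current one is full and reports the
 * offset where the next read() should continue.  Returns false only
 * if the buffer cannot grow any further (request too large).
 */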
MOG_NOINLINE static bool
mgmt_parse_continue(struct mog_fd *mfd, struct mog_rbuf **rbuf,
		char **buf, size_t buf_len, off_t *off)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	TRACE(CMOGSTORED_MGMT_PARSE_CONTINUE(mfd->fd, buf_len));

	assert(mgmt->wbuf == NULL &&
	       "tried to write (and failed) with partial req");
	if (mgmt->buf_off >= (*rbuf)->rcapa) {
		*buf = mgmt_rbuf_grow(mfd, rbuf, buf_len);
		if (!*buf)
			return false;
	}

	*off = mgmt->buf_off;
	return true;
}

/*
 * this is the main event callback, called whenever a mgmt client
 * is pulled out of a queue (either idle or active)
 */
static enum mog_next __mgmt_queue_step(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_rbuf *rbuf;
	char *buf;
	ssize_t r;
	off_t off;
	size_t buf_len = 0;
	enum mog_parser_state state;

	assert(mfd->fd >= 0 && "mgmt fd is invalid");

	if (mgmt->wbuf) return mgmt_wbuf_in_progress(mgmt);
	if (mgmt->forward) return mgmt_digest_in_progress(mfd);

	/* we may have pipelined data in mgmt->rbuf */
	rbuf = mgmt->rbuf ? mgmt->rbuf : mog_rbuf_get(MOG_RBUF_BASE_SIZE);
	buf = rbuf->rptr;
	off = mgmt->buf_off;
	assert(off >= 0 && "offset is negative");
	assert(off <= rbuf->rcapa && "offset is too big");

	if (mgmt->rbuf) {
		buf_len = mgmt->rbuf->rsize;
		if (mog_ioq_unblock(mfd))
			return mgmt_run(mfd, rbuf, buf, buf_len);
		assert(off < rbuf->rcapa && "offset is too big");
		if (off == 0) /* request got "pipelined", resuming now */
			goto parse;
	}
	assert(off < rbuf->rcapa && "offset is too big");
reread:
	r = read(mfd->fd, buf + off, rbuf->rcapa - off);
	if (r > 0) {
		buf_len = r + off;
parse:
		state = mog_mgmt_parse(mgmt, buf, buf_len);
		if (mgmt->wbuf == MOG_WR_ERROR) return MOG_NEXT_CLOSE;

		switch (state) {
		case MOG_PARSER_ERROR:
			syslog(LOG_ERR, "mgmt parser error");
			return MOG_NEXT_CLOSE;
		case MOG_PARSER_CONTINUE:
			if (mgmt_parse_continue(mfd, &rbuf, &buf, buf_len,
						&off))
				goto reread;
			goto too_large;
		case MOG_PARSER_DONE:
			return mgmt_run(mfd, rbuf, buf, buf_len);
		}
	} else if (r == 0) { /* client shut down */
		TRACE(CMOGSTORED_MGMT_CLIENT_CLOSE(mfd->fd, buf_len));
		return MOG_NEXT_CLOSE;
	} else {
		switch (errno) {
		case_EAGAIN:
			if (buf_len > 0) {
				if (mgmt->rbuf == NULL)
					mgmt->rbuf = mog_rbuf_detach(rbuf);
				mgmt->rbuf->rsize = buf_len;
			}
			return MOG_NEXT_WAIT_RD;
		case EINTR: goto reread;
		case ECONNRESET:
		case ENOTCONN:
			/* these errors are too common to log, normally */
			TRACE(CMOGSTORED_MGMT_RDERR(mfd->fd, buf_len, errno));
			return MOG_NEXT_CLOSE;
		default:
			TRACE(CMOGSTORED_MGMT_RDERR(mfd->fd, buf_len, errno));
			syslog(LOG_NOTICE, "mgmt client died: %m");
			return MOG_NEXT_CLOSE;
		}
	}

	assert(0 && "compiler bug?");
too_large:
	syslog(LOG_ERR, "mgmt request too large");
	return MOG_NEXT_CLOSE;
}

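/*
 * wraps __mgmt_queue_step so any clients waiting on a per-device I/O
 * queue get enqueued before this client itself is requeued.
 */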
static enum mog_next mgmt_queue_step(struct mog_fd *mfd)
{
	enum mog_next ret = __mgmt_queue_step(mfd);

	/* enqueue any pending waiters before we become enqueued ourselves */
	mog_ioq_next(NULL);

	return ret;
}

/*
 * this function is called whenever a mgmt client is pulled out of
 * _any_ queue (listen/idle/active).  Our queueing model is designed
 * to prevent this function from executing concurrently for any
 * given fd.
 */
enum mog_next mog_mgmt_queue_step(struct mog_fd *mfd)
{
	enum mog_next rv = mgmt_queue_step(mfd);

	if (rv == MOG_NEXT_CLOSE)
		mgmt_close(mfd);
	return rv;
}

/* called during graceful shutdown instead of mog_mgmt_queue_step */
void mog_mgmt_quit_step(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_queue *q = mgmt->svc->queue;

	/* centralize all queue transitions here: */
	switch (mgmt_queue_step(mfd)) {
	case MOG_NEXT_WAIT_RD:
		if (mgmt->forward || mgmt->rbuf) {
			/* something is in progress, do not drop it */
			mog_idleq_push(q, mfd, MOG_QEV_RD);
			return;
		}
		/* fall-through */
	case MOG_NEXT_IGNORE: /* no new iostat watchers during shutdown */
		assert(mgmt->prio == MOG_PRIO_NONE && "bad prio");
		/* fall-through */
	case MOG_NEXT_CLOSE:
		mog_nr_active_at_quit--;
		mgmt_close(mfd);
		return;
	case MOG_NEXT_ACTIVE: mog_activeq_push(q, mfd); return;
	case MOG_NEXT_WAIT_WR: mog_idleq_push(q, mfd, MOG_QEV_WR); return;
	}
}

/* stringify the address for tracers */
static MOG_NOINLINE void
trace_mgmt_accepted(struct mog_fd *mfd, const char *listen_addr,
			union mog_sockaddr *msa, socklen_t salen)
{
#ifdef HAVE_SYSTEMTAP
	struct mog_packaddr mpa;
	struct mog_ni ni;

	mog_nameinfo(&mpa, &ni);
	TRACE(CMOGSTORED_MGMT_ACCEPTED(mfd->fd, ni.ni_host, ni.ni_serv,
					listen_addr));
#endif /* !HAVE_SYSTEMTAP */
}

/* called immediately after accept(), this initializes the mfd (once) */
void mog_mgmt_post_accept(int fd, struct mog_accept *ac,
			union mog_sockaddr *msa, socklen_t salen)
{
	struct mog_fd *mfd = mog_fd_init(fd, MOG_FD_TYPE_MGMT);
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	if (TRACE_ENABLED(CMOGSTORED_MGMT_ACCEPTED))
		trace_mgmt_accepted(mfd, ac->addrinfo->orig, msa, salen);

	mog_mgmt_init(mgmt, ac->svc);
	mog_idleq_add(ac->svc->queue, mfd, MOG_QEV_RD);
}