/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#include "cmogstored.h"
static void mgmt_digest_step(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_fd *fmfd = mgmt->forward;
	enum mog_digest_next next;

	/*
	 * MOG_PRIO_FSCK means we're likely the _only_ thread handling
	 * MD5, so run it as fast as possible.
	 */
	if (mgmt->prio == MOG_PRIO_FSCK) {
		int ioprio = mog_ioprio_drop();

		do {
			next = mog_digest_read(&fmfd->as.file.digest, fmfd->fd);
		} while (next == MOG_DIGEST_CONTINUE);

		mog_ioprio_restore(ioprio);
	} else {
		next = mog_digest_read(&fmfd->as.file.digest, fmfd->fd);
	}

	assert(mgmt->wbuf == NULL && "wbuf should be NULL here");

	switch (next) {
	case MOG_DIGEST_CONTINUE:
	case MOG_DIGEST_YIELD:
		return; /* not done yet, we resume on the next step */
	case MOG_DIGEST_EOF:
		mog_mgmt_fn_digest_emit(mfd);
		break;
	case MOG_DIGEST_ERROR:
		mog_mgmt_fn_digest_err(mfd);
	}

	/* done with the digest (successfully or not), release the file */
	mog_file_close(mgmt->forward);
	mgmt->prio = MOG_PRIO_NONE;
	mgmt->forward = NULL;
}
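/*
 * A single call to mgmt_digest_step() may leave the checksum unfinished
 * (MOG_DIGEST_CONTINUE or MOG_DIGEST_YIELD); mgmt->forward stays set in
 * that case, so the next queue step re-enters mgmt_digest_in_progress()
 * below until the digest finishes or fails.
 */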
static enum mog_next mgmt_digest_in_progress(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_file *file;

	assert(mgmt->forward && mgmt->forward != MOG_IOSTAT && "bad forward");
	file = &mgmt->forward->as.file;

	if (file->ioq && !mog_ioq_ready(file->ioq, mfd))
		return MOG_NEXT_IGNORE;

	mgmt_digest_step(mfd);

	if (mgmt->wbuf == MOG_WR_ERROR) return MOG_NEXT_CLOSE;
	if (mgmt->wbuf) return MOG_NEXT_WAIT_WR;

	/*
	 * we can error on the MD5 but continue if we didn't
	 * have a socket error (from wbuf == MOG_WR_ERROR)
	 */
	return MOG_NEXT_ACTIVE;
}
MOG_NOINLINE static void mgmt_close(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	mog_rbuf_reattach_and_null(&mgmt->rbuf);
	assert((mgmt->wbuf == NULL || mgmt->wbuf == MOG_WR_ERROR) &&
		"would leak mgmt->wbuf on close");

	mog_fd_put(mfd);
}
/* called only if epoll/kevent is out-of-space (see mog_http_drop) */
void mog_mgmt_drop(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	if (mgmt->forward && mgmt->forward != MOG_IOSTAT)
		mog_file_close(mgmt->forward);
	mgmt_close(mfd);
}
void mog_mgmt_writev(struct mog_mgmt *mgmt, struct iovec *iov, int iovcnt)
{
	struct mog_fd *mfd = mog_fd_of(mgmt);

	assert(mgmt->wbuf == NULL && "tried to write while busy");
	mgmt->wbuf = mog_trywritev(mfd->fd, iov, iovcnt);
}
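/*
 * Rough usage sketch (hypothetical variables `response` and
 * `response_len`; the real callers are the mog_mgmt_fn_* response
 * writers):
 *
 *	struct iovec iov;
 *
 *	iov.iov_base = response;
 *	iov.iov_len = response_len;
 *	mog_mgmt_writev(mgmt, &iov, 1);
 *
 * Afterwards mgmt->wbuf encodes the result as used throughout this file:
 * NULL if everything was written, MOG_WR_ERROR on a socket error, or a
 * buffered remainder which mgmt_wbuf_in_progress() flushes once the
 * socket becomes writable again.
 */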
static enum mog_next mgmt_iostat_forever(struct mog_mgmt *mgmt)
{
	mog_rbuf_reattach_and_null(&mgmt->rbuf); /* no coming back from this */
	mog_notify(MOG_NOTIFY_DEVICE_REFRESH);
	mog_svc_devstats_subscribe(mgmt);

	return MOG_NEXT_IGNORE;
}
/* flush the buffered response in mgmt->wbuf; returns the next queue action */
static enum mog_next mgmt_wbuf_in_progress(struct mog_mgmt *mgmt)
{
	assert(mgmt->wbuf != MOG_WR_ERROR && "still active after write error");
	switch (mog_tryflush(mog_fd_of(mgmt)->fd, &mgmt->wbuf)) {
	case MOG_WRSTATE_ERR: return MOG_NEXT_CLOSE;
	case MOG_WRSTATE_DONE:
		if (mgmt->forward == MOG_IOSTAT)
			return mgmt_iostat_forever(mgmt);
		return MOG_NEXT_ACTIVE;
	case MOG_WRSTATE_BUSY:
		/* unlikely, we never put anything big in wbuf */
		return MOG_NEXT_WAIT_WR;
	}

	assert(0 && "compiler bug?");
	return MOG_NEXT_CLOSE;
}
/* stash any pipelined data for the next round */
static void
mgmt_defer_rbuf(struct mog_mgmt *mgmt, struct mog_rbuf *rbuf, size_t buf_len)
{
	struct mog_rbuf *old = mgmt->rbuf;
	size_t defer_bytes = buf_len - mgmt->buf_off;
	char *src = rbuf->rptr + mgmt->buf_off;

	assert(mgmt->buf_off >= 0 && "mgmt->buf_off negative");
	assert(defer_bytes <= MOG_RBUF_MAX_SIZE && "defer bytes overflow");

	if (defer_bytes == 0) {
		mog_rbuf_reattach_and_null(&mgmt->rbuf);
	} else if (old) { /* no allocation needed, reuse existing */
		assert(old == rbuf && "mgmt->rbuf not reused properly");
		memmove(old->rptr, src, defer_bytes);
		old->rsize = defer_bytes;
	} else { /* allocate a buffer sized to the deferred bytes */
		mgmt->rbuf = mog_rbuf_new(defer_bytes);
		memcpy(mgmt->rbuf->rptr, src, defer_bytes);
		mgmt->rbuf->rsize = defer_bytes;
	}

	mgmt->buf_off = 0;
}
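/*
 * Illustration (hypothetical buffer contents): if a client pipelines two
 * requests and the parser consumes only the first, mgmt->buf_off marks
 * where the unparsed tail begins:
 *
 *	[ first request ........ | second request (whole or partial) ]
 *	                         ^ mgmt->buf_off
 *
 * The tail is moved to the front of mgmt->rbuf so the next queue step
 * can parse it without waiting for another read().
 */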
/* returns true to continue the queue step, false if we got stuck in an ioq */
static bool
mgmt_process_client(struct mog_fd *mfd, struct mog_rbuf *rbuf,
			char *buf, size_t buf_len)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_dev *dev;
	struct mog_ioq *ioq;

	/* we handle non-filesystem-using commands inline in the parser */
	if (mgmt->mgmt_method == MOG_MGMT_METHOD_NONE)
		return true;

	dev = mog_dev_for(mgmt->svc, mgmt->mog_devid, false);

	if (dev) {
		ioq = mgmt->prio == MOG_PRIO_NONE ? &dev->ioq : &dev->fsckq;
		if (!mog_ioq_ready(ioq, mfd)) {
			/* save the parsed request for when the ioq wakes us */
			mgmt->rbuf = mog_rbuf_detach(rbuf);
			mgmt->rbuf->rsize = buf_len;
			return false;
		}
	}

	switch (mgmt->mgmt_method) {
	case MOG_MGMT_METHOD_NONE:
		assert(0 && "we should never get here: MOG_MGMT_METHOD_NONE");
	case MOG_MGMT_METHOD_SIZE:
		mog_mgmt_fn_size(mgmt, buf);
		break;
	case MOG_MGMT_METHOD_DIG:
		mog_mgmt_fn_digest(mfd, buf);
		if (dev && mgmt->forward)
			assert(mgmt->forward->as.file.ioq
				&& "ioq not stashed");
	}

	mgmt->mgmt_method = MOG_MGMT_METHOD_NONE;
	return true;
}
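/*
 * Per-device queueing sketch: MOG_MGMT_METHOD_SIZE and MOG_MGMT_METHOD_DIG
 * requests touch the filesystem, so they are throttled through the
 * per-device ioq (or fsckq when running at MOG_PRIO_FSCK).  When no slot
 * is free, the parsed request is stashed in mgmt->rbuf and the client
 * waits inside the ioq; once mog_ioq_unblock() admits it,
 * __mgmt_queue_step() re-runs mgmt_run() with the saved buffer.
 */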
static enum mog_next mgmt_run(struct mog_fd *mfd, struct mog_rbuf *rbuf,
				char *buf, size_t buf_len)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	if (!mgmt_process_client(mfd, rbuf, buf, buf_len))
		return MOG_NEXT_IGNORE; /* in ioq */
	if (mgmt->wbuf == MOG_WR_ERROR)
		return MOG_NEXT_CLOSE;
	if (mgmt->forward == MOG_IOSTAT)
		return mgmt_iostat_forever(mgmt);

	/* stash unread portion in a new buffer */
	mgmt_defer_rbuf(mgmt, rbuf, buf_len);
	mog_mgmt_reset_parser(mgmt);
	assert(mgmt->wbuf != MOG_WR_ERROR);
	return mgmt->wbuf ? MOG_NEXT_WAIT_WR : MOG_NEXT_ACTIVE;
}
static char *
mgmt_rbuf_grow(struct mog_fd *mfd, struct mog_rbuf **rbuf, size_t buf_len)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	TRACE(CMOGSTORED_MGMT_RBUF_GROW(mfd->fd, buf_len));
	(*rbuf)->rsize = buf_len;
	mgmt->rbuf = *rbuf = mog_rbuf_grow(*rbuf);
	return *rbuf ? (*rbuf)->rptr : NULL;
}
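/*
 * A NULL return from mgmt_rbuf_grow() means the buffer could not be grown
 * any further (presumably capped by MOG_RBUF_MAX_SIZE, the same limit
 * asserted in mgmt_defer_rbuf); the caller reports "mgmt request too
 * large" and closes the client.
 */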
MOG_NOINLINE static bool
mgmt_parse_continue(struct mog_fd *mfd, struct mog_rbuf **rbuf,
			char **buf, size_t buf_len, off_t *off)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	TRACE(CMOGSTORED_MGMT_PARSE_CONTINUE(mfd->fd, buf_len));

	assert(mgmt->wbuf == NULL &&
		"tried to write (and failed) with partial req");
	if (mgmt->buf_off >= (*rbuf)->rcapa) {
		*buf = mgmt_rbuf_grow(mfd, rbuf, buf_len);
		if (*buf == NULL)
			return false;
	}

	*off = mgmt->buf_off;
	return true;
}
/*
 * this is the main event callback and called whenever mgmt
 * is pulled out of a queue (either idle or active)
 */
static enum mog_next __mgmt_queue_step(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_rbuf *rbuf;
	char *buf;
	ssize_t r;
	off_t off;
	size_t buf_len = 0;
	enum mog_parser_state state;

	assert(mfd->fd >= 0 && "mgmt fd is invalid");

	if (mgmt->wbuf) return mgmt_wbuf_in_progress(mgmt);
	if (mgmt->forward) return mgmt_digest_in_progress(mfd);

	/* we may have pipelined data in mgmt->rbuf */
	rbuf = mgmt->rbuf ? mgmt->rbuf : mog_rbuf_get(MOG_RBUF_BASE_SIZE);
	buf = rbuf->rptr;
	off = mgmt->buf_off;
	assert(off >= 0 && "offset is negative");
	assert(off <= rbuf->rcapa && "offset is too big");

	if (mgmt->rbuf) {
		buf_len = mgmt->rbuf->rsize;
		if (mog_ioq_unblock(mfd))
			return mgmt_run(mfd, rbuf, buf, buf_len);
		assert(off < rbuf->rcapa && "offset is too big");
		if (off == 0) /* request got "pipelined", resuming now */
			goto parse;
	}
	assert(off < rbuf->rcapa && "offset is too big");
reread:
	r = read(mfd->fd, buf + off, rbuf->rcapa - off);
	if (r > 0) {
		buf_len = r + off;
parse:
		state = mog_mgmt_parse(mgmt, buf, buf_len);
		if (mgmt->wbuf == MOG_WR_ERROR) return MOG_NEXT_CLOSE;

		switch (state) {
		case MOG_PARSER_ERROR:
			syslog(LOG_ERR, "mgmt parser error");
			return MOG_NEXT_CLOSE;
		case MOG_PARSER_CONTINUE:
			if (mgmt_parse_continue(mfd, &rbuf, &buf, buf_len,
						&off))
				goto reread;
			goto too_large;
		case MOG_PARSER_DONE:
			return mgmt_run(mfd, rbuf, buf, buf_len);
		}
	} else if (r == 0) { /* client shut down */
		TRACE(CMOGSTORED_MGMT_CLIENT_CLOSE(mfd->fd, buf_len));
		return MOG_NEXT_CLOSE;
	} else {
		switch (errno) {
		case EAGAIN:
			if (buf_len > 0) {
				if (mgmt->rbuf == NULL)
					mgmt->rbuf = mog_rbuf_detach(rbuf);
				mgmt->rbuf->rsize = buf_len;
			}
			return MOG_NEXT_WAIT_RD;
		case EINTR: goto reread;
		case ECONNRESET:
			/* these errors are too common to log, normally */
			TRACE(CMOGSTORED_MGMT_RDERR(mfd->fd, buf_len, errno));
			return MOG_NEXT_CLOSE;
		default:
			TRACE(CMOGSTORED_MGMT_RDERR(mfd->fd, buf_len, errno));
			syslog(LOG_NOTICE, "mgmt client died: %m");
			return MOG_NEXT_CLOSE;
		}
	}

	assert(0 && "compiler bug?");
too_large:
	syslog(LOG_ERR, "mgmt request too large");
	return MOG_NEXT_CLOSE;
}
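/*
 * enum mog_next values as used in this file:
 *   MOG_NEXT_WAIT_RD - need more input, wait for the socket to be readable
 *   MOG_NEXT_WAIT_WR - wbuf holds unsent bytes, wait until writable
 *   MOG_NEXT_ACTIVE  - more work is immediately runnable (pipelined input
 *                      or an unfinished digest), requeue on the active queue
 *   MOG_NEXT_IGNORE  - parked elsewhere (device ioq or iostat subscription),
 *                      the normal queues must leave this client alone
 *   MOG_NEXT_CLOSE   - fatal error or client disconnect, close the fd
 */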
static enum mog_next mgmt_queue_step(struct mog_fd *mfd)
{
	enum mog_next ret = __mgmt_queue_step(mfd);

	/* enqueue any pending waiters before we become enqueued ourselves */
	if (mfd->ioq_blocked)
		mog_ioq_next(NULL);

	return ret;
}
/*
 * this function is called whenever a mgmt client is pulled out of
 * _any_ queue (listen/idle/active).  Our queueing model should be
 * designed to prevent this function from executing concurrently
 * for any given client.
 */
enum mog_next mog_mgmt_queue_step(struct mog_fd *mfd)
{
	enum mog_next rv = mgmt_queue_step(mfd);

	if (rv == MOG_NEXT_CLOSE)
		mgmt_close(mfd);

	return rv;
}
/* called during graceful shutdown instead of mog_mgmt_queue_step */
void mog_mgmt_quit_step(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_queue *q = mgmt->svc->queue;

	/* centralize all queue transitions here: */
	switch (mgmt_queue_step(mfd)) {
	case MOG_NEXT_WAIT_RD:
		if (mgmt->forward || mgmt->rbuf) {
			/* something is in progress, do not drop it */
			mog_idleq_push(q, mfd, MOG_QEV_RD);
			return;
		}
		/* fall-through */
	case MOG_NEXT_IGNORE: /* no new iostat watchers during shutdown */
		assert(mgmt->prio == MOG_PRIO_NONE && "bad prio");
		/* fall-through */
	case MOG_NEXT_CLOSE:
		mog_nr_active_at_quit--;
		mgmt_close(mfd);
		return;
	case MOG_NEXT_ACTIVE: mog_activeq_push(q, mfd); return;
	case MOG_NEXT_WAIT_WR: mog_idleq_push(q, mfd, MOG_QEV_WR); return;
	}
}
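/*
 * During graceful shutdown, idle clients (MOG_NEXT_WAIT_RD with nothing
 * deferred) and would-be iostat watchers are closed rather than re-armed,
 * and mog_nr_active_at_quit is decremented, presumably so shutdown can
 * finish once no active mgmt clients remain.
 */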
/* stringify the address for tracers */
static MOG_NOINLINE void
trace_mgmt_accepted(struct mog_fd *mfd, const char *listen_addr,
			union mog_sockaddr *msa, socklen_t salen)
{
#ifdef HAVE_SYSTEMTAP
	struct mog_packaddr mpa;
	struct mog_ni ni;

	mog_nameinfo(&mpa, &ni);
	TRACE(CMOGSTORED_MGMT_ACCEPTED(mfd->fd, ni.ni_host, ni.ni_serv,
					listen_addr));
#endif /* !HAVE_SYSTEMTAP */
}
/* called immediately after accept(), this initializes the mfd (once) */
void mog_mgmt_post_accept(int fd, struct mog_accept *ac,
			union mog_sockaddr *msa, socklen_t salen)
{
	struct mog_fd *mfd = mog_fd_init(fd, MOG_FD_TYPE_MGMT);
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	if (TRACE_ENABLED(CMOGSTORED_MGMT_ACCEPTED))
		trace_mgmt_accepted(mfd, ac->addrinfo->orig, msa, salen);

	mog_mgmt_init(mgmt, ac->svc);
	mog_idleq_add(ac->svc->queue, mfd, MOG_QEV_RD);
}
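/*
 * Connection lifecycle as pieced together from this file: the accept loop
 * calls mog_mgmt_post_accept(), which registers the fd and parks it in the
 * idle queue waiting for readability.  Every later wakeup goes through
 * mog_mgmt_queue_step() (or mog_mgmt_quit_step() during shutdown), whose
 * MOG_NEXT_* result decides whether the client is re-armed for read or
 * write, pushed onto the active queue, left to an ioq or iostat
 * subscription, or closed via mgmt_close().
 */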