document the new binlog stats
[beanstalkd.git] / prot.c
blob4e467a60882db5ebd1b098e6889e1cc57f56987c
1 #include <stdint.h>
2 #include <stdlib.h>
3 #include <stdio.h>
4 #include <unistd.h>
5 #include <fcntl.h>
6 #include <string.h>
7 #include <errno.h>
8 #include <sys/resource.h>
9 #include <sys/uio.h>
10 #include <sys/types.h>
11 #include <sys/socket.h>
12 #include <netinet/in.h>
13 #include <ctype.h>
14 #include <inttypes.h>
15 #include <stdarg.h>
16 #include "dat.h"
18 /* job body cannot be greater than this many bytes long */
19 size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
21 #define NAME_CHARS \
22 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
23 "abcdefghijklmnopqrstuvwxyz" \
24 "0123456789-+/;.$_()"
26 #define CMD_PUT "put "
27 #define CMD_PEEKJOB "peek "
28 #define CMD_PEEK_READY "peek-ready"
29 #define CMD_PEEK_DELAYED "peek-delayed"
30 #define CMD_PEEK_BURIED "peek-buried"
31 #define CMD_RESERVE "reserve"
32 #define CMD_RESERVE_TIMEOUT "reserve-with-timeout "
33 #define CMD_DELETE "delete "
34 #define CMD_RELEASE "release "
35 #define CMD_BURY "bury "
36 #define CMD_KICK "kick "
37 #define CMD_TOUCH "touch "
38 #define CMD_STATS "stats"
39 #define CMD_JOBSTATS "stats-job "
40 #define CMD_USE "use "
41 #define CMD_WATCH "watch "
42 #define CMD_IGNORE "ignore "
43 #define CMD_LIST_TUBES "list-tubes"
44 #define CMD_LIST_TUBE_USED "list-tube-used"
45 #define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
46 #define CMD_STATS_TUBE "stats-tube "
47 #define CMD_QUIT "quit"
48 #define CMD_PAUSE_TUBE "pause-tube"
50 #define CONSTSTRLEN(m) (sizeof(m) - 1)
52 #define CMD_PEEK_READY_LEN CONSTSTRLEN(CMD_PEEK_READY)
53 #define CMD_PEEK_DELAYED_LEN CONSTSTRLEN(CMD_PEEK_DELAYED)
54 #define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED)
55 #define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
56 #define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
57 #define CMD_RESERVE_TIMEOUT_LEN CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
58 #define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
59 #define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
60 #define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
61 #define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
62 #define CMD_TOUCH_LEN CONSTSTRLEN(CMD_TOUCH)
63 #define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
64 #define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
65 #define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
66 #define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
67 #define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
68 #define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
69 #define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
70 #define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
71 #define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
72 #define CMD_PAUSE_TUBE_LEN CONSTSTRLEN(CMD_PAUSE_TUBE)
74 #define MSG_FOUND "FOUND"
75 #define MSG_NOTFOUND "NOT_FOUND\r\n"
76 #define MSG_RESERVED "RESERVED"
77 #define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
78 #define MSG_TIMED_OUT "TIMED_OUT\r\n"
79 #define MSG_DELETED "DELETED\r\n"
80 #define MSG_RELEASED "RELEASED\r\n"
81 #define MSG_BURIED "BURIED\r\n"
82 #define MSG_TOUCHED "TOUCHED\r\n"
/* Reply formats carrying a job id. The id is a 64-bit value (the stats
 * formats in this file print it with PRIu64), so use the matching conversion
 * instead of %llu, which is only correct where the id type happens to be
 * unsigned long long. */
#define MSG_BURIED_FMT "BURIED %" PRIu64 "\r\n"
#define MSG_INSERTED_FMT "INSERTED %" PRIu64 "\r\n"
85 #define MSG_NOT_IGNORED "NOT_IGNORED\r\n"
87 #define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
88 #define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
89 #define MSG_TOUCHED_LEN CONSTSTRLEN(MSG_TOUCHED)
90 #define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
91 #define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
92 #define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)
94 #define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
95 #define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
96 #define MSG_DRAINING "DRAINING\r\n"
97 #define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
98 #define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
99 #define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
100 #define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
102 #define STATE_WANTCOMMAND 0
103 #define STATE_WANTDATA 1
104 #define STATE_SENDJOB 2
105 #define STATE_SENDWORD 3
106 #define STATE_WAIT 4
107 #define STATE_BITBUCKET 5
109 #define OP_UNKNOWN 0
110 #define OP_PUT 1
111 #define OP_PEEKJOB 2
112 #define OP_RESERVE 3
113 #define OP_DELETE 4
114 #define OP_RELEASE 5
115 #define OP_BURY 6
116 #define OP_KICK 7
117 #define OP_STATS 8
118 #define OP_JOBSTATS 9
119 #define OP_PEEK_BURIED 10
120 #define OP_USE 11
121 #define OP_WATCH 12
122 #define OP_IGNORE 13
123 #define OP_LIST_TUBES 14
124 #define OP_LIST_TUBE_USED 15
125 #define OP_LIST_TUBES_WATCHED 16
126 #define OP_STATS_TUBE 17
127 #define OP_PEEK_READY 18
128 #define OP_PEEK_DELAYED 19
129 #define OP_RESERVE_TIMEOUT 20
130 #define OP_TOUCH 21
131 #define OP_QUIT 22
132 #define OP_PAUSE_TUBE 23
133 #define TOTAL_OPS 24
135 #define STATS_FMT "---\n" \
136 "current-jobs-urgent: %u\n" \
137 "current-jobs-ready: %u\n" \
138 "current-jobs-reserved: %u\n" \
139 "current-jobs-delayed: %u\n" \
140 "current-jobs-buried: %u\n" \
141 "cmd-put: %" PRIu64 "\n" \
142 "cmd-peek: %" PRIu64 "\n" \
143 "cmd-peek-ready: %" PRIu64 "\n" \
144 "cmd-peek-delayed: %" PRIu64 "\n" \
145 "cmd-peek-buried: %" PRIu64 "\n" \
146 "cmd-reserve: %" PRIu64 "\n" \
147 "cmd-reserve-with-timeout: %" PRIu64 "\n" \
148 "cmd-delete: %" PRIu64 "\n" \
149 "cmd-release: %" PRIu64 "\n" \
150 "cmd-use: %" PRIu64 "\n" \
151 "cmd-watch: %" PRIu64 "\n" \
152 "cmd-ignore: %" PRIu64 "\n" \
153 "cmd-bury: %" PRIu64 "\n" \
154 "cmd-kick: %" PRIu64 "\n" \
155 "cmd-touch: %" PRIu64 "\n" \
156 "cmd-stats: %" PRIu64 "\n" \
157 "cmd-stats-job: %" PRIu64 "\n" \
158 "cmd-stats-tube: %" PRIu64 "\n" \
159 "cmd-list-tubes: %" PRIu64 "\n" \
160 "cmd-list-tube-used: %" PRIu64 "\n" \
161 "cmd-list-tubes-watched: %" PRIu64 "\n" \
162 "cmd-pause-tube: %" PRIu64 "\n" \
163 "job-timeouts: %" PRIu64 "\n" \
164 "total-jobs: %" PRIu64 "\n" \
165 "max-job-size: %zu\n" \
166 "current-tubes: %zu\n" \
167 "current-connections: %u\n" \
168 "current-producers: %u\n" \
169 "current-workers: %u\n" \
170 "current-waiting: %u\n" \
171 "total-connections: %u\n" \
172 "pid: %ld\n" \
173 "version: %s\n" \
174 "rusage-utime: %d.%06d\n" \
175 "rusage-stime: %d.%06d\n" \
176 "uptime: %u\n" \
177 "binlog-oldest-index: %d\n" \
178 "binlog-current-index: %d\n" \
179 "binlog-records-migrated: %" PRIu64 "\n" \
180 "binlog-records-written: %" PRIu64 "\n" \
181 "binlog-max-size: %d\n" \
182 "\r\n"
184 #define STATS_TUBE_FMT "---\n" \
185 "name: %s\n" \
186 "current-jobs-urgent: %u\n" \
187 "current-jobs-ready: %u\n" \
188 "current-jobs-reserved: %u\n" \
189 "current-jobs-delayed: %u\n" \
190 "current-jobs-buried: %u\n" \
191 "total-jobs: %" PRIu64 "\n" \
192 "current-using: %u\n" \
193 "current-watching: %u\n" \
194 "current-waiting: %u\n" \
195 "cmd-pause-tube: %u\n" \
196 "pause: %" PRIu64 "\n" \
197 "pause-time-left: %" PRIu64 "\n" \
198 "\r\n"
200 #define STATS_JOB_FMT "---\n" \
201 "id: %" PRIu64 "\n" \
202 "tube: %s\n" \
203 "state: %s\n" \
204 "pri: %u\n" \
205 "age: %" PRIu64 "\n" \
206 "delay: %" PRIu64 "\n" \
207 "ttr: %" PRIu64 "\n" \
208 "time-left: %" PRIu64 "\n" \
209 "reserves: %u\n" \
210 "timeouts: %u\n" \
211 "releases: %u\n" \
212 "buries: %u\n" \
213 "kicks: %u\n" \
214 "\r\n"
216 /* this number is pretty arbitrary */
217 #define BUCKET_BUF_SIZE 1024
219 static char bucket[BUCKET_BUF_SIZE];
221 static uint ready_ct = 0;
222 static struct stats global_stat = {0, 0, 0, 0, 0};
224 static tube default_tube;
226 static int drain_mode = 0;
227 static int64 started_at;
228 static uint64 op_ct[TOTAL_OPS], timeout_ct = 0;
230 static struct conn dirty = {&dirty, &dirty};
232 /* Doubly-linked list of connections with at least one reserved job. */
233 static struct conn running = { &running, &running };
#ifdef DEBUG
/* Command text for each OP_* code, indexed by the code itself. Only used for
 * debug output. */
static const char * op_names[] = {
    [OP_UNKNOWN]            = "<unknown>",
    [OP_PUT]                = CMD_PUT,
    [OP_PEEKJOB]            = CMD_PEEKJOB,
    [OP_RESERVE]            = CMD_RESERVE,
    [OP_DELETE]             = CMD_DELETE,
    [OP_RELEASE]            = CMD_RELEASE,
    [OP_BURY]               = CMD_BURY,
    [OP_KICK]               = CMD_KICK,
    [OP_STATS]              = CMD_STATS,
    [OP_JOBSTATS]           = CMD_JOBSTATS,
    [OP_PEEK_BURIED]        = CMD_PEEK_BURIED,
    [OP_USE]                = CMD_USE,
    [OP_WATCH]              = CMD_WATCH,
    [OP_IGNORE]             = CMD_IGNORE,
    [OP_LIST_TUBES]         = CMD_LIST_TUBES,
    [OP_LIST_TUBE_USED]     = CMD_LIST_TUBE_USED,
    [OP_LIST_TUBES_WATCHED] = CMD_LIST_TUBES_WATCHED,
    [OP_STATS_TUBE]         = CMD_STATS_TUBE,
    [OP_PEEK_READY]         = CMD_PEEK_READY,
    [OP_PEEK_DELAYED]       = CMD_PEEK_DELAYED,
    [OP_RESERVE_TIMEOUT]    = CMD_RESERVE_TIMEOUT,
    [OP_TOUCH]              = CMD_TOUCH,
    [OP_QUIT]               = CMD_QUIT,
    [OP_PAUSE_TUBE]         = CMD_PAUSE_TUBE
};
#endif
264 static job remove_buried_job(job j);
266 static int
267 buried_job_p(tube t)
269 return job_list_any_p(&t->buried);
272 static void
273 reply(conn c, const char *line, int len, int state)
275 if (!c) return;
277 connwant(c, 'w', &dirty);
278 c->reply = line;
279 c->reply_len = len;
280 c->reply_sent = 0;
281 c->state = state;
282 dbgprintf("sending reply: %.*s", len, line);
285 #define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)
287 #define reply_serr(c,e) (twarnx("server error: %s",(e)),\
288 reply_msg((c),(e)))
290 static void
291 reply_line(conn c, int state, const char *fmt, ...)
293 int r;
294 va_list ap;
296 va_start(ap, fmt);
297 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
298 va_end(ap);
300 /* Make sure the buffer was big enough. If not, we have a bug. */
301 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
303 return reply(c, c->reply_buf, r, state);
306 static void
307 reply_job(conn c, job j, const char *word)
309 /* tell this connection which job to send */
310 c->out_job = j;
311 c->out_job_sent = 0;
313 return reply_line(c, STATE_SENDJOB, "%s %llu %u\r\n",
314 word, j->r.id, j->r.body_size - 2);
317 conn
318 remove_waiting_conn(conn c)
320 tube t;
321 size_t i;
323 if (!conn_waiting(c)) return NULL;
325 c->type &= ~CONN_TYPE_WAITING;
326 global_stat.waiting_ct--;
327 for (i = 0; i < c->watch.used; i++) {
328 t = c->watch.items[i];
329 t->stat.waiting_ct--;
330 ms_remove(&t->waiting, c);
332 return c;
335 static void
336 reserve_job(conn c, job j)
338 j->r.deadline_at = nanoseconds() + j->r.ttr;
339 global_stat.reserved_ct++; /* stats */
340 j->tube->stat.reserved_ct++;
341 j->r.reserve_ct++;
342 j->r.state = Reserved;
343 job_insert(&c->reserved_jobs, j);
344 j->reserver = c;
345 if (c->soonest_job && j->r.deadline_at < c->soonest_job->r.deadline_at) {
346 c->soonest_job = j;
348 return reply_job(c, j, MSG_RESERVED);
351 static job
352 next_eligible_job(int64 now)
354 tube t;
355 size_t i;
356 job j = NULL, candidate;
358 dbgprintf("tubes.used = %zu\n", tubes.used);
359 for (i = 0; i < tubes.used; i++) {
360 t = tubes.items[i];
361 dbgprintf("for %s t->waiting.used=%zu t->ready.len=%d t->pause=%" PRIu64 "\n",
362 t->name, t->waiting.used, t->ready.len, t->pause);
363 if (t->pause) {
364 if (t->deadline_at > now) continue;
365 t->pause = 0;
367 if (t->waiting.used && t->ready.len) {
368 candidate = t->ready.data[0];
369 if (!j || job_pri_less(candidate, j)) {
370 j = candidate;
373 dbgprintf("i = %zu, tubes.used = %zu\n", i, tubes.used);
376 return j;
379 static void
380 process_queue()
382 job j;
383 int64 now = nanoseconds();
385 dbgprintf("processing queue\n");
386 while ((j = next_eligible_job(now))) {
387 dbgprintf("got eligible job %llu in %s\n", j->r.id, j->tube->name);
388 heapremove(&j->tube->ready, j->heap_index);
389 ready_ct--;
390 if (j->r.pri < URGENT_THRESHOLD) {
391 global_stat.urgent_ct--;
392 j->tube->stat.urgent_ct--;
394 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
398 static job
399 delay_q_peek()
401 int i;
402 tube t;
403 job j = NULL, nj;
405 for (i = 0; i < tubes.used; i++) {
406 t = tubes.items[i];
407 if (t->delay.len == 0) {
408 continue;
410 nj = t->delay.data[0];
411 if (!j || nj->r.deadline_at < j->r.deadline_at) j = nj;
414 return j;
417 static int
418 enqueue_job(Srv *s, job j, int64 delay, char update_store)
420 int r;
422 j->reserver = NULL;
423 if (delay) {
424 j->r.deadline_at = nanoseconds() + delay;
425 r = heapinsert(&j->tube->delay, j);
426 if (!r) return 0;
427 j->r.state = Delayed;
428 } else {
429 r = heapinsert(&j->tube->ready, j);
430 if (!r) return 0;
431 j->r.state = Ready;
432 ready_ct++;
433 if (j->r.pri < URGENT_THRESHOLD) {
434 global_stat.urgent_ct++;
435 j->tube->stat.urgent_ct++;
439 if (update_store) {
440 if (!walwrite(&s->wal, j)) {
441 return 0;
443 walmaint(&s->wal);
446 process_queue();
447 return 1;
450 static int
451 bury_job(Srv *s, job j, char update_store)
453 int z;
455 if (update_store) {
456 z = walresvupdate(&s->wal, j);
457 if (!z) return 0;
458 j->walresv += z;
461 job_insert(&j->tube->buried, j);
462 global_stat.buried_ct++;
463 j->tube->stat.buried_ct++;
464 j->r.state = Buried;
465 j->reserver = NULL;
466 j->r.bury_ct++;
468 if (update_store) {
469 if (!walwrite(&s->wal, j)) {
470 return 0;
472 walmaint(&s->wal);
475 return 1;
478 void
479 enqueue_reserved_jobs(conn c)
481 int r;
482 job j;
484 while (job_list_any_p(&c->reserved_jobs)) {
485 j = job_remove(c->reserved_jobs.next);
486 r = enqueue_job(c->srv, j, 0, 0);
487 if (r < 1) bury_job(c->srv, j, 0);
488 global_stat.reserved_ct--;
489 j->tube->stat.reserved_ct--;
490 c->soonest_job = NULL;
491 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
495 static job
496 delay_q_take()
498 job j = delay_q_peek();
499 if (!j) {
500 return 0;
502 heapremove(&j->tube->delay, j->heap_index);
503 return j;
506 static int
507 kick_buried_job(Srv *s, tube t)
509 int r;
510 job j;
511 int z;
513 if (!buried_job_p(t)) return 0;
514 j = remove_buried_job(t->buried.next);
516 z = walresvupdate(&s->wal, j);
517 if (!z) return heapinsert(&t->delay, j), 0; /* put it back */
518 j->walresv += z;
520 j->r.kick_ct++;
521 r = enqueue_job(s, j, 0, 1);
522 if (r == 1) return 1;
524 /* ready queue is full, so bury it */
525 bury_job(s, j, 0);
526 return 0;
529 static uint
530 get_delayed_job_ct()
532 tube t;
533 size_t i;
534 uint count = 0;
536 for (i = 0; i < tubes.used; i++) {
537 t = tubes.items[i];
538 count += t->delay.len;
540 return count;
543 static int
544 kick_delayed_job(Srv *s, tube t)
546 int r;
547 job j;
548 int z;
550 if (t->delay.len == 0) {
551 return 0;
554 j = heapremove(&t->delay, 0);
556 z = walresvupdate(&s->wal, j);
557 if (!z) return heapinsert(&t->delay, j), 0; /* put it back */
558 j->walresv += z;
560 j->r.kick_ct++;
561 r = enqueue_job(s, j, 0, 1);
562 if (r == 1) return 1;
564 /* ready queue is full, so delay it again */
565 r = enqueue_job(s, j, j->r.delay, 0);
566 if (r == 1) return 0;
568 /* last resort */
569 bury_job(s, j, 0);
570 return 0;
573 /* return the number of jobs successfully kicked */
574 static uint
575 kick_buried_jobs(Srv *s, tube t, uint n)
577 uint i;
578 for (i = 0; (i < n) && kick_buried_job(s, t); ++i);
579 return i;
582 /* return the number of jobs successfully kicked */
583 static uint
584 kick_delayed_jobs(Srv *s, tube t, uint n)
586 uint i;
587 for (i = 0; (i < n) && kick_delayed_job(s, t); ++i);
588 return i;
591 static uint
592 kick_jobs(Srv *s, tube t, uint n)
594 if (buried_job_p(t)) return kick_buried_jobs(s, t, n);
595 return kick_delayed_jobs(s, t, n);
598 static job
599 remove_buried_job(job j)
601 if (!j || j->r.state != Buried) return NULL;
602 j = job_remove(j);
603 if (j) {
604 global_stat.buried_ct--;
605 j->tube->stat.buried_ct--;
607 return j;
610 static job
611 remove_ready_job(job j)
613 if (!j || j->r.state != Ready) return NULL;
614 heapremove(&j->tube->ready, j->heap_index);
615 ready_ct--;
616 if (j->r.pri < URGENT_THRESHOLD) {
617 global_stat.urgent_ct--;
618 j->tube->stat.urgent_ct--;
620 return j;
623 static void
624 enqueue_waiting_conn(conn c)
626 tube t;
627 size_t i;
629 global_stat.waiting_ct++;
630 c->type |= CONN_TYPE_WAITING;
631 for (i = 0; i < c->watch.used; i++) {
632 t = c->watch.items[i];
633 t->stat.waiting_ct++;
634 ms_append(&t->waiting, c);
638 static job
639 find_reserved_job_in_conn(conn c, job j)
641 return (j && j->reserver == c && j->r.state == Reserved) ? j : NULL;
644 static job
645 touch_job(conn c, job j)
647 j = find_reserved_job_in_conn(c, j);
648 if (j) {
649 j->r.deadline_at = nanoseconds() + j->r.ttr;
650 c->soonest_job = NULL;
652 return j;
655 static job
656 peek_job(uint64 id)
658 return job_find(id);
661 static void
662 check_err(conn c, const char *s)
664 if (errno == EAGAIN) return;
665 if (errno == EINTR) return;
666 if (errno == EWOULDBLOCK) return;
668 twarn("%s", s);
669 conn_close(c);
670 return;
/* Scan the first size bytes of s for the sequence "\r\n" and return the line
 * length, including the terminator.
 * Always returns at least 2 if a match is found. Returns 0 if no match.
 *
 * Only the first '\r' is examined: if it is not immediately followed by
 * '\n', the result is 0 even if a "\r\n" pair appears later.
 * A size below 2 cannot hold a terminator and yields 0. */
static int
scan_line_end(const char *s, int size)
{
    const char *match;

    /* Guard the size - 1 below: for size <= 0 it would turn into a huge
     * unsigned count and send memchr past the end of the buffer. */
    if (size < 2) return 0;

    match = memchr(s, '\r', size - 1);
    if (!match) return 0;

    /* this is safe because we only scan size - 1 chars above */
    if (match[1] == '\n') return (int) (match - s) + 2;

    return 0;
}
689 static int
690 cmd_len(conn c)
692 return scan_line_end(c->cmd, c->cmd_read);
695 /* parse the command line */
696 static int
697 which_cmd(conn c)
699 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
700 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
701 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
702 TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY);
703 TEST_CMD(c->cmd, CMD_PEEK_DELAYED, OP_PEEK_DELAYED);
704 TEST_CMD(c->cmd, CMD_PEEK_BURIED, OP_PEEK_BURIED);
705 TEST_CMD(c->cmd, CMD_RESERVE_TIMEOUT, OP_RESERVE_TIMEOUT);
706 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
707 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
708 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
709 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
710 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
711 TEST_CMD(c->cmd, CMD_TOUCH, OP_TOUCH);
712 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
713 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
714 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
715 TEST_CMD(c->cmd, CMD_USE, OP_USE);
716 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
717 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
718 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
719 TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
720 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
721 TEST_CMD(c->cmd, CMD_QUIT, OP_QUIT);
722 TEST_CMD(c->cmd, CMD_PAUSE_TUBE, OP_PAUSE_TUBE);
723 return OP_UNKNOWN;
726 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
727 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
728 * This function is idempotent(). */
729 static void
730 fill_extra_data(conn c)
732 int extra_bytes, job_data_bytes = 0, cmd_bytes;
734 if (!c->sock.fd) return; /* the connection was closed */
735 if (!c->cmd_len) return; /* we don't have a complete command */
737 /* how many extra bytes did we read? */
738 extra_bytes = c->cmd_read - c->cmd_len;
740 /* how many bytes should we put into the job body? */
741 if (c->in_job) {
742 job_data_bytes = min(extra_bytes, c->in_job->r.body_size);
743 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
744 c->in_job_read = job_data_bytes;
745 } else if (c->in_job_read) {
746 /* we are in bit-bucket mode, throwing away data */
747 job_data_bytes = min(extra_bytes, c->in_job_read);
748 c->in_job_read -= job_data_bytes;
751 /* how many bytes are left to go into the future cmd? */
752 cmd_bytes = extra_bytes - job_data_bytes;
753 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
754 c->cmd_read = cmd_bytes;
755 c->cmd_len = 0; /* we no longer know the length of the new command */
758 static void
759 _skip(conn c, int n, const char *line, int len)
761 /* Invert the meaning of in_job_read while throwing away data -- it
762 * counts the bytes that remain to be thrown away. */
763 c->in_job = 0;
764 c->in_job_read = n;
765 fill_extra_data(c);
767 if (c->in_job_read == 0) return reply(c, line, len, STATE_SENDWORD);
769 c->reply = line;
770 c->reply_len = len;
771 c->reply_sent = 0;
772 c->state = STATE_BITBUCKET;
773 return;
776 #define skip(c,n,m) (_skip(c,n,m,CONSTSTRLEN(m)))
778 static void
779 enqueue_incoming_job(conn c)
781 int r;
782 job j = c->in_job;
784 c->in_job = NULL; /* the connection no longer owns this job */
785 c->in_job_read = 0;
787 /* check if the trailer is present and correct */
788 if (memcmp(j->body + j->r.body_size - 2, "\r\n", 2)) {
789 job_free(j);
790 return reply_msg(c, MSG_EXPECTED_CRLF);
793 if (drain_mode) {
794 job_free(j);
795 return reply_serr(c, MSG_DRAINING);
798 if (j->walresv) return reply_serr(c, MSG_INTERNAL_ERROR);
799 j->walresv = walresvput(&c->srv->wal, j);
800 if (!j->walresv) return reply_serr(c, MSG_OUT_OF_MEMORY);
802 /* we have a complete job, so let's stick it in the pqueue */
803 r = enqueue_job(c->srv, j, j->r.delay, 1);
804 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
806 op_ct[OP_PUT]++; /* stats */
807 global_stat.total_jobs_ct++;
808 j->tube->stat.total_jobs_ct++;
810 if (r == 1) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->r.id);
812 /* out of memory trying to grow the queue, so it gets buried */
813 bury_job(c->srv, j, 0);
814 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->r.id);
817 static uint
818 uptime()
820 return (nanoseconds() - started_at) / 1000000000;
823 static int
824 fmt_stats(char *buf, size_t size, void *x)
826 int whead = 0, wcur = 0;
827 Srv *srv;
828 struct rusage ru = {{0, 0}, {0, 0}};
830 srv = x;
832 if (srv->wal.head) {
833 whead = srv->wal.head->seq;
836 if (srv->wal.cur) {
837 wcur = srv->wal.cur->seq;
840 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
841 return snprintf(buf, size, STATS_FMT,
842 global_stat.urgent_ct,
843 ready_ct,
844 global_stat.reserved_ct,
845 get_delayed_job_ct(),
846 global_stat.buried_ct,
847 op_ct[OP_PUT],
848 op_ct[OP_PEEKJOB],
849 op_ct[OP_PEEK_READY],
850 op_ct[OP_PEEK_DELAYED],
851 op_ct[OP_PEEK_BURIED],
852 op_ct[OP_RESERVE],
853 op_ct[OP_RESERVE_TIMEOUT],
854 op_ct[OP_DELETE],
855 op_ct[OP_RELEASE],
856 op_ct[OP_USE],
857 op_ct[OP_WATCH],
858 op_ct[OP_IGNORE],
859 op_ct[OP_BURY],
860 op_ct[OP_KICK],
861 op_ct[OP_TOUCH],
862 op_ct[OP_STATS],
863 op_ct[OP_JOBSTATS],
864 op_ct[OP_STATS_TUBE],
865 op_ct[OP_LIST_TUBES],
866 op_ct[OP_LIST_TUBE_USED],
867 op_ct[OP_LIST_TUBES_WATCHED],
868 op_ct[OP_PAUSE_TUBE],
869 timeout_ct,
870 global_stat.total_jobs_ct,
871 job_data_size_limit,
872 tubes.used,
873 count_cur_conns(),
874 count_cur_producers(),
875 count_cur_workers(),
876 global_stat.waiting_ct,
877 count_tot_conns(),
878 (long) getpid(),
879 version,
880 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
881 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
882 uptime(),
883 whead,
884 wcur,
885 srv->wal.nmig,
886 srv->wal.nrec,
887 srv->wal.filesz);
891 /* Read a priority value from the given buffer and place it in pri.
892 * Update end to point to the address after the last character consumed.
893 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
894 * conversion and return the status code but not update any values. This is an
895 * easy way to check for errors.
896 * If end is NULL, read_pri will also check that the entire input string was
897 * consumed and return an error code otherwise.
898 * Return 0 on success, or nonzero on failure.
899 * If a failure occurs, pri and end are not modified. */
900 static int
901 read_pri(uint *pri, const char *buf, char **end)
903 char *tend;
904 uint tpri;
906 errno = 0;
907 while (buf[0] == ' ') buf++;
908 if (!isdigit(buf[0])) return -1;
909 tpri = strtoul(buf, &tend, 10);
910 if (tend == buf) return -1;
911 if (errno && errno != ERANGE) return -1;
912 if (!end && tend[0] != '\0') return -1;
914 if (pri) *pri = tpri;
915 if (end) *end = tend;
916 return 0;
919 /* Read a delay value from the given buffer and place it in delay.
920 * The interface and behavior are analogous to read_pri(). */
921 static int
922 read_delay(int64 *delay, const char *buf, char **end)
924 int r;
925 uint delay_sec;
927 r = read_pri(&delay_sec, buf, end);
928 if (r) return r;
929 *delay = ((int64) delay_sec) * 1000000000;
930 return 0;
933 /* Read a timeout value from the given buffer and place it in ttr.
934 * The interface and behavior are the same as in read_delay(). */
935 static int
936 read_ttr(int64 *ttr, const char *buf, char **end)
938 return read_delay(ttr, buf, end);
941 /* Read a tube name from the given buffer moving the buffer to the name start */
942 static int
943 read_tube_name(char **tubename, char *buf, char **end)
945 size_t len;
947 while (buf[0] == ' ') buf++;
948 len = strspn(buf, NAME_CHARS);
949 if (len == 0) return -1;
950 if (tubename) *tubename = buf;
951 if (end) *end = buf + len;
952 return 0;
955 static void
956 wait_for_job(conn c, int timeout)
958 c->state = STATE_WAIT;
959 enqueue_waiting_conn(c);
961 /* Set the pending timeout to the requested timeout amount */
962 c->pending_timeout = timeout;
964 /* this conn is waiting, but we want to know if they hang up */
965 connwant(c, 'r', &dirty);
968 typedef int(*fmt_fn)(char *, size_t, void *);
970 static void
971 do_stats(conn c, fmt_fn fmt, void *data)
973 int r, stats_len;
975 /* first, measure how big a buffer we will need */
976 stats_len = fmt(NULL, 0, data) + 16;
978 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
979 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
981 /* Mark this job as a copy so it can be appropriately freed later on */
982 c->out_job->r.state = Copy;
984 /* now actually format the stats data */
985 r = fmt(c->out_job->body, stats_len, data);
986 /* and set the actual body size */
987 c->out_job->r.body_size = r;
988 if (r > stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
990 c->out_job_sent = 0;
991 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", r - 2);
994 static void
995 do_list_tubes(conn c, ms l)
997 char *buf;
998 tube t;
999 size_t i, resp_z;
1001 /* first, measure how big a buffer we will need */
1002 resp_z = 6; /* initial "---\n" and final "\r\n" */
1003 for (i = 0; i < l->used; i++) {
1004 t = l->items[i];
1005 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
1008 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
1009 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
1011 /* Mark this job as a copy so it can be appropriately freed later on */
1012 c->out_job->r.state = Copy;
1014 /* now actually format the response */
1015 buf = c->out_job->body;
1016 buf += snprintf(buf, 5, "---\n");
1017 for (i = 0; i < l->used; i++) {
1018 t = l->items[i];
1019 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
1021 buf[0] = '\r';
1022 buf[1] = '\n';
1024 c->out_job_sent = 0;
1025 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", resp_z - 2);
1028 static int
1029 fmt_job_stats(char *buf, size_t size, job j)
1031 int64 t;
1032 int64 time_left;
1034 t = nanoseconds();
1035 if (j->r.state == Reserved || j->r.state == Delayed) {
1036 time_left = (j->r.deadline_at - t) / 1000000000;
1037 } else {
1038 time_left = 0;
1040 return snprintf(buf, size, STATS_JOB_FMT,
1041 j->r.id,
1042 j->tube->name,
1043 job_state(j),
1044 j->r.pri,
1045 (t - j->r.created_at) / 1000000000,
1046 j->r.delay / 1000000000,
1047 j->r.ttr / 1000000000,
1048 time_left,
1049 j->r.reserve_ct,
1050 j->r.timeout_ct,
1051 j->r.release_ct,
1052 j->r.bury_ct,
1053 j->r.kick_ct);
1056 static int
1057 fmt_stats_tube(char *buf, size_t size, tube t)
1059 uint64 time_left;
1061 if (t->pause > 0) {
1062 time_left = (t->deadline_at - nanoseconds()) / 1000000000;
1063 } else {
1064 time_left = 0;
1066 return snprintf(buf, size, STATS_TUBE_FMT,
1067 t->name,
1068 t->stat.urgent_ct,
1069 t->ready.len,
1070 t->stat.reserved_ct,
1071 t->delay.len,
1072 t->stat.buried_ct,
1073 t->stat.total_jobs_ct,
1074 t->using_ct,
1075 t->watching_ct,
1076 t->stat.waiting_ct,
1077 t->stat.pause_ct,
1078 t->pause / 1000000000,
1079 time_left);
1082 static void
1083 maybe_enqueue_incoming_job(conn c)
1085 job j = c->in_job;
1087 /* do we have a complete job? */
1088 if (c->in_job_read == j->r.body_size) return enqueue_incoming_job(c);
1090 /* otherwise we have incomplete data, so just keep waiting */
1091 c->state = STATE_WANTDATA;
1094 /* j can be NULL */
1095 static job
1096 remove_this_reserved_job(conn c, job j)
1098 j = job_remove(j);
1099 if (j) {
1100 global_stat.reserved_ct--;
1101 j->tube->stat.reserved_ct--;
1102 j->reserver = NULL;
1104 c->soonest_job = NULL;
1105 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
1106 return j;
1109 static job
1110 remove_reserved_job(conn c, job j)
1112 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, j));
1115 static int
1116 name_is_ok(const char *name, size_t max)
1118 size_t len = strlen(name);
1119 return len > 0 && len <= max &&
1120 strspn(name, NAME_CHARS) == len && name[0] != '-';
1123 void
1124 prot_remove_tube(tube t)
1126 ms_remove(&tubes, t);
1129 static void
1130 dispatch_cmd(conn c)
1132 int r, i, timeout = -1;
1133 int z;
1134 uint count;
1135 job j = 0;
1136 byte type;
1137 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
1138 uint pri, body_size;
1139 int64 delay, ttr;
1140 uint64 id;
1141 tube t = NULL;
1143 /* NUL-terminate this string so we can use strtol and friends */
1144 c->cmd[c->cmd_len - 2] = '\0';
1146 /* check for possible maliciousness */
1147 if (strlen(c->cmd) != c->cmd_len - 2) {
1148 return reply_msg(c, MSG_BAD_FORMAT);
1151 type = which_cmd(c);
1152 dbgprintf("got %s command: \"%s\"\n", op_names[(int) type], c->cmd);
1154 switch (type) {
1155 case OP_PUT:
1156 r = read_pri(&pri, c->cmd + 4, &delay_buf);
1157 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1159 r = read_delay(&delay, delay_buf, &ttr_buf);
1160 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1162 r = read_ttr(&ttr, ttr_buf, &size_buf);
1163 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1165 errno = 0;
1166 body_size = strtoul(size_buf, &end_buf, 10);
1167 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1169 if (body_size > job_data_size_limit) {
1170 /* throw away the job body and respond with JOB_TOO_BIG */
1171 return skip(c, body_size + 2, MSG_JOB_TOO_BIG);
1174 /* don't allow trailing garbage */
1175 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
1177 conn_set_producer(c);
1179 c->in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c->use);
1181 /* OOM? */
1182 if (!c->in_job) {
1183 /* throw away the job body and respond with OUT_OF_MEMORY */
1184 twarnx("server error: " MSG_OUT_OF_MEMORY);
1185 return skip(c, body_size + 2, MSG_OUT_OF_MEMORY);
1188 fill_extra_data(c);
1190 /* it's possible we already have a complete job */
1191 maybe_enqueue_incoming_job(c);
1193 break;
1194 case OP_PEEK_READY:
1195 /* don't allow trailing garbage */
1196 if (c->cmd_len != CMD_PEEK_READY_LEN + 2) {
1197 return reply_msg(c, MSG_BAD_FORMAT);
1199 op_ct[type]++;
1201 if (c->use->ready.len) {
1202 j = job_copy(c->use->ready.data[0]);
1205 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1207 reply_job(c, j, MSG_FOUND);
1208 break;
1209 case OP_PEEK_DELAYED:
1210 /* don't allow trailing garbage */
1211 if (c->cmd_len != CMD_PEEK_DELAYED_LEN + 2) {
1212 return reply_msg(c, MSG_BAD_FORMAT);
1214 op_ct[type]++;
1216 if (c->use->delay.len) {
1217 j = job_copy(c->use->delay.data[0]);
1220 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1222 reply_job(c, j, MSG_FOUND);
1223 break;
1224 case OP_PEEK_BURIED:
1225 /* don't allow trailing garbage */
1226 if (c->cmd_len != CMD_PEEK_BURIED_LEN + 2) {
1227 return reply_msg(c, MSG_BAD_FORMAT);
1229 op_ct[type]++;
1231 j = job_copy(buried_job_p(c->use)? j = c->use->buried.next : NULL);
1233 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1235 reply_job(c, j, MSG_FOUND);
1236 break;
1237 case OP_PEEKJOB:
1238 errno = 0;
1239 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1240 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1241 op_ct[type]++;
1243 /* So, peek is annoying, because some other connection might free the
1244 * job while we are still trying to write it out. So we copy it and
1245 * then free the copy when it's done sending. */
1246 j = job_copy(peek_job(id));
1248 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1250 reply_job(c, j, MSG_FOUND);
1251 break;
1252 case OP_RESERVE_TIMEOUT:
1253 errno = 0;
1254 timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10);
1255 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1256 case OP_RESERVE: /* FALLTHROUGH */
1257 /* don't allow trailing garbage */
1258 if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) {
1259 return reply_msg(c, MSG_BAD_FORMAT);
1262 op_ct[type]++;
1263 conn_set_worker(c);
1265 if (conn_has_close_deadline(c) && !conn_ready(c)) {
1266 return reply_msg(c, MSG_DEADLINE_SOON);
1269 /* try to get a new job for this guy */
1270 wait_for_job(c, timeout);
1271 process_queue();
1272 break;
1273 case OP_DELETE:
1274 errno = 0;
1275 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1276 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1277 op_ct[type]++;
1279 j = job_find(id);
1280 j = remove_reserved_job(c, j) ? :
1281 remove_ready_job(j) ? :
1282 remove_buried_job(j);
1284 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1286 j->r.state = Invalid;
1287 r = walwrite(&c->srv->wal, j);
1288 walmaint(&c->srv->wal);
1289 job_free(j);
1291 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1293 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1294 break;
1295 case OP_RELEASE:
1296 errno = 0;
1297 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1298 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1300 r = read_pri(&pri, pri_buf, &delay_buf);
1301 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1303 r = read_delay(&delay, delay_buf, NULL);
1304 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1305 op_ct[type]++;
1307 j = remove_reserved_job(c, job_find(id));
1309 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1311 /* We want to update the delay deadline on disk, so reserve space for
1312 * that. */
1313 if (delay) {
1314 z = walresvupdate(&c->srv->wal, j);
1315 if (!z) return reply_serr(c, MSG_OUT_OF_MEMORY);
1316 j->walresv += z;
1319 j->r.pri = pri;
1320 j->r.delay = delay;
1321 j->r.release_ct++;
1323 r = enqueue_job(c->srv, j, delay, !!delay);
1324 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
1325 if (r == 1) {
1326 return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1329 /* out of memory trying to grow the queue, so it gets buried */
1330 bury_job(c->srv, j, 0);
1331 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1332 break;
1333 case OP_BURY:
1334 errno = 0;
1335 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1336 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1338 r = read_pri(&pri, pri_buf, NULL);
1339 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1340 op_ct[type]++;
1342 j = remove_reserved_job(c, job_find(id));
1344 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1346 j->r.pri = pri;
1347 r = bury_job(c->srv, j, 1);
1348 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1349 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1350 break;
1351 case OP_KICK:
1352 errno = 0;
1353 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
1354 if (end_buf == c->cmd + CMD_KICK_LEN) {
1355 return reply_msg(c, MSG_BAD_FORMAT);
1357 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1359 op_ct[type]++;
1361 i = kick_jobs(c->srv, c->use, count);
1363 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1364 case OP_TOUCH:
1365 errno = 0;
1366 id = strtoull(c->cmd + CMD_TOUCH_LEN, &end_buf, 10);
1367 if (errno) return twarn("strtoull"), reply_msg(c, MSG_BAD_FORMAT);
1369 op_ct[type]++;
1371 j = touch_job(c, job_find(id));
1373 if (j) {
1374 reply(c, MSG_TOUCHED, MSG_TOUCHED_LEN, STATE_SENDWORD);
1375 } else {
1376 return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1378 break;
1379 case OP_STATS:
1380 /* don't allow trailing garbage */
1381 if (c->cmd_len != CMD_STATS_LEN + 2) {
1382 return reply_msg(c, MSG_BAD_FORMAT);
1385 op_ct[type]++;
1387 do_stats(c, fmt_stats, c->srv);
1388 break;
1389 case OP_JOBSTATS:
1390 errno = 0;
1391 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1392 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1394 op_ct[type]++;
1396 j = peek_job(id);
1397 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1399 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1400 do_stats(c, (fmt_fn) fmt_job_stats, j);
1401 break;
1402 case OP_STATS_TUBE:
1403 name = c->cmd + CMD_STATS_TUBE_LEN;
1404 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1406 op_ct[type]++;
1408 t = tube_find(name);
1409 if (!t) return reply_msg(c, MSG_NOTFOUND);
1411 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1412 t = NULL;
1413 break;
1414 case OP_LIST_TUBES:
1415 /* don't allow trailing garbage */
1416 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1417 return reply_msg(c, MSG_BAD_FORMAT);
1420 op_ct[type]++;
1421 do_list_tubes(c, &tubes);
1422 break;
1423 case OP_LIST_TUBE_USED:
1424 /* don't allow trailing garbage */
1425 if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
1426 return reply_msg(c, MSG_BAD_FORMAT);
1429 op_ct[type]++;
1430 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1431 break;
1432 case OP_LIST_TUBES_WATCHED:
1433 /* don't allow trailing garbage */
1434 if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
1435 return reply_msg(c, MSG_BAD_FORMAT);
1438 op_ct[type]++;
1439 do_list_tubes(c, &c->watch);
1440 break;
1441 case OP_USE:
1442 name = c->cmd + CMD_USE_LEN;
1443 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1444 op_ct[type]++;
1446 TUBE_ASSIGN(t, tube_find_or_make(name));
1447 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1449 c->use->using_ct--;
1450 TUBE_ASSIGN(c->use, t);
1451 TUBE_ASSIGN(t, NULL);
1452 c->use->using_ct++;
1454 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1455 break;
1456 case OP_WATCH:
1457 name = c->cmd + CMD_WATCH_LEN;
1458 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1459 op_ct[type]++;
1461 TUBE_ASSIGN(t, tube_find_or_make(name));
1462 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1464 r = 1;
1465 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1466 TUBE_ASSIGN(t, NULL);
1467 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1469 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1470 break;
1471 case OP_IGNORE:
1472 name = c->cmd + CMD_IGNORE_LEN;
1473 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1474 op_ct[type]++;
1476 t = NULL;
1477 for (i = 0; i < c->watch.used; i++) {
1478 t = c->watch.items[i];
1479 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1480 t = NULL;
1483 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1485 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1486 t = NULL;
1488 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1489 break;
1490 case OP_QUIT:
1491 conn_close(c);
1492 break;
1493 case OP_PAUSE_TUBE:
1494 op_ct[type]++;
1496 r = read_tube_name(&name, c->cmd + CMD_PAUSE_TUBE_LEN, &delay_buf);
1497 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1499 r = read_delay(&delay, delay_buf, NULL);
1500 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1502 *delay_buf = '\0';
1503 t = tube_find(name);
1504 if (!t) return reply_msg(c, MSG_NOTFOUND);
1506 t->deadline_at = nanoseconds() + delay;
1507 t->pause = delay;
1508 t->stat.pause_ct++;
1510 reply_line(c, STATE_SENDWORD, "PAUSED\r\n");
1511 break;
1512 default:
1513 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1517 /* There are three reasons this function may be called. We need to check for
1518 * all of them.
1520 * 1. A reserved job has run out of time.
1521 * 2. A waiting client's reserved job has entered the safety margin.
1522 * 3. A waiting client's requested timeout has occurred.
1524 * If any of these happen, we must do the appropriate thing. */
1525 static void
1526 conn_timeout(conn c)
1528 int r, should_timeout = 0;
1529 job j;
1531 /* Check if the client was trying to reserve a job. */
1532 if (conn_waiting(c) && conn_has_close_deadline(c)) should_timeout = 1;
1534 /* Check if any reserved jobs have run out of time. We should do this
1535 * whether or not the client is waiting for a new reservation. */
1536 while ((j = soonest_job(c))) {
1537 if (j->r.deadline_at >= nanoseconds()) break;
1539 /* This job is in the middle of being written out. If we return it to
1540 * the ready queue, someone might free it before we finish writing it
1541 * out to the socket. So we'll copy it here and free the copy when it's
1542 * done sending. */
1543 if (j == c->out_job) {
1544 c->out_job = job_copy(c->out_job);
1547 timeout_ct++; /* stats */
1548 j->r.timeout_ct++;
1549 r = enqueue_job(c->srv, remove_this_reserved_job(c, j), 0, 0);
1550 if (r < 1) bury_job(c->srv, j, 0); /* out of memory, so bury it */
1551 connsched(c);
1554 if (should_timeout) {
1555 dbgprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1556 return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON);
1557 } else if (conn_waiting(c) && c->pending_timeout >= 0) {
1558 dbgprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1559 c->pending_timeout = -1;
1560 return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);
1564 void
1565 enter_drain_mode(int sig)
1567 drain_mode = 1;
1570 static void
1571 do_cmd(conn c)
1573 dispatch_cmd(c);
1574 fill_extra_data(c);
1577 static void
1578 reset_conn(conn c)
1580 connwant(c, 'r', &dirty);
1582 /* was this a peek or stats command? */
1583 if (c->out_job && c->out_job->r.state == Copy) job_free(c->out_job);
1584 c->out_job = NULL;
1586 c->reply_sent = 0; /* now that we're done, reset this */
1587 c->state = STATE_WANTCOMMAND;
1590 static void
1591 conn_data(conn c)
1593 int r, to_read;
1594 job j;
1595 struct iovec iov[2];
1597 switch (c->state) {
1598 case STATE_WANTCOMMAND:
1599 r = read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1600 if (r == -1) return check_err(c, "read()");
1601 if (r == 0) return conn_close(c); /* the client hung up */
1603 c->cmd_read += r; /* we got some bytes */
1605 c->cmd_len = cmd_len(c); /* find the EOL */
1607 /* yay, complete command line */
1608 if (c->cmd_len) return do_cmd(c);
1610 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1612 /* command line too long? */
1613 if (c->cmd_read == LINE_BUF_SIZE) {
1614 c->cmd_read = 0; /* discard the input so far */
1615 return reply_msg(c, MSG_BAD_FORMAT);
1618 /* otherwise we have an incomplete line, so just keep waiting */
1619 break;
1620 case STATE_BITBUCKET:
1621 /* Invert the meaning of in_job_read while throwing away data -- it
1622 * counts the bytes that remain to be thrown away. */
1623 to_read = min(c->in_job_read, BUCKET_BUF_SIZE);
1624 r = read(c->sock.fd, bucket, to_read);
1625 if (r == -1) return check_err(c, "read()");
1626 if (r == 0) return conn_close(c); /* the client hung up */
1628 c->in_job_read -= r; /* we got some bytes */
1630 /* (c->in_job_read < 0) can't happen */
1632 if (c->in_job_read == 0) {
1633 return reply(c, c->reply, c->reply_len, STATE_SENDWORD);
1635 break;
1636 case STATE_WANTDATA:
1637 j = c->in_job;
1639 r = read(c->sock.fd, j->body + c->in_job_read, j->r.body_size -c->in_job_read);
1640 if (r == -1) return check_err(c, "read()");
1641 if (r == 0) return conn_close(c); /* the client hung up */
1643 c->in_job_read += r; /* we got some bytes */
1645 /* (j->in_job_read > j->r.body_size) can't happen */
1647 maybe_enqueue_incoming_job(c);
1648 break;
1649 case STATE_SENDWORD:
1650 r= write(c->sock.fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1651 if (r == -1) return check_err(c, "write()");
1652 if (r == 0) return conn_close(c); /* the client hung up */
1654 c->reply_sent += r; /* we got some bytes */
1656 /* (c->reply_sent > c->reply_len) can't happen */
1658 if (c->reply_sent == c->reply_len) return reset_conn(c);
1660 /* otherwise we sent an incomplete reply, so just keep waiting */
1661 break;
1662 case STATE_SENDJOB:
1663 j = c->out_job;
1665 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1666 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1667 iov[1].iov_base = j->body + c->out_job_sent;
1668 iov[1].iov_len = j->r.body_size - c->out_job_sent;
1670 r = writev(c->sock.fd, iov, 2);
1671 if (r == -1) return check_err(c, "writev()");
1672 if (r == 0) return conn_close(c); /* the client hung up */
1674 /* update the sent values */
1675 c->reply_sent += r;
1676 if (c->reply_sent >= c->reply_len) {
1677 c->out_job_sent += c->reply_sent - c->reply_len;
1678 c->reply_sent = c->reply_len;
1681 /* (c->out_job_sent > j->r.body_size) can't happen */
1683 /* are we done? */
1684 if (c->out_job_sent == j->r.body_size) return reset_conn(c);
1686 /* otherwise we sent incomplete data, so just keep waiting */
1687 break;
1688 case STATE_WAIT: /* keep an eye out in case they hang up */
1689 /* but don't hang up just because our buffer is full */
1690 if (LINE_BUF_SIZE - c->cmd_read < 1) break;
1692 r = read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1693 if (r == -1) return check_err(c, "read()");
1694 if (r == 0) return conn_close(c); /* the client hung up */
1695 c->cmd_read += r; /* we got some bytes */
/* Nonzero when c has a nonzero socket fd and is in STATE_WANTCOMMAND. */
#define want_command(c) \
    ((c)->sock.fd && ((c)->state == STATE_WANTCOMMAND))

/* Nonzero when c wants a command and already has unprocessed bytes in its
 * command buffer. */
#define cmd_data_ready(c) \
    (want_command(c) && (c)->cmd_read)
1702 static void
1703 update_conns()
1705 int r;
1706 conn c;
1708 while ((c = conn_remove(dirty.next))) { /* assignment */
1709 r = sockwant(&c->sock, c->rw);
1710 if (r == -1) {
1711 twarn("sockwant");
1712 conn_close(c);
1717 static void
1718 h_conn(const int fd, const short which, conn c)
1720 if (fd != c->sock.fd) {
1721 twarnx("Argh! event fd doesn't match conn fd.");
1722 close(fd);
1723 conn_close(c);
1724 update_conns();
1725 return;
1728 conn_data(c);
1729 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1730 update_conns();
1733 static void
1734 prothandle(conn c, int ev)
1736 h_conn(c->sock.fd, ev, c);
1739 void
1740 prottick(Srv *s)
1742 int r;
1743 job j;
1744 int64 now;
1745 int i;
1746 tube t;
1748 now = nanoseconds();
1749 while ((j = delay_q_peek())) {
1750 if (j->r.deadline_at > now) break;
1751 j = delay_q_take();
1752 r = enqueue_job(s, j, 0, 0);
1753 if (r < 1) bury_job(s, j, 0); /* out of memory, so bury it */
1756 for (i = 0; i < tubes.used; i++) {
1757 t = tubes.items[i];
1759 dbgprintf("delay for %s t->waiting.used=%zu t->ready.len=%d t->pause=%" PRIu64 "\n",
1760 t->name, t->waiting.used, t->ready.len, t->pause);
1761 if (t->pause && t->deadline_at <= now) {
1762 t->pause = 0;
1763 process_queue();
1767 while (s->conns.len) {
1768 conn c = s->conns.data[0];
1769 if (c->tickat > now) {
1770 break;
1773 heapremove(&s->conns, 0);
1774 conn_timeout(c);
1777 update_conns();
1780 void
1781 h_accept(const int fd, const short which, Srv *s)
1783 conn c;
1784 int cfd, flags, r;
1785 socklen_t addrlen;
1786 struct sockaddr_in6 addr;
1788 addrlen = sizeof addr;
1789 cfd = accept(fd, (struct sockaddr *)&addr, &addrlen);
1790 if (cfd == -1) {
1791 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1792 update_conns();
1793 return;
1796 flags = fcntl(cfd, F_GETFL, 0);
1797 if (flags < 0) {
1798 twarn("getting flags");
1799 close(cfd);
1800 update_conns();
1801 return;
1804 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1805 if (r < 0) {
1806 twarn("setting O_NONBLOCK");
1807 close(cfd);
1808 update_conns();
1809 return;
1812 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1813 if (!c) {
1814 twarnx("make_conn() failed");
1815 close(cfd);
1816 update_conns();
1817 return;
1819 c->srv = s;
1820 c->sock.x = c;
1821 c->sock.f = (Handle)prothandle;
1822 c->sock.fd = cfd;
1824 dbgprintf("accepted conn, fd=%d\n", cfd);
1825 r = sockwant(&c->sock, 'r');
1826 if (r == -1) {
1827 twarn("sockwant");
1828 close(cfd);
1829 update_conns();
1830 return;
1832 update_conns();
1835 void
1836 prot_init()
1838 started_at = nanoseconds();
1839 memset(op_ct, 0, sizeof(op_ct));
1841 ms_init(&tubes, NULL, NULL);
1843 TUBE_ASSIGN(default_tube, tube_find_or_make("default"));
1844 if (!default_tube) twarnx("Out of memory during startup!");
1847 void
1848 prot_replay(Srv *s, job list)
1850 job j, nj;
1851 int64 t, delay;
1852 int r;
1854 for (j = list->next ; j != list ; j = nj) {
1855 nj = j->next;
1856 job_remove(j);
1857 walresvupdate(&s->wal, j); /* reserve space for a delete */
1858 delay = 0;
1859 switch (j->r.state) {
1860 case Buried:
1861 bury_job(s, j, 0);
1862 break;
1863 case Delayed:
1864 t = nanoseconds();
1865 if (t < j->r.deadline_at) {
1866 delay = j->r.deadline_at - t;
1868 /* fall through */
1869 default:
1870 r = enqueue_job(s, j, delay, 0);
1871 if (r < 1) twarnx("error recovering job %llu", j->r.id);