update README and man page
[beanstalkd.git] / prot.c
blob573a95c56f728c16a8ccb01e134ddd2cd6f55181
1 #include <stdint.h>
2 #include <stdlib.h>
3 #include <stdio.h>
4 #include <unistd.h>
5 #include <fcntl.h>
6 #include <string.h>
7 #include <errno.h>
8 #include <sys/resource.h>
9 #include <sys/uio.h>
10 #include <sys/types.h>
11 #include <sys/socket.h>
12 #include <netinet/in.h>
13 #include <ctype.h>
14 #include <inttypes.h>
15 #include <stdarg.h>
16 #include "dat.h"
18 /* job body cannot be greater than this many bytes long */
19 size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
21 #define NAME_CHARS \
22 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
23 "abcdefghijklmnopqrstuvwxyz" \
24 "0123456789-+/;.$_()"
/*
 * Command words as they are matched against the start of a command line.
 * Several of them carry a trailing space, so the separator is matched
 * together with the word itself.
 */
#define CMD_PUT                 "put "
#define CMD_PEEKJOB             "peek "
#define CMD_PEEK_READY          "peek-ready"
#define CMD_PEEK_DELAYED        "peek-delayed"
#define CMD_PEEK_BURIED         "peek-buried"
#define CMD_RESERVE             "reserve"
#define CMD_RESERVE_TIMEOUT     "reserve-with-timeout "
#define CMD_DELETE              "delete "
#define CMD_RELEASE             "release "
#define CMD_BURY                "bury "
#define CMD_KICK                "kick "
#define CMD_TOUCH               "touch "
#define CMD_STATS               "stats"
#define CMD_JOBSTATS            "stats-job "
#define CMD_USE                 "use "
#define CMD_WATCH               "watch "
#define CMD_IGNORE              "ignore "
#define CMD_LIST_TUBES          "list-tubes"
#define CMD_LIST_TUBE_USED      "list-tube-used"
#define CMD_LIST_TUBES_WATCHED  "list-tubes-watched"
#define CMD_STATS_TUBE          "stats-tube "
#define CMD_QUIT                "quit"
#define CMD_PAUSE_TUBE          "pause-tube"

/* Length of a string literal, not counting its terminating NUL. */
#define CONSTSTRLEN(m) (sizeof(m) - 1)

/* Precomputed lengths of the command words above. */
#define CMD_PEEK_READY_LEN          CONSTSTRLEN(CMD_PEEK_READY)
#define CMD_PEEK_DELAYED_LEN        CONSTSTRLEN(CMD_PEEK_DELAYED)
#define CMD_PEEK_BURIED_LEN         CONSTSTRLEN(CMD_PEEK_BURIED)
#define CMD_PEEKJOB_LEN             CONSTSTRLEN(CMD_PEEKJOB)
#define CMD_RESERVE_LEN             CONSTSTRLEN(CMD_RESERVE)
#define CMD_RESERVE_TIMEOUT_LEN     CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
#define CMD_DELETE_LEN              CONSTSTRLEN(CMD_DELETE)
#define CMD_RELEASE_LEN             CONSTSTRLEN(CMD_RELEASE)
#define CMD_BURY_LEN                CONSTSTRLEN(CMD_BURY)
#define CMD_KICK_LEN                CONSTSTRLEN(CMD_KICK)
#define CMD_TOUCH_LEN               CONSTSTRLEN(CMD_TOUCH)
#define CMD_STATS_LEN               CONSTSTRLEN(CMD_STATS)
#define CMD_JOBSTATS_LEN            CONSTSTRLEN(CMD_JOBSTATS)
#define CMD_USE_LEN                 CONSTSTRLEN(CMD_USE)
#define CMD_WATCH_LEN               CONSTSTRLEN(CMD_WATCH)
#define CMD_IGNORE_LEN              CONSTSTRLEN(CMD_IGNORE)
#define CMD_LIST_TUBES_LEN          CONSTSTRLEN(CMD_LIST_TUBES)
#define CMD_LIST_TUBE_USED_LEN      CONSTSTRLEN(CMD_LIST_TUBE_USED)
#define CMD_LIST_TUBES_WATCHED_LEN  CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
#define CMD_STATS_TUBE_LEN          CONSTSTRLEN(CMD_STATS_TUBE)
#define CMD_PAUSE_TUBE_LEN          CONSTSTRLEN(CMD_PAUSE_TUBE)
/* Reply words and complete reply lines sent to clients. */
#define MSG_FOUND           "FOUND"
#define MSG_NOTFOUND        "NOT_FOUND\r\n"
#define MSG_RESERVED        "RESERVED"
#define MSG_DEADLINE_SOON   "DEADLINE_SOON\r\n"
#define MSG_TIMED_OUT       "TIMED_OUT\r\n"
#define MSG_DELETED         "DELETED\r\n"
#define MSG_RELEASED        "RELEASED\r\n"
#define MSG_BURIED          "BURIED\r\n"
#define MSG_TOUCHED         "TOUCHED\r\n"
#define MSG_BURIED_FMT      "BURIED %"PRIu64"\r\n"
#define MSG_INSERTED_FMT    "INSERTED %"PRIu64"\r\n"
#define MSG_NOT_IGNORED     "NOT_IGNORED\r\n"

/* Lengths of the fixed reply lines, excluding the terminating NUL. */
#define MSG_NOTFOUND_LEN    CONSTSTRLEN(MSG_NOTFOUND)
#define MSG_DELETED_LEN     CONSTSTRLEN(MSG_DELETED)
#define MSG_TOUCHED_LEN     CONSTSTRLEN(MSG_TOUCHED)
#define MSG_RELEASED_LEN    CONSTSTRLEN(MSG_RELEASED)
#define MSG_BURIED_LEN      CONSTSTRLEN(MSG_BURIED)
#define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)

/* Error replies. */
#define MSG_OUT_OF_MEMORY   "OUT_OF_MEMORY\r\n"
#define MSG_INTERNAL_ERROR  "INTERNAL_ERROR\r\n"
#define MSG_DRAINING        "DRAINING\r\n"
#define MSG_BAD_FORMAT      "BAD_FORMAT\r\n"
#define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
#define MSG_EXPECTED_CRLF   "EXPECTED_CRLF\r\n"
#define MSG_JOB_TOO_BIG     "JOB_TOO_BIG\r\n"
/* Values stored in a connection's state field. */
#define STATE_WANTCOMMAND   0
#define STATE_WANTDATA      1
#define STATE_SENDJOB       2
#define STATE_SENDWORD      3
#define STATE_WAIT          4
#define STATE_BITBUCKET     5

/*
 * Operation codes returned by which_cmd(). They index op_ct[] and
 * op_names[], so TOTAL_OPS must stay one past the largest code.
 */
#define OP_UNKNOWN              0
#define OP_PUT                  1
#define OP_PEEKJOB              2
#define OP_RESERVE              3
#define OP_DELETE               4
#define OP_RELEASE              5
#define OP_BURY                 6
#define OP_KICK                 7
#define OP_STATS                8
#define OP_JOBSTATS             9
#define OP_PEEK_BURIED          10
#define OP_USE                  11
#define OP_WATCH                12
#define OP_IGNORE               13
#define OP_LIST_TUBES           14
#define OP_LIST_TUBE_USED       15
#define OP_LIST_TUBES_WATCHED   16
#define OP_STATS_TUBE           17
#define OP_PEEK_READY           18
#define OP_PEEK_DELAYED         19
#define OP_RESERVE_TIMEOUT      20
#define OP_TOUCH                21
#define OP_QUIT                 22
#define OP_PAUSE_TUBE           23
#define TOTAL_OPS               24
/*
 * printf-style template for the server-wide statistics document.
 * The argument list in fmt_stats() must follow this field order exactly.
 */
#define STATS_FMT "---\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-delayed: %u\n" \
    "current-jobs-buried: %u\n" \
    "cmd-put: %" PRIu64 "\n" \
    "cmd-peek: %" PRIu64 "\n" \
    "cmd-peek-ready: %" PRIu64 "\n" \
    "cmd-peek-delayed: %" PRIu64 "\n" \
    "cmd-peek-buried: %" PRIu64 "\n" \
    "cmd-reserve: %" PRIu64 "\n" \
    "cmd-reserve-with-timeout: %" PRIu64 "\n" \
    "cmd-delete: %" PRIu64 "\n" \
    "cmd-release: %" PRIu64 "\n" \
    "cmd-use: %" PRIu64 "\n" \
    "cmd-watch: %" PRIu64 "\n" \
    "cmd-ignore: %" PRIu64 "\n" \
    "cmd-bury: %" PRIu64 "\n" \
    "cmd-kick: %" PRIu64 "\n" \
    "cmd-touch: %" PRIu64 "\n" \
    "cmd-stats: %" PRIu64 "\n" \
    "cmd-stats-job: %" PRIu64 "\n" \
    "cmd-stats-tube: %" PRIu64 "\n" \
    "cmd-list-tubes: %" PRIu64 "\n" \
    "cmd-list-tube-used: %" PRIu64 "\n" \
    "cmd-list-tubes-watched: %" PRIu64 "\n" \
    "cmd-pause-tube: %" PRIu64 "\n" \
    "job-timeouts: %" PRIu64 "\n" \
    "total-jobs: %" PRIu64 "\n" \
    "max-job-size: %zu\n" \
    "current-tubes: %zu\n" \
    "current-connections: %u\n" \
    "current-producers: %u\n" \
    "current-workers: %u\n" \
    "current-waiting: %u\n" \
    "total-connections: %u\n" \
    "pid: %ld\n" \
    "version: %s\n" \
    "rusage-utime: %d.%06d\n" \
    "rusage-stime: %d.%06d\n" \
    "uptime: %u\n" \
    "binlog-oldest-index: %d\n" \
    "binlog-current-index: %d\n" \
    "binlog-records-migrated: %" PRId64 "\n" \
    "binlog-records-written: %" PRId64 "\n" \
    "binlog-max-size: %d\n" \
    "\r\n"

/* Template for the per-tube statistics document; see fmt_stats_tube(). */
#define STATS_TUBE_FMT "---\n" \
    "name: %s\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-delayed: %u\n" \
    "current-jobs-buried: %u\n" \
    "total-jobs: %" PRIu64 "\n" \
    "current-using: %u\n" \
    "current-watching: %u\n" \
    "current-waiting: %u\n" \
    "cmd-delete: %" PRIu64 "\n" \
    "cmd-pause-tube: %u\n" \
    "pause: %" PRIu64 "\n" \
    "pause-time-left: %" PRId64 "\n" \
    "\r\n"

/* Template for the per-job statistics document; see fmt_job_stats(). */
#define STATS_JOB_FMT "---\n" \
    "id: %" PRIu64 "\n" \
    "tube: %s\n" \
    "state: %s\n" \
    "pri: %u\n" \
    "age: %" PRId64 "\n" \
    "delay: %" PRId64 "\n" \
    "ttr: %" PRId64 "\n" \
    "time-left: %" PRId64 "\n" \
    "file: %d\n" \
    "reserves: %u\n" \
    "timeouts: %u\n" \
    "releases: %u\n" \
    "buries: %u\n" \
    "kicks: %u\n" \
    "\r\n"
218 /* this number is pretty arbitrary */
219 #define BUCKET_BUF_SIZE 1024
221 static char bucket[BUCKET_BUF_SIZE];
223 static uint ready_ct = 0;
224 static struct stats global_stat = {0, 0, 0, 0, 0};
226 static tube default_tube;
228 static int drain_mode = 0;
229 static int64 started_at;
230 static uint64 op_ct[TOTAL_OPS], timeout_ct = 0;
232 static Conn *dirty;
234 static const char * op_names[] = {
235 "<unknown>",
236 CMD_PUT,
237 CMD_PEEKJOB,
238 CMD_RESERVE,
239 CMD_DELETE,
240 CMD_RELEASE,
241 CMD_BURY,
242 CMD_KICK,
243 CMD_STATS,
244 CMD_JOBSTATS,
245 CMD_PEEK_BURIED,
246 CMD_USE,
247 CMD_WATCH,
248 CMD_IGNORE,
249 CMD_LIST_TUBES,
250 CMD_LIST_TUBE_USED,
251 CMD_LIST_TUBES_WATCHED,
252 CMD_STATS_TUBE,
253 CMD_PEEK_READY,
254 CMD_PEEK_DELAYED,
255 CMD_RESERVE_TIMEOUT,
256 CMD_TOUCH,
257 CMD_QUIT,
258 CMD_PAUSE_TUBE
261 static job remove_buried_job(job j);
263 static int
264 buried_job_p(tube t)
266 return job_list_any_p(&t->buried);
269 static void
270 reply(Conn *c, char *line, int len, int state)
272 if (!c) return;
274 connwant(c, 'w');
275 c->next = dirty;
276 dirty = c;
277 c->reply = line;
278 c->reply_len = len;
279 c->reply_sent = 0;
280 c->state = state;
281 if (verbose >= 2) {
282 printf(">%d reply %.*s\n", c->sock.fd, len-2, line);
/* Send the string literal m as a one-word reply on connection c. */
#define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)

/* Like reply_msg(), but log the message as a server error first. */
#define reply_serr(c,e) (twarnx("server error: %s",(e)),\
                         reply_msg((c),(e)))
291 static void
292 reply_line(Conn *c, int state, const char *fmt, ...)
294 int r;
295 va_list ap;
297 va_start(ap, fmt);
298 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
299 va_end(ap);
301 /* Make sure the buffer was big enough. If not, we have a bug. */
302 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
304 return reply(c, c->reply_buf, r, state);
307 static void
308 reply_job(Conn *c, job j, const char *word)
310 /* tell this connection which job to send */
311 c->out_job = j;
312 c->out_job_sent = 0;
314 return reply_line(c, STATE_SENDJOB, "%s %"PRIu64" %u\r\n",
315 word, j->r.id, j->r.body_size - 2);
318 Conn *
319 remove_waiting_conn(Conn *c)
321 tube t;
322 size_t i;
324 if (!conn_waiting(c)) return NULL;
326 c->type &= ~CONN_TYPE_WAITING;
327 global_stat.waiting_ct--;
328 for (i = 0; i < c->watch.used; i++) {
329 t = c->watch.items[i];
330 t->stat.waiting_ct--;
331 ms_remove(&t->waiting, c);
333 return c;
336 static void
337 reserve_job(Conn *c, job j)
339 j->r.deadline_at = nanoseconds() + j->r.ttr;
340 global_stat.reserved_ct++; /* stats */
341 j->tube->stat.reserved_ct++;
342 j->r.reserve_ct++;
343 j->r.state = Reserved;
344 job_insert(&c->reserved_jobs, j);
345 j->reserver = c;
346 if (c->soonest_job && j->r.deadline_at < c->soonest_job->r.deadline_at) {
347 c->soonest_job = j;
349 return reply_job(c, j, MSG_RESERVED);
352 static job
353 next_eligible_job(int64 now)
355 tube t;
356 size_t i;
357 job j = NULL, candidate;
359 for (i = 0; i < tubes.used; i++) {
360 t = tubes.items[i];
361 if (t->pause) {
362 if (t->deadline_at > now) continue;
363 t->pause = 0;
365 if (t->waiting.used && t->ready.len) {
366 candidate = t->ready.data[0];
367 if (!j || job_pri_less(candidate, j)) {
368 j = candidate;
373 return j;
376 static void
377 process_queue()
379 job j;
380 int64 now = nanoseconds();
382 while ((j = next_eligible_job(now))) {
383 heapremove(&j->tube->ready, j->heap_index);
384 ready_ct--;
385 if (j->r.pri < URGENT_THRESHOLD) {
386 global_stat.urgent_ct--;
387 j->tube->stat.urgent_ct--;
389 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
393 static job
394 delay_q_peek()
396 int i;
397 tube t;
398 job j = NULL, nj;
400 for (i = 0; i < tubes.used; i++) {
401 t = tubes.items[i];
402 if (t->delay.len == 0) {
403 continue;
405 nj = t->delay.data[0];
406 if (!j || nj->r.deadline_at < j->r.deadline_at) j = nj;
409 return j;
412 static int
413 enqueue_job(Server *s, job j, int64 delay, char update_store)
415 int r;
417 j->reserver = NULL;
418 if (delay) {
419 j->r.deadline_at = nanoseconds() + delay;
420 r = heapinsert(&j->tube->delay, j);
421 if (!r) return 0;
422 j->r.state = Delayed;
423 } else {
424 r = heapinsert(&j->tube->ready, j);
425 if (!r) return 0;
426 j->r.state = Ready;
427 ready_ct++;
428 if (j->r.pri < URGENT_THRESHOLD) {
429 global_stat.urgent_ct++;
430 j->tube->stat.urgent_ct++;
434 if (update_store) {
435 if (!walwrite(&s->wal, j)) {
436 return 0;
438 walmaint(&s->wal);
441 process_queue();
442 return 1;
445 static int
446 bury_job(Server *s, job j, char update_store)
448 int z;
450 if (update_store) {
451 z = walresvupdate(&s->wal, j);
452 if (!z) return 0;
453 j->walresv += z;
456 job_insert(&j->tube->buried, j);
457 global_stat.buried_ct++;
458 j->tube->stat.buried_ct++;
459 j->r.state = Buried;
460 j->reserver = NULL;
461 j->r.bury_ct++;
463 if (update_store) {
464 if (!walwrite(&s->wal, j)) {
465 return 0;
467 walmaint(&s->wal);
470 return 1;
473 void
474 enqueue_reserved_jobs(Conn *c)
476 int r;
477 job j;
479 while (job_list_any_p(&c->reserved_jobs)) {
480 j = job_remove(c->reserved_jobs.next);
481 r = enqueue_job(c->srv, j, 0, 0);
482 if (r < 1) bury_job(c->srv, j, 0);
483 global_stat.reserved_ct--;
484 j->tube->stat.reserved_ct--;
485 c->soonest_job = NULL;
489 static job
490 delay_q_take()
492 job j = delay_q_peek();
493 if (!j) {
494 return 0;
496 heapremove(&j->tube->delay, j->heap_index);
497 return j;
500 static int
501 kick_buried_job(Server *s, tube t)
503 int r;
504 job j;
505 int z;
507 if (!buried_job_p(t)) return 0;
508 j = remove_buried_job(t->buried.next);
510 z = walresvupdate(&s->wal, j);
511 if (!z) return heapinsert(&t->delay, j), 0; /* put it back */
512 j->walresv += z;
514 j->r.kick_ct++;
515 r = enqueue_job(s, j, 0, 1);
516 if (r == 1) return 1;
518 /* ready queue is full, so bury it */
519 bury_job(s, j, 0);
520 return 0;
523 static uint
524 get_delayed_job_ct()
526 tube t;
527 size_t i;
528 uint count = 0;
530 for (i = 0; i < tubes.used; i++) {
531 t = tubes.items[i];
532 count += t->delay.len;
534 return count;
537 static int
538 kick_delayed_job(Server *s, tube t)
540 int r;
541 job j;
542 int z;
544 if (t->delay.len == 0) {
545 return 0;
548 j = heapremove(&t->delay, 0);
550 z = walresvupdate(&s->wal, j);
551 if (!z) return heapinsert(&t->delay, j), 0; /* put it back */
552 j->walresv += z;
554 j->r.kick_ct++;
555 r = enqueue_job(s, j, 0, 1);
556 if (r == 1) return 1;
558 /* ready queue is full, so delay it again */
559 r = enqueue_job(s, j, j->r.delay, 0);
560 if (r == 1) return 0;
562 /* last resort */
563 bury_job(s, j, 0);
564 return 0;
567 /* return the number of jobs successfully kicked */
568 static uint
569 kick_buried_jobs(Server *s, tube t, uint n)
571 uint i;
572 for (i = 0; (i < n) && kick_buried_job(s, t); ++i);
573 return i;
576 /* return the number of jobs successfully kicked */
577 static uint
578 kick_delayed_jobs(Server *s, tube t, uint n)
580 uint i;
581 for (i = 0; (i < n) && kick_delayed_job(s, t); ++i);
582 return i;
585 static uint
586 kick_jobs(Server *s, tube t, uint n)
588 if (buried_job_p(t)) return kick_buried_jobs(s, t, n);
589 return kick_delayed_jobs(s, t, n);
592 static job
593 remove_buried_job(job j)
595 if (!j || j->r.state != Buried) return NULL;
596 j = job_remove(j);
597 if (j) {
598 global_stat.buried_ct--;
599 j->tube->stat.buried_ct--;
601 return j;
604 static job
605 remove_delayed_job(job j)
607 if (!j || j->r.state != Delayed) return NULL;
608 heapremove(&j->tube->delay, j->heap_index);
610 return j;
613 static job
614 remove_ready_job(job j)
616 if (!j || j->r.state != Ready) return NULL;
617 heapremove(&j->tube->ready, j->heap_index);
618 ready_ct--;
619 if (j->r.pri < URGENT_THRESHOLD) {
620 global_stat.urgent_ct--;
621 j->tube->stat.urgent_ct--;
623 return j;
626 static void
627 enqueue_waiting_conn(Conn *c)
629 tube t;
630 size_t i;
632 global_stat.waiting_ct++;
633 c->type |= CONN_TYPE_WAITING;
634 for (i = 0; i < c->watch.used; i++) {
635 t = c->watch.items[i];
636 t->stat.waiting_ct++;
637 ms_append(&t->waiting, c);
641 static job
642 find_reserved_job_in_conn(Conn *c, job j)
644 return (j && j->reserver == c && j->r.state == Reserved) ? j : NULL;
647 static job
648 touch_job(Conn *c, job j)
650 j = find_reserved_job_in_conn(c, j);
651 if (j) {
652 j->r.deadline_at = nanoseconds() + j->r.ttr;
653 c->soonest_job = NULL;
655 return j;
658 static job
659 peek_job(uint64 id)
661 return job_find(id);
664 static void
665 check_err(Conn *c, const char *s)
667 if (errno == EAGAIN) return;
668 if (errno == EINTR) return;
669 if (errno == EWOULDBLOCK) return;
671 twarn("%s", s);
672 connclose(c);
673 return;
/* Scan the first `size` bytes of s for the sequence "\r\n" and return the
 * line length including that terminator.
 * Always returns at least 2 if a match is found. Returns 0 if no match.
 * Only the first '\r' is considered: if it is not immediately followed by
 * '\n', the result is 0 even if a later "\r\n" exists.
 * A size below 2 cannot hold a terminator and yields 0. */
static int
scan_line_end(const char *s, int size)
{
    const char *match;

    /* Without this guard a size of 0 (or less) would make size - 1
     * negative, which converts to an enormous size_t and lets memchr()
     * read far past the buffer. */
    if (size < 2) return 0;

    match = memchr(s, '\r', (size_t) size - 1);
    if (!match) return 0;

    /* this is safe because we only scan size - 1 chars above */
    if (match[1] == '\n') return match - s + 2;

    return 0;
}
692 static int
693 cmd_len(Conn *c)
695 return scan_line_end(c->cmd, c->cmd_read);
698 /* parse the command line */
699 static int
700 which_cmd(Conn *c)
702 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
703 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
704 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
705 TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY);
706 TEST_CMD(c->cmd, CMD_PEEK_DELAYED, OP_PEEK_DELAYED);
707 TEST_CMD(c->cmd, CMD_PEEK_BURIED, OP_PEEK_BURIED);
708 TEST_CMD(c->cmd, CMD_RESERVE_TIMEOUT, OP_RESERVE_TIMEOUT);
709 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
710 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
711 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
712 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
713 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
714 TEST_CMD(c->cmd, CMD_TOUCH, OP_TOUCH);
715 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
716 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
717 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
718 TEST_CMD(c->cmd, CMD_USE, OP_USE);
719 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
720 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
721 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
722 TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
723 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
724 TEST_CMD(c->cmd, CMD_QUIT, OP_QUIT);
725 TEST_CMD(c->cmd, CMD_PAUSE_TUBE, OP_PAUSE_TUBE);
726 return OP_UNKNOWN;
729 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
730 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
731 * This function is idempotent(). */
732 static void
733 fill_extra_data(Conn *c)
735 int extra_bytes, job_data_bytes = 0, cmd_bytes;
737 if (!c->sock.fd) return; /* the connection was closed */
738 if (!c->cmd_len) return; /* we don't have a complete command */
740 /* how many extra bytes did we read? */
741 extra_bytes = c->cmd_read - c->cmd_len;
743 /* how many bytes should we put into the job body? */
744 if (c->in_job) {
745 job_data_bytes = min(extra_bytes, c->in_job->r.body_size);
746 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
747 c->in_job_read = job_data_bytes;
748 } else if (c->in_job_read) {
749 /* we are in bit-bucket mode, throwing away data */
750 job_data_bytes = min(extra_bytes, c->in_job_read);
751 c->in_job_read -= job_data_bytes;
754 /* how many bytes are left to go into the future cmd? */
755 cmd_bytes = extra_bytes - job_data_bytes;
756 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
757 c->cmd_read = cmd_bytes;
758 c->cmd_len = 0; /* we no longer know the length of the new command */
761 static void
762 _skip(Conn *c, int n, char *line, int len)
764 /* Invert the meaning of in_job_read while throwing away data -- it
765 * counts the bytes that remain to be thrown away. */
766 c->in_job = 0;
767 c->in_job_read = n;
768 fill_extra_data(c);
770 if (c->in_job_read == 0) return reply(c, line, len, STATE_SENDWORD);
772 c->reply = line;
773 c->reply_len = len;
774 c->reply_sent = 0;
775 c->state = STATE_BITBUCKET;
776 return;
779 #define skip(c,n,m) (_skip(c,n,m,CONSTSTRLEN(m)))
781 static void
782 enqueue_incoming_job(Conn *c)
784 int r;
785 job j = c->in_job;
787 c->in_job = NULL; /* the connection no longer owns this job */
788 c->in_job_read = 0;
790 /* check if the trailer is present and correct */
791 if (memcmp(j->body + j->r.body_size - 2, "\r\n", 2)) {
792 job_free(j);
793 return reply_msg(c, MSG_EXPECTED_CRLF);
796 if (verbose >= 2) {
797 printf("<%d job %"PRIu64"\n", c->sock.fd, j->r.id);
800 if (drain_mode) {
801 job_free(j);
802 return reply_serr(c, MSG_DRAINING);
805 if (j->walresv) return reply_serr(c, MSG_INTERNAL_ERROR);
806 j->walresv = walresvput(&c->srv->wal, j);
807 if (!j->walresv) return reply_serr(c, MSG_OUT_OF_MEMORY);
809 /* we have a complete job, so let's stick it in the pqueue */
810 r = enqueue_job(c->srv, j, j->r.delay, 1);
811 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
813 op_ct[OP_PUT]++; /* stats */
814 global_stat.total_jobs_ct++;
815 j->tube->stat.total_jobs_ct++;
817 if (r == 1) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->r.id);
819 /* out of memory trying to grow the queue, so it gets buried */
820 bury_job(c->srv, j, 0);
821 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->r.id);
824 static uint
825 uptime()
827 return (nanoseconds() - started_at) / 1000000000;
830 static int
831 fmt_stats(char *buf, size_t size, void *x)
833 int whead = 0, wcur = 0;
834 Server *srv;
835 struct rusage ru = {{0, 0}, {0, 0}};
837 srv = x;
839 if (srv->wal.head) {
840 whead = srv->wal.head->seq;
843 if (srv->wal.cur) {
844 wcur = srv->wal.cur->seq;
847 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
848 return snprintf(buf, size, STATS_FMT,
849 global_stat.urgent_ct,
850 ready_ct,
851 global_stat.reserved_ct,
852 get_delayed_job_ct(),
853 global_stat.buried_ct,
854 op_ct[OP_PUT],
855 op_ct[OP_PEEKJOB],
856 op_ct[OP_PEEK_READY],
857 op_ct[OP_PEEK_DELAYED],
858 op_ct[OP_PEEK_BURIED],
859 op_ct[OP_RESERVE],
860 op_ct[OP_RESERVE_TIMEOUT],
861 op_ct[OP_DELETE],
862 op_ct[OP_RELEASE],
863 op_ct[OP_USE],
864 op_ct[OP_WATCH],
865 op_ct[OP_IGNORE],
866 op_ct[OP_BURY],
867 op_ct[OP_KICK],
868 op_ct[OP_TOUCH],
869 op_ct[OP_STATS],
870 op_ct[OP_JOBSTATS],
871 op_ct[OP_STATS_TUBE],
872 op_ct[OP_LIST_TUBES],
873 op_ct[OP_LIST_TUBE_USED],
874 op_ct[OP_LIST_TUBES_WATCHED],
875 op_ct[OP_PAUSE_TUBE],
876 timeout_ct,
877 global_stat.total_jobs_ct,
878 job_data_size_limit,
879 tubes.used,
880 count_cur_conns(),
881 count_cur_producers(),
882 count_cur_workers(),
883 global_stat.waiting_ct,
884 count_tot_conns(),
885 (long) getpid(),
886 version,
887 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
888 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
889 uptime(),
890 whead,
891 wcur,
892 srv->wal.nmig,
893 srv->wal.nrec,
894 srv->wal.filesize);
898 /* Read a priority value from the given buffer and place it in pri.
899 * Update end to point to the address after the last character consumed.
900 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
901 * conversion and return the status code but not update any values. This is an
902 * easy way to check for errors.
903 * If end is NULL, read_pri will also check that the entire input string was
904 * consumed and return an error code otherwise.
905 * Return 0 on success, or nonzero on failure.
906 * If a failure occurs, pri and end are not modified. */
907 static int
908 read_pri(uint *pri, const char *buf, char **end)
910 char *tend;
911 uint tpri;
913 errno = 0;
914 while (buf[0] == ' ') buf++;
915 if (!isdigit(buf[0])) return -1;
916 tpri = strtoul(buf, &tend, 10);
917 if (tend == buf) return -1;
918 if (errno && errno != ERANGE) return -1;
919 if (!end && tend[0] != '\0') return -1;
921 if (pri) *pri = tpri;
922 if (end) *end = tend;
923 return 0;
926 /* Read a delay value from the given buffer and place it in delay.
927 * The interface and behavior are analogous to read_pri(). */
928 static int
929 read_delay(int64 *delay, const char *buf, char **end)
931 int r;
932 uint delay_sec;
934 r = read_pri(&delay_sec, buf, end);
935 if (r) return r;
936 *delay = ((int64) delay_sec) * 1000000000;
937 return 0;
940 /* Read a timeout value from the given buffer and place it in ttr.
941 * The interface and behavior are the same as in read_delay(). */
942 static int
943 read_ttr(int64 *ttr, const char *buf, char **end)
945 return read_delay(ttr, buf, end);
948 /* Read a tube name from the given buffer moving the buffer to the name start */
949 static int
950 read_tube_name(char **tubename, char *buf, char **end)
952 size_t len;
954 while (buf[0] == ' ') buf++;
955 len = strspn(buf, NAME_CHARS);
956 if (len == 0) return -1;
957 if (tubename) *tubename = buf;
958 if (end) *end = buf + len;
959 return 0;
962 static void
963 wait_for_job(Conn *c, int timeout)
965 c->state = STATE_WAIT;
966 enqueue_waiting_conn(c);
968 /* Set the pending timeout to the requested timeout amount */
969 c->pending_timeout = timeout;
971 connwant(c, 'h'); // only care if they hang up
972 c->next = dirty;
973 dirty = c;
976 typedef int(*fmt_fn)(char *, size_t, void *);
978 static void
979 do_stats(Conn *c, fmt_fn fmt, void *data)
981 int r, stats_len;
983 /* first, measure how big a buffer we will need */
984 stats_len = fmt(NULL, 0, data) + 16;
986 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
987 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
989 /* Mark this job as a copy so it can be appropriately freed later on */
990 c->out_job->r.state = Copy;
992 /* now actually format the stats data */
993 r = fmt(c->out_job->body, stats_len, data);
994 /* and set the actual body size */
995 c->out_job->r.body_size = r;
996 if (r > stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
998 c->out_job_sent = 0;
999 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", r - 2);
1002 static void
1003 do_list_tubes(Conn *c, ms l)
1005 char *buf;
1006 tube t;
1007 size_t i, resp_z;
1009 /* first, measure how big a buffer we will need */
1010 resp_z = 6; /* initial "---\n" and final "\r\n" */
1011 for (i = 0; i < l->used; i++) {
1012 t = l->items[i];
1013 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
1016 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
1017 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
1019 /* Mark this job as a copy so it can be appropriately freed later on */
1020 c->out_job->r.state = Copy;
1022 /* now actually format the response */
1023 buf = c->out_job->body;
1024 buf += snprintf(buf, 5, "---\n");
1025 for (i = 0; i < l->used; i++) {
1026 t = l->items[i];
1027 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
1029 buf[0] = '\r';
1030 buf[1] = '\n';
1032 c->out_job_sent = 0;
1033 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", resp_z - 2);
1036 static int
1037 fmt_job_stats(char *buf, size_t size, job j)
1039 int64 t;
1040 int64 time_left;
1041 int file = 0;
1043 t = nanoseconds();
1044 if (j->r.state == Reserved || j->r.state == Delayed) {
1045 time_left = (j->r.deadline_at - t) / 1000000000;
1046 } else {
1047 time_left = 0;
1049 if (j->file) {
1050 file = j->file->seq;
1052 return snprintf(buf, size, STATS_JOB_FMT,
1053 j->r.id,
1054 j->tube->name,
1055 job_state(j),
1056 j->r.pri,
1057 (t - j->r.created_at) / 1000000000,
1058 j->r.delay / 1000000000,
1059 j->r.ttr / 1000000000,
1060 time_left,
1061 file,
1062 j->r.reserve_ct,
1063 j->r.timeout_ct,
1064 j->r.release_ct,
1065 j->r.bury_ct,
1066 j->r.kick_ct);
1069 static int
1070 fmt_stats_tube(char *buf, size_t size, tube t)
1072 uint64 time_left;
1074 if (t->pause > 0) {
1075 time_left = (t->deadline_at - nanoseconds()) / 1000000000;
1076 } else {
1077 time_left = 0;
1079 return snprintf(buf, size, STATS_TUBE_FMT,
1080 t->name,
1081 t->stat.urgent_ct,
1082 t->ready.len,
1083 t->stat.reserved_ct,
1084 t->delay.len,
1085 t->stat.buried_ct,
1086 t->stat.total_jobs_ct,
1087 t->using_ct,
1088 t->watching_ct,
1089 t->stat.waiting_ct,
1090 t->stat.total_delete_ct,
1091 t->stat.pause_ct,
1092 t->pause / 1000000000,
1093 time_left);
1096 static void
1097 maybe_enqueue_incoming_job(Conn *c)
1099 job j = c->in_job;
1101 /* do we have a complete job? */
1102 if (c->in_job_read == j->r.body_size) return enqueue_incoming_job(c);
1104 /* otherwise we have incomplete data, so just keep waiting */
1105 c->state = STATE_WANTDATA;
1108 /* j can be NULL */
1109 static job
1110 remove_this_reserved_job(Conn *c, job j)
1112 j = job_remove(j);
1113 if (j) {
1114 global_stat.reserved_ct--;
1115 j->tube->stat.reserved_ct--;
1116 j->reserver = NULL;
1118 c->soonest_job = NULL;
1119 return j;
1122 static job
1123 remove_reserved_job(Conn *c, job j)
1125 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, j));
1128 static int
1129 name_is_ok(const char *name, size_t max)
1131 size_t len = strlen(name);
1132 return len > 0 && len <= max &&
1133 strspn(name, NAME_CHARS) == len && name[0] != '-';
1136 void
1137 prot_remove_tube(tube t)
1139 ms_remove(&tubes, t);
1142 static void
1143 dispatch_cmd(Conn *c)
1145 int r, i, timeout = -1;
1146 int z;
1147 uint count;
1148 job j = 0;
1149 byte type;
1150 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
1151 uint pri, body_size;
1152 int64 delay, ttr;
1153 uint64 id;
1154 tube t = NULL;
1156 /* NUL-terminate this string so we can use strtol and friends */
1157 c->cmd[c->cmd_len - 2] = '\0';
1159 /* check for possible maliciousness */
1160 if (strlen(c->cmd) != c->cmd_len - 2) {
1161 return reply_msg(c, MSG_BAD_FORMAT);
1164 type = which_cmd(c);
1165 if (verbose >= 2) {
1166 printf("<%d command %s\n", c->sock.fd, op_names[type]);
1169 switch (type) {
1170 case OP_PUT:
1171 r = read_pri(&pri, c->cmd + 4, &delay_buf);
1172 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1174 r = read_delay(&delay, delay_buf, &ttr_buf);
1175 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1177 r = read_ttr(&ttr, ttr_buf, &size_buf);
1178 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1180 errno = 0;
1181 body_size = strtoul(size_buf, &end_buf, 10);
1182 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1184 if (body_size > job_data_size_limit) {
1185 /* throw away the job body and respond with JOB_TOO_BIG */
1186 return skip(c, body_size + 2, MSG_JOB_TOO_BIG);
1189 /* don't allow trailing garbage */
1190 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
1192 connsetproducer(c);
1194 if (ttr < 1000000000) {
1195 ttr = 1000000000;
1198 c->in_job = make_job(pri, delay, ttr, body_size + 2, c->use);
1200 /* OOM? */
1201 if (!c->in_job) {
1202 /* throw away the job body and respond with OUT_OF_MEMORY */
1203 twarnx("server error: " MSG_OUT_OF_MEMORY);
1204 return skip(c, body_size + 2, MSG_OUT_OF_MEMORY);
1207 fill_extra_data(c);
1209 /* it's possible we already have a complete job */
1210 maybe_enqueue_incoming_job(c);
1212 break;
1213 case OP_PEEK_READY:
1214 /* don't allow trailing garbage */
1215 if (c->cmd_len != CMD_PEEK_READY_LEN + 2) {
1216 return reply_msg(c, MSG_BAD_FORMAT);
1218 op_ct[type]++;
1220 if (c->use->ready.len) {
1221 j = job_copy(c->use->ready.data[0]);
1224 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1226 reply_job(c, j, MSG_FOUND);
1227 break;
1228 case OP_PEEK_DELAYED:
1229 /* don't allow trailing garbage */
1230 if (c->cmd_len != CMD_PEEK_DELAYED_LEN + 2) {
1231 return reply_msg(c, MSG_BAD_FORMAT);
1233 op_ct[type]++;
1235 if (c->use->delay.len) {
1236 j = job_copy(c->use->delay.data[0]);
1239 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1241 reply_job(c, j, MSG_FOUND);
1242 break;
1243 case OP_PEEK_BURIED:
1244 /* don't allow trailing garbage */
1245 if (c->cmd_len != CMD_PEEK_BURIED_LEN + 2) {
1246 return reply_msg(c, MSG_BAD_FORMAT);
1248 op_ct[type]++;
1250 j = job_copy(buried_job_p(c->use)? j = c->use->buried.next : NULL);
1252 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1254 reply_job(c, j, MSG_FOUND);
1255 break;
1256 case OP_PEEKJOB:
1257 errno = 0;
1258 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1259 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1260 op_ct[type]++;
1262 /* So, peek is annoying, because some other connection might free the
1263 * job while we are still trying to write it out. So we copy it and
1264 * then free the copy when it's done sending. */
1265 j = job_copy(peek_job(id));
1267 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1269 reply_job(c, j, MSG_FOUND);
1270 break;
1271 case OP_RESERVE_TIMEOUT:
1272 errno = 0;
1273 timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10);
1274 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1275 case OP_RESERVE: /* FALLTHROUGH */
1276 /* don't allow trailing garbage */
1277 if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) {
1278 return reply_msg(c, MSG_BAD_FORMAT);
1281 op_ct[type]++;
1282 connsetworker(c);
1284 if (conndeadlinesoon(c) && !conn_ready(c)) {
1285 return reply_msg(c, MSG_DEADLINE_SOON);
1288 /* try to get a new job for this guy */
1289 wait_for_job(c, timeout);
1290 process_queue();
1291 break;
1292 case OP_DELETE:
1293 errno = 0;
1294 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1295 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1296 op_ct[type]++;
1298 j = job_find(id);
1299 j = remove_reserved_job(c, j) ? :
1300 remove_ready_job(j) ? :
1301 remove_buried_job(j) ? :
1302 remove_delayed_job(j);
1304 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1306 j->tube->stat.total_delete_ct++;
1308 j->r.state = Invalid;
1309 r = walwrite(&c->srv->wal, j);
1310 walmaint(&c->srv->wal);
1311 job_free(j);
1313 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1315 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1316 break;
1317 case OP_RELEASE:
1318 errno = 0;
1319 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1320 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1322 r = read_pri(&pri, pri_buf, &delay_buf);
1323 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1325 r = read_delay(&delay, delay_buf, NULL);
1326 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1327 op_ct[type]++;
1329 j = remove_reserved_job(c, job_find(id));
1331 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1333 /* We want to update the delay deadline on disk, so reserve space for
1334 * that. */
1335 if (delay) {
1336 z = walresvupdate(&c->srv->wal, j);
1337 if (!z) return reply_serr(c, MSG_OUT_OF_MEMORY);
1338 j->walresv += z;
1341 j->r.pri = pri;
1342 j->r.delay = delay;
1343 j->r.release_ct++;
1345 r = enqueue_job(c->srv, j, delay, !!delay);
1346 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
1347 if (r == 1) {
1348 return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1351 /* out of memory trying to grow the queue, so it gets buried */
1352 bury_job(c->srv, j, 0);
1353 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1354 break;
1355 case OP_BURY:
1356 errno = 0;
1357 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1358 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1360 r = read_pri(&pri, pri_buf, NULL);
1361 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1362 op_ct[type]++;
1364 j = remove_reserved_job(c, job_find(id));
1366 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1368 j->r.pri = pri;
1369 r = bury_job(c->srv, j, 1);
1370 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1371 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1372 break;
1373 case OP_KICK:
1374 errno = 0;
1375 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
1376 if (end_buf == c->cmd + CMD_KICK_LEN) {
1377 return reply_msg(c, MSG_BAD_FORMAT);
1379 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1381 op_ct[type]++;
1383 i = kick_jobs(c->srv, c->use, count);
1385 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1386 case OP_TOUCH:
1387 errno = 0;
1388 id = strtoull(c->cmd + CMD_TOUCH_LEN, &end_buf, 10);
1389 if (errno) return twarn("strtoull"), reply_msg(c, MSG_BAD_FORMAT);
1391 op_ct[type]++;
1393 j = touch_job(c, job_find(id));
1395 if (j) {
1396 reply(c, MSG_TOUCHED, MSG_TOUCHED_LEN, STATE_SENDWORD);
1397 } else {
1398 return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1400 break;
1401 case OP_STATS:
1402 /* don't allow trailing garbage */
1403 if (c->cmd_len != CMD_STATS_LEN + 2) {
1404 return reply_msg(c, MSG_BAD_FORMAT);
1407 op_ct[type]++;
1409 do_stats(c, fmt_stats, c->srv);
1410 break;
1411 case OP_JOBSTATS:
1412 errno = 0;
1413 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1414 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1416 op_ct[type]++;
1418 j = peek_job(id);
1419 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1421 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1422 do_stats(c, (fmt_fn) fmt_job_stats, j);
1423 break;
1424 case OP_STATS_TUBE:
1425 name = c->cmd + CMD_STATS_TUBE_LEN;
1426 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1428 op_ct[type]++;
1430 t = tube_find(name);
1431 if (!t) return reply_msg(c, MSG_NOTFOUND);
1433 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1434 t = NULL;
1435 break;
1436 case OP_LIST_TUBES:
1437 /* don't allow trailing garbage */
1438 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1439 return reply_msg(c, MSG_BAD_FORMAT);
1442 op_ct[type]++;
1443 do_list_tubes(c, &tubes);
1444 break;
1445 case OP_LIST_TUBE_USED:
1446 /* don't allow trailing garbage */
1447 if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
1448 return reply_msg(c, MSG_BAD_FORMAT);
1451 op_ct[type]++;
1452 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1453 break;
1454 case OP_LIST_TUBES_WATCHED:
1455 /* don't allow trailing garbage */
1456 if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
1457 return reply_msg(c, MSG_BAD_FORMAT);
1460 op_ct[type]++;
1461 do_list_tubes(c, &c->watch);
1462 break;
1463 case OP_USE:
1464 name = c->cmd + CMD_USE_LEN;
1465 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1466 op_ct[type]++;
1468 TUBE_ASSIGN(t, tube_find_or_make(name));
1469 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1471 c->use->using_ct--;
1472 TUBE_ASSIGN(c->use, t);
1473 TUBE_ASSIGN(t, NULL);
1474 c->use->using_ct++;
1476 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1477 break;
1478 case OP_WATCH:
1479 name = c->cmd + CMD_WATCH_LEN;
1480 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1481 op_ct[type]++;
1483 TUBE_ASSIGN(t, tube_find_or_make(name));
1484 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1486 r = 1;
1487 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1488 TUBE_ASSIGN(t, NULL);
1489 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1491 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1492 break;
1493 case OP_IGNORE:
1494 name = c->cmd + CMD_IGNORE_LEN;
1495 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1496 op_ct[type]++;
1498 t = NULL;
1499 for (i = 0; i < c->watch.used; i++) {
1500 t = c->watch.items[i];
1501 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1502 t = NULL;
1505 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1507 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1508 t = NULL;
1510 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1511 break;
1512 case OP_QUIT:
1513 connclose(c);
1514 break;
1515 case OP_PAUSE_TUBE:
1516 op_ct[type]++;
1518 r = read_tube_name(&name, c->cmd + CMD_PAUSE_TUBE_LEN, &delay_buf);
1519 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1521 r = read_delay(&delay, delay_buf, NULL);
1522 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1524 *delay_buf = '\0';
1525 t = tube_find(name);
1526 if (!t) return reply_msg(c, MSG_NOTFOUND);
1528 t->deadline_at = nanoseconds() + delay;
1529 t->pause = delay;
1530 t->stat.pause_ct++;
1532 reply_line(c, STATE_SENDWORD, "PAUSED\r\n");
1533 break;
1534 default:
1535 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1539 /* There are three reasons this function may be called. We need to check for
1540 * all of them.
1542 * 1. A reserved job has run out of time.
1543 * 2. A waiting client's reserved job has entered the safety margin.
1544 * 3. A waiting client's requested timeout has occurred.
1546 * If any of these happen, we must do the appropriate thing. */
1547 static void
1548 conn_timeout(Conn *c)
1550 int r, should_timeout = 0;
1551 job j;
1553 /* Check if the client was trying to reserve a job. */
1554 if (conn_waiting(c) && conndeadlinesoon(c)) should_timeout = 1;
1556 /* Check if any reserved jobs have run out of time. We should do this
1557 * whether or not the client is waiting for a new reservation. */
1558 while ((j = connsoonestjob(c))) {
1559 if (j->r.deadline_at >= nanoseconds()) break;
1561 /* This job is in the middle of being written out. If we return it to
1562 * the ready queue, someone might free it before we finish writing it
1563 * out to the socket. So we'll copy it here and free the copy when it's
1564 * done sending. */
1565 if (j == c->out_job) {
1566 c->out_job = job_copy(c->out_job);
1569 timeout_ct++; /* stats */
1570 j->r.timeout_ct++;
1571 r = enqueue_job(c->srv, remove_this_reserved_job(c, j), 0, 0);
1572 if (r < 1) bury_job(c->srv, j, 0); /* out of memory, so bury it */
1573 connsched(c);
1576 if (should_timeout) {
1577 return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON);
1578 } else if (conn_waiting(c) && c->pending_timeout >= 0) {
1579 c->pending_timeout = -1;
1580 return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);
1584 void
1585 enter_drain_mode(int sig)
1587 drain_mode = 1;
1590 static void
1591 do_cmd(Conn *c)
1593 dispatch_cmd(c);
1594 fill_extra_data(c);
1597 static void
1598 reset_conn(Conn *c)
1600 connwant(c, 'r');
1601 c->next = dirty;
1602 dirty = c;
1604 /* was this a peek or stats command? */
1605 if (c->out_job && c->out_job->r.state == Copy) job_free(c->out_job);
1606 c->out_job = NULL;
1608 c->reply_sent = 0; /* now that we're done, reset this */
1609 c->state = STATE_WANTCOMMAND;
1612 static void
1613 conn_data(Conn *c)
1615 int r, to_read;
1616 job j;
1617 struct iovec iov[2];
1619 switch (c->state) {
1620 case STATE_WANTCOMMAND:
1621 r = read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1622 if (r == -1) return check_err(c, "read()");
1623 if (r == 0) return connclose(c); /* the client hung up */
1625 c->cmd_read += r; /* we got some bytes */
1627 c->cmd_len = cmd_len(c); /* find the EOL */
1629 /* yay, complete command line */
1630 if (c->cmd_len) return do_cmd(c);
1632 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1634 /* command line too long? */
1635 if (c->cmd_read == LINE_BUF_SIZE) {
1636 c->cmd_read = 0; /* discard the input so far */
1637 return reply_msg(c, MSG_BAD_FORMAT);
1640 /* otherwise we have an incomplete line, so just keep waiting */
1641 break;
1642 case STATE_BITBUCKET:
1643 /* Invert the meaning of in_job_read while throwing away data -- it
1644 * counts the bytes that remain to be thrown away. */
1645 to_read = min(c->in_job_read, BUCKET_BUF_SIZE);
1646 r = read(c->sock.fd, bucket, to_read);
1647 if (r == -1) return check_err(c, "read()");
1648 if (r == 0) return connclose(c); /* the client hung up */
1650 c->in_job_read -= r; /* we got some bytes */
1652 /* (c->in_job_read < 0) can't happen */
1654 if (c->in_job_read == 0) {
1655 return reply(c, c->reply, c->reply_len, STATE_SENDWORD);
1657 break;
1658 case STATE_WANTDATA:
1659 j = c->in_job;
1661 r = read(c->sock.fd, j->body + c->in_job_read, j->r.body_size -c->in_job_read);
1662 if (r == -1) return check_err(c, "read()");
1663 if (r == 0) return connclose(c); /* the client hung up */
1665 c->in_job_read += r; /* we got some bytes */
1667 /* (j->in_job_read > j->r.body_size) can't happen */
1669 maybe_enqueue_incoming_job(c);
1670 break;
1671 case STATE_SENDWORD:
1672 r= write(c->sock.fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1673 if (r == -1) return check_err(c, "write()");
1674 if (r == 0) return connclose(c); /* the client hung up */
1676 c->reply_sent += r; /* we got some bytes */
1678 /* (c->reply_sent > c->reply_len) can't happen */
1680 if (c->reply_sent == c->reply_len) return reset_conn(c);
1682 /* otherwise we sent an incomplete reply, so just keep waiting */
1683 break;
1684 case STATE_SENDJOB:
1685 j = c->out_job;
1687 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1688 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1689 iov[1].iov_base = j->body + c->out_job_sent;
1690 iov[1].iov_len = j->r.body_size - c->out_job_sent;
1692 r = writev(c->sock.fd, iov, 2);
1693 if (r == -1) return check_err(c, "writev()");
1694 if (r == 0) return connclose(c); /* the client hung up */
1696 /* update the sent values */
1697 c->reply_sent += r;
1698 if (c->reply_sent >= c->reply_len) {
1699 c->out_job_sent += c->reply_sent - c->reply_len;
1700 c->reply_sent = c->reply_len;
1703 /* (c->out_job_sent > j->r.body_size) can't happen */
1705 /* are we done? */
1706 if (c->out_job_sent == j->r.body_size) {
1707 if (verbose >= 2) {
1708 printf(">%d job %"PRIu64"\n", c->sock.fd, j->r.id);
1710 return reset_conn(c);
1713 /* otherwise we sent incomplete data, so just keep waiting */
1714 break;
1715 case STATE_WAIT:
1716 // nothing
1717 break;
/* Nonzero when c has a nonzero socket fd and is in STATE_WANTCOMMAND. */
#define want_command(c) \
    (((c)->sock.fd != 0) && ((c)->state == STATE_WANTCOMMAND))

/* Nonzero when want_command(c) holds and c->cmd_read is nonzero. */
#define cmd_data_ready(c) \
    (want_command(c) && ((c)->cmd_read != 0))
1724 static void
1725 update_conns()
1727 int r;
1728 Conn *c;
1730 while (dirty) {
1731 c = dirty;
1732 dirty = dirty->next;
1733 c->next = NULL;
1734 r = sockwant(&c->sock, c->rw);
1735 if (r == -1) {
1736 twarn("sockwant");
1737 connclose(c);
1742 static void
1743 h_conn(const int fd, const short which, Conn *c)
1745 if (fd != c->sock.fd) {
1746 twarnx("Argh! event fd doesn't match conn fd.");
1747 close(fd);
1748 connclose(c);
1749 update_conns();
1750 return;
1753 if (which == 'h') {
1754 connclose(c);
1755 return;
1758 conn_data(c);
1759 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1760 update_conns();
1763 static void
1764 prothandle(Conn *c, int ev)
1766 h_conn(c->sock.fd, ev, c);
1769 void
1770 prottick(Server *s)
1772 int r;
1773 job j;
1774 int64 now;
1775 int i;
1776 tube t;
1778 now = nanoseconds();
1779 while ((j = delay_q_peek())) {
1780 if (j->r.deadline_at > now) break;
1781 j = delay_q_take();
1782 r = enqueue_job(s, j, 0, 0);
1783 if (r < 1) bury_job(s, j, 0); /* out of memory, so bury it */
1786 for (i = 0; i < tubes.used; i++) {
1787 t = tubes.items[i];
1789 if (t->pause && t->deadline_at <= now) {
1790 t->pause = 0;
1791 process_queue();
1795 while (s->conns.len) {
1796 Conn *c = s->conns.data[0];
1797 if (c->tickat > now) {
1798 break;
1801 heapremove(&s->conns, 0);
1802 conn_timeout(c);
1805 update_conns();
1808 void
1809 h_accept(const int fd, const short which, Server *s)
1811 Conn *c;
1812 int cfd, flags, r;
1813 socklen_t addrlen;
1814 struct sockaddr_in6 addr;
1816 addrlen = sizeof addr;
1817 cfd = accept(fd, (struct sockaddr *)&addr, &addrlen);
1818 if (cfd == -1) {
1819 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1820 update_conns();
1821 return;
1823 if (verbose) {
1824 printf("accept %d\n", cfd);
1827 flags = fcntl(cfd, F_GETFL, 0);
1828 if (flags < 0) {
1829 twarn("getting flags");
1830 close(cfd);
1831 if (verbose) {
1832 printf("close %d\n", cfd);
1834 update_conns();
1835 return;
1838 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1839 if (r < 0) {
1840 twarn("setting O_NONBLOCK");
1841 close(cfd);
1842 if (verbose) {
1843 printf("close %d\n", cfd);
1845 update_conns();
1846 return;
1849 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1850 if (!c) {
1851 twarnx("make_conn() failed");
1852 close(cfd);
1853 if (verbose) {
1854 printf("close %d\n", cfd);
1856 update_conns();
1857 return;
1859 c->srv = s;
1860 c->sock.x = c;
1861 c->sock.f = (Handle)prothandle;
1862 c->sock.fd = cfd;
1864 r = sockwant(&c->sock, 'r');
1865 if (r == -1) {
1866 twarn("sockwant");
1867 close(cfd);
1868 if (verbose) {
1869 printf("close %d\n", cfd);
1871 update_conns();
1872 return;
1874 update_conns();
1877 void
1878 prot_init()
1880 started_at = nanoseconds();
1881 memset(op_ct, 0, sizeof(op_ct));
1883 ms_init(&tubes, NULL, NULL);
1885 TUBE_ASSIGN(default_tube, tube_find_or_make("default"));
1886 if (!default_tube) twarnx("Out of memory during startup!");
1889 void
1890 prot_replay(Server *s, job list)
1892 job j, nj;
1893 int64 t, delay;
1894 int r;
1896 for (j = list->next ; j != list ; j = nj) {
1897 nj = j->next;
1898 job_remove(j);
1899 walresvupdate(&s->wal, j); /* reserve space for a delete */
1900 delay = 0;
1901 switch (j->r.state) {
1902 case Buried:
1903 bury_job(s, j, 0);
1904 break;
1905 case Delayed:
1906 t = nanoseconds();
1907 if (t < j->r.deadline_at) {
1908 delay = j->r.deadline_at - t;
1910 /* fall through */
1911 default:
1912 r = enqueue_job(s, j, delay, 0);
1913 if (r < 1) twarnx("error recovering job %"PRIu64, j->r.id);