Travis: stifle progress on git clone
[beanstalkd.git] / prot.c
blob4cf92be168f180e1fcca089e62db0e3ea6d9d258
1 #include <stdint.h>
2 #include <stdlib.h>
3 #include <stdio.h>
4 #include <unistd.h>
5 #include <fcntl.h>
6 #include <string.h>
7 #include <errno.h>
8 #include <sys/resource.h>
9 #include <sys/uio.h>
10 #include <sys/types.h>
11 #include <sys/utsname.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <inttypes.h>
15 #include <stdarg.h>
16 #include "dat.h"
/* job body cannot be greater than this many bytes long */
/* Not static, so it can be referenced (and presumably adjusted, e.g. from a
 * command-line option -- NOTE(review): confirm) outside this file. */
size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;

/* Characters permitted in a tube name; used with strspn() by
 * read_tube_name() and name_is_ok(). */
#define NAME_CHARS \
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
    "abcdefghijklmnopqrstuvwxyz" \
    "0123456789-+/;.$_()"
/* Command words as they appear at the start of a request line. which_cmd()
 * matches each one as a prefix (strncmp over CONSTSTRLEN(word) bytes), so a
 * trailing space marks a command that takes arguments. */
#define CMD_PUT "put "
#define CMD_PEEKJOB "peek "
#define CMD_PEEK_READY "peek-ready"
#define CMD_PEEK_DELAYED "peek-delayed"
#define CMD_PEEK_BURIED "peek-buried"
#define CMD_RESERVE "reserve"
#define CMD_RESERVE_TIMEOUT "reserve-with-timeout "
#define CMD_DELETE "delete "
#define CMD_RELEASE "release "
#define CMD_BURY "bury "
#define CMD_KICK "kick "
#define CMD_JOBKICK "kick-job "
#define CMD_TOUCH "touch "
#define CMD_STATS "stats"
#define CMD_JOBSTATS "stats-job "
#define CMD_USE "use "
#define CMD_WATCH "watch "
#define CMD_IGNORE "ignore "
#define CMD_LIST_TUBES "list-tubes"
#define CMD_LIST_TUBE_USED "list-tube-used"
#define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
#define CMD_STATS_TUBE "stats-tube "
#define CMD_QUIT "quit"
#define CMD_PAUSE_TUBE "pause-tube"

/* Length of a string literal without its NUL terminator. It is built on
 * sizeof, so it is only correct for literals/arrays, never for a pointer. */
#define CONSTSTRLEN(m) (sizeof(m) - 1)

/* Command word lengths: used to find where a command's arguments begin and
 * to reject trailing garbage after argument-less commands. */
#define CMD_PEEK_READY_LEN CONSTSTRLEN(CMD_PEEK_READY)
#define CMD_PEEK_DELAYED_LEN CONSTSTRLEN(CMD_PEEK_DELAYED)
#define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED)
#define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
#define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
#define CMD_RESERVE_TIMEOUT_LEN CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
#define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
#define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
#define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
#define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
#define CMD_JOBKICK_LEN CONSTSTRLEN(CMD_JOBKICK)
#define CMD_TOUCH_LEN CONSTSTRLEN(CMD_TOUCH)
#define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
#define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
#define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
#define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
#define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
#define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
#define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
#define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
#define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
#define CMD_PAUSE_TUBE_LEN CONSTSTRLEN(CMD_PAUSE_TUBE)
/* Reply words. MSG_FOUND and MSG_RESERVED carry no "\r\n": reply_job()
 * appends the job id, body size and line end. The *_FMT templates are
 * completed by reply_line(); the rest are complete reply lines. */
#define MSG_FOUND "FOUND"
#define MSG_NOTFOUND "NOT_FOUND\r\n"
#define MSG_RESERVED "RESERVED"
#define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
#define MSG_TIMED_OUT "TIMED_OUT\r\n"
#define MSG_DELETED "DELETED\r\n"
#define MSG_RELEASED "RELEASED\r\n"
#define MSG_BURIED "BURIED\r\n"
#define MSG_KICKED "KICKED\r\n"
#define MSG_TOUCHED "TOUCHED\r\n"
#define MSG_BURIED_FMT "BURIED %"PRIu64"\r\n"
#define MSG_INSERTED_FMT "INSERTED %"PRIu64"\r\n"
#define MSG_NOT_IGNORED "NOT_IGNORED\r\n"

/* Lengths of the constant replies that are passed to reply() directly. */
#define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
#define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
#define MSG_TOUCHED_LEN CONSTSTRLEN(MSG_TOUCHED)
#define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
#define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
#define MSG_KICKED_LEN CONSTSTRLEN(MSG_KICKED)
#define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)

/* Error replies. In the code shown, client errors such as BAD_FORMAT and
 * EXPECTED_CRLF go out via reply_msg(); server-side failures such as
 * OUT_OF_MEMORY, INTERNAL_ERROR and DRAINING go via reply_serr(), which
 * also logs them. */
#define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
#define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
#define MSG_DRAINING "DRAINING\r\n"
#define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
#define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
#define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
#define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
/* Connection states, stored in Conn.state. */
#define STATE_WANTCOMMAND 0
#define STATE_WANTDATA 1
#define STATE_SENDJOB 2
#define STATE_SENDWORD 3
#define STATE_WAIT 4
#define STATE_BITBUCKET 5
#define STATE_CLOSE 6

/* Operation codes returned by which_cmd(). They index op_ct[] (per-command
 * counters) and op_names[], so TOTAL_OPS must stay one past the largest
 * code. */
#define OP_UNKNOWN 0
#define OP_PUT 1
#define OP_PEEKJOB 2
#define OP_RESERVE 3
#define OP_DELETE 4
#define OP_RELEASE 5
#define OP_BURY 6
#define OP_KICK 7
#define OP_STATS 8
#define OP_JOBSTATS 9
#define OP_PEEK_BURIED 10
#define OP_USE 11
#define OP_WATCH 12
#define OP_IGNORE 13
#define OP_LIST_TUBES 14
#define OP_LIST_TUBE_USED 15
#define OP_LIST_TUBES_WATCHED 16
#define OP_STATS_TUBE 17
#define OP_PEEK_READY 18
#define OP_PEEK_DELAYED 19
#define OP_RESERVE_TIMEOUT 20
#define OP_TOUCH 21
#define OP_QUIT 22
#define OP_PAUSE_TUBE 23
#define OP_JOBKICK 24
#define TOTAL_OPS 25
/* Body of the "stats" reply, filled in by fmt_stats(). The snprintf
 * argument list there must match these conversions one-for-one and in
 * order. Like the other stats formats it ends in "\r\n", which do_stats()
 * leaves out of the advertised byte count. */
#define STATS_FMT "---\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-delayed: %u\n" \
    "current-jobs-buried: %u\n" \
    "cmd-put: %" PRIu64 "\n" \
    "cmd-peek: %" PRIu64 "\n" \
    "cmd-peek-ready: %" PRIu64 "\n" \
    "cmd-peek-delayed: %" PRIu64 "\n" \
    "cmd-peek-buried: %" PRIu64 "\n" \
    "cmd-reserve: %" PRIu64 "\n" \
    "cmd-reserve-with-timeout: %" PRIu64 "\n" \
    "cmd-delete: %" PRIu64 "\n" \
    "cmd-release: %" PRIu64 "\n" \
    "cmd-use: %" PRIu64 "\n" \
    "cmd-watch: %" PRIu64 "\n" \
    "cmd-ignore: %" PRIu64 "\n" \
    "cmd-bury: %" PRIu64 "\n" \
    "cmd-kick: %" PRIu64 "\n" \
    "cmd-touch: %" PRIu64 "\n" \
    "cmd-stats: %" PRIu64 "\n" \
    "cmd-stats-job: %" PRIu64 "\n" \
    "cmd-stats-tube: %" PRIu64 "\n" \
    "cmd-list-tubes: %" PRIu64 "\n" \
    "cmd-list-tube-used: %" PRIu64 "\n" \
    "cmd-list-tubes-watched: %" PRIu64 "\n" \
    "cmd-pause-tube: %" PRIu64 "\n" \
    "job-timeouts: %" PRIu64 "\n" \
    "total-jobs: %" PRIu64 "\n" \
    "max-job-size: %zu\n" \
    "current-tubes: %zu\n" \
    "current-connections: %u\n" \
    "current-producers: %u\n" \
    "current-workers: %u\n" \
    "current-waiting: %u\n" \
    "total-connections: %u\n" \
    "pid: %ld\n" \
    "version: \"%s\"\n" \
    "rusage-utime: %d.%06d\n" \
    "rusage-stime: %d.%06d\n" \
    "uptime: %u\n" \
    "binlog-oldest-index: %d\n" \
    "binlog-current-index: %d\n" \
    "binlog-records-migrated: %" PRId64 "\n" \
    "binlog-records-written: %" PRId64 "\n" \
    "binlog-max-size: %d\n" \
    "id: %s\n" \
    "hostname: %s\n" \
    "\r\n"

/* Body of the "stats-tube" reply, filled in by fmt_stats_tube(). */
#define STATS_TUBE_FMT "---\n" \
    "name: %s\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-delayed: %u\n" \
    "current-jobs-buried: %u\n" \
    "total-jobs: %" PRIu64 "\n" \
    "current-using: %u\n" \
    "current-watching: %u\n" \
    "current-waiting: %u\n" \
    "cmd-delete: %" PRIu64 "\n" \
    "cmd-pause-tube: %u\n" \
    "pause: %" PRIu64 "\n" \
    "pause-time-left: %" PRId64 "\n" \
    "\r\n"

/* Body of the "stats-job" reply, filled in by fmt_job_stats(). */
#define STATS_JOB_FMT "---\n" \
    "id: %" PRIu64 "\n" \
    "tube: %s\n" \
    "state: %s\n" \
    "pri: %u\n" \
    "age: %" PRId64 "\n" \
    "delay: %" PRId64 "\n" \
    "ttr: %" PRId64 "\n" \
    "time-left: %" PRId64 "\n" \
    "file: %d\n" \
    "reserves: %u\n" \
    "timeouts: %u\n" \
    "releases: %u\n" \
    "buries: %u\n" \
    "kicks: %u\n" \
    "\r\n"
226 /* this number is pretty arbitrary */
227 #define BUCKET_BUF_SIZE 1024
229 static char bucket[BUCKET_BUF_SIZE];
231 static uint ready_ct = 0;
232 static struct stats global_stat = {0, 0, 0, 0, 0};
234 static tube default_tube;
236 static int drain_mode = 0;
237 static int64 started_at;
239 enum {
240 NumIdBytes = 8
243 static char id[NumIdBytes * 2 + 1]; // hex-encoded len of NumIdBytes
245 static struct utsname node_info;
246 static uint64 op_ct[TOTAL_OPS], timeout_ct = 0;
248 static Conn *dirty;
250 static const char * op_names[] = {
251 "<unknown>",
252 CMD_PUT,
253 CMD_PEEKJOB,
254 CMD_RESERVE,
255 CMD_DELETE,
256 CMD_RELEASE,
257 CMD_BURY,
258 CMD_KICK,
259 CMD_STATS,
260 CMD_JOBSTATS,
261 CMD_PEEK_BURIED,
262 CMD_USE,
263 CMD_WATCH,
264 CMD_IGNORE,
265 CMD_LIST_TUBES,
266 CMD_LIST_TUBE_USED,
267 CMD_LIST_TUBES_WATCHED,
268 CMD_STATS_TUBE,
269 CMD_PEEK_READY,
270 CMD_PEEK_DELAYED,
271 CMD_RESERVE_TIMEOUT,
272 CMD_TOUCH,
273 CMD_QUIT,
274 CMD_PAUSE_TUBE,
275 CMD_JOBKICK,
278 static job remove_buried_job(job j);
280 static int
281 buried_job_p(tube t)
283 return job_list_any_p(&t->buried);
/* Queue a reply on connection c and move it to `state`.
 * line/len are the reply bytes. The pointer is stored, not copied, so it
 * must stay valid until sent; the visible callers pass string literals or
 * c->reply_buf. c is switched to write interest via connwant() and pushed
 * on the dirty list. A NULL c is ignored. */
static void
reply(Conn *c, char *line, int len, int state)
{
    if (!c) return;

    connwant(c, 'w');
    c->next = dirty;
    dirty = c;
    c->reply = line;
    c->reply_len = len;
    c->reply_sent = 0;
    c->state = state;
    if (verbose >= 2) {
        /* len-2 drops the trailing "\r\n" that replies end with */
        printf(">%d reply %.*s\n", c->sock.fd, len-2, line);
    }
}
304 static void
305 protrmdirty(Conn *c)
307 Conn *x, *newdirty = NULL;
309 while (dirty) {
310 x = dirty;
311 dirty = dirty->next;
312 x->next = NULL;
314 if (x != c) {
315 x->next = newdirty;
316 newdirty = x;
319 dirty = newdirty;
/* Reply with constant message m and go to STATE_SENDWORD. The length comes
 * from CONSTSTRLEN, so m must be a string literal or array, not a pointer. */
#define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)

/* Log a server-side error via twarnx(), then send it to the client. */
#define reply_serr(c,e) (twarnx("server error: %s",(e)),\
                         reply_msg((c),(e)))
328 static void
329 reply_line(Conn*, int, const char*, ...)
330 __attribute__((format(printf, 3, 4)));
332 static void
333 reply_line(Conn *c, int state, const char *fmt, ...)
335 int r;
336 va_list ap;
338 va_start(ap, fmt);
339 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
340 va_end(ap);
342 /* Make sure the buffer was big enough. If not, we have a bug. */
343 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
345 return reply(c, c->reply_buf, r, state);
348 static void
349 reply_job(Conn *c, job j, const char *word)
351 /* tell this connection which job to send */
352 c->out_job = j;
353 c->out_job_sent = 0;
355 return reply_line(c, STATE_SENDJOB, "%s %"PRIu64" %u\r\n",
356 word, j->r.id, j->r.body_size - 2);
359 Conn *
360 remove_waiting_conn(Conn *c)
362 tube t;
363 size_t i;
365 if (!conn_waiting(c)) return NULL;
367 c->type &= ~CONN_TYPE_WAITING;
368 global_stat.waiting_ct--;
369 for (i = 0; i < c->watch.used; i++) {
370 t = c->watch.items[i];
371 t->stat.waiting_ct--;
372 ms_remove(&t->waiting, c);
374 return c;
377 static void
378 reserve_job(Conn *c, job j)
380 j->r.deadline_at = nanoseconds() + j->r.ttr;
381 global_stat.reserved_ct++; /* stats */
382 j->tube->stat.reserved_ct++;
383 j->r.reserve_ct++;
384 j->r.state = Reserved;
385 job_insert(&c->reserved_jobs, j);
386 j->reserver = c;
387 c->pending_timeout = -1;
388 if (c->soonest_job && j->r.deadline_at < c->soonest_job->r.deadline_at) {
389 c->soonest_job = j;
391 return reply_job(c, j, MSG_RESERVED);
394 static job
395 next_eligible_job(int64 now)
397 tube t;
398 size_t i;
399 job j = NULL, candidate;
401 for (i = 0; i < tubes.used; i++) {
402 t = tubes.items[i];
403 if (t->pause) {
404 if (t->deadline_at > now) continue;
405 t->pause = 0;
407 if (t->waiting.used && t->ready.len) {
408 candidate = t->ready.data[0];
409 if (!j || job_pri_less(candidate, j)) {
410 j = candidate;
415 return j;
418 static void
419 process_queue()
421 job j;
422 int64 now = nanoseconds();
424 while ((j = next_eligible_job(now))) {
425 heapremove(&j->tube->ready, j->heap_index);
426 ready_ct--;
427 if (j->r.pri < URGENT_THRESHOLD) {
428 global_stat.urgent_ct--;
429 j->tube->stat.urgent_ct--;
431 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
435 static job
436 delay_q_peek()
438 int i;
439 tube t;
440 job j = NULL, nj;
442 for (i = 0; i < tubes.used; i++) {
443 t = tubes.items[i];
444 if (t->delay.len == 0) {
445 continue;
447 nj = t->delay.data[0];
448 if (!j || nj->r.deadline_at < j->r.deadline_at) j = nj;
451 return j;
454 static int
455 enqueue_job(Server *s, job j, int64 delay, char update_store)
457 int r;
459 j->reserver = NULL;
460 if (delay) {
461 j->r.deadline_at = nanoseconds() + delay;
462 r = heapinsert(&j->tube->delay, j);
463 if (!r) return 0;
464 j->r.state = Delayed;
465 } else {
466 r = heapinsert(&j->tube->ready, j);
467 if (!r) return 0;
468 j->r.state = Ready;
469 ready_ct++;
470 if (j->r.pri < URGENT_THRESHOLD) {
471 global_stat.urgent_ct++;
472 j->tube->stat.urgent_ct++;
476 if (update_store) {
477 if (!walwrite(&s->wal, j)) {
478 return 0;
480 walmaint(&s->wal);
483 process_queue();
484 return 1;
487 static int
488 bury_job(Server *s, job j, char update_store)
490 int z;
492 if (update_store) {
493 z = walresvupdate(&s->wal, j);
494 if (!z) return 0;
495 j->walresv += z;
498 job_insert(&j->tube->buried, j);
499 global_stat.buried_ct++;
500 j->tube->stat.buried_ct++;
501 j->r.state = Buried;
502 j->reserver = NULL;
503 j->r.bury_ct++;
505 if (update_store) {
506 if (!walwrite(&s->wal, j)) {
507 return 0;
509 walmaint(&s->wal);
512 return 1;
515 void
516 enqueue_reserved_jobs(Conn *c)
518 int r;
519 job j;
521 while (job_list_any_p(&c->reserved_jobs)) {
522 j = job_remove(c->reserved_jobs.next);
523 r = enqueue_job(c->srv, j, 0, 0);
524 if (r < 1) bury_job(c->srv, j, 0);
525 global_stat.reserved_ct--;
526 j->tube->stat.reserved_ct--;
527 c->soonest_job = NULL;
531 static job
532 delay_q_take()
534 job j = delay_q_peek();
535 if (!j) {
536 return 0;
538 heapremove(&j->tube->delay, j->heap_index);
539 return j;
542 static int
543 kick_buried_job(Server *s, job j)
545 int r;
546 int z;
548 z = walresvupdate(&s->wal, j);
549 if (!z) return 0;
550 j->walresv += z;
552 remove_buried_job(j);
554 j->r.kick_ct++;
555 r = enqueue_job(s, j, 0, 1);
556 if (r == 1) return 1;
558 /* ready queue is full, so bury it */
559 bury_job(s, j, 0);
560 return 0;
563 static uint
564 get_delayed_job_ct()
566 tube t;
567 size_t i;
568 uint count = 0;
570 for (i = 0; i < tubes.used; i++) {
571 t = tubes.items[i];
572 count += t->delay.len;
574 return count;
577 static int
578 kick_delayed_job(Server *s, job j)
580 int r;
581 int z;
583 z = walresvupdate(&s->wal, j);
584 if (!z) return 0;
585 j->walresv += z;
587 heapremove(&j->tube->delay, j->heap_index);
589 j->r.kick_ct++;
590 r = enqueue_job(s, j, 0, 1);
591 if (r == 1) return 1;
593 /* ready queue is full, so delay it again */
594 r = enqueue_job(s, j, j->r.delay, 0);
595 if (r == 1) return 0;
597 /* last resort */
598 bury_job(s, j, 0);
599 return 0;
602 /* return the number of jobs successfully kicked */
603 static uint
604 kick_buried_jobs(Server *s, tube t, uint n)
606 uint i;
607 for (i = 0; (i < n) && buried_job_p(t); ++i) {
608 kick_buried_job(s, t->buried.next);
610 return i;
613 /* return the number of jobs successfully kicked */
614 static uint
615 kick_delayed_jobs(Server *s, tube t, uint n)
617 uint i;
618 for (i = 0; (i < n) && (t->delay.len > 0); ++i) {
619 kick_delayed_job(s, (job)t->delay.data[0]);
621 return i;
624 static uint
625 kick_jobs(Server *s, tube t, uint n)
627 if (buried_job_p(t)) return kick_buried_jobs(s, t, n);
628 return kick_delayed_jobs(s, t, n);
631 static job
632 remove_buried_job(job j)
634 if (!j || j->r.state != Buried) return NULL;
635 j = job_remove(j);
636 if (j) {
637 global_stat.buried_ct--;
638 j->tube->stat.buried_ct--;
640 return j;
643 static job
644 remove_delayed_job(job j)
646 if (!j || j->r.state != Delayed) return NULL;
647 heapremove(&j->tube->delay, j->heap_index);
649 return j;
652 static job
653 remove_ready_job(job j)
655 if (!j || j->r.state != Ready) return NULL;
656 heapremove(&j->tube->ready, j->heap_index);
657 ready_ct--;
658 if (j->r.pri < URGENT_THRESHOLD) {
659 global_stat.urgent_ct--;
660 j->tube->stat.urgent_ct--;
662 return j;
665 static void
666 enqueue_waiting_conn(Conn *c)
668 tube t;
669 size_t i;
671 global_stat.waiting_ct++;
672 c->type |= CONN_TYPE_WAITING;
673 for (i = 0; i < c->watch.used; i++) {
674 t = c->watch.items[i];
675 t->stat.waiting_ct++;
676 ms_append(&t->waiting, c);
680 static job
681 find_reserved_job_in_conn(Conn *c, job j)
683 return (j && j->reserver == c && j->r.state == Reserved) ? j : NULL;
686 static job
687 touch_job(Conn *c, job j)
689 j = find_reserved_job_in_conn(c, j);
690 if (j) {
691 j->r.deadline_at = nanoseconds() + j->r.ttr;
692 c->soonest_job = NULL;
694 return j;
697 static job
698 peek_job(uint64 id)
700 return job_find(id);
703 static void
704 check_err(Conn *c, const char *s)
706 if (errno == EAGAIN) return;
707 if (errno == EINTR) return;
708 if (errno == EWOULDBLOCK) return;
710 twarn("%s", s);
711 c->state = STATE_CLOSE;
712 return;
/* Scan the first `size` bytes of s for the first "\r\n" and return the
 * line length including it, so the result is at least 2 on a match.
 * Returns 0 if no CRLF is present. A lone '\r' before the real line end
 * no longer hides the CRLF after it. Sizes below 2 cannot hold a CRLF
 * and return 0 without reading. (Previously size 0 made memchr scan
 * SIZE_MAX bytes.) */
static int
scan_line_end(const char *s, int size)
{
    const char *p, *last, *match;

    if (!s || size < 2) return 0;

    /* last: one past the final position where a '\r' can still be
     * followed by '\n' inside the buffer, so match[1] is always in bounds */
    last = s + size - 1;
    for (p = s; p < last; p = match + 1) {
        match = memchr(p, '\r', (size_t)(last - p));
        if (!match) return 0;
        if (match[1] == '\n') return (int)(match - s + 2);
    }
    return 0;
}
731 static int
732 cmd_len(Conn *c)
734 return scan_line_end(c->cmd, c->cmd_read);
737 /* parse the command line */
738 static int
739 which_cmd(Conn *c)
741 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
742 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
743 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
744 TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY);
745 TEST_CMD(c->cmd, CMD_PEEK_DELAYED, OP_PEEK_DELAYED);
746 TEST_CMD(c->cmd, CMD_PEEK_BURIED, OP_PEEK_BURIED);
747 TEST_CMD(c->cmd, CMD_RESERVE_TIMEOUT, OP_RESERVE_TIMEOUT);
748 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
749 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
750 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
751 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
752 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
753 TEST_CMD(c->cmd, CMD_JOBKICK, OP_JOBKICK);
754 TEST_CMD(c->cmd, CMD_TOUCH, OP_TOUCH);
755 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
756 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
757 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
758 TEST_CMD(c->cmd, CMD_USE, OP_USE);
759 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
760 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
761 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
762 TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
763 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
764 TEST_CMD(c->cmd, CMD_QUIT, OP_QUIT);
765 TEST_CMD(c->cmd, CMD_PAUSE_TUBE, OP_PAUSE_TUBE);
766 return OP_UNKNOWN;
/* Copy up to body_size trailing bytes into the job, then the rest into the cmd
 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
 * This function is idempotent: it clears c->cmd_len, so a repeated call
 * returns at the !c->cmd_len check without changing anything.
 *
 * The bytes read past the current command line (c->cmd_read - c->cmd_len)
 * are consumed in two ways:
 * - they fill the incoming job body; or, in bit-bucket mode (c->in_job is
 *   NULL and c->in_job_read counts bytes still to discard), they are
 *   dropped;
 * - whatever remains is moved to the front of c->cmd as the start of the
 *   next command. */
static void
fill_extra_data(Conn *c)
{
    int extra_bytes, job_data_bytes = 0, cmd_bytes;

    if (!c->sock.fd) return; /* the connection was closed */
    if (!c->cmd_len) return; /* we don't have a complete command */

    /* how many extra bytes did we read? */
    extra_bytes = c->cmd_read - c->cmd_len;

    /* how many bytes should we put into the job body? */
    if (c->in_job) {
        job_data_bytes = min(extra_bytes, c->in_job->r.body_size);
        memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
        c->in_job_read = job_data_bytes;
    } else if (c->in_job_read) {
        /* we are in bit-bucket mode, throwing away data */
        job_data_bytes = min(extra_bytes, c->in_job_read);
        c->in_job_read -= job_data_bytes;
    }

    /* how many bytes are left to go into the future cmd? */
    cmd_bytes = extra_bytes - job_data_bytes;
    /* memmove, not memcpy: source and destination both lie in c->cmd */
    memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
    c->cmd_read = cmd_bytes;
    c->cmd_len = 0; /* we no longer know the length of the new command */
}
801 static void
802 _skip(Conn *c, int n, char *line, int len)
804 /* Invert the meaning of in_job_read while throwing away data -- it
805 * counts the bytes that remain to be thrown away. */
806 c->in_job = 0;
807 c->in_job_read = n;
808 fill_extra_data(c);
810 if (c->in_job_read == 0) return reply(c, line, len, STATE_SENDWORD);
812 c->reply = line;
813 c->reply_len = len;
814 c->reply_sent = 0;
815 c->state = STATE_BITBUCKET;
816 return;
/* skip(c, n, m): discard n input bytes, then reply with literal message m. */
#define skip(c,n,m) (_skip((c),(n),(m),CONSTSTRLEN(m)))
821 static void
822 enqueue_incoming_job(Conn *c)
824 int r;
825 job j = c->in_job;
827 c->in_job = NULL; /* the connection no longer owns this job */
828 c->in_job_read = 0;
830 /* check if the trailer is present and correct */
831 if (memcmp(j->body + j->r.body_size - 2, "\r\n", 2)) {
832 job_free(j);
833 return reply_msg(c, MSG_EXPECTED_CRLF);
836 if (verbose >= 2) {
837 printf("<%d job %"PRIu64"\n", c->sock.fd, j->r.id);
840 if (drain_mode) {
841 job_free(j);
842 return reply_serr(c, MSG_DRAINING);
845 if (j->walresv) return reply_serr(c, MSG_INTERNAL_ERROR);
846 j->walresv = walresvput(&c->srv->wal, j);
847 if (!j->walresv) return reply_serr(c, MSG_OUT_OF_MEMORY);
849 /* we have a complete job, so let's stick it in the pqueue */
850 r = enqueue_job(c->srv, j, j->r.delay, 1);
851 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
853 global_stat.total_jobs_ct++;
854 j->tube->stat.total_jobs_ct++;
856 if (r == 1) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->r.id);
858 /* out of memory trying to grow the queue, so it gets buried */
859 bury_job(c->srv, j, 0);
860 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->r.id);
863 static uint
864 uptime()
866 return (nanoseconds() - started_at) / 1000000000;
869 static int
870 fmt_stats(char *buf, size_t size, void *x)
872 int whead = 0, wcur = 0;
873 Server *srv;
874 struct rusage ru = {{0, 0}, {0, 0}};
876 srv = x;
878 if (srv->wal.head) {
879 whead = srv->wal.head->seq;
882 if (srv->wal.cur) {
883 wcur = srv->wal.cur->seq;
886 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
887 return snprintf(buf, size, STATS_FMT,
888 global_stat.urgent_ct,
889 ready_ct,
890 global_stat.reserved_ct,
891 get_delayed_job_ct(),
892 global_stat.buried_ct,
893 op_ct[OP_PUT],
894 op_ct[OP_PEEKJOB],
895 op_ct[OP_PEEK_READY],
896 op_ct[OP_PEEK_DELAYED],
897 op_ct[OP_PEEK_BURIED],
898 op_ct[OP_RESERVE],
899 op_ct[OP_RESERVE_TIMEOUT],
900 op_ct[OP_DELETE],
901 op_ct[OP_RELEASE],
902 op_ct[OP_USE],
903 op_ct[OP_WATCH],
904 op_ct[OP_IGNORE],
905 op_ct[OP_BURY],
906 op_ct[OP_KICK],
907 op_ct[OP_TOUCH],
908 op_ct[OP_STATS],
909 op_ct[OP_JOBSTATS],
910 op_ct[OP_STATS_TUBE],
911 op_ct[OP_LIST_TUBES],
912 op_ct[OP_LIST_TUBE_USED],
913 op_ct[OP_LIST_TUBES_WATCHED],
914 op_ct[OP_PAUSE_TUBE],
915 timeout_ct,
916 global_stat.total_jobs_ct,
917 job_data_size_limit,
918 tubes.used,
919 count_cur_conns(),
920 count_cur_producers(),
921 count_cur_workers(),
922 global_stat.waiting_ct,
923 count_tot_conns(),
924 (long) getpid(),
925 version,
926 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
927 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
928 uptime(),
929 whead,
930 wcur,
931 srv->wal.nmig,
932 srv->wal.nrec,
933 srv->wal.filesize,
935 node_info.nodename);
939 /* Read a priority value from the given buffer and place it in pri.
940 * Update end to point to the address after the last character consumed.
941 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
942 * conversion and return the status code but not update any values. This is an
943 * easy way to check for errors.
944 * If end is NULL, read_pri will also check that the entire input string was
945 * consumed and return an error code otherwise.
946 * Return 0 on success, or nonzero on failure.
947 * If a failure occurs, pri and end are not modified. */
948 static int
949 read_pri(uint *pri, const char *buf, char **end)
951 char *tend;
952 uint tpri;
954 errno = 0;
955 while (buf[0] == ' ') buf++;
956 if (buf[0] < '0' || '9' < buf[0]) return -1;
957 tpri = strtoul(buf, &tend, 10);
958 if (tend == buf) return -1;
959 if (errno && errno != ERANGE) return -1;
960 if (!end && tend[0] != '\0') return -1;
962 if (pri) *pri = tpri;
963 if (end) *end = tend;
964 return 0;
967 /* Read a delay value from the given buffer and place it in delay.
968 * The interface and behavior are analogous to read_pri(). */
969 static int
970 read_delay(int64 *delay, const char *buf, char **end)
972 int r;
973 uint delay_sec;
975 r = read_pri(&delay_sec, buf, end);
976 if (r) return r;
977 *delay = ((int64) delay_sec) * 1000000000;
978 return 0;
981 /* Read a timeout value from the given buffer and place it in ttr.
982 * The interface and behavior are the same as in read_delay(). */
983 static int
984 read_ttr(int64 *ttr, const char *buf, char **end)
986 return read_delay(ttr, buf, end);
989 /* Read a tube name from the given buffer moving the buffer to the name start */
990 static int
991 read_tube_name(char **tubename, char *buf, char **end)
993 size_t len;
995 while (buf[0] == ' ') buf++;
996 len = strspn(buf, NAME_CHARS);
997 if (len == 0) return -1;
998 if (tubename) *tubename = buf;
999 if (end) *end = buf + len;
1000 return 0;
/* Park connection c until a job can be reserved for it. It is put in
 * STATE_WAIT and registered as waiting on every watched tube, with
 * pending_timeout = timeout (dispatch_cmd passes -1 for plain "reserve").
 * Meanwhile only hang-ups are watched, and c is pushed on the dirty list.
 * The timeout is set before connwant() -- keep that order (presumably
 * scheduling reads pending_timeout; NOTE(review): confirm). */
static void
wait_for_job(Conn *c, int timeout)
{
    c->state = STATE_WAIT;
    enqueue_waiting_conn(c);

    /* Set the pending timeout to the requested timeout amount */
    c->pending_timeout = timeout;

    connwant(c, 'h'); // only care if they hang up
    c->next = dirty;
    dirty = c;
}
1017 typedef int(*fmt_fn)(char *, size_t, void *);
1019 static void
1020 do_stats(Conn *c, fmt_fn fmt, void *data)
1022 int r, stats_len;
1024 /* first, measure how big a buffer we will need */
1025 stats_len = fmt(NULL, 0, data) + 16;
1027 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
1028 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
1030 /* Mark this job as a copy so it can be appropriately freed later on */
1031 c->out_job->r.state = Copy;
1033 /* now actually format the stats data */
1034 r = fmt(c->out_job->body, stats_len, data);
1035 /* and set the actual body size */
1036 c->out_job->r.body_size = r;
1037 if (r > stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
1039 c->out_job_sent = 0;
1040 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", r - 2);
1043 static void
1044 do_list_tubes(Conn *c, ms l)
1046 char *buf;
1047 tube t;
1048 size_t i, resp_z;
1050 /* first, measure how big a buffer we will need */
1051 resp_z = 6; /* initial "---\n" and final "\r\n" */
1052 for (i = 0; i < l->used; i++) {
1053 t = l->items[i];
1054 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
1057 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
1058 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
1060 /* Mark this job as a copy so it can be appropriately freed later on */
1061 c->out_job->r.state = Copy;
1063 /* now actually format the response */
1064 buf = c->out_job->body;
1065 buf += snprintf(buf, 5, "---\n");
1066 for (i = 0; i < l->used; i++) {
1067 t = l->items[i];
1068 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
1070 buf[0] = '\r';
1071 buf[1] = '\n';
1073 c->out_job_sent = 0;
1074 return reply_line(c, STATE_SENDJOB, "OK %zu\r\n", resp_z - 2);
1077 static int
1078 fmt_job_stats(char *buf, size_t size, job j)
1080 int64 t;
1081 int64 time_left;
1082 int file = 0;
1084 t = nanoseconds();
1085 if (j->r.state == Reserved || j->r.state == Delayed) {
1086 time_left = (j->r.deadline_at - t) / 1000000000;
1087 } else {
1088 time_left = 0;
1090 if (j->file) {
1091 file = j->file->seq;
1093 return snprintf(buf, size, STATS_JOB_FMT,
1094 j->r.id,
1095 j->tube->name,
1096 job_state(j),
1097 j->r.pri,
1098 (t - j->r.created_at) / 1000000000,
1099 j->r.delay / 1000000000,
1100 j->r.ttr / 1000000000,
1101 time_left,
1102 file,
1103 j->r.reserve_ct,
1104 j->r.timeout_ct,
1105 j->r.release_ct,
1106 j->r.bury_ct,
1107 j->r.kick_ct);
1110 static int
1111 fmt_stats_tube(char *buf, size_t size, tube t)
1113 uint64 time_left;
1115 if (t->pause > 0) {
1116 time_left = (t->deadline_at - nanoseconds()) / 1000000000;
1117 } else {
1118 time_left = 0;
1120 return snprintf(buf, size, STATS_TUBE_FMT,
1121 t->name,
1122 t->stat.urgent_ct,
1123 t->ready.len,
1124 t->stat.reserved_ct,
1125 t->delay.len,
1126 t->stat.buried_ct,
1127 t->stat.total_jobs_ct,
1128 t->using_ct,
1129 t->watching_ct,
1130 t->stat.waiting_ct,
1131 t->stat.total_delete_ct,
1132 t->stat.pause_ct,
1133 t->pause / 1000000000,
1134 time_left);
1137 static void
1138 maybe_enqueue_incoming_job(Conn *c)
1140 job j = c->in_job;
1142 /* do we have a complete job? */
1143 if (c->in_job_read == j->r.body_size) return enqueue_incoming_job(c);
1145 /* otherwise we have incomplete data, so just keep waiting */
1146 c->state = STATE_WANTDATA;
1149 /* j can be NULL */
1150 static job
1151 remove_this_reserved_job(Conn *c, job j)
1153 j = job_remove(j);
1154 if (j) {
1155 global_stat.reserved_ct--;
1156 j->tube->stat.reserved_ct--;
1157 j->reserver = NULL;
1159 c->soonest_job = NULL;
1160 return j;
1163 static job
1164 remove_reserved_job(Conn *c, job j)
1166 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, j));
1169 static int
1170 name_is_ok(const char *name, size_t max)
1172 size_t len = strlen(name);
1173 return len > 0 && len <= max &&
1174 strspn(name, NAME_CHARS) == len && name[0] != '-';
1177 void
1178 prot_remove_tube(tube t)
1180 ms_remove(&tubes, t);
1183 static void
1184 dispatch_cmd(Conn *c)
1186 int r, i, timeout = -1;
1187 int z;
1188 uint count;
1189 job j = 0;
1190 byte type;
1191 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
1192 uint pri, body_size;
1193 int64 delay, ttr;
1194 uint64 id;
1195 tube t = NULL;
1197 /* NUL-terminate this string so we can use strtol and friends */
1198 c->cmd[c->cmd_len - 2] = '\0';
1200 /* check for possible maliciousness */
1201 if (strlen(c->cmd) != c->cmd_len - 2) {
1202 return reply_msg(c, MSG_BAD_FORMAT);
1205 type = which_cmd(c);
1206 if (verbose >= 2) {
1207 printf("<%d command %s\n", c->sock.fd, op_names[type]);
1210 switch (type) {
1211 case OP_PUT:
1212 r = read_pri(&pri, c->cmd + 4, &delay_buf);
1213 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1215 r = read_delay(&delay, delay_buf, &ttr_buf);
1216 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1218 r = read_ttr(&ttr, ttr_buf, &size_buf);
1219 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1221 errno = 0;
1222 body_size = strtoul(size_buf, &end_buf, 10);
1223 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1225 op_ct[type]++;
1227 if (body_size > job_data_size_limit) {
1228 /* throw away the job body and respond with JOB_TOO_BIG */
1229 return skip(c, body_size + 2, MSG_JOB_TOO_BIG);
1232 /* don't allow trailing garbage */
1233 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
1235 connsetproducer(c);
1237 if (ttr < 1000000000) {
1238 ttr = 1000000000;
1241 c->in_job = make_job(pri, delay, ttr, body_size + 2, c->use);
1243 /* OOM? */
1244 if (!c->in_job) {
1245 /* throw away the job body and respond with OUT_OF_MEMORY */
1246 twarnx("server error: " MSG_OUT_OF_MEMORY);
1247 return skip(c, body_size + 2, MSG_OUT_OF_MEMORY);
1250 fill_extra_data(c);
1252 /* it's possible we already have a complete job */
1253 maybe_enqueue_incoming_job(c);
1255 break;
1256 case OP_PEEK_READY:
1257 /* don't allow trailing garbage */
1258 if (c->cmd_len != CMD_PEEK_READY_LEN + 2) {
1259 return reply_msg(c, MSG_BAD_FORMAT);
1261 op_ct[type]++;
1263 if (c->use->ready.len) {
1264 j = job_copy(c->use->ready.data[0]);
1267 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1269 reply_job(c, j, MSG_FOUND);
1270 break;
1271 case OP_PEEK_DELAYED:
1272 /* don't allow trailing garbage */
1273 if (c->cmd_len != CMD_PEEK_DELAYED_LEN + 2) {
1274 return reply_msg(c, MSG_BAD_FORMAT);
1276 op_ct[type]++;
1278 if (c->use->delay.len) {
1279 j = job_copy(c->use->delay.data[0]);
1282 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1284 reply_job(c, j, MSG_FOUND);
1285 break;
1286 case OP_PEEK_BURIED:
1287 /* don't allow trailing garbage */
1288 if (c->cmd_len != CMD_PEEK_BURIED_LEN + 2) {
1289 return reply_msg(c, MSG_BAD_FORMAT);
1291 op_ct[type]++;
1293 j = job_copy(buried_job_p(c->use)? j = c->use->buried.next : NULL);
1295 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1297 reply_job(c, j, MSG_FOUND);
1298 break;
1299 case OP_PEEKJOB:
1300 errno = 0;
1301 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1302 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1303 op_ct[type]++;
1305 /* So, peek is annoying, because some other connection might free the
1306 * job while we are still trying to write it out. So we copy it and
1307 * then free the copy when it's done sending. */
1308 j = job_copy(peek_job(id));
1310 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1312 reply_job(c, j, MSG_FOUND);
1313 break;
1314 case OP_RESERVE_TIMEOUT:
1315 errno = 0;
1316 timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10);
1317 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1318 case OP_RESERVE: /* FALLTHROUGH */
1319 /* don't allow trailing garbage */
1320 if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) {
1321 return reply_msg(c, MSG_BAD_FORMAT);
1324 op_ct[type]++;
1325 connsetworker(c);
1327 if (conndeadlinesoon(c) && !conn_ready(c)) {
1328 return reply_msg(c, MSG_DEADLINE_SOON);
1331 /* try to get a new job for this guy */
1332 wait_for_job(c, timeout);
1333 process_queue();
1334 break;
1335 case OP_DELETE:
1336 errno = 0;
1337 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1338 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1339 op_ct[type]++;
1341 j = job_find(id);
1342 j = remove_reserved_job(c, j) ? :
1343 remove_ready_job(j) ? :
1344 remove_buried_job(j) ? :
1345 remove_delayed_job(j);
1347 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1349 j->tube->stat.total_delete_ct++;
1351 j->r.state = Invalid;
1352 r = walwrite(&c->srv->wal, j);
1353 walmaint(&c->srv->wal);
1354 job_free(j);
1356 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1358 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1359 break;
1360 case OP_RELEASE:
1361 errno = 0;
1362 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1363 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1365 r = read_pri(&pri, pri_buf, &delay_buf);
1366 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1368 r = read_delay(&delay, delay_buf, NULL);
1369 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1370 op_ct[type]++;
1372 j = remove_reserved_job(c, job_find(id));
1374 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1376 /* We want to update the delay deadline on disk, so reserve space for
1377 * that. */
1378 if (delay) {
1379 z = walresvupdate(&c->srv->wal, j);
1380 if (!z) return reply_serr(c, MSG_OUT_OF_MEMORY);
1381 j->walresv += z;
1384 j->r.pri = pri;
1385 j->r.delay = delay;
1386 j->r.release_ct++;
1388 r = enqueue_job(c->srv, j, delay, !!delay);
1389 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
1390 if (r == 1) {
1391 return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1394 /* out of memory trying to grow the queue, so it gets buried */
1395 bury_job(c->srv, j, 0);
1396 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1397 break;
1398 case OP_BURY:
1399 errno = 0;
1400 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1401 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1403 r = read_pri(&pri, pri_buf, NULL);
1404 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1405 op_ct[type]++;
1407 j = remove_reserved_job(c, job_find(id));
1409 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1411 j->r.pri = pri;
1412 r = bury_job(c->srv, j, 1);
1413 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1414 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1415 break;
1416 case OP_KICK:
1417 errno = 0;
1418 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
1419 if (end_buf == c->cmd + CMD_KICK_LEN) {
1420 return reply_msg(c, MSG_BAD_FORMAT);
1422 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1424 op_ct[type]++;
1426 i = kick_jobs(c->srv, c->use, count);
1428 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1429 case OP_JOBKICK:
1430 errno = 0;
1431 id = strtoull(c->cmd + CMD_JOBKICK_LEN, &end_buf, 10);
1432 if (errno) return twarn("strtoull"), reply_msg(c, MSG_BAD_FORMAT);
1434 op_ct[type]++;
1436 j = job_find(id);
1437 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1439 if ((j->r.state == Buried && kick_buried_job(c->srv, j)) ||
1440 (j->r.state == Delayed && kick_delayed_job(c->srv, j))) {
1441 reply(c, MSG_KICKED, MSG_KICKED_LEN, STATE_SENDWORD);
1442 } else {
1443 return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1445 break;
1446 case OP_TOUCH:
1447 errno = 0;
1448 id = strtoull(c->cmd + CMD_TOUCH_LEN, &end_buf, 10);
1449 if (errno) return twarn("strtoull"), reply_msg(c, MSG_BAD_FORMAT);
1451 op_ct[type]++;
1453 j = touch_job(c, job_find(id));
1455 if (j) {
1456 reply(c, MSG_TOUCHED, MSG_TOUCHED_LEN, STATE_SENDWORD);
1457 } else {
1458 return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1460 break;
1461 case OP_STATS:
1462 /* don't allow trailing garbage */
1463 if (c->cmd_len != CMD_STATS_LEN + 2) {
1464 return reply_msg(c, MSG_BAD_FORMAT);
1467 op_ct[type]++;
1469 do_stats(c, fmt_stats, c->srv);
1470 break;
1471 case OP_JOBSTATS:
1472 errno = 0;
1473 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1474 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1476 op_ct[type]++;
1478 j = peek_job(id);
1479 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1481 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1482 do_stats(c, (fmt_fn) fmt_job_stats, j);
1483 break;
1484 case OP_STATS_TUBE:
1485 name = c->cmd + CMD_STATS_TUBE_LEN;
1486 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1488 op_ct[type]++;
1490 t = tube_find(name);
1491 if (!t) return reply_msg(c, MSG_NOTFOUND);
1493 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1494 t = NULL;
1495 break;
1496 case OP_LIST_TUBES:
1497 /* don't allow trailing garbage */
1498 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1499 return reply_msg(c, MSG_BAD_FORMAT);
1502 op_ct[type]++;
1503 do_list_tubes(c, &tubes);
1504 break;
1505 case OP_LIST_TUBE_USED:
1506 /* don't allow trailing garbage */
1507 if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
1508 return reply_msg(c, MSG_BAD_FORMAT);
1511 op_ct[type]++;
1512 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1513 break;
1514 case OP_LIST_TUBES_WATCHED:
1515 /* don't allow trailing garbage */
1516 if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
1517 return reply_msg(c, MSG_BAD_FORMAT);
1520 op_ct[type]++;
1521 do_list_tubes(c, &c->watch);
1522 break;
1523 case OP_USE:
1524 name = c->cmd + CMD_USE_LEN;
1525 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1526 op_ct[type]++;
1528 TUBE_ASSIGN(t, tube_find_or_make(name));
1529 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1531 c->use->using_ct--;
1532 TUBE_ASSIGN(c->use, t);
1533 TUBE_ASSIGN(t, NULL);
1534 c->use->using_ct++;
1536 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1537 break;
1538 case OP_WATCH:
1539 name = c->cmd + CMD_WATCH_LEN;
1540 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1541 op_ct[type]++;
1543 TUBE_ASSIGN(t, tube_find_or_make(name));
1544 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1546 r = 1;
1547 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1548 TUBE_ASSIGN(t, NULL);
1549 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1551 reply_line(c, STATE_SENDWORD, "WATCHING %zu\r\n", c->watch.used);
1552 break;
1553 case OP_IGNORE:
1554 name = c->cmd + CMD_IGNORE_LEN;
1555 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1556 op_ct[type]++;
1558 t = NULL;
1559 for (i = 0; i < c->watch.used; i++) {
1560 t = c->watch.items[i];
1561 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1562 t = NULL;
1565 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1567 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1568 t = NULL;
1570 reply_line(c, STATE_SENDWORD, "WATCHING %zu\r\n", c->watch.used);
1571 break;
1572 case OP_QUIT:
1573 c->state = STATE_CLOSE;
1574 break;
1575 case OP_PAUSE_TUBE:
1576 op_ct[type]++;
1578 r = read_tube_name(&name, c->cmd + CMD_PAUSE_TUBE_LEN, &delay_buf);
1579 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1581 r = read_delay(&delay, delay_buf, NULL);
1582 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1584 *delay_buf = '\0';
1585 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1586 t = tube_find(name);
1587 if (!t) return reply_msg(c, MSG_NOTFOUND);
1589 // Always pause for a positive amount of time, to make sure
1590 // that waiting clients wake up when the deadline arrives.
1591 if (delay == 0) {
1592 delay = 1;
1595 t->deadline_at = nanoseconds() + delay;
1596 t->pause = delay;
1597 t->stat.pause_ct++;
1599 reply_line(c, STATE_SENDWORD, "PAUSED\r\n");
1600 break;
1601 default:
1602 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1606 /* There are three reasons this function may be called. We need to check for
1607 * all of them.
1609 * 1. A reserved job has run out of time.
1610 * 2. A waiting client's reserved job has entered the safety margin.
1611 * 3. A waiting client's requested timeout has occurred.
1613 * If any of these happen, we must do the appropriate thing. */
1614 static void
1615 conn_timeout(Conn *c)
1617 int r, should_timeout = 0;
1618 job j;
1620 /* Check if the client was trying to reserve a job. */
1621 if (conn_waiting(c) && conndeadlinesoon(c)) should_timeout = 1;
1623 /* Check if any reserved jobs have run out of time. We should do this
1624 * whether or not the client is waiting for a new reservation. */
1625 while ((j = connsoonestjob(c))) {
1626 if (j->r.deadline_at >= nanoseconds()) break;
1628 /* This job is in the middle of being written out. If we return it to
1629 * the ready queue, someone might free it before we finish writing it
1630 * out to the socket. So we'll copy it here and free the copy when it's
1631 * done sending. */
1632 if (j == c->out_job) {
1633 c->out_job = job_copy(c->out_job);
1636 timeout_ct++; /* stats */
1637 j->r.timeout_ct++;
1638 r = enqueue_job(c->srv, remove_this_reserved_job(c, j), 0, 0);
1639 if (r < 1) bury_job(c->srv, j, 0); /* out of memory, so bury it */
1640 connsched(c);
1643 if (should_timeout) {
1644 return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON);
1645 } else if (conn_waiting(c) && c->pending_timeout >= 0) {
1646 c->pending_timeout = -1;
1647 return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);
1651 void
1652 enter_drain_mode(int sig)
1654 drain_mode = 1;
1657 static void
1658 do_cmd(Conn *c)
1660 dispatch_cmd(c);
1661 fill_extra_data(c);
1664 static void
1665 reset_conn(Conn *c)
1667 connwant(c, 'r');
1668 c->next = dirty;
1669 dirty = c;
1671 /* was this a peek or stats command? */
1672 if (c->out_job && c->out_job->r.state == Copy) job_free(c->out_job);
1673 c->out_job = NULL;
1675 c->reply_sent = 0; /* now that we're done, reset this */
1676 c->state = STATE_WANTCOMMAND;
1679 static void
1680 conn_data(Conn *c)
1682 int r, to_read;
1683 job j;
1684 struct iovec iov[2];
1686 switch (c->state) {
1687 case STATE_WANTCOMMAND:
1688 r = read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1689 if (r == -1) return check_err(c, "read()");
1690 if (r == 0) {
1691 c->state = STATE_CLOSE;
1692 return;
1695 c->cmd_read += r; /* we got some bytes */
1697 c->cmd_len = cmd_len(c); /* find the EOL */
1699 /* yay, complete command line */
1700 if (c->cmd_len) return do_cmd(c);
1702 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1704 /* command line too long? */
1705 if (c->cmd_read == LINE_BUF_SIZE) {
1706 c->cmd_read = 0; /* discard the input so far */
1707 return reply_msg(c, MSG_BAD_FORMAT);
1710 /* otherwise we have an incomplete line, so just keep waiting */
1711 break;
1712 case STATE_BITBUCKET:
1713 /* Invert the meaning of in_job_read while throwing away data -- it
1714 * counts the bytes that remain to be thrown away. */
1715 to_read = min(c->in_job_read, BUCKET_BUF_SIZE);
1716 r = read(c->sock.fd, bucket, to_read);
1717 if (r == -1) return check_err(c, "read()");
1718 if (r == 0) {
1719 c->state = STATE_CLOSE;
1720 return;
1723 c->in_job_read -= r; /* we got some bytes */
1725 /* (c->in_job_read < 0) can't happen */
1727 if (c->in_job_read == 0) {
1728 return reply(c, c->reply, c->reply_len, STATE_SENDWORD);
1730 break;
1731 case STATE_WANTDATA:
1732 j = c->in_job;
1734 r = read(c->sock.fd, j->body + c->in_job_read, j->r.body_size -c->in_job_read);
1735 if (r == -1) return check_err(c, "read()");
1736 if (r == 0) {
1737 c->state = STATE_CLOSE;
1738 return;
1741 c->in_job_read += r; /* we got some bytes */
1743 /* (j->in_job_read > j->r.body_size) can't happen */
1745 maybe_enqueue_incoming_job(c);
1746 break;
1747 case STATE_SENDWORD:
1748 r= write(c->sock.fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1749 if (r == -1) return check_err(c, "write()");
1750 if (r == 0) {
1751 c->state = STATE_CLOSE;
1752 return;
1755 c->reply_sent += r; /* we got some bytes */
1757 /* (c->reply_sent > c->reply_len) can't happen */
1759 if (c->reply_sent == c->reply_len) return reset_conn(c);
1761 /* otherwise we sent an incomplete reply, so just keep waiting */
1762 break;
1763 case STATE_SENDJOB:
1764 j = c->out_job;
1766 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1767 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1768 iov[1].iov_base = j->body + c->out_job_sent;
1769 iov[1].iov_len = j->r.body_size - c->out_job_sent;
1771 r = writev(c->sock.fd, iov, 2);
1772 if (r == -1) return check_err(c, "writev()");
1773 if (r == 0) {
1774 c->state = STATE_CLOSE;
1775 return;
1778 /* update the sent values */
1779 c->reply_sent += r;
1780 if (c->reply_sent >= c->reply_len) {
1781 c->out_job_sent += c->reply_sent - c->reply_len;
1782 c->reply_sent = c->reply_len;
1785 /* (c->out_job_sent > j->r.body_size) can't happen */
1787 /* are we done? */
1788 if (c->out_job_sent == j->r.body_size) {
1789 if (verbose >= 2) {
1790 printf(">%d job %"PRIu64"\n", c->sock.fd, j->r.id);
1792 return reset_conn(c);
1795 /* otherwise we sent incomplete data, so just keep waiting */
1796 break;
1797 case STATE_WAIT:
1798 if (c->halfclosed) {
1799 c->pending_timeout = -1;
1800 return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);
1802 break;
1806 #define want_command(c) ((c)->sock.fd && ((c)->state == STATE_WANTCOMMAND))
1807 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
1809 static void
1810 update_conns()
1812 int r;
1813 Conn *c;
1815 while (dirty) {
1816 c = dirty;
1817 dirty = dirty->next;
1818 c->next = NULL;
1819 r = sockwant(&c->sock, c->rw);
1820 if (r == -1) {
1821 twarn("sockwant");
1822 connclose(c);
1827 static void
1828 h_conn(const int fd, const short which, Conn *c)
1830 if (fd != c->sock.fd) {
1831 twarnx("Argh! event fd doesn't match conn fd.");
1832 close(fd);
1833 connclose(c);
1834 update_conns();
1835 return;
1838 if (which == 'h') {
1839 c->halfclosed = 1;
1842 conn_data(c);
1843 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1844 if (c->state == STATE_CLOSE) {
1845 protrmdirty(c);
1846 connclose(c);
1848 update_conns();
1851 static void
1852 prothandle(Conn *c, int ev)
1854 h_conn(c->sock.fd, ev, c);
1857 int64
1858 prottick(Server *s)
1860 int r;
1861 job j;
1862 int64 now;
1863 int i;
1864 tube t;
1865 int64 period = 0x34630B8A000LL; /* 1 hour in nanoseconds */
1866 int64 d;
1868 now = nanoseconds();
1869 while ((j = delay_q_peek())) {
1870 d = j->r.deadline_at - now;
1871 if (d > 0) {
1872 period = min(period, d);
1873 break;
1875 j = delay_q_take();
1876 r = enqueue_job(s, j, 0, 0);
1877 if (r < 1) bury_job(s, j, 0); /* out of memory, so bury it */
1880 for (i = 0; i < tubes.used; i++) {
1881 t = tubes.items[i];
1882 d = t->deadline_at - now;
1883 if (t->pause && d <= 0) {
1884 t->pause = 0;
1885 process_queue();
1887 else if (d > 0) {
1888 period = min(period, d);
1892 while (s->conns.len) {
1893 Conn *c = s->conns.data[0];
1894 d = c->tickat - now;
1895 if (d > 0) {
1896 period = min(period, d);
1897 break;
1900 heapremove(&s->conns, 0);
1901 conn_timeout(c);
1904 update_conns();
1906 return period;
1909 void
1910 h_accept(const int fd, const short which, Server *s)
1912 Conn *c;
1913 int cfd, flags, r;
1914 socklen_t addrlen;
1915 struct sockaddr_in6 addr;
1917 addrlen = sizeof addr;
1918 cfd = accept(fd, (struct sockaddr *)&addr, &addrlen);
1919 if (cfd == -1) {
1920 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1921 update_conns();
1922 return;
1924 if (verbose) {
1925 printf("accept %d\n", cfd);
1928 flags = fcntl(cfd, F_GETFL, 0);
1929 if (flags < 0) {
1930 twarn("getting flags");
1931 close(cfd);
1932 if (verbose) {
1933 printf("close %d\n", cfd);
1935 update_conns();
1936 return;
1939 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1940 if (r < 0) {
1941 twarn("setting O_NONBLOCK");
1942 close(cfd);
1943 if (verbose) {
1944 printf("close %d\n", cfd);
1946 update_conns();
1947 return;
1950 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1951 if (!c) {
1952 twarnx("make_conn() failed");
1953 close(cfd);
1954 if (verbose) {
1955 printf("close %d\n", cfd);
1957 update_conns();
1958 return;
1960 c->srv = s;
1961 c->sock.x = c;
1962 c->sock.f = (Handle)prothandle;
1963 c->sock.fd = cfd;
1965 r = sockwant(&c->sock, 'r');
1966 if (r == -1) {
1967 twarn("sockwant");
1968 close(cfd);
1969 if (verbose) {
1970 printf("close %d\n", cfd);
1972 update_conns();
1973 return;
1975 update_conns();
1978 void
1979 prot_init()
1981 started_at = nanoseconds();
1982 memset(op_ct, 0, sizeof(op_ct));
1984 int dev_random = open("/dev/urandom", O_RDONLY);
1985 if (dev_random < 0) {
1986 twarn("open /dev/urandom");
1987 exit(50);
1990 int i, r;
1991 byte rand_data[NumIdBytes];
1992 r = read(dev_random, &rand_data, NumIdBytes);
1993 if (r != NumIdBytes) {
1994 twarn("read /dev/urandom");
1995 exit(50);
1997 for (i = 0; i < NumIdBytes; i++) {
1998 sprintf(id + (i * 2), "%02x", rand_data[i]);
2000 close(dev_random);
2002 if (uname(&node_info) == -1) {
2003 warn("uname");
2004 exit(50);
2007 ms_init(&tubes, NULL, NULL);
2009 TUBE_ASSIGN(default_tube, tube_find_or_make("default"));
2010 if (!default_tube) twarnx("Out of memory during startup!");
2013 // For each job in list, inserts the job into the appropriate data
2014 // structures and adds it to the log.
2016 // Returns 1 on success, 0 on failure.
2018 prot_replay(Server *s, job list)
2020 job j, nj;
2021 int64 t, delay;
2022 int r, z;
2024 for (j = list->next ; j != list ; j = nj) {
2025 nj = j->next;
2026 job_remove(j);
2027 z = walresvupdate(&s->wal, j);
2028 if (!z) {
2029 twarnx("failed to reserve space");
2030 return 0;
2032 delay = 0;
2033 switch (j->r.state) {
2034 case Buried:
2035 bury_job(s, j, 0);
2036 break;
2037 case Delayed:
2038 t = nanoseconds();
2039 if (t < j->r.deadline_at) {
2040 delay = j->r.deadline_at - t;
2042 /* fall through */
2043 default:
2044 r = enqueue_job(s, j, delay, 0);
2045 if (r < 1) twarnx("error recovering job %"PRIu64, j->r.id);
2048 return 1;