update to newest ct; fixes #60
[beanstalkd.git] / prot.c
blob2306ec8a92df43ffccaac8ef5d5916f9d971c4ff
1 /* prot.c - protocol implementation */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdint.h>
20 #include <stdlib.h>
21 #include <stdio.h>
22 #include <unistd.h>
23 #include <fcntl.h>
24 #include <string.h>
25 #include <errno.h>
26 #include <sys/resource.h>
27 #include <sys/uio.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <netinet/in.h>
31 #include <ctype.h>
32 #include <inttypes.h>
33 #include <stdarg.h>
34 #include "dat.h"
36 /* job body cannot be greater than this many bytes long */
37 size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
39 #define NAME_CHARS \
40 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
41 "abcdefghijklmnopqrstuvwxyz" \
42 "0123456789-+/;.$_()"
/* Command prefixes. which_cmd() matches the start of an incoming command
 * line against these with strncmp(), so any trailing space is significant. */
#define CMD_PUT                 "put "
#define CMD_PEEKJOB             "peek "
#define CMD_PEEK_READY          "peek-ready"
#define CMD_PEEK_DELAYED        "peek-delayed"
#define CMD_PEEK_BURIED         "peek-buried"
#define CMD_RESERVE             "reserve"
#define CMD_RESERVE_TIMEOUT     "reserve-with-timeout "
#define CMD_DELETE              "delete "
#define CMD_RELEASE             "release "
#define CMD_BURY                "bury "
#define CMD_KICK                "kick "
#define CMD_TOUCH               "touch "
#define CMD_STATS               "stats"
#define CMD_JOBSTATS            "stats-job "
#define CMD_USE                 "use "
#define CMD_WATCH               "watch "
#define CMD_IGNORE              "ignore "
#define CMD_LIST_TUBES          "list-tubes"
#define CMD_LIST_TUBE_USED      "list-tube-used"
#define CMD_LIST_TUBES_WATCHED  "list-tubes-watched"
#define CMD_STATS_TUBE          "stats-tube "
#define CMD_QUIT                "quit"
#define CMD_PAUSE_TUBE          "pause-tube"

/* Length of a string literal, not counting the terminating NUL. */
#define CONSTSTRLEN(m) (sizeof(m) - 1)

/* Precomputed lengths of the command prefixes above. */
#define CMD_PEEK_READY_LEN          CONSTSTRLEN(CMD_PEEK_READY)
#define CMD_PEEK_DELAYED_LEN        CONSTSTRLEN(CMD_PEEK_DELAYED)
#define CMD_PEEK_BURIED_LEN         CONSTSTRLEN(CMD_PEEK_BURIED)
#define CMD_PEEKJOB_LEN             CONSTSTRLEN(CMD_PEEKJOB)
#define CMD_RESERVE_LEN             CONSTSTRLEN(CMD_RESERVE)
#define CMD_RESERVE_TIMEOUT_LEN     CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
#define CMD_DELETE_LEN              CONSTSTRLEN(CMD_DELETE)
#define CMD_RELEASE_LEN             CONSTSTRLEN(CMD_RELEASE)
#define CMD_BURY_LEN                CONSTSTRLEN(CMD_BURY)
#define CMD_KICK_LEN                CONSTSTRLEN(CMD_KICK)
#define CMD_TOUCH_LEN               CONSTSTRLEN(CMD_TOUCH)
#define CMD_STATS_LEN               CONSTSTRLEN(CMD_STATS)
#define CMD_JOBSTATS_LEN            CONSTSTRLEN(CMD_JOBSTATS)
#define CMD_USE_LEN                 CONSTSTRLEN(CMD_USE)
#define CMD_WATCH_LEN               CONSTSTRLEN(CMD_WATCH)
#define CMD_IGNORE_LEN              CONSTSTRLEN(CMD_IGNORE)
#define CMD_LIST_TUBES_LEN          CONSTSTRLEN(CMD_LIST_TUBES)
#define CMD_LIST_TUBE_USED_LEN      CONSTSTRLEN(CMD_LIST_TUBE_USED)
#define CMD_LIST_TUBES_WATCHED_LEN  CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
#define CMD_STATS_TUBE_LEN          CONSTSTRLEN(CMD_STATS_TUBE)
#define CMD_PAUSE_TUBE_LEN          CONSTSTRLEN(CMD_PAUSE_TUBE)
/* Reply words and complete reply lines sent to clients. */
#define MSG_FOUND "FOUND"
#define MSG_NOTFOUND "NOT_FOUND\r\n"
#define MSG_RESERVED "RESERVED"
#define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
#define MSG_TIMED_OUT "TIMED_OUT\r\n"
#define MSG_DELETED "DELETED\r\n"
#define MSG_RELEASED "RELEASED\r\n"
#define MSG_BURIED "BURIED\r\n"
#define MSG_TOUCHED "TOUCHED\r\n"
/* These two take a job id. Job ids are printed with PRIu64 elsewhere in
 * this file (STATS_JOB_FMT), so use the same conversion here instead of
 * %llu, which only matches when uint64_t happens to be unsigned long long. */
#define MSG_BURIED_FMT "BURIED %" PRIu64 "\r\n"
#define MSG_INSERTED_FMT "INSERTED %" PRIu64 "\r\n"
#define MSG_NOT_IGNORED "NOT_IGNORED\r\n"

/* Lengths of the fixed reply lines, for use with reply(). */
#define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
#define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
#define MSG_TOUCHED_LEN CONSTSTRLEN(MSG_TOUCHED)
#define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
#define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
#define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)

/* Error replies. */
#define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
#define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
#define MSG_DRAINING "DRAINING\r\n"
#define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
#define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
#define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
#define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
/* Values stored in a connection's state field. */
#define STATE_WANTCOMMAND   0
#define STATE_WANTDATA      1
#define STATE_SENDJOB       2
#define STATE_SENDWORD      3
#define STATE_WAIT          4
#define STATE_BITBUCKET     5

/* Command codes returned by which_cmd(). They also index op_ct[] and, in
 * DEBUG builds, op_names[], so TOTAL_OPS must stay one past the last code. */
#define OP_UNKNOWN              0
#define OP_PUT                  1
#define OP_PEEKJOB              2
#define OP_RESERVE              3
#define OP_DELETE               4
#define OP_RELEASE              5
#define OP_BURY                 6
#define OP_KICK                 7
#define OP_STATS                8
#define OP_JOBSTATS             9
#define OP_PEEK_BURIED          10
#define OP_USE                  11
#define OP_WATCH                12
#define OP_IGNORE               13
#define OP_LIST_TUBES           14
#define OP_LIST_TUBE_USED       15
#define OP_LIST_TUBES_WATCHED   16
#define OP_STATS_TUBE           17
#define OP_PEEK_READY           18
#define OP_PEEK_DELAYED         19
#define OP_RESERVE_TIMEOUT      20
#define OP_TOUCH                21
#define OP_QUIT                 22
#define OP_PAUSE_TUBE           23
#define TOTAL_OPS               24
/* Body of the server-wide stats reply. The argument list in fmt_stats()
 * must match these conversions one for one, in this order. */
#define STATS_FMT "---\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-delayed: %u\n" \
    "current-jobs-buried: %u\n" \
    "cmd-put: %" PRIu64 "\n" \
    "cmd-peek: %" PRIu64 "\n" \
    "cmd-peek-ready: %" PRIu64 "\n" \
    "cmd-peek-delayed: %" PRIu64 "\n" \
    "cmd-peek-buried: %" PRIu64 "\n" \
    "cmd-reserve: %" PRIu64 "\n" \
    "cmd-reserve-with-timeout: %" PRIu64 "\n" \
    "cmd-delete: %" PRIu64 "\n" \
    "cmd-release: %" PRIu64 "\n" \
    "cmd-use: %" PRIu64 "\n" \
    "cmd-watch: %" PRIu64 "\n" \
    "cmd-ignore: %" PRIu64 "\n" \
    "cmd-bury: %" PRIu64 "\n" \
    "cmd-kick: %" PRIu64 "\n" \
    "cmd-touch: %" PRIu64 "\n" \
    "cmd-stats: %" PRIu64 "\n" \
    "cmd-stats-job: %" PRIu64 "\n" \
    "cmd-stats-tube: %" PRIu64 "\n" \
    "cmd-list-tubes: %" PRIu64 "\n" \
    "cmd-list-tube-used: %" PRIu64 "\n" \
    "cmd-list-tubes-watched: %" PRIu64 "\n" \
    "cmd-pause-tube: %" PRIu64 "\n" \
    "job-timeouts: %" PRIu64 "\n" \
    "total-jobs: %" PRIu64 "\n" \
    "max-job-size: %zu\n" \
    "current-tubes: %zu\n" \
    "current-connections: %u\n" \
    "current-producers: %u\n" \
    "current-workers: %u\n" \
    "current-waiting: %u\n" \
    "total-connections: %u\n" \
    "pid: %ld\n" \
    "version: %s\n" \
    "rusage-utime: %d.%06d\n" \
    "rusage-stime: %d.%06d\n" \
    "uptime: %u\n" \
    "binlog-oldest-index: %s\n" \
    "binlog-current-index: %s\n" \
    "binlog-max-size: %zu\n" \
    "\r\n"

/* Body of the per-tube stats reply; arguments are supplied by
 * fmt_stats_tube(). */
#define STATS_TUBE_FMT "---\n" \
    "name: %s\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-delayed: %u\n" \
    "current-jobs-buried: %u\n" \
    "total-jobs: %" PRIu64 "\n" \
    "current-using: %u\n" \
    "current-watching: %u\n" \
    "current-waiting: %u\n" \
    "cmd-pause-tube: %u\n" \
    "pause: %" PRIu64 "\n" \
    "pause-time-left: %" PRIu64 "\n" \
    "\r\n"

/* Body of the per-job stats reply; arguments are supplied by
 * fmt_job_stats(). */
#define STATS_JOB_FMT "---\n" \
    "id: %" PRIu64 "\n" \
    "tube: %s\n" \
    "state: %s\n" \
    "pri: %u\n" \
    "age: %" PRIu64 "\n" \
    "delay: %" PRIu64 "\n" \
    "ttr: %" PRIu64 "\n" \
    "time-left: %" PRIu64 "\n" \
    "reserves: %u\n" \
    "timeouts: %u\n" \
    "releases: %u\n" \
    "buries: %u\n" \
    "kicks: %u\n" \
    "\r\n"
232 /* this number is pretty arbitrary */
233 #define BUCKET_BUF_SIZE 1024
235 static char bucket[BUCKET_BUF_SIZE];
237 static uint ready_ct = 0;
238 static struct stats global_stat = {0, 0, 0, 0, 0};
240 static tube default_tube;
242 static int drain_mode = 0;
243 static int64 started_at;
244 static uint64 op_ct[TOTAL_OPS], timeout_ct = 0;
246 static struct conn dirty = {&dirty, &dirty};
248 /* Doubly-linked list of connections with at least one reserved job. */
249 static struct conn running = { &running, &running };
251 #ifdef DEBUG
252 static const char * op_names[] = {
253 "<unknown>",
254 CMD_PUT,
255 CMD_PEEKJOB,
256 CMD_RESERVE,
257 CMD_DELETE,
258 CMD_RELEASE,
259 CMD_BURY,
260 CMD_KICK,
261 CMD_STATS,
262 CMD_JOBSTATS,
263 CMD_PEEK_BURIED,
264 CMD_USE,
265 CMD_WATCH,
266 CMD_IGNORE,
267 CMD_LIST_TUBES,
268 CMD_LIST_TUBE_USED,
269 CMD_LIST_TUBES_WATCHED,
270 CMD_STATS_TUBE,
271 CMD_PEEK_READY,
272 CMD_PEEK_DELAYED,
273 CMD_RESERVE_TIMEOUT,
274 CMD_TOUCH,
275 CMD_QUIT,
276 CMD_PAUSE_TUBE
278 #endif
280 static job remove_buried_job(job j);
282 static int
283 buried_job_p(tube t)
285 return job_list_any_p(&t->buried);
288 static void
289 reply(conn c, const char *line, int len, int state)
291 if (!c) return;
293 connwant(c, 'w', &dirty);
294 c->reply = line;
295 c->reply_len = len;
296 c->reply_sent = 0;
297 c->state = state;
298 dbgprintf("sending reply: %.*s", len, line);
/* Send the string literal m as a one-line reply. */
#define reply_msg(c,m) \
    reply((c), (m), CONSTSTRLEN(m), STATE_SENDWORD)

/* Log the string literal e as a server error, then send it as the reply. */
#define reply_serr(c,e) \
    (twarnx("server error: %s", (e)), reply_msg((c), (e)))
306 static void
307 reply_line(conn c, int state, const char *fmt, ...)
309 int r;
310 va_list ap;
312 va_start(ap, fmt);
313 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
314 va_end(ap);
316 /* Make sure the buffer was big enough. If not, we have a bug. */
317 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
319 return reply(c, c->reply_buf, r, state);
322 static void
323 reply_job(conn c, job j, const char *word)
325 /* tell this connection which job to send */
326 c->out_job = j;
327 c->out_job_sent = 0;
329 return reply_line(c, STATE_SENDJOB, "%s %llu %u\r\n",
330 word, j->id, j->body_size - 2);
333 conn
334 remove_waiting_conn(conn c)
336 tube t;
337 size_t i;
339 if (!conn_waiting(c)) return NULL;
341 c->type &= ~CONN_TYPE_WAITING;
342 global_stat.waiting_ct--;
343 for (i = 0; i < c->watch.used; i++) {
344 t = c->watch.items[i];
345 t->stat.waiting_ct--;
346 ms_remove(&t->waiting, c);
348 return c;
351 static void
352 reserve_job(conn c, job j)
354 j->deadline_at = nanoseconds() + j->ttr;
355 global_stat.reserved_ct++; /* stats */
356 j->tube->stat.reserved_ct++;
357 j->reserve_ct++;
358 j->state = JOB_STATE_RESERVED;
359 job_insert(&c->reserved_jobs, j);
360 j->reserver = c;
361 if (c->soonest_job && j->deadline_at < c->soonest_job->deadline_at) {
362 c->soonest_job = j;
364 return reply_job(c, j, MSG_RESERVED);
367 static job
368 next_eligible_job(int64 now)
370 tube t;
371 size_t i;
372 job j = NULL, candidate;
374 dbgprintf("tubes.used = %zu\n", tubes.used);
375 for (i = 0; i < tubes.used; i++) {
376 t = tubes.items[i];
377 dbgprintf("for %s t->waiting.used=%zu t->ready.len=%d t->pause=%" PRIu64 "\n",
378 t->name, t->waiting.used, t->ready.len, t->pause);
379 if (t->pause) {
380 if (t->deadline_at > now) continue;
381 t->pause = 0;
383 if (t->waiting.used && t->ready.len) {
384 candidate = t->ready.data[0];
385 if (!j || job_pri_cmp(candidate, j) < 0) {
386 j = candidate;
389 dbgprintf("i = %zu, tubes.used = %zu\n", i, tubes.used);
392 return j;
395 static void
396 process_queue()
398 job j;
399 int64 now = nanoseconds();
401 dbgprintf("processing queue\n");
402 while ((j = next_eligible_job(now))) {
403 dbgprintf("got eligible job %llu in %s\n", j->id, j->tube->name);
404 heapremove(&j->tube->ready, j->heap_index);
405 ready_ct--;
406 if (j->pri < URGENT_THRESHOLD) {
407 global_stat.urgent_ct--;
408 j->tube->stat.urgent_ct--;
410 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
414 static job
415 delay_q_peek()
417 int i;
418 tube t;
419 job j = NULL, nj;
421 for (i = 0; i < tubes.used; i++) {
422 t = tubes.items[i];
423 if (t->delay.len == 0) {
424 continue;
426 nj = t->delay.data[0];
427 if (!j || nj->deadline_at < j->deadline_at) j = nj;
430 return j;
433 static int
434 enqueue_job(job j, int64 delay, char update_store)
436 int r;
438 j->reserver = NULL;
439 if (delay) {
440 j->deadline_at = nanoseconds() + delay;
441 r = heapinsert(&j->tube->delay, j);
442 if (!r) return 0;
443 j->state = JOB_STATE_DELAYED;
444 } else {
445 r = heapinsert(&j->tube->ready, j);
446 if (!r) return 0;
447 j->state = JOB_STATE_READY;
448 ready_ct++;
449 if (j->pri < URGENT_THRESHOLD) {
450 global_stat.urgent_ct++;
451 j->tube->stat.urgent_ct++;
455 if (update_store) {
456 r = binlog_write_job(j);
457 if (!r) return -1;
460 process_queue();
461 return 1;
464 static int
465 bury_job(job j, char update_store)
467 size_t z;
469 if (update_store) {
470 z = binlog_reserve_space_update(j);
471 if (!z) return 0;
472 j->reserved_binlog_space += z;
475 job_insert(&j->tube->buried, j);
476 global_stat.buried_ct++;
477 j->tube->stat.buried_ct++;
478 j->state = JOB_STATE_BURIED;
479 j->reserver = NULL;
480 j->bury_ct++;
482 if (update_store) return binlog_write_job(j);
484 return 1;
487 void
488 enqueue_reserved_jobs(conn c)
490 int r;
491 job j;
493 while (job_list_any_p(&c->reserved_jobs)) {
494 j = job_remove(c->reserved_jobs.next);
495 r = enqueue_job(j, 0, 0);
496 if (r < 1) bury_job(j, 0);
497 global_stat.reserved_ct--;
498 j->tube->stat.reserved_ct--;
499 c->soonest_job = NULL;
500 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
504 static job
505 delay_q_take()
507 job j = delay_q_peek();
508 if (!j) {
509 return 0;
511 heapremove(&j->tube->delay, j->heap_index);
512 return j;
515 static int
516 kick_buried_job(tube t)
518 int r;
519 job j;
520 size_t z;
522 if (!buried_job_p(t)) return 0;
523 j = remove_buried_job(t->buried.next);
525 z = binlog_reserve_space_update(j);
526 if (!z) return heapinsert(&t->delay, j), 0; /* put it back */
527 j->reserved_binlog_space += z;
529 j->kick_ct++;
530 r = enqueue_job(j, 0, 1);
531 if (r == 1) return 1;
533 /* ready queue is full, so bury it */
534 bury_job(j, 0);
535 return 0;
538 static uint
539 get_delayed_job_ct()
541 tube t;
542 size_t i;
543 uint count = 0;
545 for (i = 0; i < tubes.used; i++) {
546 t = tubes.items[i];
547 count += t->delay.len;
549 return count;
552 static int
553 kick_delayed_job(tube t)
555 int r;
556 job j;
557 size_t z;
559 if (t->delay.len == 0) {
560 return 0;
563 j = heapremove(&t->delay, 0);
565 z = binlog_reserve_space_update(j);
566 if (!z) return heapinsert(&t->delay, j), 0; /* put it back */
567 j->reserved_binlog_space += z;
569 j->kick_ct++;
570 r = enqueue_job(j, 0, 1);
571 if (r == 1) return 1;
573 /* ready queue is full, so delay it again */
574 r = enqueue_job(j, j->delay, 0);
575 if (r == 1) return 0;
577 /* last resort */
578 bury_job(j, 0);
579 return 0;
582 /* return the number of jobs successfully kicked */
583 static uint
584 kick_buried_jobs(tube t, uint n)
586 uint i;
587 for (i = 0; (i < n) && kick_buried_job(t); ++i);
588 return i;
591 /* return the number of jobs successfully kicked */
592 static uint
593 kick_delayed_jobs(tube t, uint n)
595 uint i;
596 for (i = 0; (i < n) && kick_delayed_job(t); ++i);
597 return i;
600 static uint
601 kick_jobs(tube t, uint n)
603 if (buried_job_p(t)) return kick_buried_jobs(t, n);
604 return kick_delayed_jobs(t, n);
607 static job
608 remove_buried_job(job j)
610 if (!j || j->state != JOB_STATE_BURIED) return NULL;
611 j = job_remove(j);
612 if (j) {
613 global_stat.buried_ct--;
614 j->tube->stat.buried_ct--;
616 return j;
619 static job
620 remove_ready_job(job j)
622 if (!j || j->state != JOB_STATE_READY) return NULL;
623 heapremove(&j->tube->ready, j->heap_index);
624 ready_ct--;
625 if (j->pri < URGENT_THRESHOLD) {
626 global_stat.urgent_ct--;
627 j->tube->stat.urgent_ct--;
629 return j;
632 static void
633 enqueue_waiting_conn(conn c)
635 tube t;
636 size_t i;
638 global_stat.waiting_ct++;
639 c->type |= CONN_TYPE_WAITING;
640 for (i = 0; i < c->watch.used; i++) {
641 t = c->watch.items[i];
642 t->stat.waiting_ct++;
643 ms_append(&t->waiting, c);
647 static job
648 find_reserved_job_in_conn(conn c, job j)
650 return (j && j->reserver == c && j->state == JOB_STATE_RESERVED) ? j : NULL;
653 static job
654 touch_job(conn c, job j)
656 j = find_reserved_job_in_conn(c, j);
657 if (j) {
658 j->deadline_at = nanoseconds() + j->ttr;
659 c->soonest_job = NULL;
661 return j;
664 static job
665 peek_job(uint64 id)
667 return job_find(id);
670 static void
671 check_err(conn c, const char *s)
673 if (errno == EAGAIN) return;
674 if (errno == EINTR) return;
675 if (errno == EWOULDBLOCK) return;
677 twarn("%s", s);
678 conn_close(c);
679 return;
/* Scan the first `size` bytes of s for a line terminated by "\r\n".
 *
 * Returns the length of the line including the "\r\n" (so at least 2), or
 * 0 if no complete line was found. Only the first '\r' in the buffer is
 * examined: if it is not immediately followed by '\n', the result is 0
 * even when a "\r\n" pair appears later. */
static int
scan_line_end(const char *s, int size)
{
    const char *match;

    /* A line needs at least two bytes. This also keeps `size - 1` from
     * wrapping to a huge length when it is converted to size_t below. */
    if (size < 2) return 0;

    match = memchr(s, '\r', size - 1);
    if (!match) return 0;

    /* this is safe because we only scan size - 1 chars above */
    if (match[1] == '\n') return (int) (match - s) + 2;

    return 0;
}
698 static int
699 cmd_len(conn c)
701 return scan_line_end(c->cmd, c->cmd_read);
704 /* parse the command line */
705 static int
706 which_cmd(conn c)
708 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
709 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
710 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
711 TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY);
712 TEST_CMD(c->cmd, CMD_PEEK_DELAYED, OP_PEEK_DELAYED);
713 TEST_CMD(c->cmd, CMD_PEEK_BURIED, OP_PEEK_BURIED);
714 TEST_CMD(c->cmd, CMD_RESERVE_TIMEOUT, OP_RESERVE_TIMEOUT);
715 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
716 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
717 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
718 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
719 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
720 TEST_CMD(c->cmd, CMD_TOUCH, OP_TOUCH);
721 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
722 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
723 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
724 TEST_CMD(c->cmd, CMD_USE, OP_USE);
725 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
726 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
727 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
728 TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
729 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
730 TEST_CMD(c->cmd, CMD_QUIT, OP_QUIT);
731 TEST_CMD(c->cmd, CMD_PAUSE_TUBE, OP_PAUSE_TUBE);
732 return OP_UNKNOWN;
735 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
736 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
737 * This function is idempotent(). */
738 static void
739 fill_extra_data(conn c)
741 int extra_bytes, job_data_bytes = 0, cmd_bytes;
743 if (!c->sock.fd) return; /* the connection was closed */
744 if (!c->cmd_len) return; /* we don't have a complete command */
746 /* how many extra bytes did we read? */
747 extra_bytes = c->cmd_read - c->cmd_len;
749 /* how many bytes should we put into the job body? */
750 if (c->in_job) {
751 job_data_bytes = min(extra_bytes, c->in_job->body_size);
752 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
753 c->in_job_read = job_data_bytes;
754 } else if (c->in_job_read) {
755 /* we are in bit-bucket mode, throwing away data */
756 job_data_bytes = min(extra_bytes, c->in_job_read);
757 c->in_job_read -= job_data_bytes;
760 /* how many bytes are left to go into the future cmd? */
761 cmd_bytes = extra_bytes - job_data_bytes;
762 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
763 c->cmd_read = cmd_bytes;
764 c->cmd_len = 0; /* we no longer know the length of the new command */
767 static void
768 _skip(conn c, int n, const char *line, int len)
770 /* Invert the meaning of in_job_read while throwing away data -- it
771 * counts the bytes that remain to be thrown away. */
772 c->in_job = 0;
773 c->in_job_read = n;
774 fill_extra_data(c);
776 if (c->in_job_read == 0) return reply(c, line, len, STATE_SENDWORD);
778 c->reply = line;
779 c->reply_len = len;
780 c->reply_sent = 0;
781 c->state = STATE_BITBUCKET;
782 return;
785 #define skip(c,n,m) (_skip(c,n,m,CONSTSTRLEN(m)))
787 static void
788 enqueue_incoming_job(conn c)
790 int r;
791 job j = c->in_job;
793 c->in_job = NULL; /* the connection no longer owns this job */
794 c->in_job_read = 0;
796 /* check if the trailer is present and correct */
797 if (memcmp(j->body + j->body_size - 2, "\r\n", 2)) {
798 job_free(j);
799 return reply_msg(c, MSG_EXPECTED_CRLF);
802 if (drain_mode) {
803 job_free(j);
804 return reply_serr(c, MSG_DRAINING);
807 if (j->reserved_binlog_space) return reply_serr(c, MSG_INTERNAL_ERROR);
808 j->reserved_binlog_space = binlog_reserve_space_put(j);
809 if (!j->reserved_binlog_space) return reply_serr(c, MSG_OUT_OF_MEMORY);
811 /* we have a complete job, so let's stick it in the pqueue */
812 r = enqueue_job(j, j->delay, 1);
813 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
815 op_ct[OP_PUT]++; /* stats */
816 global_stat.total_jobs_ct++;
817 j->tube->stat.total_jobs_ct++;
819 if (r == 1) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->id);
821 /* out of memory trying to grow the queue, so it gets buried */
822 bury_job(j, 0);
823 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->id);
826 static uint
827 uptime()
829 return (nanoseconds() - started_at) / 1000000000;
832 static int
833 fmt_stats(char *buf, size_t size, void *x)
835 struct rusage ru = {{0, 0}, {0, 0}};
836 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
837 return snprintf(buf, size, STATS_FMT,
838 global_stat.urgent_ct,
839 ready_ct,
840 global_stat.reserved_ct,
841 get_delayed_job_ct(),
842 global_stat.buried_ct,
843 op_ct[OP_PUT],
844 op_ct[OP_PEEKJOB],
845 op_ct[OP_PEEK_READY],
846 op_ct[OP_PEEK_DELAYED],
847 op_ct[OP_PEEK_BURIED],
848 op_ct[OP_RESERVE],
849 op_ct[OP_RESERVE_TIMEOUT],
850 op_ct[OP_DELETE],
851 op_ct[OP_RELEASE],
852 op_ct[OP_USE],
853 op_ct[OP_WATCH],
854 op_ct[OP_IGNORE],
855 op_ct[OP_BURY],
856 op_ct[OP_KICK],
857 op_ct[OP_TOUCH],
858 op_ct[OP_STATS],
859 op_ct[OP_JOBSTATS],
860 op_ct[OP_STATS_TUBE],
861 op_ct[OP_LIST_TUBES],
862 op_ct[OP_LIST_TUBE_USED],
863 op_ct[OP_LIST_TUBES_WATCHED],
864 op_ct[OP_PAUSE_TUBE],
865 timeout_ct,
866 global_stat.total_jobs_ct,
867 job_data_size_limit,
868 tubes.used,
869 count_cur_conns(),
870 count_cur_producers(),
871 count_cur_workers(),
872 global_stat.waiting_ct,
873 count_tot_conns(),
874 (long) getpid(),
875 version,
876 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
877 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
878 uptime(),
879 binlog_oldest_index(),
880 binlog_current_index(),
881 binlog_size_limit);
885 /* Read a priority value from the given buffer and place it in pri.
886 * Update end to point to the address after the last character consumed.
887 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
888 * conversion and return the status code but not update any values. This is an
889 * easy way to check for errors.
890 * If end is NULL, read_pri will also check that the entire input string was
891 * consumed and return an error code otherwise.
892 * Return 0 on success, or nonzero on failure.
893 * If a failure occurs, pri and end are not modified. */
894 static int
895 read_pri(uint *pri, const char *buf, char **end)
897 char *tend;
898 uint tpri;
900 errno = 0;
901 while (buf[0] == ' ') buf++;
902 if (!isdigit(buf[0])) return -1;
903 tpri = strtoul(buf, &tend, 10);
904 if (tend == buf) return -1;
905 if (errno && errno != ERANGE) return -1;
906 if (!end && tend[0] != '\0') return -1;
908 if (pri) *pri = tpri;
909 if (end) *end = tend;
910 return 0;
913 /* Read a delay value from the given buffer and place it in delay.
914 * The interface and behavior are analogous to read_pri(). */
915 static int
916 read_delay(int64 *delay, const char *buf, char **end)
918 int r;
919 uint delay_sec;
921 r = read_pri(&delay_sec, buf, end);
922 if (r) return r;
923 *delay = ((int64) delay_sec) * 1000000000;
924 return 0;
927 /* Read a timeout value from the given buffer and place it in ttr.
928 * The interface and behavior are the same as in read_delay(). */
929 static int
930 read_ttr(int64 *ttr, const char *buf, char **end)
932 return read_delay(ttr, buf, end);
935 /* Read a tube name from the given buffer moving the buffer to the name start */
936 static int
937 read_tube_name(char **tubename, char *buf, char **end)
939 size_t len;
941 while (buf[0] == ' ') buf++;
942 len = strspn(buf, NAME_CHARS);
943 if (len == 0) return -1;
944 if (tubename) *tubename = buf;
945 if (end) *end = buf + len;
946 return 0;
949 static void
950 wait_for_job(conn c, int timeout)
952 c->state = STATE_WAIT;
953 enqueue_waiting_conn(c);
955 /* Set the pending timeout to the requested timeout amount */
956 c->pending_timeout = timeout;
958 /* this conn is waiting, but we want to know if they hang up */
959 connwant(c, 'r', &dirty);
962 typedef int(*fmt_fn)(char *, size_t, void *);
964 static void
965 do_stats(conn c, fmt_fn fmt, void *data)
967 int r, stats_len;
969 /* first, measure how big a buffer we will need */
970 stats_len = fmt(NULL, 0, data) + 16;
972 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
973 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
975 /* Mark this job as a copy so it can be appropriately freed later on */
976 c->out_job->state = JOB_STATE_COPY;
978 /* now actually format the stats data */
979 r = fmt(c->out_job->body, stats_len, data);
980 /* and set the actual body size */
981 c->out_job->body_size = r;
982 if (r > stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
984 c->out_job_sent = 0;
985 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", r - 2);
988 static void
989 do_list_tubes(conn c, ms l)
991 char *buf;
992 tube t;
993 size_t i, resp_z;
995 /* first, measure how big a buffer we will need */
996 resp_z = 6; /* initial "---\n" and final "\r\n" */
997 for (i = 0; i < l->used; i++) {
998 t = l->items[i];
999 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
1002 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
1003 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
1005 /* Mark this job as a copy so it can be appropriately freed later on */
1006 c->out_job->state = JOB_STATE_COPY;
1008 /* now actually format the response */
1009 buf = c->out_job->body;
1010 buf += snprintf(buf, 5, "---\n");
1011 for (i = 0; i < l->used; i++) {
1012 t = l->items[i];
1013 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
1015 buf[0] = '\r';
1016 buf[1] = '\n';
1018 c->out_job_sent = 0;
1019 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", resp_z - 2);
1022 static int
1023 fmt_job_stats(char *buf, size_t size, job j)
1025 int64 t;
1026 int64 time_left;
1028 t = nanoseconds();
1029 if (j->state == JOB_STATE_RESERVED || j->state == JOB_STATE_DELAYED) {
1030 time_left = (j->deadline_at - t) / 1000000000;
1031 } else {
1032 time_left = 0;
1034 return snprintf(buf, size, STATS_JOB_FMT,
1035 j->id,
1036 j->tube->name,
1037 job_state(j),
1038 j->pri,
1039 (t - j->created_at) / 1000000000,
1040 j->delay / 1000000000,
1041 j->ttr / 1000000000,
1042 time_left,
1043 j->reserve_ct,
1044 j->timeout_ct,
1045 j->release_ct,
1046 j->bury_ct,
1047 j->kick_ct);
1050 static int
1051 fmt_stats_tube(char *buf, size_t size, tube t)
1053 uint64 time_left;
1055 if (t->pause > 0) {
1056 time_left = (t->deadline_at - nanoseconds()) / 1000000000;
1057 } else {
1058 time_left = 0;
1060 return snprintf(buf, size, STATS_TUBE_FMT,
1061 t->name,
1062 t->stat.urgent_ct,
1063 t->ready.len,
1064 t->stat.reserved_ct,
1065 t->delay.len,
1066 t->stat.buried_ct,
1067 t->stat.total_jobs_ct,
1068 t->using_ct,
1069 t->watching_ct,
1070 t->stat.waiting_ct,
1071 t->stat.pause_ct,
1072 t->pause / 1000000000,
1073 time_left);
1076 static void
1077 maybe_enqueue_incoming_job(conn c)
1079 job j = c->in_job;
1081 /* do we have a complete job? */
1082 if (c->in_job_read == j->body_size) return enqueue_incoming_job(c);
1084 /* otherwise we have incomplete data, so just keep waiting */
1085 c->state = STATE_WANTDATA;
1088 /* j can be NULL */
1089 static job
1090 remove_this_reserved_job(conn c, job j)
1092 j = job_remove(j);
1093 if (j) {
1094 global_stat.reserved_ct--;
1095 j->tube->stat.reserved_ct--;
1096 j->reserver = NULL;
1098 c->soonest_job = NULL;
1099 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
1100 return j;
1103 static job
1104 remove_reserved_job(conn c, job j)
1106 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, j));
1109 static int
1110 name_is_ok(const char *name, size_t max)
1112 size_t len = strlen(name);
1113 return len > 0 && len <= max &&
1114 strspn(name, NAME_CHARS) == len && name[0] != '-';
1117 void
1118 prot_remove_tube(tube t)
1120 ms_remove(&tubes, t);
1123 static void
1124 dispatch_cmd(conn c)
1126 int r, i, timeout = -1;
1127 size_t z;
1128 uint count;
1129 job j = 0;
1130 byte type;
1131 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
1132 uint pri, body_size;
1133 int64 delay, ttr;
1134 uint64 id;
1135 tube t = NULL;
1137 /* NUL-terminate this string so we can use strtol and friends */
1138 c->cmd[c->cmd_len - 2] = '\0';
1140 /* check for possible maliciousness */
1141 if (strlen(c->cmd) != c->cmd_len - 2) {
1142 return reply_msg(c, MSG_BAD_FORMAT);
1145 type = which_cmd(c);
1146 dbgprintf("got %s command: \"%s\"\n", op_names[(int) type], c->cmd);
1148 switch (type) {
1149 case OP_PUT:
1150 r = read_pri(&pri, c->cmd + 4, &delay_buf);
1151 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1153 r = read_delay(&delay, delay_buf, &ttr_buf);
1154 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1156 r = read_ttr(&ttr, ttr_buf, &size_buf);
1157 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1159 errno = 0;
1160 body_size = strtoul(size_buf, &end_buf, 10);
1161 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1163 if (body_size > job_data_size_limit) {
1164 /* throw away the job body and respond with JOB_TOO_BIG */
1165 return skip(c, body_size + 2, MSG_JOB_TOO_BIG);
1168 /* don't allow trailing garbage */
1169 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
1171 conn_set_producer(c);
1173 c->in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c->use);
1175 /* OOM? */
1176 if (!c->in_job) {
1177 /* throw away the job body and respond with OUT_OF_MEMORY */
1178 twarnx("server error: " MSG_OUT_OF_MEMORY);
1179 return skip(c, body_size + 2, MSG_OUT_OF_MEMORY);
1182 fill_extra_data(c);
1184 /* it's possible we already have a complete job */
1185 maybe_enqueue_incoming_job(c);
1187 break;
1188 case OP_PEEK_READY:
1189 /* don't allow trailing garbage */
1190 if (c->cmd_len != CMD_PEEK_READY_LEN + 2) {
1191 return reply_msg(c, MSG_BAD_FORMAT);
1193 op_ct[type]++;
1195 if (c->use->ready.len) {
1196 j = job_copy(c->use->ready.data[0]);
1199 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1201 reply_job(c, j, MSG_FOUND);
1202 break;
1203 case OP_PEEK_DELAYED:
1204 /* don't allow trailing garbage */
1205 if (c->cmd_len != CMD_PEEK_DELAYED_LEN + 2) {
1206 return reply_msg(c, MSG_BAD_FORMAT);
1208 op_ct[type]++;
1210 if (c->use->delay.len) {
1211 j = job_copy(c->use->delay.data[0]);
1214 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1216 reply_job(c, j, MSG_FOUND);
1217 break;
1218 case OP_PEEK_BURIED:
1219 /* don't allow trailing garbage */
1220 if (c->cmd_len != CMD_PEEK_BURIED_LEN + 2) {
1221 return reply_msg(c, MSG_BAD_FORMAT);
1223 op_ct[type]++;
1225 j = job_copy(buried_job_p(c->use)? j = c->use->buried.next : NULL);
1227 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1229 reply_job(c, j, MSG_FOUND);
1230 break;
1231 case OP_PEEKJOB:
1232 errno = 0;
1233 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1234 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1235 op_ct[type]++;
1237 /* So, peek is annoying, because some other connection might free the
1238 * job while we are still trying to write it out. So we copy it and
1239 * then free the copy when it's done sending. */
1240 j = job_copy(peek_job(id));
1242 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1244 reply_job(c, j, MSG_FOUND);
1245 break;
1246 case OP_RESERVE_TIMEOUT:
1247 errno = 0;
1248 timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10);
1249 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1250 case OP_RESERVE: /* FALLTHROUGH */
1251 /* don't allow trailing garbage */
1252 if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) {
1253 return reply_msg(c, MSG_BAD_FORMAT);
1256 op_ct[type]++;
1257 conn_set_worker(c);
1259 if (conn_has_close_deadline(c) && !conn_ready(c)) {
1260 return reply_msg(c, MSG_DEADLINE_SOON);
1263 /* try to get a new job for this guy */
1264 wait_for_job(c, timeout);
1265 process_queue();
1266 break;
1267 case OP_DELETE:
1268 errno = 0;
1269 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1270 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1271 op_ct[type]++;
1273 j = job_find(id);
1274 j = remove_reserved_job(c, j) ? :
1275 remove_ready_job(j) ? :
1276 remove_buried_job(j);
1278 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1280 j->state = JOB_STATE_INVALID;
1281 r = binlog_write_job(j);
1282 job_free(j);
1284 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1286 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1287 break;
1288 case OP_RELEASE:
1289 errno = 0;
1290 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1291 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1293 r = read_pri(&pri, pri_buf, &delay_buf);
1294 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1296 r = read_delay(&delay, delay_buf, NULL);
1297 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1298 op_ct[type]++;
1300 j = remove_reserved_job(c, job_find(id));
1302 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1304 /* We want to update the delay deadline on disk, so reserve space for
1305 * that. */
1306 if (delay) {
1307 z = binlog_reserve_space_update(j);
1308 if (!z) return reply_serr(c, MSG_OUT_OF_MEMORY);
1309 j->reserved_binlog_space += z;
1312 j->pri = pri;
1313 j->delay = delay;
1314 j->release_ct++;
1316 r = enqueue_job(j, delay, !!delay);
1317 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
1318 if (r == 1) {
1319 return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1322 /* out of memory trying to grow the queue, so it gets buried */
1323 bury_job(j, 0);
1324 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1325 break;
1326 case OP_BURY:
1327 errno = 0;
1328 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1329 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1331 r = read_pri(&pri, pri_buf, NULL);
1332 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1333 op_ct[type]++;
1335 j = remove_reserved_job(c, job_find(id));
1337 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1339 j->pri = pri;
1340 r = bury_job(j, 1);
1341 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1342 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1343 break;
1344 case OP_KICK:
1345 errno = 0;
1346 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
1347 if (end_buf == c->cmd + CMD_KICK_LEN) {
1348 return reply_msg(c, MSG_BAD_FORMAT);
1350 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1352 op_ct[type]++;
1354 i = kick_jobs(c->use, count);
1356 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1357 case OP_TOUCH:
1358 errno = 0;
1359 id = strtoull(c->cmd + CMD_TOUCH_LEN, &end_buf, 10);
1360 if (errno) return twarn("strtoull"), reply_msg(c, MSG_BAD_FORMAT);
1362 op_ct[type]++;
1364 j = touch_job(c, job_find(id));
1366 if (j) {
1367 reply(c, MSG_TOUCHED, MSG_TOUCHED_LEN, STATE_SENDWORD);
1368 } else {
1369 return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1371 break;
1372 case OP_STATS:
1373 /* don't allow trailing garbage */
1374 if (c->cmd_len != CMD_STATS_LEN + 2) {
1375 return reply_msg(c, MSG_BAD_FORMAT);
1378 op_ct[type]++;
1380 do_stats(c, fmt_stats, NULL);
1381 break;
1382 case OP_JOBSTATS:
1383 errno = 0;
1384 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1385 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1387 op_ct[type]++;
1389 j = peek_job(id);
1390 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1392 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1393 do_stats(c, (fmt_fn) fmt_job_stats, j);
1394 break;
1395 case OP_STATS_TUBE:
1396 name = c->cmd + CMD_STATS_TUBE_LEN;
1397 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1399 op_ct[type]++;
1401 t = tube_find(name);
1402 if (!t) return reply_msg(c, MSG_NOTFOUND);
1404 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1405 t = NULL;
1406 break;
1407 case OP_LIST_TUBES:
1408 /* don't allow trailing garbage */
1409 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1410 return reply_msg(c, MSG_BAD_FORMAT);
1413 op_ct[type]++;
1414 do_list_tubes(c, &tubes);
1415 break;
1416 case OP_LIST_TUBE_USED:
1417 /* don't allow trailing garbage */
1418 if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
1419 return reply_msg(c, MSG_BAD_FORMAT);
1422 op_ct[type]++;
1423 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1424 break;
1425 case OP_LIST_TUBES_WATCHED:
1426 /* don't allow trailing garbage */
1427 if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
1428 return reply_msg(c, MSG_BAD_FORMAT);
1431 op_ct[type]++;
1432 do_list_tubes(c, &c->watch);
1433 break;
1434 case OP_USE:
1435 name = c->cmd + CMD_USE_LEN;
1436 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1437 op_ct[type]++;
1439 TUBE_ASSIGN(t, tube_find_or_make(name));
1440 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1442 c->use->using_ct--;
1443 TUBE_ASSIGN(c->use, t);
1444 TUBE_ASSIGN(t, NULL);
1445 c->use->using_ct++;
1447 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1448 break;
1449 case OP_WATCH:
1450 name = c->cmd + CMD_WATCH_LEN;
1451 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1452 op_ct[type]++;
1454 TUBE_ASSIGN(t, tube_find_or_make(name));
1455 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1457 r = 1;
1458 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1459 TUBE_ASSIGN(t, NULL);
1460 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1462 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1463 break;
1464 case OP_IGNORE:
1465 name = c->cmd + CMD_IGNORE_LEN;
1466 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1467 op_ct[type]++;
1469 t = NULL;
1470 for (i = 0; i < c->watch.used; i++) {
1471 t = c->watch.items[i];
1472 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1473 t = NULL;
1476 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1478 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1479 t = NULL;
1481 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1482 break;
1483 case OP_QUIT:
1484 conn_close(c);
1485 break;
1486 case OP_PAUSE_TUBE:
1487 op_ct[type]++;
1489 r = read_tube_name(&name, c->cmd + CMD_PAUSE_TUBE_LEN, &delay_buf);
1490 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1492 r = read_delay(&delay, delay_buf, NULL);
1493 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1495 *delay_buf = '\0';
1496 t = tube_find(name);
1497 if (!t) return reply_msg(c, MSG_NOTFOUND);
1499 t->deadline_at = nanoseconds() + delay;
1500 t->pause = delay;
1501 t->stat.pause_ct++;
1503 reply_line(c, STATE_SENDWORD, "PAUSED\r\n");
1504 break;
1505 default:
1506 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1510 /* There are three reasons this function may be called. We need to check for
1511 * all of them.
1513 * 1. A reserved job has run out of time.
1514 * 2. A waiting client's reserved job has entered the safety margin.
1515 * 3. A waiting client's requested timeout has occurred.
1517 * If any of these happen, we must do the appropriate thing. */
1518 static void
1519 conn_timeout(conn c)
1521 int r, should_timeout = 0;
1522 job j;
1524 /* Check if the client was trying to reserve a job. */
1525 if (conn_waiting(c) && conn_has_close_deadline(c)) should_timeout = 1;
1527 /* Check if any reserved jobs have run out of time. We should do this
1528 * whether or not the client is waiting for a new reservation. */
1529 while ((j = soonest_job(c))) {
1530 if (j->deadline_at >= nanoseconds()) break;
1532 /* This job is in the middle of being written out. If we return it to
1533 * the ready queue, someone might free it before we finish writing it
1534 * out to the socket. So we'll copy it here and free the copy when it's
1535 * done sending. */
1536 if (j == c->out_job) {
1537 c->out_job = job_copy(c->out_job);
1540 timeout_ct++; /* stats */
1541 j->timeout_ct++;
1542 r = enqueue_job(remove_this_reserved_job(c, j), 0, 0);
1543 if (r < 1) bury_job(j, 0); /* out of memory, so bury it */
1544 connsched(c);
1547 if (should_timeout) {
1548 dbgprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1549 return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON);
1550 } else if (conn_waiting(c) && c->pending_timeout >= 0) {
1551 dbgprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1552 c->pending_timeout = -1;
1553 return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);
1557 void
1558 enter_drain_mode(int sig)
1560 drain_mode = 1;
1563 static void
1564 do_cmd(conn c)
1566 dispatch_cmd(c);
1567 fill_extra_data(c);
1570 static void
1571 reset_conn(conn c)
1573 connwant(c, 'r', &dirty);
1575 /* was this a peek or stats command? */
1576 if (c->out_job && c->out_job->state == JOB_STATE_COPY) job_free(c->out_job);
1577 c->out_job = NULL;
1579 c->reply_sent = 0; /* now that we're done, reset this */
1580 c->state = STATE_WANTCOMMAND;
1583 static void
1584 conn_data(conn c)
1586 int r, to_read;
1587 job j;
1588 struct iovec iov[2];
1590 switch (c->state) {
1591 case STATE_WANTCOMMAND:
1592 r = read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1593 if (r == -1) return check_err(c, "read()");
1594 if (r == 0) return conn_close(c); /* the client hung up */
1596 c->cmd_read += r; /* we got some bytes */
1598 c->cmd_len = cmd_len(c); /* find the EOL */
1600 /* yay, complete command line */
1601 if (c->cmd_len) return do_cmd(c);
1603 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1605 /* command line too long? */
1606 if (c->cmd_read == LINE_BUF_SIZE) {
1607 c->cmd_read = 0; /* discard the input so far */
1608 return reply_msg(c, MSG_BAD_FORMAT);
1611 /* otherwise we have an incomplete line, so just keep waiting */
1612 break;
1613 case STATE_BITBUCKET:
1614 /* Invert the meaning of in_job_read while throwing away data -- it
1615 * counts the bytes that remain to be thrown away. */
1616 to_read = min(c->in_job_read, BUCKET_BUF_SIZE);
1617 r = read(c->sock.fd, bucket, to_read);
1618 if (r == -1) return check_err(c, "read()");
1619 if (r == 0) return conn_close(c); /* the client hung up */
1621 c->in_job_read -= r; /* we got some bytes */
1623 /* (c->in_job_read < 0) can't happen */
1625 if (c->in_job_read == 0) {
1626 return reply(c, c->reply, c->reply_len, STATE_SENDWORD);
1628 break;
1629 case STATE_WANTDATA:
1630 j = c->in_job;
1632 r = read(c->sock.fd, j->body + c->in_job_read, j->body_size -c->in_job_read);
1633 if (r == -1) return check_err(c, "read()");
1634 if (r == 0) return conn_close(c); /* the client hung up */
1636 c->in_job_read += r; /* we got some bytes */
1638 /* (j->in_job_read > j->body_size) can't happen */
1640 maybe_enqueue_incoming_job(c);
1641 break;
1642 case STATE_SENDWORD:
1643 r= write(c->sock.fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1644 if (r == -1) return check_err(c, "write()");
1645 if (r == 0) return conn_close(c); /* the client hung up */
1647 c->reply_sent += r; /* we got some bytes */
1649 /* (c->reply_sent > c->reply_len) can't happen */
1651 if (c->reply_sent == c->reply_len) return reset_conn(c);
1653 /* otherwise we sent an incomplete reply, so just keep waiting */
1654 break;
1655 case STATE_SENDJOB:
1656 j = c->out_job;
1658 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1659 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1660 iov[1].iov_base = j->body + c->out_job_sent;
1661 iov[1].iov_len = j->body_size - c->out_job_sent;
1663 r = writev(c->sock.fd, iov, 2);
1664 if (r == -1) return check_err(c, "writev()");
1665 if (r == 0) return conn_close(c); /* the client hung up */
1667 /* update the sent values */
1668 c->reply_sent += r;
1669 if (c->reply_sent >= c->reply_len) {
1670 c->out_job_sent += c->reply_sent - c->reply_len;
1671 c->reply_sent = c->reply_len;
1674 /* (c->out_job_sent > j->body_size) can't happen */
1676 /* are we done? */
1677 if (c->out_job_sent == j->body_size) return reset_conn(c);
1679 /* otherwise we sent incomplete data, so just keep waiting */
1680 break;
1681 case STATE_WAIT: /* keep an eye out in case they hang up */
1682 /* but don't hang up just because our buffer is full */
1683 if (LINE_BUF_SIZE - c->cmd_read < 1) break;
1685 r = read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1686 if (r == -1) return check_err(c, "read()");
1687 if (r == 0) return conn_close(c); /* the client hung up */
1688 c->cmd_read += r; /* we got some bytes */
/* True when c has a nonzero socket fd and is in STATE_WANTCOMMAND. */
#define want_command(c) \
    ((c)->sock.fd != 0 && (c)->state == STATE_WANTCOMMAND)

/* True when want_command(c) holds and at least one byte has already been
 * read into c's command buffer (cmd_read is nonzero). */
#define cmd_data_ready(c) \
    (want_command(c) && (c)->cmd_read != 0)
1695 static void
1696 update_conns()
1698 int r;
1699 conn c;
1701 while ((c = conn_remove(dirty.next))) { /* assignment */
1702 r = sockwant(&c->sock, c->rw);
1703 if (r == -1) {
1704 twarn("sockwant");
1705 conn_close(c);
1710 static void
1711 h_conn(const int fd, const short which, conn c)
1713 if (fd != c->sock.fd) {
1714 twarnx("Argh! event fd doesn't match conn fd.");
1715 close(fd);
1716 conn_close(c);
1717 update_conns();
1718 return;
1721 conn_data(c);
1722 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1723 update_conns();
1726 static void
1727 prothandle(conn c, int ev)
1729 h_conn(c->sock.fd, ev, c);
1732 void
1733 prottick(Srv *s)
1735 int r;
1736 job j;
1737 int64 now;
1738 int i;
1739 tube t;
1741 now = nanoseconds();
1742 while ((j = delay_q_peek())) {
1743 if (j->deadline_at > now) break;
1744 j = delay_q_take();
1745 r = enqueue_job(j, 0, 0);
1746 if (r < 1) bury_job(j, 0); /* out of memory, so bury it */
1749 for (i = 0; i < tubes.used; i++) {
1750 t = tubes.items[i];
1752 dbgprintf("delay for %s t->waiting.used=%zu t->ready.len=%d t->pause=%" PRIu64 "\n",
1753 t->name, t->waiting.used, t->ready.len, t->pause);
1754 if (t->pause && t->deadline_at <= now) {
1755 t->pause = 0;
1756 process_queue();
1760 while (s->conns.len) {
1761 conn c = s->conns.data[0];
1762 if (c->tickat > now) {
1763 break;
1766 heapremove(&s->conns, 0);
1767 conn_timeout(c);
1770 update_conns();
1773 void
1774 h_accept(const int fd, const short which, Srv *s)
1776 conn c;
1777 int cfd, flags, r;
1778 socklen_t addrlen;
1779 struct sockaddr_in6 addr;
1781 addrlen = sizeof addr;
1782 cfd = accept(fd, (struct sockaddr *)&addr, &addrlen);
1783 if (cfd == -1) {
1784 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1785 update_conns();
1786 return;
1789 flags = fcntl(cfd, F_GETFL, 0);
1790 if (flags < 0) {
1791 twarn("getting flags");
1792 close(cfd);
1793 update_conns();
1794 return;
1797 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1798 if (r < 0) {
1799 twarn("setting O_NONBLOCK");
1800 close(cfd);
1801 update_conns();
1802 return;
1805 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1806 if (!c) {
1807 twarnx("make_conn() failed");
1808 close(cfd);
1809 update_conns();
1810 return;
1812 c->srv = s;
1813 c->sock.x = c;
1814 c->sock.f = (Handle)prothandle;
1815 c->sock.fd = cfd;
1817 dbgprintf("accepted conn, fd=%d\n", cfd);
1818 r = sockwant(&c->sock, 'r');
1819 if (r == -1) {
1820 twarn("sockwant");
1821 close(cfd);
1822 update_conns();
1823 return;
1825 update_conns();
1828 void
1829 prot_init()
1831 started_at = nanoseconds();
1832 memset(op_ct, 0, sizeof(op_ct));
1834 ms_init(&tubes, NULL, NULL);
1836 TUBE_ASSIGN(default_tube, tube_find_or_make("default"));
1837 if (!default_tube) twarnx("Out of memory during startup!");
1840 void
1841 prot_replay_binlog(job binlog_jobs)
1843 job j, nj;
1844 int64 delay;
1845 int r;
1847 for (j = binlog_jobs->next ; j != binlog_jobs ; j = nj) {
1848 nj = j->next;
1849 job_remove(j);
1850 binlog_reserve_space_update(j); /* reserve space for a delete */
1851 delay = 0;
1852 switch (j->state) {
1853 case JOB_STATE_BURIED:
1854 bury_job(j, 0);
1855 break;
1856 case JOB_STATE_DELAYED:
1857 if (started_at < j->deadline_at) {
1858 delay = j->deadline_at - started_at;
1860 /* fall through */
1861 default:
1862 r = enqueue_job(j, delay, 0);
1863 if (r < 1) twarnx("error processing binlog job %llu", j->id);