Oops, struct timeval is defined in sys/time.h.
[beanstalkd.git] / prot.c
blob84960a18050c2677691c32633caf7afe1cfb1588
1 /* prot.c - protocol implementation */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <errno.h>
24 #include <sys/resource.h>
25 #include <sys/uio.h>
26 #include <stdarg.h>
27 #include <ctype.h>
29 #include "stat.h"
30 #include "prot.h"
31 #include "pq.h"
32 #include "ms.h"
33 #include "job.h"
34 #include "tube.h"
35 #include "conn.h"
36 #include "util.h"
37 #include "net.h"
38 #include "binlog.h"
39 #include "config.h"
41 /* job body cannot be greater than this many bytes long */
42 size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
44 #define NAME_CHARS \
45 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
46 "abcdefghijklmnopqrstuvwxyz" \
47 "0123456789-+/;.$()"
49 #define CMD_PUT "put "
50 #define CMD_PEEKJOB "peek "
51 #define CMD_PEEK_READY "peek-ready"
52 #define CMD_PEEK_DELAYED "peek-delayed"
53 #define CMD_PEEK_BURIED "peek-buried"
54 #define CMD_RESERVE "reserve"
55 #define CMD_RESERVE_TIMEOUT "reserve-with-timeout "
56 #define CMD_DELETE "delete "
57 #define CMD_RELEASE "release "
58 #define CMD_BURY "bury "
59 #define CMD_KICK "kick "
60 #define CMD_TOUCH "touch "
61 #define CMD_STATS "stats"
62 #define CMD_JOBSTATS "stats-job "
63 #define CMD_USE "use "
64 #define CMD_WATCH "watch "
65 #define CMD_IGNORE "ignore "
66 #define CMD_LIST_TUBES "list-tubes"
67 #define CMD_LIST_TUBE_USED "list-tube-used"
68 #define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
69 #define CMD_STATS_TUBE "stats-tube "
71 #define CONSTSTRLEN(m) (sizeof(m) - 1)
73 #define CMD_PEEK_READY_LEN CONSTSTRLEN(CMD_PEEK_READY)
74 #define CMD_PEEK_DELAYED_LEN CONSTSTRLEN(CMD_PEEK_DELAYED)
75 #define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED)
76 #define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
77 #define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
78 #define CMD_RESERVE_TIMEOUT_LEN CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
79 #define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
80 #define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
81 #define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
82 #define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
83 #define CMD_TOUCH_LEN CONSTSTRLEN(CMD_TOUCH)
84 #define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
85 #define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
86 #define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
87 #define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
88 #define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
89 #define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
90 #define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
91 #define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
92 #define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
94 #define MSG_FOUND "FOUND"
95 #define MSG_NOTFOUND "NOT_FOUND\r\n"
96 #define MSG_RESERVED "RESERVED"
97 #define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
98 #define MSG_TIMED_OUT "TIMED_OUT\r\n"
99 #define MSG_DELETED "DELETED\r\n"
100 #define MSG_RELEASED "RELEASED\r\n"
101 #define MSG_BURIED "BURIED\r\n"
102 #define MSG_TOUCHED "TOUCHED\r\n"
103 #define MSG_BURIED_FMT "BURIED %llu\r\n"
104 #define MSG_INSERTED_FMT "INSERTED %llu\r\n"
105 #define MSG_NOT_IGNORED "NOT_IGNORED\r\n"
107 #define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
108 #define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
109 #define MSG_TOUCHED_LEN CONSTSTRLEN(MSG_TOUCHED)
110 #define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
111 #define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
112 #define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)
114 #define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
115 #define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
116 #define MSG_DRAINING "DRAINING\r\n"
117 #define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
118 #define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
119 #define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
120 #define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
122 #define STATE_WANTCOMMAND 0
123 #define STATE_WANTDATA 1
124 #define STATE_SENDJOB 2
125 #define STATE_SENDWORD 3
126 #define STATE_WAIT 4
127 #define STATE_BITBUCKET 5
129 #define OP_UNKNOWN 0
130 #define OP_PUT 1
131 #define OP_PEEKJOB 2
132 #define OP_RESERVE 3
133 #define OP_DELETE 4
134 #define OP_RELEASE 5
135 #define OP_BURY 6
136 #define OP_KICK 7
137 #define OP_STATS 8
138 #define OP_JOBSTATS 9
139 #define OP_PEEK_BURIED 10
140 #define OP_USE 11
141 #define OP_WATCH 12
142 #define OP_IGNORE 13
143 #define OP_LIST_TUBES 14
144 #define OP_LIST_TUBE_USED 15
145 #define OP_LIST_TUBES_WATCHED 16
146 #define OP_STATS_TUBE 17
147 #define OP_PEEK_READY 18
148 #define OP_PEEK_DELAYED 19
149 #define OP_RESERVE_TIMEOUT 20
150 #define OP_TOUCH 21
151 #define TOTAL_OPS 22
153 #define STATS_FMT "---\n" \
154 "current-jobs-urgent: %u\n" \
155 "current-jobs-ready: %u\n" \
156 "current-jobs-reserved: %u\n" \
157 "current-jobs-delayed: %u\n" \
158 "current-jobs-buried: %u\n" \
159 "cmd-put: %llu\n" \
160 "cmd-peek: %llu\n" \
161 "cmd-peek-ready: %llu\n" \
162 "cmd-peek-delayed: %llu\n" \
163 "cmd-peek-buried: %llu\n" \
164 "cmd-reserve: %llu\n" \
165 "cmd-reserve-with-timeout: %llu\n" \
166 "cmd-delete: %llu\n" \
167 "cmd-release: %llu\n" \
168 "cmd-use: %llu\n" \
169 "cmd-watch: %llu\n" \
170 "cmd-ignore: %llu\n" \
171 "cmd-bury: %llu\n" \
172 "cmd-kick: %llu\n" \
173 "cmd-touch: %llu\n" \
174 "cmd-stats: %llu\n" \
175 "cmd-stats-job: %llu\n" \
176 "cmd-stats-tube: %llu\n" \
177 "cmd-list-tubes: %llu\n" \
178 "cmd-list-tube-used: %llu\n" \
179 "cmd-list-tubes-watched: %llu\n" \
180 "job-timeouts: %llu\n" \
181 "total-jobs: %llu\n" \
182 "max-job-size: %zu\n" \
183 "current-tubes: %zu\n" \
184 "current-connections: %u\n" \
185 "current-producers: %u\n" \
186 "current-workers: %u\n" \
187 "current-waiting: %u\n" \
188 "total-connections: %u\n" \
189 "pid: %ld\n" \
190 "version: %s\n" \
191 "rusage-utime: %d.%06d\n" \
192 "rusage-stime: %d.%06d\n" \
193 "uptime: %u\n" \
194 "binlog-oldest-index: %s\n" \
195 "binlog-current-index: %s\n" \
196 "binlog-max-size: %zu\n" \
197 "\r\n"
199 #define STATS_TUBE_FMT "---\n" \
200 "name: %s\n" \
201 "current-jobs-urgent: %u\n" \
202 "current-jobs-ready: %u\n" \
203 "current-jobs-reserved: %u\n" \
204 "current-jobs-delayed: %u\n" \
205 "current-jobs-buried: %u\n" \
206 "total-jobs: %llu\n" \
207 "current-using: %u\n" \
208 "current-watching: %u\n" \
209 "current-waiting: %u\n" \
210 "\r\n"
212 #define JOB_STATS_FMT "---\n" \
213 "id: %llu\n" \
214 "tube: %s\n" \
215 "state: %s\n" \
216 "pri: %u\n" \
217 "age: %llu\n" \
218 "delay: %llu\n" \
219 "ttr: %llu\n" \
220 "time-left: %llu\n" \
221 "reserves: %u\n" \
222 "timeouts: %u\n" \
223 "releases: %u\n" \
224 "buries: %u\n" \
225 "kicks: %u\n" \
226 "\r\n"
228 /* this number is pretty arbitrary */
229 #define BUCKET_BUF_SIZE 1024
231 static char bucket[BUCKET_BUF_SIZE];
233 static unsigned int ready_ct = 0;
234 static struct stats global_stat = {0, 0, 0, 0, 0};
236 static tube default_tube;
238 static int drain_mode = 0;
239 static usec started_at;
240 static uint64_t op_ct[TOTAL_OPS], timeout_ct = 0;
243 /* Doubly-linked list of connections with at least one reserved job. */
244 static struct conn running = { &running, &running, 0 };
246 #ifdef DEBUG
247 static const char * op_names[] = {
248 "<unknown>",
249 CMD_PUT,
250 CMD_PEEKJOB,
251 CMD_RESERVE,
252 CMD_DELETE,
253 CMD_RELEASE,
254 CMD_BURY,
255 CMD_KICK,
256 CMD_STATS,
257 CMD_JOBSTATS,
258 CMD_PEEK_BURIED,
259 CMD_USE,
260 CMD_WATCH,
261 CMD_IGNORE,
262 CMD_LIST_TUBES,
263 CMD_LIST_TUBE_USED,
264 CMD_LIST_TUBES_WATCHED,
265 CMD_STATS_TUBE,
266 CMD_PEEK_READY,
267 CMD_PEEK_DELAYED,
268 CMD_RESERVE_TIMEOUT,
269 CMD_TOUCH
271 #endif
273 static job remove_buried_job(job j);
275 static int
276 buried_job_p(tube t)
278 return job_list_any_p(&t->buried);
281 static void
282 reply(conn c, const char *line, int len, int state)
284 int r;
286 if (!c) return;
288 r = conn_update_evq(c, EV_WRITE | EV_PERSIST);
289 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
291 c->reply = line;
292 c->reply_len = len;
293 c->reply_sent = 0;
294 c->state = state;
295 dprintf("sending reply: %.*s", len, line);
298 #define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)
300 #define reply_serr(c,e) (twarnx("server error: %s",(e)),\
301 reply_msg((c),(e)))
303 static void
304 reply_line(conn c, int state, const char *fmt, ...)
306 int r;
307 va_list ap;
309 va_start(ap, fmt);
310 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
311 va_end(ap);
313 /* Make sure the buffer was big enough. If not, we have a bug. */
314 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
316 return reply(c, c->reply_buf, r, state);
319 static void
320 reply_job(conn c, job j, const char *word)
322 /* tell this connection which job to send */
323 c->out_job = j;
324 c->out_job_sent = 0;
326 return reply_line(c, STATE_SENDJOB, "%s %llu %u\r\n",
327 word, j->id, j->body_size - 2);
330 conn
331 remove_waiting_conn(conn c)
333 tube t;
334 size_t i;
336 if (!conn_waiting(c)) return NULL;
338 c->type &= ~CONN_TYPE_WAITING;
339 global_stat.waiting_ct--;
340 for (i = 0; i < c->watch.used; i++) {
341 t = c->watch.items[i];
342 t->stat.waiting_ct--;
343 ms_remove(&t->waiting, c);
345 return c;
348 static void
349 reserve_job(conn c, job j)
351 j->deadline_at = now_usec() + j->ttr;
352 global_stat.reserved_ct++; /* stats */
353 j->tube->stat.reserved_ct++;
354 j->reserve_ct++;
355 conn_insert(&running, c);
356 j->state = JOB_STATE_RESERVED;
357 job_insert(&c->reserved_jobs, j);
358 j->reserver = c;
359 if (c->soonest_job && j->deadline_at < c->soonest_job->deadline_at) {
360 c->soonest_job = j;
362 return reply_job(c, j, MSG_RESERVED);
365 static job
366 next_eligible_job()
368 tube t;
369 size_t i;
370 job j = NULL, candidate;
372 dprintf("tubes.used = %zu\n", tubes.used);
373 for (i = 0; i < tubes.used; i++) {
374 t = tubes.items[i];
375 dprintf("for %s t->waiting.used=%zu t->ready.used=%d\n",
376 t->name, t->waiting.used, t->ready.used);
377 if (t->waiting.used && t->ready.used) {
378 candidate = pq_peek(&t->ready);
379 if (!j || job_pri_cmp(candidate, j) < 0) j = candidate;
381 dprintf("i = %zu, tubes.used = %zu\n", i, tubes.used);
384 return j;
387 static void
388 process_queue()
390 job j;
392 dprintf("processing queue\n");
393 while ((j = next_eligible_job())) {
394 dprintf("got eligible job %llu in %s\n", j->id, j->tube->name);
395 j = pq_take(&j->tube->ready);
396 ready_ct--;
397 if (j->pri < URGENT_THRESHOLD) {
398 global_stat.urgent_ct--;
399 j->tube->stat.urgent_ct--;
401 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
405 static job
406 delay_q_peek()
408 int i;
409 tube t;
410 job j = NULL, nj;
412 for (i = 0; i < tubes.used; i++) {
413 t = tubes.items[i];
414 nj = pq_peek(&t->delay);
415 if (!nj) continue;
416 if (!j || nj->deadline_at < j->deadline_at) j = nj;
419 return j;
422 static void
423 set_main_delay_timeout()
425 job j;
427 set_main_timeout((j = delay_q_peek()) ? j->deadline_at : 0);
430 static int
431 enqueue_job(job j, usec delay, char update_store)
433 int r;
435 j->reserver = NULL;
436 if (delay) {
437 j->deadline_at = now_usec() + delay;
438 r = pq_give(&j->tube->delay, j);
439 if (!r) return 0;
440 j->state = JOB_STATE_DELAYED;
441 set_main_delay_timeout();
442 } else {
443 r = pq_give(&j->tube->ready, j);
444 if (!r) return 0;
445 j->state = JOB_STATE_READY;
446 ready_ct++;
447 if (j->pri < URGENT_THRESHOLD) {
448 global_stat.urgent_ct++;
449 j->tube->stat.urgent_ct++;
453 if (update_store) {
454 r = binlog_write_job(j);
455 if (!r) return -1;
458 process_queue();
459 return 1;
462 static int
463 bury_job(job j, char update_store)
465 size_t z;
467 if (update_store) {
468 z = binlog_reserve_space_update(j);
469 if (!z) return 0;
470 j->reserved_binlog_space += z;
473 job_insert(&j->tube->buried, j);
474 global_stat.buried_ct++;
475 j->tube->stat.buried_ct++;
476 j->state = JOB_STATE_BURIED;
477 j->reserver = NULL;
478 j->bury_ct++;
480 if (update_store) return binlog_write_job(j);
482 return 1;
485 void
486 enqueue_reserved_jobs(conn c)
488 int r;
489 job j;
491 while (job_list_any_p(&c->reserved_jobs)) {
492 j = job_remove(c->reserved_jobs.next);
493 r = enqueue_job(j, 0, 0);
494 if (r < 1) bury_job(j, 0);
495 global_stat.reserved_ct--;
496 j->tube->stat.reserved_ct--;
497 c->soonest_job = NULL;
498 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
502 static job
503 delay_q_take()
505 job j = delay_q_peek();
506 return j ? pq_take(&j->tube->delay) : NULL;
509 static int
510 kick_buried_job(tube t)
512 int r;
513 job j;
514 size_t z;
516 if (!buried_job_p(t)) return 0;
517 j = remove_buried_job(t->buried.next);
519 z = binlog_reserve_space_update(j);
520 if (!z) return pq_give(&t->delay, j), 0; /* put it back */
521 j->reserved_binlog_space += z;
523 j->kick_ct++;
524 r = enqueue_job(j, 0, 1);
525 if (r == 1) return 1;
527 /* ready queue is full, so bury it */
528 bury_job(j, 0);
529 return 0;
532 static unsigned int
533 get_delayed_job_ct()
535 tube t;
536 size_t i;
537 unsigned int count = 0;
539 for (i = 0; i < tubes.used; i++) {
540 t = tubes.items[i];
541 count += t->delay.used;
543 return count;
546 static int
547 kick_delayed_job(tube t)
549 int r;
550 job j;
551 size_t z;
553 j = pq_take(&t->delay);
554 if (!j) return 0;
556 z = binlog_reserve_space_update(j);
557 if (!z) return pq_give(&t->delay, j), 0; /* put it back */
558 j->reserved_binlog_space += z;
560 j->kick_ct++;
561 r = enqueue_job(j, 0, 1);
562 if (r == 1) return 1;
564 /* ready queue is full, so delay it again */
565 r = enqueue_job(j, j->delay, 0);
566 if (r == 1) return 0;
568 /* last resort */
569 bury_job(j, 0);
570 return 0;
573 /* return the number of jobs successfully kicked */
574 static unsigned int
575 kick_buried_jobs(tube t, unsigned int n)
577 unsigned int i;
578 for (i = 0; (i < n) && kick_buried_job(t); ++i);
579 return i;
582 /* return the number of jobs successfully kicked */
583 static unsigned int
584 kick_delayed_jobs(tube t, unsigned int n)
586 unsigned int i;
587 for (i = 0; (i < n) && kick_delayed_job(t); ++i);
588 return i;
591 static unsigned int
592 kick_jobs(tube t, unsigned int n)
594 if (buried_job_p(t)) return kick_buried_jobs(t, n);
595 return kick_delayed_jobs(t, n);
598 static job
599 remove_buried_job(job j)
601 if (!j || j->state != JOB_STATE_BURIED) return NULL;
602 j = job_remove(j);
603 if (j) {
604 global_stat.buried_ct--;
605 j->tube->stat.buried_ct--;
607 return j;
610 static job
611 remove_ready_job(job j)
613 if (!j || j->state != JOB_STATE_READY) return NULL;
614 j = pq_remove(&j->tube->ready, j);
615 if (j) {
616 ready_ct--;
617 if (j->pri < URGENT_THRESHOLD) {
618 global_stat.urgent_ct--;
619 j->tube->stat.urgent_ct--;
622 return j;
625 static void
626 enqueue_waiting_conn(conn c)
628 tube t;
629 size_t i;
631 global_stat.waiting_ct++;
632 c->type |= CONN_TYPE_WAITING;
633 for (i = 0; i < c->watch.used; i++) {
634 t = c->watch.items[i];
635 t->stat.waiting_ct++;
636 ms_append(&t->waiting, c);
640 static job
641 find_reserved_job_in_conn(conn c, job j)
643 return (j && j->reserver == c && j->state == JOB_STATE_RESERVED) ? j : NULL;
646 static job
647 touch_job(conn c, job j)
649 j = find_reserved_job_in_conn(c, j);
650 if (j) {
651 j->deadline_at = now_usec() + j->ttr;
652 c->soonest_job = NULL;
654 return j;
657 static job
658 peek_job(uint64_t id)
660 return job_find(id);
663 static void
664 check_err(conn c, const char *s)
666 if (errno == EAGAIN) return;
667 if (errno == EINTR) return;
668 if (errno == EWOULDBLOCK) return;
670 twarn("%s", s);
671 conn_close(c);
672 return;
/* Scan the first `size` bytes of s for the sequence "\r\n" and return the
 * length of the line including that terminator.
 * Always returns at least 2 if a match is found. Returns 0 if no match,
 * which includes any buffer shorter than 2 bytes.
 * A '\r' that is not followed by '\n' is skipped and the scan continues,
 * so a stray carriage return cannot hide a later line end. */
static int
scan_line_end(const char *s, int size)
{
    const char *p = s;
    int left = size;

    /* need at least two bytes to hold "\r\n"; this also keeps left - 1
     * from going negative and turning into a huge length */
    while (left >= 2) {
        const char *match = memchr(p, '\r', left - 1);

        if (!match) return 0;

        /* this is safe because we only scan left - 1 chars above */
        if (match[1] == '\n') return (int) (match - s) + 2;

        /* lone '\r': resume right after it */
        left -= (int) (match - p) + 1;
        p = match + 1;
    }

    return 0;
}
691 static int
692 cmd_len(conn c)
694 return scan_line_end(c->cmd, c->cmd_read);
697 /* parse the command line */
698 static int
699 which_cmd(conn c)
701 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
702 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
703 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
704 TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY);
705 TEST_CMD(c->cmd, CMD_PEEK_DELAYED, OP_PEEK_DELAYED);
706 TEST_CMD(c->cmd, CMD_PEEK_BURIED, OP_PEEK_BURIED);
707 TEST_CMD(c->cmd, CMD_RESERVE_TIMEOUT, OP_RESERVE_TIMEOUT);
708 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
709 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
710 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
711 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
712 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
713 TEST_CMD(c->cmd, CMD_TOUCH, OP_TOUCH);
714 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
715 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
716 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
717 TEST_CMD(c->cmd, CMD_USE, OP_USE);
718 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
719 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
720 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
721 TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
722 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
723 return OP_UNKNOWN;
726 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
727 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
728 * This function is idempotent(). */
729 static void
730 fill_extra_data(conn c)
732 int extra_bytes, job_data_bytes = 0, cmd_bytes;
734 if (!c->fd) return; /* the connection was closed */
735 if (!c->cmd_len) return; /* we don't have a complete command */
737 /* how many extra bytes did we read? */
738 extra_bytes = c->cmd_read - c->cmd_len;
740 /* how many bytes should we put into the job body? */
741 if (c->in_job) {
742 job_data_bytes = min(extra_bytes, c->in_job->body_size);
743 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
744 c->in_job_read = job_data_bytes;
745 } else if (c->in_job_read) {
746 /* we are in bit-bucket mode, throwing away data */
747 job_data_bytes = min(extra_bytes, c->in_job_read);
748 c->in_job_read -= job_data_bytes;
751 /* how many bytes are left to go into the future cmd? */
752 cmd_bytes = extra_bytes - job_data_bytes;
753 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
754 c->cmd_read = cmd_bytes;
755 c->cmd_len = 0; /* we no longer know the length of the new command */
758 static void
759 enqueue_incoming_job(conn c)
761 int r;
762 job j = c->in_job;
764 c->in_job = NULL; /* the connection no longer owns this job */
765 c->in_job_read = 0;
767 /* check if the trailer is present and correct */
768 if (memcmp(j->body + j->body_size - 2, "\r\n", 2)) {
769 job_free(j);
770 return reply_msg(c, MSG_EXPECTED_CRLF);
773 if (drain_mode) {
774 job_free(j);
775 return reply_serr(c, MSG_DRAINING);
778 if (j->reserved_binlog_space) return reply_serr(c, MSG_INTERNAL_ERROR);
779 j->reserved_binlog_space = binlog_reserve_space_put(j);
780 if (!j->reserved_binlog_space) return reply_serr(c, MSG_OUT_OF_MEMORY);
782 /* we have a complete job, so let's stick it in the pqueue */
783 r = enqueue_job(j, j->delay, 1);
784 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
786 op_ct[OP_PUT]++; /* stats */
787 global_stat.total_jobs_ct++;
788 j->tube->stat.total_jobs_ct++;
790 if (r == 1) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->id);
792 /* out of memory trying to grow the queue, so it gets buried */
793 bury_job(j, 0);
794 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->id);
797 static unsigned int
798 uptime()
800 return (now_usec() - started_at) / 1000000;
803 static int
804 fmt_stats(char *buf, size_t size, void *x)
806 struct rusage ru = {{0, 0}, {0, 0}};
807 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
808 return snprintf(buf, size, STATS_FMT,
809 global_stat.urgent_ct,
810 ready_ct,
811 global_stat.reserved_ct,
812 get_delayed_job_ct(),
813 global_stat.buried_ct,
814 op_ct[OP_PUT],
815 op_ct[OP_PEEKJOB],
816 op_ct[OP_PEEK_READY],
817 op_ct[OP_PEEK_DELAYED],
818 op_ct[OP_PEEK_BURIED],
819 op_ct[OP_RESERVE],
820 op_ct[OP_RESERVE_TIMEOUT],
821 op_ct[OP_DELETE],
822 op_ct[OP_RELEASE],
823 op_ct[OP_USE],
824 op_ct[OP_WATCH],
825 op_ct[OP_IGNORE],
826 op_ct[OP_BURY],
827 op_ct[OP_KICK],
828 op_ct[OP_TOUCH],
829 op_ct[OP_STATS],
830 op_ct[OP_JOBSTATS],
831 op_ct[OP_STATS_TUBE],
832 op_ct[OP_LIST_TUBES],
833 op_ct[OP_LIST_TUBE_USED],
834 op_ct[OP_LIST_TUBES_WATCHED],
835 timeout_ct,
836 global_stat.total_jobs_ct,
837 job_data_size_limit,
838 tubes.used,
839 count_cur_conns(),
840 count_cur_producers(),
841 count_cur_workers(),
842 global_stat.waiting_ct,
843 count_tot_conns(),
844 (long) getpid(),
845 VERSION,
846 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
847 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
848 uptime(),
849 binlog_oldest_index(),
850 binlog_current_index(),
851 binlog_size_limit);
/* Read a priority value from the given buffer and place it in pri.
 * Leading spaces are skipped; the value must start with a decimal digit.
 * Update end to point to the address after the last character consumed.
 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
 * conversion and return the status code but not update any values. This is
 * an easy way to check for errors.
 * If end is NULL, read_pri will also check that the entire input string was
 * consumed and return an error code otherwise.
 * Values too large for an unsigned int are clamped to the largest unsigned
 * int rather than silently wrapped.
 * Return 0 on success, or nonzero on failure.
 * If a failure occurs, pri and end are not modified. */
static int
read_pri(unsigned int *pri, const char *buf, char **end)
{
    const unsigned long pri_max = (unsigned int) -1;
    unsigned long parsed;
    char *tend;

    errno = 0;
    while (buf[0] == ' ') buf++;

    /* isdigit() requires a value representable as unsigned char */
    if (!isdigit((unsigned char) buf[0])) return -1;

    parsed = strtoul(buf, &tend, 10);
    if (tend == buf) return -1;

    /* ERANGE is tolerated: strtoul() then yields ULONG_MAX, which is
     * clamped below */
    if (errno && errno != ERANGE) return -1;
    if (!end && tend[0] != '\0') return -1;

    /* where unsigned long is wider than unsigned int, avoid truncation */
    if (parsed > pri_max) parsed = pri_max;

    if (pri) *pri = (unsigned int) parsed;
    if (end) *end = tend;
    return 0;
}
883 /* Read a delay value from the given buffer and place it in delay.
884 * The interface and behavior are analogous to read_pri(). */
885 static int
886 read_delay(usec *delay, const char *buf, char **end)
888 int r;
889 unsigned int delay_sec;
891 r = read_pri(&delay_sec, buf, end);
892 if (r) return r;
893 *delay = delay_sec * 1000000;
894 return 0;
897 /* Read a timeout value from the given buffer and place it in ttr.
898 * The interface and behavior are the same as in read_delay(). */
899 static int
900 read_ttr(usec *ttr, const char *buf, char **end)
902 return read_delay(ttr, buf, end);
905 static void
906 wait_for_job(conn c, int timeout)
908 int r;
910 c->state = STATE_WAIT;
911 enqueue_waiting_conn(c);
913 /* Set the pending timeout to the requested timeout amount */
914 c->pending_timeout = timeout;
916 /* this conn is waiting, but we want to know if they hang up */
917 r = conn_update_evq(c, EV_READ | EV_PERSIST);
918 if (r == -1) return twarnx("update events failed"), conn_close(c);
921 typedef int(*fmt_fn)(char *, size_t, void *);
923 static void
924 do_stats(conn c, fmt_fn fmt, void *data)
926 int r, stats_len;
928 /* first, measure how big a buffer we will need */
929 stats_len = fmt(NULL, 0, data) + 16;
931 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
932 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
934 /* now actually format the stats data */
935 r = fmt(c->out_job->body, stats_len, data);
936 /* and set the actual body size */
937 c->out_job->body_size = r;
938 if (r > stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
940 c->out_job_sent = 0;
941 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", r - 2);
944 static void
945 do_list_tubes(conn c, ms l)
947 char *buf;
948 tube t;
949 size_t i, resp_z;
951 /* first, measure how big a buffer we will need */
952 resp_z = 6; /* initial "---\n" and final "\r\n" */
953 for (i = 0; i < l->used; i++) {
954 t = l->items[i];
955 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
958 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
959 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
961 /* now actually format the response */
962 buf = c->out_job->body;
963 buf += snprintf(buf, 5, "---\n");
964 for (i = 0; i < l->used; i++) {
965 t = l->items[i];
966 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
968 buf[0] = '\r';
969 buf[1] = '\n';
971 c->out_job_sent = 0;
972 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", resp_z - 2);
975 static int
976 fmt_job_stats(char *buf, size_t size, job j)
978 usec t;
980 t = now_usec();
981 return snprintf(buf, size, JOB_STATS_FMT,
982 j->id,
983 j->tube->name,
984 job_state(j),
985 j->pri,
986 (t - j->created_at) / 1000000,
987 j->delay / 1000000,
988 j->ttr / 1000000,
989 (j->deadline_at - t) / 1000000,
990 j->reserve_ct,
991 j->timeout_ct,
992 j->release_ct,
993 j->bury_ct,
994 j->kick_ct);
997 static int
998 fmt_stats_tube(char *buf, size_t size, tube t)
1000 return snprintf(buf, size, STATS_TUBE_FMT,
1001 t->name,
1002 t->stat.urgent_ct,
1003 t->ready.used,
1004 t->stat.reserved_ct,
1005 t->delay.used,
1006 t->stat.buried_ct,
1007 t->stat.total_jobs_ct,
1008 t->using_ct,
1009 t->watching_ct,
1010 t->stat.waiting_ct);
1013 static void
1014 maybe_enqueue_incoming_job(conn c)
1016 job j = c->in_job;
1018 /* do we have a complete job? */
1019 if (c->in_job_read == j->body_size) return enqueue_incoming_job(c);
1021 /* otherwise we have incomplete data, so just keep waiting */
1022 c->state = STATE_WANTDATA;
1025 /* j can be NULL */
1026 static job
1027 remove_this_reserved_job(conn c, job j)
1029 j = job_remove(j);
1030 if (j) {
1031 global_stat.reserved_ct--;
1032 j->tube->stat.reserved_ct--;
1033 j->reserver = NULL;
1035 c->soonest_job = NULL;
1036 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
1037 return j;
1040 static job
1041 remove_reserved_job(conn c, job j)
1043 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, j));
1046 static int
1047 name_is_ok(const char *name, size_t max)
1049 size_t len = strlen(name);
1050 return len > 0 && len <= max &&
1051 strspn(name, NAME_CHARS) == len && name[0] != '-';
1054 void
1055 prot_remove_tube(tube t)
1057 ms_remove(&tubes, t);
1060 static void
1061 dispatch_cmd(conn c)
1063 int r, i, timeout = -1;
1064 size_t z;
1065 unsigned int count;
1066 job j;
1067 unsigned char type;
1068 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
1069 unsigned int pri, body_size;
1070 usec delay, ttr;
1071 uint64_t id;
1072 tube t = NULL;
1074 /* NUL-terminate this string so we can use strtol and friends */
1075 c->cmd[c->cmd_len - 2] = '\0';
1077 /* check for possible maliciousness */
1078 if (strlen(c->cmd) != c->cmd_len - 2) {
1079 return reply_msg(c, MSG_BAD_FORMAT);
1082 type = which_cmd(c);
1083 dprintf("got %s command: \"%s\"\n", op_names[(int) type], c->cmd);
1085 switch (type) {
1086 case OP_PUT:
1087 r = read_pri(&pri, c->cmd + 4, &delay_buf);
1088 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1090 r = read_delay(&delay, delay_buf, &ttr_buf);
1091 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1093 r = read_ttr(&ttr, ttr_buf, &size_buf);
1094 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1096 errno = 0;
1097 body_size = strtoul(size_buf, &end_buf, 10);
1098 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1100 if (body_size > job_data_size_limit) {
1101 return reply_msg(c, MSG_JOB_TOO_BIG);
1104 /* don't allow trailing garbage */
1105 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
1107 conn_set_producer(c);
1109 c->in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c->use);
1111 /* OOM? */
1112 if (!c->in_job) {
1113 /* throw away the job body and respond with OUT_OF_MEMORY */
1115 /* Invert the meaning of in_job_read while throwing away data -- it
1116 * counts the bytes that remain to be thrown away. */
1117 c->in_job_read = body_size + 2;
1118 fill_extra_data(c);
1120 if (c->in_job_read == 0) return reply_serr(c, MSG_OUT_OF_MEMORY);
1122 c->state = STATE_BITBUCKET;
1123 return;
1126 fill_extra_data(c);
1128 /* it's possible we already have a complete job */
1129 maybe_enqueue_incoming_job(c);
1131 break;
1132 case OP_PEEK_READY:
1133 /* don't allow trailing garbage */
1134 if (c->cmd_len != CMD_PEEK_READY_LEN + 2) {
1135 return reply_msg(c, MSG_BAD_FORMAT);
1137 op_ct[type]++;
1139 j = job_copy(pq_peek(&c->use->ready));
1141 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1143 reply_job(c, j, MSG_FOUND);
1144 break;
1145 case OP_PEEK_DELAYED:
1146 /* don't allow trailing garbage */
1147 if (c->cmd_len != CMD_PEEK_DELAYED_LEN + 2) {
1148 return reply_msg(c, MSG_BAD_FORMAT);
1150 op_ct[type]++;
1152 j = job_copy(pq_peek(&c->use->delay));
1154 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1156 reply_job(c, j, MSG_FOUND);
1157 break;
1158 case OP_PEEK_BURIED:
1159 /* don't allow trailing garbage */
1160 if (c->cmd_len != CMD_PEEK_BURIED_LEN + 2) {
1161 return reply_msg(c, MSG_BAD_FORMAT);
1163 op_ct[type]++;
1165 j = job_copy(buried_job_p(c->use)? j = c->use->buried.next : NULL);
1167 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1169 reply_job(c, j, MSG_FOUND);
1170 break;
1171 case OP_PEEKJOB:
1172 errno = 0;
1173 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1174 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1175 op_ct[type]++;
1177 /* So, peek is annoying, because some other connection might free the
1178 * job while we are still trying to write it out. So we copy it and
1179 * then free the copy when it's done sending. */
1180 j = job_copy(peek_job(id));
1182 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1184 reply_job(c, j, MSG_FOUND);
1185 break;
1186 case OP_RESERVE_TIMEOUT:
1187 errno = 0;
1188 timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10);
1189 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1190 case OP_RESERVE: /* FALLTHROUGH */
1191 /* don't allow trailing garbage */
1192 if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) {
1193 return reply_msg(c, MSG_BAD_FORMAT);
1196 op_ct[type]++;
1197 conn_set_worker(c);
1199 if (conn_has_close_deadline(c) && !conn_ready(c)) {
1200 return reply_msg(c, MSG_DEADLINE_SOON);
1203 /* try to get a new job for this guy */
1204 wait_for_job(c, timeout);
1205 process_queue();
1206 break;
1207 case OP_DELETE:
1208 errno = 0;
1209 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1210 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1211 op_ct[type]++;
1213 j = job_find(id);
1214 j = remove_reserved_job(c, j) ? :
1215 remove_ready_job(j) ? :
1216 remove_buried_job(j);
1218 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1220 j->state = JOB_STATE_INVALID;
1221 r = binlog_write_job(j);
1222 job_free(j);
1224 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1226 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1227 break;
1228 case OP_RELEASE:
1229 errno = 0;
1230 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1231 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1233 r = read_pri(&pri, pri_buf, &delay_buf);
1234 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1236 r = read_delay(&delay, delay_buf, NULL);
1237 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1238 op_ct[type]++;
1240 j = remove_reserved_job(c, job_find(id));
1242 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1244 /* We want to update the delay deadline on disk, so reserve space for
1245 * that. */
1246 if (delay) {
1247 z = binlog_reserve_space_update(j);
1248 if (!z) return reply_serr(c, MSG_OUT_OF_MEMORY);
1249 j->reserved_binlog_space += z;
1252 j->pri = pri;
1253 j->delay = delay;
1254 j->release_ct++;
1256 r = enqueue_job(j, delay, !!delay);
1257 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
1258 if (r == 1) {
1259 return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1262 /* out of memory trying to grow the queue, so it gets buried */
1263 bury_job(j, 0);
1264 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1265 break;
1266 case OP_BURY:
1267 errno = 0;
1268 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1269 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1271 r = read_pri(&pri, pri_buf, NULL);
1272 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1273 op_ct[type]++;
1275 j = remove_reserved_job(c, job_find(id));
1277 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1279 j->pri = pri;
1280 r = bury_job(j, 1);
1281 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1282 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1283 break;
1284 case OP_KICK:
1285 errno = 0;
1286 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
1287 if (end_buf == c->cmd + CMD_KICK_LEN) {
1288 return reply_msg(c, MSG_BAD_FORMAT);
1290 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1292 op_ct[type]++;
1294 i = kick_jobs(c->use, count);
1296 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1297 case OP_TOUCH:
1298 errno = 0;
1299 id = strtoull(c->cmd + CMD_TOUCH_LEN, &end_buf, 10);
1300 if (errno) return twarn("strtoull"), reply_msg(c, MSG_BAD_FORMAT);
1302 op_ct[type]++;
1304 j = touch_job(c, job_find(id));
1306 if (j) {
1307 reply(c, MSG_TOUCHED, MSG_TOUCHED_LEN, STATE_SENDWORD);
1308 } else {
1309 return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1311 break;
1312 case OP_STATS:
1313 /* don't allow trailing garbage */
1314 if (c->cmd_len != CMD_STATS_LEN + 2) {
1315 return reply_msg(c, MSG_BAD_FORMAT);
1318 op_ct[type]++;
1320 do_stats(c, fmt_stats, NULL);
1321 break;
1322 case OP_JOBSTATS:
1323 errno = 0;
1324 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1325 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1327 op_ct[type]++;
1329 j = peek_job(id);
1330 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1332 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1333 do_stats(c, (fmt_fn) fmt_job_stats, j);
1334 break;
1335 case OP_STATS_TUBE:
1336 name = c->cmd + CMD_STATS_TUBE_LEN;
1337 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1339 op_ct[type]++;
1341 t = tube_find(name);
1342 if (!t) return reply_msg(c, MSG_NOTFOUND);
1344 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1345 t = NULL;
1346 break;
1347 case OP_LIST_TUBES:
1348 /* don't allow trailing garbage */
1349 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1350 return reply_msg(c, MSG_BAD_FORMAT);
1353 op_ct[type]++;
1354 do_list_tubes(c, &tubes);
1355 break;
1356 case OP_LIST_TUBE_USED:
1357 /* don't allow trailing garbage */
1358 if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
1359 return reply_msg(c, MSG_BAD_FORMAT);
1362 op_ct[type]++;
1363 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1364 break;
1365 case OP_LIST_TUBES_WATCHED:
1366 /* don't allow trailing garbage */
1367 if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
1368 return reply_msg(c, MSG_BAD_FORMAT);
1371 op_ct[type]++;
1372 do_list_tubes(c, &c->watch);
1373 break;
1374 case OP_USE:
1375 name = c->cmd + CMD_USE_LEN;
1376 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1377 op_ct[type]++;
1379 TUBE_ASSIGN(t, tube_find_or_make(name));
1380 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1382 c->use->using_ct--;
1383 TUBE_ASSIGN(c->use, t);
1384 TUBE_ASSIGN(t, NULL);
1385 c->use->using_ct++;
1387 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1388 break;
1389 case OP_WATCH:
1390 name = c->cmd + CMD_WATCH_LEN;
1391 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1392 op_ct[type]++;
1394 TUBE_ASSIGN(t, tube_find_or_make(name));
1395 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1397 r = 1;
1398 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1399 TUBE_ASSIGN(t, NULL);
1400 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1402 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1403 break;
1404 case OP_IGNORE:
1405 name = c->cmd + CMD_IGNORE_LEN;
1406 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1407 op_ct[type]++;
1409 t = NULL;
1410 for (i = 0; i < c->watch.used; i++) {
1411 t = c->watch.items[i];
1412 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1413 t = NULL;
1416 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1418 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1419 t = NULL;
1421 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1422 break;
1423 default:
1424 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1428 /* There are three reasons this function may be called. We need to check for
1429 * all of them.
1431 * 1. A reserved job has run out of time.
1432 * 2. A waiting client's reserved job has entered the safety margin.
1433 * 3. A waiting client's requested timeout has occurred.
1435 * If any of these happen, we must do the appropriate thing. */
1436 static void
1437 h_conn_timeout(conn c)
1439 int r, should_timeout = 0;
1440 job j;
1442 /* Check if the client was trying to reserve a job. */
1443 if (conn_waiting(c) && conn_has_close_deadline(c)) should_timeout = 1;
1445 /* Check if any reserved jobs have run out of time. We should do this
1446 * whether or not the client is waiting for a new reservation. */
1447 while ((j = soonest_job(c))) {
1448 if (j->deadline_at >= now_usec()) break;
1450 /* This job is in the middle of being written out. If we return it to
1451 * the ready queue, someone might free it before we finish writing it
1452 * out to the socket. So we'll copy it here and free the copy when it's
1453 * done sending. */
1454 if (j == c->out_job) {
1455 c->out_job = job_copy(c->out_job);
1458 timeout_ct++; /* stats */
1459 j->timeout_ct++;
1460 r = enqueue_job(remove_this_reserved_job(c, j), 0, 0);
1461 if (r < 1) bury_job(j, 0); /* out of memory, so bury it */
1462 r = conn_update_evq(c, c->evq.ev_events);
1463 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
1466 if (should_timeout) {
1467 dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1468 return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON);
1469 } else if (conn_waiting(c) && c->pending_timeout >= 0) {
1470 dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1471 c->pending_timeout = -1;
1472 return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);
1476 void
1477 enter_drain_mode(int sig)
1479 drain_mode = 1;
1482 static void
1483 do_cmd(conn c)
1485 dispatch_cmd(c);
1486 fill_extra_data(c);
1489 static void
1490 reset_conn(conn c)
1492 int r;
1494 r = conn_update_evq(c, EV_READ | EV_PERSIST);
1495 if (r == -1) return twarnx("update events failed"), conn_close(c);
1497 /* was this a peek or stats command? */
1498 if (c->out_job && c->out_job->state == JOB_STATE_COPY) job_free(c->out_job);
1499 c->out_job = NULL;
1501 c->reply_sent = 0; /* now that we're done, reset this */
1502 c->state = STATE_WANTCOMMAND;
1505 static void
1506 h_conn_data(conn c)
1508 int r, to_read;
1509 job j;
1510 struct iovec iov[2];
1512 switch (c->state) {
1513 case STATE_WANTCOMMAND:
1514 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1515 if (r == -1) return check_err(c, "read()");
1516 if (r == 0) return conn_close(c); /* the client hung up */
1518 c->cmd_read += r; /* we got some bytes */
1520 c->cmd_len = cmd_len(c); /* find the EOL */
1522 /* yay, complete command line */
1523 if (c->cmd_len) return do_cmd(c);
1525 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1527 /* command line too long? */
1528 if (c->cmd_read == LINE_BUF_SIZE) {
1529 c->cmd_read = 0; /* discard the input so far */
1530 return reply_msg(c, MSG_BAD_FORMAT);
1533 /* otherwise we have an incomplete line, so just keep waiting */
1534 break;
1535 case STATE_BITBUCKET:
1536 /* Invert the meaning of in_job_read while throwing away data -- it
1537 * counts the bytes that remain to be thrown away. */
1538 to_read = min(c->in_job_read, BUCKET_BUF_SIZE);
1539 r = read(c->fd, bucket, to_read);
1540 if (r == -1) return check_err(c, "read()");
1541 if (r == 0) return conn_close(c); /* the client hung up */
1543 c->in_job_read -= r; /* we got some bytes */
1545 /* (c->in_job_read < 0) can't happen */
1547 if (c->in_job_read == 0) return reply_serr(c, MSG_OUT_OF_MEMORY);
1548 break;
1549 case STATE_WANTDATA:
1550 j = c->in_job;
1552 r = read(c->fd, j->body + c->in_job_read, j->body_size -c->in_job_read);
1553 if (r == -1) return check_err(c, "read()");
1554 if (r == 0) return conn_close(c); /* the client hung up */
1556 c->in_job_read += r; /* we got some bytes */
1558 /* (j->in_job_read > j->body_size) can't happen */
1560 maybe_enqueue_incoming_job(c);
1561 break;
1562 case STATE_SENDWORD:
1563 r= write(c->fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1564 if (r == -1) return check_err(c, "write()");
1565 if (r == 0) return conn_close(c); /* the client hung up */
1567 c->reply_sent += r; /* we got some bytes */
1569 /* (c->reply_sent > c->reply_len) can't happen */
1571 if (c->reply_sent == c->reply_len) return reset_conn(c);
1573 /* otherwise we sent an incomplete reply, so just keep waiting */
1574 break;
1575 case STATE_SENDJOB:
1576 j = c->out_job;
1578 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1579 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1580 iov[1].iov_base = j->body + c->out_job_sent;
1581 iov[1].iov_len = j->body_size - c->out_job_sent;
1583 r = writev(c->fd, iov, 2);
1584 if (r == -1) return check_err(c, "writev()");
1585 if (r == 0) return conn_close(c); /* the client hung up */
1587 /* update the sent values */
1588 c->reply_sent += r;
1589 if (c->reply_sent >= c->reply_len) {
1590 c->out_job_sent += c->reply_sent - c->reply_len;
1591 c->reply_sent = c->reply_len;
1594 /* (c->out_job_sent > j->body_size) can't happen */
1596 /* are we done? */
1597 if (c->out_job_sent == j->body_size) return reset_conn(c);
1599 /* otherwise we sent incomplete data, so just keep waiting */
1600 break;
1601 case STATE_WAIT: /* keep an eye out in case they hang up */
1602 /* but don't hang up just because our buffer is full */
1603 if (LINE_BUF_SIZE - c->cmd_read < 1) break;
1605 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1606 if (r == -1) return check_err(c, "read()");
1607 if (r == 0) return conn_close(c); /* the client hung up */
1608 c->cmd_read += r; /* we got some bytes */
/* Nonzero when c has a nonzero fd and is in STATE_WANTCOMMAND.
 * Note: the argument is evaluated more than once. */
#define want_command(c) ((c)->fd != 0 && (c)->state == STATE_WANTCOMMAND)
/* Nonzero when want_command(c) holds and command bytes are already buffered. */
#define cmd_data_ready(c) (want_command(c) && (c)->cmd_read != 0)
1615 static void
1616 h_conn(const int fd, const short which, conn c)
1618 if (fd != c->fd) {
1619 twarnx("Argh! event fd doesn't match conn fd.");
1620 close(fd);
1621 return conn_close(c);
1624 switch (which) {
1625 case EV_TIMEOUT:
1626 h_conn_timeout(c);
1627 event_add(&c->evq, NULL); /* seems to be necessary */
1628 break;
1629 case EV_READ:
1630 /* fall through... */
1631 case EV_WRITE:
1632 /* fall through... */
1633 default:
1634 h_conn_data(c);
1637 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1640 static void
1641 h_delay()
1643 int r;
1644 job j;
1645 usec t;
1647 t = now_usec();
1648 while ((j = delay_q_peek())) {
1649 if (j->deadline_at > t) break;
1650 j = delay_q_take();
1651 r = enqueue_job(j, 0, 0);
1652 if (r < 1) bury_job(j, 0); /* out of memory, so bury it */
1655 set_main_delay_timeout();
1658 void
1659 h_accept(const int fd, const short which, struct event *ev)
1661 conn c;
1662 int cfd, flags, r;
1663 socklen_t addrlen;
1664 struct sockaddr addr;
1666 if (which == EV_TIMEOUT) return h_delay();
1668 addrlen = sizeof addr;
1669 cfd = accept(fd, &addr, &addrlen);
1670 if (cfd == -1) {
1671 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1672 if (errno == EMFILE) brake();
1673 return;
1676 flags = fcntl(cfd, F_GETFL, 0);
1677 if (flags < 0) return twarn("getting flags"), close(cfd), v();
1679 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1680 if (r < 0) return twarn("setting O_NONBLOCK"), close(cfd), v();
1682 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1683 if (!c) return twarnx("make_conn() failed"), close(cfd), brake();
1685 dprintf("accepted conn, fd=%d\n", cfd);
1686 r = conn_set_evq(c, EV_READ | EV_PERSIST, (evh) h_conn);
1687 if (r == -1) return twarnx("conn_set_evq() failed"), close(cfd), brake();
1690 void
1691 prot_init()
1693 started_at = now_usec();
1694 memset(op_ct, 0, sizeof(op_ct));
1696 ms_init(&tubes, NULL, NULL);
1698 TUBE_ASSIGN(default_tube, tube_find_or_make("default"));
1699 if (!default_tube) twarnx("Out of memory during startup!");
1702 void
1703 prot_replay_binlog(job binlog_jobs)
1705 job j, nj;
1706 usec delay;
1707 int r;
1709 for (j = binlog_jobs->next ; j != binlog_jobs ; j = nj) {
1710 nj = j->next;
1711 job_remove(j);
1712 binlog_reserve_space_update(j); /* reserve space for a delete */
1713 delay = 0;
1714 switch (j->state) {
1715 case JOB_STATE_BURIED:
1716 bury_job(j, 0);
1717 break;
1718 case JOB_STATE_DELAYED:
1719 if (started_at < j->deadline_at) {
1720 delay = j->deadline_at - started_at;
1722 /* fall through */
1723 default:
1724 r = enqueue_job(j, delay, 0);
1725 if (r < 1) twarnx("error processing binlog job %llu", j->id);