Need 64-bit math here to avoid overflow.
[beanstalkd.git] / prot.c
blob81db0afbf02d92bb43d6db04db40653b1cee4f38
1 /* prot.c - protocol implementation */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "config.h"

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <sys/resource.h>
#include <sys/uio.h>
#include <stdarg.h>
#include <ctype.h>
#include <inttypes.h>

#include "stat.h"
#include "prot.h"
#include "pq.h"
#include "ms.h"
#include "job.h"
#include "tube.h"
#include "conn.h"
#include "util.h"
#include "net.h"
#include "binlog.h"
43 /* job body cannot be greater than this many bytes long */
44 size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
/* Characters allowed in a tube name (checked with strspn in name_is_ok()). */
#define NAME_CHARS \
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
    "abcdefghijklmnopqrstuvwxyz" \
    "0123456789" \
    "-+/;.$()"
/* Protocol command words. Each is matched as a prefix of the incoming command
 * line; a trailing space marks a command that is followed by arguments. */
#define CMD_PUT                "put "
#define CMD_PEEKJOB            "peek "
#define CMD_PEEK_READY         "peek-ready"
#define CMD_PEEK_DELAYED       "peek-delayed"
#define CMD_PEEK_BURIED        "peek-buried"
#define CMD_RESERVE            "reserve"
#define CMD_RESERVE_TIMEOUT    "reserve-with-timeout "
#define CMD_DELETE             "delete "
#define CMD_RELEASE            "release "
#define CMD_BURY               "bury "
#define CMD_KICK               "kick "
#define CMD_TOUCH              "touch "
#define CMD_STATS              "stats"
#define CMD_JOBSTATS           "stats-job "
#define CMD_USE                "use "
#define CMD_WATCH              "watch "
#define CMD_IGNORE             "ignore "
#define CMD_LIST_TUBES         "list-tubes"
#define CMD_LIST_TUBE_USED     "list-tube-used"
#define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
#define CMD_STATS_TUBE         "stats-tube "
#define CMD_QUIT               "quit"

/* Length of a string literal, excluding its terminating NUL. Only meaningful
 * for arrays/literals: applied to a pointer it yields sizeof(char *) - 1. */
#define CONSTSTRLEN(m) (sizeof(m) - 1)

/* Compile-time lengths of the command words above. */
#define CMD_PEEK_READY_LEN         CONSTSTRLEN(CMD_PEEK_READY)
#define CMD_PEEK_DELAYED_LEN       CONSTSTRLEN(CMD_PEEK_DELAYED)
#define CMD_PEEK_BURIED_LEN        CONSTSTRLEN(CMD_PEEK_BURIED)
#define CMD_PEEKJOB_LEN            CONSTSTRLEN(CMD_PEEKJOB)
#define CMD_RESERVE_LEN            CONSTSTRLEN(CMD_RESERVE)
#define CMD_RESERVE_TIMEOUT_LEN    CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
#define CMD_DELETE_LEN             CONSTSTRLEN(CMD_DELETE)
#define CMD_RELEASE_LEN            CONSTSTRLEN(CMD_RELEASE)
#define CMD_BURY_LEN               CONSTSTRLEN(CMD_BURY)
#define CMD_KICK_LEN               CONSTSTRLEN(CMD_KICK)
#define CMD_TOUCH_LEN              CONSTSTRLEN(CMD_TOUCH)
#define CMD_STATS_LEN              CONSTSTRLEN(CMD_STATS)
#define CMD_JOBSTATS_LEN           CONSTSTRLEN(CMD_JOBSTATS)
#define CMD_USE_LEN                CONSTSTRLEN(CMD_USE)
#define CMD_WATCH_LEN              CONSTSTRLEN(CMD_WATCH)
#define CMD_IGNORE_LEN             CONSTSTRLEN(CMD_IGNORE)
#define CMD_LIST_TUBES_LEN         CONSTSTRLEN(CMD_LIST_TUBES)
#define CMD_LIST_TUBE_USED_LEN     CONSTSTRLEN(CMD_LIST_TUBE_USED)
#define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
#define CMD_STATS_TUBE_LEN         CONSTSTRLEN(CMD_STATS_TUBE)
/* Reply words. Those without "\r\n" are completed by reply_job(); the _FMT
 * variants take a job id. Job ids are uint64_t, so they are formatted with
 * PRIu64 (as the stats formats do) rather than %llu, which only matches
 * unsigned long long. */
#define MSG_FOUND         "FOUND"
#define MSG_NOTFOUND      "NOT_FOUND\r\n"
#define MSG_RESERVED      "RESERVED"
#define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
#define MSG_TIMED_OUT     "TIMED_OUT\r\n"
#define MSG_DELETED       "DELETED\r\n"
#define MSG_RELEASED      "RELEASED\r\n"
#define MSG_BURIED        "BURIED\r\n"
#define MSG_TOUCHED       "TOUCHED\r\n"
#define MSG_BURIED_FMT    "BURIED %" PRIu64 "\r\n"
#define MSG_INSERTED_FMT  "INSERTED %" PRIu64 "\r\n"
#define MSG_NOT_IGNORED   "NOT_IGNORED\r\n"

#define MSG_NOTFOUND_LEN    CONSTSTRLEN(MSG_NOTFOUND)
#define MSG_DELETED_LEN     CONSTSTRLEN(MSG_DELETED)
#define MSG_TOUCHED_LEN     CONSTSTRLEN(MSG_TOUCHED)
#define MSG_RELEASED_LEN    CONSTSTRLEN(MSG_RELEASED)
#define MSG_BURIED_LEN      CONSTSTRLEN(MSG_BURIED)
#define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)

/* Error replies. */
#define MSG_OUT_OF_MEMORY   "OUT_OF_MEMORY\r\n"
#define MSG_INTERNAL_ERROR  "INTERNAL_ERROR\r\n"
#define MSG_DRAINING        "DRAINING\r\n"
#define MSG_BAD_FORMAT      "BAD_FORMAT\r\n"
#define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
#define MSG_EXPECTED_CRLF   "EXPECTED_CRLF\r\n"
#define MSG_JOB_TOO_BIG     "JOB_TOO_BIG\r\n"
/* Connection states, stored in c->state. */
#define STATE_WANTCOMMAND 0 /* presumably: reading a command line */
#define STATE_WANTDATA    1 /* job body for a PUT is still incomplete */
#define STATE_SENDJOB     2 /* sending a reply line, then c->out_job */
#define STATE_SENDWORD    3 /* sending a reply line only */
#define STATE_WAIT        4 /* reserve issued, waiting for a job */
#define STATE_BITBUCKET   5 /* discarding a PUT body we could not store */

/* Command codes returned by which_cmd(); also index op_ct[] and op_names[]. */
#define OP_UNKNOWN            0
#define OP_PUT                1
#define OP_PEEKJOB            2
#define OP_RESERVE            3
#define OP_DELETE             4
#define OP_RELEASE            5
#define OP_BURY               6
#define OP_KICK               7
#define OP_STATS              8
#define OP_JOBSTATS           9
#define OP_PEEK_BURIED        10
#define OP_USE                11
#define OP_WATCH              12
#define OP_IGNORE             13
#define OP_LIST_TUBES         14
#define OP_LIST_TUBE_USED     15
#define OP_LIST_TUBES_WATCHED 16
#define OP_STATS_TUBE         17
#define OP_PEEK_READY         18
#define OP_PEEK_DELAYED       19
#define OP_RESERVE_TIMEOUT    20
#define OP_TOUCH              21
#define OP_QUIT               22
#define TOTAL_OPS             23 /* one past the last OP_* code */
/* YAML body of the "stats" reply, filled in by fmt_stats(). The argument
 * order there must follow the field order here exactly. */
#define STATS_FMT "---\n"                        \
    "current-jobs-urgent: %u\n"                  \
    "current-jobs-ready: %u\n"                   \
    "current-jobs-reserved: %u\n"                \
    "current-jobs-delayed: %u\n"                 \
    "current-jobs-buried: %u\n"                  \
    "cmd-put: %" PRIu64 "\n"                     \
    "cmd-peek: %" PRIu64 "\n"                    \
    "cmd-peek-ready: %" PRIu64 "\n"              \
    "cmd-peek-delayed: %" PRIu64 "\n"            \
    "cmd-peek-buried: %" PRIu64 "\n"             \
    "cmd-reserve: %" PRIu64 "\n"                 \
    "cmd-reserve-with-timeout: %" PRIu64 "\n"    \
    "cmd-delete: %" PRIu64 "\n"                  \
    "cmd-release: %" PRIu64 "\n"                 \
    "cmd-use: %" PRIu64 "\n"                     \
    "cmd-watch: %" PRIu64 "\n"                   \
    "cmd-ignore: %" PRIu64 "\n"                  \
    "cmd-bury: %" PRIu64 "\n"                    \
    "cmd-kick: %" PRIu64 "\n"                    \
    "cmd-touch: %" PRIu64 "\n"                   \
    "cmd-stats: %" PRIu64 "\n"                   \
    "cmd-stats-job: %" PRIu64 "\n"               \
    "cmd-stats-tube: %" PRIu64 "\n"              \
    "cmd-list-tubes: %" PRIu64 "\n"              \
    "cmd-list-tube-used: %" PRIu64 "\n"          \
    "cmd-list-tubes-watched: %" PRIu64 "\n"      \
    "job-timeouts: %" PRIu64 "\n"                \
    "total-jobs: %" PRIu64 "\n"                  \
    "max-job-size: %zu\n"                        \
    "current-tubes: %zu\n"                       \
    "current-connections: %u\n"                  \
    "current-producers: %u\n"                    \
    "current-workers: %u\n"                      \
    "current-waiting: %u\n"                      \
    "total-connections: %u\n"                    \
    "pid: %ld\n"                                 \
    "version: %s\n"                              \
    "rusage-utime: %d.%06d\n"                    \
    "rusage-stime: %d.%06d\n"                    \
    "uptime: %u\n"                               \
    "binlog-oldest-index: %s\n"                  \
    "binlog-current-index: %s\n"                 \
    "binlog-max-size: %zu\n"                     \
    "\r\n"

/* YAML body of the "stats-tube" reply, filled in by fmt_stats_tube(). */
#define STATS_TUBE_FMT "---\n"                   \
    "name: %s\n"                                 \
    "current-jobs-urgent: %u\n"                  \
    "current-jobs-ready: %u\n"                   \
    "current-jobs-reserved: %u\n"                \
    "current-jobs-delayed: %u\n"                 \
    "current-jobs-buried: %u\n"                  \
    "total-jobs: %" PRIu64 "\n"                  \
    "current-using: %u\n"                        \
    "current-watching: %u\n"                     \
    "current-waiting: %u\n"                      \
    "\r\n"
216 /* this number is pretty arbitrary */
217 #define BUCKET_BUF_SIZE 1024
219 static char bucket[BUCKET_BUF_SIZE];
221 static unsigned int ready_ct = 0;
222 static struct stats global_stat = {0, 0, 0, 0, 0};
224 static tube default_tube;
226 static int drain_mode = 0;
227 static usec started_at;
228 static uint64_t op_ct[TOTAL_OPS], timeout_ct = 0;
231 /* Doubly-linked list of connections with at least one reserved job. */
232 static struct conn running = { &running, &running, 0 };
234 #ifdef DEBUG
235 static const char * op_names[] = {
236 "<unknown>",
237 CMD_PUT,
238 CMD_PEEKJOB,
239 CMD_RESERVE,
240 CMD_DELETE,
241 CMD_RELEASE,
242 CMD_BURY,
243 CMD_KICK,
244 CMD_STATS,
245 CMD_JOBSTATS,
246 CMD_PEEK_BURIED,
247 CMD_USE,
248 CMD_WATCH,
249 CMD_IGNORE,
250 CMD_LIST_TUBES,
251 CMD_LIST_TUBE_USED,
252 CMD_LIST_TUBES_WATCHED,
253 CMD_STATS_TUBE,
254 CMD_PEEK_READY,
255 CMD_PEEK_DELAYED,
256 CMD_RESERVE_TIMEOUT,
257 CMD_TOUCH,
258 CMD_QUIT
260 #endif
262 static job remove_buried_job(job j);
264 static int
265 buried_job_p(tube t)
267 return job_list_any_p(&t->buried);
270 static void
271 reply(conn c, const char *line, int len, int state)
273 int r;
275 if (!c) return;
277 r = conn_update_evq(c, EV_WRITE | EV_PERSIST);
278 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
280 c->reply = line;
281 c->reply_len = len;
282 c->reply_sent = 0;
283 c->state = state;
284 dprintf("sending reply: %.*s", len, line);
287 #define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)
289 #define reply_serr(c,e) (twarnx("server error: %s",(e)),\
290 reply_msg((c),(e)))
292 static void
293 reply_line(conn c, int state, const char *fmt, ...)
295 int r;
296 va_list ap;
298 va_start(ap, fmt);
299 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
300 va_end(ap);
302 /* Make sure the buffer was big enough. If not, we have a bug. */
303 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
305 return reply(c, c->reply_buf, r, state);
308 static void
309 reply_job(conn c, job j, const char *word)
311 /* tell this connection which job to send */
312 c->out_job = j;
313 c->out_job_sent = 0;
315 return reply_line(c, STATE_SENDJOB, "%s %llu %u\r\n",
316 word, j->id, j->body_size - 2);
319 conn
320 remove_waiting_conn(conn c)
322 tube t;
323 size_t i;
325 if (!conn_waiting(c)) return NULL;
327 c->type &= ~CONN_TYPE_WAITING;
328 global_stat.waiting_ct--;
329 for (i = 0; i < c->watch.used; i++) {
330 t = c->watch.items[i];
331 t->stat.waiting_ct--;
332 ms_remove(&t->waiting, c);
334 return c;
337 static void
338 reserve_job(conn c, job j)
340 j->deadline_at = now_usec() + j->ttr;
341 global_stat.reserved_ct++; /* stats */
342 j->tube->stat.reserved_ct++;
343 j->reserve_ct++;
344 conn_insert(&running, c);
345 j->state = JOB_STATE_RESERVED;
346 job_insert(&c->reserved_jobs, j);
347 j->reserver = c;
348 if (c->soonest_job && j->deadline_at < c->soonest_job->deadline_at) {
349 c->soonest_job = j;
351 return reply_job(c, j, MSG_RESERVED);
354 static job
355 next_eligible_job()
357 tube t;
358 size_t i;
359 job j = NULL, candidate;
361 dprintf("tubes.used = %zu\n", tubes.used);
362 for (i = 0; i < tubes.used; i++) {
363 t = tubes.items[i];
364 dprintf("for %s t->waiting.used=%zu t->ready.used=%d\n",
365 t->name, t->waiting.used, t->ready.used);
366 if (t->waiting.used && t->ready.used) {
367 candidate = pq_peek(&t->ready);
368 if (!j || job_pri_cmp(candidate, j) < 0) j = candidate;
370 dprintf("i = %zu, tubes.used = %zu\n", i, tubes.used);
373 return j;
376 static void
377 process_queue()
379 job j;
381 dprintf("processing queue\n");
382 while ((j = next_eligible_job())) {
383 dprintf("got eligible job %llu in %s\n", j->id, j->tube->name);
384 j = pq_take(&j->tube->ready);
385 ready_ct--;
386 if (j->pri < URGENT_THRESHOLD) {
387 global_stat.urgent_ct--;
388 j->tube->stat.urgent_ct--;
390 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
394 static job
395 delay_q_peek()
397 int i;
398 tube t;
399 job j = NULL, nj;
401 for (i = 0; i < tubes.used; i++) {
402 t = tubes.items[i];
403 nj = pq_peek(&t->delay);
404 if (!nj) continue;
405 if (!j || nj->deadline_at < j->deadline_at) j = nj;
408 return j;
411 static void
412 set_main_delay_timeout()
414 job j;
416 set_main_timeout((j = delay_q_peek()) ? j->deadline_at : 0);
419 static int
420 enqueue_job(job j, usec delay, char update_store)
422 int r;
424 j->reserver = NULL;
425 if (delay) {
426 j->deadline_at = now_usec() + delay;
427 r = pq_give(&j->tube->delay, j);
428 if (!r) return 0;
429 j->state = JOB_STATE_DELAYED;
430 set_main_delay_timeout();
431 } else {
432 r = pq_give(&j->tube->ready, j);
433 if (!r) return 0;
434 j->state = JOB_STATE_READY;
435 ready_ct++;
436 if (j->pri < URGENT_THRESHOLD) {
437 global_stat.urgent_ct++;
438 j->tube->stat.urgent_ct++;
442 if (update_store) {
443 r = binlog_write_job(j);
444 if (!r) return -1;
447 process_queue();
448 return 1;
451 static int
452 bury_job(job j, char update_store)
454 size_t z;
456 if (update_store) {
457 z = binlog_reserve_space_update(j);
458 if (!z) return 0;
459 j->reserved_binlog_space += z;
462 job_insert(&j->tube->buried, j);
463 global_stat.buried_ct++;
464 j->tube->stat.buried_ct++;
465 j->state = JOB_STATE_BURIED;
466 j->reserver = NULL;
467 j->bury_ct++;
469 if (update_store) return binlog_write_job(j);
471 return 1;
474 void
475 enqueue_reserved_jobs(conn c)
477 int r;
478 job j;
480 while (job_list_any_p(&c->reserved_jobs)) {
481 j = job_remove(c->reserved_jobs.next);
482 r = enqueue_job(j, 0, 0);
483 if (r < 1) bury_job(j, 0);
484 global_stat.reserved_ct--;
485 j->tube->stat.reserved_ct--;
486 c->soonest_job = NULL;
487 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
491 static job
492 delay_q_take()
494 job j = delay_q_peek();
495 return j ? pq_take(&j->tube->delay) : NULL;
498 static int
499 kick_buried_job(tube t)
501 int r;
502 job j;
503 size_t z;
505 if (!buried_job_p(t)) return 0;
506 j = remove_buried_job(t->buried.next);
508 z = binlog_reserve_space_update(j);
509 if (!z) return pq_give(&t->delay, j), 0; /* put it back */
510 j->reserved_binlog_space += z;
512 j->kick_ct++;
513 r = enqueue_job(j, 0, 1);
514 if (r == 1) return 1;
516 /* ready queue is full, so bury it */
517 bury_job(j, 0);
518 return 0;
521 static unsigned int
522 get_delayed_job_ct()
524 tube t;
525 size_t i;
526 unsigned int count = 0;
528 for (i = 0; i < tubes.used; i++) {
529 t = tubes.items[i];
530 count += t->delay.used;
532 return count;
535 static int
536 kick_delayed_job(tube t)
538 int r;
539 job j;
540 size_t z;
542 j = pq_take(&t->delay);
543 if (!j) return 0;
545 z = binlog_reserve_space_update(j);
546 if (!z) return pq_give(&t->delay, j), 0; /* put it back */
547 j->reserved_binlog_space += z;
549 j->kick_ct++;
550 r = enqueue_job(j, 0, 1);
551 if (r == 1) return 1;
553 /* ready queue is full, so delay it again */
554 r = enqueue_job(j, j->delay, 0);
555 if (r == 1) return 0;
557 /* last resort */
558 bury_job(j, 0);
559 return 0;
562 /* return the number of jobs successfully kicked */
563 static unsigned int
564 kick_buried_jobs(tube t, unsigned int n)
566 unsigned int i;
567 for (i = 0; (i < n) && kick_buried_job(t); ++i);
568 return i;
571 /* return the number of jobs successfully kicked */
572 static unsigned int
573 kick_delayed_jobs(tube t, unsigned int n)
575 unsigned int i;
576 for (i = 0; (i < n) && kick_delayed_job(t); ++i);
577 return i;
580 static unsigned int
581 kick_jobs(tube t, unsigned int n)
583 if (buried_job_p(t)) return kick_buried_jobs(t, n);
584 return kick_delayed_jobs(t, n);
587 static job
588 remove_buried_job(job j)
590 if (!j || j->state != JOB_STATE_BURIED) return NULL;
591 j = job_remove(j);
592 if (j) {
593 global_stat.buried_ct--;
594 j->tube->stat.buried_ct--;
596 return j;
599 static job
600 remove_ready_job(job j)
602 if (!j || j->state != JOB_STATE_READY) return NULL;
603 j = pq_remove(&j->tube->ready, j);
604 if (j) {
605 ready_ct--;
606 if (j->pri < URGENT_THRESHOLD) {
607 global_stat.urgent_ct--;
608 j->tube->stat.urgent_ct--;
611 return j;
614 static void
615 enqueue_waiting_conn(conn c)
617 tube t;
618 size_t i;
620 global_stat.waiting_ct++;
621 c->type |= CONN_TYPE_WAITING;
622 for (i = 0; i < c->watch.used; i++) {
623 t = c->watch.items[i];
624 t->stat.waiting_ct++;
625 ms_append(&t->waiting, c);
629 static job
630 find_reserved_job_in_conn(conn c, job j)
632 return (j && j->reserver == c && j->state == JOB_STATE_RESERVED) ? j : NULL;
635 static job
636 touch_job(conn c, job j)
638 j = find_reserved_job_in_conn(c, j);
639 if (j) {
640 j->deadline_at = now_usec() + j->ttr;
641 c->soonest_job = NULL;
643 return j;
646 static job
647 peek_job(uint64_t id)
649 return job_find(id);
652 static void
653 check_err(conn c, const char *s)
655 if (errno == EAGAIN) return;
656 if (errno == EINTR) return;
657 if (errno == EWOULDBLOCK) return;
659 twarn("%s", s);
660 conn_close(c);
661 return;
/* Scan the first `size` bytes of s for the sequence "\r\n" and return the
 * length of the line including that terminator (always >= 2), or 0 if the
 * sequence does not occur. A '\r' not followed by '\n' does not end the
 * search. A size below 2 (including 0) returns 0 without reading s. */
static int
scan_line_end(const char *s, int size)
{
    const char *p, *last;

    /* the old code passed size - 1 to memchr, which wraps to a huge size_t
     * when size == 0 */
    if (size < 2) return 0;

    /* "\r\n" can start no later than s[size - 2], so p[1] stays in bounds */
    last = s + size - 1;
    for (p = s; (p = memchr(p, '\r', last - p)) != NULL; p++) {
        if (p[1] == '\n') return p - s + 2;
    }
    return 0;
}
680 static int
681 cmd_len(conn c)
683 return scan_line_end(c->cmd, c->cmd_read);
686 /* parse the command line */
687 static int
688 which_cmd(conn c)
690 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
691 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
692 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
693 TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY);
694 TEST_CMD(c->cmd, CMD_PEEK_DELAYED, OP_PEEK_DELAYED);
695 TEST_CMD(c->cmd, CMD_PEEK_BURIED, OP_PEEK_BURIED);
696 TEST_CMD(c->cmd, CMD_RESERVE_TIMEOUT, OP_RESERVE_TIMEOUT);
697 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
698 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
699 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
700 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
701 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
702 TEST_CMD(c->cmd, CMD_TOUCH, OP_TOUCH);
703 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
704 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
705 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
706 TEST_CMD(c->cmd, CMD_USE, OP_USE);
707 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
708 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
709 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
710 TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
711 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
712 TEST_CMD(c->cmd, CMD_QUIT, OP_QUIT);
713 return OP_UNKNOWN;
716 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
717 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
718 * This function is idempotent(). */
719 static void
720 fill_extra_data(conn c)
722 int extra_bytes, job_data_bytes = 0, cmd_bytes;
724 if (!c->fd) return; /* the connection was closed */
725 if (!c->cmd_len) return; /* we don't have a complete command */
727 /* how many extra bytes did we read? */
728 extra_bytes = c->cmd_read - c->cmd_len;
730 /* how many bytes should we put into the job body? */
731 if (c->in_job) {
732 job_data_bytes = min(extra_bytes, c->in_job->body_size);
733 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
734 c->in_job_read = job_data_bytes;
735 } else if (c->in_job_read) {
736 /* we are in bit-bucket mode, throwing away data */
737 job_data_bytes = min(extra_bytes, c->in_job_read);
738 c->in_job_read -= job_data_bytes;
741 /* how many bytes are left to go into the future cmd? */
742 cmd_bytes = extra_bytes - job_data_bytes;
743 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
744 c->cmd_read = cmd_bytes;
745 c->cmd_len = 0; /* we no longer know the length of the new command */
748 static void
749 enqueue_incoming_job(conn c)
751 int r;
752 job j = c->in_job;
754 c->in_job = NULL; /* the connection no longer owns this job */
755 c->in_job_read = 0;
757 /* check if the trailer is present and correct */
758 if (memcmp(j->body + j->body_size - 2, "\r\n", 2)) {
759 job_free(j);
760 return reply_msg(c, MSG_EXPECTED_CRLF);
763 if (drain_mode) {
764 job_free(j);
765 return reply_serr(c, MSG_DRAINING);
768 if (j->reserved_binlog_space) return reply_serr(c, MSG_INTERNAL_ERROR);
769 j->reserved_binlog_space = binlog_reserve_space_put(j);
770 if (!j->reserved_binlog_space) return reply_serr(c, MSG_OUT_OF_MEMORY);
772 /* we have a complete job, so let's stick it in the pqueue */
773 r = enqueue_job(j, j->delay, 1);
774 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
776 op_ct[OP_PUT]++; /* stats */
777 global_stat.total_jobs_ct++;
778 j->tube->stat.total_jobs_ct++;
780 if (r == 1) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->id);
782 /* out of memory trying to grow the queue, so it gets buried */
783 bury_job(j, 0);
784 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->id);
787 static unsigned int
788 uptime()
790 return (now_usec() - started_at) / 1000000;
793 static int
794 fmt_stats(char *buf, size_t size, void *x)
796 struct rusage ru = {{0, 0}, {0, 0}};
797 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
798 return snprintf(buf, size, STATS_FMT,
799 global_stat.urgent_ct,
800 ready_ct,
801 global_stat.reserved_ct,
802 get_delayed_job_ct(),
803 global_stat.buried_ct,
804 op_ct[OP_PUT],
805 op_ct[OP_PEEKJOB],
806 op_ct[OP_PEEK_READY],
807 op_ct[OP_PEEK_DELAYED],
808 op_ct[OP_PEEK_BURIED],
809 op_ct[OP_RESERVE],
810 op_ct[OP_RESERVE_TIMEOUT],
811 op_ct[OP_DELETE],
812 op_ct[OP_RELEASE],
813 op_ct[OP_USE],
814 op_ct[OP_WATCH],
815 op_ct[OP_IGNORE],
816 op_ct[OP_BURY],
817 op_ct[OP_KICK],
818 op_ct[OP_TOUCH],
819 op_ct[OP_STATS],
820 op_ct[OP_JOBSTATS],
821 op_ct[OP_STATS_TUBE],
822 op_ct[OP_LIST_TUBES],
823 op_ct[OP_LIST_TUBE_USED],
824 op_ct[OP_LIST_TUBES_WATCHED],
825 timeout_ct,
826 global_stat.total_jobs_ct,
827 job_data_size_limit,
828 tubes.used,
829 count_cur_conns(),
830 count_cur_producers(),
831 count_cur_workers(),
832 global_stat.waiting_ct,
833 count_tot_conns(),
834 (long) getpid(),
835 VERSION,
836 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
837 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
838 uptime(),
839 binlog_oldest_index(),
840 binlog_current_index(),
841 binlog_size_limit);
/* Read a decimal priority from buf (leading spaces allowed) into *pri.
 * If end is non-NULL it is set to the first unconsumed character. If end is
 * NULL, the whole string must be consumed or the call fails. pri and end may
 * each be NULL; with both NULL this just validates the input. Values that do
 * not fit in an unsigned int saturate at UINT_MAX.
 * Returns 0 on success and nonzero on failure; on failure *pri and *end are
 * left untouched. */
static int
read_pri(unsigned int *pri, const char *buf, char **end)
{
    char *tend;
    unsigned long tpri;

    errno = 0;
    while (buf[0] == ' ') buf++;
    /* cast: isdigit() on a negative char value is undefined */
    if (!isdigit((unsigned char) buf[0])) return -1;
    tpri = strtoul(buf, &tend, 10);
    if (tend == buf) return -1;
    if (errno && errno != ERANGE) return -1;
    if (!end && tend[0] != '\0') return -1;

    /* Saturate instead of truncating: with a 64-bit long, 4294967296 would
     * otherwise become 0, the most urgent priority. */
    if (tpri > UINT_MAX) tpri = UINT_MAX;

    if (pri) *pri = (unsigned int) tpri;
    if (end) *end = tend;
    return 0;
}
873 /* Read a delay value from the given buffer and place it in delay.
874 * The interface and behavior are analogous to read_pri(). */
875 static int
876 read_delay(usec *delay, const char *buf, char **end)
878 int r;
879 unsigned int delay_sec;
881 r = read_pri(&delay_sec, buf, end);
882 if (r) return r;
883 *delay = ((usec) delay_sec) * 1000000;
884 return 0;
887 /* Read a timeout value from the given buffer and place it in ttr.
888 * The interface and behavior are the same as in read_delay(). */
889 static int
890 read_ttr(usec *ttr, const char *buf, char **end)
892 return read_delay(ttr, buf, end);
895 static void
896 wait_for_job(conn c, int timeout)
898 int r;
900 c->state = STATE_WAIT;
901 enqueue_waiting_conn(c);
903 /* Set the pending timeout to the requested timeout amount */
904 c->pending_timeout = timeout;
906 /* this conn is waiting, but we want to know if they hang up */
907 r = conn_update_evq(c, EV_READ | EV_PERSIST);
908 if (r == -1) return twarnx("update events failed"), conn_close(c);
911 typedef int(*fmt_fn)(char *, size_t, void *);
913 static void
914 do_stats(conn c, fmt_fn fmt, void *data)
916 int r, stats_len;
918 /* first, measure how big a buffer we will need */
919 stats_len = fmt(NULL, 0, data) + 16;
921 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
922 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
924 /* now actually format the stats data */
925 r = fmt(c->out_job->body, stats_len, data);
926 /* and set the actual body size */
927 c->out_job->body_size = r;
928 if (r > stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
930 c->out_job_sent = 0;
931 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", r - 2);
934 static void
935 do_list_tubes(conn c, ms l)
937 char *buf;
938 tube t;
939 size_t i, resp_z;
941 /* first, measure how big a buffer we will need */
942 resp_z = 6; /* initial "---\n" and final "\r\n" */
943 for (i = 0; i < l->used; i++) {
944 t = l->items[i];
945 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
948 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
949 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
951 /* now actually format the response */
952 buf = c->out_job->body;
953 buf += snprintf(buf, 5, "---\n");
954 for (i = 0; i < l->used; i++) {
955 t = l->items[i];
956 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
958 buf[0] = '\r';
959 buf[1] = '\n';
961 c->out_job_sent = 0;
962 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", resp_z - 2);
965 static int
966 fmt_job_stats(char *buf, size_t size, job j)
968 usec t;
969 uint64_t time_left;
971 t = now_usec();
972 if (j->state == JOB_STATE_RESERVED || j->state == JOB_STATE_DELAYED) {
973 time_left = (j->deadline_at - t) / 1000000;
974 } else {
975 time_left = 0;
977 return snprintf(buf, size,
978 "id: %" PRIu64 "\n"
979 "tube: %s\n"
980 "state: %s\n"
981 "pri: %u\n"
982 "age: %" PRIu64 "\n"
983 "delay: %" PRIu64 "\n"
984 "ttr: %" PRIu64 "\n"
985 "time-left: %" PRIu64 "\n"
986 "reserves: %u\n"
987 "timeouts: %u\n"
988 "releases: %u\n"
989 "buries: %u\n"
990 "kicks: %u\n"
991 "\r\n",
993 j->id,
994 j->tube->name,
995 job_state(j),
996 j->pri,
997 (t - j->created_at) / 1000000,
998 j->delay / 1000000,
999 j->ttr / 1000000,
1000 time_left,
1001 j->reserve_ct,
1002 j->timeout_ct,
1003 j->release_ct,
1004 j->bury_ct,
1005 j->kick_ct);
1008 static int
1009 fmt_stats_tube(char *buf, size_t size, tube t)
1011 return snprintf(buf, size, STATS_TUBE_FMT,
1012 t->name,
1013 t->stat.urgent_ct,
1014 t->ready.used,
1015 t->stat.reserved_ct,
1016 t->delay.used,
1017 t->stat.buried_ct,
1018 t->stat.total_jobs_ct,
1019 t->using_ct,
1020 t->watching_ct,
1021 t->stat.waiting_ct);
1024 static void
1025 maybe_enqueue_incoming_job(conn c)
1027 job j = c->in_job;
1029 /* do we have a complete job? */
1030 if (c->in_job_read == j->body_size) return enqueue_incoming_job(c);
1032 /* otherwise we have incomplete data, so just keep waiting */
1033 c->state = STATE_WANTDATA;
1036 /* j can be NULL */
1037 static job
1038 remove_this_reserved_job(conn c, job j)
1040 j = job_remove(j);
1041 if (j) {
1042 global_stat.reserved_ct--;
1043 j->tube->stat.reserved_ct--;
1044 j->reserver = NULL;
1046 c->soonest_job = NULL;
1047 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
1048 return j;
1051 static job
1052 remove_reserved_job(conn c, job j)
1054 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, j));
1057 static int
1058 name_is_ok(const char *name, size_t max)
1060 size_t len = strlen(name);
1061 return len > 0 && len <= max &&
1062 strspn(name, NAME_CHARS) == len && name[0] != '-';
1065 void
1066 prot_remove_tube(tube t)
1068 ms_remove(&tubes, t);
1071 static void
1072 dispatch_cmd(conn c)
1074 int r, i, timeout = -1;
1075 size_t z;
1076 unsigned int count;
1077 job j;
1078 unsigned char type;
1079 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
1080 unsigned int pri, body_size;
1081 usec delay, ttr;
1082 uint64_t id;
1083 tube t = NULL;
1085 /* NUL-terminate this string so we can use strtol and friends */
1086 c->cmd[c->cmd_len - 2] = '\0';
1088 /* check for possible maliciousness */
1089 if (strlen(c->cmd) != c->cmd_len - 2) {
1090 return reply_msg(c, MSG_BAD_FORMAT);
1093 type = which_cmd(c);
1094 dprintf("got %s command: \"%s\"\n", op_names[(int) type], c->cmd);
1096 switch (type) {
1097 case OP_PUT:
1098 r = read_pri(&pri, c->cmd + 4, &delay_buf);
1099 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1101 r = read_delay(&delay, delay_buf, &ttr_buf);
1102 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1104 r = read_ttr(&ttr, ttr_buf, &size_buf);
1105 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1107 errno = 0;
1108 body_size = strtoul(size_buf, &end_buf, 10);
1109 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1111 if (body_size > job_data_size_limit) {
1112 return reply_msg(c, MSG_JOB_TOO_BIG);
1115 /* don't allow trailing garbage */
1116 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
1118 conn_set_producer(c);
1120 c->in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c->use);
1122 /* OOM? */
1123 if (!c->in_job) {
1124 /* throw away the job body and respond with OUT_OF_MEMORY */
1126 /* Invert the meaning of in_job_read while throwing away data -- it
1127 * counts the bytes that remain to be thrown away. */
1128 c->in_job_read = body_size + 2;
1129 fill_extra_data(c);
1131 if (c->in_job_read == 0) return reply_serr(c, MSG_OUT_OF_MEMORY);
1133 c->state = STATE_BITBUCKET;
1134 return;
1137 fill_extra_data(c);
1139 /* it's possible we already have a complete job */
1140 maybe_enqueue_incoming_job(c);
1142 break;
1143 case OP_PEEK_READY:
1144 /* don't allow trailing garbage */
1145 if (c->cmd_len != CMD_PEEK_READY_LEN + 2) {
1146 return reply_msg(c, MSG_BAD_FORMAT);
1148 op_ct[type]++;
1150 j = job_copy(pq_peek(&c->use->ready));
1152 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1154 reply_job(c, j, MSG_FOUND);
1155 break;
1156 case OP_PEEK_DELAYED:
1157 /* don't allow trailing garbage */
1158 if (c->cmd_len != CMD_PEEK_DELAYED_LEN + 2) {
1159 return reply_msg(c, MSG_BAD_FORMAT);
1161 op_ct[type]++;
1163 j = job_copy(pq_peek(&c->use->delay));
1165 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1167 reply_job(c, j, MSG_FOUND);
1168 break;
1169 case OP_PEEK_BURIED:
1170 /* don't allow trailing garbage */
1171 if (c->cmd_len != CMD_PEEK_BURIED_LEN + 2) {
1172 return reply_msg(c, MSG_BAD_FORMAT);
1174 op_ct[type]++;
1176 j = job_copy(buried_job_p(c->use)? j = c->use->buried.next : NULL);
1178 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1180 reply_job(c, j, MSG_FOUND);
1181 break;
1182 case OP_PEEKJOB:
1183 errno = 0;
1184 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1185 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1186 op_ct[type]++;
1188 /* So, peek is annoying, because some other connection might free the
1189 * job while we are still trying to write it out. So we copy it and
1190 * then free the copy when it's done sending. */
1191 j = job_copy(peek_job(id));
1193 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1195 reply_job(c, j, MSG_FOUND);
1196 break;
1197 case OP_RESERVE_TIMEOUT:
1198 errno = 0;
1199 timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10);
1200 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1201 case OP_RESERVE: /* FALLTHROUGH */
1202 /* don't allow trailing garbage */
1203 if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) {
1204 return reply_msg(c, MSG_BAD_FORMAT);
1207 op_ct[type]++;
1208 conn_set_worker(c);
1210 if (conn_has_close_deadline(c) && !conn_ready(c)) {
1211 return reply_msg(c, MSG_DEADLINE_SOON);
1214 /* try to get a new job for this guy */
1215 wait_for_job(c, timeout);
1216 process_queue();
1217 break;
1218 case OP_DELETE:
1219 errno = 0;
1220 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1221 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1222 op_ct[type]++;
1224 j = job_find(id);
1225 j = remove_reserved_job(c, j) ? :
1226 remove_ready_job(j) ? :
1227 remove_buried_job(j);
1229 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1231 j->state = JOB_STATE_INVALID;
1232 r = binlog_write_job(j);
1233 job_free(j);
1235 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1237 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1238 break;
1239 case OP_RELEASE:
1240 errno = 0;
1241 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1242 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1244 r = read_pri(&pri, pri_buf, &delay_buf);
1245 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1247 r = read_delay(&delay, delay_buf, NULL);
1248 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1249 op_ct[type]++;
1251 j = remove_reserved_job(c, job_find(id));
1253 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1255 /* We want to update the delay deadline on disk, so reserve space for
1256 * that. */
1257 if (delay) {
1258 z = binlog_reserve_space_update(j);
1259 if (!z) return reply_serr(c, MSG_OUT_OF_MEMORY);
1260 j->reserved_binlog_space += z;
1263 j->pri = pri;
1264 j->delay = delay;
1265 j->release_ct++;
1267 r = enqueue_job(j, delay, !!delay);
1268 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
1269 if (r == 1) {
1270 return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1273 /* out of memory trying to grow the queue, so it gets buried */
1274 bury_job(j, 0);
1275 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1276 break;
1277 case OP_BURY:
1278 errno = 0;
1279 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1280 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1282 r = read_pri(&pri, pri_buf, NULL);
1283 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1284 op_ct[type]++;
1286 j = remove_reserved_job(c, job_find(id));
1288 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1290 j->pri = pri;
1291 r = bury_job(j, 1);
1292 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1293 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1294 break;
1295 case OP_KICK:
1296 errno = 0;
1297 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
1298 if (end_buf == c->cmd + CMD_KICK_LEN) {
1299 return reply_msg(c, MSG_BAD_FORMAT);
1301 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1303 op_ct[type]++;
1305 i = kick_jobs(c->use, count);
1307 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1308 case OP_TOUCH:
1309 errno = 0;
1310 id = strtoull(c->cmd + CMD_TOUCH_LEN, &end_buf, 10);
1311 if (errno) return twarn("strtoull"), reply_msg(c, MSG_BAD_FORMAT);
1313 op_ct[type]++;
1315 j = touch_job(c, job_find(id));
1317 if (j) {
1318 reply(c, MSG_TOUCHED, MSG_TOUCHED_LEN, STATE_SENDWORD);
1319 } else {
1320 return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1322 break;
1323 case OP_STATS:
1324 /* don't allow trailing garbage */
1325 if (c->cmd_len != CMD_STATS_LEN + 2) {
1326 return reply_msg(c, MSG_BAD_FORMAT);
1329 op_ct[type]++;
1331 do_stats(c, fmt_stats, NULL);
1332 break;
1333 case OP_JOBSTATS:
1334 errno = 0;
1335 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1336 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1338 op_ct[type]++;
1340 j = peek_job(id);
1341 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1343 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1344 do_stats(c, (fmt_fn) fmt_job_stats, j);
1345 break;
1346 case OP_STATS_TUBE:
1347 name = c->cmd + CMD_STATS_TUBE_LEN;
1348 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1350 op_ct[type]++;
1352 t = tube_find(name);
1353 if (!t) return reply_msg(c, MSG_NOTFOUND);
1355 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1356 t = NULL;
1357 break;
1358 case OP_LIST_TUBES:
1359 /* don't allow trailing garbage */
1360 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1361 return reply_msg(c, MSG_BAD_FORMAT);
1364 op_ct[type]++;
1365 do_list_tubes(c, &tubes);
1366 break;
1367 case OP_LIST_TUBE_USED:
1368 /* don't allow trailing garbage */
1369 if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
1370 return reply_msg(c, MSG_BAD_FORMAT);
1373 op_ct[type]++;
1374 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1375 break;
1376 case OP_LIST_TUBES_WATCHED:
1377 /* don't allow trailing garbage */
1378 if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
1379 return reply_msg(c, MSG_BAD_FORMAT);
1382 op_ct[type]++;
1383 do_list_tubes(c, &c->watch);
1384 break;
1385 case OP_USE:
1386 name = c->cmd + CMD_USE_LEN;
1387 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1388 op_ct[type]++;
1390 TUBE_ASSIGN(t, tube_find_or_make(name));
1391 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1393 c->use->using_ct--;
1394 TUBE_ASSIGN(c->use, t);
1395 TUBE_ASSIGN(t, NULL);
1396 c->use->using_ct++;
1398 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1399 break;
1400 case OP_WATCH:
1401 name = c->cmd + CMD_WATCH_LEN;
1402 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1403 op_ct[type]++;
1405 TUBE_ASSIGN(t, tube_find_or_make(name));
1406 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1408 r = 1;
1409 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1410 TUBE_ASSIGN(t, NULL);
1411 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1413 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1414 break;
1415 case OP_IGNORE:
1416 name = c->cmd + CMD_IGNORE_LEN;
1417 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1418 op_ct[type]++;
1420 t = NULL;
1421 for (i = 0; i < c->watch.used; i++) {
1422 t = c->watch.items[i];
1423 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1424 t = NULL;
1427 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1429 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1430 t = NULL;
1432 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1433 break;
1434 case OP_QUIT:
1435 conn_close(c);
1436 break;
1437 default:
1438 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1442 /* There are three reasons this function may be called. We need to check for
1443 * all of them.
1445 * 1. A reserved job has run out of time.
1446 * 2. A waiting client's reserved job has entered the safety margin.
1447 * 3. A waiting client's requested timeout has occurred.
1449 * If any of these happen, we must do the appropriate thing. */
1450 static void
1451 h_conn_timeout(conn c)
1453 int r, should_timeout = 0;
1454 job j;
1456 /* Check if the client was trying to reserve a job. */
1457 if (conn_waiting(c) && conn_has_close_deadline(c)) should_timeout = 1;
1459 /* Check if any reserved jobs have run out of time. We should do this
1460 * whether or not the client is waiting for a new reservation. */
1461 while ((j = soonest_job(c))) {
1462 if (j->deadline_at >= now_usec()) break;
1464 /* This job is in the middle of being written out. If we return it to
1465 * the ready queue, someone might free it before we finish writing it
1466 * out to the socket. So we'll copy it here and free the copy when it's
1467 * done sending. */
1468 if (j == c->out_job) {
1469 c->out_job = job_copy(c->out_job);
1472 timeout_ct++; /* stats */
1473 j->timeout_ct++;
1474 r = enqueue_job(remove_this_reserved_job(c, j), 0, 0);
1475 if (r < 1) bury_job(j, 0); /* out of memory, so bury it */
1476 r = conn_update_evq(c, c->evq.ev_events);
1477 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
1480 if (should_timeout) {
1481 dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1482 return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON);
1483 } else if (conn_waiting(c) && c->pending_timeout >= 0) {
1484 dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1485 c->pending_timeout = -1;
1486 return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);
1490 void
1491 enter_drain_mode(int sig)
1493 drain_mode = 1;
1496 static void
1497 do_cmd(conn c)
1499 dispatch_cmd(c);
1500 fill_extra_data(c);
1503 static void
1504 reset_conn(conn c)
1506 int r;
1508 r = conn_update_evq(c, EV_READ | EV_PERSIST);
1509 if (r == -1) return twarnx("update events failed"), conn_close(c);
1511 /* was this a peek or stats command? */
1512 if (c->out_job && c->out_job->state == JOB_STATE_COPY) job_free(c->out_job);
1513 c->out_job = NULL;
1515 c->reply_sent = 0; /* now that we're done, reset this */
1516 c->state = STATE_WANTCOMMAND;
1519 static void
1520 h_conn_data(conn c)
1522 int r, to_read;
1523 job j;
1524 struct iovec iov[2];
1526 switch (c->state) {
1527 case STATE_WANTCOMMAND:
1528 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1529 if (r == -1) return check_err(c, "read()");
1530 if (r == 0) return conn_close(c); /* the client hung up */
1532 c->cmd_read += r; /* we got some bytes */
1534 c->cmd_len = cmd_len(c); /* find the EOL */
1536 /* yay, complete command line */
1537 if (c->cmd_len) return do_cmd(c);
1539 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1541 /* command line too long? */
1542 if (c->cmd_read == LINE_BUF_SIZE) {
1543 c->cmd_read = 0; /* discard the input so far */
1544 return reply_msg(c, MSG_BAD_FORMAT);
1547 /* otherwise we have an incomplete line, so just keep waiting */
1548 break;
1549 case STATE_BITBUCKET:
1550 /* Invert the meaning of in_job_read while throwing away data -- it
1551 * counts the bytes that remain to be thrown away. */
1552 to_read = min(c->in_job_read, BUCKET_BUF_SIZE);
1553 r = read(c->fd, bucket, to_read);
1554 if (r == -1) return check_err(c, "read()");
1555 if (r == 0) return conn_close(c); /* the client hung up */
1557 c->in_job_read -= r; /* we got some bytes */
1559 /* (c->in_job_read < 0) can't happen */
1561 if (c->in_job_read == 0) return reply_serr(c, MSG_OUT_OF_MEMORY);
1562 break;
1563 case STATE_WANTDATA:
1564 j = c->in_job;
1566 r = read(c->fd, j->body + c->in_job_read, j->body_size -c->in_job_read);
1567 if (r == -1) return check_err(c, "read()");
1568 if (r == 0) return conn_close(c); /* the client hung up */
1570 c->in_job_read += r; /* we got some bytes */
1572 /* (j->in_job_read > j->body_size) can't happen */
1574 maybe_enqueue_incoming_job(c);
1575 break;
1576 case STATE_SENDWORD:
1577 r= write(c->fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1578 if (r == -1) return check_err(c, "write()");
1579 if (r == 0) return conn_close(c); /* the client hung up */
1581 c->reply_sent += r; /* we got some bytes */
1583 /* (c->reply_sent > c->reply_len) can't happen */
1585 if (c->reply_sent == c->reply_len) return reset_conn(c);
1587 /* otherwise we sent an incomplete reply, so just keep waiting */
1588 break;
1589 case STATE_SENDJOB:
1590 j = c->out_job;
1592 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1593 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1594 iov[1].iov_base = j->body + c->out_job_sent;
1595 iov[1].iov_len = j->body_size - c->out_job_sent;
1597 r = writev(c->fd, iov, 2);
1598 if (r == -1) return check_err(c, "writev()");
1599 if (r == 0) return conn_close(c); /* the client hung up */
1601 /* update the sent values */
1602 c->reply_sent += r;
1603 if (c->reply_sent >= c->reply_len) {
1604 c->out_job_sent += c->reply_sent - c->reply_len;
1605 c->reply_sent = c->reply_len;
1608 /* (c->out_job_sent > j->body_size) can't happen */
1610 /* are we done? */
1611 if (c->out_job_sent == j->body_size) return reset_conn(c);
1613 /* otherwise we sent incomplete data, so just keep waiting */
1614 break;
1615 case STATE_WAIT: /* keep an eye out in case they hang up */
1616 /* but don't hang up just because our buffer is full */
1617 if (LINE_BUF_SIZE - c->cmd_read < 1) break;
1619 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1620 if (r == -1) return check_err(c, "read()");
1621 if (r == 0) return conn_close(c); /* the client hung up */
1622 c->cmd_read += r; /* we got some bytes */
1626 #define want_command(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))
1627 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
1629 static void
1630 h_conn(const int fd, const short which, conn c)
1632 if (fd != c->fd) {
1633 twarnx("Argh! event fd doesn't match conn fd.");
1634 close(fd);
1635 return conn_close(c);
1638 switch (which) {
1639 case EV_TIMEOUT:
1640 h_conn_timeout(c);
1641 event_add(&c->evq, NULL); /* seems to be necessary */
1642 break;
1643 case EV_READ:
1644 /* fall through... */
1645 case EV_WRITE:
1646 /* fall through... */
1647 default:
1648 h_conn_data(c);
1651 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1654 static void
1655 h_delay()
1657 int r;
1658 job j;
1659 usec t;
1661 t = now_usec();
1662 while ((j = delay_q_peek())) {
1663 if (j->deadline_at > t) break;
1664 j = delay_q_take();
1665 r = enqueue_job(j, 0, 0);
1666 if (r < 1) bury_job(j, 0); /* out of memory, so bury it */
1669 set_main_delay_timeout();
1672 void
1673 h_accept(const int fd, const short which, struct event *ev)
1675 conn c;
1676 int cfd, flags, r;
1677 socklen_t addrlen;
1678 struct sockaddr addr;
1680 if (which == EV_TIMEOUT) return h_delay();
1682 addrlen = sizeof addr;
1683 cfd = accept(fd, &addr, &addrlen);
1684 if (cfd == -1) {
1685 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1686 if (errno == EMFILE) brake();
1687 return;
1690 flags = fcntl(cfd, F_GETFL, 0);
1691 if (flags < 0) return twarn("getting flags"), close(cfd), v();
1693 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1694 if (r < 0) return twarn("setting O_NONBLOCK"), close(cfd), v();
1696 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1697 if (!c) return twarnx("make_conn() failed"), close(cfd), brake();
1699 dprintf("accepted conn, fd=%d\n", cfd);
1700 r = conn_set_evq(c, EV_READ | EV_PERSIST, (evh) h_conn);
1701 if (r == -1) return twarnx("conn_set_evq() failed"), close(cfd), brake();
1704 void
1705 prot_init()
1707 started_at = now_usec();
1708 memset(op_ct, 0, sizeof(op_ct));
1710 ms_init(&tubes, NULL, NULL);
1712 TUBE_ASSIGN(default_tube, tube_find_or_make("default"));
1713 if (!default_tube) twarnx("Out of memory during startup!");
1716 void
1717 prot_replay_binlog(job binlog_jobs)
1719 job j, nj;
1720 usec delay;
1721 int r;
1723 for (j = binlog_jobs->next ; j != binlog_jobs ; j = nj) {
1724 nj = j->next;
1725 job_remove(j);
1726 binlog_reserve_space_update(j); /* reserve space for a delete */
1727 delay = 0;
1728 switch (j->state) {
1729 case JOB_STATE_BURIED:
1730 bury_job(j, 0);
1731 break;
1732 case JOB_STATE_DELAYED:
1733 if (started_at < j->deadline_at) {
1734 delay = j->deadline_at - started_at;
1736 /* fall through */
1737 default:
1738 r = enqueue_job(j, delay, 0);
1739 if (r < 1) twarnx("error processing binlog job %llu", j->id);