Placate all the compilers. Ugly.
[beanstalkd.git] / prot.c
blobb8a5fdf1f9879a405907fbfa52f3278f6d64cb00
1 /* prot.c - protocol implementation */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include "config.h"
21 #include <stdlib.h>
22 #include <stdio.h>
23 #include <unistd.h>
24 #include <string.h>
25 #include <errno.h>
26 #include <sys/resource.h>
27 #include <sys/uio.h>
28 #include <stdarg.h>
29 #include <ctype.h>
30 #include <inttypes.h>
32 #include "stat.h"
33 #include "prot.h"
34 #include "pq.h"
35 #include "ms.h"
36 #include "job.h"
37 #include "tube.h"
38 #include "conn.h"
39 #include "util.h"
40 #include "net.h"
41 #include "binlog.h"
43 /* job body cannot be greater than this many bytes long */
44 size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
46 #define NAME_CHARS \
47 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
48 "abcdefghijklmnopqrstuvwxyz" \
49 "0123456789-+/;.$()"
51 #define CMD_PUT "put "
52 #define CMD_PEEKJOB "peek "
53 #define CMD_PEEK_READY "peek-ready"
54 #define CMD_PEEK_DELAYED "peek-delayed"
55 #define CMD_PEEK_BURIED "peek-buried"
56 #define CMD_RESERVE "reserve"
57 #define CMD_RESERVE_TIMEOUT "reserve-with-timeout "
58 #define CMD_DELETE "delete "
59 #define CMD_RELEASE "release "
60 #define CMD_BURY "bury "
61 #define CMD_KICK "kick "
62 #define CMD_TOUCH "touch "
63 #define CMD_STATS "stats"
64 #define CMD_JOBSTATS "stats-job "
65 #define CMD_USE "use "
66 #define CMD_WATCH "watch "
67 #define CMD_IGNORE "ignore "
68 #define CMD_LIST_TUBES "list-tubes"
69 #define CMD_LIST_TUBE_USED "list-tube-used"
70 #define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
71 #define CMD_STATS_TUBE "stats-tube "
72 #define CMD_QUIT "quit"
73 #define CMD_PAUSE_TUBE "pause-tube"
75 #define CONSTSTRLEN(m) (sizeof(m) - 1)
77 #define CMD_PEEK_READY_LEN CONSTSTRLEN(CMD_PEEK_READY)
78 #define CMD_PEEK_DELAYED_LEN CONSTSTRLEN(CMD_PEEK_DELAYED)
79 #define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED)
80 #define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
81 #define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
82 #define CMD_RESERVE_TIMEOUT_LEN CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
83 #define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
84 #define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
85 #define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
86 #define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
87 #define CMD_TOUCH_LEN CONSTSTRLEN(CMD_TOUCH)
88 #define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
89 #define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
90 #define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
91 #define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
92 #define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
93 #define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
94 #define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
95 #define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
96 #define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
97 #define CMD_PAUSE_TUBE_LEN CONSTSTRLEN(CMD_PAUSE_TUBE)
99 #define MSG_FOUND "FOUND"
100 #define MSG_NOTFOUND "NOT_FOUND\r\n"
101 #define MSG_RESERVED "RESERVED"
102 #define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
103 #define MSG_TIMED_OUT "TIMED_OUT\r\n"
104 #define MSG_DELETED "DELETED\r\n"
105 #define MSG_RELEASED "RELEASED\r\n"
106 #define MSG_BURIED "BURIED\r\n"
107 #define MSG_TOUCHED "TOUCHED\r\n"
/* Job ids are uint64_t, so they are formatted with PRIu64 rather than %llu,
 * which is only correct where uint64_t happens to be unsigned long long. */
#define MSG_BURIED_FMT "BURIED %" PRIu64 "\r\n"
#define MSG_INSERTED_FMT "INSERTED %" PRIu64 "\r\n"
110 #define MSG_NOT_IGNORED "NOT_IGNORED\r\n"
112 #define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
113 #define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
114 #define MSG_TOUCHED_LEN CONSTSTRLEN(MSG_TOUCHED)
115 #define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
116 #define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
117 #define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)
119 #define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
120 #define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
121 #define MSG_DRAINING "DRAINING\r\n"
122 #define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
123 #define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
124 #define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
125 #define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
127 #define STATE_WANTCOMMAND 0
128 #define STATE_WANTDATA 1
129 #define STATE_SENDJOB 2
130 #define STATE_SENDWORD 3
131 #define STATE_WAIT 4
132 #define STATE_BITBUCKET 5
134 #define OP_UNKNOWN 0
135 #define OP_PUT 1
136 #define OP_PEEKJOB 2
137 #define OP_RESERVE 3
138 #define OP_DELETE 4
139 #define OP_RELEASE 5
140 #define OP_BURY 6
141 #define OP_KICK 7
142 #define OP_STATS 8
143 #define OP_JOBSTATS 9
144 #define OP_PEEK_BURIED 10
145 #define OP_USE 11
146 #define OP_WATCH 12
147 #define OP_IGNORE 13
148 #define OP_LIST_TUBES 14
149 #define OP_LIST_TUBE_USED 15
150 #define OP_LIST_TUBES_WATCHED 16
151 #define OP_STATS_TUBE 17
152 #define OP_PEEK_READY 18
153 #define OP_PEEK_DELAYED 19
154 #define OP_RESERVE_TIMEOUT 20
155 #define OP_TOUCH 21
156 #define OP_QUIT 22
157 #define OP_PAUSE_TUBE 23
158 #define TOTAL_OPS 24
160 #define STATS_FMT "---\n" \
161 "current-jobs-urgent: %u\n" \
162 "current-jobs-ready: %u\n" \
163 "current-jobs-reserved: %u\n" \
164 "current-jobs-delayed: %u\n" \
165 "current-jobs-buried: %u\n" \
166 "cmd-put: %" PRIu64 "\n" \
167 "cmd-peek: %" PRIu64 "\n" \
168 "cmd-peek-ready: %" PRIu64 "\n" \
169 "cmd-peek-delayed: %" PRIu64 "\n" \
170 "cmd-peek-buried: %" PRIu64 "\n" \
171 "cmd-reserve: %" PRIu64 "\n" \
172 "cmd-reserve-with-timeout: %" PRIu64 "\n" \
173 "cmd-delete: %" PRIu64 "\n" \
174 "cmd-release: %" PRIu64 "\n" \
175 "cmd-use: %" PRIu64 "\n" \
176 "cmd-watch: %" PRIu64 "\n" \
177 "cmd-ignore: %" PRIu64 "\n" \
178 "cmd-bury: %" PRIu64 "\n" \
179 "cmd-kick: %" PRIu64 "\n" \
180 "cmd-touch: %" PRIu64 "\n" \
181 "cmd-stats: %" PRIu64 "\n" \
182 "cmd-stats-job: %" PRIu64 "\n" \
183 "cmd-stats-tube: %" PRIu64 "\n" \
184 "cmd-list-tubes: %" PRIu64 "\n" \
185 "cmd-list-tube-used: %" PRIu64 "\n" \
186 "cmd-list-tubes-watched: %" PRIu64 "\n" \
187 "cmd-pause-tube: %" PRIu64 "\n" \
188 "job-timeouts: %" PRIu64 "\n" \
189 "total-jobs: %" PRIu64 "\n" \
190 "max-job-size: %zu\n" \
191 "current-tubes: %zu\n" \
192 "current-connections: %u\n" \
193 "current-producers: %u\n" \
194 "current-workers: %u\n" \
195 "current-waiting: %u\n" \
196 "total-connections: %u\n" \
197 "pid: %ld\n" \
198 "version: %s\n" \
199 "rusage-utime: %d.%06d\n" \
200 "rusage-stime: %d.%06d\n" \
201 "uptime: %u\n" \
202 "binlog-oldest-index: %s\n" \
203 "binlog-current-index: %s\n" \
204 "binlog-max-size: %zu\n" \
205 "\r\n"
207 #define STATS_TUBE_FMT "---\n" \
208 "name: %s\n" \
209 "current-jobs-urgent: %u\n" \
210 "current-jobs-ready: %u\n" \
211 "current-jobs-reserved: %u\n" \
212 "current-jobs-delayed: %u\n" \
213 "current-jobs-buried: %u\n" \
214 "total-jobs: %" PRIu64 "\n" \
215 "current-using: %u\n" \
216 "current-watching: %u\n" \
217 "current-waiting: %u\n" \
218 "cmd-pause-tube: %u\n" \
219 "pause: %" PRIu64 "\n" \
220 "pause-time-left: %" PRIu64 "\n" \
221 "\r\n"
223 /* this number is pretty arbitrary */
224 #define BUCKET_BUF_SIZE 1024
226 static char bucket[BUCKET_BUF_SIZE];
228 static unsigned int ready_ct = 0;
229 static struct stats global_stat = {0, 0, 0, 0, 0};
231 static tube default_tube;
233 static int drain_mode = 0;
234 static usec started_at;
235 static uint64_t op_ct[TOTAL_OPS], timeout_ct = 0;
238 /* Doubly-linked list of connections with at least one reserved job. */
239 static struct conn running = { &running, &running, 0 };
#ifdef DEBUG
/* Command name for each OP_* code, indexed by the opcode itself. Designated
 * initializers keep every name attached to its opcode even if the OP_*
 * values are renumbered. */
static const char * op_names[] = {
    [OP_UNKNOWN]            = "<unknown>",
    [OP_PUT]                = CMD_PUT,
    [OP_PEEKJOB]            = CMD_PEEKJOB,
    [OP_RESERVE]            = CMD_RESERVE,
    [OP_DELETE]             = CMD_DELETE,
    [OP_RELEASE]            = CMD_RELEASE,
    [OP_BURY]               = CMD_BURY,
    [OP_KICK]               = CMD_KICK,
    [OP_STATS]              = CMD_STATS,
    [OP_JOBSTATS]           = CMD_JOBSTATS,
    [OP_PEEK_BURIED]        = CMD_PEEK_BURIED,
    [OP_USE]                = CMD_USE,
    [OP_WATCH]              = CMD_WATCH,
    [OP_IGNORE]             = CMD_IGNORE,
    [OP_LIST_TUBES]         = CMD_LIST_TUBES,
    [OP_LIST_TUBE_USED]     = CMD_LIST_TUBE_USED,
    [OP_LIST_TUBES_WATCHED] = CMD_LIST_TUBES_WATCHED,
    [OP_STATS_TUBE]         = CMD_STATS_TUBE,
    [OP_PEEK_READY]         = CMD_PEEK_READY,
    [OP_PEEK_DELAYED]       = CMD_PEEK_DELAYED,
    [OP_RESERVE_TIMEOUT]    = CMD_RESERVE_TIMEOUT,
    [OP_TOUCH]              = CMD_TOUCH,
    [OP_QUIT]               = CMD_QUIT,
    [OP_PAUSE_TUBE]         = CMD_PAUSE_TUBE
};
#endif
270 static job remove_buried_job(job j);
272 static int
273 buried_job_p(tube t)
275 return job_list_any_p(&t->buried);
278 static void
279 reply(conn c, const char *line, int len, int state)
281 int r;
283 if (!c) return;
285 r = conn_update_evq(c, EV_WRITE | EV_PERSIST);
286 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
288 c->reply = line;
289 c->reply_len = len;
290 c->reply_sent = 0;
291 c->state = state;
292 dprintf("sending reply: %.*s", len, line);
/* Reply to c with the message m and enter STATE_SENDWORD. The length is
 * taken with sizeof, so m must be a string literal (or char array). */
#define reply_msg(c,m) \
    reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)

/* Log e as a server error with twarnx(), then send it to c via reply_msg().
 * Written as a comma expression so it can be used wherever an expression
 * is expected. */
#define reply_serr(c,e) \
    (twarnx("server error: %s",(e)), reply_msg((c),(e)))
300 static void
301 reply_line(conn c, int state, const char *fmt, ...)
303 int r;
304 va_list ap;
306 va_start(ap, fmt);
307 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
308 va_end(ap);
310 /* Make sure the buffer was big enough. If not, we have a bug. */
311 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
313 return reply(c, c->reply_buf, r, state);
316 static void
317 reply_job(conn c, job j, const char *word)
319 /* tell this connection which job to send */
320 c->out_job = j;
321 c->out_job_sent = 0;
323 return reply_line(c, STATE_SENDJOB, "%s %llu %u\r\n",
324 word, j->id, j->body_size - 2);
327 conn
328 remove_waiting_conn(conn c)
330 tube t;
331 size_t i;
333 if (!conn_waiting(c)) return NULL;
335 c->type &= ~CONN_TYPE_WAITING;
336 global_stat.waiting_ct--;
337 for (i = 0; i < c->watch.used; i++) {
338 t = c->watch.items[i];
339 t->stat.waiting_ct--;
340 ms_remove(&t->waiting, c);
342 return c;
345 static void
346 reserve_job(conn c, job j)
348 j->deadline_at = now_usec() + j->ttr;
349 global_stat.reserved_ct++; /* stats */
350 j->tube->stat.reserved_ct++;
351 j->reserve_ct++;
352 conn_insert(&running, c);
353 j->state = JOB_STATE_RESERVED;
354 job_insert(&c->reserved_jobs, j);
355 j->reserver = c;
356 if (c->soonest_job && j->deadline_at < c->soonest_job->deadline_at) {
357 c->soonest_job = j;
359 return reply_job(c, j, MSG_RESERVED);
362 static job
363 next_eligible_job(usec now)
365 tube t;
366 size_t i;
367 job j = NULL, candidate;
369 dprintf("tubes.used = %zu\n", tubes.used);
370 for (i = 0; i < tubes.used; i++) {
371 t = tubes.items[i];
372 dprintf("for %s t->waiting.used=%zu t->ready.used=%d t->pause=%" PRIu64 "\n",
373 t->name, t->waiting.used, t->ready.used, t->pause);
374 if (t->pause) {
375 if (t->deadline_at > now) continue;
376 t->pause = 0;
378 if (t->waiting.used && t->ready.used) {
379 candidate = pq_peek(&t->ready);
380 if (!j || job_pri_cmp(candidate, j) < 0) j = candidate;
382 dprintf("i = %zu, tubes.used = %zu\n", i, tubes.used);
385 return j;
388 static void
389 process_queue()
391 job j;
392 usec now = now_usec();
394 dprintf("processing queue\n");
395 while ((j = next_eligible_job(now))) {
396 dprintf("got eligible job %llu in %s\n", j->id, j->tube->name);
397 j = pq_take(&j->tube->ready);
398 ready_ct--;
399 if (j->pri < URGENT_THRESHOLD) {
400 global_stat.urgent_ct--;
401 j->tube->stat.urgent_ct--;
403 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
407 static job
408 delay_q_peek()
410 int i;
411 tube t;
412 job j = NULL, nj;
414 for (i = 0; i < tubes.used; i++) {
415 t = tubes.items[i];
416 nj = pq_peek(&t->delay);
417 if (!nj) continue;
418 if (!j || nj->deadline_at < j->deadline_at) j = nj;
421 return j;
424 static tube
425 pause_tube_peek()
427 int i;
428 tube t, nt = NULL;
430 for (i = 0; i < tubes.used; i++) {
431 t = tubes.items[i];
432 if (t->pause) {
433 if (!nt || t->deadline_at < nt->deadline_at) nt = t;
437 return nt;
440 static void
441 set_main_delay_timeout()
443 job j = delay_q_peek();
444 tube t = pause_tube_peek();
445 usec deadline_at = t ? t->deadline_at : 0;
447 if (j && (!deadline_at || j->deadline_at < deadline_at)) deadline_at = j->deadline_at;
449 dprintf("deadline_at=%" PRIu64 "\n", deadline_at);
450 set_main_timeout(deadline_at);
453 static int
454 enqueue_job(job j, usec delay, char update_store)
456 int r;
458 j->reserver = NULL;
459 if (delay) {
460 j->deadline_at = now_usec() + delay;
461 r = pq_give(&j->tube->delay, j);
462 if (!r) return 0;
463 j->state = JOB_STATE_DELAYED;
464 set_main_delay_timeout();
465 } else {
466 r = pq_give(&j->tube->ready, j);
467 if (!r) return 0;
468 j->state = JOB_STATE_READY;
469 ready_ct++;
470 if (j->pri < URGENT_THRESHOLD) {
471 global_stat.urgent_ct++;
472 j->tube->stat.urgent_ct++;
476 if (update_store) {
477 r = binlog_write_job(j);
478 if (!r) return -1;
481 process_queue();
482 return 1;
485 static int
486 bury_job(job j, char update_store)
488 size_t z;
490 if (update_store) {
491 z = binlog_reserve_space_update(j);
492 if (!z) return 0;
493 j->reserved_binlog_space += z;
496 job_insert(&j->tube->buried, j);
497 global_stat.buried_ct++;
498 j->tube->stat.buried_ct++;
499 j->state = JOB_STATE_BURIED;
500 j->reserver = NULL;
501 j->bury_ct++;
503 if (update_store) return binlog_write_job(j);
505 return 1;
508 void
509 enqueue_reserved_jobs(conn c)
511 int r;
512 job j;
514 while (job_list_any_p(&c->reserved_jobs)) {
515 j = job_remove(c->reserved_jobs.next);
516 r = enqueue_job(j, 0, 0);
517 if (r < 1) bury_job(j, 0);
518 global_stat.reserved_ct--;
519 j->tube->stat.reserved_ct--;
520 c->soonest_job = NULL;
521 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
525 static job
526 delay_q_take()
528 job j = delay_q_peek();
529 return j ? pq_take(&j->tube->delay) : NULL;
532 static int
533 kick_buried_job(tube t)
535 int r;
536 job j;
537 size_t z;
539 if (!buried_job_p(t)) return 0;
540 j = remove_buried_job(t->buried.next);
542 z = binlog_reserve_space_update(j);
543 if (!z) return pq_give(&t->delay, j), 0; /* put it back */
544 j->reserved_binlog_space += z;
546 j->kick_ct++;
547 r = enqueue_job(j, 0, 1);
548 if (r == 1) return 1;
550 /* ready queue is full, so bury it */
551 bury_job(j, 0);
552 return 0;
555 static unsigned int
556 get_delayed_job_ct()
558 tube t;
559 size_t i;
560 unsigned int count = 0;
562 for (i = 0; i < tubes.used; i++) {
563 t = tubes.items[i];
564 count += t->delay.used;
566 return count;
569 static int
570 kick_delayed_job(tube t)
572 int r;
573 job j;
574 size_t z;
576 j = pq_take(&t->delay);
577 if (!j) return 0;
579 z = binlog_reserve_space_update(j);
580 if (!z) return pq_give(&t->delay, j), 0; /* put it back */
581 j->reserved_binlog_space += z;
583 j->kick_ct++;
584 r = enqueue_job(j, 0, 1);
585 if (r == 1) return 1;
587 /* ready queue is full, so delay it again */
588 r = enqueue_job(j, j->delay, 0);
589 if (r == 1) return 0;
591 /* last resort */
592 bury_job(j, 0);
593 return 0;
596 /* return the number of jobs successfully kicked */
597 static unsigned int
598 kick_buried_jobs(tube t, unsigned int n)
600 unsigned int i;
601 for (i = 0; (i < n) && kick_buried_job(t); ++i);
602 return i;
605 /* return the number of jobs successfully kicked */
606 static unsigned int
607 kick_delayed_jobs(tube t, unsigned int n)
609 unsigned int i;
610 for (i = 0; (i < n) && kick_delayed_job(t); ++i);
611 return i;
614 static unsigned int
615 kick_jobs(tube t, unsigned int n)
617 if (buried_job_p(t)) return kick_buried_jobs(t, n);
618 return kick_delayed_jobs(t, n);
621 static job
622 remove_buried_job(job j)
624 if (!j || j->state != JOB_STATE_BURIED) return NULL;
625 j = job_remove(j);
626 if (j) {
627 global_stat.buried_ct--;
628 j->tube->stat.buried_ct--;
630 return j;
633 static job
634 remove_ready_job(job j)
636 if (!j || j->state != JOB_STATE_READY) return NULL;
637 j = pq_remove(&j->tube->ready, j);
638 if (j) {
639 ready_ct--;
640 if (j->pri < URGENT_THRESHOLD) {
641 global_stat.urgent_ct--;
642 j->tube->stat.urgent_ct--;
645 return j;
648 static void
649 enqueue_waiting_conn(conn c)
651 tube t;
652 size_t i;
654 global_stat.waiting_ct++;
655 c->type |= CONN_TYPE_WAITING;
656 for (i = 0; i < c->watch.used; i++) {
657 t = c->watch.items[i];
658 t->stat.waiting_ct++;
659 ms_append(&t->waiting, c);
663 static job
664 find_reserved_job_in_conn(conn c, job j)
666 return (j && j->reserver == c && j->state == JOB_STATE_RESERVED) ? j : NULL;
669 static job
670 touch_job(conn c, job j)
672 j = find_reserved_job_in_conn(c, j);
673 if (j) {
674 j->deadline_at = now_usec() + j->ttr;
675 c->soonest_job = NULL;
677 return j;
680 static job
681 peek_job(uint64_t id)
683 return job_find(id);
686 static void
687 check_err(conn c, const char *s)
689 if (errno == EAGAIN) return;
690 if (errno == EINTR) return;
691 if (errno == EWOULDBLOCK) return;
693 twarn("%s", s);
694 conn_close(c);
695 return;
/* Scan the first size bytes of s for the sequence "\r\n" and return the
 * line length including that terminator (so always at least 2).
 * Returns 0 if no terminator is recognized. Only the first '\r' is
 * examined: if it is not immediately followed by '\n' the result is 0,
 * even if a "\r\n" occurs later in the buffer. */
static int
scan_line_end(const char *s, int size)
{
    const char *match;

    /* A terminator needs two bytes. This guard also keeps size - 1 from
     * going negative and turning into a huge size_t inside memchr(). */
    if (size < 2) return 0;

    match = memchr(s, '\r', size - 1);
    if (!match) return 0;

    /* this is safe because we only scan size - 1 chars above */
    if (match[1] == '\n') return match - s + 2;

    return 0;
}
714 static int
715 cmd_len(conn c)
717 return scan_line_end(c->cmd, c->cmd_read);
720 /* parse the command line */
721 static int
722 which_cmd(conn c)
724 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
725 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
726 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
727 TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY);
728 TEST_CMD(c->cmd, CMD_PEEK_DELAYED, OP_PEEK_DELAYED);
729 TEST_CMD(c->cmd, CMD_PEEK_BURIED, OP_PEEK_BURIED);
730 TEST_CMD(c->cmd, CMD_RESERVE_TIMEOUT, OP_RESERVE_TIMEOUT);
731 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
732 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
733 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
734 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
735 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
736 TEST_CMD(c->cmd, CMD_TOUCH, OP_TOUCH);
737 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
738 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
739 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
740 TEST_CMD(c->cmd, CMD_USE, OP_USE);
741 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
742 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
743 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
744 TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
745 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
746 TEST_CMD(c->cmd, CMD_QUIT, OP_QUIT);
747 TEST_CMD(c->cmd, CMD_PAUSE_TUBE, OP_PAUSE_TUBE);
748 return OP_UNKNOWN;
751 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
752 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
753 * This function is idempotent(). */
754 static void
755 fill_extra_data(conn c)
757 int extra_bytes, job_data_bytes = 0, cmd_bytes;
759 if (!c->fd) return; /* the connection was closed */
760 if (!c->cmd_len) return; /* we don't have a complete command */
762 /* how many extra bytes did we read? */
763 extra_bytes = c->cmd_read - c->cmd_len;
765 /* how many bytes should we put into the job body? */
766 if (c->in_job) {
767 job_data_bytes = min(extra_bytes, c->in_job->body_size);
768 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
769 c->in_job_read = job_data_bytes;
770 } else if (c->in_job_read) {
771 /* we are in bit-bucket mode, throwing away data */
772 job_data_bytes = min(extra_bytes, c->in_job_read);
773 c->in_job_read -= job_data_bytes;
776 /* how many bytes are left to go into the future cmd? */
777 cmd_bytes = extra_bytes - job_data_bytes;
778 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
779 c->cmd_read = cmd_bytes;
780 c->cmd_len = 0; /* we no longer know the length of the new command */
783 static void
784 enqueue_incoming_job(conn c)
786 int r;
787 job j = c->in_job;
789 c->in_job = NULL; /* the connection no longer owns this job */
790 c->in_job_read = 0;
792 /* check if the trailer is present and correct */
793 if (memcmp(j->body + j->body_size - 2, "\r\n", 2)) {
794 job_free(j);
795 return reply_msg(c, MSG_EXPECTED_CRLF);
798 if (drain_mode) {
799 job_free(j);
800 return reply_serr(c, MSG_DRAINING);
803 if (j->reserved_binlog_space) return reply_serr(c, MSG_INTERNAL_ERROR);
804 j->reserved_binlog_space = binlog_reserve_space_put(j);
805 if (!j->reserved_binlog_space) return reply_serr(c, MSG_OUT_OF_MEMORY);
807 /* we have a complete job, so let's stick it in the pqueue */
808 r = enqueue_job(j, j->delay, 1);
809 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
811 op_ct[OP_PUT]++; /* stats */
812 global_stat.total_jobs_ct++;
813 j->tube->stat.total_jobs_ct++;
815 if (r == 1) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->id);
817 /* out of memory trying to grow the queue, so it gets buried */
818 bury_job(j, 0);
819 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->id);
822 static unsigned int
823 uptime()
825 return (now_usec() - started_at) / 1000000;
828 static int
829 fmt_stats(char *buf, size_t size, void *x)
831 struct rusage ru = {{0, 0}, {0, 0}};
832 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
833 return snprintf(buf, size, STATS_FMT,
834 global_stat.urgent_ct,
835 ready_ct,
836 global_stat.reserved_ct,
837 get_delayed_job_ct(),
838 global_stat.buried_ct,
839 op_ct[OP_PUT],
840 op_ct[OP_PEEKJOB],
841 op_ct[OP_PEEK_READY],
842 op_ct[OP_PEEK_DELAYED],
843 op_ct[OP_PEEK_BURIED],
844 op_ct[OP_RESERVE],
845 op_ct[OP_RESERVE_TIMEOUT],
846 op_ct[OP_DELETE],
847 op_ct[OP_RELEASE],
848 op_ct[OP_USE],
849 op_ct[OP_WATCH],
850 op_ct[OP_IGNORE],
851 op_ct[OP_BURY],
852 op_ct[OP_KICK],
853 op_ct[OP_TOUCH],
854 op_ct[OP_STATS],
855 op_ct[OP_JOBSTATS],
856 op_ct[OP_STATS_TUBE],
857 op_ct[OP_LIST_TUBES],
858 op_ct[OP_LIST_TUBE_USED],
859 op_ct[OP_LIST_TUBES_WATCHED],
860 op_ct[OP_PAUSE_TUBE],
861 timeout_ct,
862 global_stat.total_jobs_ct,
863 job_data_size_limit,
864 tubes.used,
865 count_cur_conns(),
866 count_cur_producers(),
867 count_cur_workers(),
868 global_stat.waiting_ct,
869 count_tot_conns(),
870 (long) getpid(),
871 VERSION,
872 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
873 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
874 uptime(),
875 binlog_oldest_index(),
876 binlog_current_index(),
877 binlog_size_limit);
/* Read a priority value from the given buffer and place it in pri.
 * Leading spaces are skipped; the value must start with a decimal digit.
 * Update end to point to the address after the last character consumed.
 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
 * conversion and return the status code but not update any values. This is
 * an easy way to check for errors.
 * If end is NULL, read_pri will also check that the entire input string was
 * consumed and return an error code otherwise.
 * Values too large for an unsigned int saturate at the maximum unsigned int
 * value rather than wrapping around.
 * Return 0 on success, or nonzero on failure.
 * If a failure occurs, pri and end are not modified. */
static int
read_pri(unsigned int *pri, const char *buf, char **end)
{
    char *tend;
    unsigned long val;
    unsigned int tpri;

    errno = 0;
    while (buf[0] == ' ') buf++;
    /* the cast keeps isdigit() defined for bytes >= 0x80 on signed-char
     * platforms */
    if (!isdigit((unsigned char) buf[0])) return -1;
    val = strtoul(buf, &tend, 10);
    if (tend == buf) return -1;
    if (errno && errno != ERANGE) return -1;
    if (!end && tend[0] != '\0') return -1;

    /* strtoul() already clamps on ERANGE; clamp the narrowing to unsigned
     * int the same way, so that e.g. 4294967296 does not silently become 0
     * where long is wider than int. */
    tpri = (unsigned int) val;
    if (tpri != val) tpri = ~0U;

    if (pri) *pri = tpri;
    if (end) *end = tend;
    return 0;
}
909 /* Read a delay value from the given buffer and place it in delay.
910 * The interface and behavior are analogous to read_pri(). */
911 static int
912 read_delay(usec *delay, const char *buf, char **end)
914 int r;
915 unsigned int delay_sec;
917 r = read_pri(&delay_sec, buf, end);
918 if (r) return r;
919 *delay = ((usec) delay_sec) * 1000000;
920 return 0;
923 /* Read a timeout value from the given buffer and place it in ttr.
924 * The interface and behavior are the same as in read_delay(). */
925 static int
926 read_ttr(usec *ttr, const char *buf, char **end)
928 return read_delay(ttr, buf, end);
931 /* Read a tube name from the given buffer moving the buffer to the name start */
932 static int
933 read_tube_name(char **tubename, char *buf, char **end)
935 size_t len;
937 while (buf[0] == ' ') buf++;
938 len = strspn(buf, NAME_CHARS);
939 if (len == 0) return -1;
940 if (tubename) *tubename = buf;
941 if (end) *end = buf + len;
942 return 0;
945 static void
946 wait_for_job(conn c, int timeout)
948 int r;
950 c->state = STATE_WAIT;
951 enqueue_waiting_conn(c);
953 /* Set the pending timeout to the requested timeout amount */
954 c->pending_timeout = timeout;
956 /* this conn is waiting, but we want to know if they hang up */
957 r = conn_update_evq(c, EV_READ | EV_PERSIST);
958 if (r == -1) return twarnx("update events failed"), conn_close(c);
961 typedef int(*fmt_fn)(char *, size_t, void *);
963 static void
964 do_stats(conn c, fmt_fn fmt, void *data)
966 int r, stats_len;
968 /* first, measure how big a buffer we will need */
969 stats_len = fmt(NULL, 0, data) + 16;
971 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
972 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
974 /* now actually format the stats data */
975 r = fmt(c->out_job->body, stats_len, data);
976 /* and set the actual body size */
977 c->out_job->body_size = r;
978 if (r > stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
980 c->out_job_sent = 0;
981 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", r - 2);
984 static void
985 do_list_tubes(conn c, ms l)
987 char *buf;
988 tube t;
989 size_t i, resp_z;
991 /* first, measure how big a buffer we will need */
992 resp_z = 6; /* initial "---\n" and final "\r\n" */
993 for (i = 0; i < l->used; i++) {
994 t = l->items[i];
995 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
998 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
999 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
1001 /* now actually format the response */
1002 buf = c->out_job->body;
1003 buf += snprintf(buf, 5, "---\n");
1004 for (i = 0; i < l->used; i++) {
1005 t = l->items[i];
1006 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
1008 buf[0] = '\r';
1009 buf[1] = '\n';
1011 c->out_job_sent = 0;
1012 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", resp_z - 2);
1015 static int
1016 fmt_job_stats(char *buf, size_t size, job j)
1018 usec t;
1019 uint64_t time_left;
1021 t = now_usec();
1022 if (j->state == JOB_STATE_RESERVED || j->state == JOB_STATE_DELAYED) {
1023 time_left = (j->deadline_at - t) / 1000000;
1024 } else {
1025 time_left = 0;
1027 return snprintf(buf, size,
1028 "id: %" PRIu64 "\n"
1029 "tube: %s\n"
1030 "state: %s\n"
1031 "pri: %u\n"
1032 "age: %" PRIu64 "\n"
1033 "delay: %" PRIu64 "\n"
1034 "ttr: %" PRIu64 "\n"
1035 "time-left: %" PRIu64 "\n"
1036 "reserves: %u\n"
1037 "timeouts: %u\n"
1038 "releases: %u\n"
1039 "buries: %u\n"
1040 "kicks: %u\n"
1041 "\r\n",
1043 j->id,
1044 j->tube->name,
1045 job_state(j),
1046 j->pri,
1047 (t - j->created_at) / 1000000,
1048 j->delay / 1000000,
1049 j->ttr / 1000000,
1050 time_left,
1051 j->reserve_ct,
1052 j->timeout_ct,
1053 j->release_ct,
1054 j->bury_ct,
1055 j->kick_ct);
1058 static int
1059 fmt_stats_tube(char *buf, size_t size, tube t)
1061 uint64_t time_left;
1063 if (t->pause > 0) {
1064 time_left = (t->deadline_at - now_usec()) / 1000000;
1065 } else {
1066 time_left = 0;
1068 return snprintf(buf, size, STATS_TUBE_FMT,
1069 t->name,
1070 t->stat.urgent_ct,
1071 t->ready.used,
1072 t->stat.reserved_ct,
1073 t->delay.used,
1074 t->stat.buried_ct,
1075 t->stat.total_jobs_ct,
1076 t->using_ct,
1077 t->watching_ct,
1078 t->stat.waiting_ct,
1079 t->stat.pause_ct,
1080 t->pause / 1000000,
1081 time_left);
1084 static void
1085 maybe_enqueue_incoming_job(conn c)
1087 job j = c->in_job;
1089 /* do we have a complete job? */
1090 if (c->in_job_read == j->body_size) return enqueue_incoming_job(c);
1092 /* otherwise we have incomplete data, so just keep waiting */
1093 c->state = STATE_WANTDATA;
1096 /* j can be NULL */
1097 static job
1098 remove_this_reserved_job(conn c, job j)
1100 j = job_remove(j);
1101 if (j) {
1102 global_stat.reserved_ct--;
1103 j->tube->stat.reserved_ct--;
1104 j->reserver = NULL;
1106 c->soonest_job = NULL;
1107 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
1108 return j;
1111 static job
1112 remove_reserved_job(conn c, job j)
1114 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, j));
1117 static int
1118 name_is_ok(const char *name, size_t max)
1120 size_t len = strlen(name);
1121 return len > 0 && len <= max &&
1122 strspn(name, NAME_CHARS) == len && name[0] != '-';
1125 void
1126 prot_remove_tube(tube t)
1128 ms_remove(&tubes, t);
1131 static void
1132 dispatch_cmd(conn c)
1134 int r, i, timeout = -1;
1135 size_t z;
1136 unsigned int count;
1137 job j;
1138 unsigned char type;
1139 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
1140 unsigned int pri, body_size;
1141 usec delay, ttr;
1142 uint64_t id;
1143 tube t = NULL;
1145 /* NUL-terminate this string so we can use strtol and friends */
1146 c->cmd[c->cmd_len - 2] = '\0';
1148 /* check for possible maliciousness */
1149 if (strlen(c->cmd) != c->cmd_len - 2) {
1150 return reply_msg(c, MSG_BAD_FORMAT);
1153 type = which_cmd(c);
1154 dprintf("got %s command: \"%s\"\n", op_names[(int) type], c->cmd);
1156 switch (type) {
1157 case OP_PUT:
1158 r = read_pri(&pri, c->cmd + 4, &delay_buf);
1159 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1161 r = read_delay(&delay, delay_buf, &ttr_buf);
1162 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1164 r = read_ttr(&ttr, ttr_buf, &size_buf);
1165 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1167 errno = 0;
1168 body_size = strtoul(size_buf, &end_buf, 10);
1169 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1171 if (body_size > job_data_size_limit) {
1172 return reply_msg(c, MSG_JOB_TOO_BIG);
1175 /* don't allow trailing garbage */
1176 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
1178 conn_set_producer(c);
1180 c->in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c->use);
1182 /* OOM? */
1183 if (!c->in_job) {
1184 /* throw away the job body and respond with OUT_OF_MEMORY */
1186 /* Invert the meaning of in_job_read while throwing away data -- it
1187 * counts the bytes that remain to be thrown away. */
1188 c->in_job_read = body_size + 2;
1189 fill_extra_data(c);
1191 if (c->in_job_read == 0) return reply_serr(c, MSG_OUT_OF_MEMORY);
1193 c->state = STATE_BITBUCKET;
1194 return;
1197 fill_extra_data(c);
1199 /* it's possible we already have a complete job */
1200 maybe_enqueue_incoming_job(c);
1202 break;
1203 case OP_PEEK_READY:
1204 /* don't allow trailing garbage */
1205 if (c->cmd_len != CMD_PEEK_READY_LEN + 2) {
1206 return reply_msg(c, MSG_BAD_FORMAT);
1208 op_ct[type]++;
1210 j = job_copy(pq_peek(&c->use->ready));
1212 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1214 reply_job(c, j, MSG_FOUND);
1215 break;
1216 case OP_PEEK_DELAYED:
1217 /* don't allow trailing garbage */
1218 if (c->cmd_len != CMD_PEEK_DELAYED_LEN + 2) {
1219 return reply_msg(c, MSG_BAD_FORMAT);
1221 op_ct[type]++;
1223 j = job_copy(pq_peek(&c->use->delay));
1225 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1227 reply_job(c, j, MSG_FOUND);
1228 break;
1229 case OP_PEEK_BURIED:
1230 /* don't allow trailing garbage */
1231 if (c->cmd_len != CMD_PEEK_BURIED_LEN + 2) {
1232 return reply_msg(c, MSG_BAD_FORMAT);
1234 op_ct[type]++;
1236 j = job_copy(buried_job_p(c->use)? j = c->use->buried.next : NULL);
1238 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1240 reply_job(c, j, MSG_FOUND);
1241 break;
1242 case OP_PEEKJOB:
1243 errno = 0;
1244 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1245 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1246 op_ct[type]++;
1248 /* So, peek is annoying, because some other connection might free the
1249 * job while we are still trying to write it out. So we copy it and
1250 * then free the copy when it's done sending. */
1251 j = job_copy(peek_job(id));
1253 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1255 reply_job(c, j, MSG_FOUND);
1256 break;
1257 case OP_RESERVE_TIMEOUT:
1258 errno = 0;
1259 timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10);
1260 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1261 case OP_RESERVE: /* FALLTHROUGH */
1262 /* don't allow trailing garbage */
1263 if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) {
1264 return reply_msg(c, MSG_BAD_FORMAT);
1267 op_ct[type]++;
1268 conn_set_worker(c);
1270 if (conn_has_close_deadline(c) && !conn_ready(c)) {
1271 return reply_msg(c, MSG_DEADLINE_SOON);
1274 /* try to get a new job for this guy */
1275 wait_for_job(c, timeout);
1276 process_queue();
1277 break;
1278 case OP_DELETE:
1279 errno = 0;
1280 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1281 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1282 op_ct[type]++;
1284 j = job_find(id);
1285 j = remove_reserved_job(c, j) ? :
1286 remove_ready_job(j) ? :
1287 remove_buried_job(j);
1289 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1291 j->state = JOB_STATE_INVALID;
1292 r = binlog_write_job(j);
1293 job_free(j);
1295 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1297 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1298 break;
1299 case OP_RELEASE:
1300 errno = 0;
1301 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1302 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1304 r = read_pri(&pri, pri_buf, &delay_buf);
1305 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1307 r = read_delay(&delay, delay_buf, NULL);
1308 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1309 op_ct[type]++;
1311 j = remove_reserved_job(c, job_find(id));
1313 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1315 /* We want to update the delay deadline on disk, so reserve space for
1316 * that. */
1317 if (delay) {
1318 z = binlog_reserve_space_update(j);
1319 if (!z) return reply_serr(c, MSG_OUT_OF_MEMORY);
1320 j->reserved_binlog_space += z;
1323 j->pri = pri;
1324 j->delay = delay;
1325 j->release_ct++;
1327 r = enqueue_job(j, delay, !!delay);
1328 if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
1329 if (r == 1) {
1330 return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1333 /* out of memory trying to grow the queue, so it gets buried */
1334 bury_job(j, 0);
1335 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1336 break;
1337 case OP_BURY:
1338 errno = 0;
1339 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1340 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1342 r = read_pri(&pri, pri_buf, NULL);
1343 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1344 op_ct[type]++;
1346 j = remove_reserved_job(c, job_find(id));
1348 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1350 j->pri = pri;
1351 r = bury_job(j, 1);
1352 if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
1353 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1354 break;
1355 case OP_KICK:
1356 errno = 0;
1357 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
1358 if (end_buf == c->cmd + CMD_KICK_LEN) {
1359 return reply_msg(c, MSG_BAD_FORMAT);
1361 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1363 op_ct[type]++;
1365 i = kick_jobs(c->use, count);
1367 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1368 case OP_TOUCH:
1369 errno = 0;
1370 id = strtoull(c->cmd + CMD_TOUCH_LEN, &end_buf, 10);
1371 if (errno) return twarn("strtoull"), reply_msg(c, MSG_BAD_FORMAT);
1373 op_ct[type]++;
1375 j = touch_job(c, job_find(id));
1377 if (j) {
1378 reply(c, MSG_TOUCHED, MSG_TOUCHED_LEN, STATE_SENDWORD);
1379 } else {
1380 return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1382 break;
1383 case OP_STATS:
1384 /* don't allow trailing garbage */
1385 if (c->cmd_len != CMD_STATS_LEN + 2) {
1386 return reply_msg(c, MSG_BAD_FORMAT);
1389 op_ct[type]++;
1391 do_stats(c, fmt_stats, NULL);
1392 break;
1393 case OP_JOBSTATS:
1394 errno = 0;
1395 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1396 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1398 op_ct[type]++;
1400 j = peek_job(id);
1401 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1403 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1404 do_stats(c, (fmt_fn) fmt_job_stats, j);
1405 break;
1406 case OP_STATS_TUBE:
1407 name = c->cmd + CMD_STATS_TUBE_LEN;
1408 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1410 op_ct[type]++;
1412 t = tube_find(name);
1413 if (!t) return reply_msg(c, MSG_NOTFOUND);
1415 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1416 t = NULL;
1417 break;
1418 case OP_LIST_TUBES:
1419 /* don't allow trailing garbage */
1420 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1421 return reply_msg(c, MSG_BAD_FORMAT);
1424 op_ct[type]++;
1425 do_list_tubes(c, &tubes);
1426 break;
1427 case OP_LIST_TUBE_USED:
1428 /* don't allow trailing garbage */
1429 if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
1430 return reply_msg(c, MSG_BAD_FORMAT);
1433 op_ct[type]++;
1434 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1435 break;
1436 case OP_LIST_TUBES_WATCHED:
1437 /* don't allow trailing garbage */
1438 if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
1439 return reply_msg(c, MSG_BAD_FORMAT);
1442 op_ct[type]++;
1443 do_list_tubes(c, &c->watch);
1444 break;
1445 case OP_USE:
1446 name = c->cmd + CMD_USE_LEN;
1447 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1448 op_ct[type]++;
1450 TUBE_ASSIGN(t, tube_find_or_make(name));
1451 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1453 c->use->using_ct--;
1454 TUBE_ASSIGN(c->use, t);
1455 TUBE_ASSIGN(t, NULL);
1456 c->use->using_ct++;
1458 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1459 break;
1460 case OP_WATCH:
1461 name = c->cmd + CMD_WATCH_LEN;
1462 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1463 op_ct[type]++;
1465 TUBE_ASSIGN(t, tube_find_or_make(name));
1466 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1468 r = 1;
1469 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1470 TUBE_ASSIGN(t, NULL);
1471 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1473 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1474 break;
1475 case OP_IGNORE:
1476 name = c->cmd + CMD_IGNORE_LEN;
1477 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1478 op_ct[type]++;
1480 t = NULL;
1481 for (i = 0; i < c->watch.used; i++) {
1482 t = c->watch.items[i];
1483 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1484 t = NULL;
1487 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1489 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1490 t = NULL;
1492 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1493 break;
1494 case OP_QUIT:
1495 conn_close(c);
1496 break;
1497 case OP_PAUSE_TUBE:
1498 op_ct[type]++;
1500 r = read_tube_name(&name, c->cmd + CMD_PAUSE_TUBE_LEN, &delay_buf);
1501 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1503 r = read_delay(&delay, delay_buf, NULL);
1504 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1506 *delay_buf = '\0';
1507 t = tube_find(name);
1508 if (!t) return reply_msg(c, MSG_NOTFOUND);
1510 t->deadline_at = now_usec() + delay;
1511 t->pause = delay;
1512 t->stat.pause_ct++;
1513 set_main_delay_timeout();
1515 reply_line(c, STATE_SENDWORD, "PAUSED\r\n");
1516 break;
1517 default:
1518 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1522 /* There are three reasons this function may be called. We need to check for
1523 * all of them.
1525 * 1. A reserved job has run out of time.
1526 * 2. A waiting client's reserved job has entered the safety margin.
1527 * 3. A waiting client's requested timeout has occurred.
1529 * If any of these happen, we must do the appropriate thing. */
1530 static void
1531 h_conn_timeout(conn c)
1533 int r, should_timeout = 0;
1534 job j;
1536 /* Check if the client was trying to reserve a job. */
1537 if (conn_waiting(c) && conn_has_close_deadline(c)) should_timeout = 1;
1539 /* Check if any reserved jobs have run out of time. We should do this
1540 * whether or not the client is waiting for a new reservation. */
1541 while ((j = soonest_job(c))) {
1542 if (j->deadline_at >= now_usec()) break;
1544 /* This job is in the middle of being written out. If we return it to
1545 * the ready queue, someone might free it before we finish writing it
1546 * out to the socket. So we'll copy it here and free the copy when it's
1547 * done sending. */
1548 if (j == c->out_job) {
1549 c->out_job = job_copy(c->out_job);
1552 timeout_ct++; /* stats */
1553 j->timeout_ct++;
1554 r = enqueue_job(remove_this_reserved_job(c, j), 0, 0);
1555 if (r < 1) bury_job(j, 0); /* out of memory, so bury it */
1556 r = conn_update_evq(c, c->evq.ev_events);
1557 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
1560 if (should_timeout) {
1561 dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1562 return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON);
1563 } else if (conn_waiting(c) && c->pending_timeout >= 0) {
1564 dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1565 c->pending_timeout = -1;
1566 return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);
1570 void
1571 enter_drain_mode(int sig)
1573 drain_mode = 1;
1576 static void
1577 do_cmd(conn c)
1579 dispatch_cmd(c);
1580 fill_extra_data(c);
1583 static void
1584 reset_conn(conn c)
1586 int r;
1588 r = conn_update_evq(c, EV_READ | EV_PERSIST);
1589 if (r == -1) return twarnx("update events failed"), conn_close(c);
1591 /* was this a peek or stats command? */
1592 if (c->out_job && c->out_job->state == JOB_STATE_COPY) job_free(c->out_job);
1593 c->out_job = NULL;
1595 c->reply_sent = 0; /* now that we're done, reset this */
1596 c->state = STATE_WANTCOMMAND;
1599 static void
1600 h_conn_data(conn c)
1602 int r, to_read;
1603 job j;
1604 struct iovec iov[2];
1606 switch (c->state) {
1607 case STATE_WANTCOMMAND:
1608 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1609 if (r == -1) return check_err(c, "read()");
1610 if (r == 0) return conn_close(c); /* the client hung up */
1612 c->cmd_read += r; /* we got some bytes */
1614 c->cmd_len = cmd_len(c); /* find the EOL */
1616 /* yay, complete command line */
1617 if (c->cmd_len) return do_cmd(c);
1619 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1621 /* command line too long? */
1622 if (c->cmd_read == LINE_BUF_SIZE) {
1623 c->cmd_read = 0; /* discard the input so far */
1624 return reply_msg(c, MSG_BAD_FORMAT);
1627 /* otherwise we have an incomplete line, so just keep waiting */
1628 break;
1629 case STATE_BITBUCKET:
1630 /* Invert the meaning of in_job_read while throwing away data -- it
1631 * counts the bytes that remain to be thrown away. */
1632 to_read = min(c->in_job_read, BUCKET_BUF_SIZE);
1633 r = read(c->fd, bucket, to_read);
1634 if (r == -1) return check_err(c, "read()");
1635 if (r == 0) return conn_close(c); /* the client hung up */
1637 c->in_job_read -= r; /* we got some bytes */
1639 /* (c->in_job_read < 0) can't happen */
1641 if (c->in_job_read == 0) return reply_serr(c, MSG_OUT_OF_MEMORY);
1642 break;
1643 case STATE_WANTDATA:
1644 j = c->in_job;
1646 r = read(c->fd, j->body + c->in_job_read, j->body_size -c->in_job_read);
1647 if (r == -1) return check_err(c, "read()");
1648 if (r == 0) return conn_close(c); /* the client hung up */
1650 c->in_job_read += r; /* we got some bytes */
1652 /* (j->in_job_read > j->body_size) can't happen */
1654 maybe_enqueue_incoming_job(c);
1655 break;
1656 case STATE_SENDWORD:
1657 r= write(c->fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1658 if (r == -1) return check_err(c, "write()");
1659 if (r == 0) return conn_close(c); /* the client hung up */
1661 c->reply_sent += r; /* we got some bytes */
1663 /* (c->reply_sent > c->reply_len) can't happen */
1665 if (c->reply_sent == c->reply_len) return reset_conn(c);
1667 /* otherwise we sent an incomplete reply, so just keep waiting */
1668 break;
1669 case STATE_SENDJOB:
1670 j = c->out_job;
1672 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1673 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1674 iov[1].iov_base = j->body + c->out_job_sent;
1675 iov[1].iov_len = j->body_size - c->out_job_sent;
1677 r = writev(c->fd, iov, 2);
1678 if (r == -1) return check_err(c, "writev()");
1679 if (r == 0) return conn_close(c); /* the client hung up */
1681 /* update the sent values */
1682 c->reply_sent += r;
1683 if (c->reply_sent >= c->reply_len) {
1684 c->out_job_sent += c->reply_sent - c->reply_len;
1685 c->reply_sent = c->reply_len;
1688 /* (c->out_job_sent > j->body_size) can't happen */
1690 /* are we done? */
1691 if (c->out_job_sent == j->body_size) return reset_conn(c);
1693 /* otherwise we sent incomplete data, so just keep waiting */
1694 break;
1695 case STATE_WAIT: /* keep an eye out in case they hang up */
1696 /* but don't hang up just because our buffer is full */
1697 if (LINE_BUF_SIZE - c->cmd_read < 1) break;
1699 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1700 if (r == -1) return check_err(c, "read()");
1701 if (r == 0) return conn_close(c); /* the client hung up */
1702 c->cmd_read += r; /* we got some bytes */
1706 #define want_command(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))
1707 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
1709 static void
1710 h_conn(const int fd, const short which, conn c)
1712 if (fd != c->fd) {
1713 twarnx("Argh! event fd doesn't match conn fd.");
1714 close(fd);
1715 return conn_close(c);
1718 switch (which) {
1719 case EV_TIMEOUT:
1720 h_conn_timeout(c);
1721 event_add(&c->evq, NULL); /* seems to be necessary */
1722 break;
1723 case EV_READ:
1724 /* fall through... */
1725 case EV_WRITE:
1726 /* fall through... */
1727 default:
1728 h_conn_data(c);
1731 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1734 static void
1735 h_delay()
1737 int r;
1738 job j;
1739 usec now;
1740 int i;
1741 tube t;
1743 now = now_usec();
1744 while ((j = delay_q_peek())) {
1745 if (j->deadline_at > now) break;
1746 j = delay_q_take();
1747 r = enqueue_job(j, 0, 0);
1748 if (r < 1) bury_job(j, 0); /* out of memory, so bury it */
1751 for (i = 0; i < tubes.used; i++) {
1752 t = tubes.items[i];
1754 dprintf("h_delay for %s t->waiting.used=%zu t->ready.used=%d t->pause=%" PRIu64 "\n",
1755 t->name, t->waiting.used, t->ready.used, t->pause);
1756 if (t->pause && t->deadline_at <= now) {
1757 t->pause = 0;
1758 process_queue();
1762 set_main_delay_timeout();
1765 void
1766 h_accept(const int fd, const short which, struct event *ev)
1768 conn c;
1769 int cfd, flags, r;
1770 socklen_t addrlen;
1771 struct sockaddr addr;
1773 if (which == EV_TIMEOUT) return h_delay();
1775 addrlen = sizeof addr;
1776 cfd = accept(fd, &addr, &addrlen);
1777 if (cfd == -1) {
1778 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1779 if (errno == EMFILE) brake();
1780 return;
1783 flags = fcntl(cfd, F_GETFL, 0);
1784 if (flags < 0) return twarn("getting flags"), close(cfd), v();
1786 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1787 if (r < 0) return twarn("setting O_NONBLOCK"), close(cfd), v();
1789 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1790 if (!c) return twarnx("make_conn() failed"), close(cfd), brake();
1792 dprintf("accepted conn, fd=%d\n", cfd);
1793 r = conn_set_evq(c, EV_READ | EV_PERSIST, (evh) h_conn);
1794 if (r == -1) return twarnx("conn_set_evq() failed"), close(cfd), brake();
1797 void
1798 prot_init()
1800 started_at = now_usec();
1801 memset(op_ct, 0, sizeof(op_ct));
1803 ms_init(&tubes, NULL, NULL);
1805 TUBE_ASSIGN(default_tube, tube_find_or_make("default"));
1806 if (!default_tube) twarnx("Out of memory during startup!");
1809 void
1810 prot_replay_binlog(job binlog_jobs)
1812 job j, nj;
1813 usec delay;
1814 int r;
1816 for (j = binlog_jobs->next ; j != binlog_jobs ; j = nj) {
1817 nj = j->next;
1818 job_remove(j);
1819 binlog_reserve_space_update(j); /* reserve space for a delete */
1820 delay = 0;
1821 switch (j->state) {
1822 case JOB_STATE_BURIED:
1823 bury_job(j, 0);
1824 break;
1825 case JOB_STATE_DELAYED:
1826 if (started_at < j->deadline_at) {
1827 delay = j->deadline_at - started_at;
1829 /* fall through */
1830 default:
1831 r = enqueue_job(j, delay, 0);
1832 if (r < 1) twarnx("error processing binlog job %llu", j->id);