Per-tube buried list; kick takes tube name.
[beanstalkd.git] / prot.c
blob59667e820f1ef432ab485018f03a491ece1b1b8f
/* prot.c - protocol implementation */

/* Copyright (C) 2007 Keith Rarick and Philotic Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <sys/resource.h>
#include <sys/uio.h>
#include <stdarg.h>

#include "stat.h"
#include "prot.h"
#include "pq.h"
#include "ms.h"
#include "job.h"
#include "tube.h"
#include "conn.h"
#include "util.h"
#include "net.h"
#include "version.h"
/* Length of a string literal, not counting its terminating NUL. */
#define CONSTSTRLEN(m) (sizeof(m) - 1)

/* job body cannot be greater than this many bytes long */
#define JOB_DATA_SIZE_LIMIT ((1 << 16) - 1)

/* Characters permitted in a tube name; see name_is_ok(). */
#define NAME_CHARS \
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
    "abcdefghijklmnopqrstuvwxyz" \
    "0123456789-+/;.$()"

/* Command words matched by which_cmd(). A trailing space means the command
 * takes arguments. */
#define CMD_PUT                "put "
#define CMD_PEEK               "peek"
#define CMD_PEEKJOB            "peek "
#define CMD_RESERVE            "reserve"
#define CMD_DELETE             "delete "
#define CMD_RELEASE            "release "
#define CMD_BURY               "bury "
#define CMD_KICK               "kick "
#define CMD_STATS              "stats"
#define CMD_JOBSTATS           "stats-job "
#define CMD_USE                "use "
#define CMD_WATCH              "watch "
#define CMD_IGNORE             "ignore "
#define CMD_LIST_TUBES         "list-tubes"
#define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
#define CMD_STATS_TUBE         "stats-tube "

/* Compile-time lengths of the command words above. */
#define CMD_PEEK_LEN               CONSTSTRLEN(CMD_PEEK)
#define CMD_PEEKJOB_LEN            CONSTSTRLEN(CMD_PEEKJOB)
#define CMD_RESERVE_LEN            CONSTSTRLEN(CMD_RESERVE)
#define CMD_DELETE_LEN             CONSTSTRLEN(CMD_DELETE)
#define CMD_RELEASE_LEN            CONSTSTRLEN(CMD_RELEASE)
#define CMD_BURY_LEN               CONSTSTRLEN(CMD_BURY)
#define CMD_KICK_LEN               CONSTSTRLEN(CMD_KICK)
#define CMD_STATS_LEN              CONSTSTRLEN(CMD_STATS)
#define CMD_JOBSTATS_LEN           CONSTSTRLEN(CMD_JOBSTATS)
#define CMD_USE_LEN                CONSTSTRLEN(CMD_USE)
#define CMD_WATCH_LEN              CONSTSTRLEN(CMD_WATCH)
#define CMD_IGNORE_LEN             CONSTSTRLEN(CMD_IGNORE)
#define CMD_LIST_TUBES_LEN         CONSTSTRLEN(CMD_LIST_TUBES)
#define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
#define CMD_STATS_TUBE_LEN         CONSTSTRLEN(CMD_STATS_TUBE)

/* Reply words and complete reply lines; *_FMT entries are printf formats. */
#define MSG_FOUND        "FOUND"
#define MSG_NOTFOUND     "NOT_FOUND\r\n"
#define MSG_RESERVED     "RESERVED"
#define MSG_DELETED      "DELETED\r\n"
#define MSG_RELEASED     "RELEASED\r\n"
#define MSG_BURIED       "BURIED\r\n"
#define MSG_BURIED_FMT   "BURIED %llu\r\n"
#define MSG_INSERTED_FMT "INSERTED %llu\r\n"
#define MSG_NOT_IGNORED  "NOT_IGNORED\r\n"

#define MSG_NOTFOUND_LEN    CONSTSTRLEN(MSG_NOTFOUND)
#define MSG_DELETED_LEN     CONSTSTRLEN(MSG_DELETED)
#define MSG_RELEASED_LEN    CONSTSTRLEN(MSG_RELEASED)
#define MSG_BURIED_LEN      CONSTSTRLEN(MSG_BURIED)
#define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)

/* Error replies. */
#define MSG_OUT_OF_MEMORY   "OUT_OF_MEMORY\r\n"
#define MSG_INTERNAL_ERROR  "INTERNAL_ERROR\r\n"
#define MSG_DRAINING        "DRAINING\r\n"
#define MSG_BAD_FORMAT      "BAD_FORMAT\r\n"
#define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
#define MSG_EXPECTED_CRLF   "EXPECTED_CRLF\r\n"
#define MSG_JOB_TOO_BIG     "JOB_TOO_BIG\r\n"
/* Server-wide statistics reply body, filled in by fmt_stats(). Like every
 * stats body it is a "---" document terminated by "\r\n". */
#define STATS_FMT "---\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-delayed: %u\n" \
    "current-jobs-buried: %u\n" \
    "cmd-put: %llu\n" \
    "cmd-peek: %llu\n" \
    "cmd-reserve: %llu\n" \
    "cmd-delete: %llu\n" \
    "cmd-release: %llu\n" \
    "cmd-bury: %llu\n" \
    "cmd-kick: %llu\n" \
    "cmd-stats: %llu\n" \
    "cmd-stats-job: %llu\n" \
    "cmd-stats-tube: %llu\n" \
    "cmd-list-tubes: %llu\n" \
    "cmd-list-tubes-watched: %llu\n" \
    "job-timeouts: %llu\n" \
    "total-jobs: %llu\n" \
    "current-tubes: %u\n" \
    "current-connections: %u\n" \
    "current-producers: %u\n" \
    "current-workers: %u\n" \
    "current-waiting: %u\n" \
    "total-connections: %u\n" \
    "pid: %u\n" \
    "version: %s\n" \
    "rusage-utime: %d.%06d\n" \
    "rusage-stime: %d.%06d\n" \
    "uptime: %u\n" \
    "\r\n"

/* Per-tube statistics reply body, filled in by fmt_stats_tube(). */
#define STATS_TUBE_FMT "---\n" \
    "name: %s\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-buried: %u\n" \
    "total-jobs: %llu\n" \
    "current-waiting: %u\n" \
    "\r\n"

/* Per-job statistics reply body, filled in by fmt_job_stats(). */
#define JOB_STATS_FMT "---\n" \
    "id: %llu\n" \
    "tube: %s\n" \
    "state: %s\n" \
    "pri: %u\n" \
    "age: %u\n" \
    "delay: %u\n" \
    "ttr: %u\n" \
    "time-left: %u\n" \
    "timeouts: %u\n" \
    "releases: %u\n" \
    "buries: %u\n" \
    "kicks: %u\n" \
    "\r\n"
164 static struct pq delay_q;
166 static unsigned int ready_ct = 0;
167 static struct stats global_stat = {0, 0, 0, 0, 0};
169 static tube default_tube;
170 static struct ms tubes;
172 static int drain_mode = 0;
173 static time_t start_time;
174 static unsigned long long int put_ct = 0, peek_ct = 0, reserve_ct = 0,
175 delete_ct = 0, release_ct = 0, bury_ct = 0, kick_ct = 0,
176 stats_job_ct = 0, stats_ct = 0, timeout_ct = 0,
177 list_tubes_ct = 0, stats_tube_ct = 0,
178 list_watched_tubes_ct = 0;
181 /* Doubly-linked list of connections with at least one reserved job. */
182 static struct conn running = { &running, &running, 0 };
#ifdef DEBUG
/* Printable command names for debug logging in dispatch_cmd(), indexed by the
 * value returned from which_cmd().
 * NOTE(review): this order must match the OP_* values in prot.h; slot 0 is
 * presumably OP_UNKNOWN — confirm. */
static const char * op_names[] = {
    "<unknown>",
    CMD_PUT,
    CMD_PEEKJOB,
    CMD_RESERVE,
    CMD_DELETE,
    CMD_RELEASE,
    CMD_BURY,
    CMD_KICK,
    CMD_STATS,
    CMD_JOBSTATS,
    CMD_PEEK,
    CMD_USE,
    CMD_WATCH,
    CMD_IGNORE,
    CMD_LIST_TUBES,
    CMD_LIST_TUBES_WATCHED,
    CMD_STATS_TUBE,
};
#endif
206 static int
207 buried_job_p(tube t)
209 return job_list_any_p(&t->buried);
212 static void
213 reply(conn c, const char *line, int len, int state)
215 int r;
217 r = conn_update_evq(c, EV_WRITE | EV_PERSIST);
218 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
220 c->reply = line;
221 c->reply_len = len;
222 c->reply_sent = 0;
223 c->state = state;
224 dprintf("sending reply: %.*s", len, line);
227 #define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)
229 #define reply_serr(c,e) (twarnx("server error: %s",(e)),\
230 reply_msg((c),(e)))
232 static void
233 reply_line(conn c, int state, const char *fmt, ...)
235 int r;
236 va_list ap;
238 va_start(ap, fmt);
239 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
240 va_end(ap);
242 /* Make sure the buffer was big enough. If not, we have a bug. */
243 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
245 return reply(c, c->reply_buf, r, state);
248 static void
249 reply_job(conn c, job j, const char *word)
251 /* tell this connection which job to send */
252 c->out_job = j;
253 c->out_job_sent = 0;
255 return reply_line(c, STATE_SENDJOB, "%s %llu %u\r\n",
256 word, j->id, j->body_size - 2);
259 conn
260 remove_waiting_conn(conn c)
262 tube t;
263 size_t i;
265 if (!(c->type & CONN_TYPE_WAITING)) return NULL;
266 c->type &= ~CONN_TYPE_WAITING;
267 global_stat.waiting_ct--;
268 for (i = 0; i < c->watch.used; i++) {
269 t = c->watch.items[i];
270 t->stat.waiting_ct--;
271 ms_remove(&t->waiting, c);
273 return c;
276 static void
277 reserve_job(conn c, job j)
279 j->deadline = time(NULL) + j->ttr;
280 global_stat.reserved_ct++; /* stats */
281 j->tube->stat.reserved_ct++;
282 conn_insert(&running, c);
283 j->state = JOB_STATE_RESERVED;
284 job_insert(&c->reserved_jobs, j);
285 return reply_job(c, j, MSG_RESERVED);
288 static job
289 next_eligible_job()
291 tube t;
292 size_t i;
293 job j = NULL, candidate;
295 dprintf("tubes.used = %d\n", tubes.used);
296 for (i = 0; i < tubes.used; i++) {
297 t = tubes.items[i];
298 dprintf("for %s t->waiting.used=%d t->ready.used=%d\n",
299 t->name, t->waiting.used, t->ready.used);
300 if (t->waiting.used && t->ready.used) {
301 candidate = pq_peek(&t->ready);
302 if (!j || candidate->id < j->id) j = candidate;
304 dprintf("i = %d, tubes.used = %d\n", i, tubes.used);
307 return j;
310 static void
311 process_queue()
313 job j;
315 dprintf("processing queue\n");
316 while ((j = next_eligible_job())) {
317 dprintf("got eligible job %llu in %s\n", j->id, j->tube->name);
318 j = pq_take(&j->tube->ready);
319 ready_ct--;
320 if (j->pri < URGENT_THRESHOLD) {
321 global_stat.urgent_ct--;
322 j->tube->stat.urgent_ct--;
324 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
328 static int
329 enqueue_job(job j, unsigned int delay)
331 int r;
333 if (delay) {
334 j->deadline = time(NULL) + delay;
335 r = pq_give(&delay_q, j);
336 if (!r) return 0;
337 j->state = JOB_STATE_DELAYED;
338 set_main_timeout(pq_peek(&delay_q)->deadline);
339 } else {
340 r = pq_give(&j->tube->ready, j);
341 if (!r) return 0;
342 j->state = JOB_STATE_READY;
343 ready_ct++;
344 if (j->pri < URGENT_THRESHOLD) {
345 global_stat.urgent_ct++;
346 j->tube->stat.urgent_ct++;
349 process_queue();
350 return 1;
353 static void
354 bury_job(job j)
356 job_insert(&j->tube->buried, j);
357 global_stat.buried_ct++;
358 j->tube->stat.buried_ct++;
359 j->state = JOB_STATE_BURIED;
360 j->bury_ct++;
363 void
364 enqueue_reserved_jobs(conn c)
366 int r;
367 job j;
369 while (job_list_any_p(&c->reserved_jobs)) {
370 j = job_remove(c->reserved_jobs.next);
371 r = enqueue_job(j, 0);
372 if (!r) bury_job(j);
373 global_stat.reserved_ct--;
374 j->tube->stat.reserved_ct--;
375 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
379 static job
380 delay_q_peek()
382 return pq_peek(&delay_q);
385 static job
386 delay_q_take()
388 return pq_take(&delay_q);
391 static job
392 remove_this_buried_job(job j)
394 j = job_remove(j);
395 if (j) {
396 global_stat.buried_ct--;
397 j->tube->stat.buried_ct--;
399 return j;
402 static int
403 kick_buried_job(tube t)
405 int r;
406 job j;
408 if (!buried_job_p(t)) return 0;
409 j = remove_this_buried_job(t->buried.next);
410 j->kick_ct++;
411 r = enqueue_job(j, 0);
412 if (r) return 1;
414 /* ready queue is full, so bury it */
415 bury_job(j);
416 return 0;
419 static unsigned int
420 get_delayed_job_ct()
422 return pq_used(&delay_q);
425 static int
426 kick_delayed_job()
428 int r;
429 job j;
431 if (get_delayed_job_ct() < 1) return 0;
432 j = delay_q_take();
433 j->kick_ct++;
434 r = enqueue_job(j, 0);
435 if (r) return 1;
437 /* ready queue is full, so delay it again */
438 r = enqueue_job(j, j->delay);
439 if (r) return 0;
441 /* last resort */
442 bury_job(j);
443 return 0;
446 /* return the number of jobs successfully kicked */
447 static unsigned int
448 kick_buried_jobs(tube t, unsigned int n)
450 unsigned int i;
451 for (i = 0; (i < n) && kick_buried_job(t); ++i);
452 return i;
455 /* return the number of jobs successfully kicked */
456 static unsigned int
457 kick_delayed_jobs(unsigned int n)
459 unsigned int i;
460 for (i = 0; (i < n) && kick_delayed_job(); ++i);
461 return i;
464 static unsigned int
465 kick_jobs(tube t, unsigned int n)
467 if (buried_job_p(t)) return kick_buried_jobs(t, n);
468 return kick_delayed_jobs(n);
471 static job
472 peek_buried_job()
474 tube t;
475 size_t i;
477 for (i = 0; i < tubes.used; i++) {
478 t = tubes.items[i];
479 if (buried_job_p(t)) return t->buried.next;
481 return NULL;
484 static job
485 find_buried_job_in_tube(tube t, unsigned long long int id)
487 job j;
489 for (j = t->buried.next; j != &t->buried; j = j->next) {
490 if (j->id == id) return j;
492 return NULL;
495 static job
496 find_buried_job(unsigned long long int id)
498 job j;
499 size_t i;
501 for (i = 0; i < tubes.used; i++) {
502 j = find_buried_job_in_tube(tubes.items[i], id);
503 if (j) return j;
505 return NULL;
508 static job
509 remove_buried_job(unsigned long long int id)
511 return remove_this_buried_job(find_buried_job(id));
514 static void
515 enqueue_waiting_conn(conn c)
517 tube t;
518 size_t i;
520 global_stat.waiting_ct++;
521 c->type |= CONN_TYPE_WAITING;
522 for (i = 0; i < c->watch.used; i++) {
523 t = c->watch.items[i];
524 t->stat.waiting_ct++;
525 ms_append(&t->waiting, c);
529 static job
530 find_reserved_job_in_conn(conn c, unsigned long long int id)
532 job j;
534 for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
535 if (j->id == id) return j;
537 return NULL;
540 static job
541 find_reserved_job_in_list(conn list, unsigned long long int id)
543 job j;
544 conn c;
546 for (c = list->next; c != list; c = c->next) {
547 j = find_reserved_job_in_conn(c, id);
548 if (j) return j;
550 return NULL;
553 static job
554 find_reserved_job(unsigned long long int id)
556 return find_reserved_job_in_list(&running, id);
559 static job
560 peek_ready_job(unsigned long long int id)
563 job j;
564 size_t i;
566 for (i = 0; i < tubes.used; i++) {
567 j = pq_find(&((tube) tubes.items[i])->ready, id);
568 if (j) return j;
570 return NULL;
573 /* TODO: make a global hashtable of jobs because this is slow */
574 static job
575 peek_job(unsigned long long int id)
577 return peek_ready_job(id) ? :
578 pq_find(&delay_q, id) ? :
579 find_reserved_job(id) ? :
580 find_buried_job(id);
583 static void
584 check_err(conn c, const char *s)
586 if (errno == EAGAIN) return;
587 if (errno == EINTR) return;
588 if (errno == EWOULDBLOCK) return;
590 twarn("%s", s);
591 conn_close(c);
592 return;
/* Scan the first size bytes of s for the sequence "\r\n" and return the line
 * length including that terminator, so a match always yields at least 2.
 * Return 0 if "\r\n" does not occur within the first size bytes.
 * A lone '\r' that is not followed by '\n' is skipped rather than ending the
 * search. Inputs shorter than two bytes can never match (and never make the
 * memchr length underflow). */
static int
scan_line_end(const char *s, int size)
{
    const char *p = s;
    const char *last;
    const char *match;

    if (size < 2) return 0;

    /* the last byte where a '\r' could still be followed by '\n' in range */
    last = s + size - 1;

    while (p < last) {
        match = memchr(p, '\r', (size_t) (last - p));
        if (!match) return 0;

        /* safe: match < last, so match[1] is within the first size bytes */
        if (match[1] == '\n') return (int) (match - s) + 2;
        p = match + 1;
    }
    return 0;
}
611 static int
612 cmd_len(conn c)
614 return scan_line_end(c->cmd, c->cmd_read);
617 /* parse the command line */
618 static int
619 which_cmd(conn c)
621 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
622 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
623 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
624 TEST_CMD(c->cmd, CMD_PEEK, OP_PEEK);
625 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
626 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
627 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
628 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
629 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
630 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
631 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
632 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
633 TEST_CMD(c->cmd, CMD_USE, OP_USE);
634 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
635 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
636 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
637 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
638 return OP_UNKNOWN;
641 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
642 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
643 * This function is idempotent(). */
644 static void
645 fill_extra_data(conn c)
647 int extra_bytes, job_data_bytes = 0, cmd_bytes;
649 if (!c->fd) return; /* the connection was closed */
650 if (!c->cmd_len) return; /* we don't have a complete command */
652 /* how many extra bytes did we read? */
653 extra_bytes = c->cmd_read - c->cmd_len;
655 /* how many bytes should we put into the job body? */
656 if (c->in_job) {
657 job_data_bytes = min(extra_bytes, c->in_job->body_size);
658 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
659 c->in_job_read = job_data_bytes;
662 /* how many bytes are left to go into the future cmd? */
663 cmd_bytes = extra_bytes - job_data_bytes;
664 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
665 c->cmd_read = cmd_bytes;
666 c->cmd_len = 0; /* we no longer know the length of the new command */
669 static void
670 enqueue_incoming_job(conn c)
672 int r;
673 job j = c->in_job;
675 c->in_job = NULL; /* the connection no longer owns this job */
676 c->in_job_read = 0;
678 /* check if the trailer is present and correct */
679 if (memcmp(j->body + j->body_size - 2, "\r\n", 2)) {
680 job_free(j);
681 return reply_msg(c, MSG_EXPECTED_CRLF);
684 /* we have a complete job, so let's stick it in the pqueue */
685 r = enqueue_job(j, j->delay);
686 put_ct++; /* stats */
687 global_stat.total_jobs_ct++;
688 j->tube->stat.total_jobs_ct++;
690 if (r) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->id);
692 /* out of memory trying to grow the queue, so it gets buried */
693 bury_job(j);
694 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->id);
697 static unsigned int
698 uptime()
700 return time(NULL) - start_time;
703 static int
704 fmt_stats(char *buf, size_t size, void *x)
706 struct rusage ru = {{0, 0}, {0, 0}};
707 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
708 return snprintf(buf, size, STATS_FMT,
709 global_stat.urgent_ct,
710 ready_ct,
711 global_stat.reserved_ct,
712 get_delayed_job_ct(),
713 global_stat.buried_ct,
714 put_ct,
715 peek_ct,
716 reserve_ct,
717 delete_ct,
718 release_ct,
719 bury_ct,
720 kick_ct,
721 stats_ct,
722 stats_job_ct,
723 stats_tube_ct,
724 list_tubes_ct,
725 list_watched_tubes_ct,
726 timeout_ct,
727 global_stat.total_jobs_ct,
728 tubes.used,
729 count_cur_conns(),
730 count_cur_producers(),
731 count_cur_workers(),
732 global_stat.waiting_ct,
733 count_tot_conns(),
734 getpid(),
735 VERSION,
736 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
737 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
738 uptime());
/* Read a priority value from the given buffer and place it in pri.
 * Update end to point to the address after the last character consumed.
 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
 * conversion and return the status code but not update any values. This is an
 * easy way to check for errors.
 * If end is NULL, read_pri will also check that the entire input string was
 * consumed and return an error code otherwise.
 * Values too large for an unsigned int saturate at UINT_MAX. This happens
 * identically whether unsigned long is 32 or 64 bits wide.
 * Return 0 on success, or nonzero on failure.
 * If a failure occurs, pri and end are not modified. */
static int
read_pri(unsigned int *pri, const char *buf, char **end)
{
    char *tend;
    unsigned long tpri;

    errno = 0;
    tpri = strtoul(buf, &tend, 10);
    if (tend == buf) return -1;

    /* ERANGE is deliberately accepted: strtoul then returns ULONG_MAX. */
    if (errno && errno != ERANGE) return -1;
    if (!end && tend[0] != '\0') return -1;

    /* Where unsigned long is wider than unsigned int, values in
     * (UINT_MAX, ULONG_MAX] would otherwise wrap silently on conversion;
     * clamp them the same way the ERANGE case already saturates. */
    if (tpri > UINT_MAX) tpri = UINT_MAX;

    if (pri) *pri = (unsigned int) tpri;
    if (end) *end = tend;
    return 0;
}
/* Read a delay value (seconds) from buf into *delay.
 * Same contract as read_pri(), which does the actual parsing. */
static int
read_delay(unsigned int *delay, const char *buf, char **end)
{
    int status = read_pri(delay, buf, end);
    return status;
}

/* Read a time-to-run value (seconds) from buf into *ttr.
 * Same contract as read_pri(), which does the actual parsing. */
static int
read_ttr(unsigned int *ttr, const char *buf, char **end)
{
    int status = read_pri(ttr, buf, end);
    return status;
}
784 static void
785 wait_for_job(conn c)
787 int r;
789 /* this conn is waiting, but we want to know if they hang up */
790 r = conn_update_evq(c, EV_READ | EV_PERSIST);
791 if (r == -1) return twarnx("update events failed"), conn_close(c);
793 c->state = STATE_WAIT;
794 enqueue_waiting_conn(c);
797 typedef int(*fmt_fn)(char *, size_t, void *);
799 static void
800 do_stats(conn c, fmt_fn fmt, void *data)
802 int r, stats_len;
804 /* first, measure how big a buffer we will need */
805 stats_len = fmt(NULL, 0, data);
807 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
808 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
810 /* now actually format the stats data */
811 r = fmt(c->out_job->body, stats_len, data);
812 if (r != stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
813 c->out_job->body[stats_len - 1] = '\n'; /* patch up sprintf's output */
815 c->out_job_sent = 0;
816 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", stats_len - 2);
819 static void
820 do_list_tubes(conn c, ms l)
822 char *buf;
823 tube t;
824 size_t i, resp_z;
826 /* first, measure how big a buffer we will need */
827 resp_z = 6; /* initial "---\n" and final "\r\n" */
828 for (i = 0; i < l->used; i++) {
829 t = l->items[i];
830 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
833 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
834 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
836 /* now actually format the response */
837 buf = c->out_job->body;
838 buf += snprintf(buf, 5, "---\n");
839 for (i = 0; i < l->used; i++) {
840 t = l->items[i];
841 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
843 buf[0] = '\r';
844 buf[1] = '\n';
846 c->out_job_sent = 0;
847 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", resp_z - 2);
850 static int
851 fmt_job_stats(char *buf, size_t size, job j)
853 time_t t;
855 t = time(NULL);
856 return snprintf(buf, size, JOB_STATS_FMT,
857 j->id,
858 j->tube->name,
859 job_state(j),
860 j->pri,
861 (unsigned int) (t - j->creation),
862 j->delay,
863 j->ttr,
864 (unsigned int) (j->deadline - t),
865 j->timeout_ct,
866 j->release_ct,
867 j->bury_ct,
868 j->kick_ct);
871 static int
872 fmt_stats_tube(char *buf, size_t size, tube t)
874 return snprintf(buf, size, STATS_TUBE_FMT,
875 t->name,
876 t->stat.urgent_ct,
877 t->ready.used,
878 t->stat.reserved_ct,
879 t->stat.buried_ct,
880 t->stat.total_jobs_ct,
881 t->stat.waiting_ct);
884 static void
885 maybe_enqueue_incoming_job(conn c)
887 job j = c->in_job;
889 /* do we have a complete job? */
890 if (c->in_job_read == j->body_size) return enqueue_incoming_job(c);
892 /* otherwise we have incomplete data, so just keep waiting */
893 c->state = STATE_WANTDATA;
896 /* j can be NULL */
897 static job
898 remove_this_reserved_job(conn c, job j)
900 j = job_remove(j);
901 if (j) {
902 global_stat.reserved_ct--;
903 j->tube->stat.reserved_ct--;
905 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
906 return j;
909 static job
910 remove_reserved_job(conn c, unsigned long long int id)
912 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, id));
915 static int
916 name_is_ok(const char *name, size_t max)
918 size_t len = strlen(name);
919 return len > 0 && len <= max &&
920 strspn(name, NAME_CHARS) == len && name[0] != '-';
923 static tube
924 find_tube(const char *name)
926 tube t;
927 size_t i;
929 for (i = 0; i < tubes.used; i++) {
930 t = tubes.items[i];
931 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) return t;
933 return NULL;
936 void
937 prot_remove_tube(tube t)
939 ms_remove(&tubes, t);
942 static tube
943 make_and_insert_tube(const char *name)
945 int r;
946 tube t = NULL;
948 t = make_tube(name);
949 if (!t) return NULL;
951 /* We want this global tube list to behave like "weak" refs, so don't
952 * increment the ref count. */
953 r = ms_append(&tubes, t);
954 if (!r) return tube_dref(t), NULL;
956 return t;
959 static tube
960 find_or_make_tube(const char *name)
962 return find_tube(name) ? : make_and_insert_tube(name);
965 static void
966 dispatch_cmd(conn c)
968 int r, i;
969 unsigned int count;
970 job j;
971 char type;
972 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
973 unsigned int pri, delay, ttr, body_size;
974 unsigned long long int id;
975 tube t = NULL;
977 /* NUL-terminate this string so we can use strtol and friends */
978 c->cmd[c->cmd_len - 2] = '\0';
980 /* check for possible maliciousness */
981 if (strlen(c->cmd) != c->cmd_len - 2) {
982 return reply_msg(c, MSG_BAD_FORMAT);
985 type = which_cmd(c);
986 dprintf("got %s command: \"%s\"\n", op_names[(int) type], c->cmd);
988 switch (type) {
989 case OP_PUT:
990 if (drain_mode) return reply_serr(c, MSG_DRAINING);
992 r = read_pri(&pri, c->cmd + 4, &delay_buf);
993 if (r) return reply_msg(c, MSG_BAD_FORMAT);
995 r = read_delay(&delay, delay_buf, &ttr_buf);
996 if (r) return reply_msg(c, MSG_BAD_FORMAT);
998 r = read_ttr(&ttr, ttr_buf, &size_buf);
999 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1001 errno = 0;
1002 body_size = strtoul(size_buf, &end_buf, 10);
1003 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1005 if (body_size > JOB_DATA_SIZE_LIMIT) {
1006 return reply_msg(c, MSG_JOB_TOO_BIG);
1009 /* don't allow trailing garbage */
1010 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
1012 conn_set_producer(c);
1014 c->in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c->use);
1016 fill_extra_data(c);
1018 /* it's possible we already have a complete job */
1019 maybe_enqueue_incoming_job(c);
1021 break;
1022 case OP_PEEK:
1023 /* don't allow trailing garbage */
1024 if (c->cmd_len != CMD_PEEK_LEN + 2) {
1025 return reply_msg(c, MSG_BAD_FORMAT);
1028 j = job_copy(peek_buried_job() ? : delay_q_peek());
1030 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1032 peek_ct++; /* stats */
1033 reply_job(c, j, MSG_FOUND);
1034 break;
1035 case OP_PEEKJOB:
1036 errno = 0;
1037 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1038 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1040 /* So, peek is annoying, because some other connection might free the
1041 * job while we are still trying to write it out. So we copy it and
1042 * then free the copy when it's done sending. */
1043 j = job_copy(peek_job(id));
1045 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1047 peek_ct++; /* stats */
1048 reply_job(c, j, MSG_FOUND);
1049 break;
1050 case OP_RESERVE:
1051 /* don't allow trailing garbage */
1052 if (c->cmd_len != CMD_RESERVE_LEN + 2) {
1053 return reply_msg(c, MSG_BAD_FORMAT);
1056 reserve_ct++; /* stats */
1057 conn_set_worker(c);
1059 /* try to get a new job for this guy */
1060 wait_for_job(c);
1061 process_queue();
1062 break;
1063 case OP_DELETE:
1064 errno = 0;
1065 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1066 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1068 j = remove_reserved_job(c, id) ? : remove_buried_job(id);
1070 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1072 delete_ct++; /* stats */
1073 job_free(j);
1075 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1076 break;
1077 case OP_RELEASE:
1078 errno = 0;
1079 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1080 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1082 r = read_pri(&pri, pri_buf, &delay_buf);
1083 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1085 r = read_delay(&delay, delay_buf, NULL);
1086 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1088 j = remove_reserved_job(c, id);
1090 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1092 j->pri = pri;
1093 j->delay = delay;
1094 j->release_ct++;
1095 release_ct++; /* stats */
1096 r = enqueue_job(j, delay);
1097 if (r) return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1099 /* out of memory trying to grow the queue, so it gets buried */
1100 bury_job(j);
1101 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1102 break;
1103 case OP_BURY:
1104 errno = 0;
1105 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1106 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1108 r = read_pri(&pri, pri_buf, NULL);
1109 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1111 j = remove_reserved_job(c, id);
1113 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1115 j->pri = pri;
1116 bury_ct++; /* stats */
1117 bury_job(j);
1118 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1119 break;
1120 case OP_KICK:
1121 errno = 0;
1122 count = strtoul(c->cmd + CMD_KICK_LEN, &name, 10);
1123 if (name++ == c->cmd + CMD_KICK_LEN) {
1124 return reply_msg(c, MSG_BAD_FORMAT);
1126 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1128 kick_ct++; /* stats */
1130 t = find_tube(name);
1131 if (!t) return reply_msg(c, MSG_NOTFOUND);
1133 i = kick_jobs(t, count);
1134 t = NULL;
1136 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1137 case OP_STATS:
1138 /* don't allow trailing garbage */
1139 if (c->cmd_len != CMD_STATS_LEN + 2) {
1140 return reply_msg(c, MSG_BAD_FORMAT);
1143 stats_ct++; /* stats */
1145 do_stats(c, fmt_stats, NULL);
1146 break;
1147 case OP_JOBSTATS:
1148 errno = 0;
1149 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1150 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1152 j = peek_job(id);
1153 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1155 stats_job_ct++; /* stats */
1157 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1158 do_stats(c, (fmt_fn) fmt_job_stats, j);
1159 break;
1160 case OP_STATS_TUBE:
1161 name = c->cmd + CMD_STATS_TUBE_LEN;
1162 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1164 t = find_tube(name);
1165 if (!t) return reply_msg(c, MSG_NOTFOUND);
1167 stats_tube_ct++; /* stats */
1169 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1170 t = NULL;
1171 break;
1172 case OP_LIST_TUBES:
1173 /* don't allow trailing garbage */
1174 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1175 return reply_msg(c, MSG_BAD_FORMAT);
1178 list_tubes_ct++;
1179 do_list_tubes(c, &tubes);
1180 break;
1181 case OP_LIST_TUBES_WATCHED:
1182 /* don't allow trailing garbage */
1183 if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
1184 return reply_msg(c, MSG_BAD_FORMAT);
1187 list_watched_tubes_ct++;
1188 do_list_tubes(c, &c->watch);
1189 break;
1190 case OP_USE:
1191 name = c->cmd + CMD_USE_LEN;
1192 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1194 TUBE_ASSIGN(t, find_or_make_tube(name));
1195 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1197 TUBE_ASSIGN(c->use, t);
1198 TUBE_ASSIGN(t, NULL);
1200 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1201 break;
1202 case OP_WATCH:
1203 name = c->cmd + CMD_WATCH_LEN;
1204 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1206 TUBE_ASSIGN(t, find_or_make_tube(name));
1207 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1209 r = 1;
1210 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1211 TUBE_ASSIGN(t, NULL);
1212 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1214 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1215 break;
1216 case OP_IGNORE:
1217 name = c->cmd + CMD_IGNORE_LEN;
1218 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1220 t = NULL;
1221 for (i = 0; i < c->watch.used; i++) {
1222 t = c->watch.items[i];
1223 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1224 t = NULL;
1227 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1229 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1230 t = NULL;
1232 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1233 break;
1234 default:
1235 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1239 /* if we get a timeout, it means that a job has been reserved for too long, so
1240 * we should put it back in the queue */
1241 static void
1242 h_conn_timeout(conn c)
1244 int r;
1245 job j;
1247 while ((j = soonest_job(c))) {
1248 if (j->deadline > time(NULL)) return;
1249 timeout_ct++; /* stats */
1250 j->timeout_ct++;
1251 r = enqueue_job(remove_this_reserved_job(c, j), 0);
1252 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
1253 r = conn_update_evq(c, c->evq.ev_events);
1254 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
1258 void
1259 enter_drain_mode(int sig)
1261 drain_mode = 1;
1264 static void
1265 do_cmd(conn c)
1267 dispatch_cmd(c);
1268 fill_extra_data(c);
1271 static void
1272 reset_conn(conn c)
1274 int r;
1276 r = conn_update_evq(c, EV_READ | EV_PERSIST);
1277 if (r == -1) return twarnx("update events failed"), conn_close(c);
1279 /* was this a peek or stats command? */
1280 if (!has_reserved_this_job(c, c->out_job)) job_free(c->out_job);
1281 c->out_job = NULL;
1283 c->reply_sent = 0; /* now that we're done, reset this */
1284 c->state = STATE_WANTCOMMAND;
1287 static void
1288 h_conn_data(conn c)
1290 int r;
1291 job j;
1292 struct iovec iov[2];
1294 switch (c->state) {
1295 case STATE_WANTCOMMAND:
1296 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1297 if (r == -1) return check_err(c, "read()");
1298 if (r == 0) return conn_close(c); /* the client hung up */
1300 c->cmd_read += r; /* we got some bytes */
1302 c->cmd_len = cmd_len(c); /* find the EOL */
1303 dprintf("cmd_len is %d\n", c->cmd_len);
1305 /* yay, complete command line */
1306 if (c->cmd_len) return do_cmd(c);
1308 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1310 dprintf("cmd_read is %d\n", c->cmd_read);
1311 /* command line too long? */
1312 if (c->cmd_read == LINE_BUF_SIZE) {
1313 return reply_msg(c, MSG_BAD_FORMAT);
1316 /* otherwise we have an incomplete line, so just keep waiting */
1317 break;
1318 case STATE_WANTDATA:
1319 j = c->in_job;
1321 r = read(c->fd, j->body + c->in_job_read, j->body_size -c->in_job_read);
1322 if (r == -1) return check_err(c, "read()");
1323 if (r == 0) return conn_close(c); /* the client hung up */
1325 c->in_job_read += r; /* we got some bytes */
1327 /* (j->in_job_read > j->body_size) can't happen */
1329 maybe_enqueue_incoming_job(c);
1330 break;
1331 case STATE_SENDWORD:
1332 r= write(c->fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1333 if (r == -1) return check_err(c, "write()");
1334 if (r == 0) return conn_close(c); /* the client hung up */
1336 c->reply_sent += r; /* we got some bytes */
1338 /* (c->reply_sent > c->reply_len) can't happen */
1340 if (c->reply_sent == c->reply_len) return reset_conn(c);
1342 /* otherwise we sent an incomplete reply, so just keep waiting */
1343 break;
1344 case STATE_SENDJOB:
1345 j = c->out_job;
1347 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1348 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1349 iov[1].iov_base = j->body + c->out_job_sent;
1350 iov[1].iov_len = j->body_size - c->out_job_sent;
1352 r = writev(c->fd, iov, 2);
1353 if (r == -1) return check_err(c, "writev()");
1354 if (r == 0) return conn_close(c); /* the client hung up */
1356 /* update the sent values */
1357 c->reply_sent += r;
1358 if (c->reply_sent >= c->reply_len) {
1359 c->out_job_sent += c->reply_sent - c->reply_len;
1360 c->reply_sent = c->reply_len;
1363 /* (c->out_job_sent > j->body_size) can't happen */
1365 /* are we done? */
1366 if (c->out_job_sent == j->body_size) return reset_conn(c);
1368 /* otherwise we sent incomplete data, so just keep waiting */
1369 break;
1370 case STATE_WAIT: /* keep an eye out in case they hang up */
1371 /* but don't hang up just because our buffer is full */
1372 if (LINE_BUF_SIZE - c->cmd_read < 1) break;
1374 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1375 if (r == -1) return check_err(c, "read()");
1376 if (r == 0) return conn_close(c); /* the client hung up */
1377 c->cmd_read += r; /* we got some bytes */
/* Nonzero when connection c still has a descriptor and is waiting for a
 * command line. NOTE(review): fd 0 is used as a "no descriptor" sentinel
 * here -- confirm that conn_close() clears c->fd. */
#define want_command(c) ((c)->fd != 0 && (c)->state == STATE_WANTCOMMAND)

/* Nonzero when c wants a command and has some unprocessed bytes buffered. */
#define cmd_data_ready(c) (want_command(c) && (c)->cmd_read != 0)
1384 static void
1385 h_conn(const int fd, const short which, conn c)
1387 if (fd != c->fd) {
1388 twarnx("Argh! event fd doesn't match conn fd.");
1389 close(fd);
1390 return conn_close(c);
1393 switch (which) {
1394 case EV_TIMEOUT:
1395 h_conn_timeout(c);
1396 event_add(&c->evq, NULL); /* seems to be necessary */
1397 break;
1398 case EV_READ:
1399 /* fall through... */
1400 case EV_WRITE:
1401 /* fall through... */
1402 default:
1403 h_conn_data(c);
1406 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1409 static void
1410 h_delay()
1412 int r;
1413 job j;
1414 time_t t;
1416 t = time(NULL);
1417 while ((j = delay_q_peek())) {
1418 if (j->deadline > t) break;
1419 j = delay_q_take();
1420 r = enqueue_job(j, 0);
1421 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
1424 set_main_timeout((j = delay_q_peek()) ? j->deadline : 0);
1427 void
1428 h_accept(const int fd, const short which, struct event *ev)
1430 conn c;
1431 int cfd, flags, r;
1432 socklen_t addrlen;
1433 struct sockaddr addr;
1435 if (which == EV_TIMEOUT) return h_delay();
1437 addrlen = sizeof addr;
1438 cfd = accept(fd, &addr, &addrlen);
1439 if (cfd == -1) {
1440 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1441 if (errno == EMFILE) brake();
1442 return;
1445 flags = fcntl(cfd, F_GETFL, 0);
1446 if (flags < 0) return twarn("getting flags"), close(cfd), v();
1448 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1449 if (r < 0) return twarn("setting O_NONBLOCK"), close(cfd), v();
1451 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1452 if (!c) return twarnx("make_conn() failed"), close(cfd), brake();
1454 dprintf("accepted conn, fd=%d\n", cfd);
1455 r = conn_set_evq(c, EV_READ | EV_PERSIST, (evh) h_conn);
1456 if (r == -1) return twarnx("conn_set_evq() failed"), close(cfd), brake();
1459 void
1460 prot_init()
1462 start_time = time(NULL);
1463 pq_init(&delay_q, job_delay_cmp);
1465 ms_init(&tubes, NULL, NULL);
1467 TUBE_ASSIGN(default_tube, find_or_make_tube("default"));
1468 if (!default_tube) twarnx("Out of memory during startup!");