Implement per-tube statistics.
[beanstalkd.git] / prot.c
blobd869b4f3d4b69d1b9b7bd5e9cc73cf2ef8379d49
1 /* prot.c - protocol implementation */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <errno.h>
24 #include <sys/resource.h>
25 #include <sys/uio.h>
26 #include <stdarg.h>
28 #include "stat.h"
29 #include "prot.h"
30 #include "pq.h"
31 #include "ms.h"
32 #include "job.h"
33 #include "tube.h"
34 #include "conn.h"
35 #include "util.h"
36 #include "net.h"
37 #include "version.h"
/* job body cannot be greater than this many bytes long */
#define JOB_DATA_SIZE_LIMIT ((1 << 16) - 1)

/* Set of characters accepted in a tube name. name_is_ok() checks candidate
 * names against this set with strspn(). */
#define NAME_CHARS \
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
    "abcdefghijklmnopqrstuvwxyz" \
    "0123456789-+/;.$()"
/* Command words matched by which_cmd() as prefixes of the command line.
 * A trailing space inside the literal means the word must be followed by at
 * least one argument; words without it are checked for an exact line length
 * in dispatch_cmd(). */
#define CMD_PUT "put "
#define CMD_PEEK "peek"
#define CMD_PEEKJOB "peek "
#define CMD_RESERVE "reserve"
#define CMD_DELETE "delete "
#define CMD_RELEASE "release "
#define CMD_BURY "bury "
#define CMD_KICK "kick "
#define CMD_STATS "stats"
#define CMD_JOBSTATS "stats-job "
#define CMD_USE "use "
#define CMD_WATCH "watch "
#define CMD_IGNORE "ignore "
#define CMD_LIST_TUBES "list-tubes"
#define CMD_LIST_WATCHED_TUBES "list-watched-tubes"
#define CMD_STATS_TUBE "stats-tube "

/* Length of a string literal, not counting the terminating NUL. Only valid
 * for literals and arrays, since it relies on sizeof. */
#define CONSTSTRLEN(m) (sizeof(m) - 1)

/* Lengths of the command words above (there is no CMD_PUT_LEN). */
#define CMD_PEEK_LEN CONSTSTRLEN(CMD_PEEK)
#define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
#define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
#define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
#define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
#define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
#define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
#define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
#define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
#define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
#define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
#define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
#define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
#define CMD_LIST_WATCHED_TUBES_LEN CONSTSTRLEN(CMD_LIST_WATCHED_TUBES)
#define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
/* Reply texts. Literals ending in "\r\n" are sent verbatim as a complete
 * reply line; MSG_FOUND and MSG_RESERVED are only the leading word of a line
 * that reply_job() completes; the *_FMT literals are printf-style formats
 * passed to reply_line(). */
#define MSG_FOUND "FOUND"
#define MSG_NOTFOUND "NOT_FOUND\r\n"
#define MSG_RESERVED "RESERVED"
#define MSG_DELETED "DELETED\r\n"
#define MSG_RELEASED "RELEASED\r\n"
#define MSG_BURIED "BURIED\r\n"
#define MSG_BURIED_FMT "BURIED %llu\r\n"
#define MSG_INSERTED_FMT "INSERTED %llu\r\n"
#define MSG_NOT_IGNORED "NOT_IGNORED\r\n"

/* Byte lengths of the fixed replies above, for direct calls to reply(). */
#define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
#define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
#define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
#define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
#define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)

/* Error replies; these are sent through reply_msg() or reply_serr(). */
#define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
#define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
#define MSG_DRAINING "DRAINING\r\n"
#define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
#define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
#define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
#define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
/* Body of the reply to "stats"; the arguments are supplied, in this order,
 * by fmt_stats(). The trailing "\r\n" terminates the body on the wire. */
#define STATS_FMT "---\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-delayed: %u\n" \
    "current-jobs-buried: %u\n" \
    "cmd-put: %llu\n" \
    "cmd-peek: %llu\n" \
    "cmd-reserve: %llu\n" \
    "cmd-delete: %llu\n" \
    "cmd-release: %llu\n" \
    "cmd-bury: %llu\n" \
    "cmd-kick: %llu\n" \
    "cmd-stats: %llu\n" \
    "cmd-stats-job: %llu\n" \
    "cmd-stats-tube: %llu\n" \
    "cmd-list-tubes: %llu\n" \
    "cmd-list-watched-tubes: %llu\n" \
    "job-timeouts: %llu\n" \
    "total-jobs: %llu\n" \
    "current-tubes: %u\n" \
    "current-connections: %u\n" \
    "current-producers: %u\n" \
    "current-workers: %u\n" \
    "current-waiting: %u\n" \
    "total-connections: %u\n" \
    "pid: %u\n" \
    "version: %s\n" \
    "rusage-utime: %d.%06d\n" \
    "rusage-stime: %d.%06d\n" \
    "uptime: %u\n" \
    "\r\n"

/* Body of the reply to "stats-tube <name>"; arguments come from
 * fmt_stats_tube(). */
#define STATS_TUBE_FMT "---\n" \
    "name: %s\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-delayed: %u\n" \
    "current-jobs-buried: %u\n" \
    "total-jobs: %llu\n" \
    "current-waiting: %u\n" \
    "\r\n"

/* Body of the reply to "stats-job <id>"; arguments come from
 * fmt_job_stats(). */
#define JOB_STATS_FMT "---\n" \
    "id: %llu\n" \
    "tube: %s\n" \
    "state: %s\n" \
    "pri: %u\n" \
    "age: %u\n" \
    "delay: %u\n" \
    "ttr: %u\n" \
    "time-left: %u\n" \
    "timeouts: %u\n" \
    "releases: %u\n" \
    "buries: %u\n" \
    "kicks: %u\n" \
    "\r\n"
/* NOTE(review): ready_q is not referenced by any code visible in this part
 * of the file (ready jobs go into each tube's own queue) -- confirm whether
 * it is still needed. */
static struct pq ready_q;
/* Jobs that were enqueued with a non-zero delay (see enqueue_job()). */
static struct pq delay_q;

/* Doubly-linked list of buried jobs. The head points at itself while the
 * list is empty. */
static struct job graveyard = { &graveyard, &graveyard, 0 };
/* Number of jobs currently sitting in any tube's ready queue. */
static unsigned int ready_ct = 0;
/* Server-wide job/connection counters; each tube keeps its own counterpart
 * in t->stat. */
static struct stats global_stat = {0, 0, 0, 0, 0};

static tube default_tube;
/* Every tube known to the server. Entries are treated as weak references
 * (see make_and_insert_tube()). */
static struct ms tubes;

/* Set to 1 by enter_drain_mode(); while set, "put" is refused. */
static int drain_mode = 0;
/* Reference point for uptime(). */
static time_t start_time;
/* Event counters reported by the "stats" command. */
static unsigned long long int put_ct = 0, peek_ct = 0, reserve_ct = 0,
    delete_ct = 0, release_ct = 0, bury_ct = 0, kick_ct = 0,
    stats_job_ct = 0, stats_ct = 0, timeout_ct = 0,
    list_tubes_ct = 0, stats_tube_ct = 0,
    list_watched_tubes_ct = 0;

/* Doubly-linked list of connections with at least one reserved job. */
static struct conn running = { &running, &running, 0 };
#ifdef DEBUG
/* Printable command names for debug output, indexed by the op code that
 * which_cmd() returns. NOTE(review): the order presumably mirrors the
 * numeric values of the OP_* constants declared outside this file -- verify
 * whenever an op code is added. */
static const char * op_names[] = {
    "<unknown>",
    CMD_PUT,
    CMD_PEEKJOB,
    CMD_RESERVE,
    CMD_DELETE,
    CMD_RELEASE,
    CMD_BURY,
    CMD_KICK,
    CMD_STATS,
    CMD_JOBSTATS,
    CMD_PEEK,
    CMD_USE,
    CMD_WATCH,
    CMD_IGNORE,
    CMD_LIST_TUBES,
    CMD_LIST_WATCHED_TUBES,
    CMD_STATS_TUBE,
};
#endif
210 static int
211 buried_job_p()
213 return job_list_any_p(&graveyard);
216 static void
217 reply(conn c, const char *line, int len, int state)
219 int r;
221 r = conn_update_evq(c, EV_WRITE | EV_PERSIST);
222 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
224 c->reply = line;
225 c->reply_len = len;
226 c->reply_sent = 0;
227 c->state = state;
228 dprintf("sending reply: %.*s", len, line);
/* Send the string literal m as a complete reply. m must be a literal (or an
 * array), because its length is taken with sizeof through CONSTSTRLEN. */
#define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)

/* Log the string literal e with twarnx() as a server error, then send it to
 * the client with reply_msg(). */
#define reply_serr(c,e) (twarnx("server error: %s",(e)),\
                         reply_msg((c),(e)))
236 static void
237 reply_line(conn c, int state, const char *fmt, ...)
239 int r;
240 va_list ap;
242 va_start(ap, fmt);
243 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
244 va_end(ap);
246 /* Make sure the buffer was big enough. If not, we have a bug. */
247 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
249 return reply(c, c->reply_buf, r, state);
252 static void
253 reply_job(conn c, job j, const char *word)
255 /* tell this connection which job to send */
256 c->out_job = j;
257 c->out_job_sent = 0;
259 return reply_line(c, STATE_SENDJOB, "%s %llu %u\r\n",
260 word, j->id, j->body_size - 2);
263 conn
264 remove_waiting_conn(conn c)
266 tube t;
267 size_t i;
269 if (!(c->type & CONN_TYPE_WAITING)) return NULL;
270 c->type &= ~CONN_TYPE_WAITING;
271 global_stat.waiting_ct--;
272 for (i = 0; i < c->watch.used; i++) {
273 t = c->watch.items[i];
274 t->stat.waiting_ct--;
275 ms_remove(&t->waiting, c);
277 return c;
280 static void
281 reserve_job(conn c, job j)
283 j->deadline = time(NULL) + j->ttr;
284 global_stat.reserved_ct++; /* stats */
285 j->tube->stat.reserved_ct++;
286 conn_insert(&running, c);
287 j->state = JOB_STATE_RESERVED;
288 job_insert(&c->reserved_jobs, j);
289 return reply_job(c, j, MSG_RESERVED);
292 static job
293 next_eligible_job()
295 tube t;
296 size_t i;
297 job j = NULL, candidate;
299 dprintf("tubes.used = %d\n", tubes.used);
300 for (i = 0; i < tubes.used; i++) {
301 t = tubes.items[i];
302 dprintf("for %s t->waiting.used=%d t->ready.used=%d\n",
303 t->name, t->waiting.used, t->ready.used);
304 if (t->waiting.used && t->ready.used) {
305 candidate = pq_peek(&t->ready);
306 if (!j || candidate->id < j->id) j = candidate;
308 dprintf("i = %d, tubes.used = %d\n", i, tubes.used);
311 return j;
314 static void
315 process_queue()
317 job j;
319 dprintf("processing queue\n");
320 while ((j = next_eligible_job())) {
321 dprintf("got eligible job %llu in %s\n", j->id, j->tube->name);
322 j = pq_take(&j->tube->ready);
323 ready_ct--;
324 if (j->pri < URGENT_THRESHOLD) {
325 global_stat.urgent_ct--;
326 j->tube->stat.urgent_ct--;
328 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
332 static int
333 enqueue_job(job j, unsigned int delay)
335 int r;
337 if (delay) {
338 j->deadline = time(NULL) + delay;
339 r = pq_give(&delay_q, j);
340 if (!r) return 0;
341 j->state = JOB_STATE_DELAYED;
342 set_main_timeout(pq_peek(&delay_q)->deadline);
343 } else {
344 r = pq_give(&j->tube->ready, j);
345 if (!r) return 0;
346 j->state = JOB_STATE_READY;
347 ready_ct++;
348 if (j->pri < URGENT_THRESHOLD) {
349 global_stat.urgent_ct++;
350 j->tube->stat.urgent_ct++;
353 process_queue();
354 return 1;
357 static void
358 bury_job(job j)
360 job_insert(&graveyard, j);
361 global_stat.buried_ct++;
362 j->tube->stat.buried_ct++;
363 j->state = JOB_STATE_BURIED;
364 j->bury_ct++;
367 void
368 enqueue_reserved_jobs(conn c)
370 int r;
371 job j;
373 while (job_list_any_p(&c->reserved_jobs)) {
374 j = job_remove(c->reserved_jobs.next);
375 r = enqueue_job(j, 0);
376 if (!r) bury_job(j);
377 global_stat.reserved_ct--;
378 j->tube->stat.reserved_ct--;
379 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
383 static job
384 delay_q_peek()
386 return pq_peek(&delay_q);
389 static job
390 delay_q_take()
392 return pq_take(&delay_q);
395 static job
396 remove_this_buried_job(job j)
398 j = job_remove(j);
399 if (j) {
400 global_stat.buried_ct--;
401 j->tube->stat.buried_ct--;
403 return j;
406 static int
407 kick_buried_job()
409 int r;
410 job j;
412 if (!buried_job_p()) return 0;
413 j = remove_this_buried_job(graveyard.next);
414 j->kick_ct++;
415 r = enqueue_job(j, 0);
416 if (r) return 1;
418 /* ready queue is full, so bury it */
419 bury_job(j);
420 return 0;
423 static unsigned int
424 get_delayed_job_ct()
426 return pq_used(&delay_q);
429 static int
430 kick_delayed_job()
432 int r;
433 job j;
435 if (get_delayed_job_ct() < 1) return 0;
436 j = delay_q_take();
437 j->kick_ct++;
438 r = enqueue_job(j, 0);
439 if (r) return 1;
441 /* ready queue is full, so delay it again */
442 r = enqueue_job(j, j->delay);
443 if (r) return 0;
445 /* last resort */
446 bury_job(j);
447 return 0;
/* Kick up to n buried jobs, stopping at the first failed kick.
 * Returns the number of jobs successfully kicked. */
static unsigned int
kick_buried_jobs(unsigned int n)
{
    unsigned int kicked = 0;

    while (kicked < n && kick_buried_job()) {
        ++kicked;
    }
    return kicked;
}
/* Kick up to n delayed jobs, stopping at the first failed kick.
 * Returns the number of jobs successfully kicked. */
static unsigned int
kick_delayed_jobs(unsigned int n)
{
    unsigned int kicked = 0;

    while (kicked < n && kick_delayed_job()) {
        ++kicked;
    }
    return kicked;
}
/* Kick up to n jobs. Buried jobs take precedence: delayed jobs are only
 * kicked when the graveyard is empty at the time of the call.
 * Returns the number of jobs kicked. */
static unsigned int
kick_jobs(unsigned int n)
{
    return buried_job_p() ? kick_buried_jobs(n) : kick_delayed_jobs(n);
}
475 static job
476 peek_buried_job()
478 return buried_job_p() ? graveyard.next : NULL;
481 static job
482 find_buried_job(unsigned long long int id)
484 job j;
486 for (j = graveyard.next; j != &graveyard; j = j->next) {
487 if (j->id == id) return j;
489 return NULL;
492 static job
493 remove_buried_job(unsigned long long int id)
495 return remove_this_buried_job(find_buried_job(id));
498 static void
499 enqueue_waiting_conn(conn c)
501 tube t;
502 size_t i;
504 global_stat.waiting_ct++;
505 c->type |= CONN_TYPE_WAITING;
506 for (i = 0; i < c->watch.used; i++) {
507 t = c->watch.items[i];
508 t->stat.waiting_ct++;
509 ms_append(&t->waiting, c);
513 static job
514 find_reserved_job_in_conn(conn c, unsigned long long int id)
516 job j;
518 for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
519 if (j->id == id) return j;
521 return NULL;
524 static job
525 find_reserved_job_in_list(conn list, unsigned long long int id)
527 job j;
528 conn c;
530 for (c = list->next; c != list; c = c->next) {
531 j = find_reserved_job_in_conn(c, id);
532 if (j) return j;
534 return NULL;
537 static job
538 find_reserved_job(unsigned long long int id)
540 return find_reserved_job_in_list(&running, id);
543 static job
544 peek_ready_job(unsigned long long int id)
547 job j;
548 size_t i;
550 for (i = 0; i < tubes.used; i++) {
551 j = pq_find(&((tube) tubes.items[i])->ready, id);
552 if (j) return j;
554 return NULL;
557 /* TODO: make a global hashtable of jobs because this is slow */
558 static job
559 peek_job(unsigned long long int id)
561 return peek_ready_job(id) ? :
562 pq_find(&delay_q, id) ? :
563 find_reserved_job(id) ? :
564 find_buried_job(id);
567 static void
568 check_err(conn c, const char *s)
570 if (errno == EAGAIN) return;
571 if (errno == EINTR) return;
572 if (errno == EWOULDBLOCK) return;
574 twarn("%s", s);
575 conn_close(c);
576 return;
/* Scan the first `size` bytes of s for the sequence "\r\n" and return the
 * length of the line including that terminator.
 * Always returns at least 2 if a match is found. Returns 0 if no match,
 * which includes any size below 2.
 *
 * A '\r' that is not directly followed by '\n' does not end the search; the
 * scan continues behind it so that a later "\r\n" is still found. */
static int
scan_line_end(const char *s, int size)
{
    const char *cur, *last, *cr;

    /* too short to hold "\r\n"; this also keeps size - 1 from going
     * negative and turning into a huge memchr() length */
    if (size < 2) return 0;

    /* Only the first size - 1 bytes can be a '\r' that still has a
     * successor inside the buffer, so reading cr[1] below is safe. */
    last = s + size - 1;
    for (cur = s; cur < last; cur = cr + 1) {
        cr = memchr(cur, '\r', (size_t) (last - cur));
        if (!cr) return 0;
        if (cr[1] == '\n') return (int) (cr - s) + 2;
    }

    return 0;
}
595 static int
596 cmd_len(conn c)
598 return scan_line_end(c->cmd, c->cmd_read);
601 /* parse the command line */
602 static int
603 which_cmd(conn c)
605 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
606 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
607 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
608 TEST_CMD(c->cmd, CMD_PEEK, OP_PEEK);
609 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
610 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
611 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
612 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
613 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
614 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
615 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
616 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
617 TEST_CMD(c->cmd, CMD_USE, OP_USE);
618 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
619 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
620 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
621 TEST_CMD(c->cmd, CMD_LIST_WATCHED_TUBES, OP_LIST_WATCHED_TUBES);
622 return OP_UNKNOWN;
625 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
626 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
627 * This function is idempotent(). */
628 static void
629 fill_extra_data(conn c)
631 int extra_bytes, job_data_bytes = 0, cmd_bytes;
633 if (!c->fd) return; /* the connection was closed */
634 if (!c->cmd_len) return; /* we don't have a complete command */
636 /* how many extra bytes did we read? */
637 extra_bytes = c->cmd_read - c->cmd_len;
639 /* how many bytes should we put into the job body? */
640 if (c->in_job) {
641 job_data_bytes = min(extra_bytes, c->in_job->body_size);
642 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
643 c->in_job_read = job_data_bytes;
646 /* how many bytes are left to go into the future cmd? */
647 cmd_bytes = extra_bytes - job_data_bytes;
648 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
649 c->cmd_read = cmd_bytes;
650 c->cmd_len = 0; /* we no longer know the length of the new command */
653 static void
654 enqueue_incoming_job(conn c)
656 int r;
657 job j = c->in_job;
659 c->in_job = NULL; /* the connection no longer owns this job */
660 c->in_job_read = 0;
662 /* check if the trailer is present and correct */
663 if (memcmp(j->body + j->body_size - 2, "\r\n", 2)) {
664 job_free(j);
665 return reply_msg(c, MSG_EXPECTED_CRLF);
668 /* we have a complete job, so let's stick it in the pqueue */
669 r = enqueue_job(j, j->delay);
670 put_ct++; /* stats */
671 global_stat.total_jobs_ct++;
672 j->tube->stat.total_jobs_ct++;
674 if (r) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->id);
676 /* out of memory trying to grow the queue, so it gets buried */
677 bury_job(j);
678 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->id);
681 static unsigned int
682 uptime()
684 return time(NULL) - start_time;
687 static int
688 fmt_stats(char *buf, size_t size, void *x)
690 struct rusage ru = {{0, 0}, {0, 0}};
691 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
692 return snprintf(buf, size, STATS_FMT,
693 global_stat.urgent_ct,
694 ready_ct,
695 global_stat.reserved_ct,
696 get_delayed_job_ct(),
697 global_stat.buried_ct,
698 put_ct,
699 peek_ct,
700 reserve_ct,
701 delete_ct,
702 release_ct,
703 bury_ct,
704 kick_ct,
705 stats_ct,
706 stats_job_ct,
707 stats_tube_ct,
708 list_tubes_ct,
709 list_watched_tubes_ct,
710 timeout_ct,
711 global_stat.total_jobs_ct,
712 tubes.used,
713 count_cur_conns(),
714 count_cur_producers(),
715 count_cur_workers(),
716 global_stat.waiting_ct,
717 count_tot_conns(),
718 getpid(),
719 VERSION,
720 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
721 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
722 uptime());
/* Read a priority value from the given buffer and place it in pri.
 * Update end to point to the address after the last character consumed.
 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
 * conversion and return the status code but not update any values. This is an
 * easy way to check for errors.
 * If end is NULL, read_pri will also check that the entire input string was
 * consumed and return an error code otherwise.
 *
 * Leading whitespace is skipped, as strtoul() does. A value too large for an
 * unsigned int is clamped to the largest unsigned int instead of being
 * truncated. A negative number is rejected; strtoul() would otherwise accept
 * it and return the negated value.
 *
 * Return 0 on success, or nonzero on failure.
 * If a failure occurs, pri and end are not modified. */
static int
read_pri(unsigned int *pri, const char *buf, char **end)
{
    const unsigned int pri_max = ~0u;
    const char *p = buf;
    char *tend;
    unsigned long val;

    /* skip the whitespace strtoul() would skip, to look at the sign;
     * '\t'..'\r' covers \t \n \v \f \r */
    while (*p == ' ' || (*p >= '\t' && *p <= '\r')) p++;
    if (*p == '-') return -1;

    errno = 0;
    val = strtoul(buf, &tend, 10);
    if (tend == buf) return -1;
    /* ERANGE is tolerated: the value is simply clamped below */
    if (errno && errno != ERANGE) return -1;
    if (!end && tend[0] != '\0') return -1;

    /* unsigned long can be wider than unsigned int */
    if (val > pri_max) val = pri_max;

    if (pri) *pri = (unsigned int) val;
    if (end) *end = tend;
    return 0;
}
/* Read a delay value from the given buffer and place it in delay.
 * This is a thin wrapper: arguments, return value and failure behavior are
 * exactly those of read_pri(). */
static int
read_delay(unsigned int *delay, const char *buf, char **end)
{
    int status = read_pri(delay, buf, end);

    return status;
}
/* Read a timeout value from the given buffer and place it in ttr.
 * This is a thin wrapper: arguments, return value and failure behavior are
 * exactly those of read_pri(). */
static int
read_ttr(unsigned int *ttr, const char *buf, char **end)
{
    int status = read_pri(ttr, buf, end);

    return status;
}
768 static void
769 wait_for_job(conn c)
771 int r;
773 /* this conn is waiting, but we want to know if they hang up */
774 r = conn_update_evq(c, EV_READ | EV_PERSIST);
775 if (r == -1) return twarnx("update events failed"), conn_close(c);
777 c->state = STATE_WAIT;
778 enqueue_waiting_conn(c);
781 typedef int(*fmt_fn)(char *, size_t, void *);
783 static void
784 do_stats(conn c, fmt_fn fmt, void *data)
786 int r, stats_len;
788 /* first, measure how big a buffer we will need */
789 stats_len = fmt(NULL, 0, data);
791 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
792 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
794 /* now actually format the stats data */
795 r = fmt(c->out_job->body, stats_len, data);
796 if (r != stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
797 c->out_job->body[stats_len - 1] = '\n'; /* patch up sprintf's output */
799 c->out_job_sent = 0;
800 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", stats_len - 2);
803 static void
804 do_list_tubes(conn c, ms l)
806 char *buf;
807 tube t;
808 size_t i, resp_z;
810 /* first, measure how big a buffer we will need */
811 resp_z = 6; /* initial "---\n" and final "\r\n" */
812 for (i = 0; i < l->used; i++) {
813 t = l->items[i];
814 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
817 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
818 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
820 /* now actually format the response */
821 buf = c->out_job->body;
822 buf += snprintf(buf, 5, "---\n");
823 for (i = 0; i < l->used; i++) {
824 t = l->items[i];
825 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
827 buf[0] = '\r';
828 buf[1] = '\n';
830 c->out_job_sent = 0;
831 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", resp_z - 2);
834 static int
835 fmt_job_stats(char *buf, size_t size, job j)
837 time_t t;
839 t = time(NULL);
840 return snprintf(buf, size, JOB_STATS_FMT,
841 j->id,
842 j->tube->name,
843 job_state(j),
844 j->pri,
845 (unsigned int) (t - j->creation),
846 j->delay,
847 j->ttr,
848 (unsigned int) (j->deadline - t),
849 j->timeout_ct,
850 j->release_ct,
851 j->bury_ct,
852 j->kick_ct);
855 static int
856 fmt_stats_tube(char *buf, size_t size, tube t)
858 return snprintf(buf, size, STATS_TUBE_FMT,
859 t->name,
860 t->stat.urgent_ct,
861 t->ready.used,
862 t->stat.reserved_ct,
863 t->delay.used,
864 t->stat.buried_ct,
865 t->stat.total_jobs_ct,
866 t->stat.waiting_ct);
869 static void
870 maybe_enqueue_incoming_job(conn c)
872 job j = c->in_job;
874 /* do we have a complete job? */
875 if (c->in_job_read == j->body_size) return enqueue_incoming_job(c);
877 /* otherwise we have incomplete data, so just keep waiting */
878 c->state = STATE_WANTDATA;
881 /* j can be NULL */
882 static job
883 remove_this_reserved_job(conn c, job j)
885 j = job_remove(j);
886 if (j) {
887 global_stat.reserved_ct--;
888 j->tube->stat.reserved_ct--;
890 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
891 return j;
894 static job
895 remove_reserved_job(conn c, unsigned long long int id)
897 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, id));
900 static int
901 name_is_ok(const char *name, size_t max)
903 size_t len = strlen(name);
904 return len > 0 && len <= max &&
905 strspn(name, NAME_CHARS) == len && name[0] != '-';
908 static tube
909 find_tube(const char *name)
911 tube t;
912 size_t i;
914 for (i = 0; i < tubes.used; i++) {
915 t = tubes.items[i];
916 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) return t;
918 return NULL;
921 void
922 prot_remove_tube(tube t)
924 ms_remove(&tubes, t);
927 static tube
928 make_and_insert_tube(const char *name)
930 int r;
931 tube t = NULL;
933 t = make_tube(name);
934 if (!t) return NULL;
936 /* We want this global tube list to behave like "weak" refs, so don't
937 * increment the ref count. */
938 r = ms_append(&tubes, t);
939 if (!r) return tube_dref(t), NULL;
941 return t;
944 static tube
945 find_or_make_tube(const char *name)
947 return find_tube(name) ? : make_and_insert_tube(name);
950 static void
951 dispatch_cmd(conn c)
953 int r, i;
954 unsigned int count;
955 job j;
956 char type;
957 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
958 unsigned int pri, delay, ttr, body_size;
959 unsigned long long int id;
960 tube t = NULL;
962 /* NUL-terminate this string so we can use strtol and friends */
963 c->cmd[c->cmd_len - 2] = '\0';
965 /* check for possible maliciousness */
966 if (strlen(c->cmd) != c->cmd_len - 2) {
967 return reply_msg(c, MSG_BAD_FORMAT);
970 type = which_cmd(c);
971 dprintf("got %s command: \"%s\"\n", op_names[(int) type], c->cmd);
973 switch (type) {
974 case OP_PUT:
975 if (drain_mode) return reply_serr(c, MSG_DRAINING);
977 r = read_pri(&pri, c->cmd + 4, &delay_buf);
978 if (r) return reply_msg(c, MSG_BAD_FORMAT);
980 r = read_delay(&delay, delay_buf, &ttr_buf);
981 if (r) return reply_msg(c, MSG_BAD_FORMAT);
983 r = read_ttr(&ttr, ttr_buf, &size_buf);
984 if (r) return reply_msg(c, MSG_BAD_FORMAT);
986 errno = 0;
987 body_size = strtoul(size_buf, &end_buf, 10);
988 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
990 if (body_size > JOB_DATA_SIZE_LIMIT) {
991 return reply_msg(c, MSG_JOB_TOO_BIG);
994 /* don't allow trailing garbage */
995 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
997 conn_set_producer(c);
999 c->in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c->use);
1001 fill_extra_data(c);
1003 /* it's possible we already have a complete job */
1004 maybe_enqueue_incoming_job(c);
1006 break;
1007 case OP_PEEK:
1008 /* don't allow trailing garbage */
1009 if (c->cmd_len != CMD_PEEK_LEN + 2) {
1010 return reply_msg(c, MSG_BAD_FORMAT);
1013 j = job_copy(peek_buried_job() ? : delay_q_peek());
1015 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1017 peek_ct++; /* stats */
1018 reply_job(c, j, MSG_FOUND);
1019 break;
1020 case OP_PEEKJOB:
1021 errno = 0;
1022 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1023 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1025 /* So, peek is annoying, because some other connection might free the
1026 * job while we are still trying to write it out. So we copy it and
1027 * then free the copy when it's done sending. */
1028 j = job_copy(peek_job(id));
1030 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1032 peek_ct++; /* stats */
1033 reply_job(c, j, MSG_FOUND);
1034 break;
1035 case OP_RESERVE:
1036 /* don't allow trailing garbage */
1037 if (c->cmd_len != CMD_RESERVE_LEN + 2) {
1038 return reply_msg(c, MSG_BAD_FORMAT);
1041 reserve_ct++; /* stats */
1042 conn_set_worker(c);
1044 /* try to get a new job for this guy */
1045 wait_for_job(c);
1046 process_queue();
1047 break;
1048 case OP_DELETE:
1049 errno = 0;
1050 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1051 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1053 j = remove_reserved_job(c, id) ? : remove_buried_job(id);
1055 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1057 delete_ct++; /* stats */
1058 job_free(j);
1060 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1061 break;
1062 case OP_RELEASE:
1063 errno = 0;
1064 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1065 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1067 r = read_pri(&pri, pri_buf, &delay_buf);
1068 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1070 r = read_delay(&delay, delay_buf, NULL);
1071 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1073 j = remove_reserved_job(c, id);
1075 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1077 j->pri = pri;
1078 j->delay = delay;
1079 j->release_ct++;
1080 release_ct++; /* stats */
1081 r = enqueue_job(j, delay);
1082 if (r) return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1084 /* out of memory trying to grow the queue, so it gets buried */
1085 bury_job(j);
1086 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1087 break;
1088 case OP_BURY:
1089 errno = 0;
1090 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1091 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1093 r = read_pri(&pri, pri_buf, NULL);
1094 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1096 j = remove_reserved_job(c, id);
1098 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1100 j->pri = pri;
1101 bury_ct++; /* stats */
1102 bury_job(j);
1103 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1104 break;
1105 case OP_KICK:
1106 errno = 0;
1107 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
1108 if (end_buf == c->cmd + CMD_KICK_LEN) {
1109 return reply_msg(c, MSG_BAD_FORMAT);
1111 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1113 kick_ct++; /* stats */
1115 i = kick_jobs(count);
1117 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1118 case OP_STATS:
1119 /* don't allow trailing garbage */
1120 if (c->cmd_len != CMD_STATS_LEN + 2) {
1121 return reply_msg(c, MSG_BAD_FORMAT);
1124 stats_ct++; /* stats */
1126 do_stats(c, fmt_stats, NULL);
1127 break;
1128 case OP_JOBSTATS:
1129 errno = 0;
1130 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1131 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1133 j = peek_job(id);
1134 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1136 stats_job_ct++; /* stats */
1138 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1139 do_stats(c, (fmt_fn) fmt_job_stats, j);
1140 break;
1141 case OP_STATS_TUBE:
1142 name = c->cmd + CMD_STATS_TUBE_LEN;
1143 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1145 t = find_tube(name);
1146 if (!t) return reply_msg(c, MSG_NOTFOUND);
1148 stats_tube_ct++; /* stats */
1150 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1151 break;
1152 case OP_LIST_TUBES:
1153 /* don't allow trailing garbage */
1154 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1155 return reply_msg(c, MSG_BAD_FORMAT);
1158 list_tubes_ct++;
1159 do_list_tubes(c, &tubes);
1160 break;
1161 case OP_LIST_WATCHED_TUBES:
1162 /* don't allow trailing garbage */
1163 if (c->cmd_len != CMD_LIST_WATCHED_TUBES_LEN + 2) {
1164 return reply_msg(c, MSG_BAD_FORMAT);
1167 list_watched_tubes_ct++;
1168 do_list_tubes(c, &c->watch);
1169 break;
1170 case OP_USE:
1171 name = c->cmd + CMD_USE_LEN;
1172 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1174 TUBE_ASSIGN(t, find_or_make_tube(name));
1175 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1177 TUBE_ASSIGN(c->use, t);
1178 TUBE_ASSIGN(t, NULL);
1180 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1181 break;
1182 case OP_WATCH:
1183 name = c->cmd + CMD_WATCH_LEN;
1184 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1186 TUBE_ASSIGN(t, find_or_make_tube(name));
1187 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1189 r = 1;
1190 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1191 TUBE_ASSIGN(t, NULL);
1192 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1194 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1195 break;
1196 case OP_IGNORE:
1197 name = c->cmd + CMD_IGNORE_LEN;
1198 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1200 t = NULL;
1201 for (i = 0; i < c->watch.used; i++) {
1202 t = c->watch.items[i];
1203 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1204 t = NULL;
1207 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1209 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1210 t = NULL;
1212 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1213 break;
1214 default:
1215 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1219 /* if we get a timeout, it means that a job has been reserved for too long, so
1220 * we should put it back in the queue */
1221 static void
1222 h_conn_timeout(conn c)
1224 int r;
1225 job j;
1227 while ((j = soonest_job(c))) {
1228 if (j->deadline > time(NULL)) return;
1229 timeout_ct++; /* stats */
1230 j->timeout_ct++;
1231 r = enqueue_job(remove_this_reserved_job(c, j), 0);
1232 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
1233 r = conn_update_evq(c, c->evq.ev_events);
1234 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
1238 void
1239 enter_drain_mode(int sig)
1241 drain_mode = 1;
1244 static void
1245 do_cmd(conn c)
1247 dispatch_cmd(c);
1248 fill_extra_data(c);
1251 static void
1252 reset_conn(conn c)
1254 int r;
1256 r = conn_update_evq(c, EV_READ | EV_PERSIST);
1257 if (r == -1) return twarnx("update events failed"), conn_close(c);
1259 /* was this a peek or stats command? */
1260 if (!has_reserved_this_job(c, c->out_job)) job_free(c->out_job);
1261 c->out_job = NULL;
1263 c->reply_sent = 0; /* now that we're done, reset this */
1264 c->state = STATE_WANTCOMMAND;
1267 static void
1268 h_conn_data(conn c)
1270 int r;
1271 job j;
1272 struct iovec iov[2];
1274 switch (c->state) {
1275 case STATE_WANTCOMMAND:
1276 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1277 if (r == -1) return check_err(c, "read()");
1278 if (r == 0) return conn_close(c); /* the client hung up */
1280 c->cmd_read += r; /* we got some bytes */
1282 c->cmd_len = cmd_len(c); /* find the EOL */
1283 dprintf("cmd_len is %d\n", c->cmd_len);
1285 /* yay, complete command line */
1286 if (c->cmd_len) return do_cmd(c);
1288 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1290 dprintf("cmd_read is %d\n", c->cmd_read);
1291 /* command line too long? */
1292 if (c->cmd_read == LINE_BUF_SIZE) {
1293 return reply_msg(c, MSG_BAD_FORMAT);
1296 /* otherwise we have an incomplete line, so just keep waiting */
1297 break;
1298 case STATE_WANTDATA:
1299 j = c->in_job;
1301 r = read(c->fd, j->body + c->in_job_read, j->body_size -c->in_job_read);
1302 if (r == -1) return check_err(c, "read()");
1303 if (r == 0) return conn_close(c); /* the client hung up */
1305 c->in_job_read += r; /* we got some bytes */
1307 /* (j->in_job_read > j->body_size) can't happen */
1309 maybe_enqueue_incoming_job(c);
1310 break;
1311 case STATE_SENDWORD:
1312 r= write(c->fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1313 if (r == -1) return check_err(c, "write()");
1314 if (r == 0) return conn_close(c); /* the client hung up */
1316 c->reply_sent += r; /* we got some bytes */
1318 /* (c->reply_sent > c->reply_len) can't happen */
1320 if (c->reply_sent == c->reply_len) return reset_conn(c);
1322 /* otherwise we sent an incomplete reply, so just keep waiting */
1323 break;
1324 case STATE_SENDJOB:
1325 j = c->out_job;
1327 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1328 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1329 iov[1].iov_base = j->body + c->out_job_sent;
1330 iov[1].iov_len = j->body_size - c->out_job_sent;
1332 r = writev(c->fd, iov, 2);
1333 if (r == -1) return check_err(c, "writev()");
1334 if (r == 0) return conn_close(c); /* the client hung up */
1336 /* update the sent values */
1337 c->reply_sent += r;
1338 if (c->reply_sent >= c->reply_len) {
1339 c->out_job_sent += c->reply_sent - c->reply_len;
1340 c->reply_sent = c->reply_len;
1343 /* (c->out_job_sent > j->body_size) can't happen */
1345 /* are we done? */
1346 if (c->out_job_sent == j->body_size) return reset_conn(c);
1348 /* otherwise we sent incomplete data, so just keep waiting */
1349 break;
1350 case STATE_WAIT: /* keep an eye out in case they hang up */
1351 /* but don't hang up just because our buffer is full */
1352 if (LINE_BUF_SIZE - c->cmd_read < 1) break;
1354 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1355 if (r == -1) return check_err(c, "read()");
1356 if (r == 0) return conn_close(c); /* the client hung up */
1357 c->cmd_read += r; /* we got some bytes */
/* nonzero when c has a nonzero fd and is in STATE_WANTCOMMAND */
#define want_command(c) ((c)->fd && (c)->state == STATE_WANTCOMMAND)
/* nonzero when want_command(c) holds and some command bytes have been read */
#define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
1364 static void
1365 h_conn(const int fd, const short which, conn c)
1367 if (fd != c->fd) {
1368 twarnx("Argh! event fd doesn't match conn fd.");
1369 close(fd);
1370 return conn_close(c);
1373 switch (which) {
1374 case EV_TIMEOUT:
1375 h_conn_timeout(c);
1376 event_add(&c->evq, NULL); /* seems to be necessary */
1377 break;
1378 case EV_READ:
1379 /* fall through... */
1380 case EV_WRITE:
1381 /* fall through... */
1382 default:
1383 h_conn_data(c);
1386 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1389 static void
1390 h_delay()
1392 int r;
1393 job j;
1394 time_t t;
1396 t = time(NULL);
1397 while ((j = delay_q_peek())) {
1398 if (j->deadline > t) break;
1399 j = delay_q_take();
1400 r = enqueue_job(j, 0);
1401 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
1404 set_main_timeout((j = delay_q_peek()) ? j->deadline : 0);
1407 void
1408 h_accept(const int fd, const short which, struct event *ev)
1410 conn c;
1411 int cfd, flags, r;
1412 socklen_t addrlen;
1413 struct sockaddr addr;
1415 if (which == EV_TIMEOUT) return h_delay();
1417 addrlen = sizeof addr;
1418 cfd = accept(fd, &addr, &addrlen);
1419 if (cfd == -1) {
1420 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1421 if (errno == EMFILE) brake();
1422 return;
1425 flags = fcntl(cfd, F_GETFL, 0);
1426 if (flags < 0) return twarn("getting flags"), close(cfd), v();
1428 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1429 if (r < 0) return twarn("setting O_NONBLOCK"), close(cfd), v();
1431 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1432 if (!c) return twarnx("make_conn() failed"), close(cfd), brake();
1434 dprintf("accepted conn, fd=%d\n", cfd);
1435 r = conn_set_evq(c, EV_READ | EV_PERSIST, (evh) h_conn);
1436 if (r == -1) return twarnx("conn_set_evq() failed"), close(cfd), brake();
1439 void
1440 prot_init()
1442 start_time = time(NULL);
1443 pq_init(&ready_q, job_pri_cmp);
1444 pq_init(&delay_q, job_delay_cmp);
1446 ms_init(&tubes, NULL, NULL);
1448 TUBE_ASSIGN(default_tube, find_or_make_tube("default"));
1449 if (!default_tube) twarnx("Out of memory during startup!");