Ignore git's scratch files during a rebase etc.
[beanstalkd.git] / prot.c
blob17c50ed7c4a92af977df1c487707622124b2d872
1 /* prot.c - protocol implementation */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include <limits.h>
#include <errno.h>
#include <sys/resource.h>
#include <sys/uio.h>
#include <stdarg.h>

#include "stat.h"
#include "prot.h"
#include "pq.h"
#include "ms.h"
#include "job.h"
#include "tube.h"
#include "conn.h"
#include "util.h"
#include "net.h"
#include "binlog.h"
#include "config.h"
40 /* job body cannot be greater than this many bytes long */
41 size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
/* Every character that may appear in a name checked by name_is_ok(). */
#define NAME_CHARS \
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
    "abcdefghijklmnopqrstuvwxyz" \
    "0123456789" \
    "-+/;.$()"
/* Length of a string constant, not counting the terminating NUL. */
#define CONSTSTRLEN(m) (sizeof(m) - 1)

/*
 * Command prefixes as they appear on the wire.  A trailing space means the
 * command is followed by at least one argument.  Each prefix is paired with
 * its length where the parser needs one.
 */
#define CMD_PUT "put "

#define CMD_PEEKJOB "peek "
#define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
#define CMD_PEEK_READY "peek-ready"
#define CMD_PEEK_READY_LEN CONSTSTRLEN(CMD_PEEK_READY)
#define CMD_PEEK_DELAYED "peek-delayed"
#define CMD_PEEK_DELAYED_LEN CONSTSTRLEN(CMD_PEEK_DELAYED)
#define CMD_PEEK_BURIED "peek-buried"
#define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED)

#define CMD_RESERVE "reserve"
#define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
#define CMD_RESERVE_TIMEOUT "reserve-with-timeout "
#define CMD_RESERVE_TIMEOUT_LEN CONSTSTRLEN(CMD_RESERVE_TIMEOUT)

#define CMD_DELETE "delete "
#define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
#define CMD_RELEASE "release "
#define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
#define CMD_BURY "bury "
#define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
#define CMD_KICK "kick "
#define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
#define CMD_TOUCH "touch "
#define CMD_TOUCH_LEN CONSTSTRLEN(CMD_TOUCH)

#define CMD_STATS "stats"
#define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
#define CMD_JOBSTATS "stats-job "
#define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
#define CMD_STATS_TUBE "stats-tube "
#define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)

#define CMD_USE "use "
#define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
#define CMD_WATCH "watch "
#define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
#define CMD_IGNORE "ignore "
#define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)

#define CMD_LIST_TUBES "list-tubes"
#define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
#define CMD_LIST_TUBE_USED "list-tube-used"
#define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
#define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
#define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
/* Response words that are followed by more data on the same line. */
#define MSG_FOUND "FOUND"
#define MSG_RESERVED "RESERVED"

/* Complete one-line responses. */
#define MSG_NOTFOUND "NOT_FOUND\r\n"
#define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
#define MSG_TIMED_OUT "TIMED_OUT\r\n"
#define MSG_DELETED "DELETED\r\n"
#define MSG_RELEASED "RELEASED\r\n"
#define MSG_BURIED "BURIED\r\n"
#define MSG_TOUCHED "TOUCHED\r\n"
#define MSG_NOT_IGNORED "NOT_IGNORED\r\n"

/* printf-style templates; each takes one job id. */
#define MSG_BURIED_FMT "BURIED %llu\r\n"
#define MSG_INSERTED_FMT "INSERTED %llu\r\n"

/* Lengths of the fixed responses that are sent through reply() directly. */
#define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
#define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
#define MSG_TOUCHED_LEN CONSTSTRLEN(MSG_TOUCHED)
#define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
#define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
#define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)

/* Error responses. */
#define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
#define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
#define MSG_DRAINING "DRAINING\r\n"
#define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
#define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
#define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
#define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
/* Values stored in a connection's state field. */
#define STATE_WANTCOMMAND 0
#define STATE_WANTDATA    1
#define STATE_SENDJOB     2
#define STATE_SENDWORD    3
#define STATE_WAIT        4
#define STATE_BITBUCKET   5

/*
 * Operation codes returned by which_cmd().  They also index op_ct[] (and
 * op_names[] in DEBUG builds), so the values must stay dense and TOTAL_OPS
 * must stay one past the largest code.
 */
#define OP_UNKNOWN            0
#define OP_PUT                1
#define OP_PEEKJOB            2
#define OP_RESERVE            3
#define OP_DELETE             4
#define OP_RELEASE            5
#define OP_BURY               6
#define OP_KICK               7
#define OP_STATS              8
#define OP_JOBSTATS           9
#define OP_PEEK_BURIED        10
#define OP_USE                11
#define OP_WATCH              12
#define OP_IGNORE             13
#define OP_LIST_TUBES         14
#define OP_LIST_TUBE_USED     15
#define OP_LIST_TUBES_WATCHED 16
#define OP_STATS_TUBE         17
#define OP_PEEK_READY         18
#define OP_PEEK_DELAYED       19
#define OP_RESERVE_TIMEOUT    20
#define OP_TOUCH              21
#define TOTAL_OPS             22
/*
 * Template for the body produced by fmt_stats().  The argument list there
 * must match these conversions one for one, in order.
 */
#define STATS_FMT \
    "---\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-delayed: %u\n" \
    "current-jobs-buried: %u\n" \
    "cmd-put: %llu\n" \
    "cmd-peek: %llu\n" \
    "cmd-peek-ready: %llu\n" \
    "cmd-peek-delayed: %llu\n" \
    "cmd-peek-buried: %llu\n" \
    "cmd-reserve: %llu\n" \
    "cmd-reserve-with-timeout: %llu\n" \
    "cmd-delete: %llu\n" \
    "cmd-release: %llu\n" \
    "cmd-use: %llu\n" \
    "cmd-watch: %llu\n" \
    "cmd-ignore: %llu\n" \
    "cmd-bury: %llu\n" \
    "cmd-kick: %llu\n" \
    "cmd-touch: %llu\n" \
    "cmd-stats: %llu\n" \
    "cmd-stats-job: %llu\n" \
    "cmd-stats-tube: %llu\n" \
    "cmd-list-tubes: %llu\n" \
    "cmd-list-tube-used: %llu\n" \
    "cmd-list-tubes-watched: %llu\n" \
    "job-timeouts: %llu\n" \
    "total-jobs: %llu\n" \
    "max-job-size: %zu\n" \
    "current-tubes: %zu\n" \
    "current-connections: %u\n" \
    "current-producers: %u\n" \
    "current-workers: %u\n" \
    "current-waiting: %u\n" \
    "total-connections: %u\n" \
    "pid: %u\n" \
    "version: %s\n" \
    "rusage-utime: %d.%06d\n" \
    "rusage-stime: %d.%06d\n" \
    "uptime: %u\n" \
    "\r\n"

/* Template for the body produced by fmt_stats_tube(). */
#define STATS_TUBE_FMT \
    "---\n" \
    "name: %s\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-delayed: %u\n" \
    "current-jobs-buried: %u\n" \
    "total-jobs: %llu\n" \
    "current-using: %u\n" \
    "current-watching: %u\n" \
    "current-waiting: %u\n" \
    "\r\n"

/* Template for the body produced by fmt_job_stats(). */
#define JOB_STATS_FMT \
    "---\n" \
    "id: %llu\n" \
    "tube: %s\n" \
    "state: %s\n" \
    "pri: %u\n" \
    "age: %u\n" \
    "delay: %u\n" \
    "ttr: %u\n" \
    "time-left: %u\n" \
    "timeouts: %u\n" \
    "releases: %u\n" \
    "buries: %u\n" \
    "kicks: %u\n" \
    "\r\n"
223 /* this number is pretty arbitrary */
224 #define BUCKET_BUF_SIZE 1024
226 static char bucket[BUCKET_BUF_SIZE];
228 static unsigned int ready_ct = 0;
229 static struct stats global_stat = {0, 0, 0, 0, 0};
231 static tube default_tube;
233 static int drain_mode = 0;
234 static time_t start_time;
235 static unsigned long long int op_ct[TOTAL_OPS], timeout_ct = 0;
238 /* Doubly-linked list of connections with at least one reserved job. */
239 static struct conn running = { &running, &running, 0 };
#ifdef DEBUG
/* Printable name for each OP_* code, used only in debug output. */
static const char * op_names[] = {
    [OP_UNKNOWN]            = "<unknown>",
    [OP_PUT]                = CMD_PUT,
    [OP_PEEKJOB]            = CMD_PEEKJOB,
    [OP_RESERVE]            = CMD_RESERVE,
    [OP_DELETE]             = CMD_DELETE,
    [OP_RELEASE]            = CMD_RELEASE,
    [OP_BURY]               = CMD_BURY,
    [OP_KICK]               = CMD_KICK,
    [OP_STATS]              = CMD_STATS,
    [OP_JOBSTATS]           = CMD_JOBSTATS,
    [OP_PEEK_BURIED]        = CMD_PEEK_BURIED,
    [OP_USE]                = CMD_USE,
    [OP_WATCH]              = CMD_WATCH,
    [OP_IGNORE]             = CMD_IGNORE,
    [OP_LIST_TUBES]         = CMD_LIST_TUBES,
    [OP_LIST_TUBE_USED]     = CMD_LIST_TUBE_USED,
    [OP_LIST_TUBES_WATCHED] = CMD_LIST_TUBES_WATCHED,
    [OP_STATS_TUBE]         = CMD_STATS_TUBE,
    [OP_PEEK_READY]         = CMD_PEEK_READY,
    [OP_PEEK_DELAYED]       = CMD_PEEK_DELAYED,
    [OP_RESERVE_TIMEOUT]    = CMD_RESERVE_TIMEOUT,
    [OP_TOUCH]              = CMD_TOUCH
};
#endif
268 static job remove_buried_job(job j);
270 static int
271 buried_job_p(tube t)
273 return job_list_any_p(&t->buried);
276 static void
277 reply(conn c, const char *line, int len, int state)
279 int r;
281 if (!c) return;
283 r = conn_update_evq(c, EV_WRITE | EV_PERSIST);
284 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
286 c->reply = line;
287 c->reply_len = len;
288 c->reply_sent = 0;
289 c->state = state;
290 dprintf("sending reply: %.*s", len, line);
/* Send the string constant m to c as a complete one-line reply. */
#define reply_msg(c,m) \
    reply((c), (m), CONSTSTRLEN(m), STATE_SENDWORD)

/* Log the string constant e as a server error, then send it to c. */
#define reply_serr(c,e) \
    (twarnx("server error: %s", (e)), reply_msg((c), (e)))
298 static void
299 reply_line(conn c, int state, const char *fmt, ...)
301 int r;
302 va_list ap;
304 va_start(ap, fmt);
305 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
306 va_end(ap);
308 /* Make sure the buffer was big enough. If not, we have a bug. */
309 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
311 return reply(c, c->reply_buf, r, state);
314 static void
315 reply_job(conn c, job j, const char *word)
317 /* tell this connection which job to send */
318 c->out_job = j;
319 c->out_job_sent = 0;
321 return reply_line(c, STATE_SENDJOB, "%s %llu %u\r\n",
322 word, j->id, j->body_size - 2);
325 conn
326 remove_waiting_conn(conn c)
328 tube t;
329 size_t i;
331 if (!conn_waiting(c)) return NULL;
333 c->type &= ~CONN_TYPE_WAITING;
334 global_stat.waiting_ct--;
335 for (i = 0; i < c->watch.used; i++) {
336 t = c->watch.items[i];
337 t->stat.waiting_ct--;
338 ms_remove(&t->waiting, c);
340 return c;
343 static void
344 reserve_job(conn c, job j)
346 j->deadline = time(NULL) + j->ttr;
347 global_stat.reserved_ct++; /* stats */
348 j->tube->stat.reserved_ct++;
349 conn_insert(&running, c);
350 j->state = JOB_STATE_RESERVED;
351 job_insert(&c->reserved_jobs, j);
352 j->reserver = c;
353 if (c->soonest_job && j->deadline < c->soonest_job->deadline) {
354 c->soonest_job = j;
356 return reply_job(c, j, MSG_RESERVED);
359 static job
360 next_eligible_job()
362 tube t;
363 size_t i;
364 job j = NULL, candidate;
366 dprintf("tubes.used = %zu\n", tubes.used);
367 for (i = 0; i < tubes.used; i++) {
368 t = tubes.items[i];
369 dprintf("for %s t->waiting.used=%zu t->ready.used=%d\n",
370 t->name, t->waiting.used, t->ready.used);
371 if (t->waiting.used && t->ready.used) {
372 candidate = pq_peek(&t->ready);
373 if (!j || candidate->id < j->id) j = candidate;
375 dprintf("i = %zu, tubes.used = %zu\n", i, tubes.used);
378 return j;
381 static void
382 process_queue()
384 job j;
386 dprintf("processing queue\n");
387 while ((j = next_eligible_job())) {
388 dprintf("got eligible job %llu in %s\n", j->id, j->tube->name);
389 j = pq_take(&j->tube->ready);
390 ready_ct--;
391 if (j->pri < URGENT_THRESHOLD) {
392 global_stat.urgent_ct--;
393 j->tube->stat.urgent_ct--;
395 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
399 static job
400 delay_q_peek()
402 int i;
403 tube t;
404 job j = NULL, nj;
406 for (i = 0; i < tubes.used; i++) {
407 t = tubes.items[i];
408 nj = pq_peek(&t->delay);
409 if (!nj) continue;
410 if (!j || nj->deadline < j->deadline) j = nj;
413 return j;
416 static void
417 set_main_delay_timeout()
419 job j;
421 set_main_timeout((j = delay_q_peek()) ? j->deadline : 0);
424 static int
425 enqueue_job(job j, unsigned int delay)
427 int r;
429 j->reserver = NULL;
430 if (delay) {
431 j->deadline = time(NULL) + delay;
432 r = pq_give(&j->tube->delay, j);
433 if (!r) return 0;
434 j->state = JOB_STATE_DELAYED;
435 set_main_delay_timeout();
436 } else {
437 r = pq_give(&j->tube->ready, j);
438 if (!r) return 0;
439 j->state = JOB_STATE_READY;
440 ready_ct++;
441 if (j->pri < URGENT_THRESHOLD) {
442 global_stat.urgent_ct++;
443 j->tube->stat.urgent_ct++;
446 binlog_write_job(j);
447 process_queue();
448 return 1;
451 static void
452 bury_job(job j)
454 job_insert(&j->tube->buried, j);
455 global_stat.buried_ct++;
456 j->tube->stat.buried_ct++;
457 j->state = JOB_STATE_BURIED;
458 j->reserver = NULL;
459 j->bury_ct++;
460 binlog_write_job(j);
463 void
464 enqueue_reserved_jobs(conn c)
466 int r;
467 job j;
469 while (job_list_any_p(&c->reserved_jobs)) {
470 j = job_remove(c->reserved_jobs.next);
471 r = enqueue_job(j, 0);
472 if (!r) bury_job(j);
473 global_stat.reserved_ct--;
474 j->tube->stat.reserved_ct--;
475 c->soonest_job = NULL;
476 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
480 static job
481 delay_q_take()
483 job j = delay_q_peek();
484 return j ? pq_take(&j->tube->delay) : NULL;
487 static int
488 kick_buried_job(tube t)
490 int r;
491 job j;
493 if (!buried_job_p(t)) return 0;
494 j = remove_buried_job(t->buried.next);
495 j->kick_ct++;
496 r = enqueue_job(j, 0);
497 if (r) return 1;
499 /* ready queue is full, so bury it */
500 bury_job(j);
501 return 0;
504 static unsigned int
505 get_delayed_job_ct()
507 tube t;
508 size_t i;
509 unsigned int count = 0;
511 for (i = 0; i < tubes.used; i++) {
512 t = tubes.items[i];
513 count += t->delay.used;
515 return count;
518 static int
519 kick_delayed_job(tube t)
521 int r;
522 job j;
524 j = pq_take(&t->delay);
525 if (!j) return 0;
526 j->kick_ct++;
527 r = enqueue_job(j, 0);
528 if (r) return 1;
530 /* ready queue is full, so delay it again */
531 r = enqueue_job(j, j->delay);
532 if (r) return 0;
534 /* last resort */
535 bury_job(j);
536 return 0;
539 /* return the number of jobs successfully kicked */
540 static unsigned int
541 kick_buried_jobs(tube t, unsigned int n)
543 unsigned int i;
544 for (i = 0; (i < n) && kick_buried_job(t); ++i);
545 return i;
548 /* return the number of jobs successfully kicked */
549 static unsigned int
550 kick_delayed_jobs(tube t, unsigned int n)
552 unsigned int i;
553 for (i = 0; (i < n) && kick_delayed_job(t); ++i);
554 return i;
557 static unsigned int
558 kick_jobs(tube t, unsigned int n)
560 if (buried_job_p(t)) return kick_buried_jobs(t, n);
561 return kick_delayed_jobs(t, n);
564 static job
565 remove_buried_job(job j)
567 if (!j || j->state != JOB_STATE_BURIED) return NULL;
568 j = job_remove(j);
569 if (j) {
570 global_stat.buried_ct--;
571 j->tube->stat.buried_ct--;
573 return j;
576 static job
577 remove_ready_job(job j)
579 if (!j || j->state != JOB_STATE_READY) return NULL;
580 j = pq_remove(&j->tube->ready, j);
581 if (j) {
582 ready_ct--;
583 if (j->pri < URGENT_THRESHOLD) {
584 global_stat.urgent_ct--;
585 j->tube->stat.urgent_ct--;
588 return j;
591 static void
592 enqueue_waiting_conn(conn c)
594 tube t;
595 size_t i;
597 global_stat.waiting_ct++;
598 c->type |= CONN_TYPE_WAITING;
599 for (i = 0; i < c->watch.used; i++) {
600 t = c->watch.items[i];
601 t->stat.waiting_ct++;
602 ms_append(&t->waiting, c);
606 static job
607 find_reserved_job_in_conn(conn c, job j)
609 return (j && j->reserver == c && j->state == JOB_STATE_RESERVED) ? j : NULL;
612 static job
613 touch_job(conn c, job j)
615 j = find_reserved_job_in_conn(c, j);
616 if (j) {
617 j->deadline = time(NULL) + j->ttr;
618 c->soonest_job = NULL;
620 return j;
623 static job
624 peek_job(unsigned long long int id)
626 return job_find(id);
629 static void
630 check_err(conn c, const char *s)
632 if (errno == EAGAIN) return;
633 if (errno == EINTR) return;
634 if (errno == EWOULDBLOCK) return;
636 twarn("%s", s);
637 conn_close(c);
638 return;
/* Scan the first `size` bytes of s for the sequence "\r\n" and return the
 * line length, terminator included.
 * Always returns at least 2 if a match is found. Returns 0 if no match,
 * which includes any size below 2.
 * A '\r' that is not followed by '\n' does not end the scan; the search
 * carries on after it. */
static int
scan_line_end(const char *s, int size)
{
    const char *p = s;
    const char *limit;

    /* Too short to hold "\r\n".  This also keeps size - 1 from wrapping to
     * a huge count when it is converted to size_t for memchr(). */
    if (size < 2) return 0;

    /* A '\r' in the very last byte cannot be followed by '\n', so only the
     * first size - 1 bytes are candidates. */
    limit = s + size - 1;

    while (p < limit) {
        p = memchr(p, '\r', (size_t) (limit - p));
        if (!p) return 0;

        /* safe: p < limit, so p[1] is still inside the buffer */
        if (p[1] == '\n') return (int) (p - s) + 2;

        p++; /* lone '\r': keep looking */
    }

    return 0;
}
657 static int
658 cmd_len(conn c)
660 return scan_line_end(c->cmd, c->cmd_read);
663 /* parse the command line */
664 static int
665 which_cmd(conn c)
667 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
668 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
669 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
670 TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY);
671 TEST_CMD(c->cmd, CMD_PEEK_DELAYED, OP_PEEK_DELAYED);
672 TEST_CMD(c->cmd, CMD_PEEK_BURIED, OP_PEEK_BURIED);
673 TEST_CMD(c->cmd, CMD_RESERVE_TIMEOUT, OP_RESERVE_TIMEOUT);
674 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
675 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
676 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
677 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
678 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
679 TEST_CMD(c->cmd, CMD_TOUCH, OP_TOUCH);
680 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
681 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
682 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
683 TEST_CMD(c->cmd, CMD_USE, OP_USE);
684 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
685 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
686 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
687 TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
688 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
689 return OP_UNKNOWN;
692 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
693 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
694 * This function is idempotent(). */
695 static void
696 fill_extra_data(conn c)
698 int extra_bytes, job_data_bytes = 0, cmd_bytes;
700 if (!c->fd) return; /* the connection was closed */
701 if (!c->cmd_len) return; /* we don't have a complete command */
703 /* how many extra bytes did we read? */
704 extra_bytes = c->cmd_read - c->cmd_len;
706 /* how many bytes should we put into the job body? */
707 if (c->in_job) {
708 job_data_bytes = min(extra_bytes, c->in_job->body_size);
709 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
710 c->in_job_read = job_data_bytes;
711 } else if (c->in_job_read) {
712 /* we are in bit-bucket mode, throwing away data */
713 job_data_bytes = min(extra_bytes, c->in_job_read);
714 c->in_job_read -= job_data_bytes;
717 /* how many bytes are left to go into the future cmd? */
718 cmd_bytes = extra_bytes - job_data_bytes;
719 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
720 c->cmd_read = cmd_bytes;
721 c->cmd_len = 0; /* we no longer know the length of the new command */
724 static void
725 enqueue_incoming_job(conn c)
727 int r;
728 job j = c->in_job;
730 c->in_job = NULL; /* the connection no longer owns this job */
731 c->in_job_read = 0;
733 /* check if the trailer is present and correct */
734 if (memcmp(j->body + j->body_size - 2, "\r\n", 2)) {
735 job_free(j);
736 return reply_msg(c, MSG_EXPECTED_CRLF);
739 if (drain_mode) {
740 job_free(j);
741 return reply_serr(c, MSG_DRAINING);
744 /* we have a complete job, so let's stick it in the pqueue */
745 r = enqueue_job(j, j->delay);
746 op_ct[OP_PUT]++; /* stats */
747 global_stat.total_jobs_ct++;
748 j->tube->stat.total_jobs_ct++;
750 if (r) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->id);
752 /* out of memory trying to grow the queue, so it gets buried */
753 bury_job(j);
754 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->id);
757 static unsigned int
758 uptime()
760 return time(NULL) - start_time;
763 static int
764 fmt_stats(char *buf, size_t size, void *x)
766 struct rusage ru = {{0, 0}, {0, 0}};
767 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
768 return snprintf(buf, size, STATS_FMT,
769 global_stat.urgent_ct,
770 ready_ct,
771 global_stat.reserved_ct,
772 get_delayed_job_ct(),
773 global_stat.buried_ct,
774 op_ct[OP_PUT],
775 op_ct[OP_PEEKJOB],
776 op_ct[OP_PEEK_READY],
777 op_ct[OP_PEEK_DELAYED],
778 op_ct[OP_PEEK_BURIED],
779 op_ct[OP_RESERVE],
780 op_ct[OP_RESERVE_TIMEOUT],
781 op_ct[OP_DELETE],
782 op_ct[OP_RELEASE],
783 op_ct[OP_USE],
784 op_ct[OP_WATCH],
785 op_ct[OP_IGNORE],
786 op_ct[OP_BURY],
787 op_ct[OP_KICK],
788 op_ct[OP_TOUCH],
789 op_ct[OP_STATS],
790 op_ct[OP_JOBSTATS],
791 op_ct[OP_STATS_TUBE],
792 op_ct[OP_LIST_TUBES],
793 op_ct[OP_LIST_TUBE_USED],
794 op_ct[OP_LIST_TUBES_WATCHED],
795 timeout_ct,
796 global_stat.total_jobs_ct,
797 job_data_size_limit,
798 tubes.used,
799 count_cur_conns(),
800 count_cur_producers(),
801 count_cur_workers(),
802 global_stat.waiting_ct,
803 count_tot_conns(),
804 getpid(),
805 VERSION,
806 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
807 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
808 uptime());
/* Read a priority value from the given buffer and place it in pri.
 * Leading spaces are skipped; the first character after them must be a
 * decimal digit (no sign is accepted).
 * Update end to point to the address after the last character consumed.
 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
 * conversion and return the status code but not update any values. This is an
 * easy way to check for errors.
 * If end is NULL, read_pri will also check that the entire input string was
 * consumed and return an error code otherwise.
 * A value too large for an unsigned int is clamped to UINT_MAX rather than
 * rejected.
 * Return 0 on success, or nonzero on failure.
 * If a failure occurs, pri and end are not modified. */
static int
read_pri(unsigned int *pri, const char *buf, char **end)
{
    char *tend;
    unsigned long parsed;

    errno = 0;
    while (buf[0] == ' ') buf++;

    /* isdigit() is only defined for unsigned char values and EOF */
    if (!isdigit((unsigned char) buf[0])) return -1;

    parsed = strtoul(buf, &tend, 10);
    if (tend == buf) return -1;
    if (errno && errno != ERANGE) return -1;
    if (!end && tend[0] != '\0') return -1;

    /* On ERANGE strtoul() already saturates at ULONG_MAX.  Where long is
     * wider than int, saturate here too instead of silently dropping the
     * high bits in the narrowing assignment. */
    if (parsed > UINT_MAX) parsed = UINT_MAX;

    if (pri) *pri = (unsigned int) parsed;
    if (end) *end = tend;
    return 0;
}
/* Read a delay value from the given buffer and place it in delay.
 * The interface and behavior are the same as in read_pri(): delay and end
 * may each be NULL, and neither is modified on failure.
 * In addition, returns 1 when the current time plus the delay compares
 * less than the current time. */
static int
read_delay(unsigned int *delay, const char *buf, char **end)
{
    time_t now = time(NULL);
    unsigned int tdelay;
    char *tend;
    int r;

    /* Parse into temporaries so the caller's variables stay untouched until
     * every check has passed, and so a NULL delay is never dereferenced. */
    r = read_pri(&tdelay, buf, &tend);
    if (r) return r;

    /* read_pri() skipped this check because it was given an end pointer */
    if (!end && tend[0] != '\0') return -1;

    if (now + tdelay < now) return 1;

    if (delay) *delay = tdelay;
    if (end) *end = tend;
    return 0;
}
/* Read a time-to-run value from the given buffer and place it in ttr.
 * This simply forwards to read_pri(), so the interface and behavior are
 * the same. */
static int
read_ttr(unsigned int *ttr, const char *buf, char **end)
{
    int status = read_pri(ttr, buf, end);

    return status;
}
857 static void
858 wait_for_job(conn c, int timeout)
860 int r;
862 c->state = STATE_WAIT;
863 enqueue_waiting_conn(c);
865 /* Set the pending timeout to the requested timeout amount */
866 c->pending_timeout = timeout;
868 /* this conn is waiting, but we want to know if they hang up */
869 r = conn_update_evq(c, EV_READ | EV_PERSIST);
870 if (r == -1) return twarnx("update events failed"), conn_close(c);
873 typedef int(*fmt_fn)(char *, size_t, void *);
875 static void
876 do_stats(conn c, fmt_fn fmt, void *data)
878 int r, stats_len;
880 /* first, measure how big a buffer we will need */
881 stats_len = fmt(NULL, 0, data) + 16;
883 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
884 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
886 /* now actually format the stats data */
887 r = fmt(c->out_job->body, stats_len, data);
888 /* and set the actual body size */
889 c->out_job->body_size = r;
890 if (r > stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
892 c->out_job_sent = 0;
893 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", r - 2);
896 static void
897 do_list_tubes(conn c, ms l)
899 char *buf;
900 tube t;
901 size_t i, resp_z;
903 /* first, measure how big a buffer we will need */
904 resp_z = 6; /* initial "---\n" and final "\r\n" */
905 for (i = 0; i < l->used; i++) {
906 t = l->items[i];
907 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
910 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
911 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
913 /* now actually format the response */
914 buf = c->out_job->body;
915 buf += snprintf(buf, 5, "---\n");
916 for (i = 0; i < l->used; i++) {
917 t = l->items[i];
918 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
920 buf[0] = '\r';
921 buf[1] = '\n';
923 c->out_job_sent = 0;
924 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", resp_z - 2);
927 static int
928 fmt_job_stats(char *buf, size_t size, job j)
930 time_t t;
932 t = time(NULL);
933 return snprintf(buf, size, JOB_STATS_FMT,
934 j->id,
935 j->tube->name,
936 job_state(j),
937 j->pri,
938 (unsigned int) (t - j->creation),
939 j->delay,
940 j->ttr,
941 (unsigned int) (j->deadline - t),
942 j->timeout_ct,
943 j->release_ct,
944 j->bury_ct,
945 j->kick_ct);
948 static int
949 fmt_stats_tube(char *buf, size_t size, tube t)
951 return snprintf(buf, size, STATS_TUBE_FMT,
952 t->name,
953 t->stat.urgent_ct,
954 t->ready.used,
955 t->stat.reserved_ct,
956 t->delay.used,
957 t->stat.buried_ct,
958 t->stat.total_jobs_ct,
959 t->using_ct,
960 t->watching_ct,
961 t->stat.waiting_ct);
964 static void
965 maybe_enqueue_incoming_job(conn c)
967 job j = c->in_job;
969 /* do we have a complete job? */
970 if (c->in_job_read == j->body_size) return enqueue_incoming_job(c);
972 /* otherwise we have incomplete data, so just keep waiting */
973 c->state = STATE_WANTDATA;
976 /* j can be NULL */
977 static job
978 remove_this_reserved_job(conn c, job j)
980 j = job_remove(j);
981 if (j) {
982 global_stat.reserved_ct--;
983 j->tube->stat.reserved_ct--;
984 j->reserver = NULL;
986 c->soonest_job = NULL;
987 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
988 return j;
991 static job
992 remove_reserved_job(conn c, job j)
994 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, j));
997 static int
998 name_is_ok(const char *name, size_t max)
1000 size_t len = strlen(name);
1001 return len > 0 && len <= max &&
1002 strspn(name, NAME_CHARS) == len && name[0] != '-';
1005 void
1006 prot_remove_tube(tube t)
1008 ms_remove(&tubes, t);
1011 static void
1012 dispatch_cmd(conn c)
1014 int r, i, timeout = -1;
1015 unsigned int count;
1016 job j;
1017 unsigned char type;
1018 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
1019 unsigned int pri, delay, ttr, body_size;
1020 unsigned long long int id;
1021 tube t = NULL;
1023 /* NUL-terminate this string so we can use strtol and friends */
1024 c->cmd[c->cmd_len - 2] = '\0';
1026 /* check for possible maliciousness */
1027 if (strlen(c->cmd) != c->cmd_len - 2) {
1028 return reply_msg(c, MSG_BAD_FORMAT);
1031 type = which_cmd(c);
1032 dprintf("got %s command: \"%s\"\n", op_names[(int) type], c->cmd);
1034 switch (type) {
1035 case OP_PUT:
1036 r = read_pri(&pri, c->cmd + 4, &delay_buf);
1037 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1039 r = read_delay(&delay, delay_buf, &ttr_buf);
1040 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1042 r = read_ttr(&ttr, ttr_buf, &size_buf);
1043 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1045 errno = 0;
1046 body_size = strtoul(size_buf, &end_buf, 10);
1047 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1049 if (body_size > job_data_size_limit) {
1050 return reply_msg(c, MSG_JOB_TOO_BIG);
1053 /* don't allow trailing garbage */
1054 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
1056 conn_set_producer(c);
1058 c->in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c->use);
1060 /* OOM? */
1061 if (!c->in_job) {
1062 /* throw away the job body and respond with OUT_OF_MEMORY */
1064 /* Invert the meaning of in_job_read while throwing away data -- it
1065 * counts the bytes that remain to be thrown away. */
1066 c->in_job_read = body_size + 2;
1067 fill_extra_data(c);
1069 if (c->in_job_read == 0) return reply_serr(c, MSG_OUT_OF_MEMORY);
1071 c->state = STATE_BITBUCKET;
1072 return;
1075 fill_extra_data(c);
1077 /* it's possible we already have a complete job */
1078 maybe_enqueue_incoming_job(c);
1080 break;
1081 case OP_PEEK_READY:
1082 /* don't allow trailing garbage */
1083 if (c->cmd_len != CMD_PEEK_READY_LEN + 2) {
1084 return reply_msg(c, MSG_BAD_FORMAT);
1086 op_ct[type]++;
1088 j = job_copy(pq_peek(&c->use->ready));
1090 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1092 reply_job(c, j, MSG_FOUND);
1093 break;
1094 case OP_PEEK_DELAYED:
1095 /* don't allow trailing garbage */
1096 if (c->cmd_len != CMD_PEEK_DELAYED_LEN + 2) {
1097 return reply_msg(c, MSG_BAD_FORMAT);
1099 op_ct[type]++;
1101 j = job_copy(pq_peek(&c->use->delay));
1103 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1105 reply_job(c, j, MSG_FOUND);
1106 break;
1107 case OP_PEEK_BURIED:
1108 /* don't allow trailing garbage */
1109 if (c->cmd_len != CMD_PEEK_BURIED_LEN + 2) {
1110 return reply_msg(c, MSG_BAD_FORMAT);
1112 op_ct[type]++;
1114 j = job_copy(buried_job_p(c->use)? j = c->use->buried.next : NULL);
1116 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1118 reply_job(c, j, MSG_FOUND);
1119 break;
1120 case OP_PEEKJOB:
1121 errno = 0;
1122 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1123 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1124 op_ct[type]++;
1126 /* So, peek is annoying, because some other connection might free the
1127 * job while we are still trying to write it out. So we copy it and
1128 * then free the copy when it's done sending. */
1129 j = job_copy(peek_job(id));
1131 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1133 reply_job(c, j, MSG_FOUND);
1134 break;
1135 case OP_RESERVE_TIMEOUT:
1136 errno = 0;
1137 timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10);
1138 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1139 case OP_RESERVE: /* FALLTHROUGH */
1140 /* don't allow trailing garbage */
1141 if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) {
1142 return reply_msg(c, MSG_BAD_FORMAT);
1145 op_ct[type]++;
1146 conn_set_worker(c);
1148 if (conn_has_close_deadline(c) && !conn_ready(c)) {
1149 return reply_msg(c, MSG_DEADLINE_SOON);
1152 /* try to get a new job for this guy */
1153 wait_for_job(c, timeout);
1154 process_queue();
1155 break;
1156 case OP_DELETE:
1157 errno = 0;
1158 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1159 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1160 op_ct[type]++;
1162 j = job_find(id);
1163 j = remove_reserved_job(c, j) ? :
1164 remove_ready_job(j) ? :
1165 remove_buried_job(j);
1167 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1169 j->state = JOB_STATE_INVALID;
1170 binlog_write_job(j);
1171 job_free(j);
1173 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1174 break;
1175 case OP_RELEASE:
1176 errno = 0;
1177 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1178 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1180 r = read_pri(&pri, pri_buf, &delay_buf);
1181 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1183 r = read_delay(&delay, delay_buf, NULL);
1184 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1185 op_ct[type]++;
1187 j = remove_reserved_job(c, job_find(id));
1189 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1191 j->pri = pri;
1192 j->delay = delay;
1193 j->release_ct++;
1194 r = enqueue_job(j, delay);
1195 if (r) return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1197 /* out of memory trying to grow the queue, so it gets buried */
1198 bury_job(j);
1199 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1200 break;
1201 case OP_BURY:
1202 errno = 0;
1203 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1204 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1206 r = read_pri(&pri, pri_buf, NULL);
1207 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1208 op_ct[type]++;
1210 j = remove_reserved_job(c, job_find(id));
1212 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1214 j->pri = pri;
1215 bury_job(j);
1216 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1217 break;
1218 case OP_KICK:
1219 errno = 0;
1220 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
1221 if (end_buf == c->cmd + CMD_KICK_LEN) {
1222 return reply_msg(c, MSG_BAD_FORMAT);
1224 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1226 op_ct[type]++;
1228 i = kick_jobs(c->use, count);
1230 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1231 case OP_TOUCH:
1232 errno = 0;
1233 id = strtoull(c->cmd + CMD_TOUCH_LEN, &end_buf, 10);
1234 if (errno) return twarn("strtoull"), reply_msg(c, MSG_BAD_FORMAT);
1236 op_ct[type]++;
1238 j = touch_job(c, job_find(id));
1240 if (j) {
1241 reply(c, MSG_TOUCHED, MSG_TOUCHED_LEN, STATE_SENDWORD);
1242 } else {
1243 return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1245 break;
1246 case OP_STATS:
1247 /* don't allow trailing garbage */
1248 if (c->cmd_len != CMD_STATS_LEN + 2) {
1249 return reply_msg(c, MSG_BAD_FORMAT);
1252 op_ct[type]++;
1254 do_stats(c, fmt_stats, NULL);
1255 break;
1256 case OP_JOBSTATS:
1257 errno = 0;
1258 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1259 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1261 op_ct[type]++;
1263 j = peek_job(id);
1264 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1266 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1267 do_stats(c, (fmt_fn) fmt_job_stats, j);
1268 break;
1269 case OP_STATS_TUBE:
1270 name = c->cmd + CMD_STATS_TUBE_LEN;
1271 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1273 op_ct[type]++;
1275 t = tube_find(name);
1276 if (!t) return reply_msg(c, MSG_NOTFOUND);
1278 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1279 t = NULL;
1280 break;
1281 case OP_LIST_TUBES:
1282 /* don't allow trailing garbage */
1283 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1284 return reply_msg(c, MSG_BAD_FORMAT);
1287 op_ct[type]++;
1288 do_list_tubes(c, &tubes);
1289 break;
1290 case OP_LIST_TUBE_USED:
1291 /* don't allow trailing garbage */
1292 if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
1293 return reply_msg(c, MSG_BAD_FORMAT);
1296 op_ct[type]++;
1297 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1298 break;
1299 case OP_LIST_TUBES_WATCHED:
1300 /* don't allow trailing garbage */
1301 if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
1302 return reply_msg(c, MSG_BAD_FORMAT);
1305 op_ct[type]++;
1306 do_list_tubes(c, &c->watch);
1307 break;
1308 case OP_USE:
1309 name = c->cmd + CMD_USE_LEN;
1310 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1311 op_ct[type]++;
1313 TUBE_ASSIGN(t, tube_find_or_make(name));
1314 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1316 c->use->using_ct--;
1317 TUBE_ASSIGN(c->use, t);
1318 TUBE_ASSIGN(t, NULL);
1319 c->use->using_ct++;
1321 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1322 break;
1323 case OP_WATCH:
1324 name = c->cmd + CMD_WATCH_LEN;
1325 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1326 op_ct[type]++;
1328 TUBE_ASSIGN(t, tube_find_or_make(name));
1329 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1331 r = 1;
1332 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1333 TUBE_ASSIGN(t, NULL);
1334 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1336 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1337 break;
1338 case OP_IGNORE:
1339 name = c->cmd + CMD_IGNORE_LEN;
1340 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1341 op_ct[type]++;
1343 t = NULL;
1344 for (i = 0; i < c->watch.used; i++) {
1345 t = c->watch.items[i];
1346 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1347 t = NULL;
1350 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1352 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1353 t = NULL;
1355 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1356 break;
1357 default:
1358 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1362 /* There are three reasons this function may be called. We need to check for
1363 * all of them.
1365 * 1. A reserved job has run out of time.
1366 * 2. A waiting client's reserved job has entered the safety margin.
1367 * 3. A waiting client's requested timeout has occurred.
1369 * If any of these happen, we must do the appropriate thing. */
1370 static void
1371 h_conn_timeout(conn c)
1373 int r, should_timeout = 0;
1374 job j;
1376 /* Check if the client was trying to reserve a job. */
1377 if (conn_waiting(c) && conn_has_close_deadline(c)) should_timeout = 1;
1379 /* Check if any reserved jobs have run out of time. We should do this
1380 * whether or not the client is waiting for a new reservation. */
1381 while ((j = soonest_job(c))) {
1382 if (j->deadline > time(NULL)) break;
1383 timeout_ct++; /* stats */
1384 j->timeout_ct++;
1385 r = enqueue_job(remove_this_reserved_job(c, j), 0);
1386 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
1387 r = conn_update_evq(c, c->evq.ev_events);
1388 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
1391 if (should_timeout) {
1392 dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1393 return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON);
1394 } else if (conn_waiting(c) && c->pending_timeout >= 0) {
1395 dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1396 c->pending_timeout = -1;
1397 return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);
1401 void
1402 enter_drain_mode(int sig)
1404 drain_mode = 1;
1407 static void
1408 do_cmd(conn c)
1410 dispatch_cmd(c);
1411 fill_extra_data(c);
1414 static void
1415 reset_conn(conn c)
1417 int r;
1419 r = conn_update_evq(c, EV_READ | EV_PERSIST);
1420 if (r == -1) return twarnx("update events failed"), conn_close(c);
1422 /* was this a peek or stats command? */
1423 if (!has_reserved_this_job(c, c->out_job)) job_free(c->out_job);
1424 c->out_job = NULL;
1426 c->reply_sent = 0; /* now that we're done, reset this */
1427 c->state = STATE_WANTCOMMAND;
1430 static void
1431 h_conn_data(conn c)
1433 int r, to_read;
1434 job j;
1435 struct iovec iov[2];
1437 switch (c->state) {
1438 case STATE_WANTCOMMAND:
1439 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1440 if (r == -1) return check_err(c, "read()");
1441 if (r == 0) return conn_close(c); /* the client hung up */
1443 c->cmd_read += r; /* we got some bytes */
1445 c->cmd_len = cmd_len(c); /* find the EOL */
1447 /* yay, complete command line */
1448 if (c->cmd_len) return do_cmd(c);
1450 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1452 /* command line too long? */
1453 if (c->cmd_read == LINE_BUF_SIZE) {
1454 c->cmd_read = 0; /* discard the input so far */
1455 return reply_msg(c, MSG_BAD_FORMAT);
1458 /* otherwise we have an incomplete line, so just keep waiting */
1459 break;
1460 case STATE_BITBUCKET:
1461 /* Invert the meaning of in_job_read while throwing away data -- it
1462 * counts the bytes that remain to be thrown away. */
1463 to_read = min(c->in_job_read, BUCKET_BUF_SIZE);
1464 r = read(c->fd, bucket, to_read);
1465 if (r == -1) return check_err(c, "read()");
1466 if (r == 0) return conn_close(c); /* the client hung up */
1468 c->in_job_read -= r; /* we got some bytes */
1470 /* (c->in_job_read < 0) can't happen */
1472 if (c->in_job_read == 0) return reply_serr(c, MSG_OUT_OF_MEMORY);
1473 break;
1474 case STATE_WANTDATA:
1475 j = c->in_job;
1477 r = read(c->fd, j->body + c->in_job_read, j->body_size -c->in_job_read);
1478 if (r == -1) return check_err(c, "read()");
1479 if (r == 0) return conn_close(c); /* the client hung up */
1481 c->in_job_read += r; /* we got some bytes */
1483 /* (j->in_job_read > j->body_size) can't happen */
1485 maybe_enqueue_incoming_job(c);
1486 break;
1487 case STATE_SENDWORD:
1488 r= write(c->fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1489 if (r == -1) return check_err(c, "write()");
1490 if (r == 0) return conn_close(c); /* the client hung up */
1492 c->reply_sent += r; /* we got some bytes */
1494 /* (c->reply_sent > c->reply_len) can't happen */
1496 if (c->reply_sent == c->reply_len) return reset_conn(c);
1498 /* otherwise we sent an incomplete reply, so just keep waiting */
1499 break;
1500 case STATE_SENDJOB:
1501 j = c->out_job;
1503 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1504 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1505 iov[1].iov_base = j->body + c->out_job_sent;
1506 iov[1].iov_len = j->body_size - c->out_job_sent;
1508 r = writev(c->fd, iov, 2);
1509 if (r == -1) return check_err(c, "writev()");
1510 if (r == 0) return conn_close(c); /* the client hung up */
1512 /* update the sent values */
1513 c->reply_sent += r;
1514 if (c->reply_sent >= c->reply_len) {
1515 c->out_job_sent += c->reply_sent - c->reply_len;
1516 c->reply_sent = c->reply_len;
1519 /* (c->out_job_sent > j->body_size) can't happen */
1521 /* are we done? */
1522 if (c->out_job_sent == j->body_size) return reset_conn(c);
1524 /* otherwise we sent incomplete data, so just keep waiting */
1525 break;
1526 case STATE_WAIT: /* keep an eye out in case they hang up */
1527 /* but don't hang up just because our buffer is full */
1528 if (LINE_BUF_SIZE - c->cmd_read < 1) break;
1530 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1531 if (r == -1) return check_err(c, "read()");
1532 if (r == 0) return conn_close(c); /* the client hung up */
1533 c->cmd_read += r; /* we got some bytes */
1537 #define want_command(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))
1538 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
1540 static void
1541 h_conn(const int fd, const short which, conn c)
1543 if (fd != c->fd) {
1544 twarnx("Argh! event fd doesn't match conn fd.");
1545 close(fd);
1546 return conn_close(c);
1549 switch (which) {
1550 case EV_TIMEOUT:
1551 h_conn_timeout(c);
1552 event_add(&c->evq, NULL); /* seems to be necessary */
1553 break;
1554 case EV_READ:
1555 /* fall through... */
1556 case EV_WRITE:
1557 /* fall through... */
1558 default:
1559 h_conn_data(c);
1562 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1565 static void
1566 h_delay()
1568 int r;
1569 job j;
1570 time_t t;
1572 t = time(NULL);
1573 while ((j = delay_q_peek())) {
1574 if (j->deadline > t) break;
1575 j = delay_q_take();
1576 r = enqueue_job(j, 0);
1577 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
1580 set_main_delay_timeout();
1583 void
1584 h_accept(const int fd, const short which, struct event *ev)
1586 conn c;
1587 int cfd, flags, r;
1588 socklen_t addrlen;
1589 struct sockaddr addr;
1591 if (which == EV_TIMEOUT) return h_delay();
1593 addrlen = sizeof addr;
1594 cfd = accept(fd, &addr, &addrlen);
1595 if (cfd == -1) {
1596 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1597 if (errno == EMFILE) brake();
1598 return;
1601 flags = fcntl(cfd, F_GETFL, 0);
1602 if (flags < 0) return twarn("getting flags"), close(cfd), v();
1604 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1605 if (r < 0) return twarn("setting O_NONBLOCK"), close(cfd), v();
1607 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1608 if (!c) return twarnx("make_conn() failed"), close(cfd), brake();
1610 dprintf("accepted conn, fd=%d\n", cfd);
1611 r = conn_set_evq(c, EV_READ | EV_PERSIST, (evh) h_conn);
1612 if (r == -1) return twarnx("conn_set_evq() failed"), close(cfd), brake();
1615 void
1616 prot_init()
1618 start_time = time(NULL);
1619 memset(op_ct, 0, sizeof(op_ct));
1621 ms_init(&tubes, NULL, NULL);
1623 TUBE_ASSIGN(default_tube, tube_find_or_make("default"));
1624 if (!default_tube) twarnx("Out of memory during startup!");
1627 void
1628 prot_replay_binlog()
1630 struct job binlog_jobs;
1631 job j, nj;
1632 unsigned int delay;
1634 binlog_jobs.prev = binlog_jobs.next = &binlog_jobs;
1635 binlog_read(&binlog_jobs);
1637 for (j = binlog_jobs.next ; j != &binlog_jobs ; j = nj) {
1638 nj = j->next;
1639 job_remove(j);
1640 delay = 0;
1641 switch (j->state) {
1642 case JOB_STATE_BURIED:
1643 bury_job(j);
1644 break;
1645 case JOB_STATE_DELAYED:
1646 if (start_time < j->deadline) delay = j->deadline - start_time;
1647 /* fall through */
1648 default:
1649 enqueue_job(j,delay);