Performance test.
[beanstalkd.git] / prot.c
blobcda3ab83d8e3cb40958c841a9bd28de70b0cf4c0
/* prot.c - protocol implementation */

/* Copyright (C) 2007 Keith Rarick and Philotic Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <errno.h>
24 #include <sys/resource.h>
25 #include <sys/uio.h>
26 #include <stdarg.h>
28 #include "stat.h"
29 #include "prot.h"
30 #include "pq.h"
31 #include "ms.h"
32 #include "job.h"
33 #include "tube.h"
34 #include "conn.h"
35 #include "util.h"
36 #include "net.h"
37 #include "version.h"
/* Largest job body, in bytes, that a "put" will accept. The limit applies to
 * the size given by the client; the trailing CRLF is accounted for separately
 * when the job is allocated. */
size_t job_data_size_limit = (1 << 16) - 1;
/* The set of characters that may appear in a tube name. */
#define NAME_CHARS                  \
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"    \
    "abcdefghijklmnopqrstuvwxyz"    \
    "0123456789-+/;.$()"
/* Command words as they appear at the start of a request line. Commands that
 * take arguments include the separating space in the literal; commands that
 * take none do not. */
#define CMD_PUT                 "put "
#define CMD_PEEKJOB             "peek "
#define CMD_PEEK_READY          "peek-ready"
#define CMD_PEEK_DELAYED        "peek-delayed"
#define CMD_PEEK_BURIED         "peek-buried"
#define CMD_RESERVE             "reserve"
#define CMD_RESERVE_TIMEOUT     "reserve-with-timeout "
#define CMD_DELETE              "delete "
#define CMD_RELEASE             "release "
#define CMD_BURY                "bury "
#define CMD_KICK                "kick "
#define CMD_STATS               "stats"
#define CMD_JOBSTATS            "stats-job "
#define CMD_USE                 "use "
#define CMD_WATCH               "watch "
#define CMD_IGNORE              "ignore "
#define CMD_LIST_TUBES          "list-tubes"
#define CMD_LIST_TUBE_USED      "list-tube-used"
#define CMD_LIST_TUBES_WATCHED  "list-tubes-watched"
#define CMD_STATS_TUBE          "stats-tube "
/* Length of a string literal (or char array) without its terminating NUL.
 * Only meaningful for actual arrays, never for pointers. */
#define CONSTSTRLEN(m) (sizeof(m) - 1)

/* Compile-time lengths of the command words. */
#define CMD_PEEK_READY_LEN          CONSTSTRLEN(CMD_PEEK_READY)
#define CMD_PEEK_DELAYED_LEN        CONSTSTRLEN(CMD_PEEK_DELAYED)
#define CMD_PEEK_BURIED_LEN         CONSTSTRLEN(CMD_PEEK_BURIED)
#define CMD_PEEKJOB_LEN             CONSTSTRLEN(CMD_PEEKJOB)
#define CMD_RESERVE_LEN             CONSTSTRLEN(CMD_RESERVE)
#define CMD_RESERVE_TIMEOUT_LEN     CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
#define CMD_DELETE_LEN              CONSTSTRLEN(CMD_DELETE)
#define CMD_RELEASE_LEN             CONSTSTRLEN(CMD_RELEASE)
#define CMD_BURY_LEN                CONSTSTRLEN(CMD_BURY)
#define CMD_KICK_LEN                CONSTSTRLEN(CMD_KICK)
#define CMD_STATS_LEN               CONSTSTRLEN(CMD_STATS)
#define CMD_JOBSTATS_LEN            CONSTSTRLEN(CMD_JOBSTATS)
#define CMD_USE_LEN                 CONSTSTRLEN(CMD_USE)
#define CMD_WATCH_LEN               CONSTSTRLEN(CMD_WATCH)
#define CMD_IGNORE_LEN              CONSTSTRLEN(CMD_IGNORE)
#define CMD_LIST_TUBES_LEN          CONSTSTRLEN(CMD_LIST_TUBES)
#define CMD_LIST_TUBE_USED_LEN      CONSTSTRLEN(CMD_LIST_TUBE_USED)
#define CMD_LIST_TUBES_WATCHED_LEN  CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
#define CMD_STATS_TUBE_LEN          CONSTSTRLEN(CMD_STATS_TUBE)
/* Replies to successfully parsed commands. MSG_FOUND and MSG_RESERVED carry no
 * line terminator because they are used as the leading word of a longer
 * formatted line; the *_FMT entries are printf-style templates. */
#define MSG_FOUND           "FOUND"
#define MSG_NOTFOUND        "NOT_FOUND\r\n"
#define MSG_RESERVED        "RESERVED"
#define MSG_DEADLINE_SOON   "DEADLINE_SOON\r\n"
#define MSG_TIMED_OUT       "TIMED_OUT\r\n"
#define MSG_DELETED         "DELETED\r\n"
#define MSG_RELEASED        "RELEASED\r\n"
#define MSG_BURIED          "BURIED\r\n"
#define MSG_BURIED_FMT      "BURIED %llu\r\n"
#define MSG_INSERTED_FMT    "INSERTED %llu\r\n"
#define MSG_NOT_IGNORED     "NOT_IGNORED\r\n"

/* Compile-time lengths of the fixed replies above. */
#define MSG_NOTFOUND_LEN    CONSTSTRLEN(MSG_NOTFOUND)
#define MSG_DELETED_LEN     CONSTSTRLEN(MSG_DELETED)
#define MSG_RELEASED_LEN    CONSTSTRLEN(MSG_RELEASED)
#define MSG_BURIED_LEN      CONSTSTRLEN(MSG_BURIED)
#define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)

/* Error replies. */
#define MSG_OUT_OF_MEMORY   "OUT_OF_MEMORY\r\n"
#define MSG_INTERNAL_ERROR  "INTERNAL_ERROR\r\n"
#define MSG_DRAINING        "DRAINING\r\n"
#define MSG_BAD_FORMAT      "BAD_FORMAT\r\n"
#define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
#define MSG_EXPECTED_CRLF   "EXPECTED_CRLF\r\n"
#define MSG_JOB_TOO_BIG     "JOB_TOO_BIG\r\n"
/* Values stored in a connection's state field. */
#define STATE_WANTCOMMAND   0
#define STATE_WANTDATA      1
#define STATE_SENDJOB       2
#define STATE_SENDWORD      3
#define STATE_WAIT          4
#define STATE_BITBUCKET     5

/* Operation codes produced by which_cmd(). They also index the op_ct[]
 * counters (and op_names[] in DEBUG builds), so the numbering must stay dense
 * and TOTAL_OPS must stay one past the largest code. */
#define OP_UNKNOWN              0
#define OP_PUT                  1
#define OP_PEEKJOB              2
#define OP_RESERVE              3
#define OP_DELETE               4
#define OP_RELEASE              5
#define OP_BURY                 6
#define OP_KICK                 7
#define OP_STATS                8
#define OP_JOBSTATS             9
#define OP_PEEK_BURIED          10
#define OP_USE                  11
#define OP_WATCH                12
#define OP_IGNORE               13
#define OP_LIST_TUBES           14
#define OP_LIST_TUBE_USED       15
#define OP_LIST_TUBES_WATCHED   16
#define OP_STATS_TUBE           17
#define OP_PEEK_READY           18
#define OP_PEEK_DELAYED         19
#define OP_RESERVE_TIMEOUT      20
#define TOTAL_OPS               21
/* printf-style template for the body of the global "stats" reply. The
 * conversion specifiers must stay in step, in order and type, with the
 * argument list in fmt_stats(). */
#define STATS_FMT "---\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-delayed: %u\n" \
    "current-jobs-buried: %u\n" \
    "cmd-put: %llu\n" \
    "cmd-peek: %llu\n" \
    "cmd-peek-ready: %llu\n" \
    "cmd-peek-delayed: %llu\n" \
    "cmd-peek-buried: %llu\n" \
    "cmd-reserve: %llu\n" \
    "cmd-reserve-with-timeout: %llu\n" \
    "cmd-delete: %llu\n" \
    "cmd-release: %llu\n" \
    "cmd-use: %llu\n" \
    "cmd-watch: %llu\n" \
    "cmd-ignore: %llu\n" \
    "cmd-bury: %llu\n" \
    "cmd-kick: %llu\n" \
    "cmd-stats: %llu\n" \
    "cmd-stats-job: %llu\n" \
    "cmd-stats-tube: %llu\n" \
    "cmd-list-tubes: %llu\n" \
    "cmd-list-tube-used: %llu\n" \
    "cmd-list-tubes-watched: %llu\n" \
    "job-timeouts: %llu\n" \
    "total-jobs: %llu\n" \
    "max-job-size: %zu\n" \
    "current-tubes: %zu\n" \
    "current-connections: %u\n" \
    "current-producers: %u\n" \
    "current-workers: %u\n" \
    "current-waiting: %u\n" \
    "total-connections: %u\n" \
    "pid: %u\n" \
    "version: %s\n" \
    "rusage-utime: %d.%06d\n" \
    "rusage-stime: %d.%06d\n" \
    "uptime: %u\n" \
    "\r\n"
/* printf-style template for the body of a "stats-tube" reply. The specifiers
 * must match the argument list in fmt_stats_tube(). */
#define STATS_TUBE_FMT "---\n" \
    "name: %s\n" \
    "current-jobs-urgent: %u\n" \
    "current-jobs-ready: %u\n" \
    "current-jobs-reserved: %u\n" \
    "current-jobs-delayed: %u\n" \
    "current-jobs-buried: %u\n" \
    "total-jobs: %llu\n" \
    "current-using: %u\n" \
    "current-watching: %u\n" \
    "current-waiting: %u\n" \
    "\r\n"
/* printf-style template for the body of a "stats-job" reply. The specifiers
 * must match the argument list in fmt_job_stats(). */
#define JOB_STATS_FMT "---\n" \
    "id: %llu\n" \
    "tube: %s\n" \
    "state: %s\n" \
    "pri: %u\n" \
    "age: %u\n" \
    "delay: %u\n" \
    "ttr: %u\n" \
    "time-left: %u\n" \
    "timeouts: %u\n" \
    "releases: %u\n" \
    "buries: %u\n" \
    "kicks: %u\n" \
    "\r\n"
216 /* this number is pretty arbitrary */
217 #define BUCKET_BUF_SIZE 1024
219 static char bucket[BUCKET_BUF_SIZE];
221 static unsigned int ready_ct = 0;
222 static struct stats global_stat = {0, 0, 0, 0, 0};
224 static tube default_tube;
225 static struct ms tubes;
227 static int drain_mode = 0;
228 static time_t start_time;
229 static unsigned long long int op_ct[TOTAL_OPS], timeout_ct = 0;
232 /* Doubly-linked list of connections with at least one reserved job. */
233 static struct conn running = { &running, &running, 0 };
#ifdef DEBUG
/* Printable name for each operation, indexed by OP_* code. The order here
 * must follow the OP_* numbering exactly. */
static const char * op_names[] = {
    "<unknown>",            /* OP_UNKNOWN */
    CMD_PUT,                /* OP_PUT */
    CMD_PEEKJOB,            /* OP_PEEKJOB */
    CMD_RESERVE,            /* OP_RESERVE */
    CMD_DELETE,             /* OP_DELETE */
    CMD_RELEASE,            /* OP_RELEASE */
    CMD_BURY,               /* OP_BURY */
    CMD_KICK,               /* OP_KICK */
    CMD_STATS,              /* OP_STATS */
    CMD_JOBSTATS,           /* OP_JOBSTATS */
    CMD_PEEK_BURIED,        /* OP_PEEK_BURIED */
    CMD_USE,                /* OP_USE */
    CMD_WATCH,              /* OP_WATCH */
    CMD_IGNORE,             /* OP_IGNORE */
    CMD_LIST_TUBES,         /* OP_LIST_TUBES */
    CMD_LIST_TUBE_USED,     /* OP_LIST_TUBE_USED */
    CMD_LIST_TUBES_WATCHED, /* OP_LIST_TUBES_WATCHED */
    CMD_STATS_TUBE,         /* OP_STATS_TUBE */
    CMD_PEEK_READY,         /* OP_PEEK_READY */
    CMD_PEEK_DELAYED,       /* OP_PEEK_DELAYED */
    CMD_RESERVE_TIMEOUT     /* OP_RESERVE_TIMEOUT */
};
#endif
261 static int
262 buried_job_p(tube t)
264 return job_list_any_p(&t->buried);
267 static void
268 reply(conn c, const char *line, int len, int state)
270 int r;
272 if (!c) return;
274 r = conn_update_evq(c, EV_WRITE | EV_PERSIST);
275 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
277 c->reply = line;
278 c->reply_len = len;
279 c->reply_sent = 0;
280 c->state = state;
281 dprintf("sending reply: %.*s", len, line);
/* Send the string literal m as a complete one-line reply. m must be a literal
 * (or array), because its length is taken with CONSTSTRLEN. */
#define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)

/* Log the string literal e as a server error, then send it as the reply. */
#define reply_serr(c,e) (twarnx("server error: %s",(e)),\
                         reply_msg((c),(e)))
289 static void
290 reply_line(conn c, int state, const char *fmt, ...)
292 int r;
293 va_list ap;
295 va_start(ap, fmt);
296 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
297 va_end(ap);
299 /* Make sure the buffer was big enough. If not, we have a bug. */
300 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
302 return reply(c, c->reply_buf, r, state);
305 static void
306 reply_job(conn c, job j, const char *word)
308 /* tell this connection which job to send */
309 c->out_job = j;
310 c->out_job_sent = 0;
312 return reply_line(c, STATE_SENDJOB, "%s %llu %u\r\n",
313 word, j->id, j->body_size - 2);
316 conn
317 remove_waiting_conn(conn c)
319 tube t;
320 size_t i;
322 if (!conn_waiting(c)) return NULL;
324 c->type &= ~CONN_TYPE_WAITING;
325 global_stat.waiting_ct--;
326 for (i = 0; i < c->watch.used; i++) {
327 t = c->watch.items[i];
328 t->stat.waiting_ct--;
329 ms_remove(&t->waiting, c);
331 return c;
334 static void
335 reserve_job(conn c, job j)
337 j->deadline = time(NULL) + j->ttr;
338 global_stat.reserved_ct++; /* stats */
339 j->tube->stat.reserved_ct++;
340 conn_insert(&running, c);
341 j->state = JOB_STATE_RESERVED;
342 job_insert(&c->reserved_jobs, j);
343 j->reserver=c;
344 if (c->soonest_job && j->deadline < c->soonest_job->deadline) {
345 c->soonest_job = j;
347 return reply_job(c, j, MSG_RESERVED);
350 static job
351 next_eligible_job()
353 tube t;
354 size_t i;
355 job j = NULL, candidate;
357 dprintf("tubes.used = %zu\n", tubes.used);
358 for (i = 0; i < tubes.used; i++) {
359 t = tubes.items[i];
360 dprintf("for %s t->waiting.used=%zu t->ready.used=%d\n",
361 t->name, t->waiting.used, t->ready.used);
362 if (t->waiting.used && t->ready.used) {
363 candidate = pq_peek(&t->ready);
364 if (!j || candidate->id < j->id) j = candidate;
366 dprintf("i = %zu, tubes.used = %zu\n", i, tubes.used);
369 return j;
372 static void
373 process_queue()
375 job j;
377 dprintf("processing queue\n");
378 while ((j = next_eligible_job())) {
379 dprintf("got eligible job %llu in %s\n", j->id, j->tube->name);
380 j = pq_take(&j->tube->ready);
381 ready_ct--;
382 if (j->pri < URGENT_THRESHOLD) {
383 global_stat.urgent_ct--;
384 j->tube->stat.urgent_ct--;
386 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
390 static job
391 delay_q_peek()
393 int i;
394 tube t;
395 job j = NULL, nj;
397 for (i = 0; i < tubes.used; i++) {
398 t = tubes.items[i];
399 nj = pq_peek(&t->delay);
400 if (!nj) continue;
401 if (!j || nj->deadline < j->deadline) j = nj;
404 return j;
407 static void
408 set_main_delay_timeout()
410 job j;
412 set_main_timeout((j = delay_q_peek()) ? j->deadline : 0);
415 static int
416 enqueue_job(job j, unsigned int delay)
418 int r;
420 j->reserver = NULL;
421 if (delay) {
422 j->deadline = time(NULL) + delay;
423 r = pq_give(&j->tube->delay, j);
424 if (!r) return 0;
425 j->state = JOB_STATE_DELAYED;
426 set_main_delay_timeout();
427 } else {
428 r = pq_give(&j->tube->ready, j);
429 if (!r) return 0;
430 j->state = JOB_STATE_READY;
431 ready_ct++;
432 if (j->pri < URGENT_THRESHOLD) {
433 global_stat.urgent_ct++;
434 j->tube->stat.urgent_ct++;
437 process_queue();
438 return 1;
441 static void
442 bury_job(job j)
444 job_insert(&j->tube->buried, j);
445 global_stat.buried_ct++;
446 j->tube->stat.buried_ct++;
447 j->state = JOB_STATE_BURIED;
448 j->reserver=NULL;
449 j->bury_ct++;
452 void
453 enqueue_reserved_jobs(conn c)
455 int r;
456 job j;
458 while (job_list_any_p(&c->reserved_jobs)) {
459 j = job_remove(c->reserved_jobs.next);
460 r = enqueue_job(j, 0);
461 if (!r) bury_job(j);
462 global_stat.reserved_ct--;
463 j->tube->stat.reserved_ct--;
464 c->soonest_job = NULL;
465 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
469 static job
470 delay_q_take()
472 job j = delay_q_peek();
473 return j ? pq_take(&j->tube->delay) : NULL;
476 static job
477 remove_this_buried_job(job j)
479 j = job_remove(j);
480 if (j) {
481 global_stat.buried_ct--;
482 j->tube->stat.buried_ct--;
484 return j;
487 static int
488 kick_buried_job(tube t)
490 int r;
491 job j;
493 if (!buried_job_p(t)) return 0;
494 j = remove_this_buried_job(t->buried.next);
495 j->kick_ct++;
496 r = enqueue_job(j, 0);
497 if (r) return 1;
499 /* ready queue is full, so bury it */
500 bury_job(j);
501 return 0;
504 static unsigned int
505 get_delayed_job_ct()
507 tube t;
508 size_t i;
509 unsigned int count = 0;
511 for (i = 0; i < tubes.used; i++) {
512 t = tubes.items[i];
513 count += pq_used(&t->delay);
515 return count;
518 static int
519 kick_delayed_job(tube t)
521 int r;
522 job j;
524 j = pq_take(&t->delay);
525 if (!j) return 0;
526 j->kick_ct++;
527 r = enqueue_job(j, 0);
528 if (r) return 1;
530 /* ready queue is full, so delay it again */
531 r = enqueue_job(j, j->delay);
532 if (r) return 0;
534 /* last resort */
535 bury_job(j);
536 return 0;
539 /* return the number of jobs successfully kicked */
540 static unsigned int
541 kick_buried_jobs(tube t, unsigned int n)
543 unsigned int i;
544 for (i = 0; (i < n) && kick_buried_job(t); ++i);
545 return i;
548 /* return the number of jobs successfully kicked */
549 static unsigned int
550 kick_delayed_jobs(tube t, unsigned int n)
552 unsigned int i;
553 for (i = 0; (i < n) && kick_delayed_job(t); ++i);
554 return i;
557 static unsigned int
558 kick_jobs(tube t, unsigned int n)
560 if (buried_job_p(t)) return kick_buried_jobs(t, n);
561 return kick_delayed_jobs(t, n);
564 static job
565 find_buried_job(unsigned long long int id)
567 job j = job_find(id);
568 return (j && j->state == JOB_STATE_BURIED) ? j : NULL;
571 static job
572 remove_buried_job(unsigned long long int id)
574 return remove_this_buried_job(find_buried_job(id));
577 static void
578 enqueue_waiting_conn(conn c)
580 tube t;
581 size_t i;
583 global_stat.waiting_ct++;
584 c->type |= CONN_TYPE_WAITING;
585 for (i = 0; i < c->watch.used; i++) {
586 t = c->watch.items[i];
587 t->stat.waiting_ct++;
588 ms_append(&t->waiting, c);
592 static job
593 find_reserved_job_in_conn(conn c, unsigned long long int id)
595 job j = job_find(id);
596 return (j && j->reserver == c && j->state == JOB_STATE_RESERVED) ? j : NULL;
599 static job
600 peek_job(unsigned long long int id)
602 return job_find(id);
605 static void
606 check_err(conn c, const char *s)
608 if (errno == EAGAIN) return;
609 if (errno == EINTR) return;
610 if (errno == EWOULDBLOCK) return;
612 twarn("%s", s);
613 conn_close(c);
614 return;
/* Scan the first size bytes of s for the sequence "\r\n" and return the line
 * length, i.e. the number of bytes up to and including the "\r\n".
 * Always returns at least 2 if a match is found. Returns 0 if no match,
 * including when size is too small (< 2) to hold a terminator at all.
 *
 * A '\r' that is not immediately followed by '\n' does not end the search;
 * scanning continues with the next '\r'. */
static int
scan_line_end(const char *s, int size)
{
    const char *p = s;
    const char *last; /* last position at which a "\r\n" can still begin */
    const char *cr;

    /* Guards the length arithmetic below against zero or negative sizes. */
    if (size < 2) return 0;
    last = s + size - 2;

    while (p <= last) {
        cr = memchr(p, '\r', (size_t) (last - p) + 1);
        if (!cr) return 0;

        /* this is safe because cr <= last, so cr[1] is still inside s */
        if (cr[1] == '\n') return (int) (cr - s) + 2;

        p = cr + 1;
    }

    return 0;
}
633 static int
634 cmd_len(conn c)
636 return scan_line_end(c->cmd, c->cmd_read);
639 /* parse the command line */
640 static int
641 which_cmd(conn c)
643 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
644 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
645 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
646 TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY);
647 TEST_CMD(c->cmd, CMD_PEEK_DELAYED, OP_PEEK_DELAYED);
648 TEST_CMD(c->cmd, CMD_PEEK_BURIED, OP_PEEK_BURIED);
649 TEST_CMD(c->cmd, CMD_RESERVE_TIMEOUT, OP_RESERVE_TIMEOUT);
650 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
651 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
652 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
653 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
654 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
655 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
656 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
657 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
658 TEST_CMD(c->cmd, CMD_USE, OP_USE);
659 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
660 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
661 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
662 TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
663 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
664 return OP_UNKNOWN;
667 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
668 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
669 * This function is idempotent(). */
670 static void
671 fill_extra_data(conn c)
673 int extra_bytes, job_data_bytes = 0, cmd_bytes;
675 if (!c->fd) return; /* the connection was closed */
676 if (!c->cmd_len) return; /* we don't have a complete command */
678 /* how many extra bytes did we read? */
679 extra_bytes = c->cmd_read - c->cmd_len;
681 /* how many bytes should we put into the job body? */
682 if (c->in_job) {
683 job_data_bytes = min(extra_bytes, c->in_job->body_size);
684 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
685 c->in_job_read = job_data_bytes;
686 } else if (c->in_job_read) {
687 /* we are in bit-bucket mode, throwing away data */
688 job_data_bytes = min(extra_bytes, c->in_job_read);
689 c->in_job_read -= job_data_bytes;
692 /* how many bytes are left to go into the future cmd? */
693 cmd_bytes = extra_bytes - job_data_bytes;
694 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
695 c->cmd_read = cmd_bytes;
696 c->cmd_len = 0; /* we no longer know the length of the new command */
699 static void
700 enqueue_incoming_job(conn c)
702 int r;
703 job j = c->in_job;
705 c->in_job = NULL; /* the connection no longer owns this job */
706 c->in_job_read = 0;
708 /* check if the trailer is present and correct */
709 if (memcmp(j->body + j->body_size - 2, "\r\n", 2)) {
710 job_free(j);
711 return reply_msg(c, MSG_EXPECTED_CRLF);
714 if (drain_mode) {
715 job_free(j);
716 return reply_serr(c, MSG_DRAINING);
719 /* we have a complete job, so let's stick it in the pqueue */
720 r = enqueue_job(j, j->delay);
721 op_ct[OP_PUT]++; /* stats */
722 global_stat.total_jobs_ct++;
723 j->tube->stat.total_jobs_ct++;
725 if (r) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->id);
727 /* out of memory trying to grow the queue, so it gets buried */
728 bury_job(j);
729 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->id);
732 static unsigned int
733 uptime()
735 return time(NULL) - start_time;
738 static int
739 fmt_stats(char *buf, size_t size, void *x)
741 struct rusage ru = {{0, 0}, {0, 0}};
742 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
743 return snprintf(buf, size, STATS_FMT,
744 global_stat.urgent_ct,
745 ready_ct,
746 global_stat.reserved_ct,
747 get_delayed_job_ct(),
748 global_stat.buried_ct,
749 op_ct[OP_PUT],
750 op_ct[OP_PEEKJOB],
751 op_ct[OP_PEEK_READY],
752 op_ct[OP_PEEK_DELAYED],
753 op_ct[OP_PEEK_BURIED],
754 op_ct[OP_RESERVE],
755 op_ct[OP_RESERVE_TIMEOUT],
756 op_ct[OP_DELETE],
757 op_ct[OP_RELEASE],
758 op_ct[OP_USE],
759 op_ct[OP_WATCH],
760 op_ct[OP_IGNORE],
761 op_ct[OP_BURY],
762 op_ct[OP_KICK],
763 op_ct[OP_STATS],
764 op_ct[OP_JOBSTATS],
765 op_ct[OP_STATS_TUBE],
766 op_ct[OP_LIST_TUBES],
767 op_ct[OP_LIST_TUBE_USED],
768 op_ct[OP_LIST_TUBES_WATCHED],
769 timeout_ct,
770 global_stat.total_jobs_ct,
771 job_data_size_limit,
772 tubes.used,
773 count_cur_conns(),
774 count_cur_producers(),
775 count_cur_workers(),
776 global_stat.waiting_ct,
777 count_tot_conns(),
778 getpid(),
779 VERSION,
780 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
781 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
782 uptime());
/* Read a priority value from the given buffer and place it in pri.
 * Update end to point to the address after the last character consumed.
 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
 * conversion and return the status code but not update any values. This is an
 * easy way to check for errors.
 * If end is NULL, read_pri will also check that the entire input string was
 * consumed and return an error code otherwise.
 * A value too large for an unsigned int is not an error: it is clamped to the
 * largest unsigned int, whether or not strtoul itself reported ERANGE.
 * Return 0 on success, or nonzero on failure.
 * If a failure occurs, pri and end are not modified. */
static int
read_pri(unsigned int *pri, const char *buf, char **end)
{
    const unsigned int pri_max = ~0u;
    char *tend;
    unsigned long parsed;

    errno = 0;
    parsed = strtoul(buf, &tend, 10);
    if (tend == buf) return -1; /* no digits at all */
    if (errno && errno != ERANGE) return -1;
    if (!end && tend[0] != '\0') return -1;

    /* Where unsigned long is wider than unsigned int, a plain assignment
     * would wrap (e.g. 4294967296 would become 0); saturate instead, which is
     * what the ERANGE case already does. */
    if (parsed > pri_max) parsed = pri_max;

    if (pri) *pri = (unsigned int) parsed;
    if (end) *end = tend;
    return 0;
}
/* Read a delay value from the given buffer and place it in delay.
 * Parsing, arguments and return value are exactly those of read_pri(). */
static int
read_delay(unsigned int *delay, const char *buf, char **end)
{
    int status = read_pri(delay, buf, end);

    return status;
}
/* Read a timeout value from the given buffer and place it in ttr.
 * Parsing, arguments and return value are exactly those of read_pri(). */
static int
read_ttr(unsigned int *ttr, const char *buf, char **end)
{
    int status = read_pri(ttr, buf, end);

    return status;
}
828 static void
829 wait_for_job(conn c, int timeout)
831 int r;
833 c->state = STATE_WAIT;
834 enqueue_waiting_conn(c);
836 /* Set the pending timeout to the requested timeout amount */
837 c->pending_timeout = timeout;
839 /* this conn is waiting, but we want to know if they hang up */
840 r = conn_update_evq(c, EV_READ | EV_PERSIST);
841 if (r == -1) return twarnx("update events failed"), conn_close(c);
844 typedef int(*fmt_fn)(char *, size_t, void *);
846 static void
847 do_stats(conn c, fmt_fn fmt, void *data)
849 int r, stats_len;
851 /* first, measure how big a buffer we will need */
852 stats_len = fmt(NULL, 0, data) + 16;
854 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
855 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
857 /* now actually format the stats data */
858 r = fmt(c->out_job->body, stats_len, data);
859 /* and set the actual body size */
860 c->out_job->body_size = r;
861 if (r > stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
863 c->out_job_sent = 0;
864 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", r - 2);
867 static void
868 do_list_tubes(conn c, ms l)
870 char *buf;
871 tube t;
872 size_t i, resp_z;
874 /* first, measure how big a buffer we will need */
875 resp_z = 6; /* initial "---\n" and final "\r\n" */
876 for (i = 0; i < l->used; i++) {
877 t = l->items[i];
878 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
881 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
882 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
884 /* now actually format the response */
885 buf = c->out_job->body;
886 buf += snprintf(buf, 5, "---\n");
887 for (i = 0; i < l->used; i++) {
888 t = l->items[i];
889 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
891 buf[0] = '\r';
892 buf[1] = '\n';
894 c->out_job_sent = 0;
895 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", resp_z - 2);
898 static int
899 fmt_job_stats(char *buf, size_t size, job j)
901 time_t t;
903 t = time(NULL);
904 return snprintf(buf, size, JOB_STATS_FMT,
905 j->id,
906 j->tube->name,
907 job_state(j),
908 j->pri,
909 (unsigned int) (t - j->creation),
910 j->delay,
911 j->ttr,
912 (unsigned int) (j->deadline - t),
913 j->timeout_ct,
914 j->release_ct,
915 j->bury_ct,
916 j->kick_ct);
919 static int
920 fmt_stats_tube(char *buf, size_t size, tube t)
922 return snprintf(buf, size, STATS_TUBE_FMT,
923 t->name,
924 t->stat.urgent_ct,
925 t->ready.used,
926 t->stat.reserved_ct,
927 pq_used(&t->delay),
928 t->stat.buried_ct,
929 t->stat.total_jobs_ct,
930 t->using_ct,
931 t->watching_ct,
932 t->stat.waiting_ct);
935 static void
936 maybe_enqueue_incoming_job(conn c)
938 job j = c->in_job;
940 /* do we have a complete job? */
941 if (c->in_job_read == j->body_size) return enqueue_incoming_job(c);
943 /* otherwise we have incomplete data, so just keep waiting */
944 c->state = STATE_WANTDATA;
947 /* j can be NULL */
948 static job
949 remove_this_reserved_job(conn c, job j)
951 j = job_remove(j);
952 if (j) {
953 global_stat.reserved_ct--;
954 j->tube->stat.reserved_ct--;
955 j->reserver=NULL;
957 c->soonest_job = NULL;
958 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
959 return j;
962 static job
963 remove_reserved_job(conn c, unsigned long long int id)
965 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, id));
968 static int
969 name_is_ok(const char *name, size_t max)
971 size_t len = strlen(name);
972 return len > 0 && len <= max &&
973 strspn(name, NAME_CHARS) == len && name[0] != '-';
976 static tube
977 find_tube(const char *name)
979 tube t;
980 size_t i;
982 for (i = 0; i < tubes.used; i++) {
983 t = tubes.items[i];
984 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) return t;
986 return NULL;
989 void
990 prot_remove_tube(tube t)
992 ms_remove(&tubes, t);
995 static tube
996 make_and_insert_tube(const char *name)
998 int r;
999 tube t = NULL;
1001 t = make_tube(name);
1002 if (!t) return NULL;
1004 /* We want this global tube list to behave like "weak" refs, so don't
1005 * increment the ref count. */
1006 r = ms_append(&tubes, t);
1007 if (!r) return tube_dref(t), NULL;
1009 return t;
1012 static tube
1013 find_or_make_tube(const char *name)
1015 return find_tube(name) ? : make_and_insert_tube(name);
1018 static void
1019 dispatch_cmd(conn c)
1021 int r, i, timeout = -1;
1022 unsigned int count;
1023 job j;
1024 unsigned char type;
1025 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
1026 unsigned int pri, delay, ttr, body_size;
1027 unsigned long long int id;
1028 tube t = NULL;
1030 /* NUL-terminate this string so we can use strtol and friends */
1031 c->cmd[c->cmd_len - 2] = '\0';
1033 /* check for possible maliciousness */
1034 if (strlen(c->cmd) != c->cmd_len - 2) {
1035 return reply_msg(c, MSG_BAD_FORMAT);
1038 type = which_cmd(c);
1039 dprintf("got %s command: \"%s\"\n", op_names[(int) type], c->cmd);
1041 switch (type) {
1042 case OP_PUT:
1043 r = read_pri(&pri, c->cmd + 4, &delay_buf);
1044 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1046 r = read_delay(&delay, delay_buf, &ttr_buf);
1047 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1049 r = read_ttr(&ttr, ttr_buf, &size_buf);
1050 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1052 errno = 0;
1053 body_size = strtoul(size_buf, &end_buf, 10);
1054 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1056 if (body_size > job_data_size_limit) {
1057 return reply_msg(c, MSG_JOB_TOO_BIG);
1060 /* don't allow trailing garbage */
1061 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
1063 conn_set_producer(c);
1065 c->in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c->use);
1067 /* OOM? */
1068 if (!c->in_job) {
1069 /* throw away the job body and respond with OUT_OF_MEMORY */
1071 /* Invert the meaning of in_job_read while throwing away data -- it
1072 * counts the bytes that remain to be thrown away. */
1073 c->in_job_read = body_size + 2;
1074 fill_extra_data(c);
1076 if (c->in_job_read == 0) return reply_serr(c, MSG_OUT_OF_MEMORY);
1078 c->state = STATE_BITBUCKET;
1079 return;
1082 fill_extra_data(c);
1084 /* it's possible we already have a complete job */
1085 maybe_enqueue_incoming_job(c);
1087 break;
1088 case OP_PEEK_READY:
1089 /* don't allow trailing garbage */
1090 if (c->cmd_len != CMD_PEEK_READY_LEN + 2) {
1091 return reply_msg(c, MSG_BAD_FORMAT);
1093 op_ct[type]++;
1095 j = job_copy(pq_peek(&c->use->ready));
1097 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1099 reply_job(c, j, MSG_FOUND);
1100 break;
1101 case OP_PEEK_DELAYED:
1102 /* don't allow trailing garbage */
1103 if (c->cmd_len != CMD_PEEK_DELAYED_LEN + 2) {
1104 return reply_msg(c, MSG_BAD_FORMAT);
1106 op_ct[type]++;
1108 j = job_copy(pq_peek(&c->use->delay));
1110 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1112 reply_job(c, j, MSG_FOUND);
1113 break;
1114 case OP_PEEK_BURIED:
1115 /* don't allow trailing garbage */
1116 if (c->cmd_len != CMD_PEEK_BURIED_LEN + 2) {
1117 return reply_msg(c, MSG_BAD_FORMAT);
1119 op_ct[type]++;
1121 j = job_copy(buried_job_p(c->use)? j = c->use->buried.next : NULL);
1123 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1125 reply_job(c, j, MSG_FOUND);
1126 break;
1127 case OP_PEEKJOB:
1128 errno = 0;
1129 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1130 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1131 op_ct[type]++;
1133 /* So, peek is annoying, because some other connection might free the
1134 * job while we are still trying to write it out. So we copy it and
1135 * then free the copy when it's done sending. */
1136 j = job_copy(peek_job(id));
1138 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1140 reply_job(c, j, MSG_FOUND);
1141 break;
1142 case OP_RESERVE_TIMEOUT:
1143 errno = 0;
1144 timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10);
1145 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1146 case OP_RESERVE: /* FALLTHROUGH */
1147 /* don't allow trailing garbage */
1148 if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) {
1149 return reply_msg(c, MSG_BAD_FORMAT);
1152 op_ct[type]++;
1153 conn_set_worker(c);
1155 if (conn_has_close_deadline(c) && !conn_ready(c)) {
1156 return reply_msg(c, MSG_DEADLINE_SOON);
1159 /* try to get a new job for this guy */
1160 wait_for_job(c, timeout);
1161 process_queue();
1162 break;
1163 case OP_DELETE:
1164 errno = 0;
1165 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1166 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1167 op_ct[type]++;
1169 j = remove_reserved_job(c, id) ? : remove_buried_job(id);
1171 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1173 job_free(j);
1175 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1176 break;
1177 case OP_RELEASE:
1178 errno = 0;
1179 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1180 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1182 r = read_pri(&pri, pri_buf, &delay_buf);
1183 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1185 r = read_delay(&delay, delay_buf, NULL);
1186 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1187 op_ct[type]++;
1189 j = remove_reserved_job(c, id);
1191 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1193 j->pri = pri;
1194 j->delay = delay;
1195 j->release_ct++;
1196 r = enqueue_job(j, delay);
1197 if (r) return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1199 /* out of memory trying to grow the queue, so it gets buried */
1200 bury_job(j);
1201 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1202 break;
1203 case OP_BURY:
1204 errno = 0;
1205 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1206 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1208 r = read_pri(&pri, pri_buf, NULL);
1209 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1210 op_ct[type]++;
1212 j = remove_reserved_job(c, id);
1214 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1216 j->pri = pri;
1217 bury_job(j);
1218 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1219 break;
1220 case OP_KICK:
1221 errno = 0;
1222 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
1223 if (end_buf == c->cmd + CMD_KICK_LEN) {
1224 return reply_msg(c, MSG_BAD_FORMAT);
1226 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1228 op_ct[type]++;
1230 i = kick_jobs(c->use, count);
1232 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1233 case OP_STATS:
1234 /* don't allow trailing garbage */
1235 if (c->cmd_len != CMD_STATS_LEN + 2) {
1236 return reply_msg(c, MSG_BAD_FORMAT);
1239 op_ct[type]++;
1241 do_stats(c, fmt_stats, NULL);
1242 break;
1243 case OP_JOBSTATS:
1244 errno = 0;
1245 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1246 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1248 op_ct[type]++;
1250 j = peek_job(id);
1251 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1253 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1254 do_stats(c, (fmt_fn) fmt_job_stats, j);
1255 break;
1256 case OP_STATS_TUBE:
1257 name = c->cmd + CMD_STATS_TUBE_LEN;
1258 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1260 op_ct[type]++;
1262 t = find_tube(name);
1263 if (!t) return reply_msg(c, MSG_NOTFOUND);
1265 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1266 t = NULL;
1267 break;
1268 case OP_LIST_TUBES:
1269 /* don't allow trailing garbage */
1270 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1271 return reply_msg(c, MSG_BAD_FORMAT);
1274 op_ct[type]++;
1275 do_list_tubes(c, &tubes);
1276 break;
1277 case OP_LIST_TUBE_USED:
1278 /* don't allow trailing garbage */
1279 if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
1280 return reply_msg(c, MSG_BAD_FORMAT);
1283 op_ct[type]++;
1284 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1285 break;
1286 case OP_LIST_TUBES_WATCHED:
1287 /* don't allow trailing garbage */
1288 if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
1289 return reply_msg(c, MSG_BAD_FORMAT);
1292 op_ct[type]++;
1293 do_list_tubes(c, &c->watch);
1294 break;
1295 case OP_USE:
1296 name = c->cmd + CMD_USE_LEN;
1297 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1298 op_ct[type]++;
1300 TUBE_ASSIGN(t, find_or_make_tube(name));
1301 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1303 c->use->using_ct--;
1304 TUBE_ASSIGN(c->use, t);
1305 TUBE_ASSIGN(t, NULL);
1306 c->use->using_ct++;
1308 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1309 break;
1310 case OP_WATCH:
1311 name = c->cmd + CMD_WATCH_LEN;
1312 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1313 op_ct[type]++;
1315 TUBE_ASSIGN(t, find_or_make_tube(name));
1316 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1318 r = 1;
1319 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1320 TUBE_ASSIGN(t, NULL);
1321 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1323 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1324 break;
1325 case OP_IGNORE:
1326 name = c->cmd + CMD_IGNORE_LEN;
1327 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1328 op_ct[type]++;
1330 t = NULL;
1331 for (i = 0; i < c->watch.used; i++) {
1332 t = c->watch.items[i];
1333 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1334 t = NULL;
1337 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1339 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1340 t = NULL;
1342 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1343 break;
1344 default:
1345 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1349 /* if we get a timeout, it means that a job has been reserved for too long, so
1350 * we should put it back in the queue */
1351 static void
1352 h_conn_timeout(conn c)
1354 int should_timeout = 0;
1356 if (conn_waiting(c) && conn_has_close_deadline(c)) should_timeout = 1;
1358 if (should_timeout) {
1359 int r;
1360 job j;
1361 dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1362 while ((j = soonest_job(c))) {
1363 if (j->deadline > time(NULL)) break;
1364 timeout_ct++; /* stats */
1365 j->timeout_ct++;
1366 r = enqueue_job(remove_this_reserved_job(c, j), 0);
1367 /* there was no room in the queue, so bury it */
1368 if (!r) bury_job(j);
1369 r = conn_update_evq(c, c->evq.ev_events);
1370 if (r == -1)
1371 return twarnx("conn_update_evq() failed"), conn_close(c);
1373 return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON);
1374 } else if (conn_waiting(c) && c->pending_timeout >= 0) {
1375 dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1376 c->pending_timeout=-1;
1377 return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);
1381 void
1382 enter_drain_mode(int sig)
1384 drain_mode = 1;
1387 static void
1388 do_cmd(conn c)
1390 dispatch_cmd(c);
1391 fill_extra_data(c);
1394 static void
1395 reset_conn(conn c)
1397 int r;
1399 r = conn_update_evq(c, EV_READ | EV_PERSIST);
1400 if (r == -1) return twarnx("update events failed"), conn_close(c);
1402 /* was this a peek or stats command? */
1403 if (!has_reserved_this_job(c, c->out_job)) job_free(c->out_job);
1404 c->out_job = NULL;
1406 c->reply_sent = 0; /* now that we're done, reset this */
1407 c->state = STATE_WANTCOMMAND;
1410 static void
1411 h_conn_data(conn c)
1413 int r, to_read;
1414 job j;
1415 struct iovec iov[2];
1417 switch (c->state) {
1418 case STATE_WANTCOMMAND:
1419 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1420 if (r == -1) return check_err(c, "read()");
1421 if (r == 0) return conn_close(c); /* the client hung up */
1423 c->cmd_read += r; /* we got some bytes */
1425 c->cmd_len = cmd_len(c); /* find the EOL */
1427 /* yay, complete command line */
1428 if (c->cmd_len) return do_cmd(c);
1430 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1432 /* command line too long? */
1433 if (c->cmd_read == LINE_BUF_SIZE) {
1434 c->cmd_read = 0; /* discard the input so far */
1435 return reply_msg(c, MSG_BAD_FORMAT);
1438 /* otherwise we have an incomplete line, so just keep waiting */
1439 break;
1440 case STATE_BITBUCKET:
1441 /* Invert the meaning of in_job_read while throwing away data -- it
1442 * counts the bytes that remain to be thrown away. */
1443 to_read = min(c->in_job_read, BUCKET_BUF_SIZE);
1444 r = read(c->fd, bucket, to_read);
1445 if (r == -1) return check_err(c, "read()");
1446 if (r == 0) return conn_close(c); /* the client hung up */
1448 c->in_job_read -= r; /* we got some bytes */
1450 /* (c->in_job_read < 0) can't happen */
1452 if (c->in_job_read == 0) return reply_serr(c, MSG_OUT_OF_MEMORY);
1453 break;
1454 case STATE_WANTDATA:
1455 j = c->in_job;
1457 r = read(c->fd, j->body + c->in_job_read, j->body_size -c->in_job_read);
1458 if (r == -1) return check_err(c, "read()");
1459 if (r == 0) return conn_close(c); /* the client hung up */
1461 c->in_job_read += r; /* we got some bytes */
1463 /* (j->in_job_read > j->body_size) can't happen */
1465 maybe_enqueue_incoming_job(c);
1466 break;
1467 case STATE_SENDWORD:
1468 r= write(c->fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1469 if (r == -1) return check_err(c, "write()");
1470 if (r == 0) return conn_close(c); /* the client hung up */
1472 c->reply_sent += r; /* we got some bytes */
1474 /* (c->reply_sent > c->reply_len) can't happen */
1476 if (c->reply_sent == c->reply_len) return reset_conn(c);
1478 /* otherwise we sent an incomplete reply, so just keep waiting */
1479 break;
1480 case STATE_SENDJOB:
1481 j = c->out_job;
1483 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1484 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1485 iov[1].iov_base = j->body + c->out_job_sent;
1486 iov[1].iov_len = j->body_size - c->out_job_sent;
1488 r = writev(c->fd, iov, 2);
1489 if (r == -1) return check_err(c, "writev()");
1490 if (r == 0) return conn_close(c); /* the client hung up */
1492 /* update the sent values */
1493 c->reply_sent += r;
1494 if (c->reply_sent >= c->reply_len) {
1495 c->out_job_sent += c->reply_sent - c->reply_len;
1496 c->reply_sent = c->reply_len;
1499 /* (c->out_job_sent > j->body_size) can't happen */
1501 /* are we done? */
1502 if (c->out_job_sent == j->body_size) return reset_conn(c);
1504 /* otherwise we sent incomplete data, so just keep waiting */
1505 break;
1506 case STATE_WAIT: /* keep an eye out in case they hang up */
1507 /* but don't hang up just because our buffer is full */
1508 if (LINE_BUF_SIZE - c->cmd_read < 1) break;
1510 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1511 if (r == -1) return check_err(c, "read()");
1512 if (r == 0) return conn_close(c); /* the client hung up */
1513 c->cmd_read += r; /* we got some bytes */
/* Nonzero when c->fd is nonzero and c is in STATE_WANTCOMMAND. */
#define want_command(c) ((c)->fd && (c)->state == STATE_WANTCOMMAND)
/* Nonzero when c wants a command and has already buffered some command bytes. */
#define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
1520 static void
1521 h_conn(const int fd, const short which, conn c)
1523 if (fd != c->fd) {
1524 twarnx("Argh! event fd doesn't match conn fd.");
1525 close(fd);
1526 return conn_close(c);
1529 switch (which) {
1530 case EV_TIMEOUT:
1531 h_conn_timeout(c);
1532 event_add(&c->evq, NULL); /* seems to be necessary */
1533 break;
1534 case EV_READ:
1535 /* fall through... */
1536 case EV_WRITE:
1537 /* fall through... */
1538 default:
1539 h_conn_data(c);
1542 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1545 static void
1546 h_delay()
1548 int r;
1549 job j;
1550 time_t t;
1552 t = time(NULL);
1553 while ((j = delay_q_peek())) {
1554 if (j->deadline > t) break;
1555 j = delay_q_take();
1556 r = enqueue_job(j, 0);
1557 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
1560 set_main_delay_timeout();
1563 void
1564 h_accept(const int fd, const short which, struct event *ev)
1566 conn c;
1567 int cfd, flags, r;
1568 socklen_t addrlen;
1569 struct sockaddr addr;
1571 if (which == EV_TIMEOUT) return h_delay();
1573 addrlen = sizeof addr;
1574 cfd = accept(fd, &addr, &addrlen);
1575 if (cfd == -1) {
1576 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1577 if (errno == EMFILE) brake();
1578 return;
1581 flags = fcntl(cfd, F_GETFL, 0);
1582 if (flags < 0) return twarn("getting flags"), close(cfd), v();
1584 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1585 if (r < 0) return twarn("setting O_NONBLOCK"), close(cfd), v();
1587 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1588 if (!c) return twarnx("make_conn() failed"), close(cfd), brake();
1590 dprintf("accepted conn, fd=%d\n", cfd);
1591 r = conn_set_evq(c, EV_READ | EV_PERSIST, (evh) h_conn);
1592 if (r == -1) return twarnx("conn_set_evq() failed"), close(cfd), brake();
1595 void
1596 prot_init()
1598 start_time = time(NULL);
1599 memset(op_ct, 0, sizeof(op_ct));
1601 ms_init(&tubes, NULL, NULL);
1603 TUBE_ASSIGN(default_tube, find_or_make_tube("default"));
1604 if (!default_tube) twarnx("Out of memory during startup!");