Refactoring.
[beanstalkd.git] / prot.c
blobbbaa3c92b3b21d02881b07a57a1e0ea920b584dd
1 /* prot.c - protocol implementation */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <errno.h>
24 #include <sys/resource.h>
25 #include <sys/uio.h>
26 #include <stdarg.h>
28 #include "stat.h"
29 #include "prot.h"
30 #include "pq.h"
31 #include "ms.h"
32 #include "job.h"
33 #include "tube.h"
34 #include "conn.h"
35 #include "util.h"
36 #include "net.h"
37 #include "version.h"
39 /* job body cannot be greater than this many bytes long */
40 size_t job_data_size_limit = ((1 << 16) - 1);
42 #define NAME_CHARS \
43 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
44 "abcdefghijklmnopqrstuvwxyz" \
45 "0123456789-+/;.$()"
47 #define CMD_PUT "put "
48 #define CMD_PEEKJOB "peek "
49 #define CMD_PEEK_READY "peek-ready"
50 #define CMD_PEEK_DELAYED "peek-delayed"
51 #define CMD_PEEK_BURIED "peek-buried"
52 #define CMD_RESERVE "reserve"
53 #define CMD_DELETE "delete "
54 #define CMD_RELEASE "release "
55 #define CMD_BURY "bury "
56 #define CMD_KICK "kick "
57 #define CMD_STATS "stats"
58 #define CMD_JOBSTATS "stats-job "
59 #define CMD_USE "use "
60 #define CMD_WATCH "watch "
61 #define CMD_IGNORE "ignore "
62 #define CMD_LIST_TUBES "list-tubes"
63 #define CMD_LIST_TUBE_USED "list-tube-used"
64 #define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
65 #define CMD_STATS_TUBE "stats-tube "
67 #define CONSTSTRLEN(m) (sizeof(m) - 1)
69 #define CMD_PEEK_READY_LEN CONSTSTRLEN(CMD_PEEK_READY)
70 #define CMD_PEEK_DELAYED_LEN CONSTSTRLEN(CMD_PEEK_DELAYED)
71 #define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED)
72 #define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
73 #define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
74 #define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
75 #define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
76 #define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
77 #define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
78 #define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
79 #define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
80 #define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
81 #define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
82 #define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
83 #define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
84 #define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
85 #define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
86 #define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
88 #define MSG_FOUND "FOUND"
89 #define MSG_NOTFOUND "NOT_FOUND\r\n"
90 #define MSG_RESERVED "RESERVED"
91 #define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
92 #define MSG_DELETED "DELETED\r\n"
93 #define MSG_RELEASED "RELEASED\r\n"
94 #define MSG_BURIED "BURIED\r\n"
95 #define MSG_BURIED_FMT "BURIED %llu\r\n"
96 #define MSG_INSERTED_FMT "INSERTED %llu\r\n"
97 #define MSG_NOT_IGNORED "NOT_IGNORED\r\n"
99 #define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
100 #define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
101 #define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
102 #define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
103 #define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)
105 #define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
106 #define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
107 #define MSG_DRAINING "DRAINING\r\n"
108 #define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
109 #define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
110 #define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
111 #define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
113 #define STATE_WANTCOMMAND 0
114 #define STATE_WANTDATA 1
115 #define STATE_SENDJOB 2
116 #define STATE_SENDWORD 3
117 #define STATE_WAIT 4
119 #define OP_UNKNOWN 0
120 #define OP_PUT 1
121 #define OP_PEEKJOB 2
122 #define OP_RESERVE 3
123 #define OP_DELETE 4
124 #define OP_RELEASE 5
125 #define OP_BURY 6
126 #define OP_KICK 7
127 #define OP_STATS 8
128 #define OP_JOBSTATS 9
129 #define OP_PEEK_BURIED 10
130 #define OP_USE 11
131 #define OP_WATCH 12
132 #define OP_IGNORE 13
133 #define OP_LIST_TUBES 14
134 #define OP_LIST_TUBE_USED 15
135 #define OP_LIST_TUBES_WATCHED 16
136 #define OP_STATS_TUBE 17
137 #define OP_PEEK_READY 18
138 #define OP_PEEK_DELAYED 19
139 #define TOTAL_OPS 20
141 #define STATS_FMT "---\n" \
142 "current-jobs-urgent: %u\n" \
143 "current-jobs-ready: %u\n" \
144 "current-jobs-reserved: %u\n" \
145 "current-jobs-delayed: %u\n" \
146 "current-jobs-buried: %u\n" \
147 "cmd-put: %llu\n" \
148 "cmd-peek: %llu\n" \
149 "cmd-reserve: %llu\n" \
150 "cmd-delete: %llu\n" \
151 "cmd-release: %llu\n" \
152 "cmd-use: %llu\n" \
153 "cmd-watch: %llu\n" \
154 "cmd-ignore: %llu\n" \
155 "cmd-bury: %llu\n" \
156 "cmd-kick: %llu\n" \
157 "cmd-stats: %llu\n" \
158 "cmd-stats-job: %llu\n" \
159 "cmd-stats-tube: %llu\n" \
160 "cmd-list-tubes: %llu\n" \
161 "cmd-list-tube-used: %llu\n" \
162 "cmd-list-tubes-watched: %llu\n" \
163 "job-timeouts: %llu\n" \
164 "total-jobs: %llu\n" \
165 "max-job-size: %zu\n" \
166 "current-tubes: %zu\n" \
167 "current-connections: %u\n" \
168 "current-producers: %u\n" \
169 "current-workers: %u\n" \
170 "current-waiting: %u\n" \
171 "total-connections: %u\n" \
172 "pid: %u\n" \
173 "version: %s\n" \
174 "rusage-utime: %d.%06d\n" \
175 "rusage-stime: %d.%06d\n" \
176 "uptime: %u\n" \
177 "\r\n"
179 #define STATS_TUBE_FMT "---\n" \
180 "name: %s\n" \
181 "current-jobs-urgent: %u\n" \
182 "current-jobs-ready: %u\n" \
183 "current-jobs-reserved: %u\n" \
184 "current-jobs-delayed: %u\n" \
185 "current-jobs-buried: %u\n" \
186 "total-jobs: %llu\n" \
187 "current-using: %u\n" \
188 "current-watching: %u\n" \
189 "current-waiting: %u\n" \
190 "\r\n"
192 #define JOB_STATS_FMT "---\n" \
193 "id: %llu\n" \
194 "tube: %s\n" \
195 "state: %s\n" \
196 "pri: %u\n" \
197 "age: %u\n" \
198 "delay: %u\n" \
199 "ttr: %u\n" \
200 "time-left: %u\n" \
201 "timeouts: %u\n" \
202 "releases: %u\n" \
203 "buries: %u\n" \
204 "kicks: %u\n" \
205 "\r\n"
207 static unsigned int ready_ct = 0;
208 static struct stats global_stat = {0, 0, 0, 0, 0};
210 static tube default_tube;
211 static struct ms tubes;
213 static int drain_mode = 0;
214 static time_t start_time;
215 static unsigned long long int op_ct[TOTAL_OPS], timeout_ct = 0;
218 /* Doubly-linked list of connections with at least one reserved job. */
219 static struct conn running = { &running, &running, 0 };
221 #ifdef DEBUG
222 static const char * op_names[] = {
223 "<unknown>",
224 CMD_PUT,
225 CMD_PEEKJOB,
226 CMD_RESERVE,
227 CMD_DELETE,
228 CMD_RELEASE,
229 CMD_BURY,
230 CMD_KICK,
231 CMD_STATS,
232 CMD_JOBSTATS,
233 CMD_PEEK_BURIED,
234 CMD_USE,
235 CMD_WATCH,
236 CMD_IGNORE,
237 CMD_LIST_TUBES,
238 CMD_LIST_TUBE_USED,
239 CMD_LIST_TUBES_WATCHED,
240 CMD_STATS_TUBE,
242 #endif
244 static int
245 buried_job_p(tube t)
247 return job_list_any_p(&t->buried);
250 static void
251 reply(conn c, const char *line, int len, int state)
253 int r;
255 if (!c) return;
257 r = conn_update_evq(c, EV_WRITE | EV_PERSIST);
258 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
260 c->reply = line;
261 c->reply_len = len;
262 c->reply_sent = 0;
263 c->state = state;
264 dprintf("sending reply: %.*s", len, line);
267 #define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)
269 #define reply_serr(c,e) (twarnx("server error: %s",(e)),\
270 reply_msg((c),(e)))
272 static void
273 reply_line(conn c, int state, const char *fmt, ...)
275 int r;
276 va_list ap;
278 va_start(ap, fmt);
279 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
280 va_end(ap);
282 /* Make sure the buffer was big enough. If not, we have a bug. */
283 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
285 return reply(c, c->reply_buf, r, state);
288 static void
289 reply_job(conn c, job j, const char *word)
291 /* tell this connection which job to send */
292 c->out_job = j;
293 c->out_job_sent = 0;
295 return reply_line(c, STATE_SENDJOB, "%s %llu %u\r\n",
296 word, j->id, j->body_size - 2);
299 conn
300 remove_waiting_conn(conn c)
302 tube t;
303 size_t i;
305 if (!conn_waiting(c)) return NULL;
307 c->type &= ~CONN_TYPE_WAITING;
308 global_stat.waiting_ct--;
309 for (i = 0; i < c->watch.used; i++) {
310 t = c->watch.items[i];
311 t->stat.waiting_ct--;
312 ms_remove(&t->waiting, c);
314 return c;
317 static void
318 reserve_job(conn c, job j)
320 j->deadline = time(NULL) + j->ttr;
321 global_stat.reserved_ct++; /* stats */
322 j->tube->stat.reserved_ct++;
323 conn_insert(&running, c);
324 j->state = JOB_STATE_RESERVED;
325 job_insert(&c->reserved_jobs, j);
326 return reply_job(c, j, MSG_RESERVED);
329 static job
330 next_eligible_job()
332 tube t;
333 size_t i;
334 job j = NULL, candidate;
336 dprintf("tubes.used = %d\n", tubes.used);
337 for (i = 0; i < tubes.used; i++) {
338 t = tubes.items[i];
339 dprintf("for %s t->waiting.used=%d t->ready.used=%d\n",
340 t->name, t->waiting.used, t->ready.used);
341 if (t->waiting.used && t->ready.used) {
342 candidate = pq_peek(&t->ready);
343 if (!j || candidate->id < j->id) j = candidate;
345 dprintf("i = %d, tubes.used = %d\n", i, tubes.used);
348 return j;
351 static void
352 process_queue()
354 job j;
356 dprintf("processing queue\n");
357 while ((j = next_eligible_job())) {
358 dprintf("got eligible job %llu in %s\n", j->id, j->tube->name);
359 j = pq_take(&j->tube->ready);
360 ready_ct--;
361 if (j->pri < URGENT_THRESHOLD) {
362 global_stat.urgent_ct--;
363 j->tube->stat.urgent_ct--;
365 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
369 static job
370 delay_q_peek()
372 int i;
373 tube t;
374 job j = NULL, nj;
376 for (i = 0; i < tubes.used; i++) {
377 t = tubes.items[i];
378 nj = pq_peek(&t->delay);
379 if (!nj) continue;
380 if (!j || nj->deadline < j->deadline) j = nj;
383 return j;
386 static void
387 set_main_delay_timeout()
389 job j;
391 set_main_timeout((j = delay_q_peek()) ? j->deadline : 0);
394 static int
395 enqueue_job(job j, unsigned int delay)
397 int r;
399 if (delay) {
400 j->deadline = time(NULL) + delay;
401 r = pq_give(&j->tube->delay, j);
402 if (!r) return 0;
403 j->state = JOB_STATE_DELAYED;
404 set_main_delay_timeout();
405 } else {
406 r = pq_give(&j->tube->ready, j);
407 if (!r) return 0;
408 j->state = JOB_STATE_READY;
409 ready_ct++;
410 if (j->pri < URGENT_THRESHOLD) {
411 global_stat.urgent_ct++;
412 j->tube->stat.urgent_ct++;
415 process_queue();
416 return 1;
419 static void
420 bury_job(job j)
422 job_insert(&j->tube->buried, j);
423 global_stat.buried_ct++;
424 j->tube->stat.buried_ct++;
425 j->state = JOB_STATE_BURIED;
426 j->bury_ct++;
429 void
430 enqueue_reserved_jobs(conn c)
432 int r;
433 job j;
435 while (job_list_any_p(&c->reserved_jobs)) {
436 j = job_remove(c->reserved_jobs.next);
437 r = enqueue_job(j, 0);
438 if (!r) bury_job(j);
439 global_stat.reserved_ct--;
440 j->tube->stat.reserved_ct--;
441 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
445 static job
446 delay_q_take()
448 job j = delay_q_peek();
449 return j ? pq_take(&j->tube->delay) : NULL;
452 static job
453 remove_this_buried_job(job j)
455 j = job_remove(j);
456 if (j) {
457 global_stat.buried_ct--;
458 j->tube->stat.buried_ct--;
460 return j;
463 static int
464 kick_buried_job(tube t)
466 int r;
467 job j;
469 if (!buried_job_p(t)) return 0;
470 j = remove_this_buried_job(t->buried.next);
471 j->kick_ct++;
472 r = enqueue_job(j, 0);
473 if (r) return 1;
475 /* ready queue is full, so bury it */
476 bury_job(j);
477 return 0;
480 static unsigned int
481 get_delayed_job_ct()
483 tube t;
484 size_t i;
485 unsigned int count = 0;
487 for (i = 0; i < tubes.used; i++) {
488 t = tubes.items[i];
489 count += pq_used(&t->delay);
491 return count;
494 static int
495 kick_delayed_job(tube t)
497 int r;
498 job j;
500 if (get_delayed_job_ct() < 1) return 0;
501 j = pq_take(&t->delay);
502 j->kick_ct++;
503 r = enqueue_job(j, 0);
504 if (r) return 1;
506 /* ready queue is full, so delay it again */
507 r = enqueue_job(j, j->delay);
508 if (r) return 0;
510 /* last resort */
511 bury_job(j);
512 return 0;
515 /* return the number of jobs successfully kicked */
516 static unsigned int
517 kick_buried_jobs(tube t, unsigned int n)
519 unsigned int i;
520 for (i = 0; (i < n) && kick_buried_job(t); ++i);
521 return i;
524 /* return the number of jobs successfully kicked */
525 static unsigned int
526 kick_delayed_jobs(tube t, unsigned int n)
528 unsigned int i;
529 for (i = 0; (i < n) && kick_delayed_job(t); ++i);
530 return i;
533 static unsigned int
534 kick_jobs(tube t, unsigned int n)
536 if (buried_job_p(t)) return kick_buried_jobs(t, n);
537 return kick_delayed_jobs(t, n);
540 static job
541 find_buried_job_in_tube(tube t, unsigned long long int id)
543 job j;
545 for (j = t->buried.next; j != &t->buried; j = j->next) {
546 if (j->id == id) return j;
548 return NULL;
551 static job
552 find_buried_job(unsigned long long int id)
554 job j;
555 size_t i;
557 for (i = 0; i < tubes.used; i++) {
558 j = find_buried_job_in_tube(tubes.items[i], id);
559 if (j) return j;
561 return NULL;
564 static job
565 find_delayed_job(unsigned long long int id)
567 job j;
568 size_t i;
569 tube t;
571 for (i = 0; i < tubes.used; i++) {
572 t = tubes.items[i];
573 j = pq_find(&t->delay, id);
574 if (j) return j;
576 return NULL;
579 static job
580 remove_buried_job(unsigned long long int id)
582 return remove_this_buried_job(find_buried_job(id));
585 static void
586 enqueue_waiting_conn(conn c)
588 tube t;
589 size_t i;
591 global_stat.waiting_ct++;
592 c->type |= CONN_TYPE_WAITING;
593 for (i = 0; i < c->watch.used; i++) {
594 t = c->watch.items[i];
595 t->stat.waiting_ct++;
596 ms_append(&t->waiting, c);
600 static job
601 find_reserved_job_in_conn(conn c, unsigned long long int id)
603 job j;
605 for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
606 if (j->id == id) return j;
608 return NULL;
611 static job
612 find_reserved_job_in_list(conn list, unsigned long long int id)
614 job j;
615 conn c;
617 for (c = list->next; c != list; c = c->next) {
618 j = find_reserved_job_in_conn(c, id);
619 if (j) return j;
621 return NULL;
624 static job
625 find_reserved_job(unsigned long long int id)
627 return find_reserved_job_in_list(&running, id);
630 static job
631 peek_ready_job(unsigned long long int id)
634 job j;
635 size_t i;
637 for (i = 0; i < tubes.used; i++) {
638 j = pq_find(&((tube) tubes.items[i])->ready, id);
639 if (j) return j;
641 return NULL;
644 /* TODO: make a global hashtable of jobs because this is slow */
645 static job
646 peek_job(unsigned long long int id)
648 return find_reserved_job(id) ? :
649 peek_ready_job(id) ? :
650 find_delayed_job(id) ? :
651 find_buried_job(id);
654 static void
655 check_err(conn c, const char *s)
657 if (errno == EAGAIN) return;
658 if (errno == EINTR) return;
659 if (errno == EWOULDBLOCK) return;
661 twarn("%s", s);
662 conn_close(c);
663 return;
666 /* Scan the given string for the sequence "\r\n" and return the line length.
667 * Always returns at least 2 if a match is found. Returns 0 if no match. */
668 static int
669 scan_line_end(const char *s, int size)
671 char *match;
673 match = memchr(s, '\r', size - 1);
674 if (!match) return 0;
676 /* this is safe because we only scan size - 1 chars above */
677 if (match[1] == '\n') return match - s + 2;
679 return 0;
682 static int
683 cmd_len(conn c)
685 return scan_line_end(c->cmd, c->cmd_read);
688 /* parse the command line */
689 static int
690 which_cmd(conn c)
692 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
693 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
694 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
695 TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY);
696 TEST_CMD(c->cmd, CMD_PEEK_DELAYED, OP_PEEK_DELAYED);
697 TEST_CMD(c->cmd, CMD_PEEK_BURIED, OP_PEEK_BURIED);
698 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
699 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
700 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
701 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
702 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
703 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
704 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
705 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
706 TEST_CMD(c->cmd, CMD_USE, OP_USE);
707 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
708 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
709 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
710 TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
711 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
712 return OP_UNKNOWN;
715 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
716 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
717 * This function is idempotent(). */
718 static void
719 fill_extra_data(conn c)
721 int extra_bytes, job_data_bytes = 0, cmd_bytes;
723 if (!c->fd) return; /* the connection was closed */
724 if (!c->cmd_len) return; /* we don't have a complete command */
726 /* how many extra bytes did we read? */
727 extra_bytes = c->cmd_read - c->cmd_len;
729 /* how many bytes should we put into the job body? */
730 if (c->in_job) {
731 job_data_bytes = min(extra_bytes, c->in_job->body_size);
732 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
733 c->in_job_read = job_data_bytes;
736 /* how many bytes are left to go into the future cmd? */
737 cmd_bytes = extra_bytes - job_data_bytes;
738 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
739 c->cmd_read = cmd_bytes;
740 c->cmd_len = 0; /* we no longer know the length of the new command */
743 static void
744 enqueue_incoming_job(conn c)
746 int r;
747 job j = c->in_job;
749 c->in_job = NULL; /* the connection no longer owns this job */
750 c->in_job_read = 0;
752 /* check if the trailer is present and correct */
753 if (memcmp(j->body + j->body_size - 2, "\r\n", 2)) {
754 job_free(j);
755 return reply_msg(c, MSG_EXPECTED_CRLF);
758 if (drain_mode) {
759 job_free(j);
760 return reply_serr(c, MSG_DRAINING);
763 /* we have a complete job, so let's stick it in the pqueue */
764 r = enqueue_job(j, j->delay);
765 op_ct[OP_PUT]++; /* stats */
766 global_stat.total_jobs_ct++;
767 j->tube->stat.total_jobs_ct++;
769 if (r) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->id);
771 /* out of memory trying to grow the queue, so it gets buried */
772 bury_job(j);
773 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->id);
776 static unsigned int
777 uptime()
779 return time(NULL) - start_time;
782 static int
783 fmt_stats(char *buf, size_t size, void *x)
785 struct rusage ru = {{0, 0}, {0, 0}};
786 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
787 return snprintf(buf, size, STATS_FMT,
788 global_stat.urgent_ct,
789 ready_ct,
790 global_stat.reserved_ct,
791 get_delayed_job_ct(),
792 global_stat.buried_ct,
793 op_ct[OP_PUT],
794 op_ct[OP_PEEKJOB],
795 op_ct[OP_RESERVE],
796 op_ct[OP_DELETE],
797 op_ct[OP_RELEASE],
798 op_ct[OP_USE],
799 op_ct[OP_WATCH],
800 op_ct[OP_IGNORE],
801 op_ct[OP_BURY],
802 op_ct[OP_KICK],
803 op_ct[OP_STATS],
804 op_ct[OP_JOBSTATS],
805 op_ct[OP_STATS_TUBE],
806 op_ct[OP_LIST_TUBES],
807 op_ct[OP_LIST_TUBE_USED],
808 op_ct[OP_LIST_TUBES_WATCHED],
809 timeout_ct,
810 global_stat.total_jobs_ct,
811 job_data_size_limit,
812 tubes.used,
813 count_cur_conns(),
814 count_cur_producers(),
815 count_cur_workers(),
816 global_stat.waiting_ct,
817 count_tot_conns(),
818 getpid(),
819 VERSION,
820 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
821 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
822 uptime());
826 /* Read a priority value from the given buffer and place it in pri.
827 * Update end to point to the address after the last character consumed.
828 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
829 * conversion and return the status code but not update any values. This is an
830 * easy way to check for errors.
831 * If end is NULL, read_pri will also check that the entire input string was
832 * consumed and return an error code otherwise.
833 * Return 0 on success, or nonzero on failure.
834 * If a failure occurs, pri and end are not modified. */
835 static int
836 read_pri(unsigned int *pri, const char *buf, char **end)
838 char *tend;
839 unsigned int tpri;
841 errno = 0;
842 tpri = strtoul(buf, &tend, 10);
843 if (tend == buf) return -1;
844 if (errno && errno != ERANGE) return -1;
845 if (!end && tend[0] != '\0') return -1;
847 if (pri) *pri = tpri;
848 if (end) *end = tend;
849 return 0;
852 /* Read a delay value from the given buffer and place it in delay.
853 * The interface and behavior are the same as in read_pri(). */
854 static int
855 read_delay(unsigned int *delay, const char *buf, char **end)
857 return read_pri(delay, buf, end);
860 /* Read a timeout value from the given buffer and place it in ttr.
861 * The interface and behavior are the same as in read_pri(). */
862 static int
863 read_ttr(unsigned int *ttr, const char *buf, char **end)
865 return read_pri(ttr, buf, end);
868 static void
869 wait_for_job(conn c)
871 int r;
873 c->state = STATE_WAIT;
874 enqueue_waiting_conn(c);
876 /* this conn is waiting, but we want to know if they hang up */
877 r = conn_update_evq(c, EV_READ | EV_PERSIST);
878 if (r == -1) return twarnx("update events failed"), conn_close(c);
881 typedef int(*fmt_fn)(char *, size_t, void *);
883 static void
884 do_stats(conn c, fmt_fn fmt, void *data)
886 int r, stats_len;
888 /* first, measure how big a buffer we will need */
889 stats_len = fmt(NULL, 0, data);
891 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
892 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
894 /* now actually format the stats data */
895 r = fmt(c->out_job->body, stats_len, data);
896 if (r != stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
897 c->out_job->body[stats_len - 1] = '\n'; /* patch up sprintf's output */
899 c->out_job_sent = 0;
900 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", stats_len - 2);
903 static void
904 do_list_tubes(conn c, ms l)
906 char *buf;
907 tube t;
908 size_t i, resp_z;
910 /* first, measure how big a buffer we will need */
911 resp_z = 6; /* initial "---\n" and final "\r\n" */
912 for (i = 0; i < l->used; i++) {
913 t = l->items[i];
914 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
917 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
918 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
920 /* now actually format the response */
921 buf = c->out_job->body;
922 buf += snprintf(buf, 5, "---\n");
923 for (i = 0; i < l->used; i++) {
924 t = l->items[i];
925 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
927 buf[0] = '\r';
928 buf[1] = '\n';
930 c->out_job_sent = 0;
931 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", resp_z - 2);
934 static int
935 fmt_job_stats(char *buf, size_t size, job j)
937 time_t t;
939 t = time(NULL);
940 return snprintf(buf, size, JOB_STATS_FMT,
941 j->id,
942 j->tube->name,
943 job_state(j),
944 j->pri,
945 (unsigned int) (t - j->creation),
946 j->delay,
947 j->ttr,
948 (unsigned int) (j->deadline - t),
949 j->timeout_ct,
950 j->release_ct,
951 j->bury_ct,
952 j->kick_ct);
955 static int
956 fmt_stats_tube(char *buf, size_t size, tube t)
958 return snprintf(buf, size, STATS_TUBE_FMT,
959 t->name,
960 t->stat.urgent_ct,
961 t->ready.used,
962 t->stat.reserved_ct,
963 pq_used(&t->delay),
964 t->stat.buried_ct,
965 t->stat.total_jobs_ct,
966 t->using_ct,
967 t->watching_ct,
968 t->stat.waiting_ct);
971 static void
972 maybe_enqueue_incoming_job(conn c)
974 job j = c->in_job;
976 /* do we have a complete job? */
977 if (c->in_job_read == j->body_size) return enqueue_incoming_job(c);
979 /* otherwise we have incomplete data, so just keep waiting */
980 c->state = STATE_WANTDATA;
983 /* j can be NULL */
984 static job
985 remove_this_reserved_job(conn c, job j)
987 j = job_remove(j);
988 if (j) {
989 global_stat.reserved_ct--;
990 j->tube->stat.reserved_ct--;
992 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
993 return j;
996 static job
997 remove_reserved_job(conn c, unsigned long long int id)
999 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, id));
1002 static int
1003 name_is_ok(const char *name, size_t max)
1005 size_t len = strlen(name);
1006 return len > 0 && len <= max &&
1007 strspn(name, NAME_CHARS) == len && name[0] != '-';
1010 static tube
1011 find_tube(const char *name)
1013 tube t;
1014 size_t i;
1016 for (i = 0; i < tubes.used; i++) {
1017 t = tubes.items[i];
1018 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) return t;
1020 return NULL;
1023 void
1024 prot_remove_tube(tube t)
1026 ms_remove(&tubes, t);
1029 static tube
1030 make_and_insert_tube(const char *name)
1032 int r;
1033 tube t = NULL;
1035 t = make_tube(name);
1036 if (!t) return NULL;
1038 /* We want this global tube list to behave like "weak" refs, so don't
1039 * increment the ref count. */
1040 r = ms_append(&tubes, t);
1041 if (!r) return tube_dref(t), NULL;
1043 return t;
1046 static tube
1047 find_or_make_tube(const char *name)
1049 return find_tube(name) ? : make_and_insert_tube(name);
1052 static void
1053 dispatch_cmd(conn c)
1055 int r, i;
1056 unsigned int count;
1057 job j;
1058 unsigned char type;
1059 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
1060 unsigned int pri, delay, ttr, body_size;
1061 unsigned long long int id;
1062 tube t = NULL;
1064 /* NUL-terminate this string so we can use strtol and friends */
1065 c->cmd[c->cmd_len - 2] = '\0';
1067 /* check for possible maliciousness */
1068 if (strlen(c->cmd) != c->cmd_len - 2) {
1069 return reply_msg(c, MSG_BAD_FORMAT);
1072 type = which_cmd(c);
1073 dprintf("got %s command: \"%s\"\n", op_names[(int) type], c->cmd);
1075 switch (type) {
1076 case OP_PUT:
1077 r = read_pri(&pri, c->cmd + 4, &delay_buf);
1078 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1080 r = read_delay(&delay, delay_buf, &ttr_buf);
1081 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1083 r = read_ttr(&ttr, ttr_buf, &size_buf);
1084 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1086 errno = 0;
1087 body_size = strtoul(size_buf, &end_buf, 10);
1088 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1090 if (body_size > job_data_size_limit) {
1091 return reply_msg(c, MSG_JOB_TOO_BIG);
1094 /* don't allow trailing garbage */
1095 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
1097 conn_set_producer(c);
1099 c->in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c->use);
1101 fill_extra_data(c);
1103 /* it's possible we already have a complete job */
1104 maybe_enqueue_incoming_job(c);
1106 break;
1107 case OP_PEEK_READY:
1108 /* don't allow trailing garbage */
1109 if (c->cmd_len != CMD_PEEK_READY_LEN + 2) {
1110 return reply_msg(c, MSG_BAD_FORMAT);
1113 j = job_copy(pq_peek(&c->use->ready));
1115 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1117 op_ct[type]++;
1118 reply_job(c, j, MSG_FOUND);
1119 break;
1120 case OP_PEEK_DELAYED:
1121 /* don't allow trailing garbage */
1122 if (c->cmd_len != CMD_PEEK_DELAYED_LEN + 2) {
1123 return reply_msg(c, MSG_BAD_FORMAT);
1126 j = job_copy(pq_peek(&c->use->delay));
1128 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1130 op_ct[type]++;
1131 reply_job(c, j, MSG_FOUND);
1132 break;
1133 case OP_PEEK_BURIED:
1134 /* don't allow trailing garbage */
1135 if (c->cmd_len != CMD_PEEK_BURIED_LEN + 2) {
1136 return reply_msg(c, MSG_BAD_FORMAT);
1139 j = job_copy(buried_job_p(c->use)? j = c->use->buried.next : NULL);
1141 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1143 op_ct[type]++;
1144 reply_job(c, j, MSG_FOUND);
1145 break;
1146 case OP_PEEKJOB:
1147 errno = 0;
1148 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1149 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1151 /* So, peek is annoying, because some other connection might free the
1152 * job while we are still trying to write it out. So we copy it and
1153 * then free the copy when it's done sending. */
1154 j = job_copy(peek_job(id));
1156 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1158 op_ct[type]++;
1159 reply_job(c, j, MSG_FOUND);
1160 break;
1161 case OP_RESERVE:
1162 /* don't allow trailing garbage */
1163 if (c->cmd_len != CMD_RESERVE_LEN + 2) {
1164 return reply_msg(c, MSG_BAD_FORMAT);
1167 op_ct[type]++;
1168 conn_set_worker(c);
1170 if (conn_has_close_deadline(c) && !conn_ready(c)) {
1171 return reply_msg(c, MSG_DEADLINE_SOON);
1174 /* try to get a new job for this guy */
1175 wait_for_job(c);
1176 process_queue();
1177 break;
1178 case OP_DELETE:
1179 errno = 0;
1180 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1181 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1183 j = remove_reserved_job(c, id) ? : remove_buried_job(id);
1185 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1187 op_ct[type]++;
1188 job_free(j);
1190 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1191 break;
1192 case OP_RELEASE:
1193 errno = 0;
1194 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1195 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1197 r = read_pri(&pri, pri_buf, &delay_buf);
1198 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1200 r = read_delay(&delay, delay_buf, NULL);
1201 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1203 j = remove_reserved_job(c, id);
1205 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1207 j->pri = pri;
1208 j->delay = delay;
1209 j->release_ct++;
1210 op_ct[type]++;
1211 r = enqueue_job(j, delay);
1212 if (r) return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1214 /* out of memory trying to grow the queue, so it gets buried */
1215 bury_job(j);
1216 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1217 break;
1218 case OP_BURY:
1219 errno = 0;
1220 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1221 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1223 r = read_pri(&pri, pri_buf, NULL);
1224 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1226 j = remove_reserved_job(c, id);
1228 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1230 j->pri = pri;
1231 op_ct[type]++;
1232 bury_job(j);
1233 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1234 break;
1235 case OP_KICK:
1236 errno = 0;
1237 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
1238 if (end_buf == c->cmd + CMD_KICK_LEN) {
1239 return reply_msg(c, MSG_BAD_FORMAT);
1241 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1243 op_ct[type]++;
1245 i = kick_jobs(c->use, count);
1247 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1248 case OP_STATS:
1249 /* don't allow trailing garbage */
1250 if (c->cmd_len != CMD_STATS_LEN + 2) {
1251 return reply_msg(c, MSG_BAD_FORMAT);
1254 op_ct[type]++;
1256 do_stats(c, fmt_stats, NULL);
1257 break;
1258 case OP_JOBSTATS:
1259 errno = 0;
1260 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1261 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1263 j = peek_job(id);
1264 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1266 op_ct[type]++;
1268 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1269 do_stats(c, (fmt_fn) fmt_job_stats, j);
1270 break;
1271 case OP_STATS_TUBE:
1272 name = c->cmd + CMD_STATS_TUBE_LEN;
1273 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1275 t = find_tube(name);
1276 if (!t) return reply_msg(c, MSG_NOTFOUND);
1278 op_ct[type]++;
1280 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1281 t = NULL;
1282 break;
1283 case OP_LIST_TUBES:
1284 /* don't allow trailing garbage */
1285 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1286 return reply_msg(c, MSG_BAD_FORMAT);
1289 op_ct[type]++;
1290 do_list_tubes(c, &tubes);
1291 break;
1292 case OP_LIST_TUBE_USED:
1293 /* don't allow trailing garbage */
1294 if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
1295 return reply_msg(c, MSG_BAD_FORMAT);
1298 op_ct[type]++;
1299 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1300 break;
1301 case OP_LIST_TUBES_WATCHED:
1302 /* don't allow trailing garbage */
1303 if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
1304 return reply_msg(c, MSG_BAD_FORMAT);
1307 op_ct[type]++;
1308 do_list_tubes(c, &c->watch);
1309 break;
1310 case OP_USE:
1311 name = c->cmd + CMD_USE_LEN;
1312 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1314 TUBE_ASSIGN(t, find_or_make_tube(name));
1315 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1317 c->use->using_ct--;
1318 TUBE_ASSIGN(c->use, t);
1319 TUBE_ASSIGN(t, NULL);
1320 c->use->using_ct++;
1322 op_ct[type]++;
1323 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1324 break;
1325 case OP_WATCH:
1326 name = c->cmd + CMD_WATCH_LEN;
1327 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1329 TUBE_ASSIGN(t, find_or_make_tube(name));
1330 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1332 r = 1;
1333 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1334 TUBE_ASSIGN(t, NULL);
1335 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1337 op_ct[type]++;
1338 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1339 break;
1340 case OP_IGNORE:
1341 name = c->cmd + CMD_IGNORE_LEN;
1342 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1344 t = NULL;
1345 for (i = 0; i < c->watch.used; i++) {
1346 t = c->watch.items[i];
1347 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1348 t = NULL;
1351 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1353 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1354 t = NULL;
1356 op_ct[type]++;
1357 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1358 break;
1359 default:
1360 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1364 /* if we get a timeout, it means that a job has been reserved for too long, so
1365 * we should put it back in the queue */
1366 static void
1367 h_conn_timeout(conn c)
1369 int r, should_timeout = 0;
1370 job j;
1372 if (conn_waiting(c) && conn_has_close_deadline(c)) should_timeout = 1;
1374 while ((j = soonest_job(c))) {
1375 if (j->deadline > time(NULL)) break;
1376 timeout_ct++; /* stats */
1377 j->timeout_ct++;
1378 r = enqueue_job(remove_this_reserved_job(c, j), 0);
1379 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
1380 r = conn_update_evq(c, c->evq.ev_events);
1381 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
1384 if (should_timeout) {
1385 dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
1386 return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON);
1390 void
1391 enter_drain_mode(int sig)
1393 drain_mode = 1;
1396 static void
1397 do_cmd(conn c)
1399 dispatch_cmd(c);
1400 fill_extra_data(c);
1403 static void
1404 reset_conn(conn c)
1406 int r;
1408 r = conn_update_evq(c, EV_READ | EV_PERSIST);
1409 if (r == -1) return twarnx("update events failed"), conn_close(c);
1411 /* was this a peek or stats command? */
1412 if (!has_reserved_this_job(c, c->out_job)) job_free(c->out_job);
1413 c->out_job = NULL;
1415 c->reply_sent = 0; /* now that we're done, reset this */
1416 c->state = STATE_WANTCOMMAND;
1419 static void
1420 h_conn_data(conn c)
1422 int r;
1423 job j;
1424 struct iovec iov[2];
1426 switch (c->state) {
1427 case STATE_WANTCOMMAND:
1428 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1429 if (r == -1) return check_err(c, "read()");
1430 if (r == 0) return conn_close(c); /* the client hung up */
1432 c->cmd_read += r; /* we got some bytes */
1434 c->cmd_len = cmd_len(c); /* find the EOL */
1436 /* yay, complete command line */
1437 if (c->cmd_len) return do_cmd(c);
1439 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1441 /* command line too long? */
1442 if (c->cmd_read == LINE_BUF_SIZE) {
1443 c->cmd_read = 0; /* discard the input so far */
1444 return reply_msg(c, MSG_BAD_FORMAT);
1447 /* otherwise we have an incomplete line, so just keep waiting */
1448 break;
1449 case STATE_WANTDATA:
1450 j = c->in_job;
1452 r = read(c->fd, j->body + c->in_job_read, j->body_size -c->in_job_read);
1453 if (r == -1) return check_err(c, "read()");
1454 if (r == 0) return conn_close(c); /* the client hung up */
1456 c->in_job_read += r; /* we got some bytes */
1458 /* (j->in_job_read > j->body_size) can't happen */
1460 maybe_enqueue_incoming_job(c);
1461 break;
1462 case STATE_SENDWORD:
1463 r= write(c->fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1464 if (r == -1) return check_err(c, "write()");
1465 if (r == 0) return conn_close(c); /* the client hung up */
1467 c->reply_sent += r; /* we got some bytes */
1469 /* (c->reply_sent > c->reply_len) can't happen */
1471 if (c->reply_sent == c->reply_len) return reset_conn(c);
1473 /* otherwise we sent an incomplete reply, so just keep waiting */
1474 break;
1475 case STATE_SENDJOB:
1476 j = c->out_job;
1478 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1479 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1480 iov[1].iov_base = j->body + c->out_job_sent;
1481 iov[1].iov_len = j->body_size - c->out_job_sent;
1483 r = writev(c->fd, iov, 2);
1484 if (r == -1) return check_err(c, "writev()");
1485 if (r == 0) return conn_close(c); /* the client hung up */
1487 /* update the sent values */
1488 c->reply_sent += r;
1489 if (c->reply_sent >= c->reply_len) {
1490 c->out_job_sent += c->reply_sent - c->reply_len;
1491 c->reply_sent = c->reply_len;
1494 /* (c->out_job_sent > j->body_size) can't happen */
1496 /* are we done? */
1497 if (c->out_job_sent == j->body_size) return reset_conn(c);
1499 /* otherwise we sent incomplete data, so just keep waiting */
1500 break;
1501 case STATE_WAIT: /* keep an eye out in case they hang up */
1502 /* but don't hang up just because our buffer is full */
1503 if (LINE_BUF_SIZE - c->cmd_read < 1) break;
1505 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1506 if (r == -1) return check_err(c, "read()");
1507 if (r == 0) return conn_close(c); /* the client hung up */
1508 c->cmd_read += r; /* we got some bytes */
1512 #define want_command(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))
1513 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
1515 static void
1516 h_conn(const int fd, const short which, conn c)
1518 if (fd != c->fd) {
1519 twarnx("Argh! event fd doesn't match conn fd.");
1520 close(fd);
1521 return conn_close(c);
1524 switch (which) {
1525 case EV_TIMEOUT:
1526 h_conn_timeout(c);
1527 event_add(&c->evq, NULL); /* seems to be necessary */
1528 break;
1529 case EV_READ:
1530 /* fall through... */
1531 case EV_WRITE:
1532 /* fall through... */
1533 default:
1534 h_conn_data(c);
1537 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1540 static void
1541 h_delay()
1543 int r;
1544 job j;
1545 time_t t;
1547 t = time(NULL);
1548 while ((j = delay_q_peek())) {
1549 if (j->deadline > t) break;
1550 j = delay_q_take();
1551 r = enqueue_job(j, 0);
1552 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
1555 set_main_delay_timeout();
1558 void
1559 h_accept(const int fd, const short which, struct event *ev)
1561 conn c;
1562 int cfd, flags, r;
1563 socklen_t addrlen;
1564 struct sockaddr addr;
1566 if (which == EV_TIMEOUT) return h_delay();
1568 addrlen = sizeof addr;
1569 cfd = accept(fd, &addr, &addrlen);
1570 if (cfd == -1) {
1571 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1572 if (errno == EMFILE) brake();
1573 return;
1576 flags = fcntl(cfd, F_GETFL, 0);
1577 if (flags < 0) return twarn("getting flags"), close(cfd), v();
1579 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1580 if (r < 0) return twarn("setting O_NONBLOCK"), close(cfd), v();
1582 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1583 if (!c) return twarnx("make_conn() failed"), close(cfd), brake();
1585 dprintf("accepted conn, fd=%d\n", cfd);
1586 r = conn_set_evq(c, EV_READ | EV_PERSIST, (evh) h_conn);
1587 if (r == -1) return twarnx("conn_set_evq() failed"), close(cfd), brake();
1590 void
1591 prot_init()
1593 start_time = time(NULL);
1594 memset(op_ct, 0, sizeof(op_ct));
1596 ms_init(&tubes, NULL, NULL);
1598 TUBE_ASSIGN(default_tube, find_or_make_tube("default"));
1599 if (!default_tube) twarnx("Out of memory during startup!");