Portability fix for non-32-bit architectures.
[beanstalkd.git] / prot.c
blob12cda0cb8e36fae7a68a585674115e49a984962b
1 /* prot.c - protocol implementation */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <errno.h>
24 #include <sys/resource.h>
25 #include <sys/uio.h>
26 #include <stdarg.h>
28 #include "stat.h"
29 #include "prot.h"
30 #include "pq.h"
31 #include "ms.h"
32 #include "job.h"
33 #include "tube.h"
34 #include "conn.h"
35 #include "util.h"
36 #include "net.h"
37 #include "version.h"
39 /* job body cannot be greater than this many bytes long */
40 #define JOB_DATA_SIZE_LIMIT ((1 << 16) - 1)
42 #define NAME_CHARS \
43 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
44 "abcdefghijklmnopqrstuvwxyz" \
45 "0123456789-+/;.$()"
47 #define CMD_PUT "put "
48 #define CMD_PEEK "peek"
49 #define CMD_PEEKJOB "peek "
50 #define CMD_RESERVE "reserve"
51 #define CMD_DELETE "delete "
52 #define CMD_RELEASE "release "
53 #define CMD_BURY "bury "
54 #define CMD_KICK "kick "
55 #define CMD_STATS "stats"
56 #define CMD_JOBSTATS "stats-job "
57 #define CMD_USE "use "
58 #define CMD_WATCH "watch "
59 #define CMD_IGNORE "ignore "
60 #define CMD_LIST_TUBES "list-tubes"
61 #define CMD_LIST_TUBE_USED "list-tube-used"
62 #define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
63 #define CMD_STATS_TUBE "stats-tube "
65 #define CONSTSTRLEN(m) (sizeof(m) - 1)
67 #define CMD_PEEK_LEN CONSTSTRLEN(CMD_PEEK)
68 #define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
69 #define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
70 #define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
71 #define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
72 #define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
73 #define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
74 #define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
75 #define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
76 #define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
77 #define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
78 #define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
79 #define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
80 #define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
81 #define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
82 #define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
84 #define MSG_FOUND "FOUND"
85 #define MSG_NOTFOUND "NOT_FOUND\r\n"
86 #define MSG_RESERVED "RESERVED"
87 #define MSG_DELETED "DELETED\r\n"
88 #define MSG_RELEASED "RELEASED\r\n"
89 #define MSG_BURIED "BURIED\r\n"
90 #define MSG_BURIED_FMT "BURIED %llu\r\n"
91 #define MSG_INSERTED_FMT "INSERTED %llu\r\n"
92 #define MSG_NOT_IGNORED "NOT_IGNORED\r\n"
94 #define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
95 #define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
96 #define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
97 #define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
98 #define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)
100 #define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
101 #define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
102 #define MSG_DRAINING "DRAINING\r\n"
103 #define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
104 #define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
105 #define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
106 #define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
108 #define STATS_FMT "---\n" \
109 "current-jobs-urgent: %u\n" \
110 "current-jobs-ready: %u\n" \
111 "current-jobs-reserved: %u\n" \
112 "current-jobs-delayed: %u\n" \
113 "current-jobs-buried: %u\n" \
114 "cmd-put: %llu\n" \
115 "cmd-peek: %llu\n" \
116 "cmd-reserve: %llu\n" \
117 "cmd-delete: %llu\n" \
118 "cmd-release: %llu\n" \
119 "cmd-use: %llu\n" \
120 "cmd-watch: %llu\n" \
121 "cmd-ignore: %llu\n" \
122 "cmd-bury: %llu\n" \
123 "cmd-kick: %llu\n" \
124 "cmd-stats: %llu\n" \
125 "cmd-stats-job: %llu\n" \
126 "cmd-stats-tube: %llu\n" \
127 "cmd-list-tubes: %llu\n" \
128 "cmd-list-tube-used: %llu\n" \
129 "cmd-list-tubes-watched: %llu\n" \
130 "job-timeouts: %llu\n" \
131 "total-jobs: %llu\n" \
132 "current-tubes: %Zu\n" \
133 "current-connections: %u\n" \
134 "current-producers: %u\n" \
135 "current-workers: %u\n" \
136 "current-waiting: %u\n" \
137 "total-connections: %u\n" \
138 "pid: %u\n" \
139 "version: %s\n" \
140 "rusage-utime: %d.%06d\n" \
141 "rusage-stime: %d.%06d\n" \
142 "uptime: %u\n" \
143 "\r\n"
145 #define STATS_TUBE_FMT "---\n" \
146 "name: %s\n" \
147 "current-jobs-urgent: %u\n" \
148 "current-jobs-ready: %u\n" \
149 "current-jobs-reserved: %u\n" \
150 "current-jobs-buried: %u\n" \
151 "total-jobs: %llu\n" \
152 "current-waiting: %u\n" \
153 "\r\n"
155 #define JOB_STATS_FMT "---\n" \
156 "id: %llu\n" \
157 "tube: %s\n" \
158 "state: %s\n" \
159 "pri: %u\n" \
160 "age: %u\n" \
161 "delay: %u\n" \
162 "ttr: %u\n" \
163 "time-left: %u\n" \
164 "timeouts: %u\n" \
165 "releases: %u\n" \
166 "buries: %u\n" \
167 "kicks: %u\n" \
168 "\r\n"
170 static struct pq delay_q;
172 static unsigned int ready_ct = 0;
173 static struct stats global_stat = {0, 0, 0, 0, 0};
175 static tube default_tube;
176 static struct ms tubes;
178 static int drain_mode = 0;
179 static time_t start_time;
180 static unsigned long long int put_ct = 0, peek_ct = 0, reserve_ct = 0,
181 delete_ct = 0, release_ct = 0, bury_ct = 0, kick_ct = 0,
182 stats_job_ct = 0, stats_ct = 0, timeout_ct = 0,
183 list_tubes_ct = 0, stats_tube_ct = 0,
184 list_tube_used_ct = 0, list_watched_tubes_ct = 0,
185 use_ct = 0, watch_ct = 0, ignore_ct = 0;
188 /* Doubly-linked list of connections with at least one reserved job. */
189 static struct conn running = { &running, &running, 0 };
191 #ifdef DEBUG
192 static const char * op_names[] = {
193 "<unknown>",
194 CMD_PUT,
195 CMD_PEEKJOB,
196 CMD_RESERVE,
197 CMD_DELETE,
198 CMD_RELEASE,
199 CMD_BURY,
200 CMD_KICK,
201 CMD_STATS,
202 CMD_JOBSTATS,
203 CMD_PEEK,
204 CMD_USE,
205 CMD_WATCH,
206 CMD_IGNORE,
207 CMD_LIST_TUBES,
208 CMD_LIST_TUBE_USED,
209 CMD_LIST_TUBES_WATCHED,
210 CMD_STATS_TUBE,
212 #endif
214 static int
215 buried_job_p(tube t)
217 return job_list_any_p(&t->buried);
220 static void
221 reply(conn c, const char *line, int len, int state)
223 int r;
225 r = conn_update_evq(c, EV_WRITE | EV_PERSIST);
226 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
228 c->reply = line;
229 c->reply_len = len;
230 c->reply_sent = 0;
231 c->state = state;
232 dprintf("sending reply: %.*s", len, line);
235 #define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)
237 #define reply_serr(c,e) (twarnx("server error: %s",(e)),\
238 reply_msg((c),(e)))
240 static void
241 reply_line(conn c, int state, const char *fmt, ...)
243 int r;
244 va_list ap;
246 va_start(ap, fmt);
247 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
248 va_end(ap);
250 /* Make sure the buffer was big enough. If not, we have a bug. */
251 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
253 return reply(c, c->reply_buf, r, state);
256 static void
257 reply_job(conn c, job j, const char *word)
259 /* tell this connection which job to send */
260 c->out_job = j;
261 c->out_job_sent = 0;
263 return reply_line(c, STATE_SENDJOB, "%s %llu %u\r\n",
264 word, j->id, j->body_size - 2);
267 conn
268 remove_waiting_conn(conn c)
270 tube t;
271 size_t i;
273 if (!(c->type & CONN_TYPE_WAITING)) return NULL;
274 c->type &= ~CONN_TYPE_WAITING;
275 global_stat.waiting_ct--;
276 for (i = 0; i < c->watch.used; i++) {
277 t = c->watch.items[i];
278 t->stat.waiting_ct--;
279 ms_remove(&t->waiting, c);
281 return c;
284 static void
285 reserve_job(conn c, job j)
287 j->deadline = time(NULL) + j->ttr;
288 global_stat.reserved_ct++; /* stats */
289 j->tube->stat.reserved_ct++;
290 conn_insert(&running, c);
291 j->state = JOB_STATE_RESERVED;
292 job_insert(&c->reserved_jobs, j);
293 return reply_job(c, j, MSG_RESERVED);
296 static job
297 next_eligible_job()
299 tube t;
300 size_t i;
301 job j = NULL, candidate;
303 dprintf("tubes.used = %d\n", tubes.used);
304 for (i = 0; i < tubes.used; i++) {
305 t = tubes.items[i];
306 dprintf("for %s t->waiting.used=%d t->ready.used=%d\n",
307 t->name, t->waiting.used, t->ready.used);
308 if (t->waiting.used && t->ready.used) {
309 candidate = pq_peek(&t->ready);
310 if (!j || candidate->id < j->id) j = candidate;
312 dprintf("i = %d, tubes.used = %d\n", i, tubes.used);
315 return j;
318 static void
319 process_queue()
321 job j;
323 dprintf("processing queue\n");
324 while ((j = next_eligible_job())) {
325 dprintf("got eligible job %llu in %s\n", j->id, j->tube->name);
326 j = pq_take(&j->tube->ready);
327 ready_ct--;
328 if (j->pri < URGENT_THRESHOLD) {
329 global_stat.urgent_ct--;
330 j->tube->stat.urgent_ct--;
332 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
336 static int
337 enqueue_job(job j, unsigned int delay)
339 int r;
341 if (delay) {
342 j->deadline = time(NULL) + delay;
343 r = pq_give(&delay_q, j);
344 if (!r) return 0;
345 j->state = JOB_STATE_DELAYED;
346 set_main_timeout(pq_peek(&delay_q)->deadline);
347 } else {
348 r = pq_give(&j->tube->ready, j);
349 if (!r) return 0;
350 j->state = JOB_STATE_READY;
351 ready_ct++;
352 if (j->pri < URGENT_THRESHOLD) {
353 global_stat.urgent_ct++;
354 j->tube->stat.urgent_ct++;
357 process_queue();
358 return 1;
361 static void
362 bury_job(job j)
364 job_insert(&j->tube->buried, j);
365 global_stat.buried_ct++;
366 j->tube->stat.buried_ct++;
367 j->state = JOB_STATE_BURIED;
368 j->bury_ct++;
371 void
372 enqueue_reserved_jobs(conn c)
374 int r;
375 job j;
377 while (job_list_any_p(&c->reserved_jobs)) {
378 j = job_remove(c->reserved_jobs.next);
379 r = enqueue_job(j, 0);
380 if (!r) bury_job(j);
381 global_stat.reserved_ct--;
382 j->tube->stat.reserved_ct--;
383 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
387 static job
388 delay_q_peek()
390 return pq_peek(&delay_q);
393 static job
394 delay_q_take()
396 return pq_take(&delay_q);
399 static job
400 remove_this_buried_job(job j)
402 j = job_remove(j);
403 if (j) {
404 global_stat.buried_ct--;
405 j->tube->stat.buried_ct--;
407 return j;
410 static int
411 kick_buried_job(tube t)
413 int r;
414 job j;
416 if (!buried_job_p(t)) return 0;
417 j = remove_this_buried_job(t->buried.next);
418 j->kick_ct++;
419 r = enqueue_job(j, 0);
420 if (r) return 1;
422 /* ready queue is full, so bury it */
423 bury_job(j);
424 return 0;
427 static unsigned int
428 get_delayed_job_ct()
430 return pq_used(&delay_q);
433 static int
434 kick_delayed_job()
436 int r;
437 job j;
439 if (get_delayed_job_ct() < 1) return 0;
440 j = delay_q_take();
441 j->kick_ct++;
442 r = enqueue_job(j, 0);
443 if (r) return 1;
445 /* ready queue is full, so delay it again */
446 r = enqueue_job(j, j->delay);
447 if (r) return 0;
449 /* last resort */
450 bury_job(j);
451 return 0;
454 /* return the number of jobs successfully kicked */
455 static unsigned int
456 kick_buried_jobs(tube t, unsigned int n)
458 unsigned int i;
459 for (i = 0; (i < n) && kick_buried_job(t); ++i);
460 return i;
463 /* return the number of jobs successfully kicked */
464 static unsigned int
465 kick_delayed_jobs(unsigned int n)
467 unsigned int i;
468 for (i = 0; (i < n) && kick_delayed_job(); ++i);
469 return i;
472 static unsigned int
473 kick_jobs(tube t, unsigned int n)
475 if (buried_job_p(t)) return kick_buried_jobs(t, n);
476 return kick_delayed_jobs(n);
479 static job
480 peek_buried_job()
482 tube t;
483 size_t i;
485 for (i = 0; i < tubes.used; i++) {
486 t = tubes.items[i];
487 if (buried_job_p(t)) return t->buried.next;
489 return NULL;
492 static job
493 find_buried_job_in_tube(tube t, unsigned long long int id)
495 job j;
497 for (j = t->buried.next; j != &t->buried; j = j->next) {
498 if (j->id == id) return j;
500 return NULL;
503 static job
504 find_buried_job(unsigned long long int id)
506 job j;
507 size_t i;
509 for (i = 0; i < tubes.used; i++) {
510 j = find_buried_job_in_tube(tubes.items[i], id);
511 if (j) return j;
513 return NULL;
516 static job
517 remove_buried_job(unsigned long long int id)
519 return remove_this_buried_job(find_buried_job(id));
522 static void
523 enqueue_waiting_conn(conn c)
525 tube t;
526 size_t i;
528 global_stat.waiting_ct++;
529 c->type |= CONN_TYPE_WAITING;
530 for (i = 0; i < c->watch.used; i++) {
531 t = c->watch.items[i];
532 t->stat.waiting_ct++;
533 ms_append(&t->waiting, c);
537 static job
538 find_reserved_job_in_conn(conn c, unsigned long long int id)
540 job j;
542 for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
543 if (j->id == id) return j;
545 return NULL;
548 static job
549 find_reserved_job_in_list(conn list, unsigned long long int id)
551 job j;
552 conn c;
554 for (c = list->next; c != list; c = c->next) {
555 j = find_reserved_job_in_conn(c, id);
556 if (j) return j;
558 return NULL;
561 static job
562 find_reserved_job(unsigned long long int id)
564 return find_reserved_job_in_list(&running, id);
567 static job
568 peek_ready_job(unsigned long long int id)
571 job j;
572 size_t i;
574 for (i = 0; i < tubes.used; i++) {
575 j = pq_find(&((tube) tubes.items[i])->ready, id);
576 if (j) return j;
578 return NULL;
581 /* TODO: make a global hashtable of jobs because this is slow */
582 static job
583 peek_job(unsigned long long int id)
585 return peek_ready_job(id) ? :
586 pq_find(&delay_q, id) ? :
587 find_reserved_job(id) ? :
588 find_buried_job(id);
591 static void
592 check_err(conn c, const char *s)
594 if (errno == EAGAIN) return;
595 if (errno == EINTR) return;
596 if (errno == EWOULDBLOCK) return;
598 twarn("%s", s);
599 conn_close(c);
600 return;
603 /* Scan the given string for the sequence "\r\n" and return the line length.
604 * Always returns at least 2 if a match is found. Returns 0 if no match. */
605 static int
606 scan_line_end(const char *s, int size)
608 char *match;
610 match = memchr(s, '\r', size - 1);
611 if (!match) return 0;
613 /* this is safe because we only scan size - 1 chars above */
614 if (match[1] == '\n') return match - s + 2;
616 return 0;
619 static int
620 cmd_len(conn c)
622 return scan_line_end(c->cmd, c->cmd_read);
625 /* parse the command line */
626 static int
627 which_cmd(conn c)
629 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
630 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
631 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
632 TEST_CMD(c->cmd, CMD_PEEK, OP_PEEK);
633 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
634 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
635 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
636 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
637 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
638 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
639 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
640 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
641 TEST_CMD(c->cmd, CMD_USE, OP_USE);
642 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
643 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
644 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
645 TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
646 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
647 return OP_UNKNOWN;
650 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
651 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
652 * This function is idempotent(). */
653 static void
654 fill_extra_data(conn c)
656 int extra_bytes, job_data_bytes = 0, cmd_bytes;
658 if (!c->fd) return; /* the connection was closed */
659 if (!c->cmd_len) return; /* we don't have a complete command */
661 /* how many extra bytes did we read? */
662 extra_bytes = c->cmd_read - c->cmd_len;
664 /* how many bytes should we put into the job body? */
665 if (c->in_job) {
666 job_data_bytes = min(extra_bytes, c->in_job->body_size);
667 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
668 c->in_job_read = job_data_bytes;
671 /* how many bytes are left to go into the future cmd? */
672 cmd_bytes = extra_bytes - job_data_bytes;
673 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
674 c->cmd_read = cmd_bytes;
675 c->cmd_len = 0; /* we no longer know the length of the new command */
678 static void
679 enqueue_incoming_job(conn c)
681 int r;
682 job j = c->in_job;
684 c->in_job = NULL; /* the connection no longer owns this job */
685 c->in_job_read = 0;
687 /* check if the trailer is present and correct */
688 if (memcmp(j->body + j->body_size - 2, "\r\n", 2)) {
689 job_free(j);
690 return reply_msg(c, MSG_EXPECTED_CRLF);
693 /* we have a complete job, so let's stick it in the pqueue */
694 r = enqueue_job(j, j->delay);
695 put_ct++; /* stats */
696 global_stat.total_jobs_ct++;
697 j->tube->stat.total_jobs_ct++;
699 if (r) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->id);
701 /* out of memory trying to grow the queue, so it gets buried */
702 bury_job(j);
703 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->id);
706 static unsigned int
707 uptime()
709 return time(NULL) - start_time;
712 static int
713 fmt_stats(char *buf, size_t size, void *x)
715 struct rusage ru = {{0, 0}, {0, 0}};
716 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
717 return snprintf(buf, size, STATS_FMT,
718 global_stat.urgent_ct,
719 ready_ct,
720 global_stat.reserved_ct,
721 get_delayed_job_ct(),
722 global_stat.buried_ct,
723 put_ct,
724 peek_ct,
725 reserve_ct,
726 delete_ct,
727 release_ct,
728 use_ct,
729 watch_ct,
730 ignore_ct,
731 bury_ct,
732 kick_ct,
733 stats_ct,
734 stats_job_ct,
735 stats_tube_ct,
736 list_tubes_ct,
737 list_tube_used_ct,
738 list_watched_tubes_ct,
739 timeout_ct,
740 global_stat.total_jobs_ct,
741 tubes.used,
742 count_cur_conns(),
743 count_cur_producers(),
744 count_cur_workers(),
745 global_stat.waiting_ct,
746 count_tot_conns(),
747 getpid(),
748 VERSION,
749 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
750 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
751 uptime());
755 /* Read a priority value from the given buffer and place it in pri.
756 * Update end to point to the address after the last character consumed.
757 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
758 * conversion and return the status code but not update any values. This is an
759 * easy way to check for errors.
760 * If end is NULL, read_pri will also check that the entire input string was
761 * consumed and return an error code otherwise.
762 * Return 0 on success, or nonzero on failure.
763 * If a failure occurs, pri and end are not modified. */
764 static int
765 read_pri(unsigned int *pri, const char *buf, char **end)
767 char *tend;
768 unsigned int tpri;
770 errno = 0;
771 tpri = strtoul(buf, &tend, 10);
772 if (tend == buf) return -1;
773 if (errno && errno != ERANGE) return -1;
774 if (!end && tend[0] != '\0') return -1;
776 if (pri) *pri = tpri;
777 if (end) *end = tend;
778 return 0;
781 /* Read a delay value from the given buffer and place it in delay.
782 * The interface and behavior are the same as in read_pri(). */
783 static int
784 read_delay(unsigned int *delay, const char *buf, char **end)
786 return read_pri(delay, buf, end);
789 /* Read a timeout value from the given buffer and place it in ttr.
790 * The interface and behavior are the same as in read_pri(). */
791 static int
792 read_ttr(unsigned int *ttr, const char *buf, char **end)
794 return read_pri(ttr, buf, end);
797 static void
798 wait_for_job(conn c)
800 int r;
802 /* this conn is waiting, but we want to know if they hang up */
803 r = conn_update_evq(c, EV_READ | EV_PERSIST);
804 if (r == -1) return twarnx("update events failed"), conn_close(c);
806 c->state = STATE_WAIT;
807 enqueue_waiting_conn(c);
810 typedef int(*fmt_fn)(char *, size_t, void *);
812 static void
813 do_stats(conn c, fmt_fn fmt, void *data)
815 int r, stats_len;
817 /* first, measure how big a buffer we will need */
818 stats_len = fmt(NULL, 0, data);
820 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
821 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
823 /* now actually format the stats data */
824 r = fmt(c->out_job->body, stats_len, data);
825 if (r != stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
826 c->out_job->body[stats_len - 1] = '\n'; /* patch up sprintf's output */
828 c->out_job_sent = 0;
829 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", stats_len - 2);
832 static void
833 do_list_tubes(conn c, ms l)
835 char *buf;
836 tube t;
837 size_t i, resp_z;
839 /* first, measure how big a buffer we will need */
840 resp_z = 6; /* initial "---\n" and final "\r\n" */
841 for (i = 0; i < l->used; i++) {
842 t = l->items[i];
843 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
846 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
847 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
849 /* now actually format the response */
850 buf = c->out_job->body;
851 buf += snprintf(buf, 5, "---\n");
852 for (i = 0; i < l->used; i++) {
853 t = l->items[i];
854 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
856 buf[0] = '\r';
857 buf[1] = '\n';
859 c->out_job_sent = 0;
860 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", resp_z - 2);
863 static int
864 fmt_job_stats(char *buf, size_t size, job j)
866 time_t t;
868 t = time(NULL);
869 return snprintf(buf, size, JOB_STATS_FMT,
870 j->id,
871 j->tube->name,
872 job_state(j),
873 j->pri,
874 (unsigned int) (t - j->creation),
875 j->delay,
876 j->ttr,
877 (unsigned int) (j->deadline - t),
878 j->timeout_ct,
879 j->release_ct,
880 j->bury_ct,
881 j->kick_ct);
884 static int
885 fmt_stats_tube(char *buf, size_t size, tube t)
887 return snprintf(buf, size, STATS_TUBE_FMT,
888 t->name,
889 t->stat.urgent_ct,
890 t->ready.used,
891 t->stat.reserved_ct,
892 t->stat.buried_ct,
893 t->stat.total_jobs_ct,
894 t->stat.waiting_ct);
897 static void
898 maybe_enqueue_incoming_job(conn c)
900 job j = c->in_job;
902 /* do we have a complete job? */
903 if (c->in_job_read == j->body_size) return enqueue_incoming_job(c);
905 /* otherwise we have incomplete data, so just keep waiting */
906 c->state = STATE_WANTDATA;
909 /* j can be NULL */
910 static job
911 remove_this_reserved_job(conn c, job j)
913 j = job_remove(j);
914 if (j) {
915 global_stat.reserved_ct--;
916 j->tube->stat.reserved_ct--;
918 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
919 return j;
922 static job
923 remove_reserved_job(conn c, unsigned long long int id)
925 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, id));
928 static int
929 name_is_ok(const char *name, size_t max)
931 size_t len = strlen(name);
932 return len > 0 && len <= max &&
933 strspn(name, NAME_CHARS) == len && name[0] != '-';
936 static tube
937 find_tube(const char *name)
939 tube t;
940 size_t i;
942 for (i = 0; i < tubes.used; i++) {
943 t = tubes.items[i];
944 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) return t;
946 return NULL;
949 void
950 prot_remove_tube(tube t)
952 ms_remove(&tubes, t);
955 static tube
956 make_and_insert_tube(const char *name)
958 int r;
959 tube t = NULL;
961 t = make_tube(name);
962 if (!t) return NULL;
964 /* We want this global tube list to behave like "weak" refs, so don't
965 * increment the ref count. */
966 r = ms_append(&tubes, t);
967 if (!r) return tube_dref(t), NULL;
969 return t;
972 static tube
973 find_or_make_tube(const char *name)
975 return find_tube(name) ? : make_and_insert_tube(name);
978 static void
979 dispatch_cmd(conn c)
981 int r, i;
982 unsigned int count;
983 job j;
984 char type;
985 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
986 unsigned int pri, delay, ttr, body_size;
987 unsigned long long int id;
988 tube t = NULL;
990 /* NUL-terminate this string so we can use strtol and friends */
991 c->cmd[c->cmd_len - 2] = '\0';
993 /* check for possible maliciousness */
994 if (strlen(c->cmd) != c->cmd_len - 2) {
995 return reply_msg(c, MSG_BAD_FORMAT);
998 type = which_cmd(c);
999 dprintf("got %s command: \"%s\"\n", op_names[(int) type], c->cmd);
1001 switch (type) {
1002 case OP_PUT:
1003 if (drain_mode) return reply_serr(c, MSG_DRAINING);
1005 r = read_pri(&pri, c->cmd + 4, &delay_buf);
1006 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1008 r = read_delay(&delay, delay_buf, &ttr_buf);
1009 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1011 r = read_ttr(&ttr, ttr_buf, &size_buf);
1012 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1014 errno = 0;
1015 body_size = strtoul(size_buf, &end_buf, 10);
1016 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1018 if (body_size > JOB_DATA_SIZE_LIMIT) {
1019 return reply_msg(c, MSG_JOB_TOO_BIG);
1022 /* don't allow trailing garbage */
1023 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
1025 conn_set_producer(c);
1027 c->in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c->use);
1029 fill_extra_data(c);
1031 /* it's possible we already have a complete job */
1032 maybe_enqueue_incoming_job(c);
1034 break;
1035 case OP_PEEK:
1036 /* don't allow trailing garbage */
1037 if (c->cmd_len != CMD_PEEK_LEN + 2) {
1038 return reply_msg(c, MSG_BAD_FORMAT);
1041 j = job_copy(peek_buried_job() ? : delay_q_peek());
1043 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1045 peek_ct++; /* stats */
1046 reply_job(c, j, MSG_FOUND);
1047 break;
1048 case OP_PEEKJOB:
1049 errno = 0;
1050 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1051 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1053 /* So, peek is annoying, because some other connection might free the
1054 * job while we are still trying to write it out. So we copy it and
1055 * then free the copy when it's done sending. */
1056 j = job_copy(peek_job(id));
1058 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1060 peek_ct++; /* stats */
1061 reply_job(c, j, MSG_FOUND);
1062 break;
1063 case OP_RESERVE:
1064 /* don't allow trailing garbage */
1065 if (c->cmd_len != CMD_RESERVE_LEN + 2) {
1066 return reply_msg(c, MSG_BAD_FORMAT);
1069 reserve_ct++; /* stats */
1070 conn_set_worker(c);
1072 /* try to get a new job for this guy */
1073 wait_for_job(c);
1074 process_queue();
1075 break;
1076 case OP_DELETE:
1077 errno = 0;
1078 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1079 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1081 j = remove_reserved_job(c, id) ? : remove_buried_job(id);
1083 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1085 delete_ct++; /* stats */
1086 job_free(j);
1088 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1089 break;
1090 case OP_RELEASE:
1091 errno = 0;
1092 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1093 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1095 r = read_pri(&pri, pri_buf, &delay_buf);
1096 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1098 r = read_delay(&delay, delay_buf, NULL);
1099 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1101 j = remove_reserved_job(c, id);
1103 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1105 j->pri = pri;
1106 j->delay = delay;
1107 j->release_ct++;
1108 release_ct++; /* stats */
1109 r = enqueue_job(j, delay);
1110 if (r) return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1112 /* out of memory trying to grow the queue, so it gets buried */
1113 bury_job(j);
1114 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1115 break;
1116 case OP_BURY:
1117 errno = 0;
1118 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1119 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1121 r = read_pri(&pri, pri_buf, NULL);
1122 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1124 j = remove_reserved_job(c, id);
1126 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1128 j->pri = pri;
1129 bury_ct++; /* stats */
1130 bury_job(j);
1131 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1132 break;
1133 case OP_KICK:
1134 errno = 0;
1135 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
1136 if (end_buf == c->cmd + CMD_KICK_LEN) {
1137 return reply_msg(c, MSG_BAD_FORMAT);
1139 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1141 kick_ct++; /* stats */
1143 i = kick_jobs(c->use, count);
1145 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1146 case OP_STATS:
1147 /* don't allow trailing garbage */
1148 if (c->cmd_len != CMD_STATS_LEN + 2) {
1149 return reply_msg(c, MSG_BAD_FORMAT);
1152 stats_ct++; /* stats */
1154 do_stats(c, fmt_stats, NULL);
1155 break;
1156 case OP_JOBSTATS:
1157 errno = 0;
1158 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1159 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1161 j = peek_job(id);
1162 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1164 stats_job_ct++; /* stats */
1166 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1167 do_stats(c, (fmt_fn) fmt_job_stats, j);
1168 break;
1169 case OP_STATS_TUBE:
1170 name = c->cmd + CMD_STATS_TUBE_LEN;
1171 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1173 t = find_tube(name);
1174 if (!t) return reply_msg(c, MSG_NOTFOUND);
1176 stats_tube_ct++; /* stats */
1178 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1179 t = NULL;
1180 break;
1181 case OP_LIST_TUBES:
1182 /* don't allow trailing garbage */
1183 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1184 return reply_msg(c, MSG_BAD_FORMAT);
1187 list_tubes_ct++;
1188 do_list_tubes(c, &tubes);
1189 break;
1190 case OP_LIST_TUBE_USED:
1191 /* don't allow trailing garbage */
1192 if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
1193 return reply_msg(c, MSG_BAD_FORMAT);
1196 list_tube_used_ct++;
1197 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1198 break;
1199 case OP_LIST_TUBES_WATCHED:
1200 /* don't allow trailing garbage */
1201 if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
1202 return reply_msg(c, MSG_BAD_FORMAT);
1205 list_watched_tubes_ct++;
1206 do_list_tubes(c, &c->watch);
1207 break;
1208 case OP_USE:
1209 name = c->cmd + CMD_USE_LEN;
1210 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1212 TUBE_ASSIGN(t, find_or_make_tube(name));
1213 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1215 TUBE_ASSIGN(c->use, t);
1216 TUBE_ASSIGN(t, NULL);
1218 use_ct++;
1219 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1220 break;
1221 case OP_WATCH:
1222 name = c->cmd + CMD_WATCH_LEN;
1223 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1225 TUBE_ASSIGN(t, find_or_make_tube(name));
1226 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1228 r = 1;
1229 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1230 TUBE_ASSIGN(t, NULL);
1231 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1233 watch_ct++;
1234 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1235 break;
1236 case OP_IGNORE:
1237 name = c->cmd + CMD_IGNORE_LEN;
1238 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1240 t = NULL;
1241 for (i = 0; i < c->watch.used; i++) {
1242 t = c->watch.items[i];
1243 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1244 t = NULL;
1247 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1249 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1250 t = NULL;
1252 ignore_ct++;
1253 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1254 break;
1255 default:
1256 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1260 /* if we get a timeout, it means that a job has been reserved for too long, so
1261 * we should put it back in the queue */
1262 static void
1263 h_conn_timeout(conn c)
1265 int r;
1266 job j;
1268 while ((j = soonest_job(c))) {
1269 if (j->deadline > time(NULL)) return;
1270 timeout_ct++; /* stats */
1271 j->timeout_ct++;
1272 r = enqueue_job(remove_this_reserved_job(c, j), 0);
1273 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
1274 r = conn_update_evq(c, c->evq.ev_events);
1275 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
1279 void
1280 enter_drain_mode(int sig)
1282 drain_mode = 1;
1285 static void
1286 do_cmd(conn c)
1288 dispatch_cmd(c);
1289 fill_extra_data(c);
1292 static void
1293 reset_conn(conn c)
1295 int r;
1297 r = conn_update_evq(c, EV_READ | EV_PERSIST);
1298 if (r == -1) return twarnx("update events failed"), conn_close(c);
1300 /* was this a peek or stats command? */
1301 if (!has_reserved_this_job(c, c->out_job)) job_free(c->out_job);
1302 c->out_job = NULL;
1304 c->reply_sent = 0; /* now that we're done, reset this */
1305 c->state = STATE_WANTCOMMAND;
1308 static void
1309 h_conn_data(conn c)
1311 int r;
1312 job j;
1313 struct iovec iov[2];
1315 switch (c->state) {
1316 case STATE_WANTCOMMAND:
1317 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1318 if (r == -1) return check_err(c, "read()");
1319 if (r == 0) return conn_close(c); /* the client hung up */
1321 c->cmd_read += r; /* we got some bytes */
1323 c->cmd_len = cmd_len(c); /* find the EOL */
1324 dprintf("cmd_len is %d\n", c->cmd_len);
1326 /* yay, complete command line */
1327 if (c->cmd_len) return do_cmd(c);
1329 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1331 dprintf("cmd_read is %d\n", c->cmd_read);
1332 /* command line too long? */
1333 if (c->cmd_read == LINE_BUF_SIZE) {
1334 return reply_msg(c, MSG_BAD_FORMAT);
1337 /* otherwise we have an incomplete line, so just keep waiting */
1338 break;
1339 case STATE_WANTDATA:
1340 j = c->in_job;
1342 r = read(c->fd, j->body + c->in_job_read, j->body_size -c->in_job_read);
1343 if (r == -1) return check_err(c, "read()");
1344 if (r == 0) return conn_close(c); /* the client hung up */
1346 c->in_job_read += r; /* we got some bytes */
1348 /* (j->in_job_read > j->body_size) can't happen */
1350 maybe_enqueue_incoming_job(c);
1351 break;
1352 case STATE_SENDWORD:
1353 r= write(c->fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1354 if (r == -1) return check_err(c, "write()");
1355 if (r == 0) return conn_close(c); /* the client hung up */
1357 c->reply_sent += r; /* we got some bytes */
1359 /* (c->reply_sent > c->reply_len) can't happen */
1361 if (c->reply_sent == c->reply_len) return reset_conn(c);
1363 /* otherwise we sent an incomplete reply, so just keep waiting */
1364 break;
1365 case STATE_SENDJOB:
1366 j = c->out_job;
1368 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1369 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1370 iov[1].iov_base = j->body + c->out_job_sent;
1371 iov[1].iov_len = j->body_size - c->out_job_sent;
1373 r = writev(c->fd, iov, 2);
1374 if (r == -1) return check_err(c, "writev()");
1375 if (r == 0) return conn_close(c); /* the client hung up */
1377 /* update the sent values */
1378 c->reply_sent += r;
1379 if (c->reply_sent >= c->reply_len) {
1380 c->out_job_sent += c->reply_sent - c->reply_len;
1381 c->reply_sent = c->reply_len;
1384 /* (c->out_job_sent > j->body_size) can't happen */
1386 /* are we done? */
1387 if (c->out_job_sent == j->body_size) return reset_conn(c);
1389 /* otherwise we sent incomplete data, so just keep waiting */
1390 break;
1391 case STATE_WAIT: /* keep an eye out in case they hang up */
1392 /* but don't hang up just because our buffer is full */
1393 if (LINE_BUF_SIZE - c->cmd_read < 1) break;
1395 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1396 if (r == -1) return check_err(c, "read()");
1397 if (r == 0) return conn_close(c); /* the client hung up */
1398 c->cmd_read += r; /* we got some bytes */
1402 #define want_command(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))
1403 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
1405 static void
1406 h_conn(const int fd, const short which, conn c)
1408 if (fd != c->fd) {
1409 twarnx("Argh! event fd doesn't match conn fd.");
1410 close(fd);
1411 return conn_close(c);
1414 switch (which) {
1415 case EV_TIMEOUT:
1416 h_conn_timeout(c);
1417 event_add(&c->evq, NULL); /* seems to be necessary */
1418 break;
1419 case EV_READ:
1420 /* fall through... */
1421 case EV_WRITE:
1422 /* fall through... */
1423 default:
1424 h_conn_data(c);
1427 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1430 static void
1431 h_delay()
1433 int r;
1434 job j;
1435 time_t t;
1437 t = time(NULL);
1438 while ((j = delay_q_peek())) {
1439 if (j->deadline > t) break;
1440 j = delay_q_take();
1441 r = enqueue_job(j, 0);
1442 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
1445 set_main_timeout((j = delay_q_peek()) ? j->deadline : 0);
1448 void
1449 h_accept(const int fd, const short which, struct event *ev)
1451 conn c;
1452 int cfd, flags, r;
1453 socklen_t addrlen;
1454 struct sockaddr addr;
1456 if (which == EV_TIMEOUT) return h_delay();
1458 addrlen = sizeof addr;
1459 cfd = accept(fd, &addr, &addrlen);
1460 if (cfd == -1) {
1461 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1462 if (errno == EMFILE) brake();
1463 return;
1466 flags = fcntl(cfd, F_GETFL, 0);
1467 if (flags < 0) return twarn("getting flags"), close(cfd), v();
1469 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1470 if (r < 0) return twarn("setting O_NONBLOCK"), close(cfd), v();
1472 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1473 if (!c) return twarnx("make_conn() failed"), close(cfd), brake();
1475 dprintf("accepted conn, fd=%d\n", cfd);
1476 r = conn_set_evq(c, EV_READ | EV_PERSIST, (evh) h_conn);
1477 if (r == -1) return twarnx("conn_set_evq() failed"), close(cfd), brake();
1480 void
1481 prot_init()
1483 start_time = time(NULL);
1484 pq_init(&delay_q, job_delay_cmp);
1486 ms_init(&tubes, NULL, NULL);
1488 TUBE_ASSIGN(default_tube, find_or_make_tube("default"));
1489 if (!default_tube) twarnx("Out of memory during startup!");