Reserve timeout, part 1.
[beanstalkd.git] / prot.c
blob9d6a6e1ac452a97305d962e56db449630e678820
1 /* prot.c - protocol implementation */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <errno.h>
24 #include <sys/resource.h>
25 #include <sys/uio.h>
26 #include <stdarg.h>
28 #include "stat.h"
29 #include "prot.h"
30 #include "pq.h"
31 #include "ms.h"
32 #include "job.h"
33 #include "tube.h"
34 #include "conn.h"
35 #include "util.h"
36 #include "net.h"
37 #include "version.h"
39 /* job body cannot be greater than this many bytes long */
40 #define JOB_DATA_SIZE_LIMIT ((1 << 16) - 1)
42 #define NAME_CHARS \
43 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
44 "abcdefghijklmnopqrstuvwxyz" \
45 "0123456789-+/;.$()"
47 #define CMD_PUT "put "
48 #define CMD_PEEK "peek"
49 #define CMD_PEEKJOB "peek "
50 #define CMD_RESERVE "reserve"
51 #define CMD_DELETE "delete "
52 #define CMD_RELEASE "release "
53 #define CMD_BURY "bury "
54 #define CMD_KICK "kick "
55 #define CMD_STATS "stats"
56 #define CMD_JOBSTATS "stats-job "
57 #define CMD_USE "use "
58 #define CMD_WATCH "watch "
59 #define CMD_IGNORE "ignore "
60 #define CMD_LIST_TUBES "list-tubes"
61 #define CMD_LIST_TUBE_USED "list-tube-used"
62 #define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
63 #define CMD_STATS_TUBE "stats-tube "
65 #define CONSTSTRLEN(m) (sizeof(m) - 1)
67 #define CMD_PEEK_LEN CONSTSTRLEN(CMD_PEEK)
68 #define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
69 #define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
70 #define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
71 #define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
72 #define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
73 #define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
74 #define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
75 #define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
76 #define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
77 #define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
78 #define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
79 #define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
80 #define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
81 #define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
82 #define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
84 #define MSG_FOUND "FOUND"
85 #define MSG_NOTFOUND "NOT_FOUND\r\n"
86 #define MSG_RESERVED "RESERVED"
87 #define MSG_TIMEOUT "TIMEOUT\r\n"
88 #define MSG_DELETED "DELETED\r\n"
89 #define MSG_RELEASED "RELEASED\r\n"
90 #define MSG_BURIED "BURIED\r\n"
91 #define MSG_BURIED_FMT "BURIED %llu\r\n"
92 #define MSG_INSERTED_FMT "INSERTED %llu\r\n"
93 #define MSG_NOT_IGNORED "NOT_IGNORED\r\n"
95 #define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
96 #define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
97 #define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
98 #define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
99 #define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)
101 #define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
102 #define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
103 #define MSG_DRAINING "DRAINING\r\n"
104 #define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
105 #define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
106 #define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
107 #define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
109 #define STATS_FMT "---\n" \
110 "current-jobs-urgent: %u\n" \
111 "current-jobs-ready: %u\n" \
112 "current-jobs-reserved: %u\n" \
113 "current-jobs-delayed: %u\n" \
114 "current-jobs-buried: %u\n" \
115 "cmd-put: %llu\n" \
116 "cmd-peek: %llu\n" \
117 "cmd-reserve: %llu\n" \
118 "cmd-delete: %llu\n" \
119 "cmd-release: %llu\n" \
120 "cmd-use: %llu\n" \
121 "cmd-watch: %llu\n" \
122 "cmd-ignore: %llu\n" \
123 "cmd-bury: %llu\n" \
124 "cmd-kick: %llu\n" \
125 "cmd-stats: %llu\n" \
126 "cmd-stats-job: %llu\n" \
127 "cmd-stats-tube: %llu\n" \
128 "cmd-list-tubes: %llu\n" \
129 "cmd-list-tube-used: %llu\n" \
130 "cmd-list-tubes-watched: %llu\n" \
131 "job-timeouts: %llu\n" \
132 "total-jobs: %llu\n" \
133 "current-tubes: %Zu\n" \
134 "current-connections: %u\n" \
135 "current-producers: %u\n" \
136 "current-workers: %u\n" \
137 "current-waiting: %u\n" \
138 "total-connections: %u\n" \
139 "pid: %u\n" \
140 "version: %s\n" \
141 "rusage-utime: %d.%06d\n" \
142 "rusage-stime: %d.%06d\n" \
143 "uptime: %u\n" \
144 "\r\n"
146 #define STATS_TUBE_FMT "---\n" \
147 "name: %s\n" \
148 "current-jobs-urgent: %u\n" \
149 "current-jobs-ready: %u\n" \
150 "current-jobs-reserved: %u\n" \
151 "current-jobs-buried: %u\n" \
152 "total-jobs: %llu\n" \
153 "current-using: %u\n" \
154 "current-watching: %u\n" \
155 "current-waiting: %u\n" \
156 "\r\n"
158 #define JOB_STATS_FMT "---\n" \
159 "id: %llu\n" \
160 "tube: %s\n" \
161 "state: %s\n" \
162 "pri: %u\n" \
163 "age: %u\n" \
164 "delay: %u\n" \
165 "ttr: %u\n" \
166 "time-left: %u\n" \
167 "timeouts: %u\n" \
168 "releases: %u\n" \
169 "buries: %u\n" \
170 "kicks: %u\n" \
171 "\r\n"
173 static struct pq delay_q;
175 static unsigned int ready_ct = 0;
176 static struct stats global_stat = {0, 0, 0, 0, 0};
178 static tube default_tube;
179 static struct ms tubes;
181 static int drain_mode = 0;
182 static time_t start_time;
183 static unsigned long long int put_ct = 0, peek_ct = 0, reserve_ct = 0,
184 delete_ct = 0, release_ct = 0, bury_ct = 0, kick_ct = 0,
185 stats_job_ct = 0, stats_ct = 0, timeout_ct = 0,
186 list_tubes_ct = 0, stats_tube_ct = 0,
187 list_tube_used_ct = 0, list_watched_tubes_ct = 0,
188 use_ct = 0, watch_ct = 0, ignore_ct = 0;
191 /* Doubly-linked list of connections with at least one reserved job. */
192 static struct conn running = { &running, &running, 0 };
194 #ifdef DEBUG
195 static const char * op_names[] = {
196 "<unknown>",
197 CMD_PUT,
198 CMD_PEEKJOB,
199 CMD_RESERVE,
200 CMD_DELETE,
201 CMD_RELEASE,
202 CMD_BURY,
203 CMD_KICK,
204 CMD_STATS,
205 CMD_JOBSTATS,
206 CMD_PEEK,
207 CMD_USE,
208 CMD_WATCH,
209 CMD_IGNORE,
210 CMD_LIST_TUBES,
211 CMD_LIST_TUBE_USED,
212 CMD_LIST_TUBES_WATCHED,
213 CMD_STATS_TUBE,
215 #endif
217 static int
218 buried_job_p(tube t)
220 return job_list_any_p(&t->buried);
223 static void
224 reply(conn c, const char *line, int len, int state)
226 int r;
228 r = conn_update_evq(c, EV_WRITE | EV_PERSIST);
229 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
231 c->reply = line;
232 c->reply_len = len;
233 c->reply_sent = 0;
234 c->state = state;
235 dprintf("sending reply: %.*s", len, line);
238 #define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)
240 #define reply_serr(c,e) (twarnx("server error: %s",(e)),\
241 reply_msg((c),(e)))
243 static void
244 reply_line(conn c, int state, const char *fmt, ...)
246 int r;
247 va_list ap;
249 va_start(ap, fmt);
250 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
251 va_end(ap);
253 /* Make sure the buffer was big enough. If not, we have a bug. */
254 if (r >= LINE_BUF_SIZE) return reply_serr(c, MSG_INTERNAL_ERROR);
256 return reply(c, c->reply_buf, r, state);
259 static void
260 reply_job(conn c, job j, const char *word)
262 /* tell this connection which job to send */
263 c->out_job = j;
264 c->out_job_sent = 0;
266 return reply_line(c, STATE_SENDJOB, "%s %llu %u\r\n",
267 word, j->id, j->body_size - 2);
270 conn
271 remove_waiting_conn(conn c)
273 tube t;
274 size_t i;
276 if (!(c->type & CONN_TYPE_WAITING)) return NULL;
277 c->type &= ~CONN_TYPE_WAITING;
278 global_stat.waiting_ct--;
279 for (i = 0; i < c->watch.used; i++) {
280 t = c->watch.items[i];
281 t->stat.waiting_ct--;
282 ms_remove(&t->waiting, c);
284 return c;
287 static void
288 reserve_job(conn c, job j)
290 j->deadline = time(NULL) + j->ttr;
291 global_stat.reserved_ct++; /* stats */
292 j->tube->stat.reserved_ct++;
293 conn_insert(&running, c);
294 j->state = JOB_STATE_RESERVED;
295 job_insert(&c->reserved_jobs, j);
296 return reply_job(c, j, MSG_RESERVED);
299 static job
300 next_eligible_job()
302 tube t;
303 size_t i;
304 job j = NULL, candidate;
306 dprintf("tubes.used = %d\n", tubes.used);
307 for (i = 0; i < tubes.used; i++) {
308 t = tubes.items[i];
309 dprintf("for %s t->waiting.used=%d t->ready.used=%d\n",
310 t->name, t->waiting.used, t->ready.used);
311 if (t->waiting.used && t->ready.used) {
312 candidate = pq_peek(&t->ready);
313 if (!j || candidate->id < j->id) j = candidate;
315 dprintf("i = %d, tubes.used = %d\n", i, tubes.used);
318 return j;
321 static void
322 process_queue()
324 job j;
326 dprintf("processing queue\n");
327 while ((j = next_eligible_job())) {
328 dprintf("got eligible job %llu in %s\n", j->id, j->tube->name);
329 j = pq_take(&j->tube->ready);
330 ready_ct--;
331 if (j->pri < URGENT_THRESHOLD) {
332 global_stat.urgent_ct--;
333 j->tube->stat.urgent_ct--;
335 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
339 static int
340 enqueue_job(job j, unsigned int delay)
342 int r;
344 if (delay) {
345 j->deadline = time(NULL) + delay;
346 r = pq_give(&delay_q, j);
347 if (!r) return 0;
348 j->state = JOB_STATE_DELAYED;
349 set_main_timeout(pq_peek(&delay_q)->deadline);
350 } else {
351 r = pq_give(&j->tube->ready, j);
352 if (!r) return 0;
353 j->state = JOB_STATE_READY;
354 ready_ct++;
355 if (j->pri < URGENT_THRESHOLD) {
356 global_stat.urgent_ct++;
357 j->tube->stat.urgent_ct++;
360 process_queue();
361 return 1;
364 static void
365 bury_job(job j)
367 job_insert(&j->tube->buried, j);
368 global_stat.buried_ct++;
369 j->tube->stat.buried_ct++;
370 j->state = JOB_STATE_BURIED;
371 j->bury_ct++;
374 void
375 enqueue_reserved_jobs(conn c)
377 int r;
378 job j;
380 while (job_list_any_p(&c->reserved_jobs)) {
381 j = job_remove(c->reserved_jobs.next);
382 r = enqueue_job(j, 0);
383 if (!r) bury_job(j);
384 global_stat.reserved_ct--;
385 j->tube->stat.reserved_ct--;
386 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
390 static job
391 delay_q_peek()
393 return pq_peek(&delay_q);
396 static job
397 delay_q_take()
399 return pq_take(&delay_q);
402 static job
403 remove_this_buried_job(job j)
405 j = job_remove(j);
406 if (j) {
407 global_stat.buried_ct--;
408 j->tube->stat.buried_ct--;
410 return j;
413 static int
414 kick_buried_job(tube t)
416 int r;
417 job j;
419 if (!buried_job_p(t)) return 0;
420 j = remove_this_buried_job(t->buried.next);
421 j->kick_ct++;
422 r = enqueue_job(j, 0);
423 if (r) return 1;
425 /* ready queue is full, so bury it */
426 bury_job(j);
427 return 0;
430 static unsigned int
431 get_delayed_job_ct()
433 return pq_used(&delay_q);
436 static int
437 kick_delayed_job()
439 int r;
440 job j;
442 if (get_delayed_job_ct() < 1) return 0;
443 j = delay_q_take();
444 j->kick_ct++;
445 r = enqueue_job(j, 0);
446 if (r) return 1;
448 /* ready queue is full, so delay it again */
449 r = enqueue_job(j, j->delay);
450 if (r) return 0;
452 /* last resort */
453 bury_job(j);
454 return 0;
457 /* return the number of jobs successfully kicked */
458 static unsigned int
459 kick_buried_jobs(tube t, unsigned int n)
461 unsigned int i;
462 for (i = 0; (i < n) && kick_buried_job(t); ++i);
463 return i;
466 /* return the number of jobs successfully kicked */
467 static unsigned int
468 kick_delayed_jobs(unsigned int n)
470 unsigned int i;
471 for (i = 0; (i < n) && kick_delayed_job(); ++i);
472 return i;
475 static unsigned int
476 kick_jobs(tube t, unsigned int n)
478 if (buried_job_p(t)) return kick_buried_jobs(t, n);
479 return kick_delayed_jobs(n);
482 static job
483 peek_buried_job()
485 tube t;
486 size_t i;
488 for (i = 0; i < tubes.used; i++) {
489 t = tubes.items[i];
490 if (buried_job_p(t)) return t->buried.next;
492 return NULL;
495 static job
496 find_buried_job_in_tube(tube t, unsigned long long int id)
498 job j;
500 for (j = t->buried.next; j != &t->buried; j = j->next) {
501 if (j->id == id) return j;
503 return NULL;
506 static job
507 find_buried_job(unsigned long long int id)
509 job j;
510 size_t i;
512 for (i = 0; i < tubes.used; i++) {
513 j = find_buried_job_in_tube(tubes.items[i], id);
514 if (j) return j;
516 return NULL;
519 static job
520 remove_buried_job(unsigned long long int id)
522 return remove_this_buried_job(find_buried_job(id));
525 static void
526 enqueue_waiting_conn(conn c)
528 tube t;
529 size_t i;
531 global_stat.waiting_ct++;
532 c->type |= CONN_TYPE_WAITING;
533 for (i = 0; i < c->watch.used; i++) {
534 t = c->watch.items[i];
535 t->stat.waiting_ct++;
536 ms_append(&t->waiting, c);
540 static job
541 find_reserved_job_in_conn(conn c, unsigned long long int id)
543 job j;
545 for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
546 if (j->id == id) return j;
548 return NULL;
551 static job
552 find_reserved_job_in_list(conn list, unsigned long long int id)
554 job j;
555 conn c;
557 for (c = list->next; c != list; c = c->next) {
558 j = find_reserved_job_in_conn(c, id);
559 if (j) return j;
561 return NULL;
564 static job
565 find_reserved_job(unsigned long long int id)
567 return find_reserved_job_in_list(&running, id);
570 static job
571 peek_ready_job(unsigned long long int id)
574 job j;
575 size_t i;
577 for (i = 0; i < tubes.used; i++) {
578 j = pq_find(&((tube) tubes.items[i])->ready, id);
579 if (j) return j;
581 return NULL;
584 /* TODO: make a global hashtable of jobs because this is slow */
585 static job
586 peek_job(unsigned long long int id)
588 return peek_ready_job(id) ? :
589 pq_find(&delay_q, id) ? :
590 find_reserved_job(id) ? :
591 find_buried_job(id);
594 static void
595 check_err(conn c, const char *s)
597 if (errno == EAGAIN) return;
598 if (errno == EINTR) return;
599 if (errno == EWOULDBLOCK) return;
601 twarn("%s", s);
602 conn_close(c);
603 return;
606 /* Scan the given string for the sequence "\r\n" and return the line length.
607 * Always returns at least 2 if a match is found. Returns 0 if no match. */
608 static int
609 scan_line_end(const char *s, int size)
611 char *match;
613 match = memchr(s, '\r', size - 1);
614 if (!match) return 0;
616 /* this is safe because we only scan size - 1 chars above */
617 if (match[1] == '\n') return match - s + 2;
619 return 0;
622 static int
623 cmd_len(conn c)
625 return scan_line_end(c->cmd, c->cmd_read);
628 /* parse the command line */
629 static int
630 which_cmd(conn c)
632 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
633 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
634 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
635 TEST_CMD(c->cmd, CMD_PEEK, OP_PEEK);
636 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
637 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
638 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
639 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
640 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
641 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
642 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
643 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
644 TEST_CMD(c->cmd, CMD_USE, OP_USE);
645 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
646 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
647 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
648 TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
649 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
650 return OP_UNKNOWN;
653 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
654 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
655 * This function is idempotent(). */
656 static void
657 fill_extra_data(conn c)
659 int extra_bytes, job_data_bytes = 0, cmd_bytes;
661 if (!c->fd) return; /* the connection was closed */
662 if (!c->cmd_len) return; /* we don't have a complete command */
664 /* how many extra bytes did we read? */
665 extra_bytes = c->cmd_read - c->cmd_len;
667 /* how many bytes should we put into the job body? */
668 if (c->in_job) {
669 job_data_bytes = min(extra_bytes, c->in_job->body_size);
670 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
671 c->in_job_read = job_data_bytes;
674 /* how many bytes are left to go into the future cmd? */
675 cmd_bytes = extra_bytes - job_data_bytes;
676 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
677 c->cmd_read = cmd_bytes;
678 c->cmd_len = 0; /* we no longer know the length of the new command */
681 static void
682 enqueue_incoming_job(conn c)
684 int r;
685 job j = c->in_job;
687 c->in_job = NULL; /* the connection no longer owns this job */
688 c->in_job_read = 0;
690 /* check if the trailer is present and correct */
691 if (memcmp(j->body + j->body_size - 2, "\r\n", 2)) {
692 job_free(j);
693 return reply_msg(c, MSG_EXPECTED_CRLF);
696 /* we have a complete job, so let's stick it in the pqueue */
697 r = enqueue_job(j, j->delay);
698 put_ct++; /* stats */
699 global_stat.total_jobs_ct++;
700 j->tube->stat.total_jobs_ct++;
702 if (r) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->id);
704 /* out of memory trying to grow the queue, so it gets buried */
705 bury_job(j);
706 reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->id);
709 static unsigned int
710 uptime()
712 return time(NULL) - start_time;
715 static int
716 fmt_stats(char *buf, size_t size, void *x)
718 struct rusage ru = {{0, 0}, {0, 0}};
719 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
720 return snprintf(buf, size, STATS_FMT,
721 global_stat.urgent_ct,
722 ready_ct,
723 global_stat.reserved_ct,
724 get_delayed_job_ct(),
725 global_stat.buried_ct,
726 put_ct,
727 peek_ct,
728 reserve_ct,
729 delete_ct,
730 release_ct,
731 use_ct,
732 watch_ct,
733 ignore_ct,
734 bury_ct,
735 kick_ct,
736 stats_ct,
737 stats_job_ct,
738 stats_tube_ct,
739 list_tubes_ct,
740 list_tube_used_ct,
741 list_watched_tubes_ct,
742 timeout_ct,
743 global_stat.total_jobs_ct,
744 tubes.used,
745 count_cur_conns(),
746 count_cur_producers(),
747 count_cur_workers(),
748 global_stat.waiting_ct,
749 count_tot_conns(),
750 getpid(),
751 VERSION,
752 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
753 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
754 uptime());
758 /* Read a priority value from the given buffer and place it in pri.
759 * Update end to point to the address after the last character consumed.
760 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
761 * conversion and return the status code but not update any values. This is an
762 * easy way to check for errors.
763 * If end is NULL, read_pri will also check that the entire input string was
764 * consumed and return an error code otherwise.
765 * Return 0 on success, or nonzero on failure.
766 * If a failure occurs, pri and end are not modified. */
767 static int
768 read_pri(unsigned int *pri, const char *buf, char **end)
770 char *tend;
771 unsigned int tpri;
773 errno = 0;
774 tpri = strtoul(buf, &tend, 10);
775 if (tend == buf) return -1;
776 if (errno && errno != ERANGE) return -1;
777 if (!end && tend[0] != '\0') return -1;
779 if (pri) *pri = tpri;
780 if (end) *end = tend;
781 return 0;
784 /* Read a delay value from the given buffer and place it in delay.
785 * The interface and behavior are the same as in read_pri(). */
786 static int
787 read_delay(unsigned int *delay, const char *buf, char **end)
789 return read_pri(delay, buf, end);
792 /* Read a timeout value from the given buffer and place it in ttr.
793 * The interface and behavior are the same as in read_pri(). */
794 static int
795 read_ttr(unsigned int *ttr, const char *buf, char **end)
797 return read_pri(ttr, buf, end);
800 static void
801 wait_for_job(conn c)
803 int r;
805 /* this conn is waiting, but we want to know if they hang up */
806 r = conn_update_evq(c, EV_READ | EV_PERSIST);
807 if (r == -1) return twarnx("update events failed"), conn_close(c);
809 c->state = STATE_WAIT;
810 enqueue_waiting_conn(c);
813 typedef int(*fmt_fn)(char *, size_t, void *);
815 static void
816 do_stats(conn c, fmt_fn fmt, void *data)
818 int r, stats_len;
820 /* first, measure how big a buffer we will need */
821 stats_len = fmt(NULL, 0, data);
823 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
824 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
826 /* now actually format the stats data */
827 r = fmt(c->out_job->body, stats_len, data);
828 if (r != stats_len) return reply_serr(c, MSG_INTERNAL_ERROR);
829 c->out_job->body[stats_len - 1] = '\n'; /* patch up sprintf's output */
831 c->out_job_sent = 0;
832 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", stats_len - 2);
835 static void
836 do_list_tubes(conn c, ms l)
838 char *buf;
839 tube t;
840 size_t i, resp_z;
842 /* first, measure how big a buffer we will need */
843 resp_z = 6; /* initial "---\n" and final "\r\n" */
844 for (i = 0; i < l->used; i++) {
845 t = l->items[i];
846 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
849 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
850 if (!c->out_job) return reply_serr(c, MSG_OUT_OF_MEMORY);
852 /* now actually format the response */
853 buf = c->out_job->body;
854 buf += snprintf(buf, 5, "---\n");
855 for (i = 0; i < l->used; i++) {
856 t = l->items[i];
857 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
859 buf[0] = '\r';
860 buf[1] = '\n';
862 c->out_job_sent = 0;
863 return reply_line(c, STATE_SENDJOB, "OK %d\r\n", resp_z - 2);
866 static int
867 fmt_job_stats(char *buf, size_t size, job j)
869 time_t t;
871 t = time(NULL);
872 return snprintf(buf, size, JOB_STATS_FMT,
873 j->id,
874 j->tube->name,
875 job_state(j),
876 j->pri,
877 (unsigned int) (t - j->creation),
878 j->delay,
879 j->ttr,
880 (unsigned int) (j->deadline - t),
881 j->timeout_ct,
882 j->release_ct,
883 j->bury_ct,
884 j->kick_ct);
887 static int
888 fmt_stats_tube(char *buf, size_t size, tube t)
890 return snprintf(buf, size, STATS_TUBE_FMT,
891 t->name,
892 t->stat.urgent_ct,
893 t->ready.used,
894 t->stat.reserved_ct,
895 t->stat.buried_ct,
896 t->stat.total_jobs_ct,
897 t->using_ct,
898 t->watching_ct,
899 t->stat.waiting_ct);
902 static void
903 maybe_enqueue_incoming_job(conn c)
905 job j = c->in_job;
907 /* do we have a complete job? */
908 if (c->in_job_read == j->body_size) return enqueue_incoming_job(c);
910 /* otherwise we have incomplete data, so just keep waiting */
911 c->state = STATE_WANTDATA;
914 /* j can be NULL */
915 static job
916 remove_this_reserved_job(conn c, job j)
918 j = job_remove(j);
919 if (j) {
920 global_stat.reserved_ct--;
921 j->tube->stat.reserved_ct--;
923 if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
924 return j;
927 static job
928 remove_reserved_job(conn c, unsigned long long int id)
930 return remove_this_reserved_job(c, find_reserved_job_in_conn(c, id));
933 static int
934 name_is_ok(const char *name, size_t max)
936 size_t len = strlen(name);
937 return len > 0 && len <= max &&
938 strspn(name, NAME_CHARS) == len && name[0] != '-';
941 static tube
942 find_tube(const char *name)
944 tube t;
945 size_t i;
947 for (i = 0; i < tubes.used; i++) {
948 t = tubes.items[i];
949 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) return t;
951 return NULL;
954 void
955 prot_remove_tube(tube t)
957 ms_remove(&tubes, t);
960 static tube
961 make_and_insert_tube(const char *name)
963 int r;
964 tube t = NULL;
966 t = make_tube(name);
967 if (!t) return NULL;
969 /* We want this global tube list to behave like "weak" refs, so don't
970 * increment the ref count. */
971 r = ms_append(&tubes, t);
972 if (!r) return tube_dref(t), NULL;
974 return t;
977 static tube
978 find_or_make_tube(const char *name)
980 return find_tube(name) ? : make_and_insert_tube(name);
983 static void
984 dispatch_cmd(conn c)
986 int r, i;
987 unsigned int count;
988 job j;
989 char type;
990 char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
991 unsigned int pri, delay, ttr, body_size;
992 unsigned long long int id;
993 tube t = NULL;
995 /* NUL-terminate this string so we can use strtol and friends */
996 c->cmd[c->cmd_len - 2] = '\0';
998 /* check for possible maliciousness */
999 if (strlen(c->cmd) != c->cmd_len - 2) {
1000 return reply_msg(c, MSG_BAD_FORMAT);
1003 type = which_cmd(c);
1004 dprintf("got %s command: \"%s\"\n", op_names[(int) type], c->cmd);
1006 switch (type) {
1007 case OP_PUT:
1008 if (drain_mode) return reply_serr(c, MSG_DRAINING);
1010 r = read_pri(&pri, c->cmd + 4, &delay_buf);
1011 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1013 r = read_delay(&delay, delay_buf, &ttr_buf);
1014 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1016 r = read_ttr(&ttr, ttr_buf, &size_buf);
1017 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1019 errno = 0;
1020 body_size = strtoul(size_buf, &end_buf, 10);
1021 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1023 if (body_size > JOB_DATA_SIZE_LIMIT) {
1024 return reply_msg(c, MSG_JOB_TOO_BIG);
1027 /* don't allow trailing garbage */
1028 if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
1030 conn_set_producer(c);
1032 c->in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c->use);
1034 fill_extra_data(c);
1036 /* it's possible we already have a complete job */
1037 maybe_enqueue_incoming_job(c);
1039 break;
1040 case OP_PEEK:
1041 /* don't allow trailing garbage */
1042 if (c->cmd_len != CMD_PEEK_LEN + 2) {
1043 return reply_msg(c, MSG_BAD_FORMAT);
1046 j = job_copy(peek_buried_job() ? : delay_q_peek());
1048 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1050 peek_ct++; /* stats */
1051 reply_job(c, j, MSG_FOUND);
1052 break;
1053 case OP_PEEKJOB:
1054 errno = 0;
1055 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
1056 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1058 /* So, peek is annoying, because some other connection might free the
1059 * job while we are still trying to write it out. So we copy it and
1060 * then free the copy when it's done sending. */
1061 j = job_copy(peek_job(id));
1063 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1065 peek_ct++; /* stats */
1066 reply_job(c, j, MSG_FOUND);
1067 break;
1068 case OP_RESERVE:
1069 /* don't allow trailing garbage */
1070 if (c->cmd_len != CMD_RESERVE_LEN + 2) {
1071 return reply_msg(c, MSG_BAD_FORMAT);
1074 reserve_ct++; /* stats */
1075 conn_set_worker(c);
1077 if (conn_has_close_deadline(c)) return reply_msg(c, MSG_TIMEOUT);
1079 /* try to get a new job for this guy */
1080 wait_for_job(c);
1081 process_queue();
1082 break;
1083 case OP_DELETE:
1084 errno = 0;
1085 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
1086 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1088 j = remove_reserved_job(c, id) ? : remove_buried_job(id);
1090 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1092 delete_ct++; /* stats */
1093 job_free(j);
1095 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
1096 break;
1097 case OP_RELEASE:
1098 errno = 0;
1099 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
1100 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1102 r = read_pri(&pri, pri_buf, &delay_buf);
1103 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1105 r = read_delay(&delay, delay_buf, NULL);
1106 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1108 j = remove_reserved_job(c, id);
1110 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1112 j->pri = pri;
1113 j->delay = delay;
1114 j->release_ct++;
1115 release_ct++; /* stats */
1116 r = enqueue_job(j, delay);
1117 if (r) return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
1119 /* out of memory trying to grow the queue, so it gets buried */
1120 bury_job(j);
1121 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1122 break;
1123 case OP_BURY:
1124 errno = 0;
1125 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
1126 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1128 r = read_pri(&pri, pri_buf, NULL);
1129 if (r) return reply_msg(c, MSG_BAD_FORMAT);
1131 j = remove_reserved_job(c, id);
1133 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1135 j->pri = pri;
1136 bury_ct++; /* stats */
1137 bury_job(j);
1138 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
1139 break;
1140 case OP_KICK:
1141 errno = 0;
1142 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
1143 if (end_buf == c->cmd + CMD_KICK_LEN) {
1144 return reply_msg(c, MSG_BAD_FORMAT);
1146 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1148 kick_ct++; /* stats */
1150 i = kick_jobs(c->use, count);
1152 return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
1153 case OP_STATS:
1154 /* don't allow trailing garbage */
1155 if (c->cmd_len != CMD_STATS_LEN + 2) {
1156 return reply_msg(c, MSG_BAD_FORMAT);
1159 stats_ct++; /* stats */
1161 do_stats(c, fmt_stats, NULL);
1162 break;
1163 case OP_JOBSTATS:
1164 errno = 0;
1165 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
1166 if (errno) return reply_msg(c, MSG_BAD_FORMAT);
1168 j = peek_job(id);
1169 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
1171 stats_job_ct++; /* stats */
1173 if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
1174 do_stats(c, (fmt_fn) fmt_job_stats, j);
1175 break;
1176 case OP_STATS_TUBE:
1177 name = c->cmd + CMD_STATS_TUBE_LEN;
1178 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1180 t = find_tube(name);
1181 if (!t) return reply_msg(c, MSG_NOTFOUND);
1183 stats_tube_ct++; /* stats */
1185 do_stats(c, (fmt_fn) fmt_stats_tube, t);
1186 t = NULL;
1187 break;
1188 case OP_LIST_TUBES:
1189 /* don't allow trailing garbage */
1190 if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
1191 return reply_msg(c, MSG_BAD_FORMAT);
1194 list_tubes_ct++;
1195 do_list_tubes(c, &tubes);
1196 break;
1197 case OP_LIST_TUBE_USED:
1198 /* don't allow trailing garbage */
1199 if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
1200 return reply_msg(c, MSG_BAD_FORMAT);
1203 list_tube_used_ct++;
1204 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1205 break;
1206 case OP_LIST_TUBES_WATCHED:
1207 /* don't allow trailing garbage */
1208 if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
1209 return reply_msg(c, MSG_BAD_FORMAT);
1212 list_watched_tubes_ct++;
1213 do_list_tubes(c, &c->watch);
1214 break;
1215 case OP_USE:
1216 name = c->cmd + CMD_USE_LEN;
1217 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1219 TUBE_ASSIGN(t, find_or_make_tube(name));
1220 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1222 c->use->using_ct--;
1223 TUBE_ASSIGN(c->use, t);
1224 TUBE_ASSIGN(t, NULL);
1225 c->use->using_ct++;
1227 use_ct++;
1228 reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
1229 break;
1230 case OP_WATCH:
1231 name = c->cmd + CMD_WATCH_LEN;
1232 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1234 TUBE_ASSIGN(t, find_or_make_tube(name));
1235 if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
1237 r = 1;
1238 if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
1239 TUBE_ASSIGN(t, NULL);
1240 if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
1242 watch_ct++;
1243 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1244 break;
1245 case OP_IGNORE:
1246 name = c->cmd + CMD_IGNORE_LEN;
1247 if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
1249 t = NULL;
1250 for (i = 0; i < c->watch.used; i++) {
1251 t = c->watch.items[i];
1252 if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
1253 t = NULL;
1256 if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
1258 if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
1259 t = NULL;
1261 ignore_ct++;
1262 reply_line(c, STATE_SENDWORD, "WATCHING %d\r\n", c->watch.used);
1263 break;
1264 default:
1265 return reply_msg(c, MSG_UNKNOWN_COMMAND);
1269 /* if we get a timeout, it means that a job has been reserved for too long, so
1270 * we should put it back in the queue */
1271 static void
1272 h_conn_timeout(conn c)
1274 int r;
1275 job j;
1277 while ((j = soonest_job(c))) {
1278 if (j->deadline > time(NULL)) return;
1279 timeout_ct++; /* stats */
1280 j->timeout_ct++;
1281 r = enqueue_job(remove_this_reserved_job(c, j), 0);
1282 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
1283 r = conn_update_evq(c, c->evq.ev_events);
1284 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
1288 void
1289 enter_drain_mode(int sig)
1291 drain_mode = 1;
1294 static void
1295 do_cmd(conn c)
1297 dispatch_cmd(c);
1298 fill_extra_data(c);
1301 static void
1302 reset_conn(conn c)
1304 int r;
1306 r = conn_update_evq(c, EV_READ | EV_PERSIST);
1307 if (r == -1) return twarnx("update events failed"), conn_close(c);
1309 /* was this a peek or stats command? */
1310 if (!has_reserved_this_job(c, c->out_job)) job_free(c->out_job);
1311 c->out_job = NULL;
1313 c->reply_sent = 0; /* now that we're done, reset this */
1314 c->state = STATE_WANTCOMMAND;
1317 static void
1318 h_conn_data(conn c)
1320 int r;
1321 job j;
1322 struct iovec iov[2];
1324 switch (c->state) {
1325 case STATE_WANTCOMMAND:
1326 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1327 if (r == -1) return check_err(c, "read()");
1328 if (r == 0) return conn_close(c); /* the client hung up */
1330 c->cmd_read += r; /* we got some bytes */
1332 c->cmd_len = cmd_len(c); /* find the EOL */
1333 dprintf("cmd_len is %d\n", c->cmd_len);
1335 /* yay, complete command line */
1336 if (c->cmd_len) return do_cmd(c);
1338 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1340 dprintf("cmd_read is %d\n", c->cmd_read);
1341 /* command line too long? */
1342 if (c->cmd_read == LINE_BUF_SIZE) {
1343 return reply_msg(c, MSG_BAD_FORMAT);
1346 /* otherwise we have an incomplete line, so just keep waiting */
1347 break;
1348 case STATE_WANTDATA:
1349 j = c->in_job;
1351 r = read(c->fd, j->body + c->in_job_read, j->body_size -c->in_job_read);
1352 if (r == -1) return check_err(c, "read()");
1353 if (r == 0) return conn_close(c); /* the client hung up */
1355 c->in_job_read += r; /* we got some bytes */
1357 /* (j->in_job_read > j->body_size) can't happen */
1359 maybe_enqueue_incoming_job(c);
1360 break;
1361 case STATE_SENDWORD:
1362 r= write(c->fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
1363 if (r == -1) return check_err(c, "write()");
1364 if (r == 0) return conn_close(c); /* the client hung up */
1366 c->reply_sent += r; /* we got some bytes */
1368 /* (c->reply_sent > c->reply_len) can't happen */
1370 if (c->reply_sent == c->reply_len) return reset_conn(c);
1372 /* otherwise we sent an incomplete reply, so just keep waiting */
1373 break;
1374 case STATE_SENDJOB:
1375 j = c->out_job;
1377 iov[0].iov_base = (void *)(c->reply + c->reply_sent);
1378 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
1379 iov[1].iov_base = j->body + c->out_job_sent;
1380 iov[1].iov_len = j->body_size - c->out_job_sent;
1382 r = writev(c->fd, iov, 2);
1383 if (r == -1) return check_err(c, "writev()");
1384 if (r == 0) return conn_close(c); /* the client hung up */
1386 /* update the sent values */
1387 c->reply_sent += r;
1388 if (c->reply_sent >= c->reply_len) {
1389 c->out_job_sent += c->reply_sent - c->reply_len;
1390 c->reply_sent = c->reply_len;
1393 /* (c->out_job_sent > j->body_size) can't happen */
1395 /* are we done? */
1396 if (c->out_job_sent == j->body_size) return reset_conn(c);
1398 /* otherwise we sent incomplete data, so just keep waiting */
1399 break;
1400 case STATE_WAIT: /* keep an eye out in case they hang up */
1401 /* but don't hang up just because our buffer is full */
1402 if (LINE_BUF_SIZE - c->cmd_read < 1) break;
1404 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
1405 if (r == -1) return check_err(c, "read()");
1406 if (r == 0) return conn_close(c); /* the client hung up */
1407 c->cmd_read += r; /* we got some bytes */
1411 #define want_command(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))
1412 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
1414 static void
1415 h_conn(const int fd, const short which, conn c)
1417 if (fd != c->fd) {
1418 twarnx("Argh! event fd doesn't match conn fd.");
1419 close(fd);
1420 return conn_close(c);
1423 switch (which) {
1424 case EV_TIMEOUT:
1425 h_conn_timeout(c);
1426 event_add(&c->evq, NULL); /* seems to be necessary */
1427 break;
1428 case EV_READ:
1429 /* fall through... */
1430 case EV_WRITE:
1431 /* fall through... */
1432 default:
1433 h_conn_data(c);
1436 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
1439 static void
1440 h_delay()
1442 int r;
1443 job j;
1444 time_t t;
1446 t = time(NULL);
1447 while ((j = delay_q_peek())) {
1448 if (j->deadline > t) break;
1449 j = delay_q_take();
1450 r = enqueue_job(j, 0);
1451 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
1454 set_main_timeout((j = delay_q_peek()) ? j->deadline : 0);
1457 void
1458 h_accept(const int fd, const short which, struct event *ev)
1460 conn c;
1461 int cfd, flags, r;
1462 socklen_t addrlen;
1463 struct sockaddr addr;
1465 if (which == EV_TIMEOUT) return h_delay();
1467 addrlen = sizeof addr;
1468 cfd = accept(fd, &addr, &addrlen);
1469 if (cfd == -1) {
1470 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
1471 if (errno == EMFILE) brake();
1472 return;
1475 flags = fcntl(cfd, F_GETFL, 0);
1476 if (flags < 0) return twarn("getting flags"), close(cfd), v();
1478 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
1479 if (r < 0) return twarn("setting O_NONBLOCK"), close(cfd), v();
1481 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
1482 if (!c) return twarnx("make_conn() failed"), close(cfd), brake();
1484 dprintf("accepted conn, fd=%d\n", cfd);
1485 r = conn_set_evq(c, EV_READ | EV_PERSIST, (evh) h_conn);
1486 if (r == -1) return twarnx("conn_set_evq() failed"), close(cfd), brake();
1489 void
1490 prot_init()
1492 start_time = time(NULL);
1493 pq_init(&delay_q, job_delay_cmp);
1495 ms_init(&tubes, NULL, NULL);
1497 TUBE_ASSIGN(default_tube, find_or_make_tube("default"));
1498 if (!default_tube) twarnx("Out of memory during startup!");