Fix a memory leak (#976).
[beanstalkd.git] / beanstalkd.c
blob36e5cdfe279d9dec0b5a993020a378babc073ff8
1 /* beanstalk - fast, general-purpose work queue */
3 #include <netinet/in.h>
4 #include <signal.h>
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <errno.h>
9 #include <sys/uio.h>
10 #include <unistd.h>
11 #include <sys/types.h>
12 #include <sys/stat.h>
13 #include <sys/resource.h>
15 #include "conn.h"
16 #include "net.h"
17 #include "beanstalkd.h"
18 #include "pq.h"
19 #include "util.h"
20 #include "prot.h"
21 #include "reserve.h"
22 #include "version.h"
24 /* job body cannot be greater than this many bytes long */
25 #define JOB_DATA_SIZE_LIMIT ((1 << 16) - 1)
27 static time_t start_time;
28 static int drain_mode = 0;
30 static unsigned long long int put_ct = 0, peek_ct = 0, reserve_ct = 0,
31 delete_ct = 0, release_ct = 0, bury_ct = 0, kick_ct = 0,
32 stats_ct = 0, timeout_ct = 0;
34 #ifndef DEBUG
35 static void
36 nullfd(int fd, int flags)
38 int r;
40 close(fd);
41 r = open("/dev/null", flags);
42 if (r != fd) twarn("open(\"/dev/null\")"), exit(1);
45 static void
46 dfork()
48 pid_t p;
50 p = fork();
51 if (p == -1) exit(1);
52 if (p) exit(0);
55 static void
56 daemonize()
58 chdir("/");
59 nullfd(0, O_RDONLY);
60 nullfd(1, O_WRONLY);
61 nullfd(2, O_WRONLY);
62 umask(0);
63 dfork();
64 setsid();
65 dfork();
67 #endif /*DEBUG*/
69 static void
70 enter_drain_mode(int sig)
72 drain_mode = 1;
75 static void
76 set_sig_handlers()
78 int r;
79 struct sigaction sa;
81 sa.sa_handler = SIG_IGN;
82 sa.sa_flags = 0;
83 r = sigemptyset(&sa.sa_mask);
84 if (r == -1) twarn("sigemptyset()"), exit(111);
86 r = sigaction(SIGPIPE, &sa, 0);
87 if (r == -1) twarn("sigaction(SIGPIPE)"), exit(111);
89 sa.sa_handler = enter_drain_mode;
90 r = sigaction(SIGUSR1, &sa, 0);
91 if (r == -1) twarn("sigaction(SIGUSR1)"), exit(111);
94 /* This is a workaround for a mystifying workaround in libevent's epoll
95 * implementation. The epoll_init() function creates an epoll fd with space to
96 * handle RLIMIT_NOFILE - 1 fds, accompanied by the following puzzling comment:
97 * "Solaris is somewhat retarded - it's important to drop backwards
98 * compatibility when making changes. So, don't dare to put rl.rlim_cur here."
99 * This is presumably to work around a bug in Solaris, but it has the
100 * unfortunate side-effect of causing epoll_ctl() (and, therefore, event_add())
101 * to fail for a valid fd if we have hit the limit of open fds. That makes it
102 * hard to provide reasonable behavior in that situation. So, let's reduce the
103 * real value of RLIMIT_NOFILE by one, after epoll_init() has run. */
104 static void
105 nudge_fd_limit()
107 int r;
108 struct rlimit rl;
110 r = getrlimit(RLIMIT_NOFILE, &rl);
111 if (r != 0) twarn("getrlimit(RLIMIT_NOFILE)"), exit(2);
113 rl.rlim_cur--;
115 r = setrlimit(RLIMIT_NOFILE, &rl);
116 if (r != 0) twarn("setrlimit(RLIMIT_NOFILE)"), exit(2);
119 static void
120 check_err(conn c, const char *s)
122 if (errno == EAGAIN) return;
123 if (errno == EINTR) return;
124 if (errno == EWOULDBLOCK) return;
126 warn("%s:%d in %s: %s", __FILE__, __LINE__, __func__, s);
127 conn_close(c);
128 return;
131 /* Scan the given string for the sequence "\r\n" and return the line length.
132 * Always returns at least 2 if a match is found. Returns 0 if no match. */
133 static int
134 scan_line_end(const char *s, int size)
136 char *match;
138 match = memchr(s, '\r', size - 1);
139 if (!match) return 0;
141 /* this is safe because we only scan size - 1 chars above */
142 if (match[1] == '\n') return match - s + 2;
144 return 0;
147 static int
148 cmd_len(conn c)
150 return scan_line_end(c->cmd, c->cmd_read);
153 /* parse the command line */
154 static int
155 which_cmd(conn c)
157 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
158 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
159 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
160 TEST_CMD(c->cmd, CMD_PEEK, OP_PEEK);
161 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
162 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
163 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
164 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
165 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
166 TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
167 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
168 return OP_UNKNOWN;
171 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
172 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
173 * This function is idempotent(). */
174 static void
175 fill_extra_data(conn c)
177 int extra_bytes, job_data_bytes = 0, cmd_bytes;
179 if (!c->fd) return; /* the connection was closed */
180 if (!c->cmd_len) return; /* we don't have a complete command */
182 /* how many extra bytes did we read? */
183 extra_bytes = c->cmd_read - c->cmd_len;
185 /* how many bytes should we put into the job body? */
186 if (c->in_job) {
187 job_data_bytes = min(extra_bytes, c->in_job->body_size);
188 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
189 c->in_job_read = job_data_bytes;
192 /* how many bytes are left to go into the future cmd? */
193 cmd_bytes = extra_bytes - job_data_bytes;
194 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
195 c->cmd_read = cmd_bytes;
196 c->cmd_len = 0; /* we no longer know the length of the new command */
199 static void
200 enqueue_incoming_job(conn c)
202 int r;
203 job j = c->in_job;
205 /* check if the trailer is present and correct */
206 if (memcmp(j->body + j->body_size - 2, "\r\n", 2)) return conn_close(c);
208 c->in_job = NULL; /* the connection no longer owns this job */
209 c->in_job_read = 0;
211 /* we have a complete job, so let's stick it in the pqueue */
212 r = enqueue_job(j, j->delay);
213 put_ct++; /* stats */
215 if (r) return reply(c, MSG_INSERTED, MSG_INSERTED_LEN, STATE_SENDWORD);
217 bury_job(j); /* there was no room in the queue, so it gets buried */
218 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
221 static void
222 maybe_enqueue_incoming_job(conn c)
224 job j = c->in_job;
226 /* do we have a complete job? */
227 if (c->in_job_read == j->body_size) return enqueue_incoming_job(c);
229 /* otherwise we have incomplete data, so just keep waiting */
230 c->state = STATE_WANTDATA;
233 static void
234 wait_for_job(conn c)
236 int r;
238 /* this conn is waiting, but we want to know if they hang up */
239 r = conn_update_evq(c, EV_READ | EV_PERSIST);
240 if (r == -1) return twarnx("update events failed"), conn_close(c);
242 c->state = STATE_WAIT;
243 enqueue_waiting_conn(c);
246 static unsigned int
247 uptime()
249 return time(NULL) - start_time;
252 static int
253 fmt_stats(char *buf, size_t size, void *x)
255 struct rusage ru = {{0, 0}, {0, 0}};
256 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
257 return snprintf(buf, size, STATS_FMT,
258 get_urgent_job_ct(),
259 get_ready_job_ct(),
260 get_reserved_job_ct(),
261 get_delayed_job_ct(),
262 get_buried_job_ct(),
263 HEAP_SIZE,
264 put_ct,
265 peek_ct,
266 reserve_ct,
267 delete_ct,
268 release_ct,
269 bury_ct,
270 kick_ct,
271 stats_ct,
272 timeout_ct,
273 total_jobs(),
274 count_cur_conns(),
275 count_cur_producers(),
276 count_cur_workers(),
277 count_cur_waiting(),
278 count_tot_conns(),
279 getpid(),
280 VERSION,
281 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
282 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
283 uptime());
287 static int
288 fmt_job_stats(char *buf, size_t size, void *jp)
290 time_t t;
291 job j = (job) jp;
293 t = time(NULL);
294 return snprintf(buf, size, JOB_STATS_FMT,
295 j->id,
296 job_state(j),
297 (unsigned int) (t - j->creation),
298 j->delay,
299 (unsigned int) (j->deadline - t),
300 j->timeout_ct,
301 j->release_ct,
302 j->bury_ct,
303 j->kick_ct);
306 static void
307 do_stats(conn c, int(*fmt)(char *, size_t, void *), void *data)
309 int r, stats_len;
311 /* first, measure how big a buffer we will need */
312 stats_len = fmt(NULL, 0, data);
314 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
315 if (!c->out_job) return conn_close(c);
317 /* now actually format the stats data */
318 r = fmt(c->out_job->body, stats_len, data);
319 if (r != stats_len) return twarnx("snprintf inconsistency"), conn_close(c);
320 c->out_job->body[stats_len - 1] = '\n'; /* patch up sprintf's output */
322 c->out_job_sent = 0;
324 r = snprintf(c->reply_buf, LINE_BUF_SIZE, "OK %d\r\n", stats_len - 2);
325 if (r >= LINE_BUF_SIZE) return twarnx("truncated reply"), conn_close(c);
327 reply(c, c->reply_buf, strlen(c->reply_buf), STATE_SENDJOB);
330 /* Read a priority value from the given buffer and place it in pri.
331 * Update end to point to the address after the last character consumed.
332 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
333 * conversion and return the status code but not update any values. This is an
334 * easy way to check for errors.
335 * Return 0 on success, or nonzero on failure.
336 * If a failure occurs, pri and end are not modified. */
337 static int
338 read_pri(unsigned int *pri, const char *buf, char **end)
340 char *tend;
341 unsigned int tpri;
343 errno = 0;
344 tpri = strtoul(buf, &tend, 10);
345 if (tend == buf) return -1;
346 if (errno && errno != ERANGE) return -1;
348 if (pri) *pri = tpri;
349 if (end) *end = tend;
350 return 0;
353 /* Read a delay value from the given buffer and place it in delay.
354 * The interface and behavior are the same as in read_pri(). */
355 static int
356 read_delay(unsigned int *delay, const char *buf, char **end)
358 return read_pri(delay, buf, end);
361 static void
362 dispatch_cmd(conn c)
364 int r, i;
365 unsigned int count;
366 job j;
367 char type;
368 char *size_buf, *delay_buf, *pri_buf, *end_buf;
369 unsigned int pri, delay, body_size;
370 unsigned long long int id;
372 /* NUL-terminate this string so we can use strtol and friends */
373 c->cmd[c->cmd_len - 2] = '\0';
375 /* check for possible maliciousness */
376 if (strlen(c->cmd) != c->cmd_len - 2) return conn_close(c);
378 type = which_cmd(c);
380 switch (type) {
381 case OP_PUT:
382 if (drain_mode) return conn_close(c);
384 r = read_pri(&pri, c->cmd + 4, &delay_buf);
385 if (r) return conn_close(c);
387 r = read_delay(&delay, delay_buf, &size_buf);
388 if (r) return conn_close(c);
390 errno = 0;
391 body_size = strtoul(size_buf, &end_buf, 10);
392 if (errno) return conn_close(c);
394 if (body_size > JOB_DATA_SIZE_LIMIT) return conn_close(c);
396 /* don't allow trailing garbage */
397 if (end_buf[0] != '\0') return conn_close(c);
399 conn_set_producer(c);
401 c->in_job = make_job(pri, delay, body_size + 2);
403 fill_extra_data(c);
405 /* it's possible we already have a complete job */
406 maybe_enqueue_incoming_job(c);
408 break;
409 case OP_PEEK:
410 /* don't allow trailing garbage */
411 if (c->cmd_len != CMD_PEEK_LEN + 2) return conn_close(c);
413 j = job_copy(peek_buried_job() ? : delay_q_peek());
415 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
417 peek_ct++; /* stats */
418 reply_job(c, j, MSG_FOUND);
419 break;
420 case OP_PEEKJOB:
421 errno = 0;
422 id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
423 if (errno) return conn_close(c);
425 /* So, peek is annoying, because some other connection might free the
426 * job while we are still trying to write it out. So we copy it and
427 * then free the copy when it's done sending. */
428 j = job_copy(peek_job(id));
430 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
432 peek_ct++; /* stats */
433 reply_job(c, j, MSG_FOUND);
434 break;
435 case OP_RESERVE:
436 /* don't allow trailing garbage */
437 if (c->cmd_len != CMD_RESERVE_LEN + 2) return conn_close(c);
439 reserve_ct++; /* stats */
440 conn_set_worker(c);
442 /* try to get a new job for this guy */
443 wait_for_job(c);
444 process_queue();
445 break;
446 case OP_DELETE:
447 errno = 0;
448 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
449 if (errno) return conn_close(c);
451 j = remove_reserved_job(c, id) ? : remove_buried_job(id);
453 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
455 delete_ct++; /* stats */
456 free(j);
458 reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
459 break;
460 case OP_RELEASE:
461 errno = 0;
462 id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
463 if (errno) return conn_close(c);
465 r = read_pri(&pri, pri_buf, &delay_buf);
466 if (r) return conn_close(c);
468 r = read_delay(&delay, delay_buf, NULL);
469 if (r) return conn_close(c);
471 j = remove_reserved_job(c, id);
473 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
475 j->pri = pri;
476 j->delay = delay;
477 j->release_ct++;
478 release_ct++; /* stats */
479 r = enqueue_job(j, delay);
480 if (r) return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
482 bury_job(j); /* there was no room in the queue, so it gets buried */
483 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
484 break;
485 case OP_BURY:
486 errno = 0;
487 id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
488 if (errno) return conn_close(c);
490 r = read_pri(&pri, pri_buf, NULL);
491 if (r) return conn_close(c);
493 j = remove_reserved_job(c, id);
495 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
497 j->pri = pri;
498 bury_ct++; /* stats */
499 bury_job(j);
500 reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
501 break;
502 case OP_KICK:
503 errno = 0;
504 count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
505 if (end_buf == c->cmd + CMD_KICK_LEN) return conn_close(c);
506 if (errno) return conn_close(c);
508 kick_ct++; /* stats */
510 i = kick_jobs(count);
512 r = snprintf(c->reply_buf, LINE_BUF_SIZE, "KICKED %u\r\n", i);
513 /* can't happen */
514 if (r >= LINE_BUF_SIZE) return twarnx("truncated reply"), conn_close(c);
516 reply(c, c->reply_buf, strlen(c->reply_buf), STATE_SENDWORD);
517 break;
518 case OP_STATS:
519 /* don't allow trailing garbage */
520 if (c->cmd_len != CMD_STATS_LEN + 2) return conn_close(c);
522 stats_ct++; /* stats */
524 do_stats(c, fmt_stats, NULL);
525 break;
526 case OP_JOBSTATS:
527 errno = 0;
528 id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
529 if (errno) return conn_close(c);
531 j = peek_job(id);
532 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
534 stats_ct++; /* stats */
535 do_stats(c, fmt_job_stats, j);
536 break;
537 default:
538 /* unknown command -- protocol error */
539 return conn_close(c);
543 static void
544 do_cmd(conn c)
546 dispatch_cmd(c);
547 fill_extra_data(c);
550 static void
551 reset_conn(conn c)
553 int r;
555 r = conn_update_evq(c, EV_READ | EV_PERSIST);
556 if (r == -1) return twarnx("update events failed"), conn_close(c);
558 /* was this a peek or stats command? */
559 if (!has_reserved_this_job(c, c->out_job)) free(c->out_job);
560 c->out_job = NULL;
562 c->reply_sent = 0; /* now that we're done, reset this */
563 c->state = STATE_WANTCOMMAND;
566 static void
567 h_conn_data(conn c)
569 int r;
570 job j;
571 struct iovec iov[2];
573 switch (c->state) {
574 case STATE_WANTCOMMAND:
575 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
576 if (r == -1) return check_err(c, "read()");
577 if (r == 0) return conn_close(c); /* the client hung up */
579 c->cmd_read += r; /* we got some bytes */
581 c->cmd_len = cmd_len(c); /* find the EOL */
583 /* yay, complete command line */
584 if (c->cmd_len) return do_cmd(c);
586 /* c->cmd_read > LINE_BUF_SIZE can't happen */
588 /* command line too long? */
589 if (c->cmd_read == LINE_BUF_SIZE) return conn_close(c);
591 /* otherwise we have an incomplete line, so just keep waiting */
592 break;
593 case STATE_WANTDATA:
594 j = c->in_job;
596 r = read(c->fd, j->body + c->in_job_read, j->body_size - c->in_job_read);
597 if (r == -1) return check_err(c, "read()");
598 if (r == 0) return conn_close(c); /* the client hung up */
600 c->in_job_read += r; /* we got some bytes */
602 /* (j->in_job_read > j->body_size) can't happen */
604 maybe_enqueue_incoming_job(c);
605 break;
606 case STATE_SENDWORD:
607 r= write(c->fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
608 if (r == -1) return check_err(c, "write()");
609 if (r == 0) return conn_close(c); /* the client hung up */
611 c->reply_sent += r; /* we got some bytes */
613 /* (c->reply_sent > c->reply_len) can't happen */
615 if (c->reply_sent == c->reply_len) return reset_conn(c);
617 /* otherwise we sent an incomplete reply, so just keep waiting */
618 break;
619 case STATE_SENDJOB:
620 j = c->out_job;
622 iov[0].iov_base = c->reply + c->reply_sent;
623 iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
624 iov[1].iov_base = j->body + c->out_job_sent;
625 iov[1].iov_len = j->body_size - c->out_job_sent;
627 r = writev(c->fd, iov, 2);
628 if (r == -1) return check_err(c, "writev()");
629 if (r == 0) return conn_close(c); /* the client hung up */
631 /* update the sent values */
632 c->reply_sent += r;
633 if (c->reply_sent >= c->reply_len) {
634 c->out_job_sent += c->reply_sent - c->reply_len;
635 c->reply_sent = c->reply_len;
638 /* (c->out_job_sent > j->body_size) can't happen */
640 /* are we done? */
641 if (c->out_job_sent == j->body_size) return reset_conn(c);
643 /* otherwise we sent incomplete data, so just keep waiting */
644 break;
645 case STATE_WAIT: /* keep an eye out in case they hang up */
646 /* but don't hang up just because our buffer is full */
647 if (LINE_BUF_SIZE - c->cmd_read < 1) break;
649 r = read(c->fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
650 if (r == -1) return check_err(c, "read()");
651 if (r == 0) return conn_close(c); /* the client hung up */
652 c->cmd_read += r; /* we got some bytes */
656 /* if we get a timeout, it means that a job has been reserved for too long, so
657 * we should put it back in the queue */
658 static void
659 h_conn_timeout(conn c)
661 int r;
662 job j;
664 while ((j = soonest_job(c))) {
665 if (j->deadline > time(NULL)) return;
666 timeout_ct++; /* stats */
667 j->timeout_ct++;
668 r = enqueue_job(remove_this_reserved_job(c, j), 0);
669 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
670 r = conn_update_evq(c, c->evq.ev_events);
671 if (r == -1) return twarnx("conn_update_evq() failed"), conn_close(c);
675 #define want_command(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))
676 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
678 static void
679 h_conn(const int fd, const short which, conn c)
681 if (fd != c->fd) {
682 twarnx("Argh! event fd doesn't match conn fd.");
683 close(fd);
684 return conn_close(c);
687 switch (which) {
688 case EV_TIMEOUT:
689 h_conn_timeout(c);
690 event_add(&c->evq, NULL); /* seems to be necessary */
691 break;
692 case EV_READ:
693 /* fall through... */
694 case EV_WRITE:
695 /* fall through... */
696 default:
697 h_conn_data(c);
700 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);
703 static void
704 h_delay()
706 int r;
707 job j;
708 time_t t;
710 t = time(NULL);
711 while ((j = delay_q_peek())) {
712 if (j->deadline > t) break;
713 j = delay_q_take();
714 r = enqueue_job(j, 0);
715 if (!r) bury_job(j); /* there was no room in the queue, so bury it */
718 set_main_timeout((j = delay_q_peek()) ? j->deadline : 0);
721 static void
722 h_accept(const int fd, const short which, struct event *ev)
724 conn c;
725 int cfd, flags, r;
726 socklen_t addrlen;
727 struct sockaddr addr;
729 if (which == EV_TIMEOUT) return h_delay();
731 addrlen = sizeof addr;
732 cfd = accept(fd, &addr, &addrlen);
733 if (cfd == -1) {
734 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
735 if (errno == EMFILE) brake();
736 return;
739 flags = fcntl(cfd, F_GETFL, 0);
740 if (flags < 0) return twarn("getting flags"), close(cfd), v();
742 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
743 if (r < 0) return twarn("setting O_NONBLOCK"), close(cfd), v();
745 c = make_conn(cfd, STATE_WANTCOMMAND);
746 if (!c) return twarnx("make_conn() failed"), close(cfd), brake();
748 dprintf("accepted conn, fd=%d\n", cfd);
749 r = conn_set_evq(c, EV_READ | EV_PERSIST, (evh) h_conn);
750 if (r == -1) return twarnx("conn_set_evq() failed"), close(cfd), brake();
754 main(int argc, char **argv)
756 int r;
758 start_time = time(NULL);
760 r = make_server_socket(HOST, PORT);
761 if (r == -1) twarnx("make_server_socket()"), exit(111);
763 prot_init();
764 #ifndef DEBUG
765 daemonize();
766 #endif
767 event_init();
768 set_sig_handlers();
769 nudge_fd_limit();
771 unbrake((evh) h_accept);
773 event_dispatch();
774 twarnx("got here for some reason");
775 return 0;