1 /* beanstalk - fast, general-purpose work queue */
3 #include <netinet/in.h>
11 #include <sys/types.h>
13 #include <sys/resource.h>
17 #include "beanstalkd.h"
24 /* job body cannot be greater than this many bytes long */
25 #define JOB_DATA_SIZE_LIMIT ((1 << 16) - 1)
27 static time_t start_time
;
28 static int drain_mode
= 0;
30 static unsigned long long int put_ct
= 0, peek_ct
= 0, reserve_ct
= 0,
31 delete_ct
= 0, release_ct
= 0, bury_ct
= 0, kick_ct
= 0,
32 stats_ct
= 0, timeout_ct
= 0;
36 nullfd(int fd
, int flags
)
41 r
= open("/dev/null", flags
);
42 if (r
!= fd
) twarn("open(\"/dev/null\")"), exit(1);
70 enter_drain_mode(int sig
)
81 sa
.sa_handler
= SIG_IGN
;
83 r
= sigemptyset(&sa
.sa_mask
);
84 if (r
== -1) twarn("sigemptyset()"), exit(111);
86 r
= sigaction(SIGPIPE
, &sa
, 0);
87 if (r
== -1) twarn("sigaction(SIGPIPE)"), exit(111);
89 sa
.sa_handler
= enter_drain_mode
;
90 r
= sigaction(SIGUSR1
, &sa
, 0);
91 if (r
== -1) twarn("sigaction(SIGUSR1)"), exit(111);
94 /* This is a workaround for a mystifying workaround in libevent's epoll
95 * implementation. The epoll_init() function creates an epoll fd with space to
96 * handle RLIMIT_NOFILE - 1 fds, accompanied by the following puzzling comment:
97 * "Solaris is somewhat retarded - it's important to drop backwards
98 * compatibility when making changes. So, don't dare to put rl.rlim_cur here."
99 * This is presumably to work around a bug in Solaris, but it has the
100 * unfortunate side-effect of causing epoll_ctl() (and, therefore, event_add())
101 * to fail for a valid fd if we have hit the limit of open fds. That makes it
102 * hard to provide reasonable behavior in that situation. So, let's reduce the
103 * real value of RLIMIT_NOFILE by one, after epoll_init() has run. */
110 r
= getrlimit(RLIMIT_NOFILE
, &rl
);
111 if (r
!= 0) twarn("getrlimit(RLIMIT_NOFILE)"), exit(2);
115 r
= setrlimit(RLIMIT_NOFILE
, &rl
);
116 if (r
!= 0) twarn("setrlimit(RLIMIT_NOFILE)"), exit(2);
120 check_err(conn c
, const char *s
)
122 if (errno
== EAGAIN
) return;
123 if (errno
== EINTR
) return;
124 if (errno
== EWOULDBLOCK
) return;
126 warn("%s:%d in %s: %s", __FILE__
, __LINE__
, __func__
, s
);
/* Scan the first `size` bytes of `s` for the sequence "\r\n" and return the
 * length of the line, terminator included.
 * Always returns at least 2 if a match is found. Returns 0 if there is no
 * match, if `s` is NULL, or if `size` is too small to hold "\r\n". */
static int
scan_line_end(const char *s, int size)
{
    const char *cur = s;
    const char *match;
    int left;

    /* Guard first: with size <= 0 the (size - 1) below would convert to a
     * huge size_t and memchr() would read far past the buffer. */
    if (!s || size < 2) return 0;

    /* Only the first size - 1 bytes may start a terminator, so reading
     * match[1] always stays inside the buffer. */
    left = size - 1;
    while (left > 0 && (match = memchr(cur, '\r', left))) {
        if (match[1] == '\n') return (int) (match - s) + 2;

        /* a bare '\r' is not a terminator; resume the scan right after it */
        left -= (int) (match - cur) + 1;
        cur = match + 1;
    }

    return 0;
}
150 return scan_line_end(c
->cmd
, c
->cmd_read
);
153 /* parse the command line */
157 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
158 TEST_CMD(c
->cmd
, CMD_PUT
, OP_PUT
);
159 TEST_CMD(c
->cmd
, CMD_PEEKJOB
, OP_PEEKJOB
);
160 TEST_CMD(c
->cmd
, CMD_PEEK
, OP_PEEK
);
161 TEST_CMD(c
->cmd
, CMD_RESERVE
, OP_RESERVE
);
162 TEST_CMD(c
->cmd
, CMD_DELETE
, OP_DELETE
);
163 TEST_CMD(c
->cmd
, CMD_RELEASE
, OP_RELEASE
);
164 TEST_CMD(c
->cmd
, CMD_BURY
, OP_BURY
);
165 TEST_CMD(c
->cmd
, CMD_KICK
, OP_KICK
);
166 TEST_CMD(c
->cmd
, CMD_JOBSTATS
, OP_JOBSTATS
);
167 TEST_CMD(c
->cmd
, CMD_STATS
, OP_STATS
);
171 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
172 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
173 * This function is idempotent(). */
175 fill_extra_data(conn c
)
177 int extra_bytes
, job_data_bytes
= 0, cmd_bytes
;
179 if (!c
->fd
) return; /* the connection was closed */
180 if (!c
->cmd_len
) return; /* we don't have a complete command */
182 /* how many extra bytes did we read? */
183 extra_bytes
= c
->cmd_read
- c
->cmd_len
;
185 /* how many bytes should we put into the job body? */
187 job_data_bytes
= min(extra_bytes
, c
->in_job
->body_size
);
188 memcpy(c
->in_job
->body
, c
->cmd
+ c
->cmd_len
, job_data_bytes
);
189 c
->in_job_read
= job_data_bytes
;
192 /* how many bytes are left to go into the future cmd? */
193 cmd_bytes
= extra_bytes
- job_data_bytes
;
194 memmove(c
->cmd
, c
->cmd
+ c
->cmd_len
+ job_data_bytes
, cmd_bytes
);
195 c
->cmd_read
= cmd_bytes
;
196 c
->cmd_len
= 0; /* we no longer know the length of the new command */
200 enqueue_incoming_job(conn c
)
205 /* check if the trailer is present and correct */
206 if (memcmp(j
->body
+ j
->body_size
- 2, "\r\n", 2)) return conn_close(c
);
208 c
->in_job
= NULL
; /* the connection no longer owns this job */
211 /* we have a complete job, so let's stick it in the pqueue */
212 r
= enqueue_job(j
, j
->delay
);
213 put_ct
++; /* stats */
215 if (r
) return reply(c
, MSG_INSERTED
, MSG_INSERTED_LEN
, STATE_SENDWORD
);
217 bury_job(j
); /* there was no room in the queue, so it gets buried */
218 reply(c
, MSG_BURIED
, MSG_BURIED_LEN
, STATE_SENDWORD
);
222 maybe_enqueue_incoming_job(conn c
)
226 /* do we have a complete job? */
227 if (c
->in_job_read
== j
->body_size
) return enqueue_incoming_job(c
);
229 /* otherwise we have incomplete data, so just keep waiting */
230 c
->state
= STATE_WANTDATA
;
238 /* this conn is waiting, but we want to know if they hang up */
239 r
= conn_update_evq(c
, EV_READ
| EV_PERSIST
);
240 if (r
== -1) return twarnx("update events failed"), conn_close(c
);
242 c
->state
= STATE_WAIT
;
243 enqueue_waiting_conn(c
);
249 return time(NULL
) - start_time
;
253 fmt_stats(char *buf
, size_t size
, void *x
)
255 struct rusage ru
= {{0, 0}, {0, 0}};
256 getrusage(RUSAGE_SELF
, &ru
); /* don't care if it fails */
257 return snprintf(buf
, size
, STATS_FMT
,
260 get_reserved_job_ct(),
261 get_delayed_job_ct(),
275 count_cur_producers(),
281 (int) ru
.ru_utime
.tv_sec
, (int) ru
.ru_utime
.tv_usec
,
282 (int) ru
.ru_stime
.tv_sec
, (int) ru
.ru_stime
.tv_usec
,
288 fmt_job_stats(char *buf
, size_t size
, void *jp
)
294 return snprintf(buf
, size
, JOB_STATS_FMT
,
297 (unsigned int) (t
- j
->creation
),
299 (unsigned int) (j
->deadline
- t
),
307 do_stats(conn c
, int(*fmt
)(char *, size_t, void *), void *data
)
311 /* first, measure how big a buffer we will need */
312 stats_len
= fmt(NULL
, 0, data
);
314 c
->out_job
= allocate_job(stats_len
); /* fake job to hold stats data */
315 if (!c
->out_job
) return conn_close(c
);
317 /* now actually format the stats data */
318 r
= fmt(c
->out_job
->body
, stats_len
, data
);
319 if (r
!= stats_len
) return twarnx("snprintf inconsistency"), conn_close(c
);
320 c
->out_job
->body
[stats_len
- 1] = '\n'; /* patch up sprintf's output */
324 r
= snprintf(c
->reply_buf
, LINE_BUF_SIZE
, "OK %d\r\n", stats_len
- 2);
325 if (r
>= LINE_BUF_SIZE
) return twarnx("truncated reply"), conn_close(c
);
327 reply(c
, c
->reply_buf
, strlen(c
->reply_buf
), STATE_SENDJOB
);
/* Read a decimal priority value from the given buffer and place it in pri.
 * Update end to point to the address after the last character consumed.
 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
 * conversion and return the status code but not update any values. This is an
 * easy way to check for errors.
 * Values too large for an unsigned int are clamped to the largest unsigned
 * int; negative values are rejected.
 * Return 0 on success, or nonzero on failure.
 * If a failure occurs, pri and end are not modified. */
static int
read_pri(unsigned int *pri, const char *buf, char **end)
{
    const unsigned int pri_max = (unsigned int) -1;
    unsigned long val;
    const char *p;
    char *tend;

    /* strtoul() reports errors only through errno, so a stale value left by
     * an earlier call must not be mistaken for a conversion failure */
    errno = 0;
    val = strtoul(buf, &tend, 10);
    if (tend == buf) return -1;
    if (errno && errno != ERANGE) return -1; /* overflow is clamped below */

    /* strtoul() accepts a leading '-' and wraps the result around; the
     * consumed prefix is only whitespace, sign and digits, so any '-' in it
     * means the input was negative */
    for (p = buf; p < tend; p++) {
        if (*p == '-') return -1;
    }

    /* unsigned long may be wider than unsigned int: clamp, don't truncate */
    if (val > pri_max) val = pri_max;

    if (pri) *pri = (unsigned int) val;
    if (end) *end = tend;
    return 0;
}
/* Parse a delay value from buf and store it in delay.
 * Arguments, return value and failure behavior are exactly those of
 * read_pri(), which does the actual work. */
static int
read_delay(unsigned int *delay, const char *buf, char **end)
{
    int status = read_pri(delay, buf, end);

    return status;
}
368 char *size_buf
, *delay_buf
, *pri_buf
, *end_buf
;
369 unsigned int pri
, delay
, body_size
;
370 unsigned long long int id
;
372 /* NUL-terminate this string so we can use strtol and friends */
373 c
->cmd
[c
->cmd_len
- 2] = '\0';
375 /* check for possible maliciousness */
376 if (strlen(c
->cmd
) != c
->cmd_len
- 2) return conn_close(c
);
382 if (drain_mode
) return conn_close(c
);
384 r
= read_pri(&pri
, c
->cmd
+ 4, &delay_buf
);
385 if (r
) return conn_close(c
);
387 r
= read_delay(&delay
, delay_buf
, &size_buf
);
388 if (r
) return conn_close(c
);
391 body_size
= strtoul(size_buf
, &end_buf
, 10);
392 if (errno
) return conn_close(c
);
394 if (body_size
> JOB_DATA_SIZE_LIMIT
) return conn_close(c
);
396 /* don't allow trailing garbage */
397 if (end_buf
[0] != '\0') return conn_close(c
);
399 conn_set_producer(c
);
401 c
->in_job
= make_job(pri
, delay
, body_size
+ 2);
405 /* it's possible we already have a complete job */
406 maybe_enqueue_incoming_job(c
);
410 /* don't allow trailing garbage */
411 if (c
->cmd_len
!= CMD_PEEK_LEN
+ 2) return conn_close(c
);
413 j
= job_copy(peek_buried_job() ? : delay_q_peek());
415 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
417 peek_ct
++; /* stats */
418 reply_job(c
, j
, MSG_FOUND
);
422 id
= strtoull(c
->cmd
+ CMD_PEEKJOB_LEN
, &end_buf
, 10);
423 if (errno
) return conn_close(c
);
425 /* So, peek is annoying, because some other connection might free the
426 * job while we are still trying to write it out. So we copy it and
427 * then free the copy when it's done sending. */
428 j
= job_copy(peek_job(id
));
430 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
432 peek_ct
++; /* stats */
433 reply_job(c
, j
, MSG_FOUND
);
436 /* don't allow trailing garbage */
437 if (c
->cmd_len
!= CMD_RESERVE_LEN
+ 2) return conn_close(c
);
439 reserve_ct
++; /* stats */
442 /* try to get a new job for this guy */
448 id
= strtoull(c
->cmd
+ CMD_DELETE_LEN
, &end_buf
, 10);
449 if (errno
) return conn_close(c
);
451 j
= remove_reserved_job(c
, id
) ? : remove_buried_job(id
);
453 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
455 delete_ct
++; /* stats */
458 reply(c
, MSG_DELETED
, MSG_DELETED_LEN
, STATE_SENDWORD
);
462 id
= strtoull(c
->cmd
+ CMD_RELEASE_LEN
, &pri_buf
, 10);
463 if (errno
) return conn_close(c
);
465 r
= read_pri(&pri
, pri_buf
, &delay_buf
);
466 if (r
) return conn_close(c
);
468 r
= read_delay(&delay
, delay_buf
, NULL
);
469 if (r
) return conn_close(c
);
471 j
= remove_reserved_job(c
, id
);
473 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
478 release_ct
++; /* stats */
479 r
= enqueue_job(j
, delay
);
480 if (r
) return reply(c
, MSG_RELEASED
, MSG_RELEASED_LEN
, STATE_SENDWORD
);
482 bury_job(j
); /* there was no room in the queue, so it gets buried */
483 reply(c
, MSG_BURIED
, MSG_BURIED_LEN
, STATE_SENDWORD
);
487 id
= strtoull(c
->cmd
+ CMD_BURY_LEN
, &pri_buf
, 10);
488 if (errno
) return conn_close(c
);
490 r
= read_pri(&pri
, pri_buf
, NULL
);
491 if (r
) return conn_close(c
);
493 j
= remove_reserved_job(c
, id
);
495 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
498 bury_ct
++; /* stats */
500 reply(c
, MSG_BURIED
, MSG_BURIED_LEN
, STATE_SENDWORD
);
504 count
= strtoul(c
->cmd
+ CMD_KICK_LEN
, &end_buf
, 10);
505 if (end_buf
== c
->cmd
+ CMD_KICK_LEN
) return conn_close(c
);
506 if (errno
) return conn_close(c
);
508 kick_ct
++; /* stats */
510 i
= kick_jobs(count
);
512 r
= snprintf(c
->reply_buf
, LINE_BUF_SIZE
, "KICKED %u\r\n", i
);
514 if (r
>= LINE_BUF_SIZE
) return twarnx("truncated reply"), conn_close(c
);
516 reply(c
, c
->reply_buf
, strlen(c
->reply_buf
), STATE_SENDWORD
);
519 /* don't allow trailing garbage */
520 if (c
->cmd_len
!= CMD_STATS_LEN
+ 2) return conn_close(c
);
522 stats_ct
++; /* stats */
524 do_stats(c
, fmt_stats
, NULL
);
528 id
= strtoull(c
->cmd
+ CMD_JOBSTATS_LEN
, &end_buf
, 10);
529 if (errno
) return conn_close(c
);
532 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
534 stats_ct
++; /* stats */
535 do_stats(c
, fmt_job_stats
, j
);
538 /* unknown command -- protocol error */
539 return conn_close(c
);
555 r
= conn_update_evq(c
, EV_READ
| EV_PERSIST
);
556 if (r
== -1) return twarnx("update events failed"), conn_close(c
);
558 /* was this a peek or stats command? */
559 if (!has_reserved_this_job(c
, c
->out_job
)) free(c
->out_job
);
562 c
->reply_sent
= 0; /* now that we're done, reset this */
563 c
->state
= STATE_WANTCOMMAND
;
574 case STATE_WANTCOMMAND
:
575 r
= read(c
->fd
, c
->cmd
+ c
->cmd_read
, LINE_BUF_SIZE
- c
->cmd_read
);
576 if (r
== -1) return check_err(c
, "read()");
577 if (r
== 0) return conn_close(c
); /* the client hung up */
579 c
->cmd_read
+= r
; /* we got some bytes */
581 c
->cmd_len
= cmd_len(c
); /* find the EOL */
583 /* yay, complete command line */
584 if (c
->cmd_len
) return do_cmd(c
);
586 /* c->cmd_read > LINE_BUF_SIZE can't happen */
588 /* command line too long? */
589 if (c
->cmd_read
== LINE_BUF_SIZE
) return conn_close(c
);
591 /* otherwise we have an incomplete line, so just keep waiting */
596 r
= read(c
->fd
, j
->body
+ c
->in_job_read
, j
->body_size
- c
->in_job_read
);
597 if (r
== -1) return check_err(c
, "read()");
598 if (r
== 0) return conn_close(c
); /* the client hung up */
600 c
->in_job_read
+= r
; /* we got some bytes */
602 /* (c->in_job_read > j->body_size) can't happen */
604 maybe_enqueue_incoming_job(c
);
607 r
= write(c
->fd
, c
->reply
+ c
->reply_sent
, c
->reply_len
- c
->reply_sent
);
608 if (r
== -1) return check_err(c
, "write()");
609 if (r
== 0) return conn_close(c
); /* the client hung up */
611 c
->reply_sent
+= r
; /* we got some bytes */
613 /* (c->reply_sent > c->reply_len) can't happen */
615 if (c
->reply_sent
== c
->reply_len
) return reset_conn(c
);
617 /* otherwise we sent an incomplete reply, so just keep waiting */
622 iov
[0].iov_base
= c
->reply
+ c
->reply_sent
;
623 iov
[0].iov_len
= c
->reply_len
- c
->reply_sent
; /* maybe 0 */
624 iov
[1].iov_base
= j
->body
+ c
->out_job_sent
;
625 iov
[1].iov_len
= j
->body_size
- c
->out_job_sent
;
627 r
= writev(c
->fd
, iov
, 2);
628 if (r
== -1) return check_err(c
, "writev()");
629 if (r
== 0) return conn_close(c
); /* the client hung up */
631 /* update the sent values */
633 if (c
->reply_sent
>= c
->reply_len
) {
634 c
->out_job_sent
+= c
->reply_sent
- c
->reply_len
;
635 c
->reply_sent
= c
->reply_len
;
638 /* (c->out_job_sent > j->body_size) can't happen */
641 if (c
->out_job_sent
== j
->body_size
) return reset_conn(c
);
643 /* otherwise we sent incomplete data, so just keep waiting */
645 case STATE_WAIT
: /* keep an eye out in case they hang up */
646 /* but don't hang up just because our buffer is full */
647 if (LINE_BUF_SIZE
- c
->cmd_read
< 1) break;
649 r
= read(c
->fd
, c
->cmd
+ c
->cmd_read
, LINE_BUF_SIZE
- c
->cmd_read
);
650 if (r
== -1) return check_err(c
, "read()");
651 if (r
== 0) return conn_close(c
); /* the client hung up */
652 c
->cmd_read
+= r
; /* we got some bytes */
656 /* if we get a timeout, it means that a job has been reserved for too long, so
657 * we should put it back in the queue */
659 h_conn_timeout(conn c
)
664 while ((j
= soonest_job(c
))) {
665 if (j
->deadline
> time(NULL
)) return;
666 timeout_ct
++; /* stats */
668 r
= enqueue_job(remove_this_reserved_job(c
, j
), 0);
669 if (!r
) bury_job(j
); /* there was no room in the queue, so bury it */
670 r
= conn_update_evq(c
, c
->evq
.ev_events
);
671 if (r
== -1) return twarnx("conn_update_evq() failed"), conn_close(c
);
675 #define want_command(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))
676 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
679 h_conn(const int fd
, const short which
, conn c
)
682 twarnx("Argh! event fd doesn't match conn fd.");
684 return conn_close(c
);
690 event_add(&c
->evq
, NULL
); /* seems to be necessary */
693 /* fall through... */
695 /* fall through... */
700 while (cmd_data_ready(c
) && (c
->cmd_len
= cmd_len(c
))) do_cmd(c
);
711 while ((j
= delay_q_peek())) {
712 if (j
->deadline
> t
) break;
714 r
= enqueue_job(j
, 0);
715 if (!r
) bury_job(j
); /* there was no room in the queue, so bury it */
718 set_main_timeout((j
= delay_q_peek()) ? j
->deadline
: 0);
722 h_accept(const int fd
, const short which
, struct event
*ev
)
727 struct sockaddr addr
;
729 if (which
== EV_TIMEOUT
) return h_delay();
731 addrlen
= sizeof addr
;
732 cfd
= accept(fd
, &addr
, &addrlen
);
734 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) twarn("accept()");
735 if (errno
== EMFILE
) brake();
739 flags
= fcntl(cfd
, F_GETFL
, 0);
740 if (flags
< 0) return twarn("getting flags"), close(cfd
), v();
742 r
= fcntl(cfd
, F_SETFL
, flags
| O_NONBLOCK
);
743 if (r
< 0) return twarn("setting O_NONBLOCK"), close(cfd
), v();
745 c
= make_conn(cfd
, STATE_WANTCOMMAND
);
746 if (!c
) return twarnx("make_conn() failed"), close(cfd
), brake();
748 dprintf("accepted conn, fd=%d\n", cfd
);
749 r
= conn_set_evq(c
, EV_READ
| EV_PERSIST
, (evh
) h_conn
);
750 if (r
== -1) return twarnx("conn_set_evq() failed"), close(cfd
), brake();
754 main(int argc
, char **argv
)
758 start_time
= time(NULL
);
760 r
= make_server_socket(HOST
, PORT
);
761 if (r
== -1) twarnx("make_server_socket()"), exit(111);
771 unbrake((evh
) h_accept
);
774 twarnx("got here for some reason");