add changelog for 1.13
[beanstalkd.git] / prot.c
blob297e77290cd7ba145d1c7facce432f22664ce278
1 #include "dat.h"
2 #include <stdbool.h>
3 #include <stdint.h>
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <unistd.h>
7 #include <fcntl.h>
8 #include <string.h>
9 #include <errno.h>
10 #include <sys/resource.h>
11 #include <sys/uio.h>
12 #include <sys/types.h>
13 #include <sys/utsname.h>
14 #include <sys/socket.h>
15 #include <inttypes.h>
16 #include <stdarg.h>
17 #include <signal.h>
18 #include <limits.h>
20 /* job body cannot be greater than this many bytes long */
21 size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
23 #define NAME_CHARS \
24 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
25 "abcdefghijklmnopqrstuvwxyz" \
26 "0123456789-+/;.$_()"
28 #define CMD_PUT "put "
29 #define CMD_PEEKJOB "peek "
30 #define CMD_PEEK_READY "peek-ready"
31 #define CMD_PEEK_DELAYED "peek-delayed"
32 #define CMD_PEEK_BURIED "peek-buried"
33 #define CMD_RESERVE "reserve"
34 #define CMD_RESERVE_TIMEOUT "reserve-with-timeout "
35 #define CMD_RESERVE_JOB "reserve-job "
36 #define CMD_DELETE "delete "
37 #define CMD_RELEASE "release "
38 #define CMD_BURY "bury "
39 #define CMD_KICK "kick "
40 #define CMD_KICKJOB "kick-job "
41 #define CMD_TOUCH "touch "
42 #define CMD_STATS "stats"
43 #define CMD_STATSJOB "stats-job "
44 #define CMD_USE "use "
45 #define CMD_WATCH "watch "
46 #define CMD_IGNORE "ignore "
47 #define CMD_LIST_TUBES "list-tubes"
48 #define CMD_LIST_TUBE_USED "list-tube-used"
49 #define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
50 #define CMD_STATS_TUBE "stats-tube "
51 #define CMD_QUIT "quit"
52 #define CMD_PAUSE_TUBE "pause-tube"
54 #define CONSTSTRLEN(m) (sizeof(m) - 1)
56 #define CMD_PEEK_READY_LEN CONSTSTRLEN(CMD_PEEK_READY)
57 #define CMD_PEEK_DELAYED_LEN CONSTSTRLEN(CMD_PEEK_DELAYED)
58 #define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED)
59 #define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
60 #define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
61 #define CMD_RESERVE_TIMEOUT_LEN CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
62 #define CMD_RESERVE_JOB_LEN CONSTSTRLEN(CMD_RESERVE_JOB)
63 #define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
64 #define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
65 #define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
66 #define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
67 #define CMD_KICKJOB_LEN CONSTSTRLEN(CMD_KICKJOB)
68 #define CMD_TOUCH_LEN CONSTSTRLEN(CMD_TOUCH)
69 #define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
70 #define CMD_STATSJOB_LEN CONSTSTRLEN(CMD_STATSJOB)
71 #define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
72 #define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
73 #define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
74 #define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
75 #define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
76 #define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
77 #define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
78 #define CMD_PAUSE_TUBE_LEN CONSTSTRLEN(CMD_PAUSE_TUBE)
80 #define MSG_FOUND "FOUND"
81 #define MSG_NOTFOUND "NOT_FOUND\r\n"
82 #define MSG_RESERVED "RESERVED"
83 #define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
84 #define MSG_TIMED_OUT "TIMED_OUT\r\n"
85 #define MSG_DELETED "DELETED\r\n"
86 #define MSG_RELEASED "RELEASED\r\n"
87 #define MSG_BURIED "BURIED\r\n"
88 #define MSG_KICKED "KICKED\r\n"
89 #define MSG_TOUCHED "TOUCHED\r\n"
90 #define MSG_BURIED_FMT "BURIED %"PRIu64"\r\n"
91 #define MSG_INSERTED_FMT "INSERTED %"PRIu64"\r\n"
92 #define MSG_NOT_IGNORED "NOT_IGNORED\r\n"
94 #define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
95 #define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
96 #define MSG_DRAINING "DRAINING\r\n"
97 #define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
98 #define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
99 #define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
100 #define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
102 // Connection can be in one of these states:
103 #define STATE_WANT_COMMAND 0 // conn expects a command from the client
104 #define STATE_WANT_DATA 1 // conn expects a job data
105 #define STATE_SEND_JOB 2 // conn sends job to the client
106 #define STATE_SEND_WORD 3 // conn sends a line reply
107 #define STATE_WAIT 4 // client awaits for the job reservation
108 #define STATE_BITBUCKET 5 // conn discards content
109 #define STATE_CLOSE 6 // conn should be closed
110 #define STATE_WANT_ENDLINE 7 // skip until the end of a line
112 #define OP_UNKNOWN 0
113 #define OP_PUT 1
114 #define OP_PEEKJOB 2
115 #define OP_RESERVE 3
116 #define OP_DELETE 4
117 #define OP_RELEASE 5
118 #define OP_BURY 6
119 #define OP_KICK 7
120 #define OP_STATS 8
121 #define OP_STATSJOB 9
122 #define OP_PEEK_BURIED 10
123 #define OP_USE 11
124 #define OP_WATCH 12
125 #define OP_IGNORE 13
126 #define OP_LIST_TUBES 14
127 #define OP_LIST_TUBE_USED 15
128 #define OP_LIST_TUBES_WATCHED 16
129 #define OP_STATS_TUBE 17
130 #define OP_PEEK_READY 18
131 #define OP_PEEK_DELAYED 19
132 #define OP_RESERVE_TIMEOUT 20
133 #define OP_TOUCH 21
134 #define OP_QUIT 22
135 #define OP_PAUSE_TUBE 23
136 #define OP_KICKJOB 24
137 #define OP_RESERVE_JOB 25
138 #define TOTAL_OPS 26
140 #define STATS_FMT "---\n" \
141 "current-jobs-urgent: %" PRIu64 "\n" \
142 "current-jobs-ready: %" PRIu64 "\n" \
143 "current-jobs-reserved: %" PRIu64 "\n" \
144 "current-jobs-delayed: %u\n" \
145 "current-jobs-buried: %" PRIu64 "\n" \
146 "cmd-put: %" PRIu64 "\n" \
147 "cmd-peek: %" PRIu64 "\n" \
148 "cmd-peek-ready: %" PRIu64 "\n" \
149 "cmd-peek-delayed: %" PRIu64 "\n" \
150 "cmd-peek-buried: %" PRIu64 "\n" \
151 "cmd-reserve: %" PRIu64 "\n" \
152 "cmd-reserve-with-timeout: %" PRIu64 "\n" \
153 "cmd-delete: %" PRIu64 "\n" \
154 "cmd-release: %" PRIu64 "\n" \
155 "cmd-use: %" PRIu64 "\n" \
156 "cmd-watch: %" PRIu64 "\n" \
157 "cmd-ignore: %" PRIu64 "\n" \
158 "cmd-bury: %" PRIu64 "\n" \
159 "cmd-kick: %" PRIu64 "\n" \
160 "cmd-touch: %" PRIu64 "\n" \
161 "cmd-stats: %" PRIu64 "\n" \
162 "cmd-stats-job: %" PRIu64 "\n" \
163 "cmd-stats-tube: %" PRIu64 "\n" \
164 "cmd-list-tubes: %" PRIu64 "\n" \
165 "cmd-list-tube-used: %" PRIu64 "\n" \
166 "cmd-list-tubes-watched: %" PRIu64 "\n" \
167 "cmd-pause-tube: %" PRIu64 "\n" \
168 "job-timeouts: %" PRIu64 "\n" \
169 "total-jobs: %" PRIu64 "\n" \
170 "max-job-size: %zu\n" \
171 "current-tubes: %zu\n" \
172 "current-connections: %u\n" \
173 "current-producers: %u\n" \
174 "current-workers: %u\n" \
175 "current-waiting: %" PRIu64 "\n" \
176 "total-connections: %u\n" \
177 "pid: %ld\n" \
178 "version: \"%s\"\n" \
179 "rusage-utime: %d.%06d\n" \
180 "rusage-stime: %d.%06d\n" \
181 "uptime: %u\n" \
182 "binlog-oldest-index: %d\n" \
183 "binlog-current-index: %d\n" \
184 "binlog-records-migrated: %" PRId64 "\n" \
185 "binlog-records-written: %" PRId64 "\n" \
186 "binlog-max-size: %d\n" \
187 "draining: %s\n" \
188 "id: %s\n" \
189 "hostname: \"%s\"\n" \
190 "os: \"%s\"\n" \
191 "platform: \"%s\"\n" \
192 "\r\n"
194 #define STATS_TUBE_FMT "---\n" \
195 "name: \"%s\"\n" \
196 "current-jobs-urgent: %" PRIu64 "\n" \
197 "current-jobs-ready: %zu\n" \
198 "current-jobs-reserved: %" PRIu64 "\n" \
199 "current-jobs-delayed: %zu\n" \
200 "current-jobs-buried: %" PRIu64 "\n" \
201 "total-jobs: %" PRIu64 "\n" \
202 "current-using: %u\n" \
203 "current-watching: %u\n" \
204 "current-waiting: %" PRIu64 "\n" \
205 "cmd-delete: %" PRIu64 "\n" \
206 "cmd-pause-tube: %" PRIu64 "\n" \
207 "pause: %" PRIu64 "\n" \
208 "pause-time-left: %" PRId64 "\n" \
209 "\r\n"
211 #define STATS_JOB_FMT "---\n" \
212 "id: %" PRIu64 "\n" \
213 "tube: \"%s\"\n" \
214 "state: %s\n" \
215 "pri: %u\n" \
216 "age: %" PRId64 "\n" \
217 "delay: %" PRId64 "\n" \
218 "ttr: %" PRId64 "\n" \
219 "time-left: %" PRId64 "\n" \
220 "file: %d\n" \
221 "reserves: %u\n" \
222 "timeouts: %u\n" \
223 "releases: %u\n" \
224 "buries: %u\n" \
225 "kicks: %u\n" \
226 "\r\n"
228 // The size of the throw-away (BITBUCKET) buffer. Arbitrary.
229 #define BUCKET_BUF_SIZE 1024
231 static uint64 ready_ct = 0;
232 static uint64 timeout_ct = 0;
233 static uint64 op_ct[TOTAL_OPS] = {0};
234 static struct stats global_stat = {0};
236 static Tube *default_tube;
238 // If drain_mode is 1, then server does not accept new jobs.
239 // Variable is set by the SIGUSR1 handler.
240 static volatile sig_atomic_t drain_mode = 0;
242 static int64 started_at;
244 enum { instance_id_bytes = 8 };
245 static char instance_hex[instance_id_bytes * 2 + 1]; // hex-encoded len of instance_id_bytes
247 static struct utsname node_info;
249 // Single linked list with connections that require updates
250 // in the event notification mechanism.
251 static Conn *epollq;
253 static const char * op_names[] = {
254 "<unknown>",
255 CMD_PUT,
256 CMD_PEEKJOB,
257 CMD_RESERVE,
258 CMD_DELETE,
259 CMD_RELEASE,
260 CMD_BURY,
261 CMD_KICK,
262 CMD_STATS,
263 CMD_STATSJOB,
264 CMD_PEEK_BURIED,
265 CMD_USE,
266 CMD_WATCH,
267 CMD_IGNORE,
268 CMD_LIST_TUBES,
269 CMD_LIST_TUBE_USED,
270 CMD_LIST_TUBES_WATCHED,
271 CMD_STATS_TUBE,
272 CMD_PEEK_READY,
273 CMD_PEEK_DELAYED,
274 CMD_RESERVE_TIMEOUT,
275 CMD_TOUCH,
276 CMD_QUIT,
277 CMD_PAUSE_TUBE,
278 CMD_KICKJOB,
279 CMD_RESERVE_JOB,
282 static Job *remove_ready_job(Job *j);
283 static Job *remove_buried_job(Job *j);
285 // epollq_add schedules connection c in the s->conns heap, adds c
286 // to the epollq list to change expected operation in event notifications.
287 // rw='w' means to notify when socket is writeable, 'r' - readable, 'h' - closed.
288 static void
289 epollq_add(Conn *c, char rw) {
290 c->rw = rw;
291 connsched(c);
292 c->next = epollq;
293 epollq = c;
296 // epollq_rmconn removes connection c from the epollq.
297 static void
298 epollq_rmconn(Conn *c)
300 Conn *x, *newhead = NULL;
302 while (epollq) {
303 // x as next element from epollq.
304 x = epollq;
305 epollq = epollq->next;
306 x->next = NULL;
308 // put x back into newhead list.
309 if (x != c) {
310 x->next = newhead;
311 newhead = x;
314 epollq = newhead;
317 // Propagate changes to event notification mechanism about expected operations
318 // in connections' sockets. Clear the epollq list.
319 static void
320 epollq_apply()
322 Conn *c;
324 while (epollq) {
325 c = epollq;
326 epollq = epollq->next;
327 c->next = NULL;
328 int r = sockwant(&c->sock, c->rw);
329 if (r == -1) {
330 twarn("sockwant");
331 connclose(c);
336 #define reply_msg(c, m) \
337 reply((c), (m), CONSTSTRLEN(m), STATE_SEND_WORD)
339 #define reply_serr(c, e) \
340 (twarnx("server error: %s", (e)), reply_msg((c), (e)))
342 static void
343 reply(Conn *c, char *line, int len, int state)
345 if (!c)
346 return;
348 epollq_add(c, 'w');
350 c->reply = line;
351 c->reply_len = len;
352 c->reply_sent = 0;
353 c->state = state;
354 if (verbose >= 2) {
355 printf(">%d reply %.*s\n", c->sock.fd, len-2, line);
359 static void
360 reply_line(Conn*, int, const char*, ...)
361 __attribute__((format(printf, 3, 4)));
363 // reply_line prints *fmt into c->reply_buffer and
364 // calls reply() for the string and state.
365 static void
366 reply_line(Conn *c, int state, const char *fmt, ...)
368 int r;
369 va_list ap;
371 va_start(ap, fmt);
372 r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
373 va_end(ap);
375 /* Make sure the buffer was big enough. If not, we have a bug. */
376 if (r >= LINE_BUF_SIZE) {
377 reply_serr(c, MSG_INTERNAL_ERROR);
378 return;
381 reply(c, c->reply_buf, r, state);
384 // reply_job tells the connection c which job to send,
385 // and replies with this line: <msg> <job_id> <job_size>.
386 static void
387 reply_job(Conn *c, Job *j, const char *msg)
389 c->out_job = j;
390 c->out_job_sent = 0;
391 reply_line(c, STATE_SEND_JOB, "%s %"PRIu64" %u\r\n",
392 msg, j->r.id, j->r.body_size - 2);
395 // remove_waiting_conn unsets CONN_TYPE_WAITING for the connection,
396 // removes it from the waiting_conns set of every tube it's watching.
397 // Noop if connection is not waiting.
398 void
399 remove_waiting_conn(Conn *c)
401 if (!conn_waiting(c))
402 return;
404 c->type &= ~CONN_TYPE_WAITING;
405 global_stat.waiting_ct--;
406 size_t i;
407 for (i = 0; i < c->watch.len; i++) {
408 Tube *t = c->watch.items[i];
409 t->stat.waiting_ct--;
410 ms_remove(&t->waiting_conns, c);
414 // enqueue_waiting_conn sets CONN_TYPE_WAITING for the connection,
415 // adds it to the waiting_conns set of every tube it's watching.
416 static void
417 enqueue_waiting_conn(Conn *c)
419 c->type |= CONN_TYPE_WAITING;
420 global_stat.waiting_ct++;
421 size_t i;
422 for (i = 0; i < c->watch.len; i++) {
423 Tube *t = c->watch.items[i];
424 t->stat.waiting_ct++;
425 ms_append(&t->waiting_conns, c);
429 // next_awaited_job iterates through all the tubes with awaiting connections,
430 // returns the next ready job with the smallest priority.
431 // If jobs has the same priority it picks the job with smaller id.
432 // All tubes with expired pause are unpaused.
433 static Job *
434 next_awaited_job(int64 now)
436 size_t i;
437 Job *j = NULL;
439 for (i = 0; i < tubes.len; i++) {
440 Tube *t = tubes.items[i];
441 if (t->pause) {
442 if (t->unpause_at > now)
443 continue;
444 t->pause = 0;
446 if (t->waiting_conns.len && t->ready.len) {
447 Job *candidate = t->ready.data[0];
448 if (!j || job_pri_less(candidate, j)) {
449 j = candidate;
453 return j;
456 // process_queue performs reservation for every jobs that is awaited for.
457 static void
458 process_queue()
460 Job *j = NULL;
461 int64 now = nanoseconds();
463 while ((j = next_awaited_job(now))) {
464 j = remove_ready_job(j);
465 if (j == NULL) {
466 twarnx("job not ready");
467 continue;
469 Conn *c = ms_take(&j->tube->waiting_conns);
470 if (c == NULL) {
471 twarnx("waiting_conns is empty");
472 continue;
474 global_stat.reserved_ct++;
476 remove_waiting_conn(c);
477 conn_reserve_job(c, j);
478 reply_job(c, j, MSG_RESERVED);
482 // soonest_delayed_job returns the delayed job
483 // with the smallest deadline_at among all tubes.
484 static Job *
485 soonest_delayed_job()
487 Job *j = NULL;
488 size_t i;
490 for (i = 0; i < tubes.len; i++) {
491 Tube *t = tubes.items[i];
492 if (t->delay.len == 0) {
493 continue;
495 Job *nj = t->delay.data[0];
496 if (!j || nj->r.deadline_at < j->r.deadline_at)
497 j = nj;
499 return j;
502 // enqueue_job inserts job j in the tube, returns 1 on success, otherwise 0.
503 // If update_store then it writes an entry to WAL.
504 // On success it processes the queue.
505 // BUG: If maintenance of WAL has failed, it is not reported as error.
506 static int
507 enqueue_job(Server *s, Job *j, int64 delay, char update_store)
509 int r;
511 j->reserver = NULL;
512 if (delay) {
513 j->r.deadline_at = nanoseconds() + delay;
514 r = heapinsert(&j->tube->delay, j);
515 if (!r)
516 return 0;
517 j->r.state = Delayed;
518 } else {
519 r = heapinsert(&j->tube->ready, j);
520 if (!r)
521 return 0;
522 j->r.state = Ready;
523 ready_ct++;
524 if (j->r.pri < URGENT_THRESHOLD) {
525 global_stat.urgent_ct++;
526 j->tube->stat.urgent_ct++;
530 if (update_store) {
531 if (!walwrite(&s->wal, j)) {
532 return 0;
534 walmaint(&s->wal);
537 // The call below makes this function do too much.
538 // TODO: refactor this call outside so the call is explicit (not hidden)?
539 process_queue();
540 return 1;
543 static int
544 bury_job(Server *s, Job *j, char update_store)
546 if (update_store) {
547 int z = walresvupdate(&s->wal);
548 if (!z)
549 return 0;
550 j->walresv += z;
553 job_list_insert(&j->tube->buried, j);
554 global_stat.buried_ct++;
555 j->tube->stat.buried_ct++;
556 j->r.state = Buried;
557 j->reserver = NULL;
558 j->r.bury_ct++;
560 if (update_store) {
561 if (!walwrite(&s->wal, j)) {
562 return 0;
564 walmaint(&s->wal);
567 return 1;
570 void
571 enqueue_reserved_jobs(Conn *c)
573 while (!job_list_is_empty(&c->reserved_jobs)) {
574 Job *j = job_list_remove(c->reserved_jobs.next);
575 int r = enqueue_job(c->srv, j, 0, 0);
576 if (r < 1)
577 bury_job(c->srv, j, 0);
578 global_stat.reserved_ct--;
579 j->tube->stat.reserved_ct--;
580 c->soonest_job = NULL;
584 static int
585 kick_buried_job(Server *s, Job *j)
587 int r;
588 int z;
590 z = walresvupdate(&s->wal);
591 if (!z)
592 return 0;
593 j->walresv += z;
595 remove_buried_job(j);
597 j->r.kick_ct++;
598 r = enqueue_job(s, j, 0, 1);
599 if (r == 1)
600 return 1;
602 /* ready queue is full, so bury it */
603 bury_job(s, j, 0);
604 return 0;
607 static uint
608 get_delayed_job_ct()
610 size_t i;
611 uint count = 0;
613 for (i = 0; i < tubes.len; i++) {
614 Tube *t = tubes.items[i];
615 count += t->delay.len;
617 return count;
620 static int
621 kick_delayed_job(Server *s, Job *j)
623 int r;
624 int z;
626 z = walresvupdate(&s->wal);
627 if (!z)
628 return 0;
629 j->walresv += z;
631 heapremove(&j->tube->delay, j->heap_index);
633 j->r.kick_ct++;
634 r = enqueue_job(s, j, 0, 1);
635 if (r == 1)
636 return 1;
638 /* ready queue is full, so delay it again */
639 r = enqueue_job(s, j, j->r.delay, 0);
640 if (r == 1)
641 return 0;
643 /* last resort */
644 bury_job(s, j, 0);
645 return 0;
648 static int
649 buried_job_p(Tube *t)
651 // this function does not do much. inline?
652 return !job_list_is_empty(&t->buried);
655 /* return the number of jobs successfully kicked */
656 static uint
657 kick_buried_jobs(Server *s, Tube *t, uint n)
659 uint i;
660 for (i = 0; (i < n) && buried_job_p(t); ++i) {
661 kick_buried_job(s, t->buried.next);
663 return i;
666 /* return the number of jobs successfully kicked */
667 static uint
668 kick_delayed_jobs(Server *s, Tube *t, uint n)
670 uint i;
671 for (i = 0; (i < n) && (t->delay.len > 0); ++i) {
672 kick_delayed_job(s, (Job *)t->delay.data[0]);
674 return i;
677 static uint
678 kick_jobs(Server *s, Tube *t, uint n)
680 if (buried_job_p(t))
681 return kick_buried_jobs(s, t, n);
682 return kick_delayed_jobs(s, t, n);
685 // remove_buried_job returns non-NULL value if job j was in the buried state.
686 // It excludes the job from the buried list and updates counters.
687 static Job *
688 remove_buried_job(Job *j)
690 if (!j || j->r.state != Buried)
691 return NULL;
692 j = job_list_remove(j);
693 if (j) {
694 global_stat.buried_ct--;
695 j->tube->stat.buried_ct--;
697 return j;
700 // remove_delayed_job returns non-NULL value if job j was in the delayed state.
701 // It removes the job from the tube delayed heap.
702 static Job *
703 remove_delayed_job(Job *j)
705 if (!j || j->r.state != Delayed)
706 return NULL;
707 heapremove(&j->tube->delay, j->heap_index);
709 return j;
712 // remove_ready_job returns non-NULL value if job j was in the ready state.
713 // It removes the job from the tube ready heap and updates counters.
714 static Job *
715 remove_ready_job(Job *j)
717 if (!j || j->r.state != Ready)
718 return NULL;
719 heapremove(&j->tube->ready, j->heap_index);
720 ready_ct--;
721 if (j->r.pri < URGENT_THRESHOLD) {
722 global_stat.urgent_ct--;
723 j->tube->stat.urgent_ct--;
725 return j;
728 static bool
729 is_job_reserved_by_conn(Conn *c, Job *j)
731 return j && j->reserver == c && j->r.state == Reserved;
734 static bool
735 touch_job(Conn *c, Job *j)
737 if (is_job_reserved_by_conn(c, j)) {
738 j->r.deadline_at = nanoseconds() + j->r.ttr;
739 c->soonest_job = NULL;
740 return true;
742 return false;
745 static void
746 check_err(Conn *c, const char *s)
748 if (errno == EAGAIN)
749 return;
750 if (errno == EINTR)
751 return;
752 if (errno == EWOULDBLOCK)
753 return;
755 twarn("%s", s);
756 c->state = STATE_CLOSE;
/* Scan the given string for the sequence "\r\n" and return the line length.
 * Always returns at least 2 if a match is found. Returns 0 if no match.
 * Only the first '\r' in the buffer is considered; if it is not followed
 * by '\n', the line is treated as not terminated. */
static size_t
scan_line_end(const char *s, int size)
{
    /* Guard size < 2: "\r\n" cannot fit, and with size <= 0 the
     * expression size - 1 below would convert to a huge size_t and
     * make memchr read out of bounds. */
    if (size < 2)
        return 0;

    const char *match = memchr(s, '\r', size - 1);
    if (!match)
        return 0;

    /* this is safe because we only scan size - 1 chars above */
    if (match[1] == '\n')
        return match - s + 2;

    return 0;
}
777 /* parse the command line */
778 static int
779 which_cmd(Conn *c)
781 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
782 TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
783 TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
784 TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY);
785 TEST_CMD(c->cmd, CMD_PEEK_DELAYED, OP_PEEK_DELAYED);
786 TEST_CMD(c->cmd, CMD_PEEK_BURIED, OP_PEEK_BURIED);
787 TEST_CMD(c->cmd, CMD_RESERVE_TIMEOUT, OP_RESERVE_TIMEOUT);
788 TEST_CMD(c->cmd, CMD_RESERVE_JOB, OP_RESERVE_JOB);
789 TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
790 TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
791 TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
792 TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
793 TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
794 TEST_CMD(c->cmd, CMD_KICKJOB, OP_KICKJOB);
795 TEST_CMD(c->cmd, CMD_TOUCH, OP_TOUCH);
796 TEST_CMD(c->cmd, CMD_STATSJOB, OP_STATSJOB);
797 TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
798 TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
799 TEST_CMD(c->cmd, CMD_USE, OP_USE);
800 TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
801 TEST_CMD(c->cmd, CMD_IGNORE, OP_IGNORE);
802 TEST_CMD(c->cmd, CMD_LIST_TUBES_WATCHED, OP_LIST_TUBES_WATCHED);
803 TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
804 TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
805 TEST_CMD(c->cmd, CMD_QUIT, OP_QUIT);
806 TEST_CMD(c->cmd, CMD_PAUSE_TUBE, OP_PAUSE_TUBE);
807 return OP_UNKNOWN;
810 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
811 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
812 * This function is idempotent(). */
813 static void
814 fill_extra_data(Conn *c)
816 if (!c->sock.fd)
817 return; /* the connection was closed */
818 if (!c->cmd_len)
819 return; /* we don't have a complete command */
821 /* how many extra bytes did we read? */
822 int64 extra_bytes = c->cmd_read - c->cmd_len;
824 int64 job_data_bytes = 0;
825 /* how many bytes should we put into the job body? */
826 if (c->in_job) {
827 job_data_bytes = min(extra_bytes, c->in_job->r.body_size);
828 memcpy(c->in_job->body, c->cmd + c->cmd_len, job_data_bytes);
829 c->in_job_read = job_data_bytes;
830 } else if (c->in_job_read) {
831 /* we are in bit-bucket mode, throwing away data */
832 job_data_bytes = min(extra_bytes, c->in_job_read);
833 c->in_job_read -= job_data_bytes;
836 /* how many bytes are left to go into the future cmd? */
837 int64 cmd_bytes = extra_bytes - job_data_bytes;
838 memmove(c->cmd, c->cmd + c->cmd_len + job_data_bytes, cmd_bytes);
839 c->cmd_read = cmd_bytes;
840 c->cmd_len = 0; /* we no longer know the length of the new command */
843 #define skip(conn,n,msg) (_skip(conn, n, msg, CONSTSTRLEN(msg)))
845 static void
846 _skip(Conn *c, int64 n, char *msg, int msglen)
848 /* Invert the meaning of in_job_read while throwing away data -- it
849 * counts the bytes that remain to be thrown away. */
850 c->in_job = 0;
851 c->in_job_read = n;
852 fill_extra_data(c);
854 if (c->in_job_read == 0) {
855 reply(c, msg, msglen, STATE_SEND_WORD);
856 return;
859 c->reply = msg;
860 c->reply_len = msglen;
861 c->reply_sent = 0;
862 c->state = STATE_BITBUCKET;
865 static void
866 enqueue_incoming_job(Conn *c)
868 int r;
869 Job *j = c->in_job;
871 c->in_job = NULL; /* the connection no longer owns this job */
872 c->in_job_read = 0;
874 /* check if the trailer is present and correct */
875 if (memcmp(j->body + j->r.body_size - 2, "\r\n", 2)) {
876 job_free(j);
877 reply_msg(c, MSG_EXPECTED_CRLF);
878 return;
881 if (verbose >= 2) {
882 printf("<%d job %"PRIu64"\n", c->sock.fd, j->r.id);
885 if (drain_mode) {
886 job_free(j);
887 reply_serr(c, MSG_DRAINING);
888 return;
891 if (j->walresv) {
892 reply_serr(c, MSG_INTERNAL_ERROR);
893 return;
895 j->walresv = walresvput(&c->srv->wal, j);
896 if (!j->walresv) {
897 reply_serr(c, MSG_OUT_OF_MEMORY);
898 return;
901 /* we have a complete job, so let's stick it in the pqueue */
902 r = enqueue_job(c->srv, j, j->r.delay, 1);
904 // Dead code: condition cannot happen, r can take 1 or 0 values only.
905 if (r < 0) {
906 reply_serr(c, MSG_INTERNAL_ERROR);
907 return;
910 global_stat.total_jobs_ct++;
911 j->tube->stat.total_jobs_ct++;
913 if (r == 1) {
914 reply_line(c, STATE_SEND_WORD, MSG_INSERTED_FMT, j->r.id);
915 return;
918 /* out of memory trying to grow the queue, so it gets buried */
919 bury_job(c->srv, j, 0);
920 reply_line(c, STATE_SEND_WORD, MSG_BURIED_FMT, j->r.id);
923 static uint
924 uptime()
926 return (nanoseconds() - started_at) / 1000000000;
929 static int
930 fmt_stats(char *buf, size_t size, void *x)
932 int whead = 0, wcur = 0;
933 Server *s = x;
934 struct rusage ru;
936 s = x;
938 if (s->wal.head) {
939 whead = s->wal.head->seq;
942 if (s->wal.cur) {
943 wcur = s->wal.cur->seq;
946 getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
947 return snprintf(buf, size, STATS_FMT,
948 global_stat.urgent_ct,
949 ready_ct,
950 global_stat.reserved_ct,
951 get_delayed_job_ct(),
952 global_stat.buried_ct,
953 op_ct[OP_PUT],
954 op_ct[OP_PEEKJOB],
955 op_ct[OP_PEEK_READY],
956 op_ct[OP_PEEK_DELAYED],
957 op_ct[OP_PEEK_BURIED],
958 op_ct[OP_RESERVE],
959 op_ct[OP_RESERVE_TIMEOUT],
960 op_ct[OP_DELETE],
961 op_ct[OP_RELEASE],
962 op_ct[OP_USE],
963 op_ct[OP_WATCH],
964 op_ct[OP_IGNORE],
965 op_ct[OP_BURY],
966 op_ct[OP_KICK],
967 op_ct[OP_TOUCH],
968 op_ct[OP_STATS],
969 op_ct[OP_STATSJOB],
970 op_ct[OP_STATS_TUBE],
971 op_ct[OP_LIST_TUBES],
972 op_ct[OP_LIST_TUBE_USED],
973 op_ct[OP_LIST_TUBES_WATCHED],
974 op_ct[OP_PAUSE_TUBE],
975 timeout_ct,
976 global_stat.total_jobs_ct,
977 job_data_size_limit,
978 tubes.len,
979 count_cur_conns(),
980 count_cur_producers(),
981 count_cur_workers(),
982 global_stat.waiting_ct,
983 count_tot_conns(),
984 (long) getpid(),
985 version,
986 (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
987 (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
988 uptime(),
989 whead,
990 wcur,
991 s->wal.nmig,
992 s->wal.nrec,
993 s->wal.filesize,
994 drain_mode ? "true" : "false",
995 instance_hex,
996 node_info.nodename,
997 node_info.version,
998 node_info.machine);
1001 /* Read an integer from the given buffer and place it in num.
1002 * Parsed integer should fit into uint64.
1003 * Update end to point to the address after the last character consumed.
1004 * num and end can be NULL. If they are both NULL, read_u64() will do the
1005 * conversion and return the status code but not update any values.
1006 * This is an easy way to check for errors.
1007 * If end is NULL, read_u64() will also check that the entire input string
1008 * was consumed and return an error code otherwise.
1009 * Return 0 on success, or nonzero on failure.
1010 * If a failure occurs, num and end are not modified. */
1011 static int
1012 read_u64(uint64 *num, const char *buf, char **end)
1014 uintmax_t tnum;
1015 char *tend;
1017 errno = 0;
1018 while (buf[0] == ' ')
1019 buf++;
1020 if (buf[0] < '0' || '9' < buf[0])
1021 return -1;
1022 tnum = strtoumax(buf, &tend, 10);
1023 if (tend == buf)
1024 return -1;
1025 if (errno)
1026 return -1;
1027 if (!end && tend[0] != '\0')
1028 return -1;
1029 if (tnum > UINT64_MAX)
1030 return -1;
1032 if (num) *num = (uint64)tnum;
1033 if (end) *end = tend;
1034 return 0;
1037 // Indentical to read_u64() but instead reads into uint32.
1038 static int
1039 read_u32(uint32 *num, const char *buf, char **end)
1041 uintmax_t tnum;
1042 char *tend;
1044 errno = 0;
1045 while (buf[0] == ' ')
1046 buf++;
1047 if (buf[0] < '0' || '9' < buf[0])
1048 return -1;
1049 tnum = strtoumax(buf, &tend, 10);
1050 if (tend == buf)
1051 return -1;
1052 if (errno)
1053 return -1;
1054 if (!end && tend[0] != '\0')
1055 return -1;
1056 if (tnum > UINT32_MAX)
1057 return -1;
1059 if (num) *num = (uint32)tnum;
1060 if (end) *end = tend;
1061 return 0;
1064 /* Read a delay value in seconds from the given buffer and
1065 place it in duration in nanoseconds.
1066 The interface and behavior are analogous to read_u32(). */
1067 static int
1068 read_duration(int64 *duration, const char *buf, char **end)
1070 int r;
1071 uint32 dur_sec;
1073 r = read_u32(&dur_sec, buf, end);
1074 if (r)
1075 return r;
1076 *duration = ((int64) dur_sec) * 1000000000;
1077 return 0;
1080 /* Read a tube name from the given buffer moving the buffer to the name start */
1081 static int
1082 read_tube_name(char **tubename, char *buf, char **end)
1084 size_t len;
1086 while (buf[0] == ' ')
1087 buf++;
1088 len = strspn(buf, NAME_CHARS);
1089 if (len == 0)
1090 return -1;
1091 if (tubename)
1092 *tubename = buf;
1093 if (end)
1094 *end = buf + len;
1095 return 0;
1098 static void
1099 wait_for_job(Conn *c, int timeout)
1101 c->state = STATE_WAIT;
1102 enqueue_waiting_conn(c);
1104 /* Set the pending timeout to the requested timeout amount */
1105 c->pending_timeout = timeout;
1107 // only care if they hang up
1108 epollq_add(c, 'h');
1111 typedef int(*fmt_fn)(char *, size_t, void *);
1113 static void
1114 do_stats(Conn *c, fmt_fn fmt, void *data)
1116 int r, stats_len;
1118 /* first, measure how big a buffer we will need */
1119 stats_len = fmt(NULL, 0, data) + 16;
1121 c->out_job = allocate_job(stats_len); /* fake job to hold stats data */
1122 if (!c->out_job) {
1123 reply_serr(c, MSG_OUT_OF_MEMORY);
1124 return;
1127 /* Mark this job as a copy so it can be appropriately freed later on */
1128 c->out_job->r.state = Copy;
1130 /* now actually format the stats data */
1131 r = fmt(c->out_job->body, stats_len, data);
1132 /* and set the actual body size */
1133 c->out_job->r.body_size = r;
1134 if (r > stats_len) {
1135 reply_serr(c, MSG_INTERNAL_ERROR);
1136 return;
1139 c->out_job_sent = 0;
1140 reply_line(c, STATE_SEND_JOB, "OK %d\r\n", r - 2);
1143 static void
1144 do_list_tubes(Conn *c, Ms *l)
1146 char *buf;
1147 Tube *t;
1148 size_t i, resp_z;
1150 /* first, measure how big a buffer we will need */
1151 resp_z = 6; /* initial "---\n" and final "\r\n" */
1152 for (i = 0; i < l->len; i++) {
1153 t = l->items[i];
1154 resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
1157 c->out_job = allocate_job(resp_z); /* fake job to hold response data */
1158 if (!c->out_job) {
1159 reply_serr(c, MSG_OUT_OF_MEMORY);
1160 return;
1163 /* Mark this job as a copy so it can be appropriately freed later on */
1164 c->out_job->r.state = Copy;
1166 /* now actually format the response */
1167 buf = c->out_job->body;
1168 buf += snprintf(buf, 5, "---\n");
1169 for (i = 0; i < l->len; i++) {
1170 t = l->items[i];
1171 buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
1173 buf[0] = '\r';
1174 buf[1] = '\n';
1176 c->out_job_sent = 0;
1177 reply_line(c, STATE_SEND_JOB, "OK %zu\r\n", resp_z - 2);
1180 static int
1181 fmt_job_stats(char *buf, size_t size, Job *j)
1183 int64 t;
1184 int64 time_left;
1185 int file = 0;
1187 t = nanoseconds();
1188 if (j->r.state == Reserved || j->r.state == Delayed) {
1189 time_left = (j->r.deadline_at - t) / 1000000000;
1190 } else {
1191 time_left = 0;
1193 if (j->file) {
1194 file = j->file->seq;
1196 return snprintf(buf, size, STATS_JOB_FMT,
1197 j->r.id,
1198 j->tube->name,
1199 job_state(j),
1200 j->r.pri,
1201 (t - j->r.created_at) / 1000000000,
1202 j->r.delay / 1000000000,
1203 j->r.ttr / 1000000000,
1204 time_left,
1205 file,
1206 j->r.reserve_ct,
1207 j->r.timeout_ct,
1208 j->r.release_ct,
1209 j->r.bury_ct,
1210 j->r.kick_ct);
1213 static int
1214 fmt_stats_tube(char *buf, size_t size, Tube *t)
1216 uint64 time_left;
1218 if (t->pause > 0) {
1219 time_left = (t->unpause_at - nanoseconds()) / 1000000000;
1220 } else {
1221 time_left = 0;
1223 return snprintf(buf, size, STATS_TUBE_FMT,
1224 t->name,
1225 t->stat.urgent_ct,
1226 t->ready.len,
1227 t->stat.reserved_ct,
1228 t->delay.len,
1229 t->stat.buried_ct,
1230 t->stat.total_jobs_ct,
1231 t->using_ct,
1232 t->watching_ct,
1233 t->stat.waiting_ct,
1234 t->stat.total_delete_ct,
1235 t->stat.pause_ct,
1236 t->pause / 1000000000,
1237 time_left);
1240 static void
1241 maybe_enqueue_incoming_job(Conn *c)
1243 Job *j = c->in_job;
1245 /* do we have a complete job? */
1246 if (c->in_job_read == j->r.body_size) {
1247 enqueue_incoming_job(c);
1248 return;
1251 /* otherwise we have incomplete data, so just keep waiting */
1252 c->state = STATE_WANT_DATA;
1255 /* j can be NULL */
1256 static Job *
1257 remove_this_reserved_job(Conn *c, Job *j)
1259 j = job_list_remove(j);
1260 if (j) {
1261 global_stat.reserved_ct--;
1262 j->tube->stat.reserved_ct--;
1263 j->reserver = NULL;
1265 c->soonest_job = NULL;
1266 return j;
1269 static Job *
1270 remove_reserved_job(Conn *c, Job *j)
1272 if (!is_job_reserved_by_conn(c, j))
1273 return NULL;
1274 return remove_this_reserved_job(c, j);
/* A tube name is valid when it is 1..max bytes long, consists only of
 * characters from the allowed set, and does not begin with '-'. */
static bool
is_valid_tube(const char *name, size_t max)
{
    static const char allowed[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "abcdefghijklmnopqrstuvwxyz"
        "0123456789-+/;.$_()";

    size_t n = strlen(name);
    if (n == 0 || n > max)
        return false;
    if (name[0] == '-')
        return false;
    return strspn(name, allowed) == n;
}
/* Parse the complete command line in c->cmd and execute it.
 * c->cmd_len includes the trailing "\r\n". Every branch either queues a
 * reply on the connection or moves it into a new state (reading job
 * data, waiting for a job, or closing). */
static void
dispatch_cmd(Conn *c)
{
    int r, timeout = -1;
    uint i;
    uint count;
    Job *j = 0;
    byte type;
    char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
    uint32 pri;
    uint32 body_size;
    int64 delay, ttr;
    uint64 id;
    Tube *t = NULL;

    /* NUL-terminate this string so we can use strtol and friends */
    c->cmd[c->cmd_len - 2] = '\0';

    /* check for possible maliciousness: an embedded NUL would make the
     * visible command shorter than the bytes actually received */
    if (strlen(c->cmd) != c->cmd_len - 2) {
        reply_msg(c, MSG_BAD_FORMAT);
        return;
    }

    type = which_cmd(c);
    if (verbose >= 2) {
        printf("<%d command %s\n", c->sock.fd, op_names[type]);
    }

    switch (type) {
    case OP_PUT:
        /* put <pri> <delay> <ttr> <bytes>\r\n<data>\r\n */
        if (read_u32(&pri, c->cmd + 4, &delay_buf) ||
            read_duration(&delay, delay_buf, &ttr_buf) ||
            read_duration(&ttr, ttr_buf, &size_buf) ||
            read_u32(&body_size, size_buf, &end_buf)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        if (body_size > job_data_size_limit) {
            /* throw away the job body and respond with JOB_TOO_BIG */
            skip(c, (int64)body_size + 2, MSG_JOB_TOO_BIG);
            return;
        }

        /* don't allow trailing garbage */
        if (end_buf[0] != '\0') {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }

        connsetproducer(c);

        /* enforce the minimum ttr of one second */
        if (ttr < 1000000000) {
            ttr = 1000000000;
        }

        /* +2 reserves room for the job body's trailing "\r\n" */
        c->in_job = make_job(pri, delay, ttr, body_size + 2, c->use);

        /* OOM? */
        if (!c->in_job) {
            /* throw away the job body and respond with OUT_OF_MEMORY */
            twarnx("server error: " MSG_OUT_OF_MEMORY);
            skip(c, body_size + 2, MSG_OUT_OF_MEMORY);
            return;
        }

        fill_extra_data(c);

        /* it's possible we already have a complete job */
        maybe_enqueue_incoming_job(c);
        return;

    case OP_PEEK_READY:
        /* don't allow trailing garbage */
        if (c->cmd_len != CMD_PEEK_READY_LEN + 2) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        /* copy the front of the used tube's ready queue, if any */
        if (c->use->ready.len) {
            j = job_copy(c->use->ready.data[0]);
        }

        if (!j) {
            reply_msg(c, MSG_NOTFOUND);
            return;
        }

        reply_job(c, j, MSG_FOUND);
        return;

    case OP_PEEK_DELAYED:
        /* don't allow trailing garbage */
        if (c->cmd_len != CMD_PEEK_DELAYED_LEN + 2) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        /* copy the soonest delayed job of the used tube, if any */
        if (c->use->delay.len) {
            j = job_copy(c->use->delay.data[0]);
        }

        if (!j) {
            reply_msg(c, MSG_NOTFOUND);
            return;
        }

        reply_job(c, j, MSG_FOUND);
        return;

    case OP_PEEK_BURIED:
        /* don't allow trailing garbage */
        if (c->cmd_len != CMD_PEEK_BURIED_LEN + 2) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        /* copy the first buried job of the used tube, if any */
        if (buried_job_p(c->use))
            j = job_copy(c->use->buried.next);
        else
            j = NULL;

        if (!j) {
            reply_msg(c, MSG_NOTFOUND);
            return;
        }

        reply_job(c, j, MSG_FOUND);
        return;

    case OP_PEEKJOB:
        if (read_u64(&id, c->cmd + CMD_PEEKJOB_LEN, NULL)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        /* So, peek is annoying, because some other connection might free the
         * job while we are still trying to write it out. So we copy it and
         * free the copy when it's done sending, in the "conn_want_command" function. */
        j = job_copy(job_find(id));

        if (!j) {
            reply_msg(c, MSG_NOTFOUND);
            return;
        }

        reply_job(c, j, MSG_FOUND);
        return;

    case OP_RESERVE_TIMEOUT: {
        errno = 0;
        uint32 utimeout = 0;
        /* timeout is in seconds; reject values that don't fit in int */
        if (read_u32(&utimeout, c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf) != 0 || utimeout > INT_MAX) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        timeout = (int)utimeout;
        /* Falls through */
    }
    case OP_RESERVE:
        /* don't allow trailing garbage */
        if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;
        connsetworker(c);

        /* refuse to block when a reserved job's ttr is about to expire
         * and there is nothing ready to hand out immediately */
        if (conndeadlinesoon(c) && !conn_ready(c)) {
            reply_msg(c, MSG_DEADLINE_SOON);
            return;
        }

        /* try to get a new job for this guy */
        wait_for_job(c, timeout);
        process_queue();
        return;

    case OP_RESERVE_JOB:
        if (read_u64(&id, c->cmd + CMD_RESERVE_JOB_LEN, NULL)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        // This command could produce "deadline soon" if
        // the connection has a reservation about to expire.
        // We choose not to do it, because this command does not block
        // for an arbitrary amount of time as reserve and reserve-with-timeout.

        j = job_find(id);
        if (!j) {
            reply_msg(c, MSG_NOTFOUND);
            return;
        }

        // Check if this job is already reserved.
        if (j->r.state == Reserved || j->r.state == Invalid) {
            reply_msg(c, MSG_NOTFOUND);
            return;
        }

        // Job can be in ready, buried or delayed states.
        if (j->r.state == Ready) {
            j = remove_ready_job(j);
        } else if (j->r.state == Buried) {
            j = remove_buried_job(j);
        } else if (j->r.state == Delayed) {
            j = remove_delayed_job(j);
        } else {
            reply_serr(c, MSG_INTERNAL_ERROR);
            return;
        }

        connsetworker(c);
        global_stat.reserved_ct++;

        conn_reserve_job(c, j);
        reply_job(c, j, MSG_RESERVED);
        return;

    case OP_DELETE:
        if (read_u64(&id, c->cmd + CMD_DELETE_LEN, NULL)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        /* try every queue the job could be in; each remove_* call is a
         * no-op returning NULL when the job is not in that state */
        {
            Job *jf = job_find(id);
            j = remove_reserved_job(c, jf);
            if (!j)
                j = remove_ready_job(jf);
            if (!j)
                j = remove_buried_job(jf);
            if (!j)
                j = remove_delayed_job(jf);
        }

        if (!j) {
            reply_msg(c, MSG_NOTFOUND);
            return;
        }

        j->tube->stat.total_delete_ct++;

        /* record the deletion in the write-ahead log before freeing */
        j->r.state = Invalid;
        r = walwrite(&c->srv->wal, j);
        walmaint(&c->srv->wal);
        job_free(j);

        if (!r) {
            reply_serr(c, MSG_INTERNAL_ERROR);
            return;
        }

        reply_msg(c, MSG_DELETED);
        return;

    case OP_RELEASE:
        if (read_u64(&id, c->cmd + CMD_RELEASE_LEN, &pri_buf) ||
            read_u32(&pri, pri_buf, &delay_buf) ||
            read_duration(&delay, delay_buf, NULL)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        /* only the reserving connection may release the job */
        j = remove_reserved_job(c, job_find(id));

        if (!j) {
            reply_msg(c, MSG_NOTFOUND);
            return;
        }

        /* We want to update the delay deadline on disk, so reserve space for
         * that. */
        if (delay) {
            int z = walresvupdate(&c->srv->wal);
            if (!z) {
                reply_serr(c, MSG_OUT_OF_MEMORY);
                return;
            }
            j->walresv += z;
        }

        j->r.pri = pri;
        j->r.delay = delay;
        j->r.release_ct++;

        r = enqueue_job(c->srv, j, delay, !!delay);
        if (r < 0) {
            reply_serr(c, MSG_INTERNAL_ERROR);
            return;
        }
        if (r == 1) {
            reply_msg(c, MSG_RELEASED);
            return;
        }

        /* out of memory trying to grow the queue, so it gets buried */
        bury_job(c->srv, j, 0);
        reply_msg(c, MSG_BURIED);
        return;

    case OP_BURY:
        if (read_u64(&id, c->cmd + CMD_BURY_LEN, &pri_buf) ||
            read_u32(&pri, pri_buf, NULL)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }

        op_ct[type]++;

        /* only the reserving connection may bury the job */
        j = remove_reserved_job(c, job_find(id));

        if (!j) {
            reply_msg(c, MSG_NOTFOUND);
            return;
        }

        j->r.pri = pri;
        r = bury_job(c->srv, j, 1);
        if (!r) {
            reply_serr(c, MSG_INTERNAL_ERROR);
            return;
        }

        reply_msg(c, MSG_BURIED);
        return;

    case OP_KICK:
        errno = 0;
        count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
        /* end_buf unchanged means no digits were parsed */
        if (end_buf == c->cmd + CMD_KICK_LEN || errno) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }

        op_ct[type]++;

        i = kick_jobs(c->srv, c->use, count);
        reply_line(c, STATE_SEND_WORD, "KICKED %u\r\n", i);
        return;

    case OP_KICKJOB:
        if (read_u64(&id, c->cmd + CMD_KICKJOB_LEN, NULL)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }

        op_ct[type]++;

        j = job_find(id);
        if (!j) {
            reply_msg(c, MSG_NOTFOUND);
            return;
        }

        /* only buried and delayed jobs can be kicked */
        if ((j->r.state == Buried && kick_buried_job(c->srv, j)) ||
            (j->r.state == Delayed && kick_delayed_job(c->srv, j))) {
            reply_msg(c, MSG_KICKED);
        } else {
            reply_msg(c, MSG_NOTFOUND);
        }
        return;

    case OP_TOUCH:
        if (read_u64(&id, c->cmd + CMD_TOUCH_LEN, NULL)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        /* resets the job's ttr deadline if c holds the reservation */
        if (touch_job(c, job_find(id))) {
            reply_msg(c, MSG_TOUCHED);
        } else {
            reply_msg(c, MSG_NOTFOUND);
        }
        return;

    case OP_STATS:
        /* don't allow trailing garbage */
        if (c->cmd_len != CMD_STATS_LEN + 2) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        do_stats(c, fmt_stats, c->srv);
        return;

    case OP_STATSJOB:
        if (read_u64(&id, c->cmd + CMD_STATSJOB_LEN, NULL)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        j = job_find(id);
        if (!j) {
            reply_msg(c, MSG_NOTFOUND);
            return;
        }

        if (!j->tube) {
            reply_serr(c, MSG_INTERNAL_ERROR);
            return;
        }
        do_stats(c, (fmt_fn) fmt_job_stats, j);
        return;

    case OP_STATS_TUBE:
        name = c->cmd + CMD_STATS_TUBE_LEN;
        if (!is_valid_tube(name, MAX_TUBE_NAME_LEN - 1)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        t = tube_find(&tubes, name);
        if (!t) {
            reply_msg(c, MSG_NOTFOUND);
            return;
        }

        do_stats(c, (fmt_fn) fmt_stats_tube, t);
        t = NULL;
        return;

    case OP_LIST_TUBES:
        /* don't allow trailing garbage */
        if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;
        do_list_tubes(c, &tubes);
        return;

    case OP_LIST_TUBE_USED:
        /* don't allow trailing garbage */
        if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;
        reply_line(c, STATE_SEND_WORD, "USING %s\r\n", c->use->name);
        return;

    case OP_LIST_TUBES_WATCHED:
        /* don't allow trailing garbage */
        if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;
        do_list_tubes(c, &c->watch);
        return;

    case OP_USE:
        name = c->cmd + CMD_USE_LEN;
        if (!is_valid_tube(name, MAX_TUBE_NAME_LEN - 1)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        TUBE_ASSIGN(t, tube_find_or_make(name));
        if (!t) {
            reply_serr(c, MSG_OUT_OF_MEMORY);
            return;
        }

        /* move the use reference from the old tube to the new one */
        c->use->using_ct--;
        TUBE_ASSIGN(c->use, t);
        TUBE_ASSIGN(t, NULL);
        c->use->using_ct++;

        reply_line(c, STATE_SEND_WORD, "USING %s\r\n", c->use->name);
        return;

    case OP_WATCH:
        name = c->cmd + CMD_WATCH_LEN;
        if (!is_valid_tube(name, MAX_TUBE_NAME_LEN - 1)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        TUBE_ASSIGN(t, tube_find_or_make(name));
        if (!t) {
            reply_serr(c, MSG_OUT_OF_MEMORY);
            return;
        }

        /* appending is idempotent: do nothing if already watched */
        r = 1;
        if (!ms_contains(&c->watch, t))
            r = ms_append(&c->watch, t);
        TUBE_ASSIGN(t, NULL);
        if (!r) {
            reply_serr(c, MSG_OUT_OF_MEMORY);
            return;
        }

        reply_line(c, STATE_SEND_WORD, "WATCHING %zu\r\n", c->watch.len);
        return;

    case OP_IGNORE:
        name = c->cmd + CMD_IGNORE_LEN;
        if (!is_valid_tube(name, MAX_TUBE_NAME_LEN - 1)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        /* a connection must always watch at least one tube */
        t = tube_find(&c->watch, name);
        if (t && c->watch.len < 2) {
            reply_msg(c, MSG_NOT_IGNORED);
            return;
        }

        if (t)
            ms_remove(&c->watch, t); /* may free t if refcount => 0 */
        t = NULL;
        reply_line(c, STATE_SEND_WORD, "WATCHING %zu\r\n", c->watch.len);
        return;

    case OP_QUIT:
        c->state = STATE_CLOSE;
        return;

    case OP_PAUSE_TUBE:
        if (read_tube_name(&name, c->cmd + CMD_PAUSE_TUBE_LEN, &delay_buf) ||
            read_duration(&delay, delay_buf, NULL)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }
        op_ct[type]++;

        /* terminate the tube name in place before validating it */
        *delay_buf = '\0';
        if (!is_valid_tube(name, MAX_TUBE_NAME_LEN - 1)) {
            reply_msg(c, MSG_BAD_FORMAT);
            return;
        }

        t = tube_find(&tubes, name);
        if (!t) {
            reply_msg(c, MSG_NOTFOUND);
            return;
        }

        // Always pause for a positive amount of time, to make sure
        // that waiting clients wake up when the deadline arrives.
        if (delay == 0) {
            delay = 1;
        }

        t->unpause_at = nanoseconds() + delay;
        t->pause = delay;
        t->stat.pause_ct++;

        reply_line(c, STATE_SEND_WORD, "PAUSED\r\n");
        return;

    default:
        reply_msg(c, MSG_UNKNOWN_COMMAND);
    }
}
/* There are three reasons this function may be called. We need to check for
 * all of them.
 *
 *  1. A reserved job has run out of time.
 *  2. A waiting client's reserved job has entered the safety margin.
 *  3. A waiting client's requested timeout has occurred.
 *
 * If any of these happen, we must do the appropriate thing. */
static void
conn_timeout(Conn *c)
{
    int should_timeout = 0;
    Job *j;

    /* Check if the client was trying to reserve a job. */
    if (conn_waiting(c) && conndeadlinesoon(c))
        should_timeout = 1;

    /* Check if any reserved jobs have run out of time. We should do this
     * whether or not the client is waiting for a new reservation. */
    while ((j = connsoonestjob(c))) {
        /* connsoonestjob returns jobs in deadline order; once we see one
         * that is still in the future, the rest are too */
        if (j->r.deadline_at >= nanoseconds())
            break;

        /* This job is in the middle of being written out. If we return it to
         * the ready queue, someone might free it before we finish writing it
         * out to the socket. So we'll copy it here and free the copy when it's
         * done sending. */
        if (j == c->out_job) {
            c->out_job = job_copy(c->out_job);
        }

        timeout_ct++; /* stats */
        j->r.timeout_ct++;
        /* release the expired reservation and put the job back on its queue */
        int r = enqueue_job(c->srv, remove_this_reserved_job(c, j), 0, 0);
        if (r < 1)
            bury_job(c->srv, j, 0); /* out of memory, so bury it */
        connsched(c);
    }

    if (should_timeout) {
        remove_waiting_conn(c);
        reply_msg(c, MSG_DEADLINE_SOON);
    } else if (conn_waiting(c) && c->pending_timeout >= 0) {
        /* reserve-with-timeout expired with no job available */
        c->pending_timeout = -1;
        remove_waiting_conn(c);
        reply_msg(c, MSG_TIMED_OUT);
    }
}
1902 void
1903 enter_drain_mode(int sig)
1905 UNUSED_PARAMETER(sig);
1906 drain_mode = 1;
1909 static void
1910 conn_want_command(Conn *c)
1912 epollq_add(c, 'r');
1914 /* was this a peek or stats command? */
1915 if (c->out_job && c->out_job->r.state == Copy)
1916 job_free(c->out_job);
1917 c->out_job = NULL;
1919 c->reply_sent = 0; /* now that we're done, reset this */
1920 c->state = STATE_WANT_COMMAND;
/* Perform exactly one read or write appropriate for the connection's
 * current state and advance its state machine. Partial reads/writes are
 * expected; the counters (cmd_read, in_job_read, reply_sent,
 * out_job_sent) record progress across calls. */
static void
conn_process_io(Conn *c)
{
    int r;
    int64 to_read;
    Job *j;
    struct iovec iov[2];

    switch (c->state) {
    case STATE_WANT_COMMAND:
        r = read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
        if (r == -1) {
            check_err(c, "read()");
            return;
        }
        if (r == 0) {
            c->state = STATE_CLOSE; /* client hung up */
            return;
        }

        c->cmd_read += r;
        c->cmd_len = scan_line_end(c->cmd, c->cmd_read);
        if (c->cmd_len) {
            // We found complete command line. Bail out to h_conn.
            return;
        }

        // c->cmd_read > LINE_BUF_SIZE can't happen

        if (c->cmd_read == LINE_BUF_SIZE) {
            // Command line too long.
            // Put connection into special state that discards
            // the command line until the end line is found.
            c->cmd_read = 0; // discard the input so far
            c->state = STATE_WANT_ENDLINE;
        }

        // We have an incomplete line, so just keep waiting.
        return;

    case STATE_WANT_ENDLINE:
        r = read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
        if (r == -1) {
            check_err(c, "read()");
            return;
        }
        if (r == 0) {
            c->state = STATE_CLOSE;
            return;
        }

        c->cmd_read += r;
        c->cmd_len = scan_line_end(c->cmd, c->cmd_read);
        if (c->cmd_len) {
            // Found the EOL. Reply and reuse whatever was read afer the EOL.
            reply_msg(c, MSG_BAD_FORMAT);
            fill_extra_data(c);
            return;
        }

        // c->cmd_read > LINE_BUF_SIZE can't happen

        if (c->cmd_read == LINE_BUF_SIZE) {
            // Keep discarding the input since no EOL was found.
            c->cmd_read = 0;
        }
        return;

    case STATE_BITBUCKET: {
        /* Invert the meaning of in_job_read while throwing away data -- it
         * counts the bytes that remain to be thrown away. */
        static char bucket[BUCKET_BUF_SIZE];
        to_read = min(c->in_job_read, BUCKET_BUF_SIZE);
        r = read(c->sock.fd, bucket, to_read);
        if (r == -1) {
            check_err(c, "read()");
            return;
        }
        if (r == 0) {
            c->state = STATE_CLOSE;
            return;
        }

        c->in_job_read -= r; /* we got some bytes */

        /* (c->in_job_read < 0) can't happen */

        if (c->in_job_read == 0) {
            /* done discarding; send the pre-queued error reply */
            reply(c, c->reply, c->reply_len, STATE_SEND_WORD);
        }
        return;
    }
    case STATE_WANT_DATA:
        j = c->in_job;

        r = read(c->sock.fd, j->body + c->in_job_read, j->r.body_size - c->in_job_read);
        if (r == -1) {
            check_err(c, "read()");
            return;
        }
        if (r == 0) {
            c->state = STATE_CLOSE;
            return;
        }

        c->in_job_read += r; /* we got some bytes */

        /* (j->in_job_read > j->r.body_size) can't happen */

        maybe_enqueue_incoming_job(c);
        return;
    case STATE_SEND_WORD:
        r = write(c->sock.fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
        if (r == -1) {
            check_err(c, "write()");
            return;
        }
        if (r == 0) {
            c->state = STATE_CLOSE;
            return;
        }

        c->reply_sent += r; /* we got some bytes */

        /* (c->reply_sent > c->reply_len) can't happen */

        if (c->reply_sent == c->reply_len) {
            conn_want_command(c);
            return;
        }

        /* otherwise we sent an incomplete reply, so just keep waiting */
        break;
    case STATE_SEND_JOB:
        j = c->out_job;

        /* send the reply line and the job body in a single writev */
        iov[0].iov_base = (void *)(c->reply + c->reply_sent);
        iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
        iov[1].iov_base = j->body + c->out_job_sent;
        iov[1].iov_len = j->r.body_size - c->out_job_sent;

        r = writev(c->sock.fd, iov, 2);
        if (r == -1) {
            check_err(c, "writev()");
            return;
        }
        if (r == 0) {
            c->state = STATE_CLOSE;
            return;
        }

        /* update the sent values: bytes written beyond the reply line
         * count toward the job body */
        c->reply_sent += r;
        if (c->reply_sent >= c->reply_len) {
            c->out_job_sent += c->reply_sent - c->reply_len;
            c->reply_sent = c->reply_len;
        }

        /* (c->out_job_sent > j->r.body_size) can't happen */

        /* are we done? */
        if (c->out_job_sent == j->r.body_size) {
            if (verbose >= 2) {
                printf(">%d job %"PRIu64"\n", c->sock.fd, j->r.id);
            }
            conn_want_command(c);
            return;
        }

        /* otherwise we sent incomplete data, so just keep waiting */
        break;
    case STATE_WAIT:
        if (c->halfclosed) {
            /* the peer shut down its write side; stop waiting */
            c->pending_timeout = -1;
            remove_waiting_conn(c);
            reply_msg(c, MSG_TIMED_OUT);
            return;
        }
        break;
    }
}
/* True when the connection is open and ready to parse a new command. */
#define want_command(c) ((c)->sock.fd && ((c)->state == STATE_WANT_COMMAND))
/* True when unparsed bytes are already buffered for such a connection. */
#define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
/* Event-loop callback for a client connection: perform one I/O step,
 * then dispatch every complete command already sitting in the buffer
 * (pipelining), and finally flush queued epoll changes. */
static void
h_conn(const int fd, const short which, Conn *c)
{
    if (fd != c->sock.fd) {
        twarnx("Argh! event fd doesn't match conn fd.");
        close(fd);
        connclose(c);
        epollq_apply();
        return;
    }

    if (which == 'h') {
        /* peer half-closed its side of the socket */
        c->halfclosed = 1;
    }

    conn_process_io(c);
    /* drain any pipelined commands that are already fully buffered */
    while (cmd_data_ready(c) && (c->cmd_len = scan_line_end(c->cmd, c->cmd_read))) {
        dispatch_cmd(c);
        fill_extra_data(c);
    }
    if (c->state == STATE_CLOSE) {
        epollq_rmconn(c);
        connclose(c);
    }
    epollq_apply();
}
/* Adapter matching the Socket handler signature; forwards to h_conn. */
static void
prothandle(Conn *c, int ev)
{
    h_conn(c->sock.fd, ev, c);
}
// prottick returns nanoseconds till the next work.
int64
prottick(Server *s)
{
    Job *j;
    int64 now;
    Tube *t;
    int64 period = 0x34630B8A000LL; /* 1 hour in nanoseconds */
    int64 d;

    now = nanoseconds();

    // Enqueue all jobs that are no longer delayed.
    // Capture the smallest period from the soonest delayed job.
    while ((j = soonest_delayed_job())) {
        d = j->r.deadline_at - now;
        if (d > 0) {
            /* still in the future: nothing more to enqueue this tick */
            period = min(period, d);
            break;
        }
        heapremove(&j->tube->delay, j->heap_index);
        int r = enqueue_job(s, j, 0, 0);
        if (r < 1)
            bury_job(s, j, 0); /* out of memory */
    }

    // Unpause every possible tube and process the queue.
    // Capture the smallest period from the soonest pause deadline.
    size_t i;
    for (i = 0; i < tubes.len; i++) {
        t = tubes.items[i];
        d = t->unpause_at - now;
        if (t->pause && d <= 0) {
            t->pause = 0;
            process_queue();
        }
        else if (d > 0) {
            period = min(period, d);
        }
    }

    // Process connections with pending timeouts. Release jobs with expired ttr.
    // Capture the smallest period from the soonest connection.
    while (s->conns.len) {
        /* s->conns is a min-heap keyed by tickat */
        Conn *c = s->conns.data[0];
        d = c->tickat - now;
        if (d > 0) {
            period = min(period, d);
            break;
        }
        heapremove(&s->conns, 0);
        c->in_conns = 0;
        conn_timeout(c);
    }

    epollq_apply();

    return period;
}
/* Event-loop callback for the listening socket: accept one client,
 * make its fd non-blocking, wrap it in a Conn, and register it for
 * read events. All failure paths close the new fd and return. */
void
h_accept(const int fd, const short which, Server *s)
{
    UNUSED_PARAMETER(which);

    struct sockaddr_storage addr;

    socklen_t addrlen = sizeof addr;
    int cfd = accept(fd, (struct sockaddr *)&addr, &addrlen);
    if (cfd == -1) {
        /* EAGAIN/EWOULDBLOCK just means no pending connection */
        if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
        epollq_apply();
        return;
    }
    if (verbose) {
        printf("accept %d\n", cfd);
    }

    int flags = fcntl(cfd, F_GETFL, 0);
    if (flags < 0) {
        twarn("getting flags");
        close(cfd);
        if (verbose) {
            printf("close %d\n", cfd);
        }
        epollq_apply();
        return;
    }

    int r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
    if (r < 0) {
        twarn("setting O_NONBLOCK");
        close(cfd);
        if (verbose) {
            printf("close %d\n", cfd);
        }
        epollq_apply();
        return;
    }

    Conn *c = make_conn(cfd, STATE_WANT_COMMAND, default_tube, default_tube);
    if (!c) {
        twarnx("make_conn() failed");
        close(cfd);
        if (verbose) {
            printf("close %d\n", cfd);
        }
        epollq_apply();
        return;
    }
    c->srv = s;
    c->sock.x = c;
    c->sock.f = (Handle)prothandle;
    c->sock.fd = cfd;

    r = sockwant(&c->sock, 'r');
    if (r == -1) {
        twarn("sockwant");
        close(cfd);
        if (verbose) {
            printf("close %d\n", cfd);
        }
        /* NOTE(review): the Conn allocated above does not appear to be
         * released on this path — confirm whether connclose (or similar)
         * should be called here to avoid a leak. */
    }

    epollq_apply();
}
/* One-time protocol initialization at server startup: record the start
 * time, seed the instance id from /dev/urandom, capture uname info, and
 * create the "default" tube. Exits the process on unrecoverable errors. */
void
prot_init()
{
    started_at = nanoseconds();
    memset(op_ct, 0, sizeof(op_ct));

    int dev_random = open("/dev/urandom", O_RDONLY);
    if (dev_random < 0) {
        twarn("open /dev/urandom");
        exit(50);
    }

    int i, r;
    byte rand_data[instance_id_bytes];
    r = read(dev_random, &rand_data, instance_id_bytes);
    if (r != instance_id_bytes) {
        twarn("read /dev/urandom");
        exit(50);
    }
    /* render the random bytes as lowercase hex into instance_hex */
    for (i = 0; i < instance_id_bytes; i++) {
        sprintf(instance_hex + (i * 2), "%02x", rand_data[i]);
    }
    close(dev_random);

    if (uname(&node_info) == -1) {
        warn("uname");
        exit(50);
    }

    ms_init(&tubes, NULL, NULL);

    /* NOTE(review): OOM here only warns; default_tube stays NULL and is
     * dereferenced later — presumably deliberate "best effort", confirm. */
    TUBE_ASSIGN(default_tube, tube_find_or_make("default"));
    if (!default_tube)
        twarnx("Out of memory during startup!");
}
// For each job in list, inserts the job into the appropriate data
// structures and adds it to the log.
//
// Returns 1 on success, 0 on failure.
int
prot_replay(Server *s, Job *list)
{
    Job *j, *nj;
    int64 t;
    int r;

    /* list is a circular doubly-linked list with a sentinel head;
     * capture next before removal since job_list_remove unlinks j */
    for (j = list->next ; j != list ; j = nj) {
        nj = j->next;
        job_list_remove(j);
        /* reserve WAL space for the job's eventual state update */
        int z = walresvupdate(&s->wal);
        if (!z) {
            twarnx("failed to reserve space");
            return 0;
        }
        int64 delay = 0;
        switch (j->r.state) {
        case Buried: {
            bury_job(s, j, 0);
            break;
        }
        case Delayed:
            /* restore only the remaining portion of the original delay */
            t = nanoseconds();
            if (t < j->r.deadline_at) {
                delay = j->r.deadline_at - t;
            }
            /* Falls through */
        default:
            r = enqueue_job(s, j, delay, 0);
            if (r < 1)
                twarnx("error recovering job %"PRIu64, j->r.id);
        }
    }
    return 1;
}