8 #include <sys/resource.h>
10 #include <sys/types.h>
11 #include <sys/utsname.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
18 /* job body cannot be greater than this many bytes long */
19 size_t job_data_size_limit
= JOB_DATA_SIZE_LIMIT_DEFAULT
;
22 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
23 "abcdefghijklmnopqrstuvwxyz" \
26 #define CMD_PUT "put "
27 #define CMD_PEEKJOB "peek "
28 #define CMD_PEEK_READY "peek-ready"
29 #define CMD_PEEK_DELAYED "peek-delayed"
30 #define CMD_PEEK_BURIED "peek-buried"
31 #define CMD_RESERVE "reserve"
32 #define CMD_RESERVE_TIMEOUT "reserve-with-timeout "
33 #define CMD_DELETE "delete "
34 #define CMD_RELEASE "release "
35 #define CMD_BURY "bury "
36 #define CMD_KICK "kick "
37 #define CMD_JOBKICK "kick-job "
38 #define CMD_TOUCH "touch "
39 #define CMD_STATS "stats"
40 #define CMD_JOBSTATS "stats-job "
41 #define CMD_USE "use "
42 #define CMD_WATCH "watch "
43 #define CMD_IGNORE "ignore "
44 #define CMD_LIST_TUBES "list-tubes"
45 #define CMD_LIST_TUBE_USED "list-tube-used"
46 #define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
47 #define CMD_STATS_TUBE "stats-tube "
48 #define CMD_QUIT "quit"
49 #define CMD_PAUSE_TUBE "pause-tube"
51 #define CONSTSTRLEN(m) (sizeof(m) - 1)
53 #define CMD_PEEK_READY_LEN CONSTSTRLEN(CMD_PEEK_READY)
54 #define CMD_PEEK_DELAYED_LEN CONSTSTRLEN(CMD_PEEK_DELAYED)
55 #define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED)
56 #define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
57 #define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
58 #define CMD_RESERVE_TIMEOUT_LEN CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
59 #define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
60 #define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
61 #define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
62 #define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
63 #define CMD_JOBKICK_LEN CONSTSTRLEN(CMD_JOBKICK)
64 #define CMD_TOUCH_LEN CONSTSTRLEN(CMD_TOUCH)
65 #define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
66 #define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
67 #define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
68 #define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
69 #define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
70 #define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
71 #define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
72 #define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
73 #define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
74 #define CMD_PAUSE_TUBE_LEN CONSTSTRLEN(CMD_PAUSE_TUBE)
76 #define MSG_FOUND "FOUND"
77 #define MSG_NOTFOUND "NOT_FOUND\r\n"
78 #define MSG_RESERVED "RESERVED"
79 #define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
80 #define MSG_TIMED_OUT "TIMED_OUT\r\n"
81 #define MSG_DELETED "DELETED\r\n"
82 #define MSG_RELEASED "RELEASED\r\n"
83 #define MSG_BURIED "BURIED\r\n"
84 #define MSG_KICKED "KICKED\r\n"
85 #define MSG_TOUCHED "TOUCHED\r\n"
86 #define MSG_BURIED_FMT "BURIED %"PRIu64"\r\n"
87 #define MSG_INSERTED_FMT "INSERTED %"PRIu64"\r\n"
88 #define MSG_NOT_IGNORED "NOT_IGNORED\r\n"
90 #define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
91 #define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
92 #define MSG_TOUCHED_LEN CONSTSTRLEN(MSG_TOUCHED)
93 #define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
94 #define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
95 #define MSG_KICKED_LEN CONSTSTRLEN(MSG_KICKED)
96 #define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)
98 #define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
99 #define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
100 #define MSG_DRAINING "DRAINING\r\n"
101 #define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
102 #define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
103 #define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
104 #define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
106 #define STATE_WANTCOMMAND 0
107 #define STATE_WANTDATA 1
108 #define STATE_SENDJOB 2
109 #define STATE_SENDWORD 3
111 #define STATE_BITBUCKET 5
112 #define STATE_CLOSE 6
123 #define OP_JOBSTATS 9
124 #define OP_PEEK_BURIED 10
128 #define OP_LIST_TUBES 14
129 #define OP_LIST_TUBE_USED 15
130 #define OP_LIST_TUBES_WATCHED 16
131 #define OP_STATS_TUBE 17
132 #define OP_PEEK_READY 18
133 #define OP_PEEK_DELAYED 19
134 #define OP_RESERVE_TIMEOUT 20
137 #define OP_PAUSE_TUBE 23
138 #define OP_JOBKICK 24
141 #define STATS_FMT "---\n" \
142 "current-jobs-urgent: %u\n" \
143 "current-jobs-ready: %u\n" \
144 "current-jobs-reserved: %u\n" \
145 "current-jobs-delayed: %u\n" \
146 "current-jobs-buried: %u\n" \
147 "cmd-put: %" PRIu64 "\n" \
148 "cmd-peek: %" PRIu64 "\n" \
149 "cmd-peek-ready: %" PRIu64 "\n" \
150 "cmd-peek-delayed: %" PRIu64 "\n" \
151 "cmd-peek-buried: %" PRIu64 "\n" \
152 "cmd-reserve: %" PRIu64 "\n" \
153 "cmd-reserve-with-timeout: %" PRIu64 "\n" \
154 "cmd-delete: %" PRIu64 "\n" \
155 "cmd-release: %" PRIu64 "\n" \
156 "cmd-use: %" PRIu64 "\n" \
157 "cmd-watch: %" PRIu64 "\n" \
158 "cmd-ignore: %" PRIu64 "\n" \
159 "cmd-bury: %" PRIu64 "\n" \
160 "cmd-kick: %" PRIu64 "\n" \
161 "cmd-touch: %" PRIu64 "\n" \
162 "cmd-stats: %" PRIu64 "\n" \
163 "cmd-stats-job: %" PRIu64 "\n" \
164 "cmd-stats-tube: %" PRIu64 "\n" \
165 "cmd-list-tubes: %" PRIu64 "\n" \
166 "cmd-list-tube-used: %" PRIu64 "\n" \
167 "cmd-list-tubes-watched: %" PRIu64 "\n" \
168 "cmd-pause-tube: %" PRIu64 "\n" \
169 "job-timeouts: %" PRIu64 "\n" \
170 "total-jobs: %" PRIu64 "\n" \
171 "max-job-size: %zu\n" \
172 "current-tubes: %zu\n" \
173 "current-connections: %u\n" \
174 "current-producers: %u\n" \
175 "current-workers: %u\n" \
176 "current-waiting: %u\n" \
177 "total-connections: %u\n" \
179 "version: \"%s\"\n" \
180 "rusage-utime: %d.%06d\n" \
181 "rusage-stime: %d.%06d\n" \
183 "binlog-oldest-index: %d\n" \
184 "binlog-current-index: %d\n" \
185 "binlog-records-migrated: %" PRId64 "\n" \
186 "binlog-records-written: %" PRId64 "\n" \
187 "binlog-max-size: %d\n" \
192 #define STATS_TUBE_FMT "---\n" \
194 "current-jobs-urgent: %u\n" \
195 "current-jobs-ready: %u\n" \
196 "current-jobs-reserved: %u\n" \
197 "current-jobs-delayed: %u\n" \
198 "current-jobs-buried: %u\n" \
199 "total-jobs: %" PRIu64 "\n" \
200 "current-using: %u\n" \
201 "current-watching: %u\n" \
202 "current-waiting: %u\n" \
203 "cmd-delete: %" PRIu64 "\n" \
204 "cmd-pause-tube: %u\n" \
205 "pause: %" PRIu64 "\n" \
206 "pause-time-left: %" PRId64 "\n" \
209 #define STATS_JOB_FMT "---\n" \
210 "id: %" PRIu64 "\n" \
214 "age: %" PRId64 "\n" \
215 "delay: %" PRId64 "\n" \
216 "ttr: %" PRId64 "\n" \
217 "time-left: %" PRId64 "\n" \
226 /* this number is pretty arbitrary */
227 #define BUCKET_BUF_SIZE 1024
229 static char bucket
[BUCKET_BUF_SIZE
];
231 static uint ready_ct
= 0;
232 static struct stats global_stat
= {0, 0, 0, 0, 0};
234 static tube default_tube
;
236 static int drain_mode
= 0;
237 static int64 started_at
;
243 static char id
[NumIdBytes
* 2 + 1]; // hex-encoded len of NumIdBytes
245 static struct utsname node_info
;
246 static uint64 op_ct
[TOTAL_OPS
], timeout_ct
= 0;
250 static const char * op_names
[] = {
267 CMD_LIST_TUBES_WATCHED
,
278 static job
remove_buried_job(job j
);
283 return job_list_any_p(&t
->buried
);
287 reply(Conn
*c
, char *line
, int len
, int state
)
299 printf(">%d reply %.*s\n", c
->sock
.fd
, len
-2, line
);
307 Conn
*x
, *newdirty
= NULL
;
323 #define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)
325 #define reply_serr(c,e) (twarnx("server error: %s",(e)),\
329 reply_line(Conn
*, int, const char*, ...)
330 __attribute__((format(printf
, 3, 4)));
333 reply_line(Conn
*c
, int state
, const char *fmt
, ...)
339 r
= vsnprintf(c
->reply_buf
, LINE_BUF_SIZE
, fmt
, ap
);
342 /* Make sure the buffer was big enough. If not, we have a bug. */
343 if (r
>= LINE_BUF_SIZE
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
345 return reply(c
, c
->reply_buf
, r
, state
);
349 reply_job(Conn
*c
, job j
, const char *word
)
351 /* tell this connection which job to send */
355 return reply_line(c
, STATE_SENDJOB
, "%s %"PRIu64
" %u\r\n",
356 word
, j
->r
.id
, j
->r
.body_size
- 2);
360 remove_waiting_conn(Conn
*c
)
365 if (!conn_waiting(c
)) return NULL
;
367 c
->type
&= ~CONN_TYPE_WAITING
;
368 global_stat
.waiting_ct
--;
369 for (i
= 0; i
< c
->watch
.used
; i
++) {
370 t
= c
->watch
.items
[i
];
371 t
->stat
.waiting_ct
--;
372 ms_remove(&t
->waiting
, c
);
378 reserve_job(Conn
*c
, job j
)
380 j
->r
.deadline_at
= nanoseconds() + j
->r
.ttr
;
381 global_stat
.reserved_ct
++; /* stats */
382 j
->tube
->stat
.reserved_ct
++;
384 j
->r
.state
= Reserved
;
385 job_insert(&c
->reserved_jobs
, j
);
387 c
->pending_timeout
= -1;
388 if (c
->soonest_job
&& j
->r
.deadline_at
< c
->soonest_job
->r
.deadline_at
) {
391 return reply_job(c
, j
, MSG_RESERVED
);
395 next_eligible_job(int64 now
)
399 job j
= NULL
, candidate
;
401 for (i
= 0; i
< tubes
.used
; i
++) {
404 if (t
->deadline_at
> now
) continue;
407 if (t
->waiting
.used
&& t
->ready
.len
) {
408 candidate
= t
->ready
.data
[0];
409 if (!j
|| job_pri_less(candidate
, j
)) {
422 int64 now
= nanoseconds();
424 while ((j
= next_eligible_job(now
))) {
425 heapremove(&j
->tube
->ready
, j
->heap_index
);
427 if (j
->r
.pri
< URGENT_THRESHOLD
) {
428 global_stat
.urgent_ct
--;
429 j
->tube
->stat
.urgent_ct
--;
431 reserve_job(remove_waiting_conn(ms_take(&j
->tube
->waiting
)), j
);
442 for (i
= 0; i
< tubes
.used
; i
++) {
444 if (t
->delay
.len
== 0) {
447 nj
= t
->delay
.data
[0];
448 if (!j
|| nj
->r
.deadline_at
< j
->r
.deadline_at
) j
= nj
;
455 enqueue_job(Server
*s
, job j
, int64 delay
, char update_store
)
461 j
->r
.deadline_at
= nanoseconds() + delay
;
462 r
= heapinsert(&j
->tube
->delay
, j
);
464 j
->r
.state
= Delayed
;
466 r
= heapinsert(&j
->tube
->ready
, j
);
470 if (j
->r
.pri
< URGENT_THRESHOLD
) {
471 global_stat
.urgent_ct
++;
472 j
->tube
->stat
.urgent_ct
++;
477 if (!walwrite(&s
->wal
, j
)) {
488 bury_job(Server
*s
, job j
, char update_store
)
493 z
= walresvupdate(&s
->wal
, j
);
498 job_insert(&j
->tube
->buried
, j
);
499 global_stat
.buried_ct
++;
500 j
->tube
->stat
.buried_ct
++;
506 if (!walwrite(&s
->wal
, j
)) {
516 enqueue_reserved_jobs(Conn
*c
)
521 while (job_list_any_p(&c
->reserved_jobs
)) {
522 j
= job_remove(c
->reserved_jobs
.next
);
523 r
= enqueue_job(c
->srv
, j
, 0, 0);
524 if (r
< 1) bury_job(c
->srv
, j
, 0);
525 global_stat
.reserved_ct
--;
526 j
->tube
->stat
.reserved_ct
--;
527 c
->soonest_job
= NULL
;
534 job j
= delay_q_peek();
538 heapremove(&j
->tube
->delay
, j
->heap_index
);
543 kick_buried_job(Server
*s
, job j
)
548 z
= walresvupdate(&s
->wal
, j
);
552 remove_buried_job(j
);
555 r
= enqueue_job(s
, j
, 0, 1);
556 if (r
== 1) return 1;
558 /* ready queue is full, so bury it */
570 for (i
= 0; i
< tubes
.used
; i
++) {
572 count
+= t
->delay
.len
;
578 kick_delayed_job(Server
*s
, job j
)
583 z
= walresvupdate(&s
->wal
, j
);
587 heapremove(&j
->tube
->delay
, j
->heap_index
);
590 r
= enqueue_job(s
, j
, 0, 1);
591 if (r
== 1) return 1;
593 /* ready queue is full, so delay it again */
594 r
= enqueue_job(s
, j
, j
->r
.delay
, 0);
595 if (r
== 1) return 0;
602 /* return the number of jobs successfully kicked */
604 kick_buried_jobs(Server
*s
, tube t
, uint n
)
607 for (i
= 0; (i
< n
) && buried_job_p(t
); ++i
) {
608 kick_buried_job(s
, t
->buried
.next
);
613 /* return the number of jobs successfully kicked */
615 kick_delayed_jobs(Server
*s
, tube t
, uint n
)
618 for (i
= 0; (i
< n
) && (t
->delay
.len
> 0); ++i
) {
619 kick_delayed_job(s
, (job
)t
->delay
.data
[0]);
625 kick_jobs(Server
*s
, tube t
, uint n
)
627 if (buried_job_p(t
)) return kick_buried_jobs(s
, t
, n
);
628 return kick_delayed_jobs(s
, t
, n
);
632 remove_buried_job(job j
)
634 if (!j
|| j
->r
.state
!= Buried
) return NULL
;
637 global_stat
.buried_ct
--;
638 j
->tube
->stat
.buried_ct
--;
644 remove_delayed_job(job j
)
646 if (!j
|| j
->r
.state
!= Delayed
) return NULL
;
647 heapremove(&j
->tube
->delay
, j
->heap_index
);
653 remove_ready_job(job j
)
655 if (!j
|| j
->r
.state
!= Ready
) return NULL
;
656 heapremove(&j
->tube
->ready
, j
->heap_index
);
658 if (j
->r
.pri
< URGENT_THRESHOLD
) {
659 global_stat
.urgent_ct
--;
660 j
->tube
->stat
.urgent_ct
--;
666 enqueue_waiting_conn(Conn
*c
)
671 global_stat
.waiting_ct
++;
672 c
->type
|= CONN_TYPE_WAITING
;
673 for (i
= 0; i
< c
->watch
.used
; i
++) {
674 t
= c
->watch
.items
[i
];
675 t
->stat
.waiting_ct
++;
676 ms_append(&t
->waiting
, c
);
681 find_reserved_job_in_conn(Conn
*c
, job j
)
683 return (j
&& j
->reserver
== c
&& j
->r
.state
== Reserved
) ? j
: NULL
;
687 touch_job(Conn
*c
, job j
)
689 j
= find_reserved_job_in_conn(c
, j
);
691 j
->r
.deadline_at
= nanoseconds() + j
->r
.ttr
;
692 c
->soonest_job
= NULL
;
704 check_err(Conn
*c
, const char *s
)
706 if (errno
== EAGAIN
) return;
707 if (errno
== EINTR
) return;
708 if (errno
== EWOULDBLOCK
) return;
711 c
->state
= STATE_CLOSE
;
715 /* Scan the given string for the sequence "\r\n" and return the line length.
716 * Always returns at least 2 if a match is found. Returns 0 if no match. */
718 scan_line_end(const char *s
, int size
)
722 match
= memchr(s
, '\r', size
- 1);
723 if (!match
) return 0;
725 /* this is safe because we only scan size - 1 chars above */
726 if (match
[1] == '\n') return match
- s
+ 2;
734 return scan_line_end(c
->cmd
, c
->cmd_read
);
737 /* parse the command line */
741 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
742 TEST_CMD(c
->cmd
, CMD_PUT
, OP_PUT
);
743 TEST_CMD(c
->cmd
, CMD_PEEKJOB
, OP_PEEKJOB
);
744 TEST_CMD(c
->cmd
, CMD_PEEK_READY
, OP_PEEK_READY
);
745 TEST_CMD(c
->cmd
, CMD_PEEK_DELAYED
, OP_PEEK_DELAYED
);
746 TEST_CMD(c
->cmd
, CMD_PEEK_BURIED
, OP_PEEK_BURIED
);
747 TEST_CMD(c
->cmd
, CMD_RESERVE_TIMEOUT
, OP_RESERVE_TIMEOUT
);
748 TEST_CMD(c
->cmd
, CMD_RESERVE
, OP_RESERVE
);
749 TEST_CMD(c
->cmd
, CMD_DELETE
, OP_DELETE
);
750 TEST_CMD(c
->cmd
, CMD_RELEASE
, OP_RELEASE
);
751 TEST_CMD(c
->cmd
, CMD_BURY
, OP_BURY
);
752 TEST_CMD(c
->cmd
, CMD_KICK
, OP_KICK
);
753 TEST_CMD(c
->cmd
, CMD_JOBKICK
, OP_JOBKICK
);
754 TEST_CMD(c
->cmd
, CMD_TOUCH
, OP_TOUCH
);
755 TEST_CMD(c
->cmd
, CMD_JOBSTATS
, OP_JOBSTATS
);
756 TEST_CMD(c
->cmd
, CMD_STATS_TUBE
, OP_STATS_TUBE
);
757 TEST_CMD(c
->cmd
, CMD_STATS
, OP_STATS
);
758 TEST_CMD(c
->cmd
, CMD_USE
, OP_USE
);
759 TEST_CMD(c
->cmd
, CMD_WATCH
, OP_WATCH
);
760 TEST_CMD(c
->cmd
, CMD_IGNORE
, OP_IGNORE
);
761 TEST_CMD(c
->cmd
, CMD_LIST_TUBES_WATCHED
, OP_LIST_TUBES_WATCHED
);
762 TEST_CMD(c
->cmd
, CMD_LIST_TUBE_USED
, OP_LIST_TUBE_USED
);
763 TEST_CMD(c
->cmd
, CMD_LIST_TUBES
, OP_LIST_TUBES
);
764 TEST_CMD(c
->cmd
, CMD_QUIT
, OP_QUIT
);
765 TEST_CMD(c
->cmd
, CMD_PAUSE_TUBE
, OP_PAUSE_TUBE
);
769 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
770 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
771 * This function is idempotent(). */
773 fill_extra_data(Conn
*c
)
775 int extra_bytes
, job_data_bytes
= 0, cmd_bytes
;
777 if (!c
->sock
.fd
) return; /* the connection was closed */
778 if (!c
->cmd_len
) return; /* we don't have a complete command */
780 /* how many extra bytes did we read? */
781 extra_bytes
= c
->cmd_read
- c
->cmd_len
;
783 /* how many bytes should we put into the job body? */
785 job_data_bytes
= min(extra_bytes
, c
->in_job
->r
.body_size
);
786 memcpy(c
->in_job
->body
, c
->cmd
+ c
->cmd_len
, job_data_bytes
);
787 c
->in_job_read
= job_data_bytes
;
788 } else if (c
->in_job_read
) {
789 /* we are in bit-bucket mode, throwing away data */
790 job_data_bytes
= min(extra_bytes
, c
->in_job_read
);
791 c
->in_job_read
-= job_data_bytes
;
794 /* how many bytes are left to go into the future cmd? */
795 cmd_bytes
= extra_bytes
- job_data_bytes
;
796 memmove(c
->cmd
, c
->cmd
+ c
->cmd_len
+ job_data_bytes
, cmd_bytes
);
797 c
->cmd_read
= cmd_bytes
;
798 c
->cmd_len
= 0; /* we no longer know the length of the new command */
802 _skip(Conn
*c
, int n
, char *line
, int len
)
804 /* Invert the meaning of in_job_read while throwing away data -- it
805 * counts the bytes that remain to be thrown away. */
810 if (c
->in_job_read
== 0) return reply(c
, line
, len
, STATE_SENDWORD
);
815 c
->state
= STATE_BITBUCKET
;
819 #define skip(c,n,m) (_skip(c,n,m,CONSTSTRLEN(m)))
822 enqueue_incoming_job(Conn
*c
)
827 c
->in_job
= NULL
; /* the connection no longer owns this job */
830 /* check if the trailer is present and correct */
831 if (memcmp(j
->body
+ j
->r
.body_size
- 2, "\r\n", 2)) {
833 return reply_msg(c
, MSG_EXPECTED_CRLF
);
837 printf("<%d job %"PRIu64
"\n", c
->sock
.fd
, j
->r
.id
);
842 return reply_serr(c
, MSG_DRAINING
);
845 if (j
->walresv
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
846 j
->walresv
= walresvput(&c
->srv
->wal
, j
);
847 if (!j
->walresv
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
849 /* we have a complete job, so let's stick it in the pqueue */
850 r
= enqueue_job(c
->srv
, j
, j
->r
.delay
, 1);
851 if (r
< 0) return reply_serr(c
, MSG_INTERNAL_ERROR
);
853 global_stat
.total_jobs_ct
++;
854 j
->tube
->stat
.total_jobs_ct
++;
856 if (r
== 1) return reply_line(c
, STATE_SENDWORD
, MSG_INSERTED_FMT
, j
->r
.id
);
858 /* out of memory trying to grow the queue, so it gets buried */
859 bury_job(c
->srv
, j
, 0);
860 reply_line(c
, STATE_SENDWORD
, MSG_BURIED_FMT
, j
->r
.id
);
866 return (nanoseconds() - started_at
) / 1000000000;
870 fmt_stats(char *buf
, size_t size
, void *x
)
872 int whead
= 0, wcur
= 0;
874 struct rusage ru
= {{0, 0}, {0, 0}};
879 whead
= srv
->wal
.head
->seq
;
883 wcur
= srv
->wal
.cur
->seq
;
886 getrusage(RUSAGE_SELF
, &ru
); /* don't care if it fails */
887 return snprintf(buf
, size
, STATS_FMT
,
888 global_stat
.urgent_ct
,
890 global_stat
.reserved_ct
,
891 get_delayed_job_ct(),
892 global_stat
.buried_ct
,
895 op_ct
[OP_PEEK_READY
],
896 op_ct
[OP_PEEK_DELAYED
],
897 op_ct
[OP_PEEK_BURIED
],
899 op_ct
[OP_RESERVE_TIMEOUT
],
910 op_ct
[OP_STATS_TUBE
],
911 op_ct
[OP_LIST_TUBES
],
912 op_ct
[OP_LIST_TUBE_USED
],
913 op_ct
[OP_LIST_TUBES_WATCHED
],
914 op_ct
[OP_PAUSE_TUBE
],
916 global_stat
.total_jobs_ct
,
920 count_cur_producers(),
922 global_stat
.waiting_ct
,
926 (int) ru
.ru_utime
.tv_sec
, (int) ru
.ru_utime
.tv_usec
,
927 (int) ru
.ru_stime
.tv_sec
, (int) ru
.ru_stime
.tv_usec
,
939 /* Read a priority value from the given buffer and place it in pri.
940 * Update end to point to the address after the last character consumed.
941 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
942 * conversion and return the status code but not update any values. This is an
943 * easy way to check for errors.
944 * If end is NULL, read_pri will also check that the entire input string was
945 * consumed and return an error code otherwise.
946 * Return 0 on success, or nonzero on failure.
947 * If a failure occurs, pri and end are not modified. */
949 read_pri(uint
*pri
, const char *buf
, char **end
)
955 while (buf
[0] == ' ') buf
++;
956 if (buf
[0] < '0' || '9' < buf
[0]) return -1;
957 tpri
= strtoul(buf
, &tend
, 10);
958 if (tend
== buf
) return -1;
959 if (errno
&& errno
!= ERANGE
) return -1;
960 if (!end
&& tend
[0] != '\0') return -1;
962 if (pri
) *pri
= tpri
;
963 if (end
) *end
= tend
;
967 /* Read a delay value from the given buffer and place it in delay.
968 * The interface and behavior are analogous to read_pri(). */
970 read_delay(int64
*delay
, const char *buf
, char **end
)
975 r
= read_pri(&delay_sec
, buf
, end
);
977 *delay
= ((int64
) delay_sec
) * 1000000000;
981 /* Read a timeout value from the given buffer and place it in ttr.
982 * The interface and behavior are the same as in read_delay(). */
984 read_ttr(int64
*ttr
, const char *buf
, char **end
)
986 return read_delay(ttr
, buf
, end
);
989 /* Read a tube name from the given buffer moving the buffer to the name start */
991 read_tube_name(char **tubename
, char *buf
, char **end
)
995 while (buf
[0] == ' ') buf
++;
996 len
= strspn(buf
, NAME_CHARS
);
997 if (len
== 0) return -1;
998 if (tubename
) *tubename
= buf
;
999 if (end
) *end
= buf
+ len
;
1004 wait_for_job(Conn
*c
, int timeout
)
1006 c
->state
= STATE_WAIT
;
1007 enqueue_waiting_conn(c
);
1009 /* Set the pending timeout to the requested timeout amount */
1010 c
->pending_timeout
= timeout
;
1012 connwant(c
, 'h'); // only care if they hang up
1017 typedef int(*fmt_fn
)(char *, size_t, void *);
1020 do_stats(Conn
*c
, fmt_fn fmt
, void *data
)
1024 /* first, measure how big a buffer we will need */
1025 stats_len
= fmt(NULL
, 0, data
) + 16;
1027 c
->out_job
= allocate_job(stats_len
); /* fake job to hold stats data */
1028 if (!c
->out_job
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1030 /* Mark this job as a copy so it can be appropriately freed later on */
1031 c
->out_job
->r
.state
= Copy
;
1033 /* now actually format the stats data */
1034 r
= fmt(c
->out_job
->body
, stats_len
, data
);
1035 /* and set the actual body size */
1036 c
->out_job
->r
.body_size
= r
;
1037 if (r
> stats_len
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
1039 c
->out_job_sent
= 0;
1040 return reply_line(c
, STATE_SENDJOB
, "OK %d\r\n", r
- 2);
1044 do_list_tubes(Conn
*c
, ms l
)
1050 /* first, measure how big a buffer we will need */
1051 resp_z
= 6; /* initial "---\n" and final "\r\n" */
1052 for (i
= 0; i
< l
->used
; i
++) {
1054 resp_z
+= 3 + strlen(t
->name
); /* including "- " and "\n" */
1057 c
->out_job
= allocate_job(resp_z
); /* fake job to hold response data */
1058 if (!c
->out_job
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1060 /* Mark this job as a copy so it can be appropriately freed later on */
1061 c
->out_job
->r
.state
= Copy
;
1063 /* now actually format the response */
1064 buf
= c
->out_job
->body
;
1065 buf
+= snprintf(buf
, 5, "---\n");
1066 for (i
= 0; i
< l
->used
; i
++) {
1068 buf
+= snprintf(buf
, 4 + strlen(t
->name
), "- %s\n", t
->name
);
1073 c
->out_job_sent
= 0;
1074 return reply_line(c
, STATE_SENDJOB
, "OK %zu\r\n", resp_z
- 2);
1078 fmt_job_stats(char *buf
, size_t size
, job j
)
1085 if (j
->r
.state
== Reserved
|| j
->r
.state
== Delayed
) {
1086 time_left
= (j
->r
.deadline_at
- t
) / 1000000000;
1091 file
= j
->file
->seq
;
1093 return snprintf(buf
, size
, STATS_JOB_FMT
,
1098 (t
- j
->r
.created_at
) / 1000000000,
1099 j
->r
.delay
/ 1000000000,
1100 j
->r
.ttr
/ 1000000000,
1111 fmt_stats_tube(char *buf
, size_t size
, tube t
)
1116 time_left
= (t
->deadline_at
- nanoseconds()) / 1000000000;
1120 return snprintf(buf
, size
, STATS_TUBE_FMT
,
1124 t
->stat
.reserved_ct
,
1127 t
->stat
.total_jobs_ct
,
1131 t
->stat
.total_delete_ct
,
1133 t
->pause
/ 1000000000,
1138 maybe_enqueue_incoming_job(Conn
*c
)
1142 /* do we have a complete job? */
1143 if (c
->in_job_read
== j
->r
.body_size
) return enqueue_incoming_job(c
);
1145 /* otherwise we have incomplete data, so just keep waiting */
1146 c
->state
= STATE_WANTDATA
;
1151 remove_this_reserved_job(Conn
*c
, job j
)
1155 global_stat
.reserved_ct
--;
1156 j
->tube
->stat
.reserved_ct
--;
1159 c
->soonest_job
= NULL
;
1164 remove_reserved_job(Conn
*c
, job j
)
1166 return remove_this_reserved_job(c
, find_reserved_job_in_conn(c
, j
));
1170 name_is_ok(const char *name
, size_t max
)
1172 size_t len
= strlen(name
);
1173 return len
> 0 && len
<= max
&&
1174 strspn(name
, NAME_CHARS
) == len
&& name
[0] != '-';
1178 prot_remove_tube(tube t
)
1180 ms_remove(&tubes
, t
);
1184 dispatch_cmd(Conn
*c
)
1186 int r
, i
, timeout
= -1;
1191 char *size_buf
, *delay_buf
, *ttr_buf
, *pri_buf
, *end_buf
, *name
;
1192 uint pri
, body_size
;
1197 /* NUL-terminate this string so we can use strtol and friends */
1198 c
->cmd
[c
->cmd_len
- 2] = '\0';
1200 /* check for possible maliciousness */
1201 if (strlen(c
->cmd
) != c
->cmd_len
- 2) {
1202 return reply_msg(c
, MSG_BAD_FORMAT
);
1205 type
= which_cmd(c
);
1207 printf("<%d command %s\n", c
->sock
.fd
, op_names
[type
]);
1212 r
= read_pri(&pri
, c
->cmd
+ 4, &delay_buf
);
1213 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1215 r
= read_delay(&delay
, delay_buf
, &ttr_buf
);
1216 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1218 r
= read_ttr(&ttr
, ttr_buf
, &size_buf
);
1219 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1222 body_size
= strtoul(size_buf
, &end_buf
, 10);
1223 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1227 if (body_size
> job_data_size_limit
) {
1228 /* throw away the job body and respond with JOB_TOO_BIG */
1229 return skip(c
, body_size
+ 2, MSG_JOB_TOO_BIG
);
1232 /* don't allow trailing garbage */
1233 if (end_buf
[0] != '\0') return reply_msg(c
, MSG_BAD_FORMAT
);
1237 if (ttr
< 1000000000) {
1241 c
->in_job
= make_job(pri
, delay
, ttr
, body_size
+ 2, c
->use
);
1245 /* throw away the job body and respond with OUT_OF_MEMORY */
1246 twarnx("server error: " MSG_OUT_OF_MEMORY
);
1247 return skip(c
, body_size
+ 2, MSG_OUT_OF_MEMORY
);
1252 /* it's possible we already have a complete job */
1253 maybe_enqueue_incoming_job(c
);
1257 /* don't allow trailing garbage */
1258 if (c
->cmd_len
!= CMD_PEEK_READY_LEN
+ 2) {
1259 return reply_msg(c
, MSG_BAD_FORMAT
);
1263 if (c
->use
->ready
.len
) {
1264 j
= job_copy(c
->use
->ready
.data
[0]);
1267 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1269 reply_job(c
, j
, MSG_FOUND
);
1271 case OP_PEEK_DELAYED
:
1272 /* don't allow trailing garbage */
1273 if (c
->cmd_len
!= CMD_PEEK_DELAYED_LEN
+ 2) {
1274 return reply_msg(c
, MSG_BAD_FORMAT
);
1278 if (c
->use
->delay
.len
) {
1279 j
= job_copy(c
->use
->delay
.data
[0]);
1282 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1284 reply_job(c
, j
, MSG_FOUND
);
1286 case OP_PEEK_BURIED
:
1287 /* don't allow trailing garbage */
1288 if (c
->cmd_len
!= CMD_PEEK_BURIED_LEN
+ 2) {
1289 return reply_msg(c
, MSG_BAD_FORMAT
);
1293 j
= job_copy(buried_job_p(c
->use
)? j
= c
->use
->buried
.next
: NULL
);
1295 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1297 reply_job(c
, j
, MSG_FOUND
);
1301 id
= strtoull(c
->cmd
+ CMD_PEEKJOB_LEN
, &end_buf
, 10);
1302 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1305 /* So, peek is annoying, because some other connection might free the
1306 * job while we are still trying to write it out. So we copy it and
1307 * then free the copy when it's done sending. */
1308 j
= job_copy(peek_job(id
));
1310 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1312 reply_job(c
, j
, MSG_FOUND
);
1314 case OP_RESERVE_TIMEOUT
:
1316 timeout
= strtol(c
->cmd
+ CMD_RESERVE_TIMEOUT_LEN
, &end_buf
, 10);
1317 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1318 case OP_RESERVE
: /* FALLTHROUGH */
1319 /* don't allow trailing garbage */
1320 if (type
== OP_RESERVE
&& c
->cmd_len
!= CMD_RESERVE_LEN
+ 2) {
1321 return reply_msg(c
, MSG_BAD_FORMAT
);
1327 if (conndeadlinesoon(c
) && !conn_ready(c
)) {
1328 return reply_msg(c
, MSG_DEADLINE_SOON
);
1331 /* try to get a new job for this guy */
1332 wait_for_job(c
, timeout
);
1337 id
= strtoull(c
->cmd
+ CMD_DELETE_LEN
, &end_buf
, 10);
1338 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1342 j
= remove_reserved_job(c
, j
) ? :
1343 remove_ready_job(j
) ? :
1344 remove_buried_job(j
) ? :
1345 remove_delayed_job(j
);
1347 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1349 j
->tube
->stat
.total_delete_ct
++;
1351 j
->r
.state
= Invalid
;
1352 r
= walwrite(&c
->srv
->wal
, j
);
1353 walmaint(&c
->srv
->wal
);
1356 if (!r
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
1358 reply(c
, MSG_DELETED
, MSG_DELETED_LEN
, STATE_SENDWORD
);
1362 id
= strtoull(c
->cmd
+ CMD_RELEASE_LEN
, &pri_buf
, 10);
1363 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1365 r
= read_pri(&pri
, pri_buf
, &delay_buf
);
1366 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1368 r
= read_delay(&delay
, delay_buf
, NULL
);
1369 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1372 j
= remove_reserved_job(c
, job_find(id
));
1374 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1376 /* We want to update the delay deadline on disk, so reserve space for
1379 z
= walresvupdate(&c
->srv
->wal
, j
);
1380 if (!z
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1388 r
= enqueue_job(c
->srv
, j
, delay
, !!delay
);
1389 if (r
< 0) return reply_serr(c
, MSG_INTERNAL_ERROR
);
1391 return reply(c
, MSG_RELEASED
, MSG_RELEASED_LEN
, STATE_SENDWORD
);
1394 /* out of memory trying to grow the queue, so it gets buried */
1395 bury_job(c
->srv
, j
, 0);
1396 reply(c
, MSG_BURIED
, MSG_BURIED_LEN
, STATE_SENDWORD
);
1400 id
= strtoull(c
->cmd
+ CMD_BURY_LEN
, &pri_buf
, 10);
1401 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1403 r
= read_pri(&pri
, pri_buf
, NULL
);
1404 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1407 j
= remove_reserved_job(c
, job_find(id
));
1409 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1412 r
= bury_job(c
->srv
, j
, 1);
1413 if (!r
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
1414 reply(c
, MSG_BURIED
, MSG_BURIED_LEN
, STATE_SENDWORD
);
1418 count
= strtoul(c
->cmd
+ CMD_KICK_LEN
, &end_buf
, 10);
1419 if (end_buf
== c
->cmd
+ CMD_KICK_LEN
) {
1420 return reply_msg(c
, MSG_BAD_FORMAT
);
1422 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1426 i
= kick_jobs(c
->srv
, c
->use
, count
);
1428 return reply_line(c
, STATE_SENDWORD
, "KICKED %u\r\n", i
);
1431 id
= strtoull(c
->cmd
+ CMD_JOBKICK_LEN
, &end_buf
, 10);
1432 if (errno
) return twarn("strtoull"), reply_msg(c
, MSG_BAD_FORMAT
);
1437 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1439 if ((j
->r
.state
== Buried
&& kick_buried_job(c
->srv
, j
)) ||
1440 (j
->r
.state
== Delayed
&& kick_delayed_job(c
->srv
, j
))) {
1441 reply(c
, MSG_KICKED
, MSG_KICKED_LEN
, STATE_SENDWORD
);
1443 return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1448 id
= strtoull(c
->cmd
+ CMD_TOUCH_LEN
, &end_buf
, 10);
1449 if (errno
) return twarn("strtoull"), reply_msg(c
, MSG_BAD_FORMAT
);
1453 j
= touch_job(c
, job_find(id
));
1456 reply(c
, MSG_TOUCHED
, MSG_TOUCHED_LEN
, STATE_SENDWORD
);
1458 return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1462 /* don't allow trailing garbage */
1463 if (c
->cmd_len
!= CMD_STATS_LEN
+ 2) {
1464 return reply_msg(c
, MSG_BAD_FORMAT
);
1469 do_stats(c
, fmt_stats
, c
->srv
);
1473 id
= strtoull(c
->cmd
+ CMD_JOBSTATS_LEN
, &end_buf
, 10);
1474 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1479 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1481 if (!j
->tube
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
1482 do_stats(c
, (fmt_fn
) fmt_job_stats
, j
);
1485 name
= c
->cmd
+ CMD_STATS_TUBE_LEN
;
1486 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1490 t
= tube_find(name
);
1491 if (!t
) return reply_msg(c
, MSG_NOTFOUND
);
1493 do_stats(c
, (fmt_fn
) fmt_stats_tube
, t
);
1497 /* don't allow trailing garbage */
1498 if (c
->cmd_len
!= CMD_LIST_TUBES_LEN
+ 2) {
1499 return reply_msg(c
, MSG_BAD_FORMAT
);
1503 do_list_tubes(c
, &tubes
);
1505 case OP_LIST_TUBE_USED
:
1506 /* don't allow trailing garbage */
1507 if (c
->cmd_len
!= CMD_LIST_TUBE_USED_LEN
+ 2) {
1508 return reply_msg(c
, MSG_BAD_FORMAT
);
1512 reply_line(c
, STATE_SENDWORD
, "USING %s\r\n", c
->use
->name
);
1514 case OP_LIST_TUBES_WATCHED
:
1515 /* don't allow trailing garbage */
1516 if (c
->cmd_len
!= CMD_LIST_TUBES_WATCHED_LEN
+ 2) {
1517 return reply_msg(c
, MSG_BAD_FORMAT
);
1521 do_list_tubes(c
, &c
->watch
);
1524 name
= c
->cmd
+ CMD_USE_LEN
;
1525 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1528 TUBE_ASSIGN(t
, tube_find_or_make(name
));
1529 if (!t
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1532 TUBE_ASSIGN(c
->use
, t
);
1533 TUBE_ASSIGN(t
, NULL
);
1536 reply_line(c
, STATE_SENDWORD
, "USING %s\r\n", c
->use
->name
);
1539 name
= c
->cmd
+ CMD_WATCH_LEN
;
1540 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1543 TUBE_ASSIGN(t
, tube_find_or_make(name
));
1544 if (!t
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1547 if (!ms_contains(&c
->watch
, t
)) r
= ms_append(&c
->watch
, t
);
1548 TUBE_ASSIGN(t
, NULL
);
1549 if (!r
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1551 reply_line(c
, STATE_SENDWORD
, "WATCHING %zu\r\n", c
->watch
.used
);
1554 name
= c
->cmd
+ CMD_IGNORE_LEN
;
1555 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1559 for (i
= 0; i
< c
->watch
.used
; i
++) {
1560 t
= c
->watch
.items
[i
];
1561 if (strncmp(t
->name
, name
, MAX_TUBE_NAME_LEN
) == 0) break;
1565 if (t
&& c
->watch
.used
< 2) return reply_msg(c
, MSG_NOT_IGNORED
);
1567 if (t
) ms_remove(&c
->watch
, t
); /* may free t if refcount => 0 */
1570 reply_line(c
, STATE_SENDWORD
, "WATCHING %zu\r\n", c
->watch
.used
);
1573 c
->state
= STATE_CLOSE
;
1578 r
= read_tube_name(&name
, c
->cmd
+ CMD_PAUSE_TUBE_LEN
, &delay_buf
);
1579 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1581 r
= read_delay(&delay
, delay_buf
, NULL
);
1582 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1585 t
= tube_find(name
);
1586 if (!t
) return reply_msg(c
, MSG_NOTFOUND
);
1588 // Always pause for a positive amount of time, to make sure
1589 // that waiting clients wake up when the deadline arrives.
1594 t
->deadline_at
= nanoseconds() + delay
;
1598 reply_line(c
, STATE_SENDWORD
, "PAUSED\r\n");
1601 return reply_msg(c
, MSG_UNKNOWN_COMMAND
);
1605 /* There are three reasons this function may be called. We need to check for
1608 * 1. A reserved job has run out of time.
1609 * 2. A waiting client's reserved job has entered the safety margin.
1610 * 3. A waiting client's requested timeout has occurred.
1612 * If any of these happen, we must do the appropriate thing. */
1614 conn_timeout(Conn
*c
)
1616 int r
, should_timeout
= 0;
1619 /* Check if the client was trying to reserve a job. */
1620 if (conn_waiting(c
) && conndeadlinesoon(c
)) should_timeout
= 1;
1622 /* Check if any reserved jobs have run out of time. We should do this
1623 * whether or not the client is waiting for a new reservation. */
1624 while ((j
= connsoonestjob(c
))) {
1625 if (j
->r
.deadline_at
>= nanoseconds()) break;
1627 /* This job is in the middle of being written out. If we return it to
1628 * the ready queue, someone might free it before we finish writing it
1629 * out to the socket. So we'll copy it here and free the copy when it's
1631 if (j
== c
->out_job
) {
1632 c
->out_job
= job_copy(c
->out_job
);
1635 timeout_ct
++; /* stats */
1637 r
= enqueue_job(c
->srv
, remove_this_reserved_job(c
, j
), 0, 0);
1638 if (r
< 1) bury_job(c
->srv
, j
, 0); /* out of memory, so bury it */
1642 if (should_timeout
) {
1643 return reply_msg(remove_waiting_conn(c
), MSG_DEADLINE_SOON
);
1644 } else if (conn_waiting(c
) && c
->pending_timeout
>= 0) {
1645 c
->pending_timeout
= -1;
1646 return reply_msg(remove_waiting_conn(c
), MSG_TIMED_OUT
);
1651 enter_drain_mode(int sig
)
1670 /* was this a peek or stats command? */
1671 if (c
->out_job
&& c
->out_job
->r
.state
== Copy
) job_free(c
->out_job
);
1674 c
->reply_sent
= 0; /* now that we're done, reset this */
1675 c
->state
= STATE_WANTCOMMAND
;
1683 struct iovec iov
[2];
1686 case STATE_WANTCOMMAND
:
1687 r
= read(c
->sock
.fd
, c
->cmd
+ c
->cmd_read
, LINE_BUF_SIZE
- c
->cmd_read
);
1688 if (r
== -1) return check_err(c
, "read()");
1690 c
->state
= STATE_CLOSE
;
1694 c
->cmd_read
+= r
; /* we got some bytes */
1696 c
->cmd_len
= cmd_len(c
); /* find the EOL */
1698 /* yay, complete command line */
1699 if (c
->cmd_len
) return do_cmd(c
);
1701 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1703 /* command line too long? */
1704 if (c
->cmd_read
== LINE_BUF_SIZE
) {
1705 c
->cmd_read
= 0; /* discard the input so far */
1706 return reply_msg(c
, MSG_BAD_FORMAT
);
1709 /* otherwise we have an incomplete line, so just keep waiting */
1711 case STATE_BITBUCKET
:
1712 /* Invert the meaning of in_job_read while throwing away data -- it
1713 * counts the bytes that remain to be thrown away. */
1714 to_read
= min(c
->in_job_read
, BUCKET_BUF_SIZE
);
1715 r
= read(c
->sock
.fd
, bucket
, to_read
);
1716 if (r
== -1) return check_err(c
, "read()");
1718 c
->state
= STATE_CLOSE
;
1722 c
->in_job_read
-= r
; /* we got some bytes */
1724 /* (c->in_job_read < 0) can't happen */
1726 if (c
->in_job_read
== 0) {
1727 return reply(c
, c
->reply
, c
->reply_len
, STATE_SENDWORD
);
1730 case STATE_WANTDATA
:
1733 r
= read(c
->sock
.fd
, j
->body
+ c
->in_job_read
, j
->r
.body_size
-c
->in_job_read
);
1734 if (r
== -1) return check_err(c
, "read()");
1736 c
->state
= STATE_CLOSE
;
1740 c
->in_job_read
+= r
; /* we got some bytes */
1742 /* (j->in_job_read > j->r.body_size) can't happen */
1744 maybe_enqueue_incoming_job(c
);
1746 case STATE_SENDWORD
:
1747 r
= write(c
->sock
.fd
, c
->reply
+ c
->reply_sent
, c
->reply_len
- c
->reply_sent
);
1748 if (r
== -1) return check_err(c
, "write()");
1750 c
->state
= STATE_CLOSE
;
1754 c
->reply_sent
+= r
; /* we got some bytes */
1756 /* (c->reply_sent > c->reply_len) can't happen */
1758 if (c
->reply_sent
== c
->reply_len
) return reset_conn(c
);
1760 /* otherwise we sent an incomplete reply, so just keep waiting */
1765 iov
[0].iov_base
= (void *)(c
->reply
+ c
->reply_sent
);
1766 iov
[0].iov_len
= c
->reply_len
- c
->reply_sent
; /* maybe 0 */
1767 iov
[1].iov_base
= j
->body
+ c
->out_job_sent
;
1768 iov
[1].iov_len
= j
->r
.body_size
- c
->out_job_sent
;
1770 r
= writev(c
->sock
.fd
, iov
, 2);
1771 if (r
== -1) return check_err(c
, "writev()");
1773 c
->state
= STATE_CLOSE
;
1777 /* update the sent values */
1779 if (c
->reply_sent
>= c
->reply_len
) {
1780 c
->out_job_sent
+= c
->reply_sent
- c
->reply_len
;
1781 c
->reply_sent
= c
->reply_len
;
1784 /* (c->out_job_sent > j->r.body_size) can't happen */
1787 if (c
->out_job_sent
== j
->r
.body_size
) {
1789 printf(">%d job %"PRIu64
"\n", c
->sock
.fd
, j
->r
.id
);
1791 return reset_conn(c
);
1794 /* otherwise we sent incomplete data, so just keep waiting */
1797 if (c
->halfclosed
) {
1798 c
->pending_timeout
= -1;
1799 return reply_msg(remove_waiting_conn(c
), MSG_TIMED_OUT
);
1805 #define want_command(c) ((c)->sock.fd && ((c)->state == STATE_WANTCOMMAND))
1806 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
1816 dirty
= dirty
->next
;
1818 r
= sockwant(&c
->sock
, c
->rw
);
1827 h_conn(const int fd
, const short which
, Conn
*c
)
1829 if (fd
!= c
->sock
.fd
) {
1830 twarnx("Argh! event fd doesn't match conn fd.");
1842 while (cmd_data_ready(c
) && (c
->cmd_len
= cmd_len(c
))) do_cmd(c
);
1843 if (c
->state
== STATE_CLOSE
) {
1851 prothandle(Conn
*c
, int ev
)
1853 h_conn(c
->sock
.fd
, ev
, c
);
1864 int64 period
= 0x34630B8A000LL
; /* 1 hour in nanoseconds */
1867 now
= nanoseconds();
1868 while ((j
= delay_q_peek())) {
1869 d
= j
->r
.deadline_at
- now
;
1871 period
= min(period
, d
);
1875 r
= enqueue_job(s
, j
, 0, 0);
1876 if (r
< 1) bury_job(s
, j
, 0); /* out of memory, so bury it */
1879 for (i
= 0; i
< tubes
.used
; i
++) {
1881 d
= t
->deadline_at
- now
;
1882 if (t
->pause
&& d
<= 0) {
1887 period
= min(period
, d
);
1891 while (s
->conns
.len
) {
1892 Conn
*c
= s
->conns
.data
[0];
1893 d
= c
->tickat
- now
;
1895 period
= min(period
, d
);
1899 heapremove(&s
->conns
, 0);
1909 h_accept(const int fd
, const short which
, Server
*s
)
1914 struct sockaddr_in6 addr
;
1916 addrlen
= sizeof addr
;
1917 cfd
= accept(fd
, (struct sockaddr
*)&addr
, &addrlen
);
1919 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) twarn("accept()");
1924 printf("accept %d\n", cfd
);
1927 flags
= fcntl(cfd
, F_GETFL
, 0);
1929 twarn("getting flags");
1932 printf("close %d\n", cfd
);
1938 r
= fcntl(cfd
, F_SETFL
, flags
| O_NONBLOCK
);
1940 twarn("setting O_NONBLOCK");
1943 printf("close %d\n", cfd
);
1949 c
= make_conn(cfd
, STATE_WANTCOMMAND
, default_tube
, default_tube
);
1951 twarnx("make_conn() failed");
1954 printf("close %d\n", cfd
);
1961 c
->sock
.f
= (Handle
)prothandle
;
1964 r
= sockwant(&c
->sock
, 'r');
1969 printf("close %d\n", cfd
);
1980 started_at
= nanoseconds();
1981 memset(op_ct
, 0, sizeof(op_ct
));
1983 int dev_random
= open("/dev/urandom", O_RDONLY
);
1984 if (dev_random
< 0) {
1985 twarn("open /dev/urandom");
1990 byte rand_data
[NumIdBytes
];
1991 r
= read(dev_random
, &rand_data
, NumIdBytes
);
1992 if (r
!= NumIdBytes
) {
1993 twarn("read /dev/urandom");
1996 for (i
= 0; i
< NumIdBytes
; i
++) {
1997 sprintf(id
+ (i
* 2), "%02x", rand_data
[i
]);
2001 if (uname(&node_info
) == -1) {
2006 ms_init(&tubes
, NULL
, NULL
);
2008 TUBE_ASSIGN(default_tube
, tube_find_or_make("default"));
2009 if (!default_tube
) twarnx("Out of memory during startup!");
2012 // For each job in list, inserts the job into the appropriate data
2013 // structures and adds it to the log.
2015 // Returns 1 on success, 0 on failure.
2017 prot_replay(Server
*s
, job list
)
2023 for (j
= list
->next
; j
!= list
; j
= nj
) {
2026 z
= walresvupdate(&s
->wal
, j
);
2028 twarnx("failed to reserve space");
2032 switch (j
->r
.state
) {
2038 if (t
< j
->r
.deadline_at
) {
2039 delay
= j
->r
.deadline_at
- t
;
2043 r
= enqueue_job(s
, j
, delay
, 0);
2044 if (r
< 1) twarnx("error recovering job %"PRIu64
, j
->r
.id
);