1 /* prot.c - protocol implementation */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
24 #include <sys/resource.h>
39 /* job body cannot be greater than this many bytes long */
40 #define JOB_DATA_SIZE_LIMIT ((1 << 16) - 1)
43 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
44 "abcdefghijklmnopqrstuvwxyz" \
/* Command words matched at the start of a request line. Words that are
 * followed by arguments include their trailing space. The parser in this
 * file compares them as prefixes with strncmp() (see TEST_CMD), so where one
 * word is a prefix of another ("peek" / "peek ", "stats" / "stats-job ",
 * "list-tubes" / "list-tubes-watched") the longer one has to be tested
 * first. */
47 #define CMD_PUT "put "
48 #define CMD_PEEK "peek"
49 #define CMD_PEEKJOB "peek "
50 #define CMD_RESERVE "reserve"
51 #define CMD_DELETE "delete "
52 #define CMD_RELEASE "release "
53 #define CMD_BURY "bury "
54 #define CMD_KICK "kick "
55 #define CMD_STATS "stats"
56 #define CMD_JOBSTATS "stats-job "
57 #define CMD_USE "use "
58 #define CMD_WATCH "watch "
59 #define CMD_IGNORE "ignore "
60 #define CMD_LIST_TUBES "list-tubes"
61 #define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
62 #define CMD_STATS_TUBE "stats-tube "
/* Length of a string literal, not counting its terminating NUL. Only
 * meaningful for literals and arrays; applied to a pointer, sizeof would
 * yield the pointer size instead. */
64 #define CONSTSTRLEN(m) (sizeof(m) - 1)
/* Compile-time lengths of the command words. The command handler uses them
 * to step past the word to its arguments (c->cmd + CMD_x_LEN) and to reject
 * trailing garbage on argument-less commands (cmd_len != CMD_x_LEN + 2,
 * where the 2 accounts for the terminating CR LF). */
66 #define CMD_PEEK_LEN CONSTSTRLEN(CMD_PEEK)
67 #define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
68 #define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
69 #define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
70 #define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
71 #define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
72 #define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
73 #define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
74 #define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
75 #define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
76 #define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
77 #define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
78 #define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
79 #define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
80 #define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
/* Reply texts. MSG_FOUND and MSG_RESERVED carry no CR LF because they are
 * passed to reply_job() as the leading word of a "%s %llu %u\r\n" line; the
 * other plain messages are complete lines. The *_FMT strings are printf
 * formats taking the job id as an unsigned long long. */
82 #define MSG_FOUND "FOUND"
83 #define MSG_NOTFOUND "NOT_FOUND\r\n"
84 #define MSG_RESERVED "RESERVED"
85 #define MSG_DELETED "DELETED\r\n"
86 #define MSG_RELEASED "RELEASED\r\n"
87 #define MSG_BURIED "BURIED\r\n"
88 #define MSG_BURIED_FMT "BURIED %llu\r\n"
89 #define MSG_INSERTED_FMT "INSERTED %llu\r\n"
90 #define MSG_NOT_IGNORED "NOT_IGNORED\r\n"
/* Byte lengths (without NUL) of the fixed replies, for direct reply() calls. */
92 #define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
93 #define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
94 #define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
95 #define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
96 #define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)
/* Error replies, each a complete CR LF terminated line. In this file the
 * first three are sent through reply_serr() and the rest through
 * reply_msg(). */
98 #define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
99 #define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
100 #define MSG_DRAINING "DRAINING\r\n"
101 #define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
102 #define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
103 #define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
104 #define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
106 #define STATS_FMT "---\n" \
107 "current-jobs-urgent: %u\n" \
108 "current-jobs-ready: %u\n" \
109 "current-jobs-reserved: %u\n" \
110 "current-jobs-delayed: %u\n" \
111 "current-jobs-buried: %u\n" \
114 "cmd-reserve: %llu\n" \
115 "cmd-delete: %llu\n" \
116 "cmd-release: %llu\n" \
119 "cmd-stats: %llu\n" \
120 "cmd-stats-job: %llu\n" \
121 "cmd-stats-tube: %llu\n" \
122 "cmd-list-tubes: %llu\n" \
123 "cmd-list-tubes-watched: %llu\n" \
124 "job-timeouts: %llu\n" \
125 "total-jobs: %llu\n" \
126 "current-tubes: %u\n" \
127 "current-connections: %u\n" \
128 "current-producers: %u\n" \
129 "current-workers: %u\n" \
130 "current-waiting: %u\n" \
131 "total-connections: %u\n" \
134 "rusage-utime: %d.%06d\n" \
135 "rusage-stime: %d.%06d\n" \
139 #define STATS_TUBE_FMT "---\n" \
141 "current-jobs-urgent: %u\n" \
142 "current-jobs-ready: %u\n" \
143 "current-jobs-reserved: %u\n" \
144 "current-jobs-buried: %u\n" \
145 "total-jobs: %llu\n" \
146 "current-waiting: %u\n" \
149 #define JOB_STATS_FMT "---\n" \
/* Queue of delayed jobs; set up at startup with job_delay_cmp as its
 * comparison function, and its head's deadline drives the main timeout. */
164 static struct pq delay_q
;
/* NOTE(review): no use of ready_ct is visible in this listing. */
166 static unsigned int ready_ct
= 0;
/* Server-wide counters (urgent, waiting, reserved, buried, total jobs);
 * each tube keeps a matching per-tube struct that is updated alongside. */
167 static struct stats global_stat
= {0, 0, 0, 0, 0};
/* The tube named "default"; handed to make_conn() for every accepted
 * connection. */
169 static tube default_tube
;
/* Set of all known tubes; holds "weak" references (see
 * make_and_insert_tube). */
170 static struct ms tubes
;
/* When nonzero, the put handler answers MSG_DRAINING instead of accepting
 * the job. */
172 static int drain_mode
= 0;
/* Time of initialisation; uptime is reported as time(NULL) - start_time. */
173 static time_t start_time
;
/* Per-command and event counters reported by the stats command. */
174 static unsigned long long int put_ct
= 0, peek_ct
= 0, reserve_ct
= 0,
175 delete_ct
= 0, release_ct
= 0, bury_ct
= 0, kick_ct
= 0,
176 stats_job_ct
= 0, stats_ct
= 0, timeout_ct
= 0,
177 list_tubes_ct
= 0, stats_tube_ct
= 0,
178 list_watched_tubes_ct
= 0;
181 /* Doubly-linked list of connections with at least one reserved job.
 * `running` itself is the list head (sentinel): its first two members are
 * initialised to point back at it, which represents the empty list.
 * reserve_job() adds connections with conn_insert(), and the reserved-job
 * lookup walks from running.next until it reaches the sentinel again. */
182 static struct conn running
= { &running
, &running
, 0 };
185 static const char * op_names
[] = {
201 CMD_LIST_TUBES_WATCHED
,
209 return job_list_any_p(&t
->buried
);
/* Begin sending a reply on connection c. `line` and `len` give the bytes to
 * send; `state` is the connection state requested by the caller (callers in
 * this file pass STATE_SENDWORD or STATE_SENDJOB).
 * The statements visible here switch the connection's event registration to
 * EV_WRITE | EV_PERSIST, close the connection if that fails, and log the
 * reply text. NOTE(review): the lines that store line/len/state on the
 * connection are not part of this listing. */
213 reply(conn c
, const char *line
, int len
, int state
)
/* from now on we want to be told when the socket is writable */
217 r
= conn_update_evq(c
, EV_WRITE
| EV_PERSIST
);
218 if (r
== -1) return twarnx("conn_update_evq() failed"), conn_close(c
);
/* "%.*s" prints exactly len bytes, so line need not be NUL-terminated */
224 dprintf("sending reply: %.*s", len
, line
);
/* Send the string literal m as a one-line reply in STATE_SENDWORD. m must be
 * a literal (or array), because its length is taken with CONSTSTRLEN(). */
227 #define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)
229 #define reply_serr(c,e) (twarnx("server error: %s",(e)),\
/* Format a reply with printf-style arguments into the connection's own
 * reply_buf (capacity LINE_BUF_SIZE) and send it via reply() in `state`.
 * A result that does not fit is treated as a server bug and answered with
 * MSG_INTERNAL_ERROR.
 * NOTE(review): no check for a negative vsnprintf() result is visible
 * here. */
233 reply_line(conn c
, int state
, const char *fmt
, ...)
239 r
= vsnprintf(c
->reply_buf
, LINE_BUF_SIZE
, fmt
, ap
);
242 /* Make sure the buffer was big enough. If not, we have a bug. */
243 if (r
>= LINE_BUF_SIZE
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
/* r is the formatted length, excluding the terminating NUL */
245 return reply(c
, c
->reply_buf
, r
, state
);
/* Send job j to connection c, introduced by a "<word> <id> <bytes>\r\n"
 * line, in STATE_SENDJOB. The announced size is body_size - 2: job bodies
 * are created with two extra bytes for the trailing CR LF (see the put
 * handler), and those are not counted in the size reported to the client.
 * NOTE(review): the statements that attach j to the connection are not
 * part of this listing. */
249 reply_job(conn c
, job j
, const char *word
)
251 /* tell this connection which job to send */
255 return reply_line(c
, STATE_SENDJOB
, "%s %llu %u\r\n",
256 word
, j
->id
, j
->body_size
- 2);
/* Take connection c out of the waiting state. Returns NULL if c was not
 * marked CONN_TYPE_WAITING; otherwise clears the flag, decrements the global
 * and per-tube waiting counters, and removes c from the waiting set of every
 * tube it watches.
 * NOTE(review): the success return is not visible in this listing; the
 * caller passes the result straight to reserve_job(), so it is presumably
 * c. */
260 remove_waiting_conn(conn c
)
265 if (!(c
->type
& CONN_TYPE_WAITING
)) return NULL
;
266 c
->type
&= ~CONN_TYPE_WAITING
;
267 global_stat
.waiting_ct
--;
/* undo what enqueue_waiting_conn() did for each watched tube */
268 for (i
= 0; i
< c
->watch
.used
; i
++) {
269 t
= c
->watch
.items
[i
];
270 t
->stat
.waiting_ct
--;
271 ms_remove(&t
->waiting
, c
);
/* Hand job j to connection c: set the job's deadline to now + ttr, bump the
 * global and per-tube reserved counters, put c on the `running` list, mark
 * the job JOB_STATE_RESERVED, add it to c's reserved_jobs list, and send it
 * to the client with the RESERVED word. */
277 reserve_job(conn c
, job j
)
/* the reservation expires ttr seconds from now */
279 j
->deadline
= time(NULL
) + j
->ttr
;
280 global_stat
.reserved_ct
++; /* stats */
281 j
->tube
->stat
.reserved_ct
++;
282 conn_insert(&running
, c
);
283 j
->state
= JOB_STATE_RESERVED
;
284 job_insert(&c
->reserved_jobs
, j
);
285 return reply_job(c
, j
, MSG_RESERVED
);
293 job j
= NULL
, candidate
;
295 dprintf("tubes.used = %d\n", tubes
.used
);
296 for (i
= 0; i
< tubes
.used
; i
++) {
298 dprintf("for %s t->waiting.used=%d t->ready.used=%d\n",
299 t
->name
, t
->waiting
.used
, t
->ready
.used
);
300 if (t
->waiting
.used
&& t
->ready
.used
) {
301 candidate
= pq_peek(&t
->ready
);
302 if (!j
|| candidate
->id
< j
->id
) j
= candidate
;
304 dprintf("i = %d, tubes.used = %d\n", i
, tubes
.used
);
315 dprintf("processing queue\n");
316 while ((j
= next_eligible_job())) {
317 dprintf("got eligible job %llu in %s\n", j
->id
, j
->tube
->name
);
318 j
= pq_take(&j
->tube
->ready
);
320 if (j
->pri
< URGENT_THRESHOLD
) {
321 global_stat
.urgent_ct
--;
322 j
->tube
->stat
.urgent_ct
--;
324 reserve_job(remove_waiting_conn(ms_take(&j
->tube
->waiting
)), j
);
/* Put job j into a queue. Two paths are visible: a delayed path that sets
 * the deadline to now + delay, gives the job to delay_q, marks it
 * JOB_STATE_DELAYED and re-arms the main timeout from the head of delay_q;
 * and a ready path that gives the job to its tube's ready queue, marks it
 * JOB_STATE_READY and counts it as urgent when pri < URGENT_THRESHOLD.
 * Callers treat a zero return as "could not enqueue" and bury the job.
 * NOTE(review): the condition choosing between the two paths and the
 * handling of a failed pq_give() are not part of this listing. */
329 enqueue_job(job j
, unsigned int delay
)
334 j
->deadline
= time(NULL
) + delay
;
335 r
= pq_give(&delay_q
, j
);
337 j
->state
= JOB_STATE_DELAYED
;
/* wake up when the earliest delayed job becomes due */
338 set_main_timeout(pq_peek(&delay_q
)->deadline
);
340 r
= pq_give(&j
->tube
->ready
, j
);
342 j
->state
= JOB_STATE_READY
;
344 if (j
->pri
< URGENT_THRESHOLD
) {
345 global_stat
.urgent_ct
++;
346 j
->tube
->stat
.urgent_ct
++;
356 job_insert(&j
->tube
->buried
, j
);
357 global_stat
.buried_ct
++;
358 j
->tube
->stat
.buried_ct
++;
359 j
->state
= JOB_STATE_BURIED
;
/* Give back every job that connection c currently has reserved: each job is
 * unlinked from c's reserved list, re-enqueued with no delay, and the
 * reserved counters are decremented. Once the list is empty, c is removed
 * from the list it is linked on with conn_remove().
 * NOTE(review): what happens when enqueue_job() fails is not part of this
 * listing. */
364 enqueue_reserved_jobs(conn c
)
369 while (job_list_any_p(&c
->reserved_jobs
)) {
370 j
= job_remove(c
->reserved_jobs
.next
);
371 r
= enqueue_job(j
, 0);
373 global_stat
.reserved_ct
--;
374 j
->tube
->stat
.reserved_ct
--;
375 if (!job_list_any_p(&c
->reserved_jobs
)) conn_remove(c
);
382 return pq_peek(&delay_q
);
388 return pq_take(&delay_q
);
/* Account for job j leaving the buried state by decrementing the global and
 * per-tube buried counters.
 * NOTE(review): the unlinking of j and the return value are not part of
 * this listing. remove_buried_job() passes in the result of a lookup, so a
 * guard for a missing job is presumably among the omitted lines; confirm. */
392 remove_this_buried_job(job j
)
396 global_stat
.buried_ct
--;
397 j
->tube
->stat
.buried_ct
--;
/* Move the first buried job of tube t back into the queues. Returns 0 when
 * the tube has no buried job; kick_buried_jobs() counts a nonzero return as
 * one job kicked.
 * NOTE(review): the success return and the re-bury fallback are not part
 * of this listing. */
403 kick_buried_job(tube t
)
408 if (!buried_job_p(t
)) return 0;
409 j
= remove_this_buried_job(t
->buried
.next
);
411 r
= enqueue_job(j
, 0);
414 /* ready queue is full, so bury it */
422 return pq_used(&delay_q
);
431 if (get_delayed_job_ct() < 1) return 0;
434 r
= enqueue_job(j
, 0);
437 /* ready queue is full, so delay it again */
438 r
= enqueue_job(j
, j
->delay
);
446 /* return the number of jobs successfully kicked */
/* Kicks buried jobs of tube t one at a time, stopping after n jobs or as
 * soon as kick_buried_job() returns 0. The loop body is empty; all work
 * happens in the loop condition. */
448 kick_buried_jobs(tube t
, unsigned int n
)
451 for (i
= 0; (i
< n
) && kick_buried_job(t
); ++i
);
455 /* return the number of jobs successfully kicked */
/* Kicks delayed jobs one at a time, stopping after n jobs or as soon as
 * kick_delayed_job() returns 0. The loop body is empty; all work happens in
 * the loop condition. */
457 kick_delayed_jobs(unsigned int n
)
460 for (i
= 0; (i
< n
) && kick_delayed_job(); ++i
);
/* Kick up to n jobs and return how many were kicked. If tube t has any
 * buried job, only buried jobs of t are kicked; delayed jobs (from the
 * global delay queue) are kicked only when t has nothing buried. */
465 kick_jobs(tube t
, unsigned int n
)
467 if (buried_job_p(t
)) return kick_buried_jobs(t
, n
);
468 return kick_delayed_jobs(n
);
477 for (i
= 0; i
< tubes
.used
; i
++) {
479 if (buried_job_p(t
)) return t
->buried
.next
;
/* Linear search of tube t's buried list for the job with the given id.
 * The list is circular with &t->buried as its head, so the walk starts at
 * buried.next and stops on reaching the head again. Returns the matching
 * job. NOTE(review): the not-found return is not part of this listing. */
485 find_buried_job_in_tube(tube t
, unsigned long long int id
)
489 for (j
= t
->buried
.next
; j
!= &t
->buried
; j
= j
->next
) {
490 if (j
->id
== id
) return j
;
/* Search the buried list of every tube in `tubes` for the job with the
 * given id, using find_buried_job_in_tube().
 * NOTE(review): the handling of the per-tube result and the final return
 * are not part of this listing. */
496 find_buried_job(unsigned long long int id
)
501 for (i
= 0; i
< tubes
.used
; i
++) {
502 j
= find_buried_job_in_tube(tubes
.items
[i
], id
);
/* Look up the buried job with the given id in any tube and pass the lookup
 * result to remove_this_buried_job(), returning whatever that returns. */
509 remove_buried_job(unsigned long long int id
)
511 return remove_this_buried_job(find_buried_job(id
));
/* Mark connection c as waiting for a job: bump the global waiting counter,
 * set CONN_TYPE_WAITING, and for each tube c watches bump that tube's
 * waiting counter and append c to the tube's waiting set. This is the
 * inverse of remove_waiting_conn().
 * NOTE(review): the result of ms_append() is not checked in the visible
 * lines. */
515 enqueue_waiting_conn(conn c
)
520 global_stat
.waiting_ct
++;
521 c
->type
|= CONN_TYPE_WAITING
;
522 for (i
= 0; i
< c
->watch
.used
; i
++) {
523 t
= c
->watch
.items
[i
];
524 t
->stat
.waiting_ct
++;
525 ms_append(&t
->waiting
, c
);
/* Linear search of connection c's reserved-job list for the given id. The
 * list is circular with &c->reserved_jobs as its head. Returns the matching
 * job. NOTE(review): the not-found return is not part of this listing. */
530 find_reserved_job_in_conn(conn c
, unsigned long long int id
)
534 for (j
= c
->reserved_jobs
.next
; j
!= &c
->reserved_jobs
; j
= j
->next
) {
535 if (j
->id
== id
) return j
;
/* Search every connection on the circular connection list headed by `list`
 * for a reserved job with the given id. The head itself is a sentinel and
 * is not searched.
 * NOTE(review): the handling of the per-connection result and the final
 * return are not part of this listing. */
541 find_reserved_job_in_list(conn list
, unsigned long long int id
)
546 for (c
= list
->next
; c
!= list
; c
= c
->next
) {
547 j
= find_reserved_job_in_conn(c
, id
);
/* Find a reserved job by id among all connections on the `running` list
 * (the connections that hold at least one reserved job). */
554 find_reserved_job(unsigned long long int id
)
556 return find_reserved_job_in_list(&running
, id
);
/* Look for the job with the given id in the ready queue of every tube,
 * using pq_find(); the job is not removed from its queue by the visible
 * code.
 * NOTE(review): the handling of the result and the final return are not
 * part of this listing. */
560 peek_ready_job(unsigned long long int id
)
566 for (i
= 0; i
< tubes
.used
; i
++) {
/* tubes.items[] elements are cast to tube before use */
567 j
= pq_find(&((tube
) tubes
.items
[i
])->ready
, id
);
573 /* TODO: make a global hashtable of jobs because this is slow */
/* Find a job by id wherever it currently lives, trying in order: the ready
 * queues, the delay queue, the reserved lists. The chain uses the GNU C
 * conditional with omitted middle operand, where "a ? : b" evaluates to a
 * if a is non-null and to b otherwise.
 * NOTE(review): the final operand of the chain is not part of this
 * listing. */
575 peek_job(unsigned long long int id
)
577 return peek_ready_job(id
) ? :
578 pq_find(&delay_q
, id
) ? :
579 find_reserved_job(id
) ? :
/* Called after a failed read/write/writev on connection c; s names the
 * failing call. The transient conditions EAGAIN, EINTR and EWOULDBLOCK are
 * ignored, so the caller simply retries on the next event.
 * NOTE(review): the handling of all other errno values is not part of this
 * listing. */
584 check_err(conn c
, const char *s
)
586 if (errno
== EAGAIN
) return;
587 if (errno
== EINTR
) return;
588 if (errno
== EWOULDBLOCK
) return;
595 /* Scan the given string for the sequence "\r\n" and return the line length.
596 * Always returns at least 2 if a match is found. Returns 0 if no match.
 * The returned length includes the "\r\n" itself.
 * NOTE(review): memchr() locates only the first '\r'; how a '\r' that is
 * not followed by '\n' is handled is not part of this listing.
 * NOTE(review): with size == 0, size - 1 is -1 and converts to a huge
 * size_t in the memchr() call; confirm callers never pass an empty
 * buffer. */
598 scan_line_end(const char *s
, int size
)
/* stop one byte early so that match[1] below stays inside the buffer */
602 match
= memchr(s
, '\r', size
- 1);
603 if (!match
) return 0;
605 /* this is safe because we only scan size - 1 chars above */
606 if (match
[1] == '\n') return match
- s
+ 2;
614 return scan_line_end(c
->cmd
, c
->cmd_read
);
617 /* parse the command line */
621 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
622 TEST_CMD(c
->cmd
, CMD_PUT
, OP_PUT
);
623 TEST_CMD(c
->cmd
, CMD_PEEKJOB
, OP_PEEKJOB
);
624 TEST_CMD(c
->cmd
, CMD_PEEK
, OP_PEEK
);
625 TEST_CMD(c
->cmd
, CMD_RESERVE
, OP_RESERVE
);
626 TEST_CMD(c
->cmd
, CMD_DELETE
, OP_DELETE
);
627 TEST_CMD(c
->cmd
, CMD_RELEASE
, OP_RELEASE
);
628 TEST_CMD(c
->cmd
, CMD_BURY
, OP_BURY
);
629 TEST_CMD(c
->cmd
, CMD_KICK
, OP_KICK
);
630 TEST_CMD(c
->cmd
, CMD_JOBSTATS
, OP_JOBSTATS
);
631 TEST_CMD(c
->cmd
, CMD_STATS_TUBE
, OP_STATS_TUBE
);
632 TEST_CMD(c
->cmd
, CMD_STATS
, OP_STATS
);
633 TEST_CMD(c
->cmd
, CMD_USE
, OP_USE
);
634 TEST_CMD(c
->cmd
, CMD_WATCH
, OP_WATCH
);
635 TEST_CMD(c
->cmd
, CMD_IGNORE
, OP_IGNORE
);
636 TEST_CMD(c
->cmd
, CMD_LIST_TUBES_WATCHED
, OP_LIST_TUBES_WATCHED
);
637 TEST_CMD(c
->cmd
, CMD_LIST_TUBES
, OP_LIST_TUBES
);
641 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
642 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
643 * This function is idempotent: it finishes by setting c->cmd_len to 0, so a
 * repeated call returns early at the cmd_len check without moving any data.
 * "Trailing bytes" are the bytes already read into c->cmd beyond the end of
 * the current command line (cmd_read - cmd_len). */
645 fill_extra_data(conn c
)
647 int extra_bytes
, job_data_bytes
= 0, cmd_bytes
;
649 if (!c
->fd
) return; /* the connection was closed */
650 if (!c
->cmd_len
) return; /* we don't have a complete command */
652 /* how many extra bytes did we read? */
653 extra_bytes
= c
->cmd_read
- c
->cmd_len
;
655 /* how many bytes should we put into the job body? */
/* NOTE(review): the guard for a missing c->in_job is presumably on a line
 * that is not part of this listing; job_data_bytes stays 0 in that case. */
657 job_data_bytes
= min(extra_bytes
, c
->in_job
->body_size
);
658 memcpy(c
->in_job
->body
, c
->cmd
+ c
->cmd_len
, job_data_bytes
);
659 c
->in_job_read
= job_data_bytes
;
662 /* how many bytes are left to go into the future cmd? */
663 cmd_bytes
= extra_bytes
- job_data_bytes
;
/* source and destination are both inside c->cmd, hence memmove */
664 memmove(c
->cmd
, c
->cmd
+ c
->cmd_len
+ job_data_bytes
, cmd_bytes
);
665 c
->cmd_read
= cmd_bytes
;
666 c
->cmd_len
= 0; /* we no longer know the length of the new command */
/* Finish a put: detach the fully received job from connection c, verify
 * that its body ends in CR LF (EXPECTED_CRLF otherwise), enqueue it with its
 * own delay and update the put/total-job counters. The client gets
 * "INSERTED <id>" when enqueue_job() succeeds and "BURIED <id>" when it
 * does not.
 * NOTE(review): the local j is presumably c->in_job taken before it is
 * cleared; that line, the cleanup on a bad trailer, and the burying itself
 * are not part of this listing. */
670 enqueue_incoming_job(conn c
)
675 c
->in_job
= NULL
; /* the connection no longer owns this job */
678 /* check if the trailer is present and correct */
679 if (memcmp(j
->body
+ j
->body_size
- 2, "\r\n", 2)) {
681 return reply_msg(c
, MSG_EXPECTED_CRLF
);
684 /* we have a complete job, so let's stick it in the pqueue */
685 r
= enqueue_job(j
, j
->delay
);
686 put_ct
++; /* stats */
687 global_stat
.total_jobs_ct
++;
688 j
->tube
->stat
.total_jobs_ct
++;
690 if (r
) return reply_line(c
, STATE_SENDWORD
, MSG_INSERTED_FMT
, j
->id
);
692 /* out of memory trying to grow the queue, so it gets buried */
694 reply_line(c
, STATE_SENDWORD
, MSG_BURIED_FMT
, j
->id
);
700 return time(NULL
) - start_time
;
/* Format the server-wide statistics into buf (at most size bytes) using
 * STATS_FMT and return snprintf()'s result, i.e. the length the full text
 * needs. It has the fmt_fn shape so do_stats() can call it; the third
 * parameter x is not used in the visible lines.
 * NOTE(review): several argument lines of the snprintf() call are not part
 * of this listing. */
704 fmt_stats(char *buf
, size_t size
, void *x
)
/* zero-initialised so that a failed getrusage() reports zeros */
706 struct rusage ru
= {{0, 0}, {0, 0}};
707 getrusage(RUSAGE_SELF
, &ru
); /* don't care if it fails */
708 return snprintf(buf
, size
, STATS_FMT
,
709 global_stat
.urgent_ct
,
711 global_stat
.reserved_ct
,
712 get_delayed_job_ct(),
713 global_stat
.buried_ct
,
725 list_watched_tubes_ct
,
727 global_stat
.total_jobs_ct
,
730 count_cur_producers(),
732 global_stat
.waiting_ct
,
/* seconds and microseconds are narrowed to int for the %d.%06d fields */
736 (int) ru
.ru_utime
.tv_sec
, (int) ru
.ru_utime
.tv_usec
,
737 (int) ru
.ru_stime
.tv_sec
, (int) ru
.ru_stime
.tv_usec
,
742 /* Read a priority value from the given buffer and place it in pri.
743 * Update end to point to the address after the last character consumed.
744 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
745 * conversion and return the status code but not update any values. This is an
746 * easy way to check for errors.
747 * If end is NULL, read_pri will also check that the entire input string was
748 * consumed and return an error code otherwise.
749 * Return 0 on success, or nonzero on failure.
750 * If a failure occurs, pri and end are not modified. */
/* The value is parsed as base 10 with strtoul(). An ERANGE result is
 * deliberately not treated as an error here.
 * NOTE(review): the errno test is only meaningful if errno is cleared
 * before the conversion; that line is not part of this listing.
 * NOTE(review): strtoul() yields an unsigned long while *pri is an unsigned
 * int, so values above UINT_MAX may be narrowed; confirm. */
752 read_pri(unsigned int *pri
, const char *buf
, char **end
)
758 tpri
= strtoul(buf
, &tend
, 10);
/* no digits were consumed at all */
759 if (tend
== buf
) return -1;
760 if (errno
&& errno
!= ERANGE
) return -1;
/* without an end pointer the caller expects the whole string to be used */
761 if (!end
&& tend
[0] != '\0') return -1;
763 if (pri
) *pri
= tpri
;
764 if (end
) *end
= tend
;
768 /* Read a delay value from the given buffer and place it in delay.
769 * The interface and behavior are the same as in read_pri(); this is a
 * direct call-through that exists to name the field being parsed. */
771 read_delay(unsigned int *delay
, const char *buf
, char **end
)
773 return read_pri(delay
, buf
, end
);
776 /* Read a timeout value from the given buffer and place it in ttr.
777 * The interface and behavior are the same as in read_pri(); this is a
 * direct call-through that exists to name the field being parsed. */
779 read_ttr(unsigned int *ttr
, const char *buf
, char **end
)
781 return read_pri(ttr
, buf
, end
);
789 /* this conn is waiting, but we want to know if they hang up */
790 r
= conn_update_evq(c
, EV_READ
| EV_PERSIST
);
791 if (r
== -1) return twarnx("update events failed"), conn_close(c
);
793 c
->state
= STATE_WAIT
;
794 enqueue_waiting_conn(c
);
/* Statistics formatter callback used by do_stats(): writes at most `size`
 * bytes of text about `data` into the buffer and returns the length of the
 * full text, snprintf-style. do_stats() first calls it with (NULL, 0) just
 * to measure.
 * NOTE(review): fmt_job_stats and fmt_stats_tube take a job / tube as their
 * third parameter and are cast to this type at the call sites; calling
 * through a function pointer of a different type is not strictly portable
 * C. */
797 typedef int(*fmt_fn
)(char *, size_t, void *);
/* Produce a statistics reply for connection c: measure the text with a
 * sizing call to fmt, allocate a fake job of exactly that size to hold it,
 * format into the job body, and answer "OK <bytes>" in STATE_SENDJOB so the
 * body is sent like a job. Allocation failure yields OUT_OF_MEMORY, and a
 * size mismatch between the two fmt calls yields INTERNAL_ERROR.
 * NOTE(review): on the INTERNAL_ERROR path no release of c->out_job is
 * visible in this listing. */
800 do_stats(conn c
, fmt_fn fmt
, void *data
)
804 /* first, measure how big a buffer we will need */
805 stats_len
= fmt(NULL
, 0, data
);
807 c
->out_job
= allocate_job(stats_len
); /* fake job to hold stats data */
808 if (!c
->out_job
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
810 /* now actually format the stats data */
811 r
= fmt(c
->out_job
->body
, stats_len
, data
);
812 if (r
!= stats_len
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
/* Given only stats_len bytes, snprintf stores stats_len - 1 characters plus
 * a NUL, so the last character of the text was replaced by the NUL. The
 * line below puts a newline back; presumably every format ends in "\r\n". */
813 c
->out_job
->body
[stats_len
- 1] = '\n'; /* patch up sprintf's output */
/* the announced size leaves out the trailing two bytes, as reply_job does */
816 return reply_line(c
, STATE_SENDJOB
, "OK %d\r\n", stats_len
- 2);
/* Reply to connection c with the names of the tubes in set l, formatted as
 * a "---" header line followed by one "- <name>" line per tube. The text is
 * built in a fake job and sent in STATE_SENDJOB after an "OK <bytes>" line
 * whose size leaves out the final two bytes.
 * NOTE(review): t is presumably l->items[i] in both loops, and the final
 * "\r\n" is presumably written after the second loop; those lines are not
 * part of this listing. */
820 do_list_tubes(conn c
, ms l
)
826 /* first, measure how big a buffer we will need */
827 resp_z
= 6; /* initial "---\n" and final "\r\n" */
828 for (i
= 0; i
< l
->used
; i
++) {
830 resp_z
+= 3 + strlen(t
->name
); /* including "- " and "\n" */
833 c
->out_job
= allocate_job(resp_z
); /* fake job to hold response data */
834 if (!c
->out_job
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
836 /* now actually format the response */
837 buf
= c
->out_job
->body
;
/* The size arguments below are the text length plus one for snprintf's NUL;
 * buf advances by the text length only, so each NUL is overwritten by
 * whatever is written next. */
838 buf
+= snprintf(buf
, 5, "---\n");
839 for (i
= 0; i
< l
->used
; i
++) {
841 buf
+= snprintf(buf
, 4 + strlen(t
->name
), "- %s\n", t
->name
);
847 return reply_line(c
, STATE_SENDJOB
, "OK %d\r\n", resp_z
- 2);
/* Format the statistics of job j into buf (at most size bytes) using
 * JOB_STATS_FMT and return snprintf()'s result. Used with do_stats() through
 * a cast to fmt_fn.
 * NOTE(review): most argument lines are not part of this listing. t is
 * presumably the current time, which would make the two visible arguments
 * the job's age and the time left until its deadline; confirm. */
851 fmt_job_stats(char *buf
, size_t size
, job j
)
856 return snprintf(buf
, size
, JOB_STATS_FMT
,
861 (unsigned int) (t
- j
->creation
),
864 (unsigned int) (j
->deadline
- t
),
/* Format the statistics of tube t into buf (at most size bytes) using
 * STATS_TUBE_FMT and return snprintf()'s result. Used with do_stats()
 * through a cast to fmt_fn.
 * NOTE(review): all argument lines but one are absent from this listing. */
872 fmt_stats_tube(char *buf
, size_t size
, tube t
)
874 return snprintf(buf
, size
, STATS_TUBE_FMT
,
880 t
->stat
.total_jobs_ct
,
/* Decide whether the job being received on connection c is complete. If
 * the number of body bytes read equals the job's body_size, the job is
 * enqueued; otherwise the connection is put in STATE_WANTDATA to keep
 * reading the body.
 * NOTE(review): j is presumably c->in_job; its assignment is not part of
 * this listing. */
885 maybe_enqueue_incoming_job(conn c
)
889 /* do we have a complete job? */
890 if (c
->in_job_read
== j
->body_size
) return enqueue_incoming_job(c
);
892 /* otherwise we have incomplete data, so just keep waiting */
893 c
->state
= STATE_WANTDATA
;
/* Account for job j no longer being reserved by connection c: decrement the
 * global and per-tube reserved counters and, if c then holds no reserved
 * jobs, unlink c with conn_remove().
 * NOTE(review): the unlinking of j, any guard for a missing job, and the
 * return value are not part of this listing. Callers use the result as the
 * removed job. */
898 remove_this_reserved_job(conn c
, job j
)
902 global_stat
.reserved_ct
--;
903 j
->tube
->stat
.reserved_ct
--;
905 if (!job_list_any_p(&c
->reserved_jobs
)) conn_remove(c
);
/* Look up the job with the given id among the jobs reserved by connection c
 * only, and pass the lookup result to remove_this_reserved_job(), returning
 * whatever that returns. A job reserved by another connection is never
 * matched. */
910 remove_reserved_job(conn c
, unsigned long long int id
)
912 return remove_this_reserved_job(c
, find_reserved_job_in_conn(c
, id
));
/* Validate a tube name. Returns nonzero only if the name is non-empty, at
 * most max characters long, made up solely of characters from NAME_CHARS,
 * and does not begin with '-'. */
916 name_is_ok(const char *name
, size_t max
)
918 size_t len
= strlen(name
);
919 return len
> 0 && len
<= max
&&
/* strspn() equals len only when every character is in NAME_CHARS */
920 strspn(name
, NAME_CHARS
) == len
&& name
[0] != '-';
/* Linear search of the global `tubes` set for a tube whose name equals
 * `name`, comparing at most MAX_TUBE_NAME_LEN characters. Returns the tube
 * when found.
 * NOTE(review): t is presumably tubes.items[i]; that line and the
 * not-found return are not part of this listing. */
924 find_tube(const char *name
)
929 for (i
= 0; i
< tubes
.used
; i
++) {
931 if (strncmp(t
->name
, name
, MAX_TUBE_NAME_LEN
) == 0) return t
;
/* Remove tube t from the global `tubes` set. */
937 prot_remove_tube(tube t
)
939 ms_remove(&tubes
, t
);
/* Create a tube called `name` and register it in the global `tubes` set.
 * If the set cannot take it, the new tube is released with tube_dref() and
 * NULL is returned.
 * NOTE(review): the creation of t and the success return are not part of
 * this listing. */
943 make_and_insert_tube(const char *name
)
951 /* We want this global tube list to behave like "weak" refs, so don't
952 * increment the ref count. */
953 r
= ms_append(&tubes
, t
);
/* comma expression: drop the tube, then yield NULL as the return value */
954 if (!r
) return tube_dref(t
), NULL
;
/* Return the existing tube called `name`, or create and register one if
 * none exists. Uses the GNU C "a ? : b" form, which yields a when it is
 * non-null and b otherwise. The result is whatever make_and_insert_tube()
 * returns when a tube has to be created, which can be NULL. */
960 find_or_make_tube(const char *name
)
962 return find_tube(name
) ? : make_and_insert_tube(name
);
972 char *size_buf
, *delay_buf
, *ttr_buf
, *pri_buf
, *end_buf
, *name
;
973 unsigned int pri
, delay
, ttr
, body_size
;
974 unsigned long long int id
;
977 /* NUL-terminate this string so we can use strtol and friends */
978 c
->cmd
[c
->cmd_len
- 2] = '\0';
980 /* check for possible maliciousness */
981 if (strlen(c
->cmd
) != c
->cmd_len
- 2) {
982 return reply_msg(c
, MSG_BAD_FORMAT
);
986 dprintf("got %s command: \"%s\"\n", op_names
[(int) type
], c
->cmd
);
990 if (drain_mode
) return reply_serr(c
, MSG_DRAINING
);
992 r
= read_pri(&pri
, c
->cmd
+ 4, &delay_buf
);
993 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
995 r
= read_delay(&delay
, delay_buf
, &ttr_buf
);
996 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
998 r
= read_ttr(&ttr
, ttr_buf
, &size_buf
);
999 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1002 body_size
= strtoul(size_buf
, &end_buf
, 10);
1003 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1005 if (body_size
> JOB_DATA_SIZE_LIMIT
) {
1006 return reply_msg(c
, MSG_JOB_TOO_BIG
);
1009 /* don't allow trailing garbage */
1010 if (end_buf
[0] != '\0') return reply_msg(c
, MSG_BAD_FORMAT
);
1012 conn_set_producer(c
);
1014 c
->in_job
= make_job(pri
, delay
, ttr
? : 1, body_size
+ 2, c
->use
);
1018 /* it's possible we already have a complete job */
1019 maybe_enqueue_incoming_job(c
);
1023 /* don't allow trailing garbage */
1024 if (c
->cmd_len
!= CMD_PEEK_LEN
+ 2) {
1025 return reply_msg(c
, MSG_BAD_FORMAT
);
1028 j
= job_copy(peek_buried_job() ? : delay_q_peek());
1030 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1032 peek_ct
++; /* stats */
1033 reply_job(c
, j
, MSG_FOUND
);
1037 id
= strtoull(c
->cmd
+ CMD_PEEKJOB_LEN
, &end_buf
, 10);
1038 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1040 /* So, peek is annoying, because some other connection might free the
1041 * job while we are still trying to write it out. So we copy it and
1042 * then free the copy when it's done sending. */
1043 j
= job_copy(peek_job(id
));
1045 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1047 peek_ct
++; /* stats */
1048 reply_job(c
, j
, MSG_FOUND
);
1051 /* don't allow trailing garbage */
1052 if (c
->cmd_len
!= CMD_RESERVE_LEN
+ 2) {
1053 return reply_msg(c
, MSG_BAD_FORMAT
);
1056 reserve_ct
++; /* stats */
1059 /* try to get a new job for this guy */
1065 id
= strtoull(c
->cmd
+ CMD_DELETE_LEN
, &end_buf
, 10);
1066 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1068 j
= remove_reserved_job(c
, id
) ? : remove_buried_job(id
);
1070 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1072 delete_ct
++; /* stats */
1075 reply(c
, MSG_DELETED
, MSG_DELETED_LEN
, STATE_SENDWORD
);
1079 id
= strtoull(c
->cmd
+ CMD_RELEASE_LEN
, &pri_buf
, 10);
1080 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1082 r
= read_pri(&pri
, pri_buf
, &delay_buf
);
1083 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1085 r
= read_delay(&delay
, delay_buf
, NULL
);
1086 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1088 j
= remove_reserved_job(c
, id
);
1090 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1095 release_ct
++; /* stats */
1096 r
= enqueue_job(j
, delay
);
1097 if (r
) return reply(c
, MSG_RELEASED
, MSG_RELEASED_LEN
, STATE_SENDWORD
);
1099 /* out of memory trying to grow the queue, so it gets buried */
1101 reply(c
, MSG_BURIED
, MSG_BURIED_LEN
, STATE_SENDWORD
);
1105 id
= strtoull(c
->cmd
+ CMD_BURY_LEN
, &pri_buf
, 10);
1106 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1108 r
= read_pri(&pri
, pri_buf
, NULL
);
1109 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1111 j
= remove_reserved_job(c
, id
);
1113 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1116 bury_ct
++; /* stats */
1118 reply(c
, MSG_BURIED
, MSG_BURIED_LEN
, STATE_SENDWORD
);
1122 count
= strtoul(c
->cmd
+ CMD_KICK_LEN
, &name
, 10);
1123 if (name
++ == c
->cmd
+ CMD_KICK_LEN
) {
1124 return reply_msg(c
, MSG_BAD_FORMAT
);
1126 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1128 kick_ct
++; /* stats */
1130 t
= find_tube(name
);
1131 if (!t
) return reply_msg(c
, MSG_NOTFOUND
);
1133 i
= kick_jobs(t
, count
);
1136 return reply_line(c
, STATE_SENDWORD
, "KICKED %u\r\n", i
);
1138 /* don't allow trailing garbage */
1139 if (c
->cmd_len
!= CMD_STATS_LEN
+ 2) {
1140 return reply_msg(c
, MSG_BAD_FORMAT
);
1143 stats_ct
++; /* stats */
1145 do_stats(c
, fmt_stats
, NULL
);
1149 id
= strtoull(c
->cmd
+ CMD_JOBSTATS_LEN
, &end_buf
, 10);
1150 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1153 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1155 stats_job_ct
++; /* stats */
1157 if (!j
->tube
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
1158 do_stats(c
, (fmt_fn
) fmt_job_stats
, j
);
1161 name
= c
->cmd
+ CMD_STATS_TUBE_LEN
;
1162 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1164 t
= find_tube(name
);
1165 if (!t
) return reply_msg(c
, MSG_NOTFOUND
);
1167 stats_tube_ct
++; /* stats */
1169 do_stats(c
, (fmt_fn
) fmt_stats_tube
, t
);
1173 /* don't allow trailing garbage */
1174 if (c
->cmd_len
!= CMD_LIST_TUBES_LEN
+ 2) {
1175 return reply_msg(c
, MSG_BAD_FORMAT
);
1179 do_list_tubes(c
, &tubes
);
1181 case OP_LIST_TUBES_WATCHED
:
1182 /* don't allow trailing garbage */
1183 if (c
->cmd_len
!= CMD_LIST_TUBES_WATCHED_LEN
+ 2) {
1184 return reply_msg(c
, MSG_BAD_FORMAT
);
1187 list_watched_tubes_ct
++;
1188 do_list_tubes(c
, &c
->watch
);
1191 name
= c
->cmd
+ CMD_USE_LEN
;
1192 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1194 TUBE_ASSIGN(t
, find_or_make_tube(name
));
1195 if (!t
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1197 TUBE_ASSIGN(c
->use
, t
);
1198 TUBE_ASSIGN(t
, NULL
);
1200 reply_line(c
, STATE_SENDWORD
, "USING %s\r\n", c
->use
->name
);
1203 name
= c
->cmd
+ CMD_WATCH_LEN
;
1204 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1206 TUBE_ASSIGN(t
, find_or_make_tube(name
));
1207 if (!t
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1210 if (!ms_contains(&c
->watch
, t
)) r
= ms_append(&c
->watch
, t
);
1211 TUBE_ASSIGN(t
, NULL
);
1212 if (!r
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1214 reply_line(c
, STATE_SENDWORD
, "WATCHING %d\r\n", c
->watch
.used
);
1217 name
= c
->cmd
+ CMD_IGNORE_LEN
;
1218 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1221 for (i
= 0; i
< c
->watch
.used
; i
++) {
1222 t
= c
->watch
.items
[i
];
1223 if (strncmp(t
->name
, name
, MAX_TUBE_NAME_LEN
) == 0) break;
1227 if (t
&& c
->watch
.used
< 2) return reply_msg(c
, MSG_NOT_IGNORED
);
1229 if (t
) ms_remove(&c
->watch
, t
); /* may free t if refcount => 0 */
1232 reply_line(c
, STATE_SENDWORD
, "WATCHING %d\r\n", c
->watch
.used
);
1235 return reply_msg(c
, MSG_UNKNOWN_COMMAND
);
1239 /* if we get a timeout, it means that a job has been reserved for too long, so
1240 * we should put it back in the queue.
 * Repeatedly takes the job soonest_job() returns for c. The function
 * returns as soon as that job's deadline is still in the future; otherwise
 * the job is counted as timed out, un-reserved, re-enqueued with no delay
 * (or buried if that fails), and the connection's event registration is
 * refreshed with its current event mask. */
1242 h_conn_timeout(conn c
)
1247 while ((j
= soonest_job(c
))) {
1248 if (j
->deadline
> time(NULL
)) return;
1249 timeout_ct
++; /* stats */
1251 r
= enqueue_job(remove_this_reserved_job(c
, j
), 0);
1252 if (!r
) bury_job(j
); /* there was no room in the queue, so bury it */
1253 r
= conn_update_evq(c
, c
->evq
.ev_events
);
1254 if (r
== -1) return twarnx("conn_update_evq() failed"), conn_close(c
);
1259 enter_drain_mode(int sig
)
1276 r
= conn_update_evq(c
, EV_READ
| EV_PERSIST
);
1277 if (r
== -1) return twarnx("update events failed"), conn_close(c
);
1279 /* was this a peek or stats command? */
1280 if (!has_reserved_this_job(c
, c
->out_job
)) job_free(c
->out_job
);
1283 c
->reply_sent
= 0; /* now that we're done, reset this */
1284 c
->state
= STATE_WANTCOMMAND
;
1292 struct iovec iov
[2];
1295 case STATE_WANTCOMMAND
:
1296 r
= read(c
->fd
, c
->cmd
+ c
->cmd_read
, LINE_BUF_SIZE
- c
->cmd_read
);
1297 if (r
== -1) return check_err(c
, "read()");
1298 if (r
== 0) return conn_close(c
); /* the client hung up */
1300 c
->cmd_read
+= r
; /* we got some bytes */
1302 c
->cmd_len
= cmd_len(c
); /* find the EOL */
1303 dprintf("cmd_len is %d\n", c
->cmd_len
);
1305 /* yay, complete command line */
1306 if (c
->cmd_len
) return do_cmd(c
);
1308 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1310 dprintf("cmd_read is %d\n", c
->cmd_read
);
1311 /* command line too long? */
1312 if (c
->cmd_read
== LINE_BUF_SIZE
) {
1313 return reply_msg(c
, MSG_BAD_FORMAT
);
1316 /* otherwise we have an incomplete line, so just keep waiting */
1318 case STATE_WANTDATA
:
1321 r
= read(c
->fd
, j
->body
+ c
->in_job_read
, j
->body_size
-c
->in_job_read
);
1322 if (r
== -1) return check_err(c
, "read()");
1323 if (r
== 0) return conn_close(c
); /* the client hung up */
1325 c
->in_job_read
+= r
; /* we got some bytes */
1327 /* (c->in_job_read > j->body_size) can't happen */
1329 maybe_enqueue_incoming_job(c
);
1331 case STATE_SENDWORD
:
1332 r
= write(c
->fd
, c
->reply
+ c
->reply_sent
, c
->reply_len
- c
->reply_sent
);
1333 if (r
== -1) return check_err(c
, "write()");
1334 if (r
== 0) return conn_close(c
); /* the client hung up */
1336 c
->reply_sent
+= r
; /* we got some bytes */
1338 /* (c->reply_sent > c->reply_len) can't happen */
1340 if (c
->reply_sent
== c
->reply_len
) return reset_conn(c
);
1342 /* otherwise we sent an incomplete reply, so just keep waiting */
1347 iov
[0].iov_base
= (void *)(c
->reply
+ c
->reply_sent
);
1348 iov
[0].iov_len
= c
->reply_len
- c
->reply_sent
; /* maybe 0 */
1349 iov
[1].iov_base
= j
->body
+ c
->out_job_sent
;
1350 iov
[1].iov_len
= j
->body_size
- c
->out_job_sent
;
1352 r
= writev(c
->fd
, iov
, 2);
1353 if (r
== -1) return check_err(c
, "writev()");
1354 if (r
== 0) return conn_close(c
); /* the client hung up */
1356 /* update the sent values */
1358 if (c
->reply_sent
>= c
->reply_len
) {
1359 c
->out_job_sent
+= c
->reply_sent
- c
->reply_len
;
1360 c
->reply_sent
= c
->reply_len
;
1363 /* (c->out_job_sent > j->body_size) can't happen */
1366 if (c
->out_job_sent
== j
->body_size
) return reset_conn(c
);
1368 /* otherwise we sent incomplete data, so just keep waiting */
1370 case STATE_WAIT
: /* keep an eye out in case they hang up */
1371 /* but don't hang up just because our buffer is full */
1372 if (LINE_BUF_SIZE
- c
->cmd_read
< 1) break;
1374 r
= read(c
->fd
, c
->cmd
+ c
->cmd_read
, LINE_BUF_SIZE
- c
->cmd_read
);
1375 if (r
== -1) return check_err(c
, "read()");
1376 if (r
== 0) return conn_close(c
); /* the client hung up */
1377 c
->cmd_read
+= r
; /* we got some bytes */
/* True when connection c has a nonzero fd and is in STATE_WANTCOMMAND.
 * fill_extra_data() likewise treats a zero fd as a closed connection. */
1381 #define want_command(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))
/* True when c wants a command and already has unparsed bytes buffered in
 * its command buffer, e.g. a pipelined request read along with the previous
 * one. Both macros evaluate c more than once. */
1382 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
/* Event callback for a client connection, registered in h_accept() via
 * conn_set_evq(). From the visible lines: an event whose fd does not match
 * the connection is logged and the connection is closed; and after event
 * handling, any complete command lines already buffered are executed one
 * after another without waiting for more input.
 * NOTE(review): the fd check itself and the dispatch on `which` are not
 * part of this listing. */
1385 h_conn(const int fd
, const short which
, conn c
)
1388 twarnx("Argh! event fd doesn't match conn fd.");
1390 return conn_close(c
);
1396 event_add(&c
->evq
, NULL
); /* seems to be necessary */
1399 /* fall through... */
1401 /* fall through... */
/* the assignment inside the condition is intentional: cmd_len() returns 0
 * when no complete line is buffered, which ends the loop */
1406 while (cmd_data_ready(c
) && (c
->cmd_len
= cmd_len(c
))) do_cmd(c
);
1417 while ((j
= delay_q_peek())) {
1418 if (j
->deadline
> t
) break;
1420 r
= enqueue_job(j
, 0);
1421 if (!r
) bury_job(j
); /* there was no room in the queue, so bury it */
1424 set_main_timeout((j
= delay_q_peek()) ? j
->deadline
: 0);
/* Event callback for the listening socket. A timeout event is forwarded to
 * h_delay(). Otherwise one connection is accepted, switched to non-blocking
 * mode, wrapped in a conn that starts in STATE_WANTCOMMAND with
 * default_tube passed for both tube arguments of make_conn(), and
 * registered for persistent read events handled by h_conn().
 * NOTE(review): the test for accept() failing is not part of this listing;
 * the two errno checks below presumably sit inside it.
 * NOTE(review): addr is a plain struct sockaddr, which may be too small for
 * some address families; the peer address is not used in the visible
 * lines. */
1428 h_accept(const int fd
, const short which
, struct event
*ev
)
1433 struct sockaddr addr
;
1435 if (which
== EV_TIMEOUT
) return h_delay();
1437 addrlen
= sizeof addr
;
1438 cfd
= accept(fd
, &addr
, &addrlen
);
/* NOTE(review): twarn() runs before the EMFILE test; confirm that it
 * leaves errno untouched. */
1440 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) twarn("accept()");
/* out of file descriptors: brake() is called */
1441 if (errno
== EMFILE
) brake();
1445 flags
= fcntl(cfd
, F_GETFL
, 0);
1446 if (flags
< 0) return twarn("getting flags"), close(cfd
), v();
1448 r
= fcntl(cfd
, F_SETFL
, flags
| O_NONBLOCK
);
1449 if (r
< 0) return twarn("setting O_NONBLOCK"), close(cfd
), v();
1451 c
= make_conn(cfd
, STATE_WANTCOMMAND
, default_tube
, default_tube
);
1452 if (!c
) return twarnx("make_conn() failed"), close(cfd
), brake();
1454 dprintf("accepted conn, fd=%d\n", cfd
);
1455 r
= conn_set_evq(c
, EV_READ
| EV_PERSIST
, (evh
) h_conn
);
/* NOTE(review): this failure path closes cfd but shows no release of c. */
1456 if (r
== -1) return twarnx("conn_set_evq() failed"), close(cfd
), brake();
1462 start_time
= time(NULL
);
1463 pq_init(&delay_q
, job_delay_cmp
);
1465 ms_init(&tubes
, NULL
, NULL
);
1467 TUBE_ASSIGN(default_tube
, find_or_make_tube("default"));
1468 if (!default_tube
) twarnx("Out of memory during startup!");