1 /* prot.c - protocol implementation */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
24 #include <sys/resource.h>
39 /* job body cannot be greater than this many bytes long */
/* Initial value is (1 << 16) - 1 = 65535. dispatch_cmd() compares the parsed
 * body size against this limit and replies with MSG_JOB_TOO_BIG when the
 * requested size is larger. */
40 size_t job_data_size_limit
= ((1 << 16) - 1);
43 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
44 "abcdefghijklmnopqrstuvwxyz" \
47 #define CMD_PUT "put "
48 #define CMD_PEEKJOB "peek "
49 #define CMD_PEEK_READY "peek-ready"
50 #define CMD_PEEK_DELAYED "peek-delayed"
51 #define CMD_PEEK_BURIED "peek-buried"
52 #define CMD_RESERVE "reserve"
53 #define CMD_RESERVE_TIMEOUT "reserve-with-timeout "
54 #define CMD_DELETE "delete "
55 #define CMD_RELEASE "release "
56 #define CMD_BURY "bury "
57 #define CMD_KICK "kick "
58 #define CMD_STATS "stats"
59 #define CMD_JOBSTATS "stats-job "
60 #define CMD_USE "use "
61 #define CMD_WATCH "watch "
62 #define CMD_IGNORE "ignore "
63 #define CMD_LIST_TUBES "list-tubes"
64 #define CMD_LIST_TUBE_USED "list-tube-used"
65 #define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
66 #define CMD_STATS_TUBE "stats-tube "
/* Length of the string literal m, not counting its terminating NUL byte.
 * Only meaningful for literals/arrays: applied to a pointer, sizeof would
 * yield the pointer size instead of the string length. */
68 #define CONSTSTRLEN(m) (sizeof(m) - 1)
70 #define CMD_PEEK_READY_LEN CONSTSTRLEN(CMD_PEEK_READY)
71 #define CMD_PEEK_DELAYED_LEN CONSTSTRLEN(CMD_PEEK_DELAYED)
72 #define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED)
73 #define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
74 #define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
75 #define CMD_RESERVE_TIMEOUT_LEN CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
76 #define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
77 #define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
78 #define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
79 #define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
80 #define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
81 #define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
82 #define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
83 #define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
84 #define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
85 #define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
86 #define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
87 #define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
88 #define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
90 #define MSG_FOUND "FOUND"
91 #define MSG_NOTFOUND "NOT_FOUND\r\n"
92 #define MSG_RESERVED "RESERVED"
93 #define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
94 #define MSG_TIMED_OUT "TIMED_OUT\r\n"
95 #define MSG_DELETED "DELETED\r\n"
96 #define MSG_RELEASED "RELEASED\r\n"
97 #define MSG_BURIED "BURIED\r\n"
98 #define MSG_BURIED_FMT "BURIED %llu\r\n"
99 #define MSG_INSERTED_FMT "INSERTED %llu\r\n"
100 #define MSG_NOT_IGNORED "NOT_IGNORED\r\n"
102 #define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
103 #define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
104 #define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
105 #define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
106 #define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)
108 #define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
109 #define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
110 #define MSG_DRAINING "DRAINING\r\n"
111 #define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
112 #define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
113 #define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
114 #define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
116 #define STATE_WANTCOMMAND 0
117 #define STATE_WANTDATA 1
118 #define STATE_SENDJOB 2
119 #define STATE_SENDWORD 3
121 #define STATE_BITBUCKET 5
132 #define OP_JOBSTATS 9
133 #define OP_PEEK_BURIED 10
137 #define OP_LIST_TUBES 14
138 #define OP_LIST_TUBE_USED 15
139 #define OP_LIST_TUBES_WATCHED 16
140 #define OP_STATS_TUBE 17
141 #define OP_PEEK_READY 18
142 #define OP_PEEK_DELAYED 19
143 #define OP_RESERVE_TIMEOUT 20
146 #define STATS_FMT "---\n" \
147 "current-jobs-urgent: %u\n" \
148 "current-jobs-ready: %u\n" \
149 "current-jobs-reserved: %u\n" \
150 "current-jobs-delayed: %u\n" \
151 "current-jobs-buried: %u\n" \
154 "cmd-peek-ready: %llu\n" \
155 "cmd-peek-delayed: %llu\n" \
156 "cmd-peek-buried: %llu\n" \
157 "cmd-reserve: %llu\n" \
158 "cmd-reserve-with-timeout: %llu\n" \
159 "cmd-delete: %llu\n" \
160 "cmd-release: %llu\n" \
162 "cmd-watch: %llu\n" \
163 "cmd-ignore: %llu\n" \
166 "cmd-stats: %llu\n" \
167 "cmd-stats-job: %llu\n" \
168 "cmd-stats-tube: %llu\n" \
169 "cmd-list-tubes: %llu\n" \
170 "cmd-list-tube-used: %llu\n" \
171 "cmd-list-tubes-watched: %llu\n" \
172 "job-timeouts: %llu\n" \
173 "total-jobs: %llu\n" \
174 "max-job-size: %zu\n" \
175 "current-tubes: %zu\n" \
176 "current-connections: %u\n" \
177 "current-producers: %u\n" \
178 "current-workers: %u\n" \
179 "current-waiting: %u\n" \
180 "total-connections: %u\n" \
183 "rusage-utime: %d.%06d\n" \
184 "rusage-stime: %d.%06d\n" \
188 #define STATS_TUBE_FMT "---\n" \
190 "current-jobs-urgent: %u\n" \
191 "current-jobs-ready: %u\n" \
192 "current-jobs-reserved: %u\n" \
193 "current-jobs-delayed: %u\n" \
194 "current-jobs-buried: %u\n" \
195 "total-jobs: %llu\n" \
196 "current-using: %u\n" \
197 "current-watching: %u\n" \
198 "current-waiting: %u\n" \
201 #define JOB_STATS_FMT "---\n" \
216 /* this number is pretty arbitrary */
217 #define BUCKET_BUF_SIZE 1024
/* Scratch buffer that the STATE_BITBUCKET read path reads discarded input
 * into; nothing in the code shown here examines its contents afterwards. */
219 static char bucket
[BUCKET_BUF_SIZE
];
221 static unsigned int ready_ct
= 0;
222 static struct stats global_stat
= {0, 0, 0, 0, 0};
224 static tube default_tube
;
225 static struct ms tubes
;
227 static int drain_mode
= 0;
228 static time_t start_time
;
229 static unsigned long long int op_ct
[TOTAL_OPS
], timeout_ct
= 0;
232 /* Doubly-linked list of connections with at least one reserved job. */
/* The first two initializers point back at the list head itself (presumably
 * the empty-list state -- confirm against struct conn). reserve_job() adds a
 * connection with conn_insert(&running, c). */
233 static struct conn running
= { &running
, &running
, 0 };
236 static const char * op_names
[] = {
253 CMD_LIST_TUBES_WATCHED
,
264 return job_list_any_p(&t
->buried
);
268 reply(conn c
, const char *line
, int len
, int state
)
274 r
= conn_update_evq(c
, EV_WRITE
| EV_PERSIST
);
275 if (r
== -1) return twarnx("conn_update_evq() failed"), conn_close(c
);
281 dprintf("sending reply: %.*s", len
, line
);
/* Reply to connection c with the string m, passing STATE_SENDWORD as the
 * state argument of reply(). m must be a string literal (or array), because
 * its length is computed with CONSTSTRLEN. */
284 #define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)
286 #define reply_serr(c,e) (twarnx("server error: %s",(e)),\
290 reply_line(conn c
, int state
, const char *fmt
, ...)
296 r
= vsnprintf(c
->reply_buf
, LINE_BUF_SIZE
, fmt
, ap
);
299 /* Make sure the buffer was big enough. If not, we have a bug. */
300 if (r
>= LINE_BUF_SIZE
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
302 return reply(c
, c
->reply_buf
, r
, state
);
306 reply_job(conn c
, job j
, const char *word
)
308 /* tell this connection which job to send */
312 return reply_line(c
, STATE_SENDJOB
, "%s %llu %u\r\n",
313 word
, j
->id
, j
->body_size
- 2);
317 remove_waiting_conn(conn c
)
322 if (!conn_waiting(c
)) return NULL
;
324 c
->type
&= ~CONN_TYPE_WAITING
;
325 global_stat
.waiting_ct
--;
326 for (i
= 0; i
< c
->watch
.used
; i
++) {
327 t
= c
->watch
.items
[i
];
328 t
->stat
.waiting_ct
--;
329 ms_remove(&t
->waiting
, c
);
335 reserve_job(conn c
, job j
)
337 j
->deadline
= time(NULL
) + j
->ttr
;
338 global_stat
.reserved_ct
++; /* stats */
339 j
->tube
->stat
.reserved_ct
++;
340 conn_insert(&running
, c
);
341 j
->state
= JOB_STATE_RESERVED
;
342 job_insert(&c
->reserved_jobs
, j
);
344 if (c
->soonest_job
&& j
->deadline
< c
->soonest_job
->deadline
) {
347 return reply_job(c
, j
, MSG_RESERVED
);
355 job j
= NULL
, candidate
;
357 dprintf("tubes.used = %zu\n", tubes
.used
);
358 for (i
= 0; i
< tubes
.used
; i
++) {
360 dprintf("for %s t->waiting.used=%zu t->ready.used=%d\n",
361 t
->name
, t
->waiting
.used
, t
->ready
.used
);
362 if (t
->waiting
.used
&& t
->ready
.used
) {
363 candidate
= pq_peek(&t
->ready
);
364 if (!j
|| candidate
->id
< j
->id
) j
= candidate
;
366 dprintf("i = %zu, tubes.used = %zu\n", i
, tubes
.used
);
377 dprintf("processing queue\n");
378 while ((j
= next_eligible_job())) {
379 dprintf("got eligible job %llu in %s\n", j
->id
, j
->tube
->name
);
380 j
= pq_take(&j
->tube
->ready
);
382 if (j
->pri
< URGENT_THRESHOLD
) {
383 global_stat
.urgent_ct
--;
384 j
->tube
->stat
.urgent_ct
--;
386 reserve_job(remove_waiting_conn(ms_take(&j
->tube
->waiting
)), j
);
397 for (i
= 0; i
< tubes
.used
; i
++) {
399 nj
= pq_peek(&t
->delay
);
401 if (!j
|| nj
->deadline
< j
->deadline
) j
= nj
;
408 set_main_delay_timeout()
412 set_main_timeout((j
= delay_q_peek()) ? j
->deadline
: 0);
416 enqueue_job(job j
, unsigned int delay
)
422 j
->deadline
= time(NULL
) + delay
;
423 r
= pq_give(&j
->tube
->delay
, j
);
425 j
->state
= JOB_STATE_DELAYED
;
426 set_main_delay_timeout();
428 r
= pq_give(&j
->tube
->ready
, j
);
430 j
->state
= JOB_STATE_READY
;
432 if (j
->pri
< URGENT_THRESHOLD
) {
433 global_stat
.urgent_ct
++;
434 j
->tube
->stat
.urgent_ct
++;
444 job_insert(&j
->tube
->buried
, j
);
445 global_stat
.buried_ct
++;
446 j
->tube
->stat
.buried_ct
++;
447 j
->state
= JOB_STATE_BURIED
;
453 enqueue_reserved_jobs(conn c
)
458 while (job_list_any_p(&c
->reserved_jobs
)) {
459 j
= job_remove(c
->reserved_jobs
.next
);
460 r
= enqueue_job(j
, 0);
462 global_stat
.reserved_ct
--;
463 j
->tube
->stat
.reserved_ct
--;
464 c
->soonest_job
= NULL
;
465 if (!job_list_any_p(&c
->reserved_jobs
)) conn_remove(c
);
472 job j
= delay_q_peek();
473 return j
? pq_take(&j
->tube
->delay
) : NULL
;
477 remove_this_buried_job(job j
)
481 global_stat
.buried_ct
--;
482 j
->tube
->stat
.buried_ct
--;
488 kick_buried_job(tube t
)
493 if (!buried_job_p(t
)) return 0;
494 j
= remove_this_buried_job(t
->buried
.next
);
496 r
= enqueue_job(j
, 0);
499 /* ready queue is full, so bury it */
509 unsigned int count
= 0;
511 for (i
= 0; i
< tubes
.used
; i
++) {
513 count
+= pq_used(&t
->delay
);
519 kick_delayed_job(tube t
)
524 j
= pq_take(&t
->delay
);
527 r
= enqueue_job(j
, 0);
530 /* ready queue is full, so delay it again */
531 r
= enqueue_job(j
, j
->delay
);
539 /* return the number of jobs successfully kicked */
541 kick_buried_jobs(tube t
, unsigned int n
)
544 for (i
= 0; (i
< n
) && kick_buried_job(t
); ++i
);
548 /* return the number of jobs successfully kicked */
550 kick_delayed_jobs(tube t
, unsigned int n
)
553 for (i
= 0; (i
< n
) && kick_delayed_job(t
); ++i
);
558 kick_jobs(tube t
, unsigned int n
)
560 if (buried_job_p(t
)) return kick_buried_jobs(t
, n
);
561 return kick_delayed_jobs(t
, n
);
565 find_buried_job(unsigned long long int id
)
567 job j
= job_find(id
);
568 return (j
&& j
->state
== JOB_STATE_BURIED
) ? j
: NULL
;
572 remove_buried_job(unsigned long long int id
)
574 return remove_this_buried_job(find_buried_job(id
));
578 enqueue_waiting_conn(conn c
)
583 global_stat
.waiting_ct
++;
584 c
->type
|= CONN_TYPE_WAITING
;
585 for (i
= 0; i
< c
->watch
.used
; i
++) {
586 t
= c
->watch
.items
[i
];
587 t
->stat
.waiting_ct
++;
588 ms_append(&t
->waiting
, c
);
593 find_reserved_job_in_conn(conn c
, unsigned long long int id
)
595 job j
= job_find(id
);
596 return (j
&& j
->reserver
== c
&& j
->state
== JOB_STATE_RESERVED
) ? j
: NULL
;
600 peek_job(unsigned long long int id
)
606 check_err(conn c
, const char *s
)
608 if (errno
== EAGAIN
) return;
609 if (errno
== EINTR
) return;
610 if (errno
== EWOULDBLOCK
) return;
617 /* Scan the given string for the sequence "\r\n" and return the line length.
618 * Always returns at least 2 if a match is found. Returns 0 if no match. */
/* NOTE(review): memchr() locates only the first '\r' in the scanned range,
 * and only that byte is checked for a following '\n' in the lines shown here.
 * size must be at least 1: with size == 0 the count passed to memchr() is
 * -1, which converts to a very large size_t. */
620 scan_line_end(const char *s
, int size
)
624 match
= memchr(s
, '\r', size
- 1);
625 if (!match
) return 0;
627 /* this is safe because we only scan size - 1 chars above */
628 if (match
[1] == '\n') return match
- s
+ 2;
636 return scan_line_end(c
->cmd
, c
->cmd_read
);
639 /* parse the command line */
/* Return opcode o from the enclosing function when string s begins with the
 * command literal c. The comparison is a prefix match (strncmp over
 * CONSTSTRLEN(c) bytes), so a command that is a prefix of another one must be
 * tested after the longer command. The expansion is a complete `if`
 * statement that already ends with its own semicolon. */
643 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
644 TEST_CMD(c
->cmd
, CMD_PUT
, OP_PUT
);
645 TEST_CMD(c
->cmd
, CMD_PEEKJOB
, OP_PEEKJOB
);
646 TEST_CMD(c
->cmd
, CMD_PEEK_READY
, OP_PEEK_READY
);
647 TEST_CMD(c
->cmd
, CMD_PEEK_DELAYED
, OP_PEEK_DELAYED
);
648 TEST_CMD(c
->cmd
, CMD_PEEK_BURIED
, OP_PEEK_BURIED
);
649 TEST_CMD(c
->cmd
, CMD_RESERVE_TIMEOUT
, OP_RESERVE_TIMEOUT
);
650 TEST_CMD(c
->cmd
, CMD_RESERVE
, OP_RESERVE
);
651 TEST_CMD(c
->cmd
, CMD_DELETE
, OP_DELETE
);
652 TEST_CMD(c
->cmd
, CMD_RELEASE
, OP_RELEASE
);
653 TEST_CMD(c
->cmd
, CMD_BURY
, OP_BURY
);
654 TEST_CMD(c
->cmd
, CMD_KICK
, OP_KICK
);
655 TEST_CMD(c
->cmd
, CMD_JOBSTATS
, OP_JOBSTATS
);
656 TEST_CMD(c
->cmd
, CMD_STATS_TUBE
, OP_STATS_TUBE
);
657 TEST_CMD(c
->cmd
, CMD_STATS
, OP_STATS
);
658 TEST_CMD(c
->cmd
, CMD_USE
, OP_USE
);
659 TEST_CMD(c
->cmd
, CMD_WATCH
, OP_WATCH
);
660 TEST_CMD(c
->cmd
, CMD_IGNORE
, OP_IGNORE
);
661 TEST_CMD(c
->cmd
, CMD_LIST_TUBES_WATCHED
, OP_LIST_TUBES_WATCHED
);
662 TEST_CMD(c
->cmd
, CMD_LIST_TUBE_USED
, OP_LIST_TUBE_USED
);
663 TEST_CMD(c
->cmd
, CMD_LIST_TUBES
, OP_LIST_TUBES
);
667 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
668 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
669 * This function is idempotent. */
671 fill_extra_data(conn c
)
673 int extra_bytes
, job_data_bytes
= 0, cmd_bytes
;
675 if (!c
->fd
) return; /* the connection was closed */
676 if (!c
->cmd_len
) return; /* we don't have a complete command */
678 /* how many extra bytes did we read? */
679 extra_bytes
= c
->cmd_read
- c
->cmd_len
;
681 /* how many bytes should we put into the job body? */
683 job_data_bytes
= min(extra_bytes
, c
->in_job
->body_size
);
684 memcpy(c
->in_job
->body
, c
->cmd
+ c
->cmd_len
, job_data_bytes
);
685 c
->in_job_read
= job_data_bytes
;
686 } else if (c
->in_job_read
) {
687 /* we are in bit-bucket mode, throwing away data */
688 job_data_bytes
= min(extra_bytes
, c
->in_job_read
);
689 c
->in_job_read
-= job_data_bytes
;
692 /* how many bytes are left to go into the future cmd? */
693 cmd_bytes
= extra_bytes
- job_data_bytes
;
694 memmove(c
->cmd
, c
->cmd
+ c
->cmd_len
+ job_data_bytes
, cmd_bytes
);
695 c
->cmd_read
= cmd_bytes
;
696 c
->cmd_len
= 0; /* we no longer know the length of the new command */
700 enqueue_incoming_job(conn c
)
705 c
->in_job
= NULL
; /* the connection no longer owns this job */
708 /* check if the trailer is present and correct */
709 if (memcmp(j
->body
+ j
->body_size
- 2, "\r\n", 2)) {
711 return reply_msg(c
, MSG_EXPECTED_CRLF
);
716 return reply_serr(c
, MSG_DRAINING
);
719 /* we have a complete job, so let's stick it in the pqueue */
720 r
= enqueue_job(j
, j
->delay
);
721 op_ct
[OP_PUT
]++; /* stats */
722 global_stat
.total_jobs_ct
++;
723 j
->tube
->stat
.total_jobs_ct
++;
725 if (r
) return reply_line(c
, STATE_SENDWORD
, MSG_INSERTED_FMT
, j
->id
);
727 /* out of memory trying to grow the queue, so it gets buried */
729 reply_line(c
, STATE_SENDWORD
, MSG_BURIED_FMT
, j
->id
);
735 return time(NULL
) - start_time
;
739 fmt_stats(char *buf
, size_t size
, void *x
)
741 struct rusage ru
= {{0, 0}, {0, 0}};
742 getrusage(RUSAGE_SELF
, &ru
); /* don't care if it fails */
743 return snprintf(buf
, size
, STATS_FMT
,
744 global_stat
.urgent_ct
,
746 global_stat
.reserved_ct
,
747 get_delayed_job_ct(),
748 global_stat
.buried_ct
,
751 op_ct
[OP_PEEK_READY
],
752 op_ct
[OP_PEEK_DELAYED
],
753 op_ct
[OP_PEEK_BURIED
],
755 op_ct
[OP_RESERVE_TIMEOUT
],
765 op_ct
[OP_STATS_TUBE
],
766 op_ct
[OP_LIST_TUBES
],
767 op_ct
[OP_LIST_TUBE_USED
],
768 op_ct
[OP_LIST_TUBES_WATCHED
],
770 global_stat
.total_jobs_ct
,
774 count_cur_producers(),
776 global_stat
.waiting_ct
,
780 (int) ru
.ru_utime
.tv_sec
, (int) ru
.ru_utime
.tv_usec
,
781 (int) ru
.ru_stime
.tv_sec
, (int) ru
.ru_stime
.tv_usec
,
786 /* Read a priority value from the given buffer and place it in pri.
787 * Update end to point to the address after the last character consumed.
788 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
789 * conversion and return the status code but not update any values. This is an
790 * easy way to check for errors.
791 * If end is NULL, read_pri will also check that the entire input string was
792 * consumed and return an error code otherwise.
793 * Return 0 on success, or nonzero on failure.
794 * If a failure occurs, pri and end are not modified. */
796 read_pri(unsigned int *pri
, const char *buf
, char **end
)
802 tpri
= strtoul(buf
, &tend
, 10);
803 if (tend
== buf
) return -1;
804 if (errno
&& errno
!= ERANGE
) return -1;
805 if (!end
&& tend
[0] != '\0') return -1;
807 if (pri
) *pri
= tpri
;
808 if (end
) *end
= tend
;
812 /* Read a delay value from the given buffer and place it in delay.
813 * The interface and behavior are the same as in read_pri(). */
815 read_delay(unsigned int *delay
, const char *buf
, char **end
)
817 return read_pri(delay
, buf
, end
);
820 /* Read a timeout value from the given buffer and place it in ttr.
821 * The interface and behavior are the same as in read_pri(). */
823 read_ttr(unsigned int *ttr
, const char *buf
, char **end
)
825 return read_pri(ttr
, buf
, end
);
829 wait_for_job(conn c
, int timeout
)
833 c
->state
= STATE_WAIT
;
834 enqueue_waiting_conn(c
);
836 /* Set the pending timeout to the requested timeout amount */
837 c
->pending_timeout
= timeout
;
839 /* this conn is waiting, but we want to know if they hang up */
840 r
= conn_update_evq(c
, EV_READ
| EV_PERSIST
);
841 if (r
== -1) return twarnx("update events failed"), conn_close(c
);
/* Formatter callback handed to do_stats(): it receives a destination buffer,
 * the buffer size, and an opaque data pointer, and returns an int that
 * do_stats() treats as the formatted length. do_stats() first calls it with
 * a NULL buffer and size 0 to measure, then again with the allocated body. */
844 typedef int(*fmt_fn
)(char *, size_t, void *);
847 do_stats(conn c
, fmt_fn fmt
, void *data
)
851 /* first, measure how big a buffer we will need */
852 stats_len
= fmt(NULL
, 0, data
) + 16;
854 c
->out_job
= allocate_job(stats_len
); /* fake job to hold stats data */
855 if (!c
->out_job
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
857 /* now actually format the stats data */
858 r
= fmt(c
->out_job
->body
, stats_len
, data
);
859 /* and set the actual body size */
860 c
->out_job
->body_size
= r
;
861 if (r
> stats_len
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
864 return reply_line(c
, STATE_SENDJOB
, "OK %d\r\n", r
- 2);
868 do_list_tubes(conn c
, ms l
)
874 /* first, measure how big a buffer we will need */
875 resp_z
= 6; /* initial "---\n" and final "\r\n" */
876 for (i
= 0; i
< l
->used
; i
++) {
878 resp_z
+= 3 + strlen(t
->name
); /* including "- " and "\n" */
881 c
->out_job
= allocate_job(resp_z
); /* fake job to hold response data */
882 if (!c
->out_job
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
884 /* now actually format the response */
885 buf
= c
->out_job
->body
;
886 buf
+= snprintf(buf
, 5, "---\n");
887 for (i
= 0; i
< l
->used
; i
++) {
889 buf
+= snprintf(buf
, 4 + strlen(t
->name
), "- %s\n", t
->name
);
895 return reply_line(c
, STATE_SENDJOB
, "OK %d\r\n", resp_z
- 2);
899 fmt_job_stats(char *buf
, size_t size
, job j
)
904 return snprintf(buf
, size
, JOB_STATS_FMT
,
909 (unsigned int) (t
- j
->creation
),
912 (unsigned int) (j
->deadline
- t
),
920 fmt_stats_tube(char *buf
, size_t size
, tube t
)
922 return snprintf(buf
, size
, STATS_TUBE_FMT
,
929 t
->stat
.total_jobs_ct
,
936 maybe_enqueue_incoming_job(conn c
)
940 /* do we have a complete job? */
941 if (c
->in_job_read
== j
->body_size
) return enqueue_incoming_job(c
);
943 /* otherwise we have incomplete data, so just keep waiting */
944 c
->state
= STATE_WANTDATA
;
949 remove_this_reserved_job(conn c
, job j
)
953 global_stat
.reserved_ct
--;
954 j
->tube
->stat
.reserved_ct
--;
957 c
->soonest_job
= NULL
;
958 if (!job_list_any_p(&c
->reserved_jobs
)) conn_remove(c
);
963 remove_reserved_job(conn c
, unsigned long long int id
)
965 return remove_this_reserved_job(c
, find_reserved_job_in_conn(c
, id
));
969 name_is_ok(const char *name
, size_t max
)
971 size_t len
= strlen(name
);
972 return len
> 0 && len
<= max
&&
973 strspn(name
, NAME_CHARS
) == len
&& name
[0] != '-';
977 find_tube(const char *name
)
982 for (i
= 0; i
< tubes
.used
; i
++) {
984 if (strncmp(t
->name
, name
, MAX_TUBE_NAME_LEN
) == 0) return t
;
990 prot_remove_tube(tube t
)
992 ms_remove(&tubes
, t
);
996 make_and_insert_tube(const char *name
)
1001 t
= make_tube(name
);
1002 if (!t
) return NULL
;
1004 /* We want this global tube list to behave like "weak" refs, so don't
1005 * increment the ref count. */
1006 r
= ms_append(&tubes
, t
);
1007 if (!r
) return tube_dref(t
), NULL
;
1013 find_or_make_tube(const char *name
)
1015 return find_tube(name
) ? : make_and_insert_tube(name
);
1019 dispatch_cmd(conn c
)
1021 int r
, i
, timeout
= -1;
1025 char *size_buf
, *delay_buf
, *ttr_buf
, *pri_buf
, *end_buf
, *name
;
1026 unsigned int pri
, delay
, ttr
, body_size
;
1027 unsigned long long int id
;
1030 /* NUL-terminate this string so we can use strtol and friends */
1031 c
->cmd
[c
->cmd_len
- 2] = '\0';
1033 /* check for possible maliciousness */
1034 if (strlen(c
->cmd
) != c
->cmd_len
- 2) {
1035 return reply_msg(c
, MSG_BAD_FORMAT
);
1038 type
= which_cmd(c
);
1039 dprintf("got %s command: \"%s\"\n", op_names
[(int) type
], c
->cmd
);
1043 r
= read_pri(&pri
, c
->cmd
+ 4, &delay_buf
);
1044 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1046 r
= read_delay(&delay
, delay_buf
, &ttr_buf
);
1047 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1049 r
= read_ttr(&ttr
, ttr_buf
, &size_buf
);
1050 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1053 body_size
= strtoul(size_buf
, &end_buf
, 10);
1054 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1056 if (body_size
> job_data_size_limit
) {
1057 return reply_msg(c
, MSG_JOB_TOO_BIG
);
1060 /* don't allow trailing garbage */
1061 if (end_buf
[0] != '\0') return reply_msg(c
, MSG_BAD_FORMAT
);
1063 conn_set_producer(c
);
1065 c
->in_job
= make_job(pri
, delay
, ttr
? : 1, body_size
+ 2, c
->use
);
1069 /* throw away the job body and respond with OUT_OF_MEMORY */
1071 /* Invert the meaning of in_job_read while throwing away data -- it
1072 * counts the bytes that remain to be thrown away. */
1073 c
->in_job_read
= body_size
+ 2;
1076 if (c
->in_job_read
== 0) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1078 c
->state
= STATE_BITBUCKET
;
1084 /* it's possible we already have a complete job */
1085 maybe_enqueue_incoming_job(c
);
1089 /* don't allow trailing garbage */
1090 if (c
->cmd_len
!= CMD_PEEK_READY_LEN
+ 2) {
1091 return reply_msg(c
, MSG_BAD_FORMAT
);
1095 j
= job_copy(pq_peek(&c
->use
->ready
));
1097 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1099 reply_job(c
, j
, MSG_FOUND
);
1101 case OP_PEEK_DELAYED
:
1102 /* don't allow trailing garbage */
1103 if (c
->cmd_len
!= CMD_PEEK_DELAYED_LEN
+ 2) {
1104 return reply_msg(c
, MSG_BAD_FORMAT
);
1108 j
= job_copy(pq_peek(&c
->use
->delay
));
1110 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1112 reply_job(c
, j
, MSG_FOUND
);
1114 case OP_PEEK_BURIED
:
1115 /* don't allow trailing garbage */
1116 if (c
->cmd_len
!= CMD_PEEK_BURIED_LEN
+ 2) {
1117 return reply_msg(c
, MSG_BAD_FORMAT
);
1121 j
= job_copy(buried_job_p(c
->use
)? j
= c
->use
->buried
.next
: NULL
);
1123 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1125 reply_job(c
, j
, MSG_FOUND
);
1129 id
= strtoull(c
->cmd
+ CMD_PEEKJOB_LEN
, &end_buf
, 10);
1130 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1133 /* So, peek is annoying, because some other connection might free the
1134 * job while we are still trying to write it out. So we copy it and
1135 * then free the copy when it's done sending. */
1136 j
= job_copy(peek_job(id
));
1138 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1140 reply_job(c
, j
, MSG_FOUND
);
1142 case OP_RESERVE_TIMEOUT
:
1144 timeout
= strtol(c
->cmd
+ CMD_RESERVE_TIMEOUT_LEN
, &end_buf
, 10);
1145 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1146 case OP_RESERVE
: /* FALLTHROUGH */
1147 /* don't allow trailing garbage */
1148 if (type
== OP_RESERVE
&& c
->cmd_len
!= CMD_RESERVE_LEN
+ 2) {
1149 return reply_msg(c
, MSG_BAD_FORMAT
);
1155 if (conn_has_close_deadline(c
) && !conn_ready(c
)) {
1156 return reply_msg(c
, MSG_DEADLINE_SOON
);
1159 /* try to get a new job for this guy */
1160 wait_for_job(c
, timeout
);
1165 id
= strtoull(c
->cmd
+ CMD_DELETE_LEN
, &end_buf
, 10);
1166 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1169 j
= remove_reserved_job(c
, id
) ? : remove_buried_job(id
);
1171 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1175 reply(c
, MSG_DELETED
, MSG_DELETED_LEN
, STATE_SENDWORD
);
1179 id
= strtoull(c
->cmd
+ CMD_RELEASE_LEN
, &pri_buf
, 10);
1180 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1182 r
= read_pri(&pri
, pri_buf
, &delay_buf
);
1183 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1185 r
= read_delay(&delay
, delay_buf
, NULL
);
1186 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1189 j
= remove_reserved_job(c
, id
);
1191 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1196 r
= enqueue_job(j
, delay
);
1197 if (r
) return reply(c
, MSG_RELEASED
, MSG_RELEASED_LEN
, STATE_SENDWORD
);
1199 /* out of memory trying to grow the queue, so it gets buried */
1201 reply(c
, MSG_BURIED
, MSG_BURIED_LEN
, STATE_SENDWORD
);
1205 id
= strtoull(c
->cmd
+ CMD_BURY_LEN
, &pri_buf
, 10);
1206 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1208 r
= read_pri(&pri
, pri_buf
, NULL
);
1209 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1212 j
= remove_reserved_job(c
, id
);
1214 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1218 reply(c
, MSG_BURIED
, MSG_BURIED_LEN
, STATE_SENDWORD
);
1222 count
= strtoul(c
->cmd
+ CMD_KICK_LEN
, &end_buf
, 10);
1223 if (end_buf
== c
->cmd
+ CMD_KICK_LEN
) {
1224 return reply_msg(c
, MSG_BAD_FORMAT
);
1226 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1230 i
= kick_jobs(c
->use
, count
);
1232 return reply_line(c
, STATE_SENDWORD
, "KICKED %u\r\n", i
);
1234 /* don't allow trailing garbage */
1235 if (c
->cmd_len
!= CMD_STATS_LEN
+ 2) {
1236 return reply_msg(c
, MSG_BAD_FORMAT
);
1241 do_stats(c
, fmt_stats
, NULL
);
1245 id
= strtoull(c
->cmd
+ CMD_JOBSTATS_LEN
, &end_buf
, 10);
1246 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1251 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1253 if (!j
->tube
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
1254 do_stats(c
, (fmt_fn
) fmt_job_stats
, j
);
1257 name
= c
->cmd
+ CMD_STATS_TUBE_LEN
;
1258 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1262 t
= find_tube(name
);
1263 if (!t
) return reply_msg(c
, MSG_NOTFOUND
);
1265 do_stats(c
, (fmt_fn
) fmt_stats_tube
, t
);
1269 /* don't allow trailing garbage */
1270 if (c
->cmd_len
!= CMD_LIST_TUBES_LEN
+ 2) {
1271 return reply_msg(c
, MSG_BAD_FORMAT
);
1275 do_list_tubes(c
, &tubes
);
1277 case OP_LIST_TUBE_USED
:
1278 /* don't allow trailing garbage */
1279 if (c
->cmd_len
!= CMD_LIST_TUBE_USED_LEN
+ 2) {
1280 return reply_msg(c
, MSG_BAD_FORMAT
);
1284 reply_line(c
, STATE_SENDWORD
, "USING %s\r\n", c
->use
->name
);
1286 case OP_LIST_TUBES_WATCHED
:
1287 /* don't allow trailing garbage */
1288 if (c
->cmd_len
!= CMD_LIST_TUBES_WATCHED_LEN
+ 2) {
1289 return reply_msg(c
, MSG_BAD_FORMAT
);
1293 do_list_tubes(c
, &c
->watch
);
1296 name
= c
->cmd
+ CMD_USE_LEN
;
1297 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1300 TUBE_ASSIGN(t
, find_or_make_tube(name
));
1301 if (!t
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1304 TUBE_ASSIGN(c
->use
, t
);
1305 TUBE_ASSIGN(t
, NULL
);
1308 reply_line(c
, STATE_SENDWORD
, "USING %s\r\n", c
->use
->name
);
1311 name
= c
->cmd
+ CMD_WATCH_LEN
;
1312 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1315 TUBE_ASSIGN(t
, find_or_make_tube(name
));
1316 if (!t
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1319 if (!ms_contains(&c
->watch
, t
)) r
= ms_append(&c
->watch
, t
);
1320 TUBE_ASSIGN(t
, NULL
);
1321 if (!r
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1323 reply_line(c
, STATE_SENDWORD
, "WATCHING %d\r\n", c
->watch
.used
);
1326 name
= c
->cmd
+ CMD_IGNORE_LEN
;
1327 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1331 for (i
= 0; i
< c
->watch
.used
; i
++) {
1332 t
= c
->watch
.items
[i
];
1333 if (strncmp(t
->name
, name
, MAX_TUBE_NAME_LEN
) == 0) break;
1337 if (t
&& c
->watch
.used
< 2) return reply_msg(c
, MSG_NOT_IGNORED
);
1339 if (t
) ms_remove(&c
->watch
, t
); /* may free t if refcount => 0 */
1342 reply_line(c
, STATE_SENDWORD
, "WATCHING %d\r\n", c
->watch
.used
);
1345 return reply_msg(c
, MSG_UNKNOWN_COMMAND
);
1349 /* if we get a timeout, it means that a job has been reserved for too long, so
1350 * we should put it back in the queue */
1352 h_conn_timeout(conn c
)
1354 int should_timeout
= 0;
1356 if (conn_waiting(c
) && conn_has_close_deadline(c
)) should_timeout
= 1;
1358 if (should_timeout
) {
1361 dprintf("conn_waiting(%p) = %d\n", c
, conn_waiting(c
));
1362 while ((j
= soonest_job(c
))) {
1363 if (j
->deadline
> time(NULL
)) break;
1364 timeout_ct
++; /* stats */
1366 r
= enqueue_job(remove_this_reserved_job(c
, j
), 0);
1367 /* there was no room in the queue, so bury it */
1368 if (!r
) bury_job(j
);
1369 r
= conn_update_evq(c
, c
->evq
.ev_events
);
1371 return twarnx("conn_update_evq() failed"), conn_close(c
);
1373 return reply_msg(remove_waiting_conn(c
), MSG_DEADLINE_SOON
);
1374 } else if (conn_waiting(c
) && c
->pending_timeout
>= 0) {
1375 dprintf("conn_waiting(%p) = %d\n", c
, conn_waiting(c
));
1376 c
->pending_timeout
=-1;
1377 return reply_msg(remove_waiting_conn(c
), MSG_TIMED_OUT
);
1382 enter_drain_mode(int sig
)
1399 r
= conn_update_evq(c
, EV_READ
| EV_PERSIST
);
1400 if (r
== -1) return twarnx("update events failed"), conn_close(c
);
1402 /* was this a peek or stats command? */
1403 if (!has_reserved_this_job(c
, c
->out_job
)) job_free(c
->out_job
);
1406 c
->reply_sent
= 0; /* now that we're done, reset this */
1407 c
->state
= STATE_WANTCOMMAND
;
1415 struct iovec iov
[2];
1418 case STATE_WANTCOMMAND
:
1419 r
= read(c
->fd
, c
->cmd
+ c
->cmd_read
, LINE_BUF_SIZE
- c
->cmd_read
);
1420 if (r
== -1) return check_err(c
, "read()");
1421 if (r
== 0) return conn_close(c
); /* the client hung up */
1423 c
->cmd_read
+= r
; /* we got some bytes */
1425 c
->cmd_len
= cmd_len(c
); /* find the EOL */
1427 /* yay, complete command line */
1428 if (c
->cmd_len
) return do_cmd(c
);
1430 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1432 /* command line too long? */
1433 if (c
->cmd_read
== LINE_BUF_SIZE
) {
1434 c
->cmd_read
= 0; /* discard the input so far */
1435 return reply_msg(c
, MSG_BAD_FORMAT
);
1438 /* otherwise we have an incomplete line, so just keep waiting */
1440 case STATE_BITBUCKET
:
1441 /* Invert the meaning of in_job_read while throwing away data -- it
1442 * counts the bytes that remain to be thrown away. */
1443 to_read
= min(c
->in_job_read
, BUCKET_BUF_SIZE
);
1444 r
= read(c
->fd
, bucket
, to_read
);
1445 if (r
== -1) return check_err(c
, "read()");
1446 if (r
== 0) return conn_close(c
); /* the client hung up */
1448 c
->in_job_read
-= r
; /* we got some bytes */
1450 /* (c->in_job_read < 0) can't happen */
1452 if (c
->in_job_read
== 0) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1454 case STATE_WANTDATA
:
1457 r
= read(c
->fd
, j
->body
+ c
->in_job_read
, j
->body_size
-c
->in_job_read
);
1458 if (r
== -1) return check_err(c
, "read()");
1459 if (r
== 0) return conn_close(c
); /* the client hung up */
1461 c
->in_job_read
+= r
; /* we got some bytes */
1463 /* (j->in_job_read > j->body_size) can't happen */
1465 maybe_enqueue_incoming_job(c
);
1467 case STATE_SENDWORD
:
1468 r
= write(c
->fd
, c
->reply
+ c
->reply_sent
, c
->reply_len
- c
->reply_sent
);
1469 if (r
== -1) return check_err(c
, "write()");
1470 if (r
== 0) return conn_close(c
); /* the client hung up */
1472 c
->reply_sent
+= r
; /* we got some bytes */
1474 /* (c->reply_sent > c->reply_len) can't happen */
1476 if (c
->reply_sent
== c
->reply_len
) return reset_conn(c
);
1478 /* otherwise we sent an incomplete reply, so just keep waiting */
1483 iov
[0].iov_base
= (void *)(c
->reply
+ c
->reply_sent
);
1484 iov
[0].iov_len
= c
->reply_len
- c
->reply_sent
; /* maybe 0 */
1485 iov
[1].iov_base
= j
->body
+ c
->out_job_sent
;
1486 iov
[1].iov_len
= j
->body_size
- c
->out_job_sent
;
1488 r
= writev(c
->fd
, iov
, 2);
1489 if (r
== -1) return check_err(c
, "writev()");
1490 if (r
== 0) return conn_close(c
); /* the client hung up */
1492 /* update the sent values */
1494 if (c
->reply_sent
>= c
->reply_len
) {
1495 c
->out_job_sent
+= c
->reply_sent
- c
->reply_len
;
1496 c
->reply_sent
= c
->reply_len
;
1499 /* (c->out_job_sent > j->body_size) can't happen */
1502 if (c
->out_job_sent
== j
->body_size
) return reset_conn(c
);
1504 /* otherwise we sent incomplete data, so just keep waiting */
1506 case STATE_WAIT
: /* keep an eye out in case they hang up */
1507 /* but don't hang up just because our buffer is full */
1508 if (LINE_BUF_SIZE
- c
->cmd_read
< 1) break;
1510 r
= read(c
->fd
, c
->cmd
+ c
->cmd_read
, LINE_BUF_SIZE
- c
->cmd_read
);
1511 if (r
== -1) return check_err(c
, "read()");
1512 if (r
== 0) return conn_close(c
); /* the client hung up */
1513 c
->cmd_read
+= r
; /* we got some bytes */
/* True when connection c has a nonzero fd and is in STATE_WANTCOMMAND. */
1517 #define want_command(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))
/* True when c wants a command and at least one byte has already been read
 * into its command buffer (cmd_read is nonzero). */
1518 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
1521 h_conn(const int fd
, const short which
, conn c
)
1524 twarnx("Argh! event fd doesn't match conn fd.");
1526 return conn_close(c
);
1532 event_add(&c
->evq
, NULL
); /* seems to be necessary */
1535 /* fall through... */
1537 /* fall through... */
1542 while (cmd_data_ready(c
) && (c
->cmd_len
= cmd_len(c
))) do_cmd(c
);
1553 while ((j
= delay_q_peek())) {
1554 if (j
->deadline
> t
) break;
1556 r
= enqueue_job(j
, 0);
1557 if (!r
) bury_job(j
); /* there was no room in the queue, so bury it */
1560 set_main_delay_timeout();
1564 h_accept(const int fd
, const short which
, struct event
*ev
)
1569 struct sockaddr addr
;
1571 if (which
== EV_TIMEOUT
) return h_delay();
1573 addrlen
= sizeof addr
;
1574 cfd
= accept(fd
, &addr
, &addrlen
);
1576 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) twarn("accept()");
1577 if (errno
== EMFILE
) brake();
1581 flags
= fcntl(cfd
, F_GETFL
, 0);
1582 if (flags
< 0) return twarn("getting flags"), close(cfd
), v();
1584 r
= fcntl(cfd
, F_SETFL
, flags
| O_NONBLOCK
);
1585 if (r
< 0) return twarn("setting O_NONBLOCK"), close(cfd
), v();
1587 c
= make_conn(cfd
, STATE_WANTCOMMAND
, default_tube
, default_tube
);
1588 if (!c
) return twarnx("make_conn() failed"), close(cfd
), brake();
1590 dprintf("accepted conn, fd=%d\n", cfd
);
1591 r
= conn_set_evq(c
, EV_READ
| EV_PERSIST
, (evh
) h_conn
);
1592 if (r
== -1) return twarnx("conn_set_evq() failed"), close(cfd
), brake();
1598 start_time
= time(NULL
);
1599 memset(op_ct
, 0, sizeof(op_ct
));
1601 ms_init(&tubes
, NULL
, NULL
);
1603 TUBE_ASSIGN(default_tube
, find_or_make_tube("default"));
1604 if (!default_tube
) twarnx("Out of memory during startup!");