1 /* prot.c - protocol implementation */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
26 #include <sys/resource.h>
43 /* job body cannot be greater than this many bytes long */
44 size_t job_data_size_limit
= JOB_DATA_SIZE_LIMIT_DEFAULT
;
47 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
48 "abcdefghijklmnopqrstuvwxyz" \
51 #define CMD_PUT "put "
52 #define CMD_PEEKJOB "peek "
53 #define CMD_PEEK_READY "peek-ready"
54 #define CMD_PEEK_DELAYED "peek-delayed"
55 #define CMD_PEEK_BURIED "peek-buried"
56 #define CMD_RESERVE "reserve"
57 #define CMD_RESERVE_TIMEOUT "reserve-with-timeout "
58 #define CMD_DELETE "delete "
59 #define CMD_RELEASE "release "
60 #define CMD_BURY "bury "
61 #define CMD_KICK "kick "
62 #define CMD_TOUCH "touch "
63 #define CMD_STATS "stats"
64 #define CMD_JOBSTATS "stats-job "
65 #define CMD_USE "use "
66 #define CMD_WATCH "watch "
67 #define CMD_IGNORE "ignore "
68 #define CMD_LIST_TUBES "list-tubes"
69 #define CMD_LIST_TUBE_USED "list-tube-used"
70 #define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
71 #define CMD_STATS_TUBE "stats-tube "
72 #define CMD_QUIT "quit"
73 #define CMD_PAUSE_TUBE "pause-tube"
/* Length of string literal m, not counting the terminating NUL.
 * Because this is sizeof-based, m must be a literal or a char array; passing a
 * char pointer would yield the pointer size minus one instead. */
75 #define CONSTSTRLEN(m) (sizeof(m) - 1)
77 #define CMD_PEEK_READY_LEN CONSTSTRLEN(CMD_PEEK_READY)
78 #define CMD_PEEK_DELAYED_LEN CONSTSTRLEN(CMD_PEEK_DELAYED)
79 #define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED)
80 #define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
81 #define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
82 #define CMD_RESERVE_TIMEOUT_LEN CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
83 #define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
84 #define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
85 #define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
86 #define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
87 #define CMD_TOUCH_LEN CONSTSTRLEN(CMD_TOUCH)
88 #define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
89 #define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
90 #define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
91 #define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
92 #define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
93 #define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
94 #define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
95 #define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
96 #define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
97 #define CMD_PAUSE_TUBE_LEN CONSTSTRLEN(CMD_PAUSE_TUBE)
99 #define MSG_FOUND "FOUND"
100 #define MSG_NOTFOUND "NOT_FOUND\r\n"
101 #define MSG_RESERVED "RESERVED"
102 #define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
103 #define MSG_TIMED_OUT "TIMED_OUT\r\n"
104 #define MSG_DELETED "DELETED\r\n"
105 #define MSG_RELEASED "RELEASED\r\n"
106 #define MSG_BURIED "BURIED\r\n"
107 #define MSG_TOUCHED "TOUCHED\r\n"
108 #define MSG_BURIED_FMT "BURIED %llu\r\n"
109 #define MSG_INSERTED_FMT "INSERTED %llu\r\n"
110 #define MSG_NOT_IGNORED "NOT_IGNORED\r\n"
112 #define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
113 #define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
114 #define MSG_TOUCHED_LEN CONSTSTRLEN(MSG_TOUCHED)
115 #define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
116 #define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
117 #define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)
119 #define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
120 #define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
121 #define MSG_DRAINING "DRAINING\r\n"
122 #define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
123 #define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
124 #define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
125 #define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
127 #define STATE_WANTCOMMAND 0
128 #define STATE_WANTDATA 1
129 #define STATE_SENDJOB 2
130 #define STATE_SENDWORD 3
132 #define STATE_BITBUCKET 5
143 #define OP_JOBSTATS 9
144 #define OP_PEEK_BURIED 10
148 #define OP_LIST_TUBES 14
149 #define OP_LIST_TUBE_USED 15
150 #define OP_LIST_TUBES_WATCHED 16
151 #define OP_STATS_TUBE 17
152 #define OP_PEEK_READY 18
153 #define OP_PEEK_DELAYED 19
154 #define OP_RESERVE_TIMEOUT 20
157 #define OP_PAUSE_TUBE 23
160 #define STATS_FMT "---\n" \
161 "current-jobs-urgent: %u\n" \
162 "current-jobs-ready: %u\n" \
163 "current-jobs-reserved: %u\n" \
164 "current-jobs-delayed: %u\n" \
165 "current-jobs-buried: %u\n" \
166 "cmd-put: %" PRIu64 "\n" \
167 "cmd-peek: %" PRIu64 "\n" \
168 "cmd-peek-ready: %" PRIu64 "\n" \
169 "cmd-peek-delayed: %" PRIu64 "\n" \
170 "cmd-peek-buried: %" PRIu64 "\n" \
171 "cmd-reserve: %" PRIu64 "\n" \
172 "cmd-reserve-with-timeout: %" PRIu64 "\n" \
173 "cmd-delete: %" PRIu64 "\n" \
174 "cmd-release: %" PRIu64 "\n" \
175 "cmd-use: %" PRIu64 "\n" \
176 "cmd-watch: %" PRIu64 "\n" \
177 "cmd-ignore: %" PRIu64 "\n" \
178 "cmd-bury: %" PRIu64 "\n" \
179 "cmd-kick: %" PRIu64 "\n" \
180 "cmd-touch: %" PRIu64 "\n" \
181 "cmd-stats: %" PRIu64 "\n" \
182 "cmd-stats-job: %" PRIu64 "\n" \
183 "cmd-stats-tube: %" PRIu64 "\n" \
184 "cmd-list-tubes: %" PRIu64 "\n" \
185 "cmd-list-tube-used: %" PRIu64 "\n" \
186 "cmd-list-tubes-watched: %" PRIu64 "\n" \
187 "cmd-pause-tube: %" PRIu64 "\n" \
188 "job-timeouts: %" PRIu64 "\n" \
189 "total-jobs: %" PRIu64 "\n" \
190 "max-job-size: %zu\n" \
191 "current-tubes: %zu\n" \
192 "current-connections: %u\n" \
193 "current-producers: %u\n" \
194 "current-workers: %u\n" \
195 "current-waiting: %u\n" \
196 "total-connections: %u\n" \
199 "rusage-utime: %d.%06d\n" \
200 "rusage-stime: %d.%06d\n" \
202 "binlog-oldest-index: %s\n" \
203 "binlog-current-index: %s\n" \
204 "binlog-max-size: %zu\n" \
207 #define STATS_TUBE_FMT "---\n" \
209 "current-jobs-urgent: %u\n" \
210 "current-jobs-ready: %u\n" \
211 "current-jobs-reserved: %u\n" \
212 "current-jobs-delayed: %u\n" \
213 "current-jobs-buried: %u\n" \
214 "total-jobs: %" PRIu64 "\n" \
215 "current-using: %u\n" \
216 "current-watching: %u\n" \
217 "current-waiting: %u\n" \
218 "cmd-pause-tube: %u\n" \
219 "pause: %" PRIu64 "\n" \
220 "pause-time-left: %" PRIu64 "\n" \
223 /* this number is pretty arbitrary */
224 #define BUCKET_BUF_SIZE 1024
226 static char bucket
[BUCKET_BUF_SIZE
];
228 static unsigned int ready_ct
= 0;
229 static struct stats global_stat
= {0, 0, 0, 0, 0};
231 static tube default_tube
;
233 static int drain_mode
= 0;
234 static usec started_at
;
235 static uint64_t op_ct
[TOTAL_OPS
], timeout_ct
= 0;
238 /* Doubly-linked list of connections with at least one reserved job. */
239 static struct conn running
= { &running
, &running
, 0 };
242 static const char * op_names
[] = {
259 CMD_LIST_TUBES_WATCHED
,
270 static job
remove_buried_job(job j
);
275 return job_list_any_p(&t
->buried
);
/* reply: arrange for `line` (`len` bytes) to be sent to connection c.
 * Visible here: c is re-registered for EV_WRITE | EV_PERSIST events; if that
 * registration fails a warning is logged and the connection is closed. The
 * reply is also logged through dprintf with an explicit length ("%.*s"), so
 * the log call does not rely on `line` being NUL-terminated.
 * NOTE(review): the statements between the signature and the calls below are
 * not part of this extract; presumably they record line/len/state on c --
 * confirm against the full file. */
279 reply(conn c
, const char *line
, int len
, int state
)
285 r
= conn_update_evq(c
, EV_WRITE
| EV_PERSIST
);
286 if (r
== -1) return twarnx("conn_update_evq() failed"), conn_close(c
);
292 dprintf("sending reply: %.*s", len
, line
);
/* Send the fixed message m to connection c via reply() in STATE_SENDWORD.
 * The length comes from CONSTSTRLEN(m), so m must be a string literal (or a
 * macro expanding to one), never a char pointer. */
295 #define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)
297 #define reply_serr(c,e) (twarnx("server error: %s",(e)),\
/* reply_line: printf-style front end to reply().
 * Formats the message into c->reply_buf (capacity LINE_BUF_SIZE) and passes
 * the formatted text and its length to reply() together with `state`.
 * If the formatted text would not fit, an INTERNAL_ERROR is reported through
 * reply_serr() instead of sending a truncated line.
 * NOTE(review): only r >= LINE_BUF_SIZE is rejected below; a negative
 * vsnprintf() result is not checked in the visible lines. */
301 reply_line(conn c
, int state
, const char *fmt
, ...)
307 r
= vsnprintf(c
->reply_buf
, LINE_BUF_SIZE
, fmt
, ap
);
310 /* Make sure the buffer was big enough. If not, we have a bug. */
311 if (r
>= LINE_BUF_SIZE
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
313 return reply(c
, c
->reply_buf
, r
, state
);
/* reply_job: send the header line "<word> <id> <size>\r\n" for job j and put
 * the connection into STATE_SENDJOB.
 * The advertised size is j->body_size - 2. Elsewhere in this file jobs are
 * created with body_size + 2 and the last two body bytes are checked to be
 * "\r\n", so the subtraction presumably excludes that trailer -- confirm. */
317 reply_job(conn c
, job j
, const char *word
)
319 /* tell this connection which job to send */
323 return reply_line(c
, STATE_SENDJOB
, "%s %llu %u\r\n",
324 word
, j
->id
, j
->body_size
- 2);
/* remove_waiting_conn: take connection c out of the waiting state.
 * Returns NULL immediately when c is not waiting. Otherwise it clears the
 * CONN_TYPE_WAITING flag, decrements the global waiting counter and, for every
 * tube in c->watch, decrements that tube's waiting counter and removes c from
 * the tube's waiting set.
 * NOTE(review): the value returned on the non-NULL path is outside this
 * extract; a caller in this file passes the result straight to reserve_job(),
 * so presumably it is c -- confirm. */
328 remove_waiting_conn(conn c
)
333 if (!conn_waiting(c
)) return NULL
;
335 c
->type
&= ~CONN_TYPE_WAITING
;
336 global_stat
.waiting_ct
--;
337 for (i
= 0; i
< c
->watch
.used
; i
++) {
338 t
= c
->watch
.items
[i
];
339 t
->stat
.waiting_ct
--;
340 ms_remove(&t
->waiting
, c
);
/* reserve_job: hand job j to connection c and tell the client about it.
 * Steps visible below, in order:
 *   - the job's deadline becomes now + j->ttr;
 *   - the global and per-tube reserved counters are incremented;
 *   - c is inserted into the `running` connection list;
 *   - the job is marked JOB_STATE_RESERVED and added to c->reserved_jobs;
 *   - a RESERVED reply carrying the job is sent via reply_job().
 * The branch on c->soonest_job is entered only when a soonest job is already
 * cached and j is due earlier; its body is not part of this extract. */
346 reserve_job(conn c
, job j
)
348 j
->deadline_at
= now_usec() + j
->ttr
;
349 global_stat
.reserved_ct
++; /* stats */
350 j
->tube
->stat
.reserved_ct
++;
352 conn_insert(&running
, c
);
353 j
->state
= JOB_STATE_RESERVED
;
354 job_insert(&c
->reserved_jobs
, j
);
356 if (c
->soonest_job
&& j
->deadline_at
< c
->soonest_job
->deadline_at
) {
359 return reply_job(c
, j
, MSG_RESERVED
);
/* next_eligible_job: scan every tube and pick the job that should be handed
 * out next, as of time `now`.
 * A tube is skipped while its deadline_at is later than `now` (the pause-tube
 * handler in this file sets t->deadline_at to now + delay). Of the remaining
 * tubes, only those with at least one waiting connection AND at least one
 * ready job contribute their head-of-queue job; the winner is the candidate
 * for which job_pri_cmp() reports it sorts before the current best.
 * j starts as NULL, so NULL is the result when no tube qualifies.
 * NOTE(review): the assignment of t and the return statement are not part of
 * this extract. */
363 next_eligible_job(usec now
)
367 job j
= NULL
, candidate
;
369 dprintf("tubes.used = %zu\n", tubes
.used
);
370 for (i
= 0; i
< tubes
.used
; i
++) {
372 dprintf("for %s t->waiting.used=%zu t->ready.used=%d t->pause=%" PRIu64
"\n",
373 t
->name
, t
->waiting
.used
, t
->ready
.used
, t
->pause
);
375 if (t
->deadline_at
> now
) continue;
378 if (t
->waiting
.used
&& t
->ready
.used
) {
379 candidate
= pq_peek(&t
->ready
);
380 if (!j
|| job_pri_cmp(candidate
, j
) < 0) j
= candidate
;
382 dprintf("i = %zu, tubes.used = %zu\n", i
, tubes
.used
);
392 usec now
= now_usec();
394 dprintf("processing queue\n");
395 while ((j
= next_eligible_job(now
))) {
396 dprintf("got eligible job %llu in %s\n", j
->id
, j
->tube
->name
);
397 j
= pq_take(&j
->tube
->ready
);
399 if (j
->pri
< URGENT_THRESHOLD
) {
400 global_stat
.urgent_ct
--;
401 j
->tube
->stat
.urgent_ct
--;
403 reserve_job(remove_waiting_conn(ms_take(&j
->tube
->waiting
)), j
);
414 for (i
= 0; i
< tubes
.used
; i
++) {
416 nj
= pq_peek(&t
->delay
);
418 if (!j
|| nj
->deadline_at
< j
->deadline_at
) j
= nj
;
430 for (i
= 0; i
< tubes
.used
; i
++) {
433 if (!nt
|| t
->deadline_at
< nt
->deadline_at
) nt
= t
;
/* set_main_delay_timeout: re-arm the main timer for the next time-based event.
 * The deadline passed to set_main_timeout() is the earlier of
 *   - the deadline of the job returned by delay_q_peek(), and
 *   - the deadline of the tube returned by pause_tube_peek().
 * 0 stands for "no deadline": it is the starting value when there is no such
 * tube, it is replaced by the job's deadline when a job exists, and it is what
 * set_main_timeout() receives when neither exists. */
441 set_main_delay_timeout()
443 job j
= delay_q_peek();
444 tube t
= pause_tube_peek();
445 usec deadline_at
= t
? t
->deadline_at
: 0;
447 if (j
&& (!deadline_at
|| j
->deadline_at
< deadline_at
)) deadline_at
= j
->deadline_at
;
449 dprintf("deadline_at=%" PRIu64
"\n", deadline_at
);
450 set_main_timeout(deadline_at
);
/* enqueue_job: put job j into one of its tube's queues.
 * Two paths are visible:
 *   - delayed: deadline_at = now + delay, the job goes into tube->delay, its
 *     state becomes JOB_STATE_DELAYED and the main timer is re-armed;
 *   - ready: the job goes into tube->ready, its state becomes JOB_STATE_READY
 *     and, when j->pri < URGENT_THRESHOLD, the global and per-tube urgent
 *     counters are incremented.
 * binlog_write_job() is also called in the visible code.
 * NOTE(review): the conditions selecting each path, the handling of the
 * pq_give() results and the use of update_store are outside this extract;
 * presumably a non-zero delay selects the delayed path and update_store gates
 * the binlog write -- confirm.
 * Callers in this file treat a return of 1 as success, < 0 as an internal
 * error, and anything else as "could not enqueue". */
454 enqueue_job(job j
, usec delay
, char update_store
)
460 j
->deadline_at
= now_usec() + delay
;
461 r
= pq_give(&j
->tube
->delay
, j
);
463 j
->state
= JOB_STATE_DELAYED
;
464 set_main_delay_timeout();
466 r
= pq_give(&j
->tube
->ready
, j
);
468 j
->state
= JOB_STATE_READY
;
470 if (j
->pri
< URGENT_THRESHOLD
) {
471 global_stat
.urgent_ct
++;
472 j
->tube
->stat
.urgent_ct
++;
477 r
= binlog_write_job(j
);
/* bury_job: move job j onto its tube's buried list.
 * Visible effects: the job is inserted into tube->buried, the global and
 * per-tube buried counters are incremented and the state becomes
 * JOB_STATE_BURIED. Binlog space obtained from binlog_reserve_space_update()
 * is added to j->reserved_binlog_space.
 * When update_store is non-zero the result of binlog_write_job(j) is returned.
 * NOTE(review): the condition guarding the space reservation and the return
 * value on the update_store == 0 path are not part of this extract. */
486 bury_job(job j
, char update_store
)
491 z
= binlog_reserve_space_update(j
);
493 j
->reserved_binlog_space
+= z
;
496 job_insert(&j
->tube
->buried
, j
);
497 global_stat
.buried_ct
++;
498 j
->tube
->stat
.buried_ct
++;
499 j
->state
= JOB_STATE_BURIED
;
503 if (update_store
) return binlog_write_job(j
);
/* enqueue_reserved_jobs: give back every job that connection c still holds.
 * Each job is detached from c->reserved_jobs and re-enqueued with no delay and
 * without a binlog update (enqueue_job(j, 0, 0)); if that does not return a
 * value >= 1 the job is buried instead, again without a binlog update.
 * For every job the global and per-tube reserved counters are decremented and
 * the cached c->soonest_job is cleared. Once the list is empty, c is removed
 * from the connection list it was linked into (conn_remove). */
509 enqueue_reserved_jobs(conn c
)
514 while (job_list_any_p(&c
->reserved_jobs
)) {
515 j
= job_remove(c
->reserved_jobs
.next
);
516 r
= enqueue_job(j
, 0, 0);
517 if (r
< 1) bury_job(j
, 0);
518 global_stat
.reserved_ct
--;
519 j
->tube
->stat
.reserved_ct
--;
520 c
->soonest_job
= NULL
;
521 if (!job_list_any_p(&c
->reserved_jobs
)) conn_remove(c
);
528 job j
= delay_q_peek();
529 return j
? pq_take(&j
->tube
->delay
) : NULL
;
/* kick_buried_job: move the first buried job of tube t to the ready queue.
 * Returns 0 when the tube has no buried jobs, 1 when the job was enqueued
 * (enqueue_job() returned 1).
 * The job is first taken off the buried list with remove_buried_job(), which
 * also decrements the buried counters, and binlog space is reserved for the
 * state change before enqueueing.
 * NOTE(review): when the reservation fails (z == 0) the "put it back" path
 * pushes the job into t->delay with pq_give(), although it was just removed
 * from the *buried* list and its counters were already decremented. This
 * matches the recovery line in kick_delayed_job() and looks like a copy/paste
 * slip; re-burying the job is probably what was intended -- confirm.
 * NOTE(review): the statements following the final comment are not part of
 * this extract. */
533 kick_buried_job(tube t
)
539 if (!buried_job_p(t
)) return 0;
540 j
= remove_buried_job(t
->buried
.next
);
542 z
= binlog_reserve_space_update(j
);
543 if (!z
) return pq_give(&t
->delay
, j
), 0; /* put it back */
544 j
->reserved_binlog_space
+= z
;
547 r
= enqueue_job(j
, 0, 1);
548 if (r
== 1) return 1;
550 /* ready queue is full, so bury it */
560 unsigned int count
= 0;
562 for (i
= 0; i
< tubes
.used
; i
++) {
564 count
+= t
->delay
.used
;
/* kick_delayed_job: move the head of tube t's delay queue to the ready queue.
 * Returns 1 when the job was enqueued as ready. Returns 0 when binlog space
 * could not be reserved (the job is pushed back into t->delay) or when the
 * ready enqueue failed but the job could be delayed again with its original
 * j->delay.
 * NOTE(review): the handling of an empty delay queue and of a failed second
 * enqueue is outside this extract. */
570 kick_delayed_job(tube t
)
576 j
= pq_take(&t
->delay
);
579 z
= binlog_reserve_space_update(j
);
580 if (!z
) return pq_give(&t
->delay
, j
), 0; /* put it back */
581 j
->reserved_binlog_space
+= z
;
584 r
= enqueue_job(j
, 0, 1);
585 if (r
== 1) return 1;
587 /* ready queue is full, so delay it again */
588 r
= enqueue_job(j
, j
->delay
, 0);
589 if (r
== 1) return 0;
596 /* return the number of jobs successfully kicked */
598 kick_buried_jobs(tube t
, unsigned int n
)
601 for (i
= 0; (i
< n
) && kick_buried_job(t
); ++i
);
605 /* return the number of jobs successfully kicked */
607 kick_delayed_jobs(tube t
, unsigned int n
)
610 for (i
= 0; (i
< n
) && kick_delayed_job(t
); ++i
);
/* kick_jobs: kick up to n jobs in tube t and return the helper's result.
 * If the tube has any buried jobs, only buried jobs are kicked; delayed jobs
 * are kicked only when there are no buried jobs at all. The two kinds are
 * never mixed within one call. */
615 kick_jobs(tube t
, unsigned int n
)
617 if (buried_job_p(t
)) return kick_buried_jobs(t
, n
);
618 return kick_delayed_jobs(t
, n
);
/* remove_buried_job: detach job j from the buried state.
 * Returns NULL when j is NULL or is not in JOB_STATE_BURIED. Otherwise the
 * global and per-tube buried counters are decremented.
 * NOTE(review): the list removal itself and the return value on success are
 * not part of this extract; callers use the result as the removed job. */
622 remove_buried_job(job j
)
624 if (!j
|| j
->state
!= JOB_STATE_BURIED
) return NULL
;
627 global_stat
.buried_ct
--;
628 j
->tube
->stat
.buried_ct
--;
/* remove_ready_job: take job j out of its tube's ready queue.
 * Returns NULL when j is NULL or is not in JOB_STATE_READY. Otherwise j is
 * replaced by the result of pq_remove() on tube->ready, and for jobs with
 * pri < URGENT_THRESHOLD the global and per-tube urgent counters are
 * decremented (mirroring the increments done when a job becomes ready).
 * NOTE(review): any check on the pq_remove() result and the final return are
 * not part of this extract. */
634 remove_ready_job(job j
)
636 if (!j
|| j
->state
!= JOB_STATE_READY
) return NULL
;
637 j
= pq_remove(&j
->tube
->ready
, j
);
640 if (j
->pri
< URGENT_THRESHOLD
) {
641 global_stat
.urgent_ct
--;
642 j
->tube
->stat
.urgent_ct
--;
/* enqueue_waiting_conn: mark connection c as waiting for a job.
 * Increments the global waiting counter, sets CONN_TYPE_WAITING on c and, for
 * every tube in c->watch, increments that tube's waiting counter and appends c
 * to the tube's waiting set. This is the inverse of remove_waiting_conn().
 * The ms_append() result is not examined in the visible lines. */
649 enqueue_waiting_conn(conn c
)
654 global_stat
.waiting_ct
++;
655 c
->type
|= CONN_TYPE_WAITING
;
656 for (i
= 0; i
< c
->watch
.used
; i
++) {
657 t
= c
->watch
.items
[i
];
658 t
->stat
.waiting_ct
++;
659 ms_append(&t
->waiting
, c
);
/* find_reserved_job_in_conn: ownership check for a reserved job.
 * Returns j itself when j is non-NULL, its reserver is exactly c, and it is in
 * JOB_STATE_RESERVED; returns NULL in every other case. No list is searched:
 * the test relies solely on the fields stored in the job. */
664 find_reserved_job_in_conn(conn c
, job j
)
666 return (j
&& j
->reserver
== c
&& j
->state
== JOB_STATE_RESERVED
) ? j
: NULL
;
/* touch_job: restart the time-to-run clock of a job reserved by c.
 * j is first validated with find_reserved_job_in_conn(); for a valid job the
 * deadline becomes now + j->ttr and the cached c->soonest_job is cleared,
 * since the touched job may no longer be the soonest one.
 * NOTE(review): the NULL guard around the two updates and the return value
 * are not part of this extract; the caller treats a NULL result as
 * "not found". */
670 touch_job(conn c
, job j
)
672 j
= find_reserved_job_in_conn(c
, j
);
674 j
->deadline_at
= now_usec() + j
->ttr
;
675 c
->soonest_job
= NULL
;
681 peek_job(uint64_t id
)
/* check_err: react to a failed I/O call on connection c, based on errno.
 * EAGAIN, EINTR and EWOULDBLOCK are transient conditions and are ignored: the
 * function simply returns so the operation can be retried later.
 * NOTE(review): the handling of every other errno value (and the use of the
 * label s) is outside this extract. */
687 check_err(conn c
, const char *s
)
689 if (errno
== EAGAIN
) return;
690 if (errno
== EINTR
) return;
691 if (errno
== EWOULDBLOCK
) return;
/* Scan the first `size` bytes of s for the sequence "\r\n" and return the
 * length of the line including that terminator.
 *
 * Always returns at least 2 if a match is found. Returns 0 if no match,
 * including when size is too small (< 2) to hold a terminator at all.
 *
 * Only the first '\r' in the buffer is examined: if it is not immediately
 * followed by '\n' the result is 0 even when a later "\r\n" exists. */
static int
scan_line_end(const char *s, int size)
{
    const char *match;

    /* A terminator needs two bytes. Rejecting short input here also keeps
     * size - 1 from going negative, which memchr() would otherwise see as an
     * enormous size_t and read far past the buffer. */
    if (size < 2) return 0;

    match = memchr(s, '\r', size - 1);
    if (!match) return 0;

    /* this is safe because we only scan size - 1 chars above */
    if (match[1] == '\n') return match - s + 2;

    return 0;
}
717 return scan_line_end(c
->cmd
, c
->cmd_read
);
720 /* parse the command line */
724 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
725 TEST_CMD(c
->cmd
, CMD_PUT
, OP_PUT
);
726 TEST_CMD(c
->cmd
, CMD_PEEKJOB
, OP_PEEKJOB
);
727 TEST_CMD(c
->cmd
, CMD_PEEK_READY
, OP_PEEK_READY
);
728 TEST_CMD(c
->cmd
, CMD_PEEK_DELAYED
, OP_PEEK_DELAYED
);
729 TEST_CMD(c
->cmd
, CMD_PEEK_BURIED
, OP_PEEK_BURIED
);
730 TEST_CMD(c
->cmd
, CMD_RESERVE_TIMEOUT
, OP_RESERVE_TIMEOUT
);
731 TEST_CMD(c
->cmd
, CMD_RESERVE
, OP_RESERVE
);
732 TEST_CMD(c
->cmd
, CMD_DELETE
, OP_DELETE
);
733 TEST_CMD(c
->cmd
, CMD_RELEASE
, OP_RELEASE
);
734 TEST_CMD(c
->cmd
, CMD_BURY
, OP_BURY
);
735 TEST_CMD(c
->cmd
, CMD_KICK
, OP_KICK
);
736 TEST_CMD(c
->cmd
, CMD_TOUCH
, OP_TOUCH
);
737 TEST_CMD(c
->cmd
, CMD_JOBSTATS
, OP_JOBSTATS
);
738 TEST_CMD(c
->cmd
, CMD_STATS_TUBE
, OP_STATS_TUBE
);
739 TEST_CMD(c
->cmd
, CMD_STATS
, OP_STATS
);
740 TEST_CMD(c
->cmd
, CMD_USE
, OP_USE
);
741 TEST_CMD(c
->cmd
, CMD_WATCH
, OP_WATCH
);
742 TEST_CMD(c
->cmd
, CMD_IGNORE
, OP_IGNORE
);
743 TEST_CMD(c
->cmd
, CMD_LIST_TUBES_WATCHED
, OP_LIST_TUBES_WATCHED
);
744 TEST_CMD(c
->cmd
, CMD_LIST_TUBE_USED
, OP_LIST_TUBE_USED
);
745 TEST_CMD(c
->cmd
, CMD_LIST_TUBES
, OP_LIST_TUBES
);
746 TEST_CMD(c
->cmd
, CMD_QUIT
, OP_QUIT
);
747 TEST_CMD(c
->cmd
, CMD_PAUSE_TUBE
, OP_PAUSE_TUBE
);
751 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
752 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
 *
 * "Trailing bytes" are whatever was read into c->cmd beyond the current
 * command (cmd_read - cmd_len). They are consumed in this order:
 *   1. into the body of the incoming job, or, in bit-bucket mode, discarded
 *      while counting down c->in_job_read;
 *   2. the remainder is moved to the front of c->cmd as the start of the next
 *      command (memmove, since source and destination overlap).
 *
753 * This function is idempotent: it ends by resetting c->cmd_len to 0, and a
 * call with c->cmd_len == 0 returns immediately. */
755 fill_extra_data(conn c
)
757 int extra_bytes
, job_data_bytes
= 0, cmd_bytes
;
759 if (!c
->fd
) return; /* the connection was closed */
760 if (!c
->cmd_len
) return; /* we don't have a complete command */
762 /* how many extra bytes did we read? */
763 extra_bytes
= c
->cmd_read
- c
->cmd_len
;
765 /* how many bytes should we put into the job body? */
767 job_data_bytes
= min(extra_bytes
, c
->in_job
->body_size
);
768 memcpy(c
->in_job
->body
, c
->cmd
+ c
->cmd_len
, job_data_bytes
);
769 c
->in_job_read
= job_data_bytes
;
770 } else if (c
->in_job_read
) {
771 /* we are in bit-bucket mode, throwing away data */
772 job_data_bytes
= min(extra_bytes
, c
->in_job_read
);
773 c
->in_job_read
-= job_data_bytes
;
776 /* how many bytes are left to go into the future cmd? */
777 cmd_bytes
= extra_bytes
- job_data_bytes
;
778 memmove(c
->cmd
, c
->cmd
+ c
->cmd_len
+ job_data_bytes
, cmd_bytes
);
779 c
->cmd_read
= cmd_bytes
;
780 c
->cmd_len
= 0; /* we no longer know the length of the new command */
/* enqueue_incoming_job: finish a "put" once the whole job body has arrived.
 * Ownership of the job leaves the connection first (c->in_job = NULL), then:
 *   - the last two body bytes must be "\r\n", otherwise EXPECTED_CRLF;
 *   - a DRAINING reply is possible (its condition is not in this extract);
 *   - binlog space is reserved for the new job; a job that already has space
 *     reserved is an INTERNAL_ERROR, a failed reservation is OUT_OF_MEMORY;
 *   - the job is enqueued with its own delay and a binlog update.
 * Replies: INSERTED <id> when enqueue_job() returns 1, INTERNAL_ERROR when it
 * returns < 0, BURIED <id> otherwise. The put counter and the global and
 * per-tube total-job counters are bumped for every non-negative result.
 * NOTE(review): how j is obtained, any cleanup of j on the error paths and the
 * bury call before the final reply are outside this extract. */
784 enqueue_incoming_job(conn c
)
789 c
->in_job
= NULL
; /* the connection no longer owns this job */
792 /* check if the trailer is present and correct */
793 if (memcmp(j
->body
+ j
->body_size
- 2, "\r\n", 2)) {
795 return reply_msg(c
, MSG_EXPECTED_CRLF
);
800 return reply_serr(c
, MSG_DRAINING
);
803 if (j
->reserved_binlog_space
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
804 j
->reserved_binlog_space
= binlog_reserve_space_put(j
);
805 if (!j
->reserved_binlog_space
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
807 /* we have a complete job, so let's stick it in the pqueue */
808 r
= enqueue_job(j
, j
->delay
, 1);
809 if (r
< 0) return reply_serr(c
, MSG_INTERNAL_ERROR
);
811 op_ct
[OP_PUT
]++; /* stats */
812 global_stat
.total_jobs_ct
++;
813 j
->tube
->stat
.total_jobs_ct
++;
815 if (r
== 1) return reply_line(c
, STATE_SENDWORD
, MSG_INSERTED_FMT
, j
->id
);
817 /* out of memory trying to grow the queue, so it gets buried */
819 reply_line(c
, STATE_SENDWORD
, MSG_BURIED_FMT
, j
->id
);
825 return (now_usec() - started_at
) / 1000000;
829 fmt_stats(char *buf
, size_t size
, void *x
)
831 struct rusage ru
= {{0, 0}, {0, 0}};
832 getrusage(RUSAGE_SELF
, &ru
); /* don't care if it fails */
833 return snprintf(buf
, size
, STATS_FMT
,
834 global_stat
.urgent_ct
,
836 global_stat
.reserved_ct
,
837 get_delayed_job_ct(),
838 global_stat
.buried_ct
,
841 op_ct
[OP_PEEK_READY
],
842 op_ct
[OP_PEEK_DELAYED
],
843 op_ct
[OP_PEEK_BURIED
],
845 op_ct
[OP_RESERVE_TIMEOUT
],
856 op_ct
[OP_STATS_TUBE
],
857 op_ct
[OP_LIST_TUBES
],
858 op_ct
[OP_LIST_TUBE_USED
],
859 op_ct
[OP_LIST_TUBES_WATCHED
],
860 op_ct
[OP_PAUSE_TUBE
],
862 global_stat
.total_jobs_ct
,
866 count_cur_producers(),
868 global_stat
.waiting_ct
,
872 (int) ru
.ru_utime
.tv_sec
, (int) ru
.ru_utime
.tv_usec
,
873 (int) ru
.ru_stime
.tv_sec
, (int) ru
.ru_stime
.tv_usec
,
875 binlog_oldest_index(),
876 binlog_current_index(),
/* Read a priority value from the given buffer and place it in pri.
 * Update end to point to the address after the last character consumed.
 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
 * conversion and return the status code but not update any values. This is an
 * easy way to check for errors.
 * If end is NULL, read_pri will also check that the entire input string was
 * consumed and return an error code otherwise.
 *
 * Leading spaces are skipped; the first non-space character must be a decimal
 * digit, so signs and empty input are rejected. Values too large for an
 * unsigned int are accepted and saturate at the maximum unsigned int value
 * (the same result an out-of-range conversion already produces where
 * unsigned long and unsigned int have equal width).
 *
 * Return 0 on success, or nonzero on failure.
 * If a failure occurs, pri and end are not modified. */
static int
read_pri(unsigned int *pri, const char *buf, char **end)
{
    char *tend;
    unsigned long tpri;

    while (buf[0] == ' ') buf++;

    /* <ctype.h> functions require an unsigned char value (or EOF); a plain
     * char holding a byte >= 0x80 may be negative. */
    if (!isdigit((unsigned char) buf[0])) return -1;

    /* strtoul() only ever sets errno, so clear any stale value first or the
     * check below could reject valid input. */
    errno = 0;
    tpri = strtoul(buf, &tend, 10);
    if (tend == buf) return -1;
    if (errno && errno != ERANGE) return -1;
    if (!end && tend[0] != '\0') return -1;

    /* Saturate instead of silently wrapping when narrowing to unsigned int. */
    if (tpri > (unsigned int) -1) tpri = (unsigned int) -1;

    if (pri) *pri = (unsigned int) tpri;
    if (end) *end = tend;
    return 0;
}
909 /* Read a delay value from the given buffer and place it in delay.
910 * The interface and behavior are analogous to read_pri(): the text is parsed
 * by read_pri() as a whole number (of seconds, judging by the local's name)
 * and stored in *delay multiplied by 1000000.
 * Unlike pri in read_pri(), delay is dereferenced unconditionally in the
 * visible code, so it must not be NULL; end may still be NULL.
 * NOTE(review): the check of read_pri()'s status and the return statement are
 * not part of this extract. */
912 read_delay(usec
*delay
, const char *buf
, char **end
)
915 unsigned int delay_sec
;
917 r
= read_pri(&delay_sec
, buf
, end
);
919 *delay
= ((usec
) delay_sec
) * 1000000;
923 /* Read a timeout value from the given buffer and place it in ttr.
924 * The interface and behavior are the same as in read_delay(): this is a thin
 * wrapper that forwards all three arguments unchanged, so the same
 * restrictions apply to ttr as to read_delay()'s delay argument. */
926 read_ttr(usec
*ttr
, const char *buf
, char **end
)
928 return read_delay(ttr
, buf
, end
);
931 /* Read a tube name from the given buffer moving the buffer to the name start */
/* Leading spaces are skipped, then the longest run of NAME_CHARS characters is
 * taken as the name. Returns -1 when that run is empty.
 * On success *tubename (if non-NULL) points at the first name character inside
 * buf and *end (if non-NULL) points just past the last one. Nothing is copied
 * and no NUL is written in the visible lines, so the name is only terminated
 * if the caller does so or the buffer happens to end there. */
933 read_tube_name(char **tubename
, char *buf
, char **end
)
937 while (buf
[0] == ' ') buf
++;
938 len
= strspn(buf
, NAME_CHARS
);
939 if (len
== 0) return -1;
940 if (tubename
) *tubename
= buf
;
941 if (end
) *end
= buf
+ len
;
/* wait_for_job: park connection c until a job can be reserved for it.
 * The connection enters STATE_WAIT, is registered as waiting on every tube it
 * watches (enqueue_waiting_conn), and remembers the requested timeout in
 * c->pending_timeout. The reserve handlers in this file pass -1 for a plain
 * "reserve" and the parsed value for "reserve-with-timeout".
 * The connection stays registered for read events so that a hang-up is
 * noticed while it waits; if that registration fails, a warning is logged and
 * the connection is closed. */
946 wait_for_job(conn c
, int timeout
)
950 c
->state
= STATE_WAIT
;
951 enqueue_waiting_conn(c
);
953 /* Set the pending timeout to the requested timeout amount */
954 c
->pending_timeout
= timeout
;
956 /* this conn is waiting, but we want to know if they hang up */
957 r
= conn_update_evq(c
, EV_READ
| EV_PERSIST
);
958 if (r
== -1) return twarnx("update events failed"), conn_close(c
);
961 typedef int(*fmt_fn
)(char *, size_t, void *);
/* do_stats: build a statistics reply with formatter `fmt` and send it to c.
 * fmt is called twice, snprintf-style: once with a NULL buffer and size 0 to
 * learn the required length (16 bytes of slack are added), and once to write
 * into the body of a temporary job allocated to carry the text.
 * Replies OUT_OF_MEMORY if the job cannot be allocated and INTERNAL_ERROR if
 * the second pass reports more bytes than were measured; otherwise sends
 * "OK <n>" in STATE_SENDJOB, where n is the formatted length minus 2.
 * NOTE(review): if fmt follows snprintf() conventions, r == stats_len would
 * also mean the output was truncated, yet only r > stats_len is rejected.
 * NOTE(review): lines between the size check and the reply are not part of
 * this extract. */
964 do_stats(conn c
, fmt_fn fmt
, void *data
)
968 /* first, measure how big a buffer we will need */
969 stats_len
= fmt(NULL
, 0, data
) + 16;
971 c
->out_job
= allocate_job(stats_len
); /* fake job to hold stats data */
972 if (!c
->out_job
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
974 /* now actually format the stats data */
975 r
= fmt(c
->out_job
->body
, stats_len
, data
);
976 /* and set the actual body size */
977 c
->out_job
->body_size
= r
;
978 if (r
> stats_len
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
981 return reply_line(c
, STATE_SENDJOB
, "OK %d\r\n", r
- 2);
/* do_list_tubes: send the names of the tubes in set l as a YAML-style list.
 * The response body is "---\n", then one "- <name>\n" line per tube, then a
 * final "\r\n". Its exact size is computed first (6 bytes of framing plus
 * 3 + strlen(name) per tube) and a temporary job of that size is allocated to
 * carry it; OUT_OF_MEMORY is reported if the allocation fails.
 * Each snprintf() is given a size one larger than the text it writes (room
 * for the NUL) and buf advances by the number of characters written, so every
 * entry overwrites the previous entry's NUL.
 * The reply header is "OK <n>" in STATE_SENDJOB with n = resp_z - 2.
 * NOTE(review): the assignment of t inside both loops and the writing of the
 * final "\r\n" are not part of this extract. */
985 do_list_tubes(conn c
, ms l
)
991 /* first, measure how big a buffer we will need */
992 resp_z
= 6; /* initial "---\n" and final "\r\n" */
993 for (i
= 0; i
< l
->used
; i
++) {
995 resp_z
+= 3 + strlen(t
->name
); /* including "- " and "\n" */
998 c
->out_job
= allocate_job(resp_z
); /* fake job to hold response data */
999 if (!c
->out_job
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1001 /* now actually format the response */
1002 buf
= c
->out_job
->body
;
1003 buf
+= snprintf(buf
, 5, "---\n");
1004 for (i
= 0; i
< l
->used
; i
++) {
1006 buf
+= snprintf(buf
, 4 + strlen(t
->name
), "- %s\n", t
->name
);
1011 c
->out_job_sent
= 0;
1012 return reply_line(c
, STATE_SENDJOB
, "OK %d\r\n", resp_z
- 2);
1016 fmt_job_stats(char *buf
, size_t size
, job j
)
1022 if (j
->state
== JOB_STATE_RESERVED
|| j
->state
== JOB_STATE_DELAYED
) {
1023 time_left
= (j
->deadline_at
- t
) / 1000000;
1027 return snprintf(buf
, size
,
1032 "age: %" PRIu64
"\n"
1033 "delay: %" PRIu64
"\n"
1034 "ttr: %" PRIu64
"\n"
1035 "time-left: %" PRIu64
"\n"
1047 (t
- j
->created_at
) / 1000000,
1059 fmt_stats_tube(char *buf
, size_t size
, tube t
)
1064 time_left
= (t
->deadline_at
- now_usec()) / 1000000;
1068 return snprintf(buf
, size
, STATS_TUBE_FMT
,
1072 t
->stat
.reserved_ct
,
1075 t
->stat
.total_jobs_ct
,
/* maybe_enqueue_incoming_job: enqueue the job being received if it is complete.
 * The job is complete once the number of body bytes read so far
 * (c->in_job_read) equals its body_size; it is then handed to
 * enqueue_incoming_job(). Otherwise the connection is put into STATE_WANTDATA
 * to keep reading body bytes.
 * NOTE(review): j is presumably c->in_job; its assignment is not part of this
 * extract. */
1085 maybe_enqueue_incoming_job(conn c
)
1089 /* do we have a complete job? */
1090 if (c
->in_job_read
== j
->body_size
) return enqueue_incoming_job(c
);
1092 /* otherwise we have incomplete data, so just keep waiting */
1093 c
->state
= STATE_WANTDATA
;
/* remove_this_reserved_job: release job j from connection c's reservations.
 * Visible effects: the global and per-tube reserved counters are decremented,
 * the cached c->soonest_job is cleared, and c is removed from the connection
 * list it was linked into (conn_remove) once it holds no more reserved jobs.
 * NOTE(review): the removal of j from c->reserved_jobs, any NULL guard on j
 * and the return value are not part of this extract; remove_reserved_job()
 * may pass NULL here, so a guard presumably exists -- confirm. */
1098 remove_this_reserved_job(conn c
, job j
)
1102 global_stat
.reserved_ct
--;
1103 j
->tube
->stat
.reserved_ct
--;
1106 c
->soonest_job
= NULL
;
1107 if (!job_list_any_p(&c
->reserved_jobs
)) conn_remove(c
);
/* remove_reserved_job: release job j only if it is really reserved by c.
 * j is validated with find_reserved_job_in_conn(), which yields NULL unless c
 * is the reserver and the job is in the reserved state, and the result is
 * passed on to remove_this_reserved_job(). */
1112 remove_reserved_job(conn c
, job j
)
1114 return remove_this_reserved_job(c
, find_reserved_job_in_conn(c
, j
));
/* name_is_ok: validate a tube name.
 * A name is acceptable when all of the following hold:
 *   - it is not empty;
 *   - it is at most `max` characters long;
 *   - every character belongs to NAME_CHARS;
 *   - it does not start with '-'.
 * name must be a NUL-terminated string (strlen is applied to it); passing
 * NULL is not supported. Returns non-zero for a valid name, 0 otherwise. */
1118 name_is_ok(const char *name
, size_t max
)
1120 size_t len
= strlen(name
);
1121 return len
> 0 && len
<= max
&&
1122 strspn(name
, NAME_CHARS
) == len
&& name
[0] != '-';
1126 prot_remove_tube(tube t
)
1128 ms_remove(&tubes
, t
);
1132 dispatch_cmd(conn c
)
1134 int r
, i
, timeout
= -1;
1139 char *size_buf
, *delay_buf
, *ttr_buf
, *pri_buf
, *end_buf
, *name
;
1140 unsigned int pri
, body_size
;
1145 /* NUL-terminate this string so we can use strtol and friends */
1146 c
->cmd
[c
->cmd_len
- 2] = '\0';
1148 /* check for possible maliciousness */
1149 if (strlen(c
->cmd
) != c
->cmd_len
- 2) {
1150 return reply_msg(c
, MSG_BAD_FORMAT
);
1153 type
= which_cmd(c
);
1154 dprintf("got %s command: \"%s\"\n", op_names
[(int) type
], c
->cmd
);
1158 r
= read_pri(&pri
, c
->cmd
+ 4, &delay_buf
);
1159 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1161 r
= read_delay(&delay
, delay_buf
, &ttr_buf
);
1162 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1164 r
= read_ttr(&ttr
, ttr_buf
, &size_buf
);
1165 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1168 body_size
= strtoul(size_buf
, &end_buf
, 10);
1169 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1171 if (body_size
> job_data_size_limit
) {
1172 return reply_msg(c
, MSG_JOB_TOO_BIG
);
1175 /* don't allow trailing garbage */
1176 if (end_buf
[0] != '\0') return reply_msg(c
, MSG_BAD_FORMAT
);
1178 conn_set_producer(c
);
1180 c
->in_job
= make_job(pri
, delay
, ttr
? : 1, body_size
+ 2, c
->use
);
1184 /* throw away the job body and respond with OUT_OF_MEMORY */
1186 /* Invert the meaning of in_job_read while throwing away data -- it
1187 * counts the bytes that remain to be thrown away. */
1188 c
->in_job_read
= body_size
+ 2;
1191 if (c
->in_job_read
== 0) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1193 c
->state
= STATE_BITBUCKET
;
1199 /* it's possible we already have a complete job */
1200 maybe_enqueue_incoming_job(c
);
1204 /* don't allow trailing garbage */
1205 if (c
->cmd_len
!= CMD_PEEK_READY_LEN
+ 2) {
1206 return reply_msg(c
, MSG_BAD_FORMAT
);
1210 j
= job_copy(pq_peek(&c
->use
->ready
));
1212 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1214 reply_job(c
, j
, MSG_FOUND
);
1216 case OP_PEEK_DELAYED
:
1217 /* don't allow trailing garbage */
1218 if (c
->cmd_len
!= CMD_PEEK_DELAYED_LEN
+ 2) {
1219 return reply_msg(c
, MSG_BAD_FORMAT
);
1223 j
= job_copy(pq_peek(&c
->use
->delay
));
1225 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1227 reply_job(c
, j
, MSG_FOUND
);
1229 case OP_PEEK_BURIED
:
1230 /* don't allow trailing garbage */
1231 if (c
->cmd_len
!= CMD_PEEK_BURIED_LEN
+ 2) {
1232 return reply_msg(c
, MSG_BAD_FORMAT
);
1236 j
= job_copy(buried_job_p(c
->use
)? j
= c
->use
->buried
.next
: NULL
);
1238 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1240 reply_job(c
, j
, MSG_FOUND
);
1244 id
= strtoull(c
->cmd
+ CMD_PEEKJOB_LEN
, &end_buf
, 10);
1245 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1248 /* So, peek is annoying, because some other connection might free the
1249 * job while we are still trying to write it out. So we copy it and
1250 * then free the copy when it's done sending. */
1251 j
= job_copy(peek_job(id
));
1253 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1255 reply_job(c
, j
, MSG_FOUND
);
1257 case OP_RESERVE_TIMEOUT
:
1259 timeout
= strtol(c
->cmd
+ CMD_RESERVE_TIMEOUT_LEN
, &end_buf
, 10);
1260 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1261 case OP_RESERVE
: /* FALLTHROUGH */
1262 /* don't allow trailing garbage */
1263 if (type
== OP_RESERVE
&& c
->cmd_len
!= CMD_RESERVE_LEN
+ 2) {
1264 return reply_msg(c
, MSG_BAD_FORMAT
);
1270 if (conn_has_close_deadline(c
) && !conn_ready(c
)) {
1271 return reply_msg(c
, MSG_DEADLINE_SOON
);
1274 /* try to get a new job for this guy */
1275 wait_for_job(c
, timeout
);
1280 id
= strtoull(c
->cmd
+ CMD_DELETE_LEN
, &end_buf
, 10);
1281 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1285 j
= remove_reserved_job(c
, j
) ? :
1286 remove_ready_job(j
) ? :
1287 remove_buried_job(j
);
1289 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1291 j
->state
= JOB_STATE_INVALID
;
1292 r
= binlog_write_job(j
);
1295 if (!r
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
1297 reply(c
, MSG_DELETED
, MSG_DELETED_LEN
, STATE_SENDWORD
);
1301 id
= strtoull(c
->cmd
+ CMD_RELEASE_LEN
, &pri_buf
, 10);
1302 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1304 r
= read_pri(&pri
, pri_buf
, &delay_buf
);
1305 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1307 r
= read_delay(&delay
, delay_buf
, NULL
);
1308 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1311 j
= remove_reserved_job(c
, job_find(id
));
1313 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1315 /* We want to update the delay deadline on disk, so reserve space for
1318 z
= binlog_reserve_space_update(j
);
1319 if (!z
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1320 j
->reserved_binlog_space
+= z
;
1327 r
= enqueue_job(j
, delay
, !!delay
);
1328 if (r
< 0) return reply_serr(c
, MSG_INTERNAL_ERROR
);
1330 return reply(c
, MSG_RELEASED
, MSG_RELEASED_LEN
, STATE_SENDWORD
);
1333 /* out of memory trying to grow the queue, so it gets buried */
1335 reply(c
, MSG_BURIED
, MSG_BURIED_LEN
, STATE_SENDWORD
);
1339 id
= strtoull(c
->cmd
+ CMD_BURY_LEN
, &pri_buf
, 10);
1340 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1342 r
= read_pri(&pri
, pri_buf
, NULL
);
1343 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1346 j
= remove_reserved_job(c
, job_find(id
));
1348 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1352 if (!r
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
1353 reply(c
, MSG_BURIED
, MSG_BURIED_LEN
, STATE_SENDWORD
);
1357 count
= strtoul(c
->cmd
+ CMD_KICK_LEN
, &end_buf
, 10);
1358 if (end_buf
== c
->cmd
+ CMD_KICK_LEN
) {
1359 return reply_msg(c
, MSG_BAD_FORMAT
);
1361 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1365 i
= kick_jobs(c
->use
, count
);
1367 return reply_line(c
, STATE_SENDWORD
, "KICKED %u\r\n", i
);
1370 id
= strtoull(c
->cmd
+ CMD_TOUCH_LEN
, &end_buf
, 10);
1371 if (errno
) return twarn("strtoull"), reply_msg(c
, MSG_BAD_FORMAT
);
1375 j
= touch_job(c
, job_find(id
));
1378 reply(c
, MSG_TOUCHED
, MSG_TOUCHED_LEN
, STATE_SENDWORD
);
1380 return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1384 /* don't allow trailing garbage */
1385 if (c
->cmd_len
!= CMD_STATS_LEN
+ 2) {
1386 return reply_msg(c
, MSG_BAD_FORMAT
);
1391 do_stats(c
, fmt_stats
, NULL
);
1395 id
= strtoull(c
->cmd
+ CMD_JOBSTATS_LEN
, &end_buf
, 10);
1396 if (errno
) return reply_msg(c
, MSG_BAD_FORMAT
);
1401 if (!j
) return reply(c
, MSG_NOTFOUND
, MSG_NOTFOUND_LEN
, STATE_SENDWORD
);
1403 if (!j
->tube
) return reply_serr(c
, MSG_INTERNAL_ERROR
);
1404 do_stats(c
, (fmt_fn
) fmt_job_stats
, j
);
1407 name
= c
->cmd
+ CMD_STATS_TUBE_LEN
;
1408 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1412 t
= tube_find(name
);
1413 if (!t
) return reply_msg(c
, MSG_NOTFOUND
);
1415 do_stats(c
, (fmt_fn
) fmt_stats_tube
, t
);
1419 /* don't allow trailing garbage */
1420 if (c
->cmd_len
!= CMD_LIST_TUBES_LEN
+ 2) {
1421 return reply_msg(c
, MSG_BAD_FORMAT
);
1425 do_list_tubes(c
, &tubes
);
1427 case OP_LIST_TUBE_USED
:
1428 /* don't allow trailing garbage */
1429 if (c
->cmd_len
!= CMD_LIST_TUBE_USED_LEN
+ 2) {
1430 return reply_msg(c
, MSG_BAD_FORMAT
);
1434 reply_line(c
, STATE_SENDWORD
, "USING %s\r\n", c
->use
->name
);
1436 case OP_LIST_TUBES_WATCHED
:
1437 /* don't allow trailing garbage */
1438 if (c
->cmd_len
!= CMD_LIST_TUBES_WATCHED_LEN
+ 2) {
1439 return reply_msg(c
, MSG_BAD_FORMAT
);
1443 do_list_tubes(c
, &c
->watch
);
1446 name
= c
->cmd
+ CMD_USE_LEN
;
1447 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1450 TUBE_ASSIGN(t
, tube_find_or_make(name
));
1451 if (!t
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1454 TUBE_ASSIGN(c
->use
, t
);
1455 TUBE_ASSIGN(t
, NULL
);
1458 reply_line(c
, STATE_SENDWORD
, "USING %s\r\n", c
->use
->name
);
1461 name
= c
->cmd
+ CMD_WATCH_LEN
;
1462 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1465 TUBE_ASSIGN(t
, tube_find_or_make(name
));
1466 if (!t
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1469 if (!ms_contains(&c
->watch
, t
)) r
= ms_append(&c
->watch
, t
);
1470 TUBE_ASSIGN(t
, NULL
);
1471 if (!r
) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1473 reply_line(c
, STATE_SENDWORD
, "WATCHING %d\r\n", c
->watch
.used
);
1476 name
= c
->cmd
+ CMD_IGNORE_LEN
;
1477 if (!name_is_ok(name
, 200)) return reply_msg(c
, MSG_BAD_FORMAT
);
1481 for (i
= 0; i
< c
->watch
.used
; i
++) {
1482 t
= c
->watch
.items
[i
];
1483 if (strncmp(t
->name
, name
, MAX_TUBE_NAME_LEN
) == 0) break;
1487 if (t
&& c
->watch
.used
< 2) return reply_msg(c
, MSG_NOT_IGNORED
);
1489 if (t
) ms_remove(&c
->watch
, t
); /* may free t if refcount => 0 */
1492 reply_line(c
, STATE_SENDWORD
, "WATCHING %d\r\n", c
->watch
.used
);
1500 r
= read_tube_name(&name
, c
->cmd
+ CMD_PAUSE_TUBE_LEN
, &delay_buf
);
1501 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1503 r
= read_delay(&delay
, delay_buf
, NULL
);
1504 if (r
) return reply_msg(c
, MSG_BAD_FORMAT
);
1507 t
= tube_find(name
);
1508 if (!t
) return reply_msg(c
, MSG_NOTFOUND
);
1510 t
->deadline_at
= now_usec() + delay
;
1513 set_main_delay_timeout();
1515 reply_line(c
, STATE_SENDWORD
, "PAUSED\r\n");
1518 return reply_msg(c
, MSG_UNKNOWN_COMMAND
);
1522 /* There are three reasons this function may be called. We need to check for
1525 * 1. A reserved job has run out of time.
1526 * 2. A waiting client's reserved job has entered the safety margin.
1527 * 3. A waiting client's requested timeout has occurred.
1529 * If any of these happen, we must do the appropriate thing. */
/* Outcomes visible below: reserved jobs whose deadline_at is earlier than
 * now_usec() are handed to enqueue_job() (or buried if that fails), and a
 * waiting client is answered with MSG_DEADLINE_SOON or MSG_TIMED_OUT. */
1531 h_conn_timeout(conn c
)
1533 int r
, should_timeout
= 0;
1536 /* Check if the client was trying to reserve a job. */
/* This flag is computed before the loop below runs; it selects the
 * MSG_DEADLINE_SOON reply at the end of the function. */
1537 if (conn_waiting(c
) && conn_has_close_deadline(c
)) should_timeout
= 1;
1539 /* Check if any reserved jobs have run out of time. We should do this
1540 * whether or not the client is waiting for a new reservation. */
1541 while ((j
= soonest_job(c
))) {
/* Stop as soon as soonest_job() yields a job whose deadline has not passed. */
1542 if (j
->deadline_at
>= now_usec()) break;
/* NOTE(review): the comment that follows has no closing delimiter in this
 * excerpt -- confirm against the complete file. */
1544 /* This job is in the middle of being written out. If we return it to
1545 * the ready queue, someone might free it before we finish writing it
1546 * out to the socket. So we'll copy it here and free the copy when it's
1548 if (j
== c
->out_job
) {
1549 c
->out_job
= job_copy(c
->out_job
);
1552 timeout_ct
++; /* stats */
/* The job returned by remove_this_reserved_job(c, j) is passed to
 * enqueue_job(); a result below 1 is handled as failure. */
1554 r
= enqueue_job(remove_this_reserved_job(c
, j
), 0, 0);
1555 if (r
< 1) bury_job(j
, 0); /* out of memory, so bury it */
/* Call conn_update_evq() with the conn's current ev_events; on -1, warn
 * and close the conn. */
1556 r
= conn_update_evq(c
, c
->evq
.ev_events
);
1557 if (r
== -1) return twarnx("conn_update_evq() failed"), conn_close(c
);
/* should_timeout takes priority over the plain timeout case. In both
 * branches the conn goes through remove_waiting_conn() before the reply. */
1560 if (should_timeout
) {
1561 dprintf("conn_waiting(%p) = %d\n", c
, conn_waiting(c
));
1562 return reply_msg(remove_waiting_conn(c
), MSG_DEADLINE_SOON
);
1563 } else if (conn_waiting(c
) && c
->pending_timeout
>= 0) {
1564 dprintf("conn_waiting(%p) = %d\n", c
, conn_waiting(c
));
/* -1 no longer satisfies the pending_timeout >= 0 test above. */
1565 c
->pending_timeout
= -1;
1566 return reply_msg(remove_waiting_conn(c
), MSG_TIMED_OUT
);
1571 enter_drain_mode(int sig
)
1588 r
= conn_update_evq(c
, EV_READ
| EV_PERSIST
);
1589 if (r
== -1) return twarnx("update events failed"), conn_close(c
);
1591 /* was this a peek or stats command? */
1592 if (c
->out_job
&& c
->out_job
->state
== JOB_STATE_COPY
) job_free(c
->out_job
);
1595 c
->reply_sent
= 0; /* now that we're done, reset this */
1596 c
->state
= STATE_WANTCOMMAND
;
1604 struct iovec iov
[2];
1607 case STATE_WANTCOMMAND
:
1608 r
= read(c
->fd
, c
->cmd
+ c
->cmd_read
, LINE_BUF_SIZE
- c
->cmd_read
);
1609 if (r
== -1) return check_err(c
, "read()");
1610 if (r
== 0) return conn_close(c
); /* the client hung up */
1612 c
->cmd_read
+= r
; /* we got some bytes */
1614 c
->cmd_len
= cmd_len(c
); /* find the EOL */
1616 /* yay, complete command line */
1617 if (c
->cmd_len
) return do_cmd(c
);
1619 /* c->cmd_read > LINE_BUF_SIZE can't happen */
1621 /* command line too long? */
1622 if (c
->cmd_read
== LINE_BUF_SIZE
) {
1623 c
->cmd_read
= 0; /* discard the input so far */
1624 return reply_msg(c
, MSG_BAD_FORMAT
);
1627 /* otherwise we have an incomplete line, so just keep waiting */
1629 case STATE_BITBUCKET
:
1630 /* Invert the meaning of in_job_read while throwing away data -- it
1631 * counts the bytes that remain to be thrown away. */
1632 to_read
= min(c
->in_job_read
, BUCKET_BUF_SIZE
);
1633 r
= read(c
->fd
, bucket
, to_read
);
1634 if (r
== -1) return check_err(c
, "read()");
1635 if (r
== 0) return conn_close(c
); /* the client hung up */
1637 c
->in_job_read
-= r
; /* we got some bytes */
1639 /* (c->in_job_read < 0) can't happen */
1641 if (c
->in_job_read
== 0) return reply_serr(c
, MSG_OUT_OF_MEMORY
);
1643 case STATE_WANTDATA
:
1646 r
= read(c
->fd
, j
->body
+ c
->in_job_read
, j
->body_size
-c
->in_job_read
);
1647 if (r
== -1) return check_err(c
, "read()");
1648 if (r
== 0) return conn_close(c
); /* the client hung up */
1650 c
->in_job_read
+= r
; /* we got some bytes */
1652 /* (j->in_job_read > j->body_size) can't happen */
1654 maybe_enqueue_incoming_job(c
);
1656 case STATE_SENDWORD
:
/* Write the part of c->reply that has not been sent yet
 * (bytes reply_sent .. reply_len). */
1657 r
= write(c
->fd
, c
->reply
+ c
->reply_sent
, c
->reply_len
- c
->reply_sent
);
1658 if (r
== -1) return check_err(c
, "write()");
1659 if (r
== 0) return conn_close(c
); /* the client hung up */
1661 c
->reply_sent
+= r
; /* we sent some bytes */
1663 /* (c->reply_sent > c->reply_len) can't happen */
/* Whole reply is out: hand the conn to reset_conn(). */
1665 if (c
->reply_sent
== c
->reply_len
) return reset_conn(c
);
1667 /* otherwise we sent an incomplete reply, so just keep waiting */
1672 iov
[0].iov_base
= (void *)(c
->reply
+ c
->reply_sent
);
1673 iov
[0].iov_len
= c
->reply_len
- c
->reply_sent
; /* maybe 0 */
1674 iov
[1].iov_base
= j
->body
+ c
->out_job_sent
;
1675 iov
[1].iov_len
= j
->body_size
- c
->out_job_sent
;
1677 r
= writev(c
->fd
, iov
, 2);
1678 if (r
== -1) return check_err(c
, "writev()");
1679 if (r
== 0) return conn_close(c
); /* the client hung up */
1681 /* update the sent values */
1683 if (c
->reply_sent
>= c
->reply_len
) {
1684 c
->out_job_sent
+= c
->reply_sent
- c
->reply_len
;
1685 c
->reply_sent
= c
->reply_len
;
1688 /* (c->out_job_sent > j->body_size) can't happen */
1691 if (c
->out_job_sent
== j
->body_size
) return reset_conn(c
);
1693 /* otherwise we sent incomplete data, so just keep waiting */
1695 case STATE_WAIT
: /* keep an eye out in case they hang up */
1696 /* but don't hang up just because our buffer is full */
1697 if (LINE_BUF_SIZE
- c
->cmd_read
< 1) break;
1699 r
= read(c
->fd
, c
->cmd
+ c
->cmd_read
, LINE_BUF_SIZE
- c
->cmd_read
);
1700 if (r
== -1) return check_err(c
, "read()");
1701 if (r
== 0) return conn_close(c
); /* the client hung up */
1702 c
->cmd_read
+= r
; /* we got some bytes */
/* True when conn c has a nonzero fd and is in STATE_WANTCOMMAND.
 * NOTE(review): a descriptor value of 0 makes this false -- presumably fd is
 * zeroed when a conn is closed; confirm against conn_close(). */
1706 #define want_command(c) ((c)->fd && ((c)->state == STATE_WANTCOMMAND))
/* True when want_command(c) holds and c->cmd_read is nonzero, i.e. some
 * bytes have already been read into the command buffer. The argument is
 * expanded more than once, so pass an expression without side effects. */
1707 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
/* Event callback for a client connection; h_accept() installs it through
 * conn_set_evq(). Receives the event's fd, the event mask `which`, and the
 * conn the event belongs to. */
1710 h_conn(const int fd
, const short which
, conn c
)
/* Sanity failure path: warn and close the conn. The guarding test is not
 * visible in this excerpt; per the message it compares fd with the conn's
 * fd -- confirm. */
1713 twarnx("Argh! event fd doesn't match conn fd.");
1715 return conn_close(c
);
1721 event_add(&c
->evq
, NULL
); /* seems to be necessary */
1724 /* fall through... */
1726 /* fall through... */
/* Run every command line that is already buffered: keep calling do_cmd()
 * while the conn still wants a command, has bytes in its buffer, and
 * cmd_len() reports a nonzero line length. */
1731 while (cmd_data_ready(c
) && (c
->cmd_len
= cmd_len(c
))) do_cmd(c
);
1744 while ((j
= delay_q_peek())) {
1745 if (j
->deadline_at
> now
) break;
1747 r
= enqueue_job(j
, 0, 0);
1748 if (r
< 1) bury_job(j
, 0); /* out of memory, so bury it */
1751 for (i
= 0; i
< tubes
.used
; i
++) {
1754 dprintf("h_delay for %s t->waiting.used=%zu t->ready.used=%d t->pause=%" PRIu64
"\n",
1755 t
->name
, t
->waiting
.used
, t
->ready
.used
, t
->pause
);
1756 if (t
->pause
&& t
->deadline_at
<= now
) {
1762 set_main_delay_timeout();
/* Event callback for the listening socket fd. A timeout event is forwarded
 * to h_delay(). Otherwise one client is accepted, switched to non-blocking
 * mode, wrapped in a conn that starts in STATE_WANTCOMMAND with default_tube
 * passed for both tube arguments, and registered with h_conn as its
 * EV_READ | EV_PERSIST handler. The ev parameter is not referenced in the
 * lines visible here. */
1766 h_accept(const int fd
, const short which
, struct event
*ev
)
1771 struct sockaddr addr
;
1773 if (which
== EV_TIMEOUT
) return h_delay();
1775 addrlen
= sizeof addr
;
1776 cfd
= accept(fd
, &addr
, &addrlen
);
/* accept() error handling (the enclosing test is not visible in this
 * excerpt): EAGAIN/EWOULDBLOCK are not logged, EMFILE calls brake().
 * NOTE(review): errno is read again after twarn() may have run -- confirm
 * twarn() leaves errno untouched, or save errno first. */
1778 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) twarn("accept()");
1779 if (errno
== EMFILE
) brake();
/* Add O_NONBLOCK to the descriptor's existing status flags; on either
 * fcntl() failure the new descriptor is closed. */
1783 flags
= fcntl(cfd
, F_GETFL
, 0);
1784 if (flags
< 0) return twarn("getting flags"), close(cfd
), v();
1786 r
= fcntl(cfd
, F_SETFL
, flags
| O_NONBLOCK
);
1787 if (r
< 0) return twarn("setting O_NONBLOCK"), close(cfd
), v();
1789 c
= make_conn(cfd
, STATE_WANTCOMMAND
, default_tube
, default_tube
);
1790 if (!c
) return twarnx("make_conn() failed"), close(cfd
), brake();
1792 dprintf("accepted conn, fd=%d\n", cfd
);
/* NOTE(review): on failure below only cfd is closed; c from make_conn() is
 * not visibly released here -- confirm whether it leaks. */
1793 r
= conn_set_evq(c
, EV_READ
| EV_PERSIST
, (evh
) h_conn
);
1794 if (r
== -1) return twarnx("conn_set_evq() failed"), close(cfd
), brake();
1800 started_at
= now_usec();
1801 memset(op_ct
, 0, sizeof(op_ct
));
1803 ms_init(&tubes
, NULL
, NULL
);
1805 TUBE_ASSIGN(default_tube
, tube_find_or_make("default"));
1806 if (!default_tube
) twarnx("Out of memory during startup!");
1810 prot_replay_binlog(job binlog_jobs
)
1816 for (j
= binlog_jobs
->next
; j
!= binlog_jobs
; j
= nj
) {
1819 binlog_reserve_space_update(j
); /* reserve space for a delete */
1822 case JOB_STATE_BURIED
:
1825 case JOB_STATE_DELAYED
:
1826 if (started_at
< j
->deadline_at
) {
1827 delay
= j
->deadline_at
- started_at
;
1831 r
= enqueue_job(j
, delay
, 0);
1832 if (r
< 1) twarnx("error processing binlog job %llu", j
->id
);