From c80708b48f25b2bb8556e2d3282b66f01e83b835 Mon Sep 17 00:00:00 2001 From: Keith Rarick Date: Tue, 26 Feb 2008 17:26:44 -0800 Subject: [PATCH] Per-tube buried list; kick takes tube name. --- conn.h | 2 +- doc/protocol.txt | 11 +++++++---- job.c | 2 +- prot.c | 58 +++++++++++++++++++++++++++++++++++++++----------------- tube.c | 1 + tube.h | 2 ++ 6 files changed, 53 insertions(+), 23 deletions(-) diff --git a/conn.h b/conn.h index 00e50c4..2a277bb 100644 --- a/conn.h +++ b/conn.h @@ -21,8 +21,8 @@ #include "event.h" #include "ms.h" -#include "job.h" #include "tube.h" +#include "job.h" #define STATE_WANTCOMMAND 0 #define STATE_WANTDATA 1 diff --git a/doc/protocol.txt b/doc/protocol.txt index d6cc036..3f3f199 100644 --- a/doc/protocol.txt +++ b/doc/protocol.txt @@ -331,15 +331,18 @@ FOUND \r\n - is the job body -- a sequence of bytes of length from the previous line. -The kick command moves jobs into the ready queue. If there are any buried jobs, -it will only kick buried jobs. Otherwise it will kick delayed jobs. It looks -like: +The kick command moves jobs into the ready queue. If there are any buried jobs +in the specified tube, it will only kick buried jobs. Otherwise it will kick +delayed jobs. It looks like: -kick \r\n +kick \r\n - is an integer upper bound on the number of jobs to kick. The server will kick no more than jobs. + - is a name at most 200 bytes. Buried jobs will be kicked from this + tube. + The response is of the form KICKED \r\n diff --git a/job.c b/job.c index f7bb332..47b5ca8 100644 --- a/job.c +++ b/job.c @@ -19,8 +19,8 @@ #include #include -#include "job.h" #include "tube.h" +#include "job.h" #include "util.h" static unsigned long long int next_id = 1; diff --git a/prot.c b/prot.c index 4aba92c..59667e8 100644 --- a/prot.c +++ b/prot.c @@ -163,7 +163,6 @@ static struct pq delay_q; -static struct job graveyard = { &graveyard, &graveyard, 0 }; static unsigned int ready_ct = 0; static struct stats global_stat = {0, 0, 0, 0, 0}; @@ -205,9 +204,9 @@ static const char * op_names[] = { #endif static int -buried_job_p() +buried_job_p(tube t) { - return job_list_any_p(&graveyard); + return job_list_any_p(&t->buried); } static void @@ -354,7 +353,7 @@ enqueue_job(job j, unsigned int delay) static void bury_job(job j) { - job_insert(&graveyard, j); + job_insert(&j->tube->buried, j); global_stat.buried_ct++; j->tube->stat.buried_ct++; j->state = JOB_STATE_BURIED; @@ -401,13 +400,13 @@ remove_this_buried_job(job j) } static int -kick_buried_job() +kick_buried_job(tube t) { int r; job j; - if (!buried_job_p()) return 0; - j = remove_this_buried_job(graveyard.next); + if (!buried_job_p(t)) return 0; + j = remove_this_buried_job(t->buried.next); j->kick_ct++; r = enqueue_job(j, 0); if (r) return 1; @@ -446,10 +445,10 @@ kick_delayed_job() /* return the number of jobs successfully kicked */ static unsigned int -kick_buried_jobs(unsigned int n) +kick_buried_jobs(tube t, unsigned int n) { unsigned int i; - for (i = 0; (i < n) && kick_buried_job(); ++i); + for (i = 0; (i < n) && kick_buried_job(t); ++i); return i; } @@ -463,30 +462,50 @@ kick_delayed_jobs(unsigned int n) } static unsigned int -kick_jobs(unsigned int n) +kick_jobs(tube t, unsigned int n) { - if (buried_job_p()) return kick_buried_jobs(n); + if (buried_job_p(t)) return kick_buried_jobs(t, n); return kick_delayed_jobs(n); } static job peek_buried_job() { - return buried_job_p() ? graveyard.next : NULL; + tube t; + size_t i; + + for (i = 0; i < tubes.used; i++) { + t = tubes.items[i]; + if (buried_job_p(t)) return t->buried.next; + } + return NULL; } static job -find_buried_job(unsigned long long int id) +find_buried_job_in_tube(tube t, unsigned long long int id) { job j; - for (j = graveyard.next; j != &graveyard; j = j->next) { + for (j = t->buried.next; j != &t->buried; j = j->next) { if (j->id == id) return j; } return NULL; } static job +find_buried_job(unsigned long long int id) +{ + job j; + size_t i; + + for (i = 0; i < tubes.used; i++) { + j = find_buried_job_in_tube(tubes.items[i], id); + if (j) return j; + } + return NULL; +} + +static job remove_buried_job(unsigned long long int id) { return remove_this_buried_job(find_buried_job(id)); @@ -1100,15 +1119,19 @@ dispatch_cmd(conn c) break; case OP_KICK: errno = 0; - count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10); - if (end_buf == c->cmd + CMD_KICK_LEN) { + count = strtoul(c->cmd + CMD_KICK_LEN, &name, 10); + if (name++ == c->cmd + CMD_KICK_LEN) { return reply_msg(c, MSG_BAD_FORMAT); } if (errno) return reply_msg(c, MSG_BAD_FORMAT); kick_ct++; /* stats */ - i = kick_jobs(count); + t = find_tube(name); + if (!t) return reply_msg(c, MSG_NOTFOUND); + + i = kick_jobs(t, count); + t = NULL; return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i); case OP_STATS: @@ -1144,6 +1167,7 @@ dispatch_cmd(conn c) stats_tube_ct++; /* stats */ do_stats(c, (fmt_fn) fmt_stats_tube, t); + t = NULL; break; case OP_LIST_TUBES: /* don't allow trailing garbage */ diff --git a/tube.c b/tube.c index 3259b52..8a1a940 100644 --- a/tube.c +++ b/tube.c @@ -39,6 +39,7 @@ make_tube(const char *name) if (t->name[MAX_TUBE_NAME_LEN - 1] != '\0') twarnx("truncating tube name"); pq_init(&t->ready, job_pri_cmp); + t->buried = (struct job) { &t->buried, &t->buried, 0 }; ms_init(&t->waiting, NULL, NULL); t->stat = (struct stats) {0, 0, 0, 0}; diff --git a/tube.h b/tube.h index 0ab9e0b..1756740 100644 --- a/tube.h +++ b/tube.h @@ -22,6 +22,7 @@ typedef struct tube *tube; #include "stat.h" +#include "job.h" #include "pq.h" #include "ms.h" @@ -31,6 +32,7 @@ struct tube { unsigned int refs; char name[MAX_TUBE_NAME_LEN]; struct pq ready; + struct job buried; struct ms waiting; /* set of conns */ struct stats stat; }; -- 2.11.4.GIT