From 7c451fb1848810038e8734cbc56ace9e8a19c896 Mon Sep 17 00:00:00 2001 From: Keith Rarick Date: Tue, 26 Feb 2008 03:35:25 -0800 Subject: [PATCH] Implement per-tube statistics. --- conn.h | 1 + doc/protocol.txt | 42 +++++++++++++++++ prot.c | 141 ++++++++++++++++++++++++++++++++++++------------------- tube.h => stat.h | 31 ++++-------- tube.c | 3 ++ tube.h | 2 + 6 files changed, 150 insertions(+), 70 deletions(-) copy tube.h => stat.h (60%) diff --git a/conn.h b/conn.h index 8fad267..1a6d339 100644 --- a/conn.h +++ b/conn.h @@ -51,6 +51,7 @@ #define OP_IGNORE 13 #define OP_LIST_TUBES 14 #define OP_LIST_WATCHED_TUBES 15 +#define OP_STATS_TUBE 16 /* CONN_TYPE_* are bit masks */ #define CONN_TYPE_PRODUCER 1 diff --git a/doc/protocol.txt b/doc/protocol.txt index f9598e7..0ed84d0 100644 --- a/doc/protocol.txt +++ b/doc/protocol.txt @@ -392,6 +392,46 @@ to scalars. It contains these keys: - "kicks" is the number of times this job has been kicked. +The stats-tube command gives statistical information about the specified tube +if it exists. Its form is: + +stats-tube \r\n + + - is a name at most 200 bytes. Stats will be returned for this tube. + +The response is one of: + + - "NOT_FOUND\r\n" if the tube does not exist. + + - "OK \r\n\r\n" + + - is the size of the following data section in bytes. + + - is a sequence of bytes of length from the previous line. It + is a YAML file with statistical information represented a dictionary. + +The stats-tube data is a YAML file representing a single dictionary of strings +to scalars. It contains these keys: + + - "name" is the tube's name. + + - "current-jobs-urgent" is the number of ready jobs with priority < 1024 in + this tube. + + - "current-jobs-ready" is the number of jobs in the ready queue in this tube. + + - "current-jobs-reserved" is the number of jobs reserved by all clients in + this tube. + + - "current-jobs-delayed" is the number of delayed jobs in this tube. + + - "current-jobs-buried" is the number of buried jobs in this tube. + + - "total-jobs" is the cumulative count of jobs created in this tube. + + - "current-waiting" is the number of open connections that have issued a + reserve command while watching this tube but not yet received a response. + The stats command gives statistical information about the system as a whole. Its form is: @@ -438,6 +478,8 @@ of strings to scalars. It contains these keys: - "cmd-stats-job" is the cumulative number of stats-job commands. + - "cmd-stats-tube" is the cumulative number of stats-tube commands. + - "cmd-list-tubes" is the cumulative number of list-tubes commands. - "cmd-list-watched-tubes" is the cumulative number of list-watched-tubes diff --git a/prot.c b/prot.c index 790d4d7..d869b4f 100644 --- a/prot.c +++ b/prot.c @@ -25,6 +25,7 @@ #include #include +#include "stat.h" #include "prot.h" #include "pq.h" #include "ms.h" @@ -58,6 +59,7 @@ #define CMD_IGNORE "ignore " #define CMD_LIST_TUBES "list-tubes" #define CMD_LIST_WATCHED_TUBES "list-watched-tubes" +#define CMD_STATS_TUBE "stats-tube " #define CONSTSTRLEN(m) (sizeof(m) - 1) @@ -75,6 +77,7 @@ #define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE) #define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES) #define CMD_LIST_WATCHED_TUBES_LEN CONSTSTRLEN(CMD_LIST_WATCHED_TUBES) +#define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE) #define MSG_FOUND "FOUND" #define MSG_NOTFOUND "NOT_FOUND\r\n" @@ -115,6 +118,7 @@ "cmd-kick: %llu\n" \ "cmd-stats: %llu\n" \ "cmd-stats-job: %llu\n" \ + "cmd-stats-tube: %llu\n" \ "cmd-list-tubes: %llu\n" \ "cmd-list-watched-tubes: %llu\n" \ "job-timeouts: %llu\n" \ @@ -132,6 +136,17 @@ "uptime: %u\n" \ "\r\n" +#define STATS_TUBE_FMT "---\n" \ + "name: %s\n" \ + "current-jobs-urgent: %u\n" \ + "current-jobs-ready: %u\n" \ + "current-jobs-reserved: %u\n" \ + "current-jobs-delayed: %u\n" \ + "current-jobs-buried: %u\n" \ + "total-jobs: %llu\n" \ + "current-waiting: %u\n" \ + "\r\n" + #define JOB_STATS_FMT "---\n" \ "id: %llu\n" \ "tube: %s\n" \ @@ -152,7 +167,8 @@ static struct pq delay_q; /* Doubly-linked list of waiting connections. */ static struct job graveyard = { &graveyard, &graveyard, 0 }; -static unsigned int buried_ct = 0, ready_ct = 0, urgent_ct = 0, waiting_ct = 0; +static unsigned int ready_ct = 0; +static struct stats global_stat = {0, 0, 0, 0, 0}; static tube default_tube; static struct ms tubes; @@ -162,9 +178,8 @@ static time_t start_time; static unsigned long long int put_ct = 0, peek_ct = 0, reserve_ct = 0, delete_ct = 0, release_ct = 0, bury_ct = 0, kick_ct = 0, stats_job_ct = 0, stats_ct = 0, timeout_ct = 0, - list_tubes_ct = 0, list_watched_tubes_ct = 0; - -static unsigned int cur_reserved_ct = 0; + list_tubes_ct = 0, stats_tube_ct = 0, + list_watched_tubes_ct = 0; /* Doubly-linked list of connections with at least one reserved job. */ @@ -188,6 +203,7 @@ static const char * op_names[] = { CMD_IGNORE, CMD_LIST_TUBES, CMD_LIST_WATCHED_TUBES, + CMD_STATS_TUBE, }; #endif @@ -247,13 +263,16 @@ reply_job(conn c, job j, const char *word) conn remove_waiting_conn(conn c) { + tube t; size_t i; if (!(c->type & CONN_TYPE_WAITING)) return NULL; c->type &= ~CONN_TYPE_WAITING; - waiting_ct--; + global_stat.waiting_ct--; for (i = 0; i < c->watch.used; i++) { - ms_remove(&((tube) c->watch.items[i])->waiting, c); + t = c->watch.items[i]; + t->stat.waiting_ct--; + ms_remove(&t->waiting, c); } return c; } @@ -262,7 +281,8 @@ static void reserve_job(conn c, job j) { j->deadline = time(NULL) + j->ttr; - cur_reserved_ct++; /* stats */ + global_stat.reserved_ct++; /* stats */ + j->tube->stat.reserved_ct++; conn_insert(&running, c); j->state = JOB_STATE_RESERVED; job_insert(&c->reserved_jobs, j); @@ -301,7 +321,10 @@ process_queue() dprintf("got eligible job %llu in %s\n", j->id, j->tube->name); j = pq_take(&j->tube->ready); ready_ct--; - if (j->pri < URGENT_THRESHOLD) urgent_ct--; + if (j->pri < URGENT_THRESHOLD) { + global_stat.urgent_ct--; + j->tube->stat.urgent_ct--; + } reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j); } } @@ -322,7 +345,10 @@ enqueue_job(job j, unsigned int delay) if (!r) return 0; j->state = JOB_STATE_READY; ready_ct++; - if (j->pri < URGENT_THRESHOLD) urgent_ct++; + if (j->pri < URGENT_THRESHOLD) { + global_stat.urgent_ct++; + j->tube->stat.urgent_ct++; + } } process_queue(); return 1; @@ -332,7 +358,8 @@ static void bury_job(job j) { job_insert(&graveyard, j); - buried_ct++; + global_stat.buried_ct++; + j->tube->stat.buried_ct++; j->state = JOB_STATE_BURIED; j->bury_ct++; } @@ -347,7 +374,8 @@ enqueue_reserved_jobs(conn c) j = job_remove(c->reserved_jobs.next); r = enqueue_job(j, 0); if (!r) bury_job(j); - cur_reserved_ct--; + global_stat.reserved_ct--; + j->tube->stat.reserved_ct--; if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c); } } @@ -368,7 +396,10 @@ static job remove_this_buried_job(job j) { j = job_remove(j); - if (j) buried_ct--; + if (j) { + global_stat.buried_ct--; + j->tube->stat.buried_ct--; + } return j; } @@ -467,12 +498,15 @@ remove_buried_job(unsigned long long int id) static void enqueue_waiting_conn(conn c) { + tube t; size_t i; - waiting_ct++; + global_stat.waiting_ct++; c->type |= CONN_TYPE_WAITING; for (i = 0; i < c->watch.used; i++) { - ms_append(&((tube) c->watch.items[i])->waiting, c); + t = c->watch.items[i]; + t->stat.waiting_ct++; + ms_append(&t->waiting, c); } } @@ -530,30 +564,6 @@ peek_job(unsigned long long int id) find_buried_job(id); } -static unsigned int -get_ready_job_ct() -{ - return ready_ct; -} - -static unsigned int -get_buried_job_ct() -{ - return buried_ct; -} - -static unsigned int -get_urgent_job_ct() -{ - return urgent_ct; -} - -static int -count_cur_waiting() -{ - return waiting_ct; -} - static void check_err(conn c, const char *s) { @@ -602,6 +612,7 @@ which_cmd(conn c) TEST_CMD(c->cmd, CMD_BURY, OP_BURY); TEST_CMD(c->cmd, CMD_KICK, OP_KICK); TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS); + TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE); TEST_CMD(c->cmd, CMD_STATS, OP_STATS); TEST_CMD(c->cmd, CMD_USE, OP_USE); TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH); @@ -657,6 +668,8 @@ enqueue_incoming_job(conn c) /* we have a complete job, so let's stick it in the pqueue */ r = enqueue_job(j, j->delay); put_ct++; /* stats */ + global_stat.total_jobs_ct++; + j->tube->stat.total_jobs_ct++; if (r) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->id); @@ -677,11 +690,11 @@ fmt_stats(char *buf, size_t size, void *x) struct rusage ru = {{0, 0}, {0, 0}}; getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */ return snprintf(buf, size, STATS_FMT, - get_urgent_job_ct(), - get_ready_job_ct(), - cur_reserved_ct, + global_stat.urgent_ct, + ready_ct, + global_stat.reserved_ct, get_delayed_job_ct(), - get_buried_job_ct(), + global_stat.buried_ct, put_ct, peek_ct, reserve_ct, @@ -691,15 +704,16 @@ fmt_stats(char *buf, size_t size, void *x) kick_ct, stats_ct, stats_job_ct, + stats_tube_ct, list_tubes_ct, list_watched_tubes_ct, timeout_ct, - total_jobs(), + global_stat.total_jobs_ct, tubes.used, count_cur_conns(), count_cur_producers(), count_cur_workers(), - count_cur_waiting(), + global_stat.waiting_ct, count_tot_conns(), getpid(), VERSION, @@ -764,8 +778,10 @@ wait_for_job(conn c) enqueue_waiting_conn(c); } +typedef int(*fmt_fn)(char *, size_t, void *); + static void -do_stats(conn c, int(*fmt)(char *, size_t, void *), void *data) +do_stats(conn c, fmt_fn fmt, void *data) { int r, stats_len; @@ -816,10 +832,9 @@ do_list_tubes(conn c, ms l) } static int -fmt_job_stats(char *buf, size_t size, void *jp) +fmt_job_stats(char *buf, size_t size, job j) { time_t t; - job j = (job) jp; t = time(NULL); return snprintf(buf, size, JOB_STATS_FMT, @@ -837,6 +852,20 @@ fmt_job_stats(char *buf, size_t size, void *jp) j->kick_ct); } +static int +fmt_stats_tube(char *buf, size_t size, tube t) +{ + return snprintf(buf, size, STATS_TUBE_FMT, + t->name, + t->stat.urgent_ct, + t->ready.used, + t->stat.reserved_ct, + t->delay.used, + t->stat.buried_ct, + t->stat.total_jobs_ct, + t->stat.waiting_ct); +} + static void maybe_enqueue_incoming_job(conn c) { @@ -854,7 +883,10 @@ static job remove_this_reserved_job(conn c, job j) { j = job_remove(j); - if (j) cur_reserved_ct--; + if (j) { + global_stat.reserved_ct--; + j->tube->stat.reserved_ct--; + } if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c); return j; } @@ -1104,7 +1136,18 @@ dispatch_cmd(conn c) stats_job_ct++; /* stats */ if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR); - do_stats(c, fmt_job_stats, j); + do_stats(c, (fmt_fn) fmt_job_stats, j); + break; + case OP_STATS_TUBE: + name = c->cmd + CMD_STATS_TUBE_LEN; + if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT); + + t = find_tube(name); + if (!t) return reply_msg(c, MSG_NOTFOUND); + + stats_tube_ct++; /* stats */ + + do_stats(c, (fmt_fn) fmt_stats_tube, t); break; case OP_LIST_TUBES: /* don't allow trailing garbage */ diff --git a/tube.h b/stat.h similarity index 60% copy from tube.h copy to stat.h index 9e3083f..b4a51aa 100644 --- a/tube.h +++ b/stat.h @@ -1,4 +1,4 @@ -/* tube.h - tubes header */ +/* stat.h - stats struct */ /* Copyright (C) 2008 Keith Rarick and Philotic Inc. @@ -16,26 +16,15 @@ * along with this program. If not, see . */ -#ifndef tube_h -#define tube_h +#ifndef stat_h +#define stat_h -typedef struct tube *tube; - -#include "pq.h" -#include "ms.h" - -#define MAX_TUBE_NAME_LEN 201 - -struct tube { - unsigned int refs; - char name[MAX_TUBE_NAME_LEN]; - struct pq ready, delay; - struct ms waiting; /* set of conns */ +struct stats { + unsigned int urgent_ct; + unsigned int waiting_ct; + unsigned int buried_ct; + unsigned int reserved_ct; + long long unsigned int total_jobs_ct; }; -tube make_tube(const char *name); -void tube_dref(tube t); -void tube_iref(tube t); -#define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a)) - -#endif /*tube_h*/ +#endif /*stat_h*/ diff --git a/tube.c b/tube.c index 0f4c885..521e3c5 100644 --- a/tube.c +++ b/tube.c @@ -19,6 +19,7 @@ #include #include +#include "stat.h" #include "tube.h" #include "prot.h" #include "util.h" @@ -41,6 +42,8 @@ make_tube(const char *name) pq_init(&t->delay, job_delay_cmp); ms_init(&t->waiting, NULL, NULL); + t->stat = (struct stats) {0, 0, 0, 0}; + return t; } diff --git a/tube.h b/tube.h index 9e3083f..4187c1a 100644 --- a/tube.h +++ b/tube.h @@ -21,6 +21,7 @@ typedef struct tube *tube; +#include "stat.h" #include "pq.h" #include "ms.h" @@ -31,6 +32,7 @@ struct tube { char name[MAX_TUBE_NAME_LEN]; struct pq ready, delay; struct ms waiting; /* set of conns */ + struct stats stat; }; tube make_tube(const char *name); -- 2.11.4.GIT