From 345ad6057a2b8f8e57a701cde22481e3f5bc4aef Mon Sep 17 00:00:00 2001 From: Jean-loup Gailly Date: Mon, 10 May 2010 11:19:08 +0200 Subject: [PATCH] Distributed engine: Define hook to get stats received from slaves --- distributed/merge.c | 3 ++- distributed/merge.h | 1 - distributed/protocol.c | 18 ++++++++++++++++-- distributed/protocol.h | 6 ++++-- 4 files changed, 22 insertions(+), 6 deletions(-) diff --git a/distributed/merge.c b/distributed/merge.c index 45be0d6..1ce7509 100644 --- a/distributed/merge.c +++ b/distributed/merge.c @@ -231,7 +231,7 @@ output_stats(struct incr_stats *buf, struct slave_state *sstate, * Return the byte size of the resulting buffer. The caller must * check that the result is still valid. * The slave lock is held on both entry and exit of this function. */ -int +static int get_new_stats(struct incr_stats *buf, struct slave_state *sstate, int cmd_id) { /* Process all valid buffers in receive_queue[min..max] */ @@ -310,6 +310,7 @@ merge_init(struct slave_state *sstate, int shared_nodes, int stats_hbits, int ma sstate->insert_hook = (buffer_hook)merge_insert_hook; sstate->alloc_hook = merge_state_alloc; + sstate->args_hook = (getargs_hook)get_new_stats; /* At worst one late slave thread may have to merge up to * shared_nodes * BUFFERS_PER_SLAVE * (max_slaves - 1) diff --git a/distributed/merge.h b/distributed/merge.h index 0689f1c..5c36762 100644 --- a/distributed/merge.h +++ b/distributed/merge.h @@ -3,7 +3,6 @@ #include "distributed/protocol.h" -int get_new_stats(struct incr_stats *buf, struct slave_state *sstate, int cmd_id); void merge_init(struct slave_state *sstate, int shared_nodes, int stats_hbits, int max_slaves); #endif diff --git a/distributed/protocol.c b/distributed/protocol.c index 3fcd3d4..9107f06 100644 --- a/distributed/protocol.c +++ b/distributed/protocol.c @@ -167,7 +167,7 @@ get_reply(FILE *f, struct in_addr client, char *reply, void *bin_reply, int *bin bin_reply = (char *)bin_reply + len; size -= len; } - if (*bin_size && DEBUGVV(7)) { + if (*bin_size && DEBUGVV(2)) { char buf[1024]; snprintf(buf, sizeof(buf), "read reply %d bytes in %.4fms\n", *bin_size, (time_now() - start)*1000); @@ -211,7 +211,7 @@ send_command(char *to_send, void *bin_buf, int *bin_size, } logline(&sstate->client, ">>", buf); if (*bin_size) { - char b[1024]; // ??? remove + char b[1024]; snprintf(b, sizeof(b), "sent args %d bytes in %.4fms\n", *bin_size, ms); logline(&sstate->client, "= ", b); @@ -384,6 +384,20 @@ get_binary_arg(struct slave_state *sstate, char *cmd, int cmd_size, int *bin_siz sstate->last_cmd_id = cmd_id; *bin_size = 0; + char *s = strchr(cmd, '@'); + if (!s || !sstate->args_hook) return buf; + + int size = sstate->args_hook(buf, sstate, cmd_id); + + /* Check that the command is still valid. */ + if (atoi(gtp_cmd) != cmd_id) return NULL; + + /* Set the correct binary size for this slave. + * cmd may have been overwritten with new parameters. */ + *bin_size = size; + s = strchr(cmd, '@'); + assert(s); + snprintf(s, cmd + cmd_size - s, "@%d\n", size); return buf; } diff --git a/distributed/protocol.h b/distributed/protocol.h index df5f31c..8333cb8 100644 --- a/distributed/protocol.h +++ b/distributed/protocol.h @@ -16,14 +16,16 @@ struct slave_state; typedef void (*buffer_hook)(void *buf, int size); -typedef void (*alloc_hook)(struct slave_state *sstate); +typedef void (*state_alloc_hook)(struct slave_state *sstate); +typedef int (*getargs_hook)(void *buf, struct slave_state *sstate, int cmd_id); struct slave_state { int max_buf_size; int thread_id; struct in_addr client; // for debugging only - alloc_hook alloc_hook; + state_alloc_hook alloc_hook; buffer_hook insert_hook; + getargs_hook args_hook; /* Index in received_queue of most recent processed * buffer, -1 if none processed yet. */ -- 2.11.4.GIT