From fce9a721a0a580324ffc6b687f9de65050dde3bb Mon Sep 17 00:00:00 2001 From: Jean-loup Gailly Date: Mon, 10 May 2010 15:00:31 +0200 Subject: [PATCH] Distributed engine: Send binary stats updates to slaves --- distributed/distributed.c | 58 +++++++++++++++++++++-------------------------- distributed/distributed.h | 5 ---- distributed/merge.c | 16 +++++++++++++ distributed/merge.h | 1 + distributed/protocol.c | 12 ++++++++++ distributed/protocol.h | 1 + 6 files changed, 56 insertions(+), 37 deletions(-) diff --git a/distributed/distributed.c b/distributed/distributed.c index 9f8b557..16d0baa 100644 --- a/distributed/distributed.c +++ b/distributed/distributed.c @@ -27,8 +27,6 @@ * The master remembers stats in a queue of received buffers that * are merged together, plus one hash table per slave. The master * queue and the hash tables are cleared at each new move. */ -/* This version only has the slave receiving part, the rest - * comes in subsequent commits. */ /* To allow the master to select the best move, slaves also send * absolute playout counts for the best top level nodes (children @@ -184,9 +182,11 @@ distributed_notify(struct engine *e, struct board *b, int id, char *cmd, char *a return P_OK; } -/* genmoves returns a line "=id played_own total_playouts threads keep_looking[ reserved]" +/* genmoves returns "=id played_own total_playouts threads keep_looking @size" * then a list of lines "coord playouts value" with absolute counts for - * children of the root node. + * children of the root node, then a binary array of incr_stats structs. + * To simplify the code, we assume that master and slave have the same architecture + * (store values identically). * Return the move with most playouts, and additional stats. * Keep this code in sync with uct/slave.c:report_stats(). * slave_lock is held on entry and on return. */ @@ -214,7 +214,7 @@ select_best_move(struct board *b, struct move_stats *stats, int *played, *total_playouts += p; *total_threads += t; keep += k; - // Skip the rest of the firt line if any (allow future extensions) + // Skip the rest of the firt line in particular @size r = strchr(r, '\n'); char move[64]; @@ -236,15 +236,16 @@ select_best_move(struct board *b, struct move_stats *stats, int *played, return best_move; } -/* Set the args for the genmoves command. If stats is not null, - * append the stats from all slaves above min_playouts, except - * for pass and resign. args must have CMDS_SIZE bytes and - * upon return ends with an empty line. - * Keep this code in sync with uct_genmoves(). - * slave_lock is held on entry and on return. */ +/* Set the args for the genmoves command. If binary_args is set, + * each slave thred will add the correct binary size when sending + * (see get_binary_arg()). args must have CMDS_SIZE bytes and + * upon return ends with a single \n. + * Keep this code in sync with uct/slave.c:uct_genmoves(). + * slave_lock is held on entry and on return but we don't + * rely on the lock here. */ static void -genmoves_args(char *args, struct board *b, enum stone color, int played, - struct time_info *ti, struct move_stats2 *stats, int min_playouts) +genmoves_args(char *args, enum stone color, int played, + struct time_info *ti, bool binary_args) { char *end = args + CMDS_SIZE; char *s = args + snprintf(args, CMDS_SIZE, "%s %d", stone2str(color), played); @@ -254,17 +255,7 @@ genmoves_args(char *args, struct board *b, enum stone color, int played, ti->len.t.main_time, ti->len.t.byoyomi_time, ti->len.t.byoyomi_periods, ti->len.t.byoyomi_stones); } - s += snprintf(s, end - s, "\n"); - if (stats) { - foreach_point(b) { - if (stats[c].u.playouts <= min_playouts) continue; - s += snprintf(s, end - s, "%s %d %.7f %d %.7f\n", - coord2sstr(c, b), - stats[c].u.playouts, stats[c].u.value, - stats[c].amaf.playouts, stats[c].amaf.value); - } foreach_point_end; - } - s += snprintf(s, end - s, "\n"); + s += snprintf(s, end - s, binary_args ? " @0\n" : "\n"); } /* Time control is mostly done by the slaves, so we use default values here. */ @@ -303,9 +294,10 @@ distributed_genmove(struct engine *e, struct board *b, struct time_info *ti, stats += 2; protocol_lock(); + clear_receive_queue(); /* Send the first genmoves without stats. */ - genmoves_args(args, b, color, 0, ti, NULL, 0); + genmoves_args(args, color, 0, ti, false); new_cmd(b, cmd, args); /* Loop until most slaves want to quit or time elapsed. */ @@ -339,8 +331,7 @@ distributed_genmove(struct engine *e, struct board *b, struct time_info *ti, } /* Send the command with the same gtp id, to avoid discarding * a reply to a previous genmoves at the same move. */ - /* Do not send ascii stats, slave now expects binary args. */ - genmoves_args(args, b, color, played, ti, NULL, 0); + genmoves_args(args, color, played, ti, true); update_cmd(b, cmd, args, false); } int replies = reply_count; @@ -355,7 +346,9 @@ distributed_genmove(struct engine *e, struct board *b, struct time_info *ti, /* Tell the slaves to commit to the selected move, overwriting * the last "pachi-genmoves" in the command history. */ - char *coord = coord2str(best, b); + clear_receive_queue(); + char coordbuf[4]; + char *coord = coord2bstr(coordbuf, best, b); snprintf(args, sizeof(args), "%s %s\n", stone2str(color), coord); update_cmd(b, "play", args, true); protocol_unlock(); @@ -372,7 +365,10 @@ distributed_genmove(struct engine *e, struct board *b, struct time_info *ti, (int)(played/time/threads), 1000*time/iterations); logline(NULL, "* ", buf); } - free(coord); + if (DEBUGL(3)) { + int total_hnodes = replies * (1 << dist->stats_hbits); + merge_print_stats(total_hnodes); + } return coord_copy(best); } @@ -429,9 +425,7 @@ distributed_dead_group_list(struct engine *e, struct board *b, struct move_queue char *dead = gtp_replies[best_reply]; dead = strchr(dead, ' '); // skip "id " while (dead && *++dead != '\n') { - coord_t *c = str2coord(dead, board_size(b)); - mq_add(mq, *c); - coord_done(c); + mq_add(mq, str2scoord(dead, board_size(b))); dead = strchr(dead, '\n'); } protocol_unlock(); diff --git a/distributed/distributed.h b/distributed/distributed.h index 9d30797..2359f02 100644 --- a/distributed/distributed.h +++ b/distributed/distributed.h @@ -97,11 +97,6 @@ struct incr_stats { #define move_number(id) ((id) % DIST_GAMELEN) #define reply_disabled(id) ((id) < DIST_GAMELEN) -struct move_stats2 { - struct move_stats u; - struct move_stats amaf; -}; - char *path2sstr(path_t path, struct board *b); struct engine *engine_distributed_init(char *arg, struct board *b); diff --git a/distributed/merge.c b/distributed/merge.c index 1ce7509..f554063 100644 --- a/distributed/merge.c +++ b/distributed/merge.c @@ -16,6 +16,22 @@ /* We merge together debug stats for all hash tables. */ static struct hash_counts h_counts; +/* Display and reset hash statistics. For debugging only. */ +void +merge_print_stats(int total_hnodes) +{ + if (DEBUGL(3)) { + char buf[BSIZE]; + snprintf(buf, sizeof(buf), + "stats occupied %ld %.1f%% inserts %ld collisions %ld/%ld %.1f%%\n", + h_counts.occupied, h_counts.occupied * 100.0 / total_hnodes, + h_counts.inserts, h_counts.collisions, h_counts.lookups, + h_counts.collisions * 100.0 / (h_counts.lookups + 1)); + logline(NULL, "* ", buf); + } + if (DEBUG_MODE) h_counts.occupied = 0; +} + /* We maintain counts per bucket to avoid sorting large arrays. * All nodes with n updates since last send go to bucket n. * We have at most max_merged_nodes = (max_slaves-1) * shared_nodes diff --git a/distributed/merge.h b/distributed/merge.h index 5c36762..d3da3b3 100644 --- a/distributed/merge.h +++ b/distributed/merge.h @@ -3,6 +3,7 @@ #include "distributed/protocol.h" +void merge_print_stats(int total_hnodes); void merge_init(struct slave_state *sstate, int shared_nodes, int stats_hbits, int max_slaves); #endif diff --git a/distributed/protocol.c b/distributed/protocol.c index 9107f06..9399b66 100644 --- a/distributed/protocol.c +++ b/distributed/protocol.c @@ -330,6 +330,18 @@ insert_buf(struct slave_state *sstate, void *buf, int size) queue_length++; } +/* Clear the receive queue. The receive buffers are also invalidated + * so that slave threads scanning the queue notice it as soon as possible + * but this is only an optimization. + * slave_lock is held on both entry and exit of this function. */ +void +clear_receive_queue(void) +{ + if (!queue_length) return; + memset(receive_queue, 0, queue_length * sizeof(receive_queue[0])); + queue_length = 0; +} + /* Process the reply received from a slave machine. * Copy the ascii part to reply_buf and insert the binary part * (if any) in the receive queue. diff --git a/distributed/protocol.h b/distributed/protocol.h index 8333cb8..fc0004b 100644 --- a/distributed/protocol.h +++ b/distributed/protocol.h @@ -64,6 +64,7 @@ void protocol_unlock(void); void logline(struct in_addr *client, char *prefix, char *s); +void clear_receive_queue(void); void update_cmd(struct board *b, char *cmd, char *args, bool new_id); void new_cmd(struct board *b, char *cmd, char *args); void get_replies(double time_limit, int min_replies); -- 2.11.4.GIT