From 18451a2f2afe3d339e7b47ba75590a7a84b11464 Mon Sep 17 00:00:00 2001 From: Jean-loup Gailly Date: Mon, 15 Mar 2010 16:10:25 +0100 Subject: [PATCH] Distributed engine: slaves send temporary replies continuously. --- distributed/distributed.c | 86 ++++++++++++++++++++++++++++++++++++----------- uct/internal.h | 1 + uct/uct.c | 58 +++++++++++++++++++++++++------- 3 files changed, 112 insertions(+), 33 deletions(-) diff --git a/distributed/distributed.c b/distributed/distributed.c index ec66545..3ccdcae 100644 --- a/distributed/distributed.c +++ b/distributed/distributed.c @@ -15,6 +15,12 @@ * parameter for the master should be the sum of the parameters * for all slaves. */ +/* To minimize the number of ignored replies because they arrive + * too late, slaves send temporary replies to the genmoves + * command, with the best moves so far. So when the master + * has to choose, it should have final replies from most + * slaves and at least temporary replies from all of them. */ + /* This first version does not send tree updates between slaves, * but it has fault tolerance. If a slave is out of sync, the master * sends it the appropriate command history. */ @@ -121,12 +127,13 @@ int active_slaves = 0; /* Number of replies to last gtp command already received. */ int reply_count = 0; +int final_reply_count = 0; /* All replies to latest gtp command are in gtp_replies[0..reply_count-1]. */ char **gtp_replies; /* Mutex protecting gtp_cmds, gtp_cmd, id_history, cmd_history, - * active_slaves, reply_count & gtp_replies */ + * active_slaves, reply_count, final_reply_count & gtp_replies */ pthread_mutex_t slave_lock = PTHREAD_MUTEX_INITIALIZER; /* Condition signaled when a new gtp command is available. */ @@ -178,6 +185,46 @@ proxy_thread(void *arg) } } +/* Get a reply to one gtp command. If we get a temporary + * reply, put it in gtp_replies[reply_slot], notify the main + * thread, and continue reading until we get a final reply. + * Return the gtp command id, or -1 if error. + * slave_buf and reply must have at least CMDS_SIZE bytes. + * slave_lock is not held on either entry or exit of this function. */ +static int +get_reply(FILE *f, struct in_addr client, char *slave_buf, char *reply, int *reply_slot) +{ + int reply_id = -1; + *reply_slot = -1; + *reply = '\0'; + char *line = reply; + while (fgets(line, reply + CMDS_SIZE - line, f) && *line != '\n') { + if (DEBUGL(2)) + logline(&client, "<<", line); + if (reply_id < 0 && (*line == '=' || *line == '?') && isdigit(line[1])) + reply_id = atoi(line+1); + if (*line == '#') { + /* Temporary reply. */ + line = reply; + pthread_mutex_lock(&slave_lock); + if (reply_id != atoi(gtp_cmd)) { + pthread_mutex_unlock(&slave_lock); + continue; // read and discard the rest + } + strncpy(slave_buf, reply, CMDS_SIZE); + if (*reply_slot < 0) + *reply_slot = reply_count++; + gtp_replies[*reply_slot] = slave_buf; + pthread_cond_signal(&reply_cond); + pthread_mutex_unlock(&slave_lock); + } else { + line += strlen(line); + } + } + if (*line != '\n') return -1; + return reply_id; +} + /* Main loop of a slave thread. * Send the current command to the slave machine and wait for a reply. * Resend command history if the slave machine is out of sync. @@ -189,6 +236,7 @@ slave_loop(FILE *f, struct in_addr client, char *buf, bool resend) char *to_send = gtp_cmd; int cmd_id = -1; int reply_id = -1; + int reply_slot; for (;;) { while (cmd_id == reply_id && !resend) { // Wait for a new gtp command. @@ -221,24 +269,21 @@ slave_loop(FILE *f, struct in_addr client, char *buf, bool resend) /* Read the reply, which always ends with \n\n * The slave machine sends "=id reply" or "?id reply" * with id == cmd_id if it is in sync. */ - *buf = '\0'; - reply_id = -1; - char *line = buf; - while (fgets(line, buf + CMDS_SIZE - line, f) && *line != '\n') { - if (DEBUGL(2)) - logline(&client, "<<", line); - if (reply_id < 0 && (*line == '=' || *line == '?') && isdigit(line[1])) - reply_id = atoi(line+1); - line += strlen(line); - } + char reply[CMDS_SIZE]; + reply_id = get_reply(f, client, buf, reply, &reply_slot); pthread_mutex_lock(&slave_lock); - if (*line != '\n') return; + if (reply_id == -1) return; + // Make sure we are still in sync: cmd_id = atoi(gtp_cmd); - if (reply_id == cmd_id && *buf == '=') { + if (reply_id == cmd_id && *reply == '=') { resend = false; - gtp_replies[reply_count++] = buf; + strncpy(buf, reply, CMDS_SIZE); + final_reply_count++; + if (reply_slot < 0) + reply_slot = reply_count++; + gtp_replies[reply_slot] = buf; pthread_cond_signal(&reply_cond); continue; } @@ -333,7 +378,7 @@ update_cmd(struct board *b, char *cmd, char *args) gtp_id = id; snprintf(gtp_cmd, gtp_cmds + CMDS_SIZE - gtp_cmd, "%d %s %s", id, cmd, *args ? args : "\n"); - reply_count = 0; + reply_count = final_reply_count = 0; /* Remember history for out-of-sync slaves, at most 3 ids per move * (time_left, genmoves, play). */ @@ -384,7 +429,7 @@ new_cmd(struct board *b, char *cmd, char *args) static void get_replies(double time_limit, int min_playouts, struct board *b) { - while (reply_count == 0 || reply_count < active_slaves) { + while (reply_count == 0 || final_reply_count < active_slaves) { if (time_limit && reply_count > 0) { struct timespec ts; double sec; @@ -395,7 +440,7 @@ get_replies(double time_limit, int min_playouts, struct board *b) pthread_cond_wait(&reply_cond, &slave_lock); } if (reply_count == 0) continue; - if (reply_count >= active_slaves) return; + if (final_reply_count >= active_slaves) return; if (time_limit) { if (time_now() >= time_limit) break; } else { @@ -408,11 +453,12 @@ get_replies(double time_limit, int min_playouts, struct board *b) if (DEBUGL(1)) { char buf[1024]; snprintf(buf, sizeof(buf), - "get_replies timeout %.3f >= %.3f, replies %d < active %d\n", - time_now() - start_time, time_limit - start_time, reply_count, active_slaves); + "get_replies timeout %.3f >= %.3f, final %d, temp %d, active %d\n", + time_now() - start_time, time_limit - start_time, + final_reply_count, reply_count, active_slaves); logline(NULL, "? ", buf); } - assert(reply_count > 0); + assert(reply_count > 0 && final_reply_count <= reply_count); } /* Maximum time (seconds) to wait for answers to fast gtp commands diff --git a/uct/internal.h b/uct/internal.h index a3d847b..524b6e6 100644 --- a/uct/internal.h +++ b/uct/internal.h @@ -50,6 +50,7 @@ struct uct { bool pondering; /* Actually pondering now */ bool slave; /* Act as slave in distributed engine. */ enum stone my_color; + int gtp_id; /* id of the last gtp command */ int fuseki_end; int yose_start; diff --git a/uct/uct.c b/uct/uct.c index a4c295e..bc7c4ba 100644 --- a/uct/uct.c +++ b/uct/uct.c @@ -33,7 +33,7 @@ struct uct_policy *policy_ucb1_init(struct uct *u, char *arg); struct uct_policy *policy_ucb1amaf_init(struct uct *u, char *arg); static void uct_pondering_stop(struct uct *u); static void uct_pondering_start(struct uct *u, struct board *b0, struct tree *t, enum stone color); - +static char *uct_getstats(struct uct *u, struct board *b, coord_t *c); /* Default number of simulations to perform per move. * Note that this is now in total over all threads! (Unless TM_ROOT.) */ @@ -58,6 +58,9 @@ static const struct time_info default_ti = { /* Once per how many simulations (per thread) to show a progress report line. */ #define TREE_SIMPROGRESS_INTERVAL 10000 +/* How often to send stats updates for the distributed engine (in seconds). */ +#define STATS_SEND_INTERVAL 0.5 + /* When terminating uct_search() early, the safety margin to add to the * remaining playout number estimate when deciding whether the result can * still change. */ @@ -157,6 +160,7 @@ uct_notify(struct engine *e, struct board *b, int id, char *cmd, char *args, cha *reply = buf; return P_DONE_ERROR; } + u->gtp_id = id; return reply_disabled(id) ? P_NOREPLY : P_OK; } @@ -603,6 +607,10 @@ uct_search(struct uct *u, struct board *b, struct time_info *ti, enum stone colo int print_interval = TREE_SIMPROGRESS_INTERVAL * (u->thread_model == TM_ROOT ? 1 : u->threads); /* Printed notification about full memory? */ bool print_fullmem = false; + /* Absolute time of last distributed stats update. */ + double last_stats_sent = time_now(); + /* Interval between distributed stats updates. */ + double stats_interval = STATS_SEND_INTERVAL; struct spawn_ctx *ctx = uct_search_start(u, b, color, t); @@ -669,10 +677,13 @@ uct_search(struct uct *u, struct board *b, struct time_info *ti, enum stone colo /* Check against time settings. */ bool desired_done = false; + double now = time_now(); if (ti->dim == TD_WALLTIME) { - double elapsed = time_now() - ti->len.t.timer_start; + double elapsed = now - ti->len.t.timer_start; if (elapsed > stop.worst.time) break; desired_done = elapsed > stop.desired.time; + if (stats_interval < 0.1 * stop.desired.time) + stats_interval = 0.1 * stop.desired.time; } else { assert(ti->dim == TD_GAMES); if (i > stop.worst.playouts) break; @@ -695,6 +706,14 @@ uct_search(struct uct *u, struct board *b, struct time_info *ti, enum stone colo /* TODO: Early break if best->variance goes under threshold and we already * have enough playouts (possibly thanks to book or to pondering)? */ + + /* Send new stats for the distributed engine. + * End with #\n (not \n\n) to indicate a temporary result. */ + if (u->slave && now - last_stats_sent > stats_interval) { + printf("=%d %s\n#\n", u->gtp_id, uct_getstats(u, b, NULL)); + fflush(stdout); + last_stats_sent = now; + } } ctx = uct_search_stop(); @@ -852,17 +871,16 @@ uct_genmove(struct engine *e, struct board *b, struct time_info *ti, enum stone return coord_copy(best->coord); } - +/* Get stats updates for the distributed engine. Return a buffer + * with one line "total_playouts threads" then a list of lines + * "coord playouts value". The last line must not end with \n. + * If c is not null, add this move with root->playouts weight. + * This function is called only by the main thread, but may be + * called while the tree is updated by the worker threads. + * Keep this code in sync with select_best_move(). */ static char * -uct_genmoves(struct engine *e, struct board *b, struct time_info *ti, enum stone color, bool pass_all_alive) +uct_getstats(struct uct *u, struct board *b, coord_t *c) { - struct uct *u = e->data; - assert(u->slave); - - coord_t *c = uct_genmove(e, b, ti, color, pass_all_alive); - - /* Return a buffer with one line "total_playouts threads" then a list of lines - * "coord playouts value". Keep this code in sync with select_best_move(). */ static char reply[10240]; char *r = reply; char *end = reply + sizeof(reply); @@ -871,11 +889,12 @@ uct_genmoves(struct engine *e, struct board *b, struct time_info *ti, enum stone int min_playouts = root->u.playouts / 100; // Give a large weight to pass or resign, but still allow other moves. - if (is_pass(*c) || is_resign(*c)) + if (c) r += snprintf(r, end - r, "\n%s %d %.1f", coord2sstr(*c, b), root->u.playouts, (float)is_pass(*c)); - coord_done(c); + /* We rely on the fact that root->children is set only + * after all children are created. */ for (struct tree_node *ni = root->children; ni; ni = ni->sibling) { if (ni->u.playouts <= min_playouts || ni->hints & TREE_HINT_INVALID @@ -888,6 +907,19 @@ uct_genmoves(struct engine *e, struct board *b, struct time_info *ti, enum stone return reply; } +static char * +uct_genmoves(struct engine *e, struct board *b, struct time_info *ti, enum stone color, bool pass_all_alive) +{ + struct uct *u = e->data; + assert(u->slave); + + coord_t *c = uct_genmove(e, b, ti, color, pass_all_alive); + + char *reply = uct_getstats(u, b, is_pass(*c) || is_resign(*c) ? c : NULL); + coord_done(c); + return reply; +} + bool uct_genbook(struct engine *e, struct board *b, struct time_info *ti, enum stone color) -- 2.11.4.GIT