From 4b33b9e229b2a328edc37d94176abb85cc7be019 Mon Sep 17 00:00:00 2001 From: Jean-loup Gailly Date: Sun, 9 May 2010 15:53:39 +0200 Subject: [PATCH] Distributed engine: Defined shared_nodes, initialize default slave state --- distributed/distributed.c | 11 +++++++++-- distributed/distributed.h | 7 +++++++ distributed/protocol.c | 22 ++++++++++++++++++---- distributed/protocol.h | 2 +- 4 files changed, 35 insertions(+), 7 deletions(-) diff --git a/distributed/distributed.c b/distributed/distributed.c index 24c6598..cd0110e 100644 --- a/distributed/distributed.c +++ b/distributed/distributed.c @@ -46,6 +46,7 @@ * Supported arguments: * slave_port=SLAVE_PORT slaves connect to this port; this parameter is mandatory. * max_slaves=MAX_SLAVES default 24 + * shared_nodes=SHARED_NODES default 10K * slaves_quit=0|1 quit gtp command also sent to slaves, default false. * proxy_port=PROXY_PORT slaves optionally send their logs to this port. * Warning: with proxy_port, the master stderr mixes the logs of all @@ -92,6 +93,7 @@ struct distributed { char *slave_port; char *proxy_port; int max_slaves; + int shared_nodes; bool slaves_quit; struct move my_last_move; struct move_stats my_last_stats; @@ -439,7 +441,8 @@ distributed_state_init(char *arg, struct board *b) { struct distributed *dist = calloc2(1, sizeof(struct distributed)); - dist->max_slaves = 100; + dist->max_slaves = DEFAULT_MAX_SLAVES; + dist->shared_nodes = DEFAULT_SHARED_NODES; if (arg) { char *optspec, *next = arg; while (*next) { @@ -457,6 +460,10 @@ distributed_state_init(char *arg, struct board *b) dist->proxy_port = strdup(optval); } else if (!strcasecmp(optname, "max_slaves") && optval) { dist->max_slaves = atoi(optval); + } else if (!strcasecmp(optname, "shared_nodes") && optval) { + /* Share at most shared_nodes between master and slave at each genmoves. + * Must use the same value in master and slaves. */ + dist->shared_nodes = atoi(optval); } else if (!strcasecmp(optname, "slaves_quit")) { dist->slaves_quit = !optval || atoi(optval); } else { @@ -471,7 +478,7 @@ distributed_state_init(char *arg, struct board *b) fprintf(stderr, "distributed: missing slave_port\n"); exit(1); } - protocol_init(dist->slave_port, dist->proxy_port, dist->max_slaves); + protocol_init(dist->slave_port, dist->proxy_port, dist->max_slaves, dist->shared_nodes); return dist; } diff --git a/distributed/distributed.h b/distributed/distributed.h index d672798..912ebd7 100644 --- a/distributed/distributed.h +++ b/distributed/distributed.h @@ -80,6 +80,13 @@ struct incr_stats { * time to clear them at each move in the master. */ #define DEFAULT_STATS_HBITS 21 +/* If we select a cycle of at most 40ms, a slave machine can update at + * most 10K different nodes per cycle. In practice the updates + * are biased so we update fewer nodes. As shorter cyle is preferable + * because the stats are more fresh. The cycle time does not affect + * the number of slaves and the hash table size. */ +#define DEFAULT_SHARED_NODES (10*1024) + #define DIST_GAMELEN 1000 diff --git a/distributed/protocol.c b/distributed/protocol.c index ac7b813..99756ec 100644 --- a/distributed/protocol.c +++ b/distributed/protocol.c @@ -633,16 +633,30 @@ get_replies(double time_limit, int min_replies) assert(reply_count > 0); } -/* Create the slave and proxy threads. */ +/* In a 30s move with at least 5ms per genmoves we get at most + * 6000 genmoves per slave. */ +#define MAX_GENMOVES_PER_SLAVE 6000 + +/* Allocate the receive queue, and create the slave and proxy threads. */ void -protocol_init(char *slave_port, char *proxy_port, int max_slaves) +protocol_init(char *slave_port, char *proxy_port, int max_slaves, int shared_nodes) { start_time = time_now(); - int slave_sock = port_listen(slave_port, max_slaves); + queue_max_length = max_slaves * MAX_GENMOVES_PER_SLAVE; + receive_queue = calloc2(queue_max_length, sizeof(*receive_queue)); + + default_sstate.slave_sock = port_listen(slave_port, max_slaves); + default_sstate.max_buf_size = shared_nodes * sizeof(struct incr_stats); + default_sstate.last_processed = -1; + + for (int n = 0; n < BUFFERS_PER_SLAVE; n++) { + default_sstate.b[n].queue_index = -1; + } + pthread_t thread; for (int id = 0; id < max_slaves; id++) { - pthread_create(&thread, NULL, slave_thread, (void *)(long)slave_sock); + pthread_create(&thread, NULL, slave_thread, (void *)(long)id); } if (proxy_port) { diff --git a/distributed/protocol.h b/distributed/protocol.h index 8e16a0a..8f4f7f8 100644 --- a/distributed/protocol.h +++ b/distributed/protocol.h @@ -14,7 +14,7 @@ void logline(struct in_addr *client, char *prefix, char *s); void update_cmd(struct board *b, char *cmd, char *args, bool new_id); void new_cmd(struct board *b, char *cmd, char *args); void get_replies(double time_limit, int min_replies); -void protocol_init(char *slave_port, char *proxy_port, int max_slaves); +void protocol_init(char *slave_port, char *proxy_port, int max_slaves, int shared_nodes); extern int reply_count; extern char **gtp_replies; -- 2.11.4.GIT