Move implementation of the master-slave protocol to protocol.c.
[pachi/pachi-r6144.git] / distributed / protocol.c
blob87d33edc35062116fb38294cdce151064b801884
/* The functions implementing the master-slave protocol of the
 * distributed engine are grouped here. They are independent
 * of the gtp protocol. See the comments at the top of distributed.c
 * for a general introduction to the distributed engine. */
#include <arpa/inet.h>
#include <assert.h>
#include <ctype.h>
#include <math.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <time.h>
#include <unistd.h>

#define DEBUG

#include "random.h"
#include "timeinfo.h"
#include "playout.h"
#include "network.h"
#include "debug.h"
#include "distributed/distributed.h"
#include "distributed/protocol.h"
21 /* All gtp commands for current game separated by \n */
22 static char gtp_cmds[CMDS_SIZE];
24 /* Latest gtp command sent to slaves. */
25 static char *gtp_cmd = NULL;
27 /* Slaves send gtp_cmd when cmd_count changes. */
28 static int cmd_count = 0;
30 /* Remember at most 10 gtp ids per move: kgs-rules, boardsize, clear_board,
31 * time_settings, komi, handicap, genmoves, play pass, play pass, final_status_list */
32 #define MAX_CMDS_PER_MOVE 10
34 /* History of gtp commands sent for current game, indexed by move. */
35 struct cmd_history {
36 int gtp_id;
37 char *next_cmd;
38 } history[MAX_GAMELEN][MAX_CMDS_PER_MOVE];
40 /* Number of active slave machines working for this master. */
41 int active_slaves = 0;
43 /* Number of replies to last gtp command already received. */
44 int reply_count = 0;
46 /* All replies to latest gtp command are in gtp_replies[0..reply_count-1]. */
47 char **gtp_replies;
49 /* Mutex protecting gtp_cmds, gtp_cmd, history,
50 * cmd_count, active_slaves, reply_count & gtp_replies */
51 static pthread_mutex_t slave_lock = PTHREAD_MUTEX_INITIALIZER;
53 /* Condition signaled when a new gtp command is available. */
54 static pthread_cond_t cmd_cond = PTHREAD_COND_INITIALIZER;
56 /* Condition signaled when reply_count increases. */
57 static pthread_cond_t reply_cond = PTHREAD_COND_INITIALIZER;
59 /* Mutex protecting stderr. Must not be held at same time as slave_lock. */
60 static pthread_mutex_t log_lock = PTHREAD_MUTEX_INITIALIZER;
62 /* Absolute time when this program was started.
63 * For debugging only. */
64 static double start_time;
66 /* Get exclusive access to the threads and commands state. */
67 void
68 protocol_lock(void)
70 pthread_mutex_lock(&slave_lock);
73 /* Release exclusive access to the threads and commands state. */
74 void
75 protocol_unlock(void)
77 pthread_mutex_unlock(&slave_lock);
80 /* Write the time, client address, prefix, and string s to stderr atomically.
81 * s should end with a \n */
82 void
83 logline(struct in_addr *client, char *prefix, char *s)
85 double now = time_now();
87 char addr[INET_ADDRSTRLEN];
88 if (client) {
89 inet_ntop(AF_INET, client, addr, sizeof(addr));
90 } else {
91 addr[0] = '\0';
93 pthread_mutex_lock(&log_lock);
94 fprintf(stderr, "%s%15s %9.3f: %s", prefix, addr, now - start_time, s);
95 pthread_mutex_unlock(&log_lock);
98 /* Thread opening a connection on the given socket and copying input
99 * from there to stderr. */
100 static void *
101 proxy_thread(void *arg)
103 int proxy_sock = (long)arg;
104 assert(proxy_sock >= 0);
105 for (;;) {
106 struct in_addr client;
107 int conn = open_server_connection(proxy_sock, &client);
108 FILE *f = fdopen(conn, "r");
109 char buf[BSIZE];
110 while (fgets(buf, BSIZE, f)) {
111 logline(&client, "< ", buf);
113 fclose(f);
117 /* Get a reply to one gtp command. Return the gtp command id,
118 * or -1 if error. reply must have at least CMDS_SIZE bytes.
119 * slave_lock is not held on either entry or exit of this function. */
120 static int
121 get_reply(FILE *f, struct in_addr client, char *reply)
123 int reply_id = -1;
124 *reply = '\0';
125 char *line = reply;
126 while (fgets(line, reply + CMDS_SIZE - line, f) && *line != '\n') {
127 if (DEBUGL(3) || (DEBUGL(2) && line == reply))
128 logline(&client, "<<", line);
129 if (reply_id < 0 && (*line == '=' || *line == '?') && isdigit(line[1]))
130 reply_id = atoi(line+1);
131 line += strlen(line);
133 if (*line != '\n') return -1;
134 return reply_id;
137 /* Send one gtp command and get a reply from the slave machine.
138 * Write the reply in buf which must have at least CMDS_SIZE bytes.
139 * Return the gtp command id, or -1 if error.
140 * slave_lock is held on both entry and exit of this function. */
141 static int
142 send_command(char *to_send, FILE *f, struct in_addr client, char *buf)
144 assert(to_send && gtp_cmd);
145 strncpy(buf, to_send, CMDS_SIZE);
146 bool resend = to_send != gtp_cmd;
148 pthread_mutex_unlock(&slave_lock);
150 if (DEBUGL(1) && resend)
151 logline(&client, "? ",
152 to_send == gtp_cmds ? "resend all\n" : "partial resend\n");
153 fputs(buf, f);
154 fflush(f);
155 if (DEBUGL(2)) {
156 if (!DEBUGL(3)) {
157 char *s = strchr(buf, '\n');
158 if (s) s[1] = '\0';
160 logline(&client, ">>", buf);
163 int reply_id = get_reply(f, client, buf);
165 pthread_mutex_lock(&slave_lock);
166 return reply_id;
169 /* Return the command sent after that with the given gtp id,
170 * or gtp_cmds if the id wasn't used in this game. If a play command
171 * has overwritten a genmoves command, return the play command.
172 * slave_lock is held on both entry and exit of this function. */
173 static char *
174 next_command(int cmd_id)
176 if (cmd_id == -1) return gtp_cmds;
178 int last_id = atoi(gtp_cmd);
179 int reply_move = move_number(cmd_id);
180 if (reply_move > move_number(last_id)) return gtp_cmds;
182 int slot;
183 for (slot = 0; slot < MAX_CMDS_PER_MOVE; slot++) {
184 if (cmd_id == history[reply_move][slot].gtp_id) break;
186 if (slot == MAX_CMDS_PER_MOVE) return gtp_cmds;
188 char *next = history[reply_move][slot].next_cmd;
189 assert(next);
190 return next;
193 /* Process the reply received from a slave machine.
194 * Copy it to reply_buf and return false if ok, or return
195 * true if the slave is out of sync.
196 * slave_lock is held on both entry and exit of this function. */
197 static bool
198 process_reply(int reply_id, char *reply, char *reply_buf,
199 int *last_reply_id, int *reply_slot)
201 bool resend = true;
202 /* For resend everything if slave returned an error. */
203 if (*reply != '=') {
204 *last_reply_id = -1;
205 return resend;
207 /* Make sure we are still in sync. cmd_count may have
208 * changed but the reply is valid as long as cmd_id didn't
209 * change (this only occurs for consecutive genmoves). */
210 int cmd_id = atoi(gtp_cmd);
211 if (reply_id == cmd_id) {
212 strncpy(reply_buf, reply, CMDS_SIZE);
213 if (reply_id != *last_reply_id)
214 *reply_slot = reply_count++;
215 gtp_replies[*reply_slot] = reply_buf;
217 pthread_cond_signal(&reply_cond);
218 resend = false;
220 *last_reply_id = reply_id;
221 return resend;
224 /* Main loop of a slave thread.
225 * Send the current command to the slave machine and wait for a reply.
226 * Resend command history if the slave machine is out of sync.
227 * Returns when the connection with the slave machine is cut.
228 * slave_lock is held on both entry and exit of this function. */
229 static void
230 slave_loop(FILE *f, struct in_addr client, char *reply_buf, bool resend)
232 char *to_send;
233 int last_cmd_sent = 0;
234 int last_reply_id = -1;
235 int reply_slot = -1;
236 for (;;) {
237 if (resend) {
238 /* Resend complete or partial history */
239 to_send = next_command(last_reply_id);
240 } else {
241 /* Wait for a new command. */
242 while (last_cmd_sent == cmd_count)
243 pthread_cond_wait(&cmd_cond, &slave_lock);
244 to_send = gtp_cmd;
247 /* Command available, send it to slave machine.
248 * If slave was out of sync, send the history. */
249 char buf[CMDS_SIZE];
250 last_cmd_sent = cmd_count;
252 /* Send the command and get the reply, which always ends with \n\n
253 * The slave machine sends "=id reply" or "?id reply"
254 * with id == cmd_id if it is in sync. */
255 int reply_id = send_command(to_send, f, client, buf);
256 if (reply_id == -1) return;
258 resend = process_reply(reply_id, buf, reply_buf,
259 &last_reply_id, &reply_slot);
260 if (!resend)
261 /* Good reply. Force waiting for a new command.
262 * The next genmoves stats we send must include those
263 * just received (this is assumed by the slave). */
264 last_cmd_sent = cmd_count;
268 /* Thread sending gtp commands to one slave machine, and
269 * reading replies. If a slave machine dies, this thread waits
270 * for a connection from another slave. */
271 static void *
272 slave_thread(void *arg)
274 int slave_sock = (long)arg;
275 assert(slave_sock >= 0);
276 char reply_buf[CMDS_SIZE];
277 bool resend = false;
279 for (;;) {
280 /* Wait for a connection from any slave. */
281 struct in_addr client;
282 int conn = open_server_connection(slave_sock, &client);
284 FILE *f = fdopen(conn, "r+");
285 if (DEBUGL(2))
286 logline(&client, "= ", "new slave\n");
288 /* Minimal check of the slave identity. */
289 fputs("name\n", f);
290 if (!fgets(reply_buf, sizeof(reply_buf), f)
291 || strncasecmp(reply_buf, "= Pachi", 7)
292 || !fgets(reply_buf, sizeof(reply_buf), f)
293 || strcmp(reply_buf, "\n")) {
294 logline(&client, "? ", "bad slave\n");
295 fclose(f);
296 continue;
299 pthread_mutex_lock(&slave_lock);
300 active_slaves++;
301 slave_loop(f, client, reply_buf, resend);
303 assert(active_slaves > 0);
304 active_slaves--;
305 // Unblock main thread if it was waiting for this slave.
306 pthread_cond_signal(&reply_cond);
307 pthread_mutex_unlock(&slave_lock);
309 resend = true;
310 if (DEBUGL(2))
311 logline(&client, "= ", "lost slave\n");
312 fclose(f);
316 /* Create a new gtp command for all slaves. The slave lock is held
317 * upon entry and upon return, so the command will actually be
318 * sent when the lock is released. The last command is overwritten
319 * if gtp_cmd points to a non-empty string. cmd is a single word;
320 * args has all arguments and is empty or has a trailing \n */
321 void
322 update_cmd(struct board *b, char *cmd, char *args, bool new_id)
324 assert(gtp_cmd);
325 /* To make sure the slaves are in sync, we ignore the original id
326 * and use the board number plus some random bits as gtp id. */
327 static int gtp_id = -1;
328 int moves = is_reset(cmd) ? 0 : b->moves;
329 if (new_id) {
330 /* fast_random() is 16-bit only so the multiplication can't overflow. */
331 gtp_id = force_reply(moves + fast_random(65535) * DIST_GAMELEN);
332 reply_count = 0;
334 snprintf(gtp_cmd, gtp_cmds + CMDS_SIZE - gtp_cmd, "%d %s %s",
335 gtp_id, cmd, *args ? args : "\n");
336 cmd_count++;
338 /* Remember history for out-of-sync slaves. */
339 static int slot = 0;
340 static struct cmd_history *last = NULL;
341 if (new_id) {
342 if (last) last->next_cmd = gtp_cmd;
343 slot = (slot + 1) % MAX_CMDS_PER_MOVE;
344 last = &history[moves][slot];
345 last->gtp_id = gtp_id;
346 last->next_cmd = NULL;
348 // Notify the slave threads about the new command.
349 pthread_cond_broadcast(&cmd_cond);
352 /* Update the command history, then create a new gtp command
353 * for all slaves. The slave lock is held upon entry and
354 * upon return, so the command will actually be sent when the
355 * lock is released. cmd is a single word; args has all
356 * arguments and is empty or has a trailing \n */
357 void
358 new_cmd(struct board *b, char *cmd, char *args)
360 // Clear the history when a new game starts:
361 if (!gtp_cmd || is_gamestart(cmd)) {
362 gtp_cmd = gtp_cmds;
363 memset(history, 0, sizeof(history));
364 } else {
365 /* Preserve command history for new slaves.
366 * To indicate that the slave should only reply to
367 * the last command we force the id of previous
368 * commands to be just the move number. */
369 int id = prevent_reply(atoi(gtp_cmd));
370 int len = strspn(gtp_cmd, "0123456789");
371 char buf[32];
372 snprintf(buf, sizeof(buf), "%0*d", len, id);
373 memcpy(gtp_cmd, buf, len);
375 gtp_cmd += strlen(gtp_cmd);
378 // Let the slave threads send the new gtp command:
379 update_cmd(b, cmd, args, true);
382 /* Wait for at least one new reply. Return when all slaves have
383 * replied, or when the given absolute time is passed.
384 * The replies are returned in gtp_replies[0..reply_count-1]
385 * slave_lock is held on entry and on return. */
386 void
387 get_replies(double time_limit)
389 for (;;) {
390 if (reply_count > 0) {
391 struct timespec ts;
392 double sec;
393 ts.tv_nsec = (int)(modf(time_limit, &sec)*1000000000.0);
394 ts.tv_sec = (int)sec;
395 pthread_cond_timedwait(&reply_cond, &slave_lock, &ts);
396 } else {
397 pthread_cond_wait(&reply_cond, &slave_lock);
399 if (reply_count == 0) continue;
400 if (reply_count >= active_slaves) return;
401 if (time_now() >= time_limit) break;
403 if (DEBUGL(1)) {
404 char buf[1024];
405 snprintf(buf, sizeof(buf),
406 "get_replies timeout %.3f >= %.3f, replies %d < active %d\n",
407 time_now() - start_time, time_limit - start_time,
408 reply_count, active_slaves);
409 logline(NULL, "? ", buf);
411 assert(reply_count > 0);
414 /* Create the slave and proxy threads. */
415 void
416 protocol_init(char *slave_port, char *proxy_port, int max_slaves)
418 start_time = time_now();
420 int slave_sock = port_listen(slave_port, max_slaves);
421 pthread_t thread;
422 for (int id = 0; id < max_slaves; id++) {
423 pthread_create(&thread, NULL, slave_thread, (void *)(long)slave_sock);
426 if (proxy_port) {
427 int proxy_sock = port_listen(proxy_port, max_slaves);
428 for (int id = 0; id < max_slaves; id++) {
429 pthread_create(&thread, NULL, proxy_thread, (void *)(long)proxy_sock);