Distributed engine: force a different id if requested
[pachi.git] / distributed / protocol.c
blobac7b813124afbc4d455e6512d362f7b8de7b7ef9
/* The functions implementing the master-slave protocol of the
 * distributed engine are grouped here. They are independent
 * of the gtp protocol. See the comments at the top of distributed.c
 * for a general introduction to the distributed engine. */
#include <assert.h>
#include <stdio.h>
#include <pthread.h>
#include <ctype.h>
#include <unistd.h>

#define DEBUG

#include "random.h"
#include "timeinfo.h"
#include "playout.h"
#include "network.h"
#include "debug.h"
#include "distributed/distributed.h"
#include "distributed/protocol.h"
21 /* All gtp commands for current game separated by \n */
22 static char gtp_cmds[CMDS_SIZE];
24 /* Latest gtp command sent to slaves. */
25 static char *gtp_cmd = NULL;
27 /* Slaves send gtp_cmd when cmd_count changes. */
28 static int cmd_count = 0;
30 /* Remember at most 10 gtp ids per move: kgs-rules, boardsize, clear_board,
31 * time_settings, komi, handicap, genmoves, play pass, play pass, final_status_list */
32 #define MAX_CMDS_PER_MOVE 10
34 /* History of gtp commands sent for current game, indexed by move. */
35 static struct cmd_history {
36 int gtp_id;
37 char *next_cmd;
38 } history[MAX_GAMELEN][MAX_CMDS_PER_MOVE];
40 /* Number of active slave machines working for this master. */
41 int active_slaves = 0;
43 /* Number of replies to last gtp command already received. */
44 int reply_count = 0;
46 /* All replies to latest gtp command are in gtp_replies[0..reply_count-1]. */
47 char **gtp_replies;
49 /* All binary buffers received from all slaves in current move are in
50 * receive_queue[0..queue_length-1] */
51 static struct receive_buf {
52 volatile void *buf;
53 /* All buffers have the same physical size.
54 * size is the number of valid bytes. */
55 int size;
56 /* id of the thread that received the buffer. */
57 int thread_id;
58 } *receive_queue;
59 volatile static int queue_length = 0;
60 static int queue_max_length;
62 /* Mutex protecting all variables above. receive_queue may be
63 * read without the lock but is only written with lock held. */
64 static pthread_mutex_t slave_lock = PTHREAD_MUTEX_INITIALIZER;
66 /* Condition signaled when a new gtp command is available. */
67 static pthread_cond_t cmd_cond = PTHREAD_COND_INITIALIZER;
69 /* Condition signaled when reply_count increases. */
70 static pthread_cond_t reply_cond = PTHREAD_COND_INITIALIZER;
72 /* Mutex protecting stderr. Must not be held at same time as slave_lock. */
73 static pthread_mutex_t log_lock = PTHREAD_MUTEX_INITIALIZER;
75 /* Absolute time when this program was started.
76 * For debugging only. */
77 static double start_time;
/* Each slave thread maintains a ring of 32 buffers holding
 * incremental stats received from the slave. The oldest
 * buffer is recycled to hold the stats sent to the slave and
 * to receive the next reply. */
#define BUFFERS_PER_SLAVE_BITS 5
#define BUFFERS_PER_SLAVE (1 << BUFFERS_PER_SLAVE_BITS)

/* Hook called by insert_buf on a received buffer before the
 * buffer is made visible to other threads in receive_queue. */
typedef void (*buffer_hook)(void *buf, int size);

/* State owned by one slave thread. */
struct slave_state {
	/* Ring of buffers, each allocated with max_buf_size bytes
	 * by slave_state_alloc. */
	struct {
		void *buf;
		int size;
		/* Index in receive_queue, -1 if not there. */
		int queue_index;
	} b[BUFFERS_PER_SLAVE];
	int max_buf_size;
	/* Index in b of the buffer most recently handed out by get_free_buf. */
	int newest_buf;
	buffer_hook insert_hook;

	int thread_id;
	/* Listening socket passed to open_server_connection. */
	int slave_sock;
	struct in_addr client; // for debugging only

	/* Index in receive_queue of most recent processed
	 * buffer, -1 if none processed yet. */
	int last_processed;
	/* Id of gtp command at time of last_processed. */
	int last_cmd_id;
};

/* Template copied by every slave thread when it starts (see slave_thread). */
static struct slave_state default_sstate;
112 /* Get exclusive access to the threads and commands state. */
113 void
114 protocol_lock(void)
116 pthread_mutex_lock(&slave_lock);
119 /* Release exclusive access to the threads and commands state. */
120 void
121 protocol_unlock(void)
123 pthread_mutex_unlock(&slave_lock);
126 /* Write the time, client address, prefix, and string s to stderr atomically.
127 * s should end with a \n */
128 void
129 logline(struct in_addr *client, char *prefix, char *s)
131 double now = time_now();
133 char addr[INET_ADDRSTRLEN];
134 if (client) {
135 inet_ntop(AF_INET, client, addr, sizeof(addr));
136 } else {
137 addr[0] = '\0';
139 pthread_mutex_lock(&log_lock);
140 fprintf(stderr, "%s%15s %9.3f: %s", prefix, addr, now - start_time, s);
141 pthread_mutex_unlock(&log_lock);
144 /* Thread opening a connection on the given socket and copying input
145 * from there to stderr. */
146 static void *
147 proxy_thread(void *arg)
149 int proxy_sock = (long)arg;
150 assert(proxy_sock >= 0);
151 for (;;) {
152 struct in_addr client;
153 int conn = open_server_connection(proxy_sock, &client);
154 FILE *f = fdopen(conn, "r");
155 char buf[BSIZE];
156 while (fgets(buf, BSIZE, f)) {
157 logline(&client, "< ", buf);
159 fclose(f);
163 /* Get a reply to one gtp command. Return the gtp command id,
164 * or -1 if error. reply must have at least CMDS_SIZE bytes.
165 * The ascii reply ends with an empty line; if the first line
166 * contains "@size", a binary reply of size bytes follows the
167 * empty line. @size is not standard gtp, it is only used
168 * internally by Pachi for the genmoves command; it must be the
169 * last parameter on the line.
170 * *bin_size is the maximum size upon entry, actual size on return.
171 * slave_lock is not held on either entry or exit of this function. */
172 static int
173 get_reply(FILE *f, struct in_addr client, char *reply, void *bin_reply, int *bin_size)
175 int reply_id = -1;
176 *reply = '\0';
177 if (!fgets(reply, CMDS_SIZE, f)) return -1;
179 /* Check for binary reply. */
180 char *s = strchr(reply, '@');
181 int size = 0;
182 if (s) size = atoi(s+1);
183 assert(size <= *bin_size);
184 *bin_size = size;
186 if (DEBUGV(s, 2))
187 logline(&client, "<<", reply);
188 if ((*reply == '=' || *reply == '?') && isdigit(reply[1]))
189 reply_id = atoi(reply+1);
191 /* Read the rest of the ascii reply */
192 char *line = reply + strlen(reply);
193 while (fgets(line, reply + CMDS_SIZE - line, f) && *line != '\n') {
194 if (DEBUGL(3))
195 logline(&client, "<<", line);
196 line += strlen(line);
198 if (*line != '\n') return -1;
200 /* Read the binary reply if any. */
201 double start = time_now();
202 int len;
203 while (size && (len = fread(bin_reply, 1, size, f)) > 0) {
204 bin_reply = (char *)bin_reply + len;
205 size -= len;
207 if (*bin_size && DEBUGVV(7)) {
208 char buf[1024];
209 snprintf(buf, sizeof(buf), "read reply %d bytes in %.4fms\n", *bin_size,
210 (time_now() - start)*1000);
211 logline(&client, "= ", buf);
213 return size ? -1 : reply_id;
216 /* Send the gtp command to_send and get a reply from the slave machine.
217 * Write the reply in buf which must have at least CMDS_SIZE bytes.
218 * If *bin_size > 0, send bin_buf after the gtp command.
219 * Return any binary reply in bin_buf and set its size in bin_size.
220 * bin_buf is private to the slave and need not be copied.
221 * Return the gtp command id, or -1 if error.
222 * slave_lock is held on both entry and exit of this function. */
223 static int
224 send_command(char *to_send, void *bin_buf, int *bin_size,
225 FILE *f, struct slave_state *sstate, char *buf)
227 assert(to_send && gtp_cmd && bin_buf && bin_size);
228 strncpy(buf, to_send, CMDS_SIZE);
229 bool resend = to_send != gtp_cmd;
231 pthread_mutex_unlock(&slave_lock);
233 if (DEBUGL(1) && resend)
234 logline(&sstate->client, "? ",
235 to_send == gtp_cmds ? "resend all\n" : "partial resend\n");
236 fputs(buf, f);
238 double start = time_now();
239 if (*bin_size)
240 fwrite(bin_buf, 1, *bin_size, f);
241 fflush(f);
243 if (DEBUGV(strchr(buf, '@'), 2)) {
244 double ms = (time_now() - start) * 1000.0;
245 if (!DEBUGL(3)) {
246 char *s = strchr(buf, '\n');
247 if (s) s[1] = '\0';
249 logline(&sstate->client, ">>", buf);
250 if (*bin_size) {
251 char b[1024]; // ??? remove
252 snprintf(b, sizeof(b),
253 "sent args %d bytes in %.4fms\n", *bin_size, ms);
254 logline(&sstate->client, "= ", b);
258 /* Reuse the buffers for the reply. */
259 *bin_size = sstate->max_buf_size;
260 int reply_id = get_reply(f, sstate->client, buf, bin_buf, bin_size);
262 pthread_mutex_lock(&slave_lock);
263 return reply_id;
266 /* Return the command sent after that with the given gtp id,
267 * or gtp_cmds if the id wasn't used in this game. If a play command
268 * has overwritten a genmoves command, return the play command.
269 * slave_lock is held on both entry and exit of this function. */
270 static char *
271 next_command(int cmd_id)
273 if (cmd_id == -1) return gtp_cmds;
275 int last_id = atoi(gtp_cmd);
276 int reply_move = move_number(cmd_id);
277 if (reply_move > move_number(last_id)) return gtp_cmds;
279 int slot;
280 for (slot = 0; slot < MAX_CMDS_PER_MOVE; slot++) {
281 if (cmd_id == history[reply_move][slot].gtp_id) break;
283 if (slot == MAX_CMDS_PER_MOVE) return gtp_cmds;
285 char *next = history[reply_move][slot].next_cmd;
286 assert(next);
287 return next;
290 /* Allocate buffers for a slave thread. The state should have been
291 * initialized already as a copy of the default slave state.
292 * slave_lock is not held on either entry or exit of this function. */
293 static void
294 slave_state_alloc(struct slave_state *sstate)
296 for (int n = 0; n < BUFFERS_PER_SLAVE; n++) {
297 sstate->b[n].buf = malloc2(sstate->max_buf_size);
301 /* Get a free binary buffer, first invalidating it in the receive
302 * queue if necessary. In practice all buffers should be used
303 * before they are invalidated, if BUFFERS_PER_SLAVE is large enough.
304 * slave_lock is held on both entry and exit of this function. */
305 static void *
306 get_free_buf(struct slave_state *sstate, bool new_id)
308 int newest = (sstate->newest_buf + 1) & (BUFFERS_PER_SLAVE - 1);
309 sstate->newest_buf = newest;
310 void *buf = sstate->b[newest].buf;
312 if (DEBUGVV(7)) {
313 char b[1024];
314 snprintf(b, sizeof(b), "get free %d index %d buf=%p qlength %d\n",
315 newest, sstate->b[newest].queue_index, buf, queue_length);
316 logline(&sstate->client, "? ", b);
319 /* For a new command, previous indices in receive_queue
320 * are now meaningless. In particular they may be
321 * beyond the current queue_length. */
322 if (new_id) {
323 sstate->last_processed = -1;
324 for (int n = 0; n < BUFFERS_PER_SLAVE; n++) {
325 sstate->b[n].queue_index = -1;
327 return buf;
330 int index = sstate->b[newest].queue_index;
331 if (index >= 0) {
332 assert(receive_queue[index].thread_id == sstate->thread_id);
333 assert(receive_queue[index].buf == buf);
334 /* Invalidate the buffer. */
335 receive_queue[index].buf = NULL;
336 sstate->b[newest].queue_index = -1;
338 return buf;
341 /* Insert a buffer in the receive queue. It should be the most
342 * recent buffer allocated by the calling thread.
343 * slave_lock is held on both entry and exit of this function. */
344 static void
345 insert_buf(struct slave_state *sstate, void *buf, int size)
347 assert(queue_length < queue_max_length);
349 int newest = sstate->newest_buf;
350 assert(buf == sstate->b[newest].buf);
352 /* Update the buffer if necessary before making it
353 * available to other threads. */
354 if (sstate->insert_hook) sstate->insert_hook(buf, size);
356 if (DEBUGVV(7)) {
357 char b[1024];
358 snprintf(b, sizeof(b), "insert newest %d rq[%d]->%p\n",
359 newest, queue_length, buf);
360 logline(&sstate->client, "? ", b);
362 receive_queue[queue_length].buf = buf;
363 receive_queue[queue_length].size = size;
364 receive_queue[queue_length].thread_id = sstate->thread_id;
365 sstate->b[newest].queue_index = queue_length;
366 queue_length++;
369 /* Process the reply received from a slave machine.
370 * Copy the ascii part to reply_buf and insert the binary part
371 * (if any) in the receive queue.
372 * Return false if ok, true if the slave is out of sync.
373 * slave_lock is held on both entry and exit of this function. */
374 static bool
375 process_reply(int reply_id, char *reply, char *reply_buf,
376 void *bin_reply, int bin_size, int *last_reply_id,
377 int *reply_slot, struct slave_state *sstate)
379 /* Resend everything if slave returned an error. */
380 if (*reply != '=') {
381 *last_reply_id = -1;
382 return true;
384 /* Make sure we are still in sync. cmd_count may have
385 * changed but the reply is valid as long as cmd_id didn't
386 * change (this only occurs for consecutive genmoves). */
387 int cmd_id = atoi(gtp_cmd);
388 if (reply_id != cmd_id) {
389 *last_reply_id = reply_id;
390 return true;
393 strncpy(reply_buf, reply, CMDS_SIZE);
394 if (reply_id != *last_reply_id)
395 *reply_slot = reply_count++;
396 gtp_replies[*reply_slot] = reply_buf;
398 if (bin_size) insert_buf(sstate, bin_reply, bin_size);
400 pthread_cond_signal(&reply_cond);
401 *last_reply_id = reply_id;
402 return false;
405 /* Get the binary arg for the given command, and update the command
406 * if necessary. For now, only genmoves has a binary argument, and
407 * we return the best stats increments from all other slaves.
408 * Set *bin_size to 0 if the command doesn't take binary arguments,
409 * but still return a buffer, to be used for the reply.
410 * Return NULL if the binary arg is obsolete by the time we have
411 * finished computing it, because a new command is available.
412 * This version only gets the buffer for the reply, to be completed
413 * in future commits.
414 * slave_lock is held on both entry and exit of this function. */
415 void *
416 get_binary_arg(struct slave_state *sstate, char *cmd, int cmd_size, int *bin_size)
418 int cmd_id = atoi(gtp_cmd);
419 void *buf = get_free_buf(sstate, cmd_id != sstate->last_cmd_id);
420 sstate->last_cmd_id = cmd_id;
422 *bin_size = 0;
423 return buf;
426 /* Main loop of a slave thread.
427 * Send the current command to the slave machine and wait for a reply.
428 * Resend command history if the slave machine is out of sync.
429 * Returns when the connection with the slave machine is cut.
430 * slave_lock is held on both entry and exit of this function. */
431 static void
432 slave_loop(FILE *f, char *reply_buf, struct slave_state *sstate, bool resend)
434 char *to_send;
435 int last_cmd_count = 0;
436 int last_reply_id = -1;
437 int reply_slot = -1;
438 for (;;) {
439 if (resend) {
440 /* Resend complete or partial history */
441 to_send = next_command(last_reply_id);
442 } else {
443 /* Wait for a new command. */
444 while (last_cmd_count == cmd_count)
445 pthread_cond_wait(&cmd_cond, &slave_lock);
446 to_send = gtp_cmd;
449 /* Command available, send it to slave machine.
450 * If slave was out of sync, send the history.
451 * But first get binary arguments if necessary. */
452 int bin_size = 0;
453 void *bin_buf = get_binary_arg(sstate, gtp_cmd,
454 gtp_cmds + CMDS_SIZE - gtp_cmd,
455 &bin_size);
456 /* Check that the command is still valid. */
457 resend = true;
458 if (!bin_buf) continue;
460 /* Send the command and get the reply, which always ends with \n\n
461 * The slave machine sends "=id reply" or "?id reply"
462 * with id == cmd_id if it is in sync. */
463 last_cmd_count = cmd_count;
464 char buf[CMDS_SIZE];
465 int reply_id = send_command(to_send, bin_buf, &bin_size, f,
466 sstate, buf);
467 if (reply_id == -1) return;
469 resend = process_reply(reply_id, buf, reply_buf, bin_buf, bin_size,
470 &last_reply_id, &reply_slot, sstate);
474 /* Thread sending gtp commands to one slave machine, and
475 * reading replies. If a slave machine dies, this thread waits
476 * for a connection from another slave.
477 * The large buffers are allocated only once we get a first
478 * connection, to avoid wasting memory if max_slaves is too large.
479 * We do not invalidate the received buffers if a slave disconnects;
480 * they are still useful for other slaves. */
481 static void *
482 slave_thread(void *arg)
484 struct slave_state sstate = default_sstate;
485 sstate.thread_id = (long)arg;
487 assert(sstate.slave_sock >= 0);
488 char reply_buf[CMDS_SIZE];
489 bool resend = false;
491 for (;;) {
492 /* Wait for a connection from any slave. */
493 struct in_addr client;
494 int conn = open_server_connection(sstate.slave_sock, &client);
496 FILE *f = fdopen(conn, "r+");
497 if (DEBUGL(2)) {
498 snprintf(reply_buf, sizeof(reply_buf),
499 "new slave, id %d\n", sstate.thread_id);
500 logline(&client, "= ", reply_buf);
503 /* Minimal check of the slave identity. */
504 fputs("name\n", f);
505 if (!fgets(reply_buf, sizeof(reply_buf), f)
506 || strncasecmp(reply_buf, "= Pachi", 7)
507 || !fgets(reply_buf, sizeof(reply_buf), f)
508 || strcmp(reply_buf, "\n")) {
509 logline(&client, "? ", "bad slave\n");
510 fclose(f);
511 continue;
514 if (!resend) slave_state_alloc(&sstate);
515 sstate.client = client;
517 pthread_mutex_lock(&slave_lock);
518 active_slaves++;
519 slave_loop(f, reply_buf, &sstate, resend);
521 assert(active_slaves > 0);
522 active_slaves--;
523 // Unblock main thread if it was waiting for this slave.
524 pthread_cond_signal(&reply_cond);
525 pthread_mutex_unlock(&slave_lock);
527 resend = true;
528 if (DEBUGL(2))
529 logline(&client, "= ", "lost slave\n");
530 fclose(f);
534 /* Create a new gtp command for all slaves. The slave lock is held
535 * upon entry and upon return, so the command will actually be
536 * sent when the lock is released. The last command is overwritten
537 * if gtp_cmd points to a non-empty string. cmd is a single word;
538 * args has all arguments and is empty or has a trailing \n */
539 void
540 update_cmd(struct board *b, char *cmd, char *args, bool new_id)
542 assert(gtp_cmd);
543 /* To make sure the slaves are in sync, we ignore the original id
544 * and use the board number plus some random bits as gtp id. */
545 static int gtp_id = -1;
546 int moves = is_reset(cmd) ? 0 : b->moves;
547 if (new_id) {
548 int prev_id = gtp_id;
549 do {
550 /* fast_random() is 16-bit only so the multiplication can't overflow. */
551 gtp_id = force_reply(moves + fast_random(65535) * DIST_GAMELEN);
552 } while (gtp_id == prev_id);
553 reply_count = 0;
555 snprintf(gtp_cmd, gtp_cmds + CMDS_SIZE - gtp_cmd, "%d %s %s",
556 gtp_id, cmd, *args ? args : "\n");
557 cmd_count++;
559 /* Remember history for out-of-sync slaves. */
560 static int slot = 0;
561 static struct cmd_history *last = NULL;
562 if (new_id) {
563 if (last) last->next_cmd = gtp_cmd;
564 slot = (slot + 1) % MAX_CMDS_PER_MOVE;
565 last = &history[moves][slot];
566 last->gtp_id = gtp_id;
567 last->next_cmd = NULL;
569 // Notify the slave threads about the new command.
570 pthread_cond_broadcast(&cmd_cond);
573 /* Update the command history, then create a new gtp command
574 * for all slaves. The slave lock is held upon entry and
575 * upon return, so the command will actually be sent when the
576 * lock is released. cmd is a single word; args has all
577 * arguments and is empty or has a trailing \n */
578 void
579 new_cmd(struct board *b, char *cmd, char *args)
581 // Clear the history when a new game starts:
582 if (!gtp_cmd || is_gamestart(cmd)) {
583 gtp_cmd = gtp_cmds;
584 memset(history, 0, sizeof(history));
585 } else {
586 /* Preserve command history for new slaves.
587 * To indicate that the slave should only reply to
588 * the last command we force the id of previous
589 * commands to be just the move number. */
590 int id = prevent_reply(atoi(gtp_cmd));
591 int len = strspn(gtp_cmd, "0123456789");
592 char buf[32];
593 snprintf(buf, sizeof(buf), "%0*d", len, id);
594 memcpy(gtp_cmd, buf, len);
596 gtp_cmd += strlen(gtp_cmd);
599 // Let the slave threads send the new gtp command:
600 update_cmd(b, cmd, args, true);
603 /* Wait for at least one new reply. Return when at least
604 * min_replies slaves have already replied, or when the
605 * given absolute time is passed.
606 * The replies are returned in gtp_replies[0..reply_count-1]
607 * slave_lock is held on entry and on return. */
608 void
609 get_replies(double time_limit, int min_replies)
611 for (;;) {
612 if (reply_count > 0) {
613 struct timespec ts;
614 double sec;
615 ts.tv_nsec = (int)(modf(time_limit, &sec)*1000000000.0);
616 ts.tv_sec = (int)sec;
617 pthread_cond_timedwait(&reply_cond, &slave_lock, &ts);
618 } else {
619 pthread_cond_wait(&reply_cond, &slave_lock);
621 if (reply_count == 0) continue;
622 if (reply_count >= min_replies || reply_count >= active_slaves) return;
623 if (time_now() >= time_limit) break;
625 if (DEBUGL(1)) {
626 char buf[1024];
627 snprintf(buf, sizeof(buf),
628 "get_replies timeout %.3f >= %.3f, replies %d < min %d, active %d\n",
629 time_now() - start_time, time_limit - start_time,
630 reply_count, min_replies, active_slaves);
631 logline(NULL, "? ", buf);
633 assert(reply_count > 0);
636 /* Create the slave and proxy threads. */
637 void
638 protocol_init(char *slave_port, char *proxy_port, int max_slaves)
640 start_time = time_now();
642 int slave_sock = port_listen(slave_port, max_slaves);
643 pthread_t thread;
644 for (int id = 0; id < max_slaves; id++) {
645 pthread_create(&thread, NULL, slave_thread, (void *)(long)slave_sock);
648 if (proxy_port) {
649 int proxy_sock = port_listen(proxy_port, max_slaves);
650 for (int id = 0; id < max_slaves; id++) {
651 pthread_create(&thread, NULL, proxy_thread, (void *)(long)proxy_sock);