Merge pull request #50 from lemonsqueeze/can_countercap
[pachi.git] / distributed / protocol.c
blob3c80b8308f84a66128f391507deb0f07081150d1
1 /* The functions implementing the master-slave protocol of the
2 * distributed engine are grouped here. They are independent
3 * of the gtp protocol. See the comments at the top of distributed.c
4 * for a general introduction to the distributed engine. */
6 /* The receive queue is an array of pointers to binary buffers.
7 * These pointers are invalidated in one of two ways when a buffer
8 * is recycled: (1) the queue age is increased when the queue is
9 * emptied at a new move, (2) the pointer itself is set to NULL
10 * immediately, and stays so until at least the next queue age
11 * increment. */
13 #include <assert.h>
14 #include <stdio.h>
15 #include <pthread.h>
16 #include <ctype.h>
17 #include <unistd.h>
19 #define DEBUG
21 #include "random.h"
22 #include "timeinfo.h"
23 #include "playout.h"
24 #include "network.h"
25 #include "debug.h"
26 #include "distributed/distributed.h"
27 #include "distributed/protocol.h"
29 /* All gtp commands for current game separated by \n */
30 static char gtp_cmds[CMDS_SIZE];
32 /* Latest gtp command sent to slaves. */
33 static char *gtp_cmd = NULL;
35 /* Slaves send gtp_cmd when cmd_count changes. */
36 static int cmd_count = 0;
38 /* Remember at most 10 gtp ids per move: kgs-rules, boardsize, clear_board,
39 * time_settings, komi, handicap, genmoves, play pass, play pass, final_status_list */
40 #define MAX_CMDS_PER_MOVE 10
42 /* History of gtp commands sent for current game, indexed by move. */
43 static struct cmd_history {
44 int gtp_id;
45 char *next_cmd;
46 } history[MAX_GAMELEN][MAX_CMDS_PER_MOVE];
48 /* Number of active slave machines working for this master. */
49 int active_slaves = 0;
51 /* Number of replies to last gtp command already received. */
52 int reply_count = 0;
54 /* All replies to latest gtp command are in gtp_replies[0..reply_count-1]. */
55 char **gtp_replies;
58 struct buf_state **receive_queue;
59 int queue_length = 0;
60 int queue_age = 0;
61 static int queue_max_length;
63 /* Mutex protecting all variables above. receive_queue may be
64 * read without the lock but is only written with lock held. */
65 static pthread_mutex_t slave_lock = PTHREAD_MUTEX_INITIALIZER;
67 /* Condition signaled when a new gtp command is available. */
68 static pthread_cond_t cmd_cond = PTHREAD_COND_INITIALIZER;
70 /* Condition signaled when reply_count increases. */
71 static pthread_cond_t reply_cond = PTHREAD_COND_INITIALIZER;
73 /* Mutex protecting stderr. Must not be held at same time as slave_lock. */
74 static pthread_mutex_t log_lock = PTHREAD_MUTEX_INITIALIZER;
76 /* Absolute time when this program was started.
77 * For debugging only. */
78 static double start_time;
80 /* Default slave state. */
81 struct slave_state default_sstate;
84 /* Get exclusive access to the threads and commands state. */
85 void
86 protocol_lock(void)
88 pthread_mutex_lock(&slave_lock);
91 /* Release exclusive access to the threads and commands state. */
92 void
93 protocol_unlock(void)
95 pthread_mutex_unlock(&slave_lock);
98 /* Write the time, client address, prefix, and string s to stderr atomically.
99 * s should end with a \n */
100 void
101 logline(struct in_addr *client, char *prefix, char *s)
103 double now = time_now();
105 char addr[INET_ADDRSTRLEN];
106 if (client) {
107 #ifdef _WIN32
108 strcpy(addr, inet_ntoa(*client));
109 #else
110 inet_ntop(AF_INET, client, addr, sizeof(addr));
111 #endif
112 } else {
113 addr[0] = '\0';
115 pthread_mutex_lock(&log_lock);
116 fprintf(stderr, "%s%15s %9.3f: %s", prefix, addr, now - start_time, s);
117 pthread_mutex_unlock(&log_lock);
120 /* Thread opening a connection on the given socket and copying input
121 * from there to stderr. */
122 static void * __attribute__((noreturn))
123 proxy_thread(void *arg)
125 int proxy_sock = (intptr_t)arg;
126 assert(proxy_sock >= 0);
127 for (;;) {
128 struct in_addr client;
129 int conn = open_server_connection(proxy_sock, &client);
130 FILE *f = fdopen(conn, "r");
131 char buf[BSIZE];
132 while (fgets(buf, BSIZE, f)) {
133 logline(&client, "< ", buf);
135 fclose(f);
137 pthread_exit(NULL);
140 /* Get a reply to one gtp command. Return the gtp command id,
141 * or -1 if error. reply must have at least CMDS_SIZE bytes.
142 * The ascii reply ends with an empty line; if the first line
143 * contains "@size", a binary reply of size bytes follows the
144 * empty line. @size is not standard gtp, it is only used
145 * internally by Pachi for the genmoves command; it must be the
146 * last parameter on the line.
147 * *bin_size is the maximum size upon entry, actual size on return.
148 * slave_lock is not held on either entry or exit of this function. */
149 static int
150 get_reply(FILE *f, struct in_addr client, char *reply, void *bin_reply, int *bin_size)
152 double start = time_now();
154 int reply_id = -1;
155 *reply = '\0';
156 if (!fgets(reply, CMDS_SIZE, f)) return -1;
158 /* Check for binary reply. */
159 char *s = strchr(reply, '@');
160 int size = 0;
161 if (s) size = atoi(s+1);
162 assert(size <= *bin_size);
163 *bin_size = size;
165 if (DEBUGV(s, 2))
166 logline(&client, "<<", reply);
167 if ((*reply == '=' || *reply == '?') && isdigit(reply[1]))
168 reply_id = atoi(reply+1);
170 /* Read the rest of the ascii reply */
171 char *line = reply + strlen(reply);
172 while (fgets(line, reply + CMDS_SIZE - line, f) && *line != '\n') {
173 if (DEBUGL(3))
174 logline(&client, "<<", line);
175 line += strlen(line);
177 if (*line != '\n') return -1;
179 /* Read the binary reply if any. */
180 int len;
181 while (size && (len = fread(bin_reply, 1, size, f)) > 0) {
182 bin_reply = (char *)bin_reply + len;
183 size -= len;
185 if (*bin_size && DEBUGVV(2)) {
186 char buf[1024];
187 snprintf(buf, sizeof(buf), "read reply %d+%d bytes in %.4fms\n",
188 (int)strlen(reply), *bin_size,
189 (time_now() - start)*1000);
190 logline(&client, "= ", buf);
192 return size ? -1 : reply_id;
195 /* Send the gtp command to_send and get a reply from the slave machine.
196 * Write the reply in buf which must have at least CMDS_SIZE bytes.
197 * If *bin_size > 0, send bin_buf after the gtp command.
198 * Return any binary reply in bin_buf and set its size in bin_size.
199 * bin_buf is private to the slave and need not be copied.
200 * Return the gtp command id, or -1 if error.
201 * slave_lock is held on both entry and exit of this function. */
202 static int
203 send_command(char *to_send, void *bin_buf, int *bin_size,
204 FILE *f, struct slave_state *sstate, char *buf)
206 assert(to_send && gtp_cmd && bin_buf && bin_size);
207 strncpy(buf, to_send, CMDS_SIZE);
208 bool resend = to_send != gtp_cmd;
210 pthread_mutex_unlock(&slave_lock);
212 if (DEBUGL(1) && resend)
213 logline(&sstate->client, "? ",
214 to_send == gtp_cmds ? "resend all\n" : "partial resend\n");
216 double start = time_now();
217 fputs(buf, f);
219 if (*bin_size)
220 fwrite(bin_buf, 1, *bin_size, f);
221 fflush(f);
223 if (DEBUGV(strchr(buf, '@'), 2)) {
224 double ms = (time_now() - start) * 1000.0;
225 if (!DEBUGL(3)) {
226 char *s = strchr(buf, '\n');
227 if (s) s[1] = '\0';
229 logline(&sstate->client, ">>", buf);
230 if (*bin_size) {
231 char b[1024];
232 snprintf(b, sizeof(b),
233 "sent cmd %d+%d bytes in %.4fms\n",
234 (int)strlen(buf), *bin_size, ms);
235 logline(&sstate->client, "= ", b);
239 /* Reuse the buffers for the reply. */
240 *bin_size = sstate->max_buf_size;
241 int reply_id = get_reply(f, sstate->client, buf, bin_buf, bin_size);
243 pthread_mutex_lock(&slave_lock);
244 return reply_id;
247 /* Return the command sent after that with the given gtp id,
248 * or gtp_cmds if the id wasn't used in this game. If a play command
249 * has overwritten a genmoves command, return the play command.
250 * slave_lock is held on both entry and exit of this function. */
251 static char *
252 next_command(int cmd_id)
254 if (cmd_id == -1) return gtp_cmds;
256 int last_id = atoi(gtp_cmd);
257 int reply_move = move_number(cmd_id);
258 if (reply_move > move_number(last_id)) return gtp_cmds;
260 int slot;
261 for (slot = 0; slot < MAX_CMDS_PER_MOVE; slot++) {
262 if (cmd_id == history[reply_move][slot].gtp_id) break;
264 if (slot == MAX_CMDS_PER_MOVE) return gtp_cmds;
266 char *next = history[reply_move][slot].next_cmd;
267 assert(next);
268 return next;
271 /* Allocate buffers for a slave thread. The state should have been
272 * initialized already as a copy of the default slave state.
273 * slave_lock is not held on either entry or exit of this function. */
274 static void
275 slave_state_alloc(struct slave_state *sstate)
277 for (int n = 0; n < BUFFERS_PER_SLAVE; n++) {
278 sstate->b[n].buf = malloc2(sstate->max_buf_size);
279 sstate->b[n].owner = sstate->thread_id;
281 if (sstate->alloc_hook) sstate->alloc_hook(sstate);
284 /* Get a free binary buffer, first invalidating it in the receive
285 * queue if necessary. In practice all buffers should be used
286 * before they are invalidated, if BUFFERS_PER_SLAVE is large enough.
287 * slave_lock is held on both entry and exit of this function. */
288 static void *
289 get_free_buf(struct slave_state *sstate)
291 int newest = (sstate->newest_buf + 1) & (BUFFERS_PER_SLAVE - 1);
292 sstate->newest_buf = newest;
293 void *buf = sstate->b[newest].buf;
295 if (DEBUGVV(7)) {
296 char b[1024];
297 snprintf(b, sizeof(b),
298 "get free %d index %d buf=%p age %d qlength %d\n", newest,
299 sstate->b[newest].queue_index, buf, queue_age, queue_length);
300 logline(&sstate->client, "? ", b);
303 int index = sstate->b[newest].queue_index;
304 if (index < 0) return buf;
306 /* Invalidate the buffer if the calling thread still owns its previous
307 * entry in the receive queue. The entry may have been overwritten by
308 * another thread, but only after a new move which invalidates the
309 * entire receive queue. */
310 if (receive_queue[index] && receive_queue[index]->owner == sstate->thread_id) {
311 receive_queue[index] = NULL;
313 sstate->b[newest].queue_index = -1;
314 return buf;
317 /* Insert a buffer in the receive queue. It should be the most
318 * recent buffer allocated by the calling thread.
319 * slave_lock is held on both entry and exit of this function. */
320 static void
321 insert_buf(struct slave_state *sstate, void *buf, int size)
323 assert(queue_length < queue_max_length);
325 int newest = sstate->newest_buf;
326 assert(buf == sstate->b[newest].buf);
328 /* Update the buffer if necessary before making it
329 * available to other threads. */
330 if (sstate->insert_hook) sstate->insert_hook(buf, size);
332 if (DEBUGVV(7)) {
333 char b[1024];
334 snprintf(b, sizeof(b),
335 "insert newest %d age %d rq[%d]->%p owner %d\n",
336 newest, queue_age, queue_length, buf, sstate->thread_id);
337 logline(&sstate->client, "? ", b);
339 receive_queue[queue_length] = &sstate->b[newest];
340 receive_queue[queue_length]->size = size;
341 receive_queue[queue_length]->queue_index = queue_length;
342 queue_length++;
345 /* Clear the receive queue. The buffer pointers do not have to be cleared
346 * here, this is done as each buffer is recycled.
347 * slave_lock is held on both entry and exit of this function. */
348 void
349 clear_receive_queue(void)
351 if (DEBUGL(3)) {
352 char buf[1024];
353 snprintf(buf, sizeof(buf), "clear queue, old length %d age %d\n",
354 queue_length, queue_age);
355 logline(NULL, "? ", buf);
357 queue_length = 0;
358 queue_age++;
361 /* Process the reply received from a slave machine.
362 * Copy the ascii part to reply_buf and insert the binary part
363 * (if any) in the receive queue.
364 * Return false if ok, true if the slave is out of sync.
365 * slave_lock is held on both entry and exit of this function. */
366 static bool
367 process_reply(int reply_id, char *reply, char *reply_buf,
368 void *bin_reply, int bin_size, int *last_reply_id,
369 int *reply_slot, struct slave_state *sstate)
371 /* Resend everything if slave returned an error. */
372 /* FIXME: this often results in infinite loops on errors
373 * not caused by syncing. These should be reported from
374 * the distributed engine. */
375 if (*reply != '=') {
376 *last_reply_id = -1;
377 return true;
379 /* Make sure we are still in sync. cmd_count may have
380 * changed but the reply is valid as long as cmd_id didn't
381 * change (this only occurs for consecutive genmoves). */
382 int cmd_id = atoi(gtp_cmd);
383 if (reply_id != cmd_id) {
384 *last_reply_id = reply_id;
385 return true;
388 strncpy(reply_buf, reply, CMDS_SIZE);
389 if (reply_id != *last_reply_id)
390 *reply_slot = reply_count++;
391 gtp_replies[*reply_slot] = reply_buf;
393 if (bin_size) insert_buf(sstate, bin_reply, bin_size);
395 pthread_cond_signal(&reply_cond);
396 *last_reply_id = reply_id;
397 return false;
400 /* Get the binary arg for the given command, and update the command
401 * if necessary. For now, only genmoves has a binary argument, and
402 * we return the best stats increments from all other slaves.
403 * Set *bin_size to 0 if the command doesn't take binary arguments,
404 * but still return a buffer, to be used for the reply.
405 * Return NULL if the binary arg is obsolete by the time we have
406 * finished computing it, because a new command is available.
407 * This version only gets the buffer for the reply, to be completed
408 * in future commits.
409 * slave_lock is held on both entry and exit of this function. */
410 void *
411 get_binary_arg(struct slave_state *sstate, char *cmd, int cmd_size, int *bin_size)
413 int cmd_id = atoi(gtp_cmd);
414 void *buf = get_free_buf(sstate);
416 *bin_size = 0;
417 char *s = strchr(cmd, '@');
418 if (!s || !sstate->args_hook) return buf;
420 int size = sstate->args_hook(buf, sstate, cmd_id);
422 /* Check that the command is still valid. */
423 if (atoi(gtp_cmd) != cmd_id) return NULL;
425 /* Set the correct binary size for this slave.
426 * cmd may have been overwritten with new parameters. */
427 *bin_size = size;
428 s = strchr(cmd, '@');
429 assert(s);
430 snprintf(s, cmd + cmd_size - s, "@%d\n", size);
431 return buf;
434 /* Main loop of a slave thread.
435 * Send the current command to the slave machine and wait for a reply.
436 * Resend command history if the slave machine is out of sync.
437 * Returns when the connection with the slave machine is cut.
438 * slave_lock is held on both entry and exit of this function. */
439 static void
440 slave_loop(FILE *f, char *reply_buf, struct slave_state *sstate, bool resend)
442 char *to_send;
443 int last_cmd_count = 0;
444 int last_reply_id = -1;
445 int reply_slot = -1;
446 for (;;) {
447 if (resend) {
448 /* Resend complete or partial history */
449 to_send = next_command(last_reply_id);
450 } else {
451 /* Wait for a new command. */
452 while (last_cmd_count == cmd_count)
453 pthread_cond_wait(&cmd_cond, &slave_lock);
454 to_send = gtp_cmd;
457 /* Command available, send it to slave machine.
458 * If slave was out of sync, send the history.
459 * But first get binary arguments if necessary. */
460 int bin_size = 0;
461 void *bin_buf = get_binary_arg(sstate, gtp_cmd,
462 gtp_cmds + CMDS_SIZE - gtp_cmd,
463 &bin_size);
464 /* Check that the command is still valid. */
465 resend = true;
466 if (!bin_buf) continue;
468 /* Send the command and get the reply, which always ends with \n\n
469 * The slave machine sends "=id reply" or "?id reply"
470 * with id == cmd_id if it is in sync. */
471 last_cmd_count = cmd_count;
472 char buf[CMDS_SIZE];
473 int reply_id = send_command(to_send, bin_buf, &bin_size, f,
474 sstate, buf);
475 if (reply_id == -1) return;
477 resend = process_reply(reply_id, buf, reply_buf, bin_buf, bin_size,
478 &last_reply_id, &reply_slot, sstate);
/* Minimal check of slave identity: send the gtp "name" command and
 * expect a line starting with "= Pachi" (case-insensitive) followed
 * by an empty line.
 * Return true if the peer passes the check. Otherwise log "bad slave",
 * close the file, sleep one second and return false. */
static bool
is_pachi_slave(FILE *f, struct in_addr *client)
{
	char buf[1024];
	fputs("name\n", f);
	/* f is opened for update ("r+"): the C standard requires a flush
	 * (or repositioning) between a write and the following read. */
	fflush(f);
	if (!fgets(buf, sizeof(buf), f)
	    || strncasecmp(buf, "= Pachi", 7)
	    || !fgets(buf, sizeof(buf), f)
	    || strcmp(buf, "\n")) {
		logline(client, "? ", "bad slave\n");
		fclose(f);
		sleep(1); // avoid busy loop if error
		return false;
	}
	return true;
}
500 /* Thread sending gtp commands to one slave machine, and
501 * reading replies. If a slave machine dies, this thread waits
502 * for a connection from another slave.
503 * The large buffers are allocated only once we get a first
504 * connection, to avoid wasting memory if max_slaves is too large.
505 * We do not invalidate the received buffers if a slave disconnects;
506 * they are still useful for other slaves. */
507 static void * __attribute__((noreturn))
508 slave_thread(void *arg)
510 struct slave_state sstate = default_sstate;
511 sstate.thread_id = (intptr_t)arg;
513 assert(sstate.slave_sock >= 0);
514 char reply_buf[CMDS_SIZE];
515 bool resend = false;
517 for (;;) {
518 /* Wait for a connection from any slave. */
519 struct in_addr client;
520 int conn = open_server_connection(sstate.slave_sock, &client);
522 FILE *f = fdopen(conn, "r+");
523 if (DEBUGL(2)) {
524 snprintf(reply_buf, sizeof(reply_buf),
525 "new slave, id %d\n", sstate.thread_id);
526 logline(&client, "= ", reply_buf);
528 if (!is_pachi_slave(f, &client)) continue;
530 if (!resend) slave_state_alloc(&sstate);
531 sstate.client = client;
533 pthread_mutex_lock(&slave_lock);
534 active_slaves++;
535 slave_loop(f, reply_buf, &sstate, resend);
537 assert(active_slaves > 0);
538 active_slaves--;
539 // Unblock main thread if it was waiting for this slave.
540 pthread_cond_signal(&reply_cond);
541 pthread_mutex_unlock(&slave_lock);
543 resend = true;
544 if (DEBUGL(2))
545 logline(&client, "= ", "lost slave\n");
546 fclose(f);
548 pthread_exit(NULL);
551 /* Create a new gtp command for all slaves. The slave lock is held
552 * upon entry and upon return, so the command will actually be
553 * sent when the lock is released. The last command is overwritten
554 * if gtp_cmd points to a non-empty string. cmd is a single word;
555 * args has all arguments and is empty or has a trailing \n */
556 void
557 update_cmd(struct board *b, char *cmd, char *args, bool new_id)
559 assert(gtp_cmd);
560 /* To make sure the slaves are in sync, we ignore the original id
561 * and use the board number plus some random bits as gtp id. */
562 static int gtp_id = -1;
563 int moves = is_reset(cmd) ? 0 : b->moves;
564 if (new_id) {
565 int prev_id = gtp_id;
566 do {
567 /* fast_random() is 16-bit only so the multiplication can't overflow. */
568 gtp_id = force_reply(moves + fast_random(65535) * DIST_GAMELEN);
569 } while (gtp_id == prev_id);
570 reply_count = 0;
572 snprintf(gtp_cmd, gtp_cmds + CMDS_SIZE - gtp_cmd, "%d %s %s",
573 gtp_id, cmd, *args ? args : "\n");
574 cmd_count++;
576 /* Remember history for out-of-sync slaves. */
577 static int slot = 0;
578 static struct cmd_history *last = NULL;
579 if (new_id) {
580 if (last) last->next_cmd = gtp_cmd;
581 slot = (slot + 1) % MAX_CMDS_PER_MOVE;
582 last = &history[moves][slot];
583 last->gtp_id = gtp_id;
584 last->next_cmd = NULL;
586 // Notify the slave threads about the new command.
587 pthread_cond_broadcast(&cmd_cond);
590 /* Update the command history, then create a new gtp command
591 * for all slaves. The slave lock is held upon entry and
592 * upon return, so the command will actually be sent when the
593 * lock is released. cmd is a single word; args has all
594 * arguments and is empty or has a trailing \n */
595 void
596 new_cmd(struct board *b, char *cmd, char *args)
598 // Clear the history when a new game starts:
599 if (!gtp_cmd || is_gamestart(cmd)) {
600 gtp_cmd = gtp_cmds;
601 memset(history, 0, sizeof(history));
602 } else {
603 /* Preserve command history for new slaves.
604 * To indicate that the slave should only reply to
605 * the last command we force the id of previous
606 * commands to be just the move number. */
607 int id = prevent_reply(atoi(gtp_cmd));
608 int len = strspn(gtp_cmd, "0123456789");
609 char buf[32];
610 snprintf(buf, sizeof(buf), "%0*d", len, id);
611 memcpy(gtp_cmd, buf, len);
613 gtp_cmd += strlen(gtp_cmd);
616 // Let the slave threads send the new gtp command:
617 update_cmd(b, cmd, args, true);
620 /* Wait for at least one new reply. Return when at least
621 * min_replies slaves have already replied, or when the
622 * given absolute time is passed.
623 * The replies are returned in gtp_replies[0..reply_count-1]
624 * slave_lock is held on entry and on return. */
625 void
626 get_replies(double time_limit, int min_replies)
628 for (;;) {
629 if (reply_count > 0) {
630 struct timespec ts;
631 double sec;
632 ts.tv_nsec = (int)(modf(time_limit, &sec)*1000000000.0);
633 ts.tv_sec = (int)sec;
634 pthread_cond_timedwait(&reply_cond, &slave_lock, &ts);
635 } else {
636 pthread_cond_wait(&reply_cond, &slave_lock);
638 if (reply_count == 0) continue;
639 if (reply_count >= min_replies || reply_count >= active_slaves) return;
640 if (time_now() >= time_limit) break;
642 if (DEBUGL(1)) {
643 char buf[1024];
644 snprintf(buf, sizeof(buf),
645 "get_replies timeout %.3f >= %.3f, replies %d < min %d, active %d\n",
646 time_now() - start_time, time_limit - start_time,
647 reply_count, min_replies, active_slaves);
648 logline(NULL, "? ", buf);
650 assert(reply_count > 0);
653 /* In a 5mn move with at least 5ms per genmoves we get at most
654 * 300*200=60000 genmoves per slave. */
655 #define MAX_GENMOVES_PER_SLAVE 60000
657 /* Allocate the receive queue, and create the slave and proxy threads.
658 * max_buf_size and the merge-related fields of default_sstate must
659 * already be initialized. */
660 void
661 protocol_init(char *slave_port, char *proxy_port, int max_slaves)
663 start_time = time_now();
665 queue_max_length = max_slaves * MAX_GENMOVES_PER_SLAVE;
666 receive_queue = calloc2(queue_max_length, sizeof(*receive_queue));
668 default_sstate.slave_sock = port_listen(slave_port, max_slaves);
669 default_sstate.last_processed = -1;
671 for (int n = 0; n < BUFFERS_PER_SLAVE; n++) {
672 default_sstate.b[n].queue_index = -1;
675 pthread_t thread;
676 for (int id = 0; id < max_slaves; id++) {
677 pthread_create(&thread, NULL, slave_thread, (void *)(intptr_t)id);
680 if (proxy_port) {
681 int proxy_sock = port_listen(proxy_port, max_slaves);
682 for (int id = 0; id < max_slaves; id++) {
683 pthread_create(&thread, NULL, proxy_thread, (void *)(intptr_t)proxy_sock);