tactics.h: Include debug.h
[pachi.git] / distributed / protocol.c
blob9399b66b3c7912ab70876120cce23860bf64956c
1 /* The functions implementing the master-slave protocol of the
2 * distributed engine are grouped here. They are independent
3 * of the gtp protocol. See the comments at the top of distributed.c
4 * for a general introduction to the distributed engine. */
#include <assert.h>
#include <stdio.h>
#include <pthread.h>
#include <ctype.h>
#include <unistd.h>

#define DEBUG

#include "random.h"
#include "timeinfo.h"
#include "playout.h"
#include "network.h"
#include "debug.h"
#include "distributed/distributed.h"
#include "distributed/protocol.h"
21 /* All gtp commands for current game separated by \n */
22 static char gtp_cmds[CMDS_SIZE];
24 /* Latest gtp command sent to slaves. */
25 static char *gtp_cmd = NULL;
27 /* Slaves send gtp_cmd when cmd_count changes. */
28 static int cmd_count = 0;
30 /* Remember at most 10 gtp ids per move: kgs-rules, boardsize, clear_board,
31 * time_settings, komi, handicap, genmoves, play pass, play pass, final_status_list */
32 #define MAX_CMDS_PER_MOVE 10
34 /* History of gtp commands sent for current game, indexed by move. */
35 static struct cmd_history {
36 int gtp_id;
37 char *next_cmd;
38 } history[MAX_GAMELEN][MAX_CMDS_PER_MOVE];
40 /* Number of active slave machines working for this master. */
41 int active_slaves = 0;
43 /* Number of replies to last gtp command already received. */
44 int reply_count = 0;
46 /* All replies to latest gtp command are in gtp_replies[0..reply_count-1]. */
47 char **gtp_replies;
50 struct receive_buf *receive_queue;
51 int queue_length = 0;
52 static int queue_max_length;
54 /* Mutex protecting all variables above. receive_queue may be
55 * read without the lock but is only written with lock held. */
56 static pthread_mutex_t slave_lock = PTHREAD_MUTEX_INITIALIZER;
58 /* Condition signaled when a new gtp command is available. */
59 static pthread_cond_t cmd_cond = PTHREAD_COND_INITIALIZER;
61 /* Condition signaled when reply_count increases. */
62 static pthread_cond_t reply_cond = PTHREAD_COND_INITIALIZER;
64 /* Mutex protecting stderr. Must not be held at same time as slave_lock. */
65 static pthread_mutex_t log_lock = PTHREAD_MUTEX_INITIALIZER;
67 /* Absolute time when this program was started.
68 * For debugging only. */
69 static double start_time;
71 /* Default slave state. */
72 struct slave_state default_sstate;
75 /* Get exclusive access to the threads and commands state. */
76 void
77 protocol_lock(void)
79 pthread_mutex_lock(&slave_lock);
82 /* Release exclusive access to the threads and commands state. */
83 void
84 protocol_unlock(void)
86 pthread_mutex_unlock(&slave_lock);
89 /* Write the time, client address, prefix, and string s to stderr atomically.
90 * s should end with a \n */
91 void
92 logline(struct in_addr *client, char *prefix, char *s)
94 double now = time_now();
96 char addr[INET_ADDRSTRLEN];
97 if (client) {
98 inet_ntop(AF_INET, client, addr, sizeof(addr));
99 } else {
100 addr[0] = '\0';
102 pthread_mutex_lock(&log_lock);
103 fprintf(stderr, "%s%15s %9.3f: %s", prefix, addr, now - start_time, s);
104 pthread_mutex_unlock(&log_lock);
107 /* Thread opening a connection on the given socket and copying input
108 * from there to stderr. */
109 static void *
110 proxy_thread(void *arg)
112 int proxy_sock = (long)arg;
113 assert(proxy_sock >= 0);
114 for (;;) {
115 struct in_addr client;
116 int conn = open_server_connection(proxy_sock, &client);
117 FILE *f = fdopen(conn, "r");
118 char buf[BSIZE];
119 while (fgets(buf, BSIZE, f)) {
120 logline(&client, "< ", buf);
122 fclose(f);
126 /* Get a reply to one gtp command. Return the gtp command id,
127 * or -1 if error. reply must have at least CMDS_SIZE bytes.
128 * The ascii reply ends with an empty line; if the first line
129 * contains "@size", a binary reply of size bytes follows the
130 * empty line. @size is not standard gtp, it is only used
131 * internally by Pachi for the genmoves command; it must be the
132 * last parameter on the line.
133 * *bin_size is the maximum size upon entry, actual size on return.
134 * slave_lock is not held on either entry or exit of this function. */
135 static int
136 get_reply(FILE *f, struct in_addr client, char *reply, void *bin_reply, int *bin_size)
138 int reply_id = -1;
139 *reply = '\0';
140 if (!fgets(reply, CMDS_SIZE, f)) return -1;
142 /* Check for binary reply. */
143 char *s = strchr(reply, '@');
144 int size = 0;
145 if (s) size = atoi(s+1);
146 assert(size <= *bin_size);
147 *bin_size = size;
149 if (DEBUGV(s, 2))
150 logline(&client, "<<", reply);
151 if ((*reply == '=' || *reply == '?') && isdigit(reply[1]))
152 reply_id = atoi(reply+1);
154 /* Read the rest of the ascii reply */
155 char *line = reply + strlen(reply);
156 while (fgets(line, reply + CMDS_SIZE - line, f) && *line != '\n') {
157 if (DEBUGL(3))
158 logline(&client, "<<", line);
159 line += strlen(line);
161 if (*line != '\n') return -1;
163 /* Read the binary reply if any. */
164 double start = time_now();
165 int len;
166 while (size && (len = fread(bin_reply, 1, size, f)) > 0) {
167 bin_reply = (char *)bin_reply + len;
168 size -= len;
170 if (*bin_size && DEBUGVV(2)) {
171 char buf[1024];
172 snprintf(buf, sizeof(buf), "read reply %d bytes in %.4fms\n", *bin_size,
173 (time_now() - start)*1000);
174 logline(&client, "= ", buf);
176 return size ? -1 : reply_id;
179 /* Send the gtp command to_send and get a reply from the slave machine.
180 * Write the reply in buf which must have at least CMDS_SIZE bytes.
181 * If *bin_size > 0, send bin_buf after the gtp command.
182 * Return any binary reply in bin_buf and set its size in bin_size.
183 * bin_buf is private to the slave and need not be copied.
184 * Return the gtp command id, or -1 if error.
185 * slave_lock is held on both entry and exit of this function. */
186 static int
187 send_command(char *to_send, void *bin_buf, int *bin_size,
188 FILE *f, struct slave_state *sstate, char *buf)
190 assert(to_send && gtp_cmd && bin_buf && bin_size);
191 strncpy(buf, to_send, CMDS_SIZE);
192 bool resend = to_send != gtp_cmd;
194 pthread_mutex_unlock(&slave_lock);
196 if (DEBUGL(1) && resend)
197 logline(&sstate->client, "? ",
198 to_send == gtp_cmds ? "resend all\n" : "partial resend\n");
199 fputs(buf, f);
201 double start = time_now();
202 if (*bin_size)
203 fwrite(bin_buf, 1, *bin_size, f);
204 fflush(f);
206 if (DEBUGV(strchr(buf, '@'), 2)) {
207 double ms = (time_now() - start) * 1000.0;
208 if (!DEBUGL(3)) {
209 char *s = strchr(buf, '\n');
210 if (s) s[1] = '\0';
212 logline(&sstate->client, ">>", buf);
213 if (*bin_size) {
214 char b[1024];
215 snprintf(b, sizeof(b),
216 "sent args %d bytes in %.4fms\n", *bin_size, ms);
217 logline(&sstate->client, "= ", b);
221 /* Reuse the buffers for the reply. */
222 *bin_size = sstate->max_buf_size;
223 int reply_id = get_reply(f, sstate->client, buf, bin_buf, bin_size);
225 pthread_mutex_lock(&slave_lock);
226 return reply_id;
229 /* Return the command sent after that with the given gtp id,
230 * or gtp_cmds if the id wasn't used in this game. If a play command
231 * has overwritten a genmoves command, return the play command.
232 * slave_lock is held on both entry and exit of this function. */
233 static char *
234 next_command(int cmd_id)
236 if (cmd_id == -1) return gtp_cmds;
238 int last_id = atoi(gtp_cmd);
239 int reply_move = move_number(cmd_id);
240 if (reply_move > move_number(last_id)) return gtp_cmds;
242 int slot;
243 for (slot = 0; slot < MAX_CMDS_PER_MOVE; slot++) {
244 if (cmd_id == history[reply_move][slot].gtp_id) break;
246 if (slot == MAX_CMDS_PER_MOVE) return gtp_cmds;
248 char *next = history[reply_move][slot].next_cmd;
249 assert(next);
250 return next;
253 /* Allocate buffers for a slave thread. The state should have been
254 * initialized already as a copy of the default slave state.
255 * slave_lock is not held on either entry or exit of this function. */
256 static void
257 slave_state_alloc(struct slave_state *sstate)
259 for (int n = 0; n < BUFFERS_PER_SLAVE; n++) {
260 sstate->b[n].buf = malloc2(sstate->max_buf_size);
262 if (sstate->alloc_hook) sstate->alloc_hook(sstate);
265 /* Get a free binary buffer, first invalidating it in the receive
266 * queue if necessary. In practice all buffers should be used
267 * before they are invalidated, if BUFFERS_PER_SLAVE is large enough.
268 * slave_lock is held on both entry and exit of this function. */
269 static void *
270 get_free_buf(struct slave_state *sstate, bool new_id)
272 int newest = (sstate->newest_buf + 1) & (BUFFERS_PER_SLAVE - 1);
273 sstate->newest_buf = newest;
274 void *buf = sstate->b[newest].buf;
276 if (DEBUGVV(7)) {
277 char b[1024];
278 snprintf(b, sizeof(b), "get free %d index %d buf=%p qlength %d\n",
279 newest, sstate->b[newest].queue_index, buf, queue_length);
280 logline(&sstate->client, "? ", b);
283 /* For a new command, previous indices in receive_queue
284 * are now meaningless. In particular they may be
285 * beyond the current queue_length. */
286 if (new_id) {
287 sstate->last_processed = -1;
288 for (int n = 0; n < BUFFERS_PER_SLAVE; n++) {
289 sstate->b[n].queue_index = -1;
291 return buf;
294 int index = sstate->b[newest].queue_index;
295 if (index >= 0) {
296 assert(receive_queue[index].thread_id == sstate->thread_id);
297 assert(receive_queue[index].buf == buf);
298 /* Invalidate the buffer. */
299 receive_queue[index].buf = NULL;
300 sstate->b[newest].queue_index = -1;
302 return buf;
305 /* Insert a buffer in the receive queue. It should be the most
306 * recent buffer allocated by the calling thread.
307 * slave_lock is held on both entry and exit of this function. */
308 static void
309 insert_buf(struct slave_state *sstate, void *buf, int size)
311 assert(queue_length < queue_max_length);
313 int newest = sstate->newest_buf;
314 assert(buf == sstate->b[newest].buf);
316 /* Update the buffer if necessary before making it
317 * available to other threads. */
318 if (sstate->insert_hook) sstate->insert_hook(buf, size);
320 if (DEBUGVV(7)) {
321 char b[1024];
322 snprintf(b, sizeof(b), "insert newest %d rq[%d]->%p\n",
323 newest, queue_length, buf);
324 logline(&sstate->client, "? ", b);
326 receive_queue[queue_length].buf = buf;
327 receive_queue[queue_length].size = size;
328 receive_queue[queue_length].thread_id = sstate->thread_id;
329 sstate->b[newest].queue_index = queue_length;
330 queue_length++;
333 /* Clear the receive queue. The receive buffers are also invalidated
334 * so that slave threads scanning the queue notice it as soon as possible
335 * but this is only an optimization.
336 * slave_lock is held on both entry and exit of this function. */
337 void
338 clear_receive_queue(void)
340 if (!queue_length) return;
341 memset(receive_queue, 0, queue_length * sizeof(receive_queue[0]));
342 queue_length = 0;
345 /* Process the reply received from a slave machine.
346 * Copy the ascii part to reply_buf and insert the binary part
347 * (if any) in the receive queue.
348 * Return false if ok, true if the slave is out of sync.
349 * slave_lock is held on both entry and exit of this function. */
350 static bool
351 process_reply(int reply_id, char *reply, char *reply_buf,
352 void *bin_reply, int bin_size, int *last_reply_id,
353 int *reply_slot, struct slave_state *sstate)
355 /* Resend everything if slave returned an error. */
356 if (*reply != '=') {
357 *last_reply_id = -1;
358 return true;
360 /* Make sure we are still in sync. cmd_count may have
361 * changed but the reply is valid as long as cmd_id didn't
362 * change (this only occurs for consecutive genmoves). */
363 int cmd_id = atoi(gtp_cmd);
364 if (reply_id != cmd_id) {
365 *last_reply_id = reply_id;
366 return true;
369 strncpy(reply_buf, reply, CMDS_SIZE);
370 if (reply_id != *last_reply_id)
371 *reply_slot = reply_count++;
372 gtp_replies[*reply_slot] = reply_buf;
374 if (bin_size) insert_buf(sstate, bin_reply, bin_size);
376 pthread_cond_signal(&reply_cond);
377 *last_reply_id = reply_id;
378 return false;
381 /* Get the binary arg for the given command, and update the command
382 * if necessary. For now, only genmoves has a binary argument, and
383 * we return the best stats increments from all other slaves.
384 * Set *bin_size to 0 if the command doesn't take binary arguments,
385 * but still return a buffer, to be used for the reply.
386 * Return NULL if the binary arg is obsolete by the time we have
387 * finished computing it, because a new command is available.
388 * This version only gets the buffer for the reply, to be completed
389 * in future commits.
390 * slave_lock is held on both entry and exit of this function. */
391 void *
392 get_binary_arg(struct slave_state *sstate, char *cmd, int cmd_size, int *bin_size)
394 int cmd_id = atoi(gtp_cmd);
395 void *buf = get_free_buf(sstate, cmd_id != sstate->last_cmd_id);
396 sstate->last_cmd_id = cmd_id;
398 *bin_size = 0;
399 char *s = strchr(cmd, '@');
400 if (!s || !sstate->args_hook) return buf;
402 int size = sstate->args_hook(buf, sstate, cmd_id);
404 /* Check that the command is still valid. */
405 if (atoi(gtp_cmd) != cmd_id) return NULL;
407 /* Set the correct binary size for this slave.
408 * cmd may have been overwritten with new parameters. */
409 *bin_size = size;
410 s = strchr(cmd, '@');
411 assert(s);
412 snprintf(s, cmd + cmd_size - s, "@%d\n", size);
413 return buf;
416 /* Main loop of a slave thread.
417 * Send the current command to the slave machine and wait for a reply.
418 * Resend command history if the slave machine is out of sync.
419 * Returns when the connection with the slave machine is cut.
420 * slave_lock is held on both entry and exit of this function. */
421 static void
422 slave_loop(FILE *f, char *reply_buf, struct slave_state *sstate, bool resend)
424 char *to_send;
425 int last_cmd_count = 0;
426 int last_reply_id = -1;
427 int reply_slot = -1;
428 for (;;) {
429 if (resend) {
430 /* Resend complete or partial history */
431 to_send = next_command(last_reply_id);
432 } else {
433 /* Wait for a new command. */
434 while (last_cmd_count == cmd_count)
435 pthread_cond_wait(&cmd_cond, &slave_lock);
436 to_send = gtp_cmd;
439 /* Command available, send it to slave machine.
440 * If slave was out of sync, send the history.
441 * But first get binary arguments if necessary. */
442 int bin_size = 0;
443 void *bin_buf = get_binary_arg(sstate, gtp_cmd,
444 gtp_cmds + CMDS_SIZE - gtp_cmd,
445 &bin_size);
446 /* Check that the command is still valid. */
447 resend = true;
448 if (!bin_buf) continue;
450 /* Send the command and get the reply, which always ends with \n\n
451 * The slave machine sends "=id reply" or "?id reply"
452 * with id == cmd_id if it is in sync. */
453 last_cmd_count = cmd_count;
454 char buf[CMDS_SIZE];
455 int reply_id = send_command(to_send, bin_buf, &bin_size, f,
456 sstate, buf);
457 if (reply_id == -1) return;
459 resend = process_reply(reply_id, buf, reply_buf, bin_buf, bin_size,
460 &last_reply_id, &reply_slot, sstate);
464 /* Thread sending gtp commands to one slave machine, and
465 * reading replies. If a slave machine dies, this thread waits
466 * for a connection from another slave.
467 * The large buffers are allocated only once we get a first
468 * connection, to avoid wasting memory if max_slaves is too large.
469 * We do not invalidate the received buffers if a slave disconnects;
470 * they are still useful for other slaves. */
471 static void *
472 slave_thread(void *arg)
474 struct slave_state sstate = default_sstate;
475 sstate.thread_id = (long)arg;
477 assert(sstate.slave_sock >= 0);
478 char reply_buf[CMDS_SIZE];
479 bool resend = false;
481 for (;;) {
482 /* Wait for a connection from any slave. */
483 struct in_addr client;
484 int conn = open_server_connection(sstate.slave_sock, &client);
486 FILE *f = fdopen(conn, "r+");
487 if (DEBUGL(2)) {
488 snprintf(reply_buf, sizeof(reply_buf),
489 "new slave, id %d\n", sstate.thread_id);
490 logline(&client, "= ", reply_buf);
493 /* Minimal check of the slave identity. */
494 fputs("name\n", f);
495 if (!fgets(reply_buf, sizeof(reply_buf), f)
496 || strncasecmp(reply_buf, "= Pachi", 7)
497 || !fgets(reply_buf, sizeof(reply_buf), f)
498 || strcmp(reply_buf, "\n")) {
499 logline(&client, "? ", "bad slave\n");
500 fclose(f);
501 continue;
504 if (!resend) slave_state_alloc(&sstate);
505 sstate.client = client;
507 pthread_mutex_lock(&slave_lock);
508 active_slaves++;
509 slave_loop(f, reply_buf, &sstate, resend);
511 assert(active_slaves > 0);
512 active_slaves--;
513 // Unblock main thread if it was waiting for this slave.
514 pthread_cond_signal(&reply_cond);
515 pthread_mutex_unlock(&slave_lock);
517 resend = true;
518 if (DEBUGL(2))
519 logline(&client, "= ", "lost slave\n");
520 fclose(f);
524 /* Create a new gtp command for all slaves. The slave lock is held
525 * upon entry and upon return, so the command will actually be
526 * sent when the lock is released. The last command is overwritten
527 * if gtp_cmd points to a non-empty string. cmd is a single word;
528 * args has all arguments and is empty or has a trailing \n */
529 void
530 update_cmd(struct board *b, char *cmd, char *args, bool new_id)
532 assert(gtp_cmd);
533 /* To make sure the slaves are in sync, we ignore the original id
534 * and use the board number plus some random bits as gtp id. */
535 static int gtp_id = -1;
536 int moves = is_reset(cmd) ? 0 : b->moves;
537 if (new_id) {
538 int prev_id = gtp_id;
539 do {
540 /* fast_random() is 16-bit only so the multiplication can't overflow. */
541 gtp_id = force_reply(moves + fast_random(65535) * DIST_GAMELEN);
542 } while (gtp_id == prev_id);
543 reply_count = 0;
545 snprintf(gtp_cmd, gtp_cmds + CMDS_SIZE - gtp_cmd, "%d %s %s",
546 gtp_id, cmd, *args ? args : "\n");
547 cmd_count++;
549 /* Remember history for out-of-sync slaves. */
550 static int slot = 0;
551 static struct cmd_history *last = NULL;
552 if (new_id) {
553 if (last) last->next_cmd = gtp_cmd;
554 slot = (slot + 1) % MAX_CMDS_PER_MOVE;
555 last = &history[moves][slot];
556 last->gtp_id = gtp_id;
557 last->next_cmd = NULL;
559 // Notify the slave threads about the new command.
560 pthread_cond_broadcast(&cmd_cond);
563 /* Update the command history, then create a new gtp command
564 * for all slaves. The slave lock is held upon entry and
565 * upon return, so the command will actually be sent when the
566 * lock is released. cmd is a single word; args has all
567 * arguments and is empty or has a trailing \n */
568 void
569 new_cmd(struct board *b, char *cmd, char *args)
571 // Clear the history when a new game starts:
572 if (!gtp_cmd || is_gamestart(cmd)) {
573 gtp_cmd = gtp_cmds;
574 memset(history, 0, sizeof(history));
575 } else {
576 /* Preserve command history for new slaves.
577 * To indicate that the slave should only reply to
578 * the last command we force the id of previous
579 * commands to be just the move number. */
580 int id = prevent_reply(atoi(gtp_cmd));
581 int len = strspn(gtp_cmd, "0123456789");
582 char buf[32];
583 snprintf(buf, sizeof(buf), "%0*d", len, id);
584 memcpy(gtp_cmd, buf, len);
586 gtp_cmd += strlen(gtp_cmd);
589 // Let the slave threads send the new gtp command:
590 update_cmd(b, cmd, args, true);
593 /* Wait for at least one new reply. Return when at least
594 * min_replies slaves have already replied, or when the
595 * given absolute time is passed.
596 * The replies are returned in gtp_replies[0..reply_count-1]
597 * slave_lock is held on entry and on return. */
598 void
599 get_replies(double time_limit, int min_replies)
601 for (;;) {
602 if (reply_count > 0) {
603 struct timespec ts;
604 double sec;
605 ts.tv_nsec = (int)(modf(time_limit, &sec)*1000000000.0);
606 ts.tv_sec = (int)sec;
607 pthread_cond_timedwait(&reply_cond, &slave_lock, &ts);
608 } else {
609 pthread_cond_wait(&reply_cond, &slave_lock);
611 if (reply_count == 0) continue;
612 if (reply_count >= min_replies || reply_count >= active_slaves) return;
613 if (time_now() >= time_limit) break;
615 if (DEBUGL(1)) {
616 char buf[1024];
617 snprintf(buf, sizeof(buf),
618 "get_replies timeout %.3f >= %.3f, replies %d < min %d, active %d\n",
619 time_now() - start_time, time_limit - start_time,
620 reply_count, min_replies, active_slaves);
621 logline(NULL, "? ", buf);
623 assert(reply_count > 0);
/* Upper bound on genmoves replies per slave and per move: with a 30s
 * move and at least 5ms per genmoves, 30000 / 5 = 6000. */
enum { MAX_GENMOVES_PER_SLAVE = 6000 };
630 /* Allocate the receive queue, and create the slave and proxy threads.
631 * max_buf_size and the merge-related fields of default_sstate must
632 * already be initialized. */
633 void
634 protocol_init(char *slave_port, char *proxy_port, int max_slaves)
636 start_time = time_now();
638 queue_max_length = max_slaves * MAX_GENMOVES_PER_SLAVE;
639 receive_queue = calloc2(queue_max_length, sizeof(*receive_queue));
641 default_sstate.slave_sock = port_listen(slave_port, max_slaves);
642 default_sstate.last_processed = -1;
644 for (int n = 0; n < BUFFERS_PER_SLAVE; n++) {
645 default_sstate.b[n].queue_index = -1;
648 pthread_t thread;
649 for (int id = 0; id < max_slaves; id++) {
650 pthread_create(&thread, NULL, slave_thread, (void *)(long)id);
653 if (proxy_port) {
654 int proxy_sock = port_listen(proxy_port, max_slaves);
655 for (int id = 0; id < max_slaves; id++) {
656 pthread_create(&thread, NULL, proxy_thread, (void *)(long)proxy_sock);