1 /* The functions implementing the master-slave protocol of the
2 * distributed engine are grouped here. They are independent
3 * of the gtp protocol. See the comments at the top of distributed.c
4 * for a general introduction to the distributed engine. */
6 /* The receive queue is an array of pointers to binary buffers.
7 * These pointers are invalidated in one of two ways when a buffer
8 * is recycled: (1) the queue age is increased when the queue is
9 * emptied at a new move, (2) the pointer itself is set to NULL
10 immediately, and stays so until at least the next queue age
26 #include "distributed/distributed.h"
27 #include "distributed/protocol.h"
29 /* All gtp commands for current game separated by \n */
30 static char gtp_cmds
[CMDS_SIZE
];
32 /* Latest gtp command sent to slaves. */
33 static char *gtp_cmd
= NULL
;
35 /* Slaves send gtp_cmd when cmd_count changes. */
36 static int cmd_count
= 0;
38 /* Remember at most 10 gtp ids per move: kgs-rules, boardsize, clear_board,
39 * time_settings, komi, handicap, genmoves, play pass, play pass, final_status_list */
40 #define MAX_CMDS_PER_MOVE 10
42 /* History of gtp commands sent for current game, indexed by move. */
43 static struct cmd_history
{
46 } history
[MAX_GAMELEN
][MAX_CMDS_PER_MOVE
];
48 /* Number of active slave machines working for this master. */
49 int active_slaves
= 0;
51 /* Number of replies to last gtp command already received. */
54 /* All replies to latest gtp command are in gtp_replies[0..reply_count-1]. */
58 struct buf_state
**receive_queue
;
61 static int queue_max_length
;
63 /* Mutex protecting all variables above. receive_queue may be
64 * read without the lock but is only written with lock held. */
65 static pthread_mutex_t slave_lock
= PTHREAD_MUTEX_INITIALIZER
;
67 /* Condition signaled when a new gtp command is available. */
68 static pthread_cond_t cmd_cond
= PTHREAD_COND_INITIALIZER
;
70 /* Condition signaled when reply_count increases. */
71 static pthread_cond_t reply_cond
= PTHREAD_COND_INITIALIZER
;
73 /* Mutex protecting stderr. Must not be held at same time as slave_lock. */
74 static pthread_mutex_t log_lock
= PTHREAD_MUTEX_INITIALIZER
;
76 /* Absolute time when this program was started.
77 * For debugging only. */
78 static double start_time
;
80 /* Default slave state. */
81 struct slave_state default_sstate
;
84 /* Get exclusive access to the threads and commands state. */
88 pthread_mutex_lock(&slave_lock
);
91 /* Release exclusive access to the threads and commands state. */
95 pthread_mutex_unlock(&slave_lock
);
98 /* Write the time, client address, prefix, and string s to stderr atomically.
99 * s should end with a \n */
101 logline(struct in_addr
*client
, char *prefix
, char *s
)
103 double now
= time_now();
105 char addr
[INET_ADDRSTRLEN
];
108 strcpy(addr
, inet_ntoa(*client
));
110 inet_ntop(AF_INET
, client
, addr
, sizeof(addr
));
115 pthread_mutex_lock(&log_lock
);
116 fprintf(stderr
, "%s%15s %9.3f: %s", prefix
, addr
, now
- start_time
, s
);
117 pthread_mutex_unlock(&log_lock
);
120 /* Thread opening a connection on the given socket and copying input
121 * from there to stderr. */
122 static void * __attribute__((noreturn
))
123 proxy_thread(void *arg
)
125 int proxy_sock
= (intptr_t)arg
;
126 assert(proxy_sock
>= 0);
128 struct in_addr client
;
129 int conn
= open_server_connection(proxy_sock
, &client
);
130 FILE *f
= fdopen(conn
, "r");
132 while (fgets(buf
, BSIZE
, f
)) {
133 logline(&client
, "< ", buf
);
140 /* Get a reply to one gtp command. Return the gtp command id,
141 * or -1 if error. reply must have at least CMDS_SIZE bytes.
142 * The ascii reply ends with an empty line; if the first line
143 * contains "@size", a binary reply of size bytes follows the
144 * empty line. @size is not standard gtp, it is only used
145 * internally by Pachi for the genmoves command; it must be the
146 * last parameter on the line.
147 * *bin_size is the maximum size upon entry, actual size on return.
148 * slave_lock is not held on either entry or exit of this function. */
150 get_reply(FILE *f
, struct in_addr client
, char *reply
, void *bin_reply
, int *bin_size
)
152 double start
= time_now();
156 if (!fgets(reply
, CMDS_SIZE
, f
)) return -1;
158 /* Check for binary reply. */
159 char *s
= strchr(reply
, '@');
161 if (s
) size
= atoi(s
+1);
162 assert(size
<= *bin_size
);
166 logline(&client
, "<<", reply
);
167 if ((*reply
== '=' || *reply
== '?') && isdigit(reply
[1]))
168 reply_id
= atoi(reply
+1);
170 /* Read the rest of the ascii reply */
171 char *line
= reply
+ strlen(reply
);
172 while (fgets(line
, reply
+ CMDS_SIZE
- line
, f
) && *line
!= '\n') {
174 logline(&client
, "<<", line
);
175 line
+= strlen(line
);
177 if (*line
!= '\n') return -1;
179 /* Read the binary reply if any. */
181 while (size
&& (len
= fread(bin_reply
, 1, size
, f
)) > 0) {
182 bin_reply
= (char *)bin_reply
+ len
;
185 if (*bin_size
&& DEBUGVV(2)) {
187 snprintf(buf
, sizeof(buf
), "read reply %d+%d bytes in %.4fms\n",
188 (int)strlen(reply
), *bin_size
,
189 (time_now() - start
)*1000);
190 logline(&client
, "= ", buf
);
192 return size
? -1 : reply_id
;
195 /* Send the gtp command to_send and get a reply from the slave machine.
196 * Write the reply in buf which must have at least CMDS_SIZE bytes.
197 * If *bin_size > 0, send bin_buf after the gtp command.
198 * Return any binary reply in bin_buf and set its size in bin_size.
199 * bin_buf is private to the slave and need not be copied.
200 * Return the gtp command id, or -1 if error.
201 * slave_lock is held on both entry and exit of this function. */
203 send_command(char *to_send
, void *bin_buf
, int *bin_size
,
204 FILE *f
, struct slave_state
*sstate
, char *buf
)
206 assert(to_send
&& gtp_cmd
&& bin_buf
&& bin_size
);
207 strncpy(buf
, to_send
, CMDS_SIZE
);
208 bool resend
= to_send
!= gtp_cmd
;
210 pthread_mutex_unlock(&slave_lock
);
212 if (DEBUGL(1) && resend
)
213 logline(&sstate
->client
, "? ",
214 to_send
== gtp_cmds
? "resend all\n" : "partial resend\n");
216 double start
= time_now();
220 fwrite(bin_buf
, 1, *bin_size
, f
);
223 if (DEBUGV(strchr(buf
, '@'), 2)) {
224 double ms
= (time_now() - start
) * 1000.0;
226 char *s
= strchr(buf
, '\n');
229 logline(&sstate
->client
, ">>", buf
);
232 snprintf(b
, sizeof(b
),
233 "sent cmd %d+%d bytes in %.4fms\n",
234 (int)strlen(buf
), *bin_size
, ms
);
235 logline(&sstate
->client
, "= ", b
);
239 /* Reuse the buffers for the reply. */
240 *bin_size
= sstate
->max_buf_size
;
241 int reply_id
= get_reply(f
, sstate
->client
, buf
, bin_buf
, bin_size
);
243 pthread_mutex_lock(&slave_lock
);
247 /* Return the command sent after the one with the given gtp id,
248 * or gtp_cmds if the id wasn't used in this game. If a play command
249 * has overwritten a genmoves command, return the play command.
250 * slave_lock is held on both entry and exit of this function. */
252 next_command(int cmd_id
)
254 if (cmd_id
== -1) return gtp_cmds
;
256 int last_id
= atoi(gtp_cmd
);
257 int reply_move
= move_number(cmd_id
);
258 if (reply_move
> move_number(last_id
)) return gtp_cmds
;
261 for (slot
= 0; slot
< MAX_CMDS_PER_MOVE
; slot
++) {
262 if (cmd_id
== history
[reply_move
][slot
].gtp_id
) break;
264 if (slot
== MAX_CMDS_PER_MOVE
) return gtp_cmds
;
266 char *next
= history
[reply_move
][slot
].next_cmd
;
271 /* Allocate buffers for a slave thread. The state should have been
272 * initialized already as a copy of the default slave state.
273 * slave_lock is not held on either entry or exit of this function. */
275 slave_state_alloc(struct slave_state
*sstate
)
277 for (int n
= 0; n
< BUFFERS_PER_SLAVE
; n
++) {
278 sstate
->b
[n
].buf
= malloc2(sstate
->max_buf_size
);
279 sstate
->b
[n
].owner
= sstate
->thread_id
;
281 if (sstate
->alloc_hook
) sstate
->alloc_hook(sstate
);
284 /* Get a free binary buffer, first invalidating it in the receive
285 * queue if necessary. In practice all buffers should be used
286 * before they are invalidated, if BUFFERS_PER_SLAVE is large enough.
287 * slave_lock is held on both entry and exit of this function. */
289 get_free_buf(struct slave_state
*sstate
)
291 int newest
= (sstate
->newest_buf
+ 1) & (BUFFERS_PER_SLAVE
- 1);
292 sstate
->newest_buf
= newest
;
293 void *buf
= sstate
->b
[newest
].buf
;
297 snprintf(b
, sizeof(b
),
298 "get free %d index %d buf=%p age %d qlength %d\n", newest
,
299 sstate
->b
[newest
].queue_index
, buf
, queue_age
, queue_length
);
300 logline(&sstate
->client
, "? ", b
);
303 int index
= sstate
->b
[newest
].queue_index
;
304 if (index
< 0) return buf
;
306 /* Invalidate the buffer if the calling thread still owns its previous
307 * entry in the receive queue. The entry may have been overwritten by
308 * another thread, but only after a new move which invalidates the
309 * entire receive queue. */
310 if (receive_queue
[index
] && receive_queue
[index
]->owner
== sstate
->thread_id
) {
311 receive_queue
[index
] = NULL
;
313 sstate
->b
[newest
].queue_index
= -1;
317 /* Insert a buffer in the receive queue. It should be the most
318 * recent buffer allocated by the calling thread.
319 * slave_lock is held on both entry and exit of this function. */
321 insert_buf(struct slave_state
*sstate
, void *buf
, int size
)
323 assert(queue_length
< queue_max_length
);
325 int newest
= sstate
->newest_buf
;
326 assert(buf
== sstate
->b
[newest
].buf
);
328 /* Update the buffer if necessary before making it
329 * available to other threads. */
330 if (sstate
->insert_hook
) sstate
->insert_hook(buf
, size
);
334 snprintf(b
, sizeof(b
),
335 "insert newest %d age %d rq[%d]->%p owner %d\n",
336 newest
, queue_age
, queue_length
, buf
, sstate
->thread_id
);
337 logline(&sstate
->client
, "? ", b
);
339 receive_queue
[queue_length
] = &sstate
->b
[newest
];
340 receive_queue
[queue_length
]->size
= size
;
341 receive_queue
[queue_length
]->queue_index
= queue_length
;
345 /* Clear the receive queue. The buffer pointers do not have to be cleared
346 * here, this is done as each buffer is recycled.
347 * slave_lock is held on both entry and exit of this function. */
349 clear_receive_queue(void)
353 snprintf(buf
, sizeof(buf
), "clear queue, old length %d age %d\n",
354 queue_length
, queue_age
);
355 logline(NULL
, "? ", buf
);
361 /* Process the reply received from a slave machine.
362 * Copy the ascii part to reply_buf and insert the binary part
363 * (if any) in the receive queue.
364 * Return false if ok, true if the slave is out of sync.
365 * slave_lock is held on both entry and exit of this function. */
367 process_reply(int reply_id
, char *reply
, char *reply_buf
,
368 void *bin_reply
, int bin_size
, int *last_reply_id
,
369 int *reply_slot
, struct slave_state
*sstate
)
371 /* Resend everything if slave returned an error. */
372 /* FIXME: this often results in infinite loops on errors
373 * not caused by syncing. These should be reported from
374 * the distributed engine. */
379 /* Make sure we are still in sync. cmd_count may have
380 * changed but the reply is valid as long as cmd_id didn't
381 * change (this only occurs for consecutive genmoves). */
382 int cmd_id
= atoi(gtp_cmd
);
383 if (reply_id
!= cmd_id
) {
384 *last_reply_id
= reply_id
;
388 strncpy(reply_buf
, reply
, CMDS_SIZE
);
389 if (reply_id
!= *last_reply_id
)
390 *reply_slot
= reply_count
++;
391 gtp_replies
[*reply_slot
] = reply_buf
;
393 if (bin_size
) insert_buf(sstate
, bin_reply
, bin_size
);
395 pthread_cond_signal(&reply_cond
);
396 *last_reply_id
= reply_id
;
400 /* Get the binary arg for the given command, and update the command
401 * if necessary. For now, only genmoves has a binary argument, and
402 * we return the best stats increments from all other slaves.
403 * Set *bin_size to 0 if the command doesn't take binary arguments,
404 * but still return a buffer, to be used for the reply.
405 * Return NULL if the binary arg is obsolete by the time we have
406 * finished computing it, because a new command is available.
407 * This version only gets the buffer for the reply, to be completed
409 * slave_lock is held on both entry and exit of this function. */
411 get_binary_arg(struct slave_state
*sstate
, char *cmd
, int cmd_size
, int *bin_size
)
413 int cmd_id
= atoi(gtp_cmd
);
414 void *buf
= get_free_buf(sstate
);
417 char *s
= strchr(cmd
, '@');
418 if (!s
|| !sstate
->args_hook
) return buf
;
420 int size
= sstate
->args_hook(buf
, sstate
, cmd_id
);
422 /* Check that the command is still valid. */
423 if (atoi(gtp_cmd
) != cmd_id
) return NULL
;
425 /* Set the correct binary size for this slave.
426 * cmd may have been overwritten with new parameters. */
428 s
= strchr(cmd
, '@');
430 snprintf(s
, cmd
+ cmd_size
- s
, "@%d\n", size
);
434 /* Main loop of a slave thread.
435 * Send the current command to the slave machine and wait for a reply.
436 * Resend command history if the slave machine is out of sync.
437 * Returns when the connection with the slave machine is cut.
438 * slave_lock is held on both entry and exit of this function. */
440 slave_loop(FILE *f
, char *reply_buf
, struct slave_state
*sstate
, bool resend
)
443 int last_cmd_count
= 0;
444 int last_reply_id
= -1;
448 /* Resend complete or partial history */
449 to_send
= next_command(last_reply_id
);
451 /* Wait for a new command. */
452 while (last_cmd_count
== cmd_count
)
453 pthread_cond_wait(&cmd_cond
, &slave_lock
);
457 /* Command available, send it to slave machine.
458 * If slave was out of sync, send the history.
459 * But first get binary arguments if necessary. */
461 void *bin_buf
= get_binary_arg(sstate
, gtp_cmd
,
462 gtp_cmds
+ CMDS_SIZE
- gtp_cmd
,
464 /* Check that the command is still valid. */
466 if (!bin_buf
) continue;
468 /* Send the command and get the reply, which always ends with \n\n
469 * The slave machine sends "=id reply" or "?id reply"
470 * with id == cmd_id if it is in sync. */
471 last_cmd_count
= cmd_count
;
473 int reply_id
= send_command(to_send
, bin_buf
, &bin_size
, f
,
475 if (reply_id
== -1) return;
477 resend
= process_reply(reply_id
, buf
, reply_buf
, bin_buf
, bin_size
,
478 &last_reply_id
, &reply_slot
, sstate
);
482 /* Minimal check of slave identity. Close the file on error. */
484 is_pachi_slave(FILE *f
, struct in_addr
*client
)
488 if (!fgets(buf
, sizeof(buf
), f
)
489 || strncasecmp(buf
, "= Pachi", 7)
490 || !fgets(buf
, sizeof(buf
), f
)
491 || strcmp(buf
, "\n")) {
492 logline(client
, "? ", "bad slave\n");
494 sleep(1); // avoid busy loop if error
500 /* Thread sending gtp commands to one slave machine, and
501 * reading replies. If a slave machine dies, this thread waits
502 * for a connection from another slave.
503 * The large buffers are allocated only once we get a first
504 * connection, to avoid wasting memory if max_slaves is too large.
505 * We do not invalidate the received buffers if a slave disconnects;
506 * they are still useful for other slaves. */
507 static void * __attribute__((noreturn
))
508 slave_thread(void *arg
)
510 struct slave_state sstate
= default_sstate
;
511 sstate
.thread_id
= (intptr_t)arg
;
513 assert(sstate
.slave_sock
>= 0);
514 char reply_buf
[CMDS_SIZE
];
518 /* Wait for a connection from any slave. */
519 struct in_addr client
;
520 int conn
= open_server_connection(sstate
.slave_sock
, &client
);
522 FILE *f
= fdopen(conn
, "r+");
524 snprintf(reply_buf
, sizeof(reply_buf
),
525 "new slave, id %d\n", sstate
.thread_id
);
526 logline(&client
, "= ", reply_buf
);
528 if (!is_pachi_slave(f
, &client
)) continue;
530 if (!resend
) slave_state_alloc(&sstate
);
531 sstate
.client
= client
;
533 pthread_mutex_lock(&slave_lock
);
535 slave_loop(f
, reply_buf
, &sstate
, resend
);
537 assert(active_slaves
> 0);
539 // Unblock main thread if it was waiting for this slave.
540 pthread_cond_signal(&reply_cond
);
541 pthread_mutex_unlock(&slave_lock
);
545 logline(&client
, "= ", "lost slave\n");
551 /* Create a new gtp command for all slaves. The slave lock is held
552 * upon entry and upon return, so the command will actually be
553 * sent when the lock is released. The last command is overwritten
554 * if gtp_cmd points to a non-empty string. cmd is a single word;
555 * args has all arguments and is empty or has a trailing \n */
557 update_cmd(struct board
*b
, char *cmd
, char *args
, bool new_id
)
560 /* To make sure the slaves are in sync, we ignore the original id
561 * and use the board number plus some random bits as gtp id. */
562 static int gtp_id
= -1;
563 int moves
= is_reset(cmd
) ? 0 : b
->moves
;
565 int prev_id
= gtp_id
;
567 /* fast_random() is 16-bit only so the multiplication can't overflow. */
568 gtp_id
= force_reply(moves
+ fast_random(65535) * DIST_GAMELEN
);
569 } while (gtp_id
== prev_id
);
572 snprintf(gtp_cmd
, gtp_cmds
+ CMDS_SIZE
- gtp_cmd
, "%d %s %s",
573 gtp_id
, cmd
, *args
? args
: "\n");
576 /* Remember history for out-of-sync slaves. */
578 static struct cmd_history
*last
= NULL
;
580 if (last
) last
->next_cmd
= gtp_cmd
;
581 slot
= (slot
+ 1) % MAX_CMDS_PER_MOVE
;
582 last
= &history
[moves
][slot
];
583 last
->gtp_id
= gtp_id
;
584 last
->next_cmd
= NULL
;
586 // Notify the slave threads about the new command.
587 pthread_cond_broadcast(&cmd_cond
);
590 /* Update the command history, then create a new gtp command
591 * for all slaves. The slave lock is held upon entry and
592 * upon return, so the command will actually be sent when the
593 * lock is released. cmd is a single word; args has all
594 * arguments and is empty or has a trailing \n */
596 new_cmd(struct board
*b
, char *cmd
, char *args
)
598 // Clear the history when a new game starts:
599 if (!gtp_cmd
|| is_gamestart(cmd
)) {
601 memset(history
, 0, sizeof(history
));
603 /* Preserve command history for new slaves.
604 * To indicate that the slave should only reply to
605 * the last command we force the id of previous
606 * commands to be just the move number. */
607 int id
= prevent_reply(atoi(gtp_cmd
));
608 int len
= strspn(gtp_cmd
, "0123456789");
610 snprintf(buf
, sizeof(buf
), "%0*d", len
, id
);
611 memcpy(gtp_cmd
, buf
, len
);
613 gtp_cmd
+= strlen(gtp_cmd
);
616 // Let the slave threads send the new gtp command:
617 update_cmd(b
, cmd
, args
, true);
620 /* Wait for at least one new reply. Return when at least
621 * min_replies slaves have already replied, or when the
622 * given absolute time is passed.
623 * The replies are returned in gtp_replies[0..reply_count-1]
624 * slave_lock is held on entry and on return. */
626 get_replies(double time_limit
, int min_replies
)
629 if (reply_count
> 0) {
632 ts
.tv_nsec
= (int)(modf(time_limit
, &sec
)*1000000000.0);
633 ts
.tv_sec
= (int)sec
;
634 pthread_cond_timedwait(&reply_cond
, &slave_lock
, &ts
);
636 pthread_cond_wait(&reply_cond
, &slave_lock
);
638 if (reply_count
== 0) continue;
639 if (reply_count
>= min_replies
|| reply_count
>= active_slaves
) return;
640 if (time_now() >= time_limit
) break;
644 snprintf(buf
, sizeof(buf
),
645 "get_replies timeout %.3f >= %.3f, replies %d < min %d, active %d\n",
646 time_now() - start_time
, time_limit
- start_time
,
647 reply_count
, min_replies
, active_slaves
);
648 logline(NULL
, "? ", buf
);
650 assert(reply_count
> 0);
653 /* In a 5-minute move with at least 5ms per genmoves we get at most
654 * 300s * 200/s = 60000 genmoves per slave. */
655 #define MAX_GENMOVES_PER_SLAVE 60000
657 /* Allocate the receive queue, and create the slave and proxy threads.
658 * max_buf_size and the merge-related fields of default_sstate must
659 * already be initialized. */
661 protocol_init(char *slave_port
, char *proxy_port
, int max_slaves
)
663 start_time
= time_now();
665 queue_max_length
= max_slaves
* MAX_GENMOVES_PER_SLAVE
;
666 receive_queue
= calloc2(queue_max_length
, sizeof(*receive_queue
));
668 default_sstate
.slave_sock
= port_listen(slave_port
, max_slaves
);
669 default_sstate
.last_processed
= -1;
671 for (int n
= 0; n
< BUFFERS_PER_SLAVE
; n
++) {
672 default_sstate
.b
[n
].queue_index
= -1;
676 for (int id
= 0; id
< max_slaves
; id
++) {
677 pthread_create(&thread
, NULL
, slave_thread
, (void *)(intptr_t)id
);
681 int proxy_sock
= port_listen(proxy_port
, max_slaves
);
682 for (int id
= 0; id
< max_slaves
; id
++) {
683 pthread_create(&thread
, NULL
, proxy_thread
, (void *)(intptr_t)proxy_sock
);