1 /* The functions implementing the master-slave protocol of the
2 * distributed engine are grouped here. They are independent
3 * of the gtp protocol. See the comments at the top of distributed.c
4 * for a general introduction to the distributed engine. */
18 #include "distributed/distributed.h"
19 #include "distributed/protocol.h"
21 /* All gtp commands for current game separated by \n */
/* update_cmd() writes each command here as "<id> <cmd> <args>". */
22 static char gtp_cmds
[CMDS_SIZE
];
24 /* Latest gtp command sent to slaves. */
/* Points inside gtp_cmds; new_cmd() advances it past the previous command
 * before the next one is written. Initially NULL, which new_cmd() treats
 * like the start of a new game. */
25 static char *gtp_cmd
= NULL
;
27 /* Slaves send gtp_cmd when cmd_count changes. */
/* Slave threads compare their last_cmd_sent with this counter.
 * NOTE(review): the increment of cmd_count is not on any line of this
 * listing; presumably it happens when a new command is created. */
28 static int cmd_count
= 0;
30 /* Remember at most 10 gtp ids per move: kgs-rules, boardsize, clear_board,
31 * time_settings, komi, handicap, genmoves, play pass, play pass, final_status_list */
/* update_cmd() advances its slot index modulo this value, so entries can be
 * overwritten once the index wraps around. */
32 #define MAX_CMDS_PER_MOVE 10
34 /* History of gtp commands sent for current game, indexed by move. */
/* Each entry holds a gtp_id and a next_cmd pointer to the command sent after
 * it (fields used by next_command() and update_cmd()).
 * NOTE(review): the struct definition itself (original lines 35-37) is
 * missing from this listing. */
38 } history
[MAX_GAMELEN
][MAX_CMDS_PER_MOVE
];
40 /* Number of active slave machines working for this master. */
/* NOTE(review): not static, unlike the other state here — presumably read
 * from another file; confirm before changing its linkage. */
41 int active_slaves
= 0;
43 /* Number of replies to last gtp command already received. */
46 /* All replies to latest gtp command are in gtp_replies[0..reply_count-1]. */
/* NOTE(review): the declarations of reply_count and gtp_replies (original
 * lines 44-45 and 47-48) are missing from this listing. */
49 /* Mutex protecting gtp_cmds, gtp_cmd, history,
50 * cmd_count, active_slaves, reply_count & gtp_replies */
51 static pthread_mutex_t slave_lock
= PTHREAD_MUTEX_INITIALIZER
;
53 /* Condition signaled when a new gtp command is available. */
/* Waited on by slave threads in slave_loop(); broadcast by update_cmd(). */
54 static pthread_cond_t cmd_cond
= PTHREAD_COND_INITIALIZER
;
56 /* Condition signaled when reply_count increases. */
/* slave_thread() also signals it when a slave connection is lost, so a
 * waiting get_replies() can re-check active_slaves. */
57 static pthread_cond_t reply_cond
= PTHREAD_COND_INITIALIZER
;
59 /* Mutex protecting stderr. Must not be held at same time as slave_lock. */
/* NOTE(review): get_replies() appears to call logline() while holding
 * slave_lock. That cannot deadlock as long as logline() never takes
 * slave_lock, but it contradicts the wording above. The effective rule
 * seems to be "never acquire slave_lock while holding log_lock". */
60 static pthread_mutex_t log_lock
= PTHREAD_MUTEX_INITIALIZER
;
62 /* Absolute time when this program was started.
63 * For debugging only. */
/* Set by protocol_init(); log timestamps are printed relative to it. */
64 static double start_time
;
66 /* Get exclusive access to the threads and commands state. */
/* Acquires slave_lock; every call must be paired with the matching release.
 * NOTE(review): the function header (original lines 67-69) is missing from
 * this listing. */
70 pthread_mutex_lock(&slave_lock
);
73 /* Release exclusive access to the threads and commands state. */
/* Releases slave_lock. Pending commands are sent by slave threads once the
 * lock is free.
 * NOTE(review): the function header (original lines 74-76) is missing from
 * this listing. */
77 pthread_mutex_unlock(&slave_lock
);
80 /* Write the time, client address, prefix, and string s to stderr atomically.
81 * s should end with a \n */
/* The timestamp is taken before locking and printed in seconds relative to
 * start_time. log_lock is held only around the fprintf(), so lines from
 * concurrent threads are not interleaved.
 * NOTE(review): get_replies() calls this with client == NULL, but the visible
 * inet_ntop() call uses client unconditionally. Original lines 88 and 90-92
 * are missing from this listing — confirm they guard the NULL case, e.g. by
 * leaving addr empty. */
83 logline(struct in_addr
*client
, char *prefix
, char *s
)
85 double now
= time_now();
87 char addr
[INET_ADDRSTRLEN
];
89 inet_ntop(AF_INET
, client
, addr
, sizeof(addr
));
93 pthread_mutex_lock(&log_lock
);
94 fprintf(stderr
, "%s%15s %9.3f: %s", prefix
, addr
, now
- start_time
, s
);
95 pthread_mutex_unlock(&log_lock
);
98 /* Thread opening a connection on the given socket and copying input
99 * from there to stderr. */
/* arg is the listening socket descriptor, passed through a long cast by
 * protocol_init(). Every line read is logged with the "< " prefix.
 * NOTE(review): the fdopen() result is used without a visible NULL check.
 * The declaration of buf and the code after the read loop (e.g. original
 * lines 105, 109 and 112-116) are missing from this listing — confirm error
 * handling and fclose() there. */
101 proxy_thread(void *arg
)
103 int proxy_sock
= (long)arg
;
104 assert(proxy_sock
>= 0);
106 struct in_addr client
;
107 int conn
= open_server_connection(proxy_sock
, &client
);
108 FILE *f
= fdopen(conn
, "r");
110 while (fgets(buf
, BSIZE
, f
)) {
111 logline(&client
, "< ", buf
);
117 /* Get a reply to one gtp command. Return the gtp command id,
118 * or -1 if error. reply must have at least CMDS_SIZE bytes.
119 * slave_lock is not held on either entry or exit of this function. */
/* Lines are appended to reply until an empty line ("\n") is read. Each
 * fgets() is bounded by the space left in reply. The id comes from the first
 * line that starts with '=' or '?' followed by a digit. With DEBUGL(2) only
 * the first line is logged; with DEBUGL(3) every line is. If the last line
 * read is not the empty terminator (EOF), -1 is returned.
 * NOTE(review): once reply is full, the size passed to fgets() becomes 1.
 * glibc then stores an empty string and returns non-NULL, so this loop can
 * spin forever on an over-long reply. Consider stopping when line reaches
 * reply + CMDS_SIZE - 1.
 * NOTE(review): isdigit() receives a plain char; cast it to unsigned char to
 * avoid undefined behavior for negative values.
 * The declarations of line and reply_id (original lines 122-125) and the
 * final return are missing from this listing. */
121 get_reply(FILE *f
, struct in_addr client
, char *reply
)
126 while (fgets(line
, reply
+ CMDS_SIZE
- line
, f
) && *line
!= '\n') {
127 if (DEBUGL(3) || (DEBUGL(2) && line
== reply
))
128 logline(&client
, "<<", line
);
129 if (reply_id
< 0 && (*line
== '=' || *line
== '?') && isdigit(line
[1]))
130 reply_id
= atoi(line
+1);
131 line
+= strlen(line
);
133 if (*line
!= '\n') return -1;
137 /* Send one gtp command and get a reply from the slave machine.
138 * Write the reply in buf which must have at least CMDS_SIZE bytes.
139 * Return the gtp command id, or -1 if error.
140 * slave_lock is held on both entry and exit of this function. */
/* to_send is copied into buf before slave_lock is released, because
 * new_cmd()/update_cmd() may rewrite gtp_cmds once the lock is free. The
 * network I/O then runs without the lock, which is re-acquired before
 * returning. resend is true when to_send is not the latest command, i.e.
 * history is being replayed.
 * NOTE(review): original lines 153-156 and 158-162 (presumably writing buf to
 * f and limiting the logged text to its first line via s) and the return
 * statement are missing from this listing. */
142 send_command(char *to_send
, FILE *f
, struct in_addr client
, char *buf
)
144 assert(to_send
&& gtp_cmd
);
145 strncpy(buf
, to_send
, CMDS_SIZE
);
146 bool resend
= to_send
!= gtp_cmd
;
148 pthread_mutex_unlock(&slave_lock
);
150 if (DEBUGL(1) && resend
)
151 logline(&client
, "? ",
152 to_send
== gtp_cmds
? "resend all\n" : "partial resend\n");
157 char *s
= strchr(buf
, '\n');
160 logline(&client
, ">>", buf
);
163 int reply_id
= get_reply(f
, client
, buf
);
165 pthread_mutex_lock(&slave_lock
);
169 /* Return the command sent after that with the given gtp id,
170 * or gtp_cmds if the id wasn't used in this game. If a play command
171 * has overwritten a genmoves command, return the play command.
172 * slave_lock is held on both entry and exit of this function. */
/* Three cases yield gtp_cmds, i.e. a full resend:
 *  - a cmd_id of -1 (the initial last_reply_id in slave_loop());
 *  - an id whose move number is beyond that of the latest command (whose id
 *    is parsed from the leading digits of gtp_cmd);
 *  - an id not found among the history slots of its move.
 * NOTE(review): the declaration of slot and the code after next is read
 * (original lines 182 and 189-191) are missing from this listing. The
 * indexing also assumes move_number() returns a valid first index into
 * history. */
174 next_command(int cmd_id
)
176 if (cmd_id
== -1) return gtp_cmds
;
178 int last_id
= atoi(gtp_cmd
);
179 int reply_move
= move_number(cmd_id
);
180 if (reply_move
> move_number(last_id
)) return gtp_cmds
;
183 for (slot
= 0; slot
< MAX_CMDS_PER_MOVE
; slot
++) {
184 if (cmd_id
== history
[reply_move
][slot
].gtp_id
) break;
186 if (slot
== MAX_CMDS_PER_MOVE
) return gtp_cmds
;
188 char *next
= history
[reply_move
][slot
].next_cmd
;
193 /* Process the reply received from a slave machine.
194 * Copy it to reply_buf and return false if ok, or return
195 * true if the slave is out of sync.
196 * slave_lock is held on both entry and exit of this function. */
/* A slave gets a new index in gtp_replies only for its first reply to a
 * given id. A later reply to the same id (consecutive genmoves) overwrites
 * its existing slot. gtp_replies stores the reply_buf pointer itself, so
 * reply_buf must stay valid while the replies are in use. reply_cond is
 * signaled so a waiting get_replies() wakes up.
 * NOTE(review): original lines 200-201, 203-206 and the return statements
 * are missing from this listing. */
198 process_reply(int reply_id
, char *reply
, char *reply_buf
,
199 int *last_reply_id
, int *reply_slot
)
202 /* Force a resend of everything if the slave returned an error. */
207 /* Make sure we are still in sync. cmd_count may have
208 * changed but the reply is valid as long as cmd_id didn't
209 * change (this only occurs for consecutive genmoves). */
210 int cmd_id
= atoi(gtp_cmd
);
211 if (reply_id
== cmd_id
) {
212 strncpy(reply_buf
, reply
, CMDS_SIZE
);
213 if (reply_id
!= *last_reply_id
)
214 *reply_slot
= reply_count
++;
215 gtp_replies
[*reply_slot
] = reply_buf
;
217 pthread_cond_signal(&reply_cond
);
220 *last_reply_id
= reply_id
;
224 /* Main loop of a slave thread.
225 * Send the current command to the slave machine and wait for a reply.
226 * Resend command history if the slave machine is out of sync.
227 * Returns when the connection with the slave machine is cut.
228 * slave_lock is held on both entry and exit of this function. */
/* last_cmd_sent starts at 0, the initial value of cmd_count, so the wait on
 * cmd_cond blocks until cmd_count first changes. last_reply_id starts at -1,
 * which next_command() maps to a full resend. A reply_id of -1 from
 * send_command() ends the loop.
 * NOTE(review): the declarations of buf, to_send and reply_slot, the loop
 * header, and the branch choosing between history resend and waiting
 * (original lines 231-232, 235-237, 240, 244-246, 249, 251, 257, 260 and 265)
 * are missing from this listing. */
230 slave_loop(FILE *f
, struct in_addr client
, char *reply_buf
, bool resend
)
233 int last_cmd_sent
= 0;
234 int last_reply_id
= -1;
238 /* Resend complete or partial history */
239 to_send
= next_command(last_reply_id
);
241 /* Wait for a new command. */
242 while (last_cmd_sent
== cmd_count
)
243 pthread_cond_wait(&cmd_cond
, &slave_lock
);
247 /* Command available, send it to slave machine.
248 * If slave was out of sync, send the history. */
250 last_cmd_sent
= cmd_count
;
252 /* Send the command and get the reply, which always ends with \n\n
253 * The slave machine sends "=id reply" or "?id reply"
254 * with id == cmd_id if it is in sync. */
255 int reply_id
= send_command(to_send
, f
, client
, buf
);
256 if (reply_id
== -1) return;
258 resend
= process_reply(reply_id
, buf
, reply_buf
,
259 &last_reply_id
, &reply_slot
);
261 /* Good reply. Force waiting for a new command.
262 * The next genmoves stats we send must include those
263 * just received (this is assumed by the slave). */
264 last_cmd_sent
= cmd_count
;
268 /* Thread sending gtp commands to one slave machine, and
269 * reading replies. If a slave machine dies, this thread waits
270 * for a connection from another slave. */
/* arg is the listening socket descriptor, passed through a long cast by
 * protocol_init(). reply_buf lives on this thread's stack; slave_loop()
 * passes it to process_reply(), which publishes it in gtp_replies.
 * A new slave is accepted only if its first line starts with "= Pachi"
 * (case-insensitive) and the next line is empty. reply_cond is signaled
 * after slave_loop() returns, so a get_replies() waiting on active_slaves is
 * not left blocked.
 * NOTE(review): several lines are missing from this listing (original lines
 * 271, 273, 277-279, 283, 285, 287, 289, 295-298, 300, 302, 304, 308-310 and
 * 312-315). They presumably hold the outer loop, the initialization of
 * resend, the active_slaves updates, and fclose(). */
272 slave_thread(void *arg
)
274 int slave_sock
= (long)arg
;
275 assert(slave_sock
>= 0);
276 char reply_buf
[CMDS_SIZE
];
280 /* Wait for a connection from any slave. */
281 struct in_addr client
;
282 int conn
= open_server_connection(slave_sock
, &client
);
284 FILE *f
= fdopen(conn
, "r+");
286 logline(&client
, "= ", "new slave\n");
288 /* Minimal check of the slave identity. */
290 if (!fgets(reply_buf
, sizeof(reply_buf
), f
)
291 || strncasecmp(reply_buf
, "= Pachi", 7)
292 || !fgets(reply_buf
, sizeof(reply_buf
), f
)
293 || strcmp(reply_buf
, "\n")) {
294 logline(&client
, "? ", "bad slave\n");
299 pthread_mutex_lock(&slave_lock
);
301 slave_loop(f
, client
, reply_buf
, resend
);
303 assert(active_slaves
> 0);
305 // Unblock main thread if it was waiting for this slave.
306 pthread_cond_signal(&reply_cond
);
307 pthread_mutex_unlock(&slave_lock
);
311 logline(&client
, "= ", "lost slave\n");
316 /* Create a new gtp command for all slaves. The slave lock is held
317 * upon entry and upon return, so the command will actually be
318 * sent when the lock is released. The last command is overwritten
319 * if gtp_cmd points to a non-empty string. cmd is a single word;
320 * args has all arguments and is empty or has a trailing \n */
/* The command is written at gtp_cmd as "<id> <cmd> <args>", using "\n" when
 * args is empty. The snprintf() result is not checked, so a command that does
 * not fit in the rest of gtp_cmds is silently truncated. The previous history
 * entry's next_cmd is then linked to this command, a new entry is recorded
 * for its id, and cmd_cond is broadcast.
 * NOTE(review): new_id, the declaration of slot, and any cmd_count update do
 * not appear in the visible lines (original lines 323-324, 329, 332-333,
 * 336-337, 339, 341 and 347 are missing). history[moves] assumes b->moves is
 * below MAX_GAMELEN. */
322 update_cmd(struct board
*b
, char *cmd
, char *args
, bool new_id
)
325 /* To make sure the slaves are in sync, we ignore the original id
326 * and use the move number (b->moves, 0 for reset commands) plus some
 * random bits as gtp id; the sum is passed through force_reply(). */
327 static int gtp_id
= -1;
328 int moves
= is_reset(cmd
) ? 0 : b
->moves
;
330 /* fast_random() is 16-bit only so the multiplication can't overflow. */
331 gtp_id
= force_reply(moves
+ fast_random(65535) * DIST_GAMELEN
);
334 snprintf(gtp_cmd
, gtp_cmds
+ CMDS_SIZE
- gtp_cmd
, "%d %s %s",
335 gtp_id
, cmd
, *args
? args
: "\n");
338 /* Remember history for out-of-sync slaves. */
340 static struct cmd_history
*last
= NULL
;
342 if (last
) last
->next_cmd
= gtp_cmd
;
343 slot
= (slot
+ 1) % MAX_CMDS_PER_MOVE
;
344 last
= &history
[moves
][slot
];
345 last
->gtp_id
= gtp_id
;
346 last
->next_cmd
= NULL
;
348 // Notify the slave threads about the new command.
349 pthread_cond_broadcast(&cmd_cond
);
352 /* Update the command history, then create a new gtp command
353 * for all slaves. The slave lock is held upon entry and
354 * upon return, so the command will actually be sent when the
355 * lock is released. cmd is a single word; args has all
356 * arguments and is empty or has a trailing \n */
/* The history array is cleared when there is no previous command or when
 * cmd starts a new game. Presumably in the other case (the branch itself is
 * on a missing line), the previous command stays in gtp_cmds. Its leading
 * numeric id is overwritten in place with prevent_reply(id). The id is
 * formatted with "%0*d" at the same width, so copying len bytes touches only
 * the id digits. gtp_cmd is then advanced past that command.
 * NOTE(review): if prevent_reply() produced more than len digits, only the
 * first len would be copied. The declaration of buf and the branch structure
 * (original lines 359, 362, 364, 371, 374 and 376-377) are missing from this
 * listing. */
358 new_cmd(struct board
*b
, char *cmd
, char *args
)
360 // Clear the history when a new game starts:
361 if (!gtp_cmd
|| is_gamestart(cmd
)) {
363 memset(history
, 0, sizeof(history
));
365 /* Preserve command history for new slaves.
366 * To indicate that the slave should only reply to
367 * the last command we force the id of previous
368 * commands to be just the move number. */
369 int id
= prevent_reply(atoi(gtp_cmd
));
370 int len
= strspn(gtp_cmd
, "0123456789");
372 snprintf(buf
, sizeof(buf
), "%0*d", len
, id
);
373 memcpy(gtp_cmd
, buf
, len
);
375 gtp_cmd
+= strlen(gtp_cmd
);
378 // Let the slave threads send the new gtp command:
379 update_cmd(b
, cmd
, args
, true);
382 /* Wait for at least one new reply. Return when at least
383 * min_replies slaves have already replied, or when the
384 * given absolute time is passed.
385 * The replies are returned in gtp_replies[0..reply_count-1]
386 * slave_lock is held on entry and on return. */
/* Once a reply exists, the wait on reply_cond is bounded by time_limit,
 * converted to a timespec. The untimed wait is presumably the branch taken
 * while reply_count is 0 (the else between the two waits is on a missing
 * line). Wakeups with no reply continue the loop. The function returns as
 * soon as reply_count reaches min_replies or active_slaves. On timeout a
 * diagnostic is logged and reply_count > 0 is asserted.
 * NOTE(review): tv_sec is set through an (int) cast, which overflows for
 * absolute times after 2038; (time_t)sec would be safer. The deadline also
 * assumes time_now() uses the clock pthread_cond_timedwait() compares against
 * (CLOCK_REALTIME by default) — confirm. logline() is called with a NULL
 * client and while slave_lock is held. The loop header and the ts/sec/buf
 * declarations (original lines 389-390, 392-393, 397, 399, 403-405 and 411)
 * are missing from this listing. */
388 get_replies(double time_limit
, int min_replies
)
391 if (reply_count
> 0) {
394 ts
.tv_nsec
= (int)(modf(time_limit
, &sec
)*1000000000.0);
395 ts
.tv_sec
= (int)sec
;
396 pthread_cond_timedwait(&reply_cond
, &slave_lock
, &ts
);
398 pthread_cond_wait(&reply_cond
, &slave_lock
);
400 if (reply_count
== 0) continue;
401 if (reply_count
>= min_replies
|| reply_count
>= active_slaves
) return;
402 if (time_now() >= time_limit
) break;
406 snprintf(buf
, sizeof(buf
),
407 "get_replies timeout %.3f >= %.3f, replies %d < min %d, active %d\n",
408 time_now() - start_time
, time_limit
- start_time
,
409 reply_count
, min_replies
, active_slaves
);
410 logline(NULL
, "? ", buf
);
412 assert(reply_count
> 0);
415 /* Create the slave and proxy threads. */
417 protocol_init(char *slave_port
, char *proxy_port
, int max_slaves
)
419 start_time
= time_now();
421 int slave_sock
= port_listen(slave_port
, max_slaves
);
423 for (int id
= 0; id
< max_slaves
; id
++) {
424 pthread_create(&thread
, NULL
, slave_thread
, (void *)(long)slave_sock
);
428 int proxy_sock
= port_listen(proxy_port
, max_slaves
);
429 for (int id
= 0; id
< max_slaves
; id
++) {
430 pthread_create(&thread
, NULL
, proxy_thread
, (void *)(long)proxy_sock
);