Distributed engine: Define hook to get stats received from slaves
[pachi/ann.git] / distributed / merge.c
/* The master keeps stats received from slaves in a queue of received
 * buffers that are merged together with the functions implemented
 * here. It also has one hash table per slave to maintain cumulative
 * stats that have not yet been sent to the slave machine. The queue
 * and the hash tables are cleared at each new move. */
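
/* Rough data flow, summarizing the pieces below (a sketch; receive_queue
 * and the slave threads are managed elsewhere in the distributed engine):
 *
 *	slave buffers -> receive_queue[] -> merge_new_stats() ->
 *	per-slave stats_htable -> output_stats() -> reply buffer
 *
 * Each slave thread merges into its own hash table, which is why the
 * merge and output passes can run without holding the slave lock. */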
#include <assert.h>
#include <stdio.h>
#include <string.h>	/* memset() */
#include <stdint.h>	/* INT64_MAX */
#include <limits.h>

#include "debug.h"
#include "timeinfo.h"
#include "distributed/distributed.h"
#include "distributed/merge.h"

/* We merge together debug stats for all hash tables. */
static struct hash_counts h_counts;

/* We maintain counts per bucket to avoid sorting large arrays.
 * All nodes with n updates since last send go to bucket n.
 * We have at most max_merged_nodes = (max_slaves-1) * shared_nodes
 * nodes to merge, 230K nodes for 24 slaves. If we put all nodes above
 * 1K updates in the top bucket, we get at most 230 nodes in this
 * bucket. So we can select exactly the best shared_nodes nodes if
 * shared_nodes >= 230. In practice there is overlap between
 * nodes sent by different slaves so shared_nodes can be lower. */
#define MAX_BUCKETS 1024
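
/* The arithmetic behind the figures above, assuming shared_nodes = 10K
 * and max_slaves = 24: max_merged_nodes = (24 - 1) * 10K = 230K, and
 * the top-bucket bound of 230 nodes is 230K / 1K, i.e. at most one
 * node in a thousand can reach 1K updates. */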

/* Update the hash table for the given increment stats,
 * and increment the bucket count. Return the hash index.
 * The slave lock is not held on either entry or exit of this function. */
static inline int
stats_tally(struct incr_stats *s, struct slave_state *sstate, int *bucket_count)
{
	int h;
	bool found;
	struct incr_stats *stats_htable = sstate->stats_htable;
	find_hash(h, stats_htable, sstate->stats_hbits, s->coord_path, found, h_counts);
	if (found) {
		assert(stats_htable[h].incr.playouts > 0);
		stats_add_result(&stats_htable[h].incr, s->incr.value, s->incr.playouts);
	} else {
		stats_htable[h] = *s;
		if (DEBUG_MODE) h_counts.inserts++, h_counts.occupied++;
	}

	int incr = stats_htable[h].incr.playouts;
	if (incr >= MAX_BUCKETS) incr = MAX_BUCKETS - 1;
	bucket_count[incr]++;
	return h;
}

static struct incr_stats terminator = { .coord_path = INT64_MAX };

/* Initialize the next pointers (see merge_new_stats()).
 * Exclude invalid buffers and my own buffers by setting their next pointer
 * to a terminator value. Update min if there are too many nodes to merge,
 * so that merge time remains reasonable and the merge buffer doesn't overflow.
 * (We skip the oldest buffers if the slave thread is too far behind. It is
 * more important to get frequent incomplete updates than late complete updates.)
 * Return the total number of nodes to be merged.
 * The slave lock is not held on either entry or exit of this function. */
static int
filter_buffers(struct slave_state *sstate, struct incr_stats **next,
	       int *min, int max)
{
	int size = 0;
	int max_size = sstate->max_merged_nodes * sizeof(struct incr_stats);

	for (int q = max; q >= *min; q--) {
		if (!receive_queue[q].buf || receive_queue[q].thread_id == sstate->thread_id) {
			next[q] = &terminator;
		} else if (size + receive_queue[q].size > max_size) {
			*min = q + 1;
			assert(*min <= max);
			break;
		} else {
			next[q] = (struct incr_stats *)receive_queue[q].buf;
			size += receive_queue[q].size;
		}
	}
	return size / sizeof(struct incr_stats);
}

/* Return the minimum coord path of next[min..max].
 * This implementation is optimized for small values of max - min,
 * which is the case if slaves are not too far behind.
 * A heap (priority queue) could be used otherwise.
 * The returned value might come from a buffer that has
 * been invalidated, the caller must check for this; in this
 * case the returned value is < the correct value. */
static inline path_t
min_coord(struct incr_stats **next, int min, int max)
{
	path_t min_c = next[min]->coord_path;
	for (int q = min + 1; q <= max; q++) {
		if (next[q]->coord_path < min_c)
			min_c = next[q]->coord_path;
	}
	return min_c;
}
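
/* Example: with next pointing into two buffers holding coord paths
 * {5, 12, T} and {8, T} (T being the INT64_MAX terminator), successive
 * calls here return 5, 8, 12 as the merge consumes entries, then T,
 * which ends the merge loop below. The terminator also removes the
 * need for per-buffer bounds checks in this scan. */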

/* Merge all valid incremental stats in receive_queue[min..max],
 * update the hash table, set the bucket counts, and save the
 * list of updated hash table entries. The input buffers and
 * the output buffer are all sorted by increasing coord path.
 * The input buffers end with a terminator value INT64_MAX.
 * Return the number of updated hash table entries. */

/* The slave lock is not held on either entry or exit of this function,
 * so receive_queue entries may be invalidated while we scan them.
 * The receive queue might grow while we scan it but we ignore
 * entries above max, they will be processed at the next call.
 * This function does not modify the receive queue. */
static int
merge_new_stats(struct slave_state *sstate, int min, int max,
		int *bucket_count, int *nodes_read)
{
	*nodes_read = 0;
	if (max < min) return 0;

	/* next[q] is the next value to be checked in receive_queue[q].buf */
	struct incr_stats *next_[max - min + 1];
	struct incr_stats **next = next_ - min;
	*nodes_read = filter_buffers(sstate, next, &min, max);

	/* prev_min_c is only used for debugging. */
	path_t prev_min_c = 0;

	/* Do N-way merge, processing one coord path per iteration.
	 * If the minimum coord is INT64_MAX, either all buffers are
	 * invalidated, or at least one is valid and we are at the
	 * end of all valid buffers. In both cases we're done. */
	int merge_count = 0;
	path_t min_c;
	while ((min_c = min_coord(next, min, max)) != INT64_MAX) {
		struct incr_stats sum = { .coord_path = min_c,
					  .incr = { .playouts = 0, .value = 0.0 }};
		for (int q = min; q <= max; q++) {
			struct incr_stats s = *(next[q]);

			/* If s.coord_path != min_c, we must skip s.coord_path for now.
			 * If min_c is invalid, a future iteration will get a value
			 * that is stable since the call of min_coord(), so at some
			 * point we will get s.coord_path == min_c and we will not
			 * loop forever. */
			if (s.coord_path != min_c) continue;

			/* We check the buffer validity after s.coord has been checked
			 * to avoid a race condition, and also to avoid multiple useless
			 * checks for the same coord_path. */
			if (unlikely(!receive_queue[q].buf)) {
				next[q] = &terminator;

				/* No point in continuing if we have a new move. */
				if (min >= queue_length) return 0;
				continue;
			}

			/* s.coord_path is valid here, so min_c is valid too.
			 * (An invalid min_c would be < s.coord_path.) */
			assert(min_c > prev_min_c);

			assert(s.coord_path && s.incr.playouts);
			stats_add_result(&sum.incr, s.incr.value, s.incr.playouts);
			next[q]++;
		}

		/* All the buffers containing min_c may have been invalidated
		 * so sum may still be zero. But in this case the next[q] which
		 * contained min_c have been reset to &terminator so we will
		 * not loop forever. */
		if (!sum.incr.playouts) continue;

		assert(min_c > prev_min_c);
		if (DEBUG_MODE) prev_min_c = min_c;

		/* At this point sum contains only valid increments,
		 * so we can add it to the hash table. */
		assert(merge_count < sstate->max_merged_nodes);
		sstate->merged[merge_count++] = stats_tally(&sum, sstate, bucket_count);
	}
	return merge_count;
}
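
/* A small worked merge (hypothetical numbers): with buffer A holding
 * {path 3: 2 playouts, path 7: 1} and buffer B holding {path 3: 4,
 * path 9: 5}, the loop above visits paths 3, 7, 9 in order; path 3 is
 * summed across A and B into a single 6-playout increment, so
 * stats_tally() runs exactly once per distinct coord path. */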

/* Save in buf the best increments from other slaves merged previously.
 * To avoid a costly scan of the entire hash table we only send nodes
 * that were sent recently by other slaves. It is possible
 * but very unlikely that the hash table contains some nodes with
 * a higher number of playouts.
 * Return the number of nodes to be sent.
 * The slave lock is not held on either entry or exit of this function. */
static int
output_stats(struct incr_stats *buf, struct slave_state *sstate,
	     int *bucket_count, int merge_count)
{
	/* Find the minimum increment to send. The bucket with minimum
	 * increment may be sent only partially. */
	int out_count = 0;
	int min_incr = MAX_BUCKETS;
	int shared_nodes = sstate->max_buf_size / sizeof(*buf);
	do {
		out_count += bucket_count[--min_incr];
	} while (min_incr > 1 && out_count < shared_nodes);

	/* Send all increments > min_incr plus whatever we can at min_incr. */
	int min_count = bucket_count[min_incr] - (out_count - shared_nodes);
	out_count = 0;
	int *merged = sstate->merged;
	struct incr_stats *stats_htable = sstate->stats_htable;
	while (merge_count--) {
		int h = *merged++;
		int delta = stats_htable[h].incr.playouts - min_incr;
		if (delta < 0 || (delta == 0 && --min_count < 0)) continue;

		assert(out_count < shared_nodes);
		buf[out_count++] = stats_htable[h];

		/* Clear the hash table entry. (We could instead
		 * just clear the playouts but clearing the entry
		 * leads to fewer collisions later.) */
		stats_htable[h].coord_path = 0;
		if (DEBUG_MODE) h_counts.occupied--;
	}

	/* The slave expects increments sorted by coord path
	 * but they are sorted already. */
	return out_count;
}
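
/* A worked cut (hypothetical counts): with shared_nodes = 5,
 * bucket_count[7] = 2 and bucket_count[5] = 4, the do/while above
 * stops at min_incr = 5 with out_count = 6, so min_count =
 * 4 - (6 - 5) = 3: both 7-update nodes are sent plus 3 of the 4
 * 5-update nodes, filling the buffer exactly. */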

/* Get all incremental stats received from other slaves since the
 * last send. Store in buf the stats with largest playout increments.
 * Return the byte size of the resulting buffer. The caller must
 * check that the result is still valid.
 * The slave lock is held on both entry and exit of this function. */
static int
get_new_stats(struct incr_stats *buf, struct slave_state *sstate, int cmd_id)
{
	/* Process all valid buffers in receive_queue[min..max] */
	int min = sstate->last_processed + 1;
	int max = queue_length - 1;
	if (max < min && cmd_id == sstate->stats_id) return 0;
	sstate->last_processed = max;

	/* It takes time to clear the hash table and merge the stats
	 * so do this unlocked. */
	protocol_unlock();

	double start = time_now();
	double clear_time = 0;

	/* Clear the hash table at a new move; the old paths in
	 * the hash table are now meaningless. */
	if (cmd_id != sstate->stats_id) {
		memset(sstate->stats_htable, 0,
		       (1 << sstate->stats_hbits) * sizeof(sstate->stats_htable[0]));
		sstate->stats_id = cmd_id;
		clear_time = time_now() - start;
	}

	/* Set the bucket counts and update the hash table stats. */
	int bucket_count[MAX_BUCKETS];
	memset(bucket_count, 0, sizeof(bucket_count));
	int nodes_read;
	int merge_count = merge_new_stats(sstate, min, max, bucket_count, &nodes_read);

	/* Put the best increments in the output buffer. */
	int output_nodes = output_stats(buf, sstate, bucket_count, merge_count);

	if (DEBUGVV(2)) {
		char b[1024];
		snprintf(b, sizeof(b), "merged %d..%d %d/%d nodes,"
			 " output %d/%d nodes in %.3fms (clear %.3fms)\n",
			 min, max, merge_count, nodes_read, output_nodes,
			 sstate->max_buf_size / (int)sizeof(*buf),
			 (time_now() - start)*1000, clear_time*1000);
		logline(&sstate->client, "= ", b);
	}

	protocol_lock();

	return output_nodes * sizeof(*buf);
}

/* Allocate the buffers in the merge specific part of the slave state,
 * and reserve space for a terminator value (see merge_insert_hook). */
static void
merge_state_alloc(struct slave_state *sstate)
{
	sstate->stats_htable = calloc2(1 << sstate->stats_hbits, sizeof(struct incr_stats));
	sstate->merged = malloc2(sstate->max_merged_nodes * sizeof(int));
	sstate->max_buf_size -= sizeof(struct incr_stats);
}

/* Append a terminator value to make merge_new_stats() more
 * efficient. merge_state_alloc() has reserved enough space. */
static void
merge_insert_hook(struct incr_stats *buf, int size)
{
	int nodes = size / sizeof(*buf);
	buf[nodes].coord_path = INT64_MAX;
}

/* Initialize merge-related fields of the default slave state. */
void
merge_init(struct slave_state *sstate, int shared_nodes, int stats_hbits, int max_slaves)
{
	/* See merge_state_alloc() for shared_nodes + 1. */
	sstate->max_buf_size = (shared_nodes + 1) * sizeof(struct incr_stats);
	sstate->stats_hbits = stats_hbits;

	sstate->insert_hook = (buffer_hook)merge_insert_hook;
	sstate->alloc_hook = merge_state_alloc;
	sstate->args_hook = (getargs_hook)get_new_stats;

	/* At worst one late slave thread may have to merge up to
	 * shared_nodes * BUFFERS_PER_SLAVE * (max_slaves - 1)
	 * nodes, but on average it should not have to merge more than
	 * shared_nodes * (max_slaves - 1).
	 * Restricting the maximum number of merged nodes to the latter avoids
	 * spending excessive time on the merge. */
	sstate->max_merged_nodes = shared_nodes * (max_slaves - 1);
}
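
/* Hypothetical call site, for illustration only (the real one lives in
 * the distributed engine setup and takes its parameters from the engine
 * options; default_sstate is assumed here):
 *
 *	merge_init(&default_sstate, shared_nodes, stats_hbits, max_slaves);
 *
 * After this, the generic slave-thread code fires the hooks set above:
 * alloc_hook once per slave state, insert_hook on every buffer queued
 * in receive_queue, and args_hook (get_new_stats) when building the
 * reply for a slave. */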