1 /* The master keeps stats received from slaves in a queue of received
2 * buffers that are merged together with the functions implemented
3 * here. It also has one hash table per slave to maintain cumulative
4 * stats that have not yet been sent to the slave machine. The queue
5 * and the hash tables are cleared at each new move. */
15 #include "distributed/distributed.h"
16 #include "distributed/merge.h"
18 /* We merge together debug stats for all hash tables. */
19 static struct hash_counts h_counts
;
21 /* Display and reset hash statistics. For debugging only. */
23 merge_print_stats(int total_hnodes
)
27 snprintf(buf
, sizeof(buf
),
28 "stats occupied %ld %.1f%% inserts %ld collisions %ld/%ld %.1f%%\n",
29 h_counts
.occupied
, h_counts
.occupied
* 100.0 / total_hnodes
,
30 h_counts
.inserts
, h_counts
.collisions
, h_counts
.lookups
,
31 h_counts
.collisions
* 100.0 / (h_counts
.lookups
+ 1));
32 logline(NULL
, "* ", buf
);
34 if (DEBUG_MODE
) h_counts
.occupied
= 0;
37 /* We maintain counts per bucket to avoid sorting large arrays.
38 * All nodes with n updates since last send go to bucket n.
39 * We have at most max_merged_nodes = (max_slaves-1) * shared_nodes
40 * nodes to merge, 230K nodes for 24 slaves. If we put all nodes above
41 * 1K updates in the top bucket, we get at most 230 nodes in this
42 * bucket. So we can select exactly the best shared_nodes nodes if
43 * shared_nodes >= 230. In practice there is overlap between
44 * nodes sent by different slaves so shared_nodes can be lower. */
45 #define MAX_BUCKETS 1024
47 /* Update the hash table for the given increment stats,
48 * and increment the bucket count. Return the hash index.
49 * The slave lock is not held on either entry or exit of this function. */
51 stats_tally(struct incr_stats
*s
, struct slave_state
*sstate
, int *bucket_count
)
55 struct incr_stats
*stats_htable
= sstate
->stats_htable
;
56 find_hash(h
, stats_htable
, sstate
->stats_hbits
, s
->coord_path
, found
, h_counts
);
58 assert(stats_htable
[h
].incr
.playouts
> 0);
59 stats_add_result(&stats_htable
[h
].incr
, s
->incr
.value
, s
->incr
.playouts
);
62 if (DEBUG_MODE
) h_counts
.inserts
++, h_counts
.occupied
++;
65 int incr
= stats_htable
[h
].incr
.playouts
;
66 if (incr
>= MAX_BUCKETS
) incr
= MAX_BUCKETS
- 1;
71 static struct incr_stats terminator
= { .coord_path
= INT64_MAX
};
73 /* Initialize the next pointers (see merge_new_stats()).
74 * Exclude invalid buffers and my own buffers by setting their next pointer
75 * to a terminator value. Update min if there are too many nodes to merge,
76 * so that merge time remains reasonable and the merge buffer doesn't overflow.
77 * (We skip the oldest buffers if the slave thread is too far behind. It is
78 * more important to get frequent incomplete updates than late complete updates.)
79 * Return the total number of nodes to be merged.
80 * The slave lock is not held on either entry or exit of this function. */
82 filter_buffers(struct slave_state
*sstate
, struct incr_stats
**next
,
86 int max_size
= sstate
->max_merged_nodes
* sizeof(struct incr_stats
);
88 for (int q
= max
; q
>= *min
; q
--) {
89 if (!receive_queue
[q
] || receive_queue
[q
]->owner
== sstate
->thread_id
) {
90 next
[q
] = &terminator
;
91 } else if (size
+ receive_queue
[q
]->size
> max_size
) {
96 next
[q
] = (struct incr_stats
*)receive_queue
[q
]->buf
;
97 size
+= receive_queue
[q
]->size
;
100 return size
/ sizeof(struct incr_stats
);
103 /* Return the minimum coord path of next[min..max].
104 * This implementation is optimized for small values of max - min,
105 * which is the case if slaves are not too far behind.
106 * A heap (priority queue) could be used otherwise.
107 * The returned value might come from a buffer that has
108 * been invalidated; the caller must check for this. In this
109 * case the returned value is < the correct value. */
111 min_coord(struct incr_stats
**next
, int min
, int max
)
113 path_t min_c
= next
[min
]->coord_path
;
114 for (int q
= min
+ 1; q
<= max
; q
++) {
115 if (next
[q
]->coord_path
< min_c
)
116 min_c
= next
[q
]->coord_path
;
121 /* Merge all valid incremental stats in receive_queue[min..max],
122 * update the hash table, set the bucket counts, and save the
123 * list of updated hash table entries. The input buffers and
124 * the output buffer are all sorted by increasing coord path.
125 * The input buffers end with a terminator value INT64_MAX.
126 * Return the number of updated hash table entries. */
128 /* The slave lock is not held on either entry or exit of this function,
129 * so receive_queue entries may be invalidated while we scan them.
130 * The receive queue might grow while we scan it but we ignore
131 * entries above max, they will be processed at the next call.
132 * This function does not modify the receive queue. */
134 merge_new_stats(struct slave_state
*sstate
, int min
, int max
,
135 int *bucket_count
, int *nodes_read
, int last_queue_age
)
138 if (max
< min
) return 0;
140 /* next[q] is the next value to be checked in receive_queue[q]->buf */
141 struct incr_stats
*next_
[max
- min
+ 1];
142 struct incr_stats
**next
= next_
- min
;
143 *nodes_read
= filter_buffers(sstate
, next
, &min
, max
);
145 /* prev_min_c is only used for debugging. */
146 path_t prev_min_c
= 0;
148 /* Do N-way merge, processing one coord path per iteration.
149 * If the minimum coord is INT64_MAX, either all buffers are
150 * invalidated, or at least one is valid and we are at the
151 * end of all valid buffers. In both cases we're done. */
154 while ((min_c
= min_coord(next
, min
, max
)) != INT64_MAX
) {
156 struct incr_stats sum
= { .coord_path
= min_c
,
157 .incr
= { .playouts
= 0, .value
= 0.0 }};
158 for (int q
= min
; q
<= max
; q
++) {
159 struct incr_stats s
= *(next
[q
]);
161 /* If s.coord_path != min_c, we must skip s.coord_path for now.
162 * If min_c is invalid, a future iteration will get a stable
163 * value since the call of min_coord(), so at some point we will
164 * get s.coord_path == min_c and we will not loop forever. */
165 if (s
.coord_path
!= min_c
) continue;
167 /* We check the buffer validity after s.coord_path has been checked
168 * to avoid a race condition, and also to avoid multiple useless
169 * checks for the same coord_path. */
170 if (unlikely(!receive_queue
[q
])) {
171 next
[q
] = &terminator
;
175 /* Stop if we have a new move. If queue_age is incremented
176 * after this check, the merged output will be discarded. */
177 if (unlikely(queue_age
> last_queue_age
)) return 0;
179 /* s.coord_path is valid here, so min_c is valid too.
180 * (An invalid min_c would be < s.coord_path.) */
181 assert(min_c
> prev_min_c
);
183 assert(s
.coord_path
&& s
.incr
.playouts
);
184 stats_add_result(&sum
.incr
, s
.incr
.value
, s
.incr
.playouts
);
187 /* All the buffers containing min_c may have been invalidated
188 * so sum may still be zero. But in this case the next[q] which
189 * contained min_c have been reset to &terminator so we will
190 * not loop forever. */
191 if (!sum
.incr
.playouts
) continue;
193 assert(min_c
> prev_min_c
);
194 if (DEBUG_MODE
) prev_min_c
= min_c
;
196 /* At this point sum contains only valid increments,
197 * so we can add it to the hash table. */
198 assert(merge_count
< sstate
->max_merged_nodes
);
199 sstate
->merged
[merge_count
++] = stats_tally(&sum
, sstate
, bucket_count
);
204 /* Save in buf the best increments from other slaves merged previously.
205 * To avoid a costly scan of the entire hash table we only send nodes
206 * that were recently sent by other slaves. It is possible
207 * but very unlikely that the hash table contains some nodes with
208 * a higher number of playouts.
209 * Return the number of nodes to be sent.
210 * The slave lock is not held on either entry or exit of this function. */
212 output_stats(struct incr_stats
*buf
, struct slave_state
*sstate
,
213 int *bucket_count
, int merge_count
)
215 /* Find the minimum increment to send. The bucket with minimum
216 * increment may be sent only partially. */
218 int min_incr
= MAX_BUCKETS
;
219 int shared_nodes
= sstate
->max_buf_size
/ sizeof(*buf
);
221 out_count
+= bucket_count
[--min_incr
];
222 } while (min_incr
> 1 && out_count
< shared_nodes
);
224 /* Send all increments > min_incr plus whatever we can at min_incr. */
225 int min_count
= bucket_count
[min_incr
] - (out_count
- shared_nodes
);
227 int *merged
= sstate
->merged
;
228 struct incr_stats
*stats_htable
= sstate
->stats_htable
;
229 while (merge_count
--) {
231 int delta
= stats_htable
[h
].incr
.playouts
- min_incr
;
232 if (delta
< 0 || (delta
== 0 && --min_count
< 0)) continue;
234 assert (out_count
< shared_nodes
);
235 buf
[out_count
++] = stats_htable
[h
];
237 /* Clear the hash table entry. (We could instead
238 * just clear the playouts but clearing the entry
239 * leads to fewer collisions later.) */
240 stats_htable
[h
].coord_path
= 0;
241 if (DEBUG_MODE
) h_counts
.occupied
--;
243 /* The slave expects increments sorted by coord path
244 * but they are sorted already. */
248 /* Get all incremental stats received from other slaves since the
249 * last send. Store in buf the stats with largest playout increments.
250 * Return the byte size of the resulting buffer. The caller must
251 * check that the result is still valid.
252 * The slave lock is held on both entry and exit of this function. */
254 get_new_stats(struct incr_stats
*buf
, struct slave_state
*sstate
, int cmd_id
)
256 /* Process all valid buffers in receive_queue[min..max] */
257 int min
= sstate
->last_processed
+ 1;
258 int max
= queue_length
- 1;
259 if (max
< min
&& cmd_id
== sstate
->stats_id
) return 0;
261 sstate
->last_processed
= max
;
262 int last_queue_age
= queue_age
;
264 /* It takes time to clear the hash table and merge the stats
265 * so do this unlocked. */
268 double start
= time_now();
269 double clear_time
= 0;
271 /* Clear the hash table at a new move; the old paths in
272 * the hash table are now meaningless. */
273 if (cmd_id
!= sstate
->stats_id
) {
274 memset(sstate
->stats_htable
, 0,
275 (1 << sstate
->stats_hbits
) * sizeof(sstate
->stats_htable
[0]));
276 sstate
->stats_id
= cmd_id
;
277 clear_time
= time_now() - start
;
280 /* Set the bucket counts and update the hash table stats. */
281 int bucket_count
[MAX_BUCKETS
];
282 memset(bucket_count
, 0, sizeof(bucket_count
));
284 int merge_count
= merge_new_stats(sstate
, min
, max
, bucket_count
,
285 &nodes_read
, last_queue_age
);
289 for (int q
= min
; q
<= max
; q
++) missed
+= !receive_queue
[q
];
291 /* Put the best increments in the output buffer. */
292 int output_nodes
= output_stats(buf
, sstate
, bucket_count
, merge_count
);
296 snprintf(b
, sizeof(b
), "merged %d..%d missed %d %d/%d nodes,"
297 " output %d/%d nodes in %.3fms (clear %.3fms)\n",
298 min
, max
, missed
, merge_count
, nodes_read
, output_nodes
,
299 sstate
->max_buf_size
/ (int)sizeof(*buf
),
300 (time_now() - start
)*1000, clear_time
*1000);
301 logline(&sstate
->client
, "= ", b
);
306 return output_nodes
* sizeof(*buf
);
309 /* Allocate the buffers in the merge-specific part of the slave state,
310 * and reserve space for a terminator value (see merge_insert_hook). */
312 merge_state_alloc(struct slave_state
*sstate
)
314 sstate
->stats_htable
= calloc2(1 << sstate
->stats_hbits
, sizeof(struct incr_stats
));
315 sstate
->merged
= malloc2(sstate
->max_merged_nodes
* sizeof(int));
316 sstate
->max_buf_size
-= sizeof(struct incr_stats
);
319 /* Append a terminator value to make merge_new_stats() more
320 * efficient. merge_state_alloc() has reserved enough space. */
322 merge_insert_hook(struct incr_stats
*buf
, int size
)
324 int nodes
= size
/ sizeof(*buf
);
325 buf
[nodes
].coord_path
= INT64_MAX
;
328 /* Initialize merge-related fields of the default slave state. */
330 merge_init(struct slave_state
*sstate
, int shared_nodes
, int stats_hbits
, int max_slaves
)
332 /* See merge_state_alloc() for shared_nodes + 1 */
333 sstate
->max_buf_size
= (shared_nodes
+ 1) * sizeof(struct incr_stats
);
334 sstate
->stats_hbits
= stats_hbits
;
336 sstate
->insert_hook
= (buffer_hook
)merge_insert_hook
;
337 sstate
->alloc_hook
= merge_state_alloc
;
338 sstate
->args_hook
= (getargs_hook
)get_new_stats
;
340 /* At worst one late slave thread may have to merge up to
341 * shared_nodes * BUFFERS_PER_SLAVE * (max_slaves - 1)
342 * nodes but on average it should not have to merge more than
343 * dist->shared_nodes * (max_slaves - 1)
344 * Restricting the maximum number of merged nodes to the latter avoids
345 * spending excessive time on the merge. */
346 sstate
->max_merged_nodes
= shared_nodes
* (max_slaves
- 1);