The eighth batch
[alt-git.git] / reftable / merged.c
blobf85a24c6786c6fb8e102ffc9ceb35f23706c8c9b
1 /*
2 Copyright 2020 Google LLC
4 Use of this source code is governed by a BSD-style
5 license that can be found in the LICENSE file or at
6 https://developers.google.com/open-source/licenses/bsd
7 */
9 #include "merged.h"
11 #include "constants.h"
12 #include "iter.h"
13 #include "pq.h"
14 #include "record.h"
15 #include "generic.h"
16 #include "reftable-merged.h"
17 #include "reftable-error.h"
18 #include "system.h"
20 struct merged_subiter {
21 struct reftable_iterator iter;
22 struct reftable_record rec;
25 struct merged_iter {
26 struct merged_subiter *subiters;
27 struct merged_iter_pqueue pq;
28 uint32_t hash_id;
29 size_t stack_len;
30 uint8_t typ;
31 int suppress_deletions;
32 ssize_t advance_index;
35 static int merged_iter_init(struct merged_iter *mi)
37 for (size_t i = 0; i < mi->stack_len; i++) {
38 struct pq_entry e = {
39 .index = i,
40 .rec = &mi->subiters[i].rec,
42 int err;
44 reftable_record_init(&mi->subiters[i].rec, mi->typ);
45 err = iterator_next(&mi->subiters[i].iter,
46 &mi->subiters[i].rec);
47 if (err < 0)
48 return err;
49 if (err > 0)
50 continue;
52 merged_iter_pqueue_add(&mi->pq, &e);
55 return 0;
58 static void merged_iter_close(void *p)
60 struct merged_iter *mi = p;
62 merged_iter_pqueue_release(&mi->pq);
63 for (size_t i = 0; i < mi->stack_len; i++) {
64 reftable_iterator_destroy(&mi->subiters[i].iter);
65 reftable_record_release(&mi->subiters[i].rec);
67 reftable_free(mi->subiters);
70 static int merged_iter_advance_subiter(struct merged_iter *mi, size_t idx)
72 struct pq_entry e = {
73 .index = idx,
74 .rec = &mi->subiters[idx].rec,
76 int err;
78 err = iterator_next(&mi->subiters[idx].iter, &mi->subiters[idx].rec);
79 if (err)
80 return err;
82 merged_iter_pqueue_add(&mi->pq, &e);
83 return 0;
86 static int merged_iter_next_entry(struct merged_iter *mi,
87 struct reftable_record *rec)
89 struct pq_entry entry = { 0 };
90 int err = 0, empty;
92 empty = merged_iter_pqueue_is_empty(mi->pq);
94 if (mi->advance_index >= 0) {
96 * When there are no pqueue entries then we only have a single
97 * subiter left. There is no need to use the pqueue in that
98 * case anymore as we know that the subiter will return entries
99 * in the correct order already.
101 * While this may sound like a very specific edge case, it may
102 * happen more frequently than you think. Most repositories
103 * will end up having a single large base table that contains
104 * most of the refs. It's thus likely that we exhaust all
105 * subiters but the one from that base ref.
107 if (empty)
108 return iterator_next(&mi->subiters[mi->advance_index].iter,
109 rec);
111 err = merged_iter_advance_subiter(mi, mi->advance_index);
112 if (err < 0)
113 return err;
114 if (!err)
115 empty = 0;
116 mi->advance_index = -1;
119 if (empty)
120 return 1;
122 entry = merged_iter_pqueue_remove(&mi->pq);
125 One can also use reftable as datacenter-local storage, where the ref
126 database is maintained in globally consistent database (eg.
127 CockroachDB or Spanner). In this scenario, replication delays together
128 with compaction may cause newer tables to contain older entries. In
129 such a deployment, the loop below must be changed to collect all
130 entries for the same key, and return new the newest one.
132 while (!merged_iter_pqueue_is_empty(mi->pq)) {
133 struct pq_entry top = merged_iter_pqueue_top(mi->pq);
134 int cmp;
136 cmp = reftable_record_cmp(top.rec, entry.rec);
137 if (cmp > 0)
138 break;
140 merged_iter_pqueue_remove(&mi->pq);
141 err = merged_iter_advance_subiter(mi, top.index);
142 if (err < 0)
143 return err;
146 mi->advance_index = entry.index;
147 SWAP(*rec, *entry.rec);
148 return 0;
151 static int merged_iter_next_void(void *p, struct reftable_record *rec)
153 struct merged_iter *mi = p;
154 while (1) {
155 int err = merged_iter_next_entry(mi, rec);
156 if (err)
157 return err;
158 if (mi->suppress_deletions && reftable_record_is_deletion(rec))
159 continue;
160 return 0;
164 static struct reftable_iterator_vtable merged_iter_vtable = {
165 .next = &merged_iter_next_void,
166 .close = &merged_iter_close,
169 static void iterator_from_merged_iter(struct reftable_iterator *it,
170 struct merged_iter *mi)
172 assert(!it->ops);
173 it->iter_arg = mi;
174 it->ops = &merged_iter_vtable;
177 int reftable_new_merged_table(struct reftable_merged_table **dest,
178 struct reftable_table *stack, size_t n,
179 uint32_t hash_id)
181 struct reftable_merged_table *m = NULL;
182 uint64_t last_max = 0;
183 uint64_t first_min = 0;
185 for (size_t i = 0; i < n; i++) {
186 uint64_t min = reftable_table_min_update_index(&stack[i]);
187 uint64_t max = reftable_table_max_update_index(&stack[i]);
189 if (reftable_table_hash_id(&stack[i]) != hash_id) {
190 return REFTABLE_FORMAT_ERROR;
192 if (i == 0 || min < first_min) {
193 first_min = min;
195 if (i == 0 || max > last_max) {
196 last_max = max;
200 REFTABLE_CALLOC_ARRAY(m, 1);
201 m->stack = stack;
202 m->stack_len = n;
203 m->min = first_min;
204 m->max = last_max;
205 m->hash_id = hash_id;
206 *dest = m;
207 return 0;
210 /* clears the list of subtable, without affecting the readers themselves. */
211 void merged_table_release(struct reftable_merged_table *mt)
213 FREE_AND_NULL(mt->stack);
214 mt->stack_len = 0;
/*
 * Release the subtable list of `mt` and free `mt` itself. Passing NULL is
 * allowed and does nothing.
 */
void reftable_merged_table_free(struct reftable_merged_table *mt)
{
	if (mt) {
		merged_table_release(mt);
		reftable_free(mt);
	}
}
226 uint64_t
227 reftable_merged_table_max_update_index(struct reftable_merged_table *mt)
229 return mt->max;
232 uint64_t
233 reftable_merged_table_min_update_index(struct reftable_merged_table *mt)
235 return mt->min;
238 static int reftable_table_seek_record(struct reftable_table *tab,
239 struct reftable_iterator *it,
240 struct reftable_record *rec)
242 return tab->ops->seek_record(tab->table_arg, it, rec);
245 static int merged_table_seek_record(struct reftable_merged_table *mt,
246 struct reftable_iterator *it,
247 struct reftable_record *rec)
249 struct merged_iter merged = {
250 .typ = reftable_record_type(rec),
251 .hash_id = mt->hash_id,
252 .suppress_deletions = mt->suppress_deletions,
253 .advance_index = -1,
255 struct merged_iter *p;
256 int err;
258 REFTABLE_CALLOC_ARRAY(merged.subiters, mt->stack_len);
259 for (size_t i = 0; i < mt->stack_len; i++) {
260 err = reftable_table_seek_record(&mt->stack[i],
261 &merged.subiters[merged.stack_len].iter, rec);
262 if (err < 0)
263 goto out;
264 if (!err)
265 merged.stack_len++;
268 err = merged_iter_init(&merged);
269 if (err < 0)
270 goto out;
272 p = reftable_malloc(sizeof(struct merged_iter));
273 *p = merged;
274 iterator_from_merged_iter(it, p);
276 out:
277 if (err < 0)
278 merged_iter_close(&merged);
279 return err;
282 int reftable_merged_table_seek_ref(struct reftable_merged_table *mt,
283 struct reftable_iterator *it,
284 const char *name)
286 struct reftable_record rec = {
287 .type = BLOCK_TYPE_REF,
288 .u.ref = {
289 .refname = (char *)name,
292 return merged_table_seek_record(mt, it, &rec);
295 int reftable_merged_table_seek_log_at(struct reftable_merged_table *mt,
296 struct reftable_iterator *it,
297 const char *name, uint64_t update_index)
299 struct reftable_record rec = { .type = BLOCK_TYPE_LOG,
300 .u.log = {
301 .refname = (char *)name,
302 .update_index = update_index,
303 } };
304 return merged_table_seek_record(mt, it, &rec);
/*
 * Position `it` at the log records for `name`, seeking with the largest
 * representable update index (all bits set).
 */
int reftable_merged_table_seek_log(struct reftable_merged_table *mt,
				   struct reftable_iterator *it,
				   const char *name)
{
	return reftable_merged_table_seek_log_at(mt, it, name,
						 ~(uint64_t)0);
}
315 uint32_t reftable_merged_table_hash_id(struct reftable_merged_table *mt)
317 return mt->hash_id;
/* vtable adapter: `tab` is a `struct reftable_merged_table *`. */
static int reftable_merged_table_seek_void(void *tab,
					   struct reftable_iterator *it,
					   struct reftable_record *rec)
{
	struct reftable_merged_table *mt = tab;

	return merged_table_seek_record(mt, it, rec);
}
/* vtable adapter: `tab` is a `struct reftable_merged_table *`. */
static uint32_t reftable_merged_table_hash_id_void(void *tab)
{
	struct reftable_merged_table *mt = tab;

	return reftable_merged_table_hash_id(mt);
}
/* vtable adapter: `tab` is a `struct reftable_merged_table *`. */
static uint64_t reftable_merged_table_min_update_index_void(void *tab)
{
	struct reftable_merged_table *mt = tab;

	return reftable_merged_table_min_update_index(mt);
}
/* vtable adapter: `tab` is a `struct reftable_merged_table *`. */
static uint64_t reftable_merged_table_max_update_index_void(void *tab)
{
	struct reftable_merged_table *mt = tab;

	return reftable_merged_table_max_update_index(mt);
}
342 static struct reftable_table_vtable merged_table_vtable = {
343 .seek_record = reftable_merged_table_seek_void,
344 .hash_id = reftable_merged_table_hash_id_void,
345 .min_update_index = reftable_merged_table_min_update_index_void,
346 .max_update_index = reftable_merged_table_max_update_index_void,
349 void reftable_table_from_merged_table(struct reftable_table *tab,
350 struct reftable_merged_table *merged)
352 assert(!tab->ops);
353 tab->ops = &merged_table_vtable;
354 tab->table_arg = merged;