2 Copyright 2020 Google LLC
4 Use of this source code is governed by a BSD-style
5 license that can be found in the LICENSE file or at
6 https://developers.google.com/open-source/licenses/bsd
11 #include "constants.h"
16 #include "reftable-merged.h"
17 #include "reftable-error.h"
20 struct merged_subiter
{
21 struct reftable_iterator iter
;
22 struct reftable_record rec
;
26 struct merged_subiter
*subiters
;
27 struct merged_iter_pqueue pq
;
31 int suppress_deletions
;
32 ssize_t advance_index
;
35 static int merged_iter_init(struct merged_iter
*mi
)
37 for (size_t i
= 0; i
< mi
->stack_len
; i
++) {
40 .rec
= &mi
->subiters
[i
].rec
,
44 reftable_record_init(&mi
->subiters
[i
].rec
, mi
->typ
);
45 err
= iterator_next(&mi
->subiters
[i
].iter
,
46 &mi
->subiters
[i
].rec
);
52 merged_iter_pqueue_add(&mi
->pq
, &e
);
58 static void merged_iter_close(void *p
)
60 struct merged_iter
*mi
= p
;
62 merged_iter_pqueue_release(&mi
->pq
);
63 for (size_t i
= 0; i
< mi
->stack_len
; i
++) {
64 reftable_iterator_destroy(&mi
->subiters
[i
].iter
);
65 reftable_record_release(&mi
->subiters
[i
].rec
);
67 reftable_free(mi
->subiters
);
70 static int merged_iter_advance_subiter(struct merged_iter
*mi
, size_t idx
)
74 .rec
= &mi
->subiters
[idx
].rec
,
78 err
= iterator_next(&mi
->subiters
[idx
].iter
, &mi
->subiters
[idx
].rec
);
82 merged_iter_pqueue_add(&mi
->pq
, &e
);
86 static int merged_iter_next_entry(struct merged_iter
*mi
,
87 struct reftable_record
*rec
)
89 struct pq_entry entry
= { 0 };
92 empty
= merged_iter_pqueue_is_empty(mi
->pq
);
94 if (mi
->advance_index
>= 0) {
96 * When there are no pqueue entries then we only have a single
97 * subiter left. There is no need to use the pqueue in that
98 * case anymore as we know that the subiter will return entries
99 * in the correct order already.
101 * While this may sound like a very specific edge case, it may
102 * happen more frequently than you think. Most repositories
103 * will end up having a single large base table that contains
104 * most of the refs. It's thus likely that we exhaust all
105 * subiters but the one from that base table.
108 return iterator_next(&mi
->subiters
[mi
->advance_index
].iter
,
111 err
= merged_iter_advance_subiter(mi
, mi
->advance_index
);
116 mi
->advance_index
= -1;
122 entry
= merged_iter_pqueue_remove(&mi
->pq
);
125 One can also use reftable as datacenter-local storage, where the ref
126 database is maintained in a globally consistent database (e.g.
127 CockroachDB or Spanner). In this scenario, replication delays together
128 with compaction may cause newer tables to contain older entries. In
129 such a deployment, the loop below must be changed to collect all
130 entries for the same key, and return the newest one.
132 while (!merged_iter_pqueue_is_empty(mi
->pq
)) {
133 struct pq_entry top
= merged_iter_pqueue_top(mi
->pq
);
136 cmp
= reftable_record_cmp(top
.rec
, entry
.rec
);
140 merged_iter_pqueue_remove(&mi
->pq
);
141 err
= merged_iter_advance_subiter(mi
, top
.index
);
146 mi
->advance_index
= entry
.index
;
147 SWAP(*rec
, *entry
.rec
);
151 static int merged_iter_next_void(void *p
, struct reftable_record
*rec
)
153 struct merged_iter
*mi
= p
;
155 int err
= merged_iter_next_entry(mi
, rec
);
158 if (mi
->suppress_deletions
&& reftable_record_is_deletion(rec
))
164 static struct reftable_iterator_vtable merged_iter_vtable
= {
165 .next
= &merged_iter_next_void
,
166 .close
= &merged_iter_close
,
169 static void iterator_from_merged_iter(struct reftable_iterator
*it
,
170 struct merged_iter
*mi
)
174 it
->ops
= &merged_iter_vtable
;
177 int reftable_new_merged_table(struct reftable_merged_table
**dest
,
178 struct reftable_table
*stack
, size_t n
,
181 struct reftable_merged_table
*m
= NULL
;
182 uint64_t last_max
= 0;
183 uint64_t first_min
= 0;
185 for (size_t i
= 0; i
< n
; i
++) {
186 uint64_t min
= reftable_table_min_update_index(&stack
[i
]);
187 uint64_t max
= reftable_table_max_update_index(&stack
[i
]);
189 if (reftable_table_hash_id(&stack
[i
]) != hash_id
) {
190 return REFTABLE_FORMAT_ERROR
;
192 if (i
== 0 || min
< first_min
) {
195 if (i
== 0 || max
> last_max
) {
200 REFTABLE_CALLOC_ARRAY(m
, 1);
205 m
->hash_id
= hash_id
;
210 /* clears the list of subtables, without affecting the readers themselves. */
211 void merged_table_release(struct reftable_merged_table
*mt
)
213 FREE_AND_NULL(mt
->stack
);
217 void reftable_merged_table_free(struct reftable_merged_table
*mt
)
222 merged_table_release(mt
);
227 reftable_merged_table_max_update_index(struct reftable_merged_table
*mt
)
233 reftable_merged_table_min_update_index(struct reftable_merged_table
*mt
)
238 static int reftable_table_seek_record(struct reftable_table
*tab
,
239 struct reftable_iterator
*it
,
240 struct reftable_record
*rec
)
242 return tab
->ops
->seek_record(tab
->table_arg
, it
, rec
);
245 static int merged_table_seek_record(struct reftable_merged_table
*mt
,
246 struct reftable_iterator
*it
,
247 struct reftable_record
*rec
)
249 struct merged_iter merged
= {
250 .typ
= reftable_record_type(rec
),
251 .hash_id
= mt
->hash_id
,
252 .suppress_deletions
= mt
->suppress_deletions
,
255 struct merged_iter
*p
;
258 REFTABLE_CALLOC_ARRAY(merged
.subiters
, mt
->stack_len
);
259 for (size_t i
= 0; i
< mt
->stack_len
; i
++) {
260 err
= reftable_table_seek_record(&mt
->stack
[i
],
261 &merged
.subiters
[merged
.stack_len
].iter
, rec
);
268 err
= merged_iter_init(&merged
);
272 p
= reftable_malloc(sizeof(struct merged_iter
));
274 iterator_from_merged_iter(it
, p
);
278 merged_iter_close(&merged
);
282 int reftable_merged_table_seek_ref(struct reftable_merged_table
*mt
,
283 struct reftable_iterator
*it
,
286 struct reftable_record rec
= {
287 .type
= BLOCK_TYPE_REF
,
289 .refname
= (char *)name
,
292 return merged_table_seek_record(mt
, it
, &rec
);
295 int reftable_merged_table_seek_log_at(struct reftable_merged_table
*mt
,
296 struct reftable_iterator
*it
,
297 const char *name
, uint64_t update_index
)
299 struct reftable_record rec
= { .type
= BLOCK_TYPE_LOG
,
301 .refname
= (char *)name
,
302 .update_index
= update_index
,
304 return merged_table_seek_record(mt
, it
, &rec
);
307 int reftable_merged_table_seek_log(struct reftable_merged_table
*mt
,
308 struct reftable_iterator
*it
,
311 uint64_t max
= ~((uint64_t)0);
312 return reftable_merged_table_seek_log_at(mt
, it
, name
, max
);
315 uint32_t reftable_merged_table_hash_id(struct reftable_merged_table
*mt
)
320 static int reftable_merged_table_seek_void(void *tab
,
321 struct reftable_iterator
*it
,
322 struct reftable_record
*rec
)
324 return merged_table_seek_record(tab
, it
, rec
);
327 static uint32_t reftable_merged_table_hash_id_void(void *tab
)
329 return reftable_merged_table_hash_id(tab
);
332 static uint64_t reftable_merged_table_min_update_index_void(void *tab
)
334 return reftable_merged_table_min_update_index(tab
);
337 static uint64_t reftable_merged_table_max_update_index_void(void *tab
)
339 return reftable_merged_table_max_update_index(tab
);
342 static struct reftable_table_vtable merged_table_vtable
= {
343 .seek_record
= reftable_merged_table_seek_void
,
344 .hash_id
= reftable_merged_table_hash_id_void
,
345 .min_update_index
= reftable_merged_table_min_update_index_void
,
346 .max_update_index
= reftable_merged_table_max_update_index_void
,
349 void reftable_table_from_merged_table(struct reftable_table
*tab
,
350 struct reftable_merged_table
*merged
)
353 tab
->ops
= &merged_table_vtable
;
354 tab
->table_arg
= merged
;