/*
Copyright 2020 Google LLC

Use of this source code is governed by a BSD-style
license that can be found in the LICENSE file or at
https://developers.google.com/open-source/licenses/bsd
*/
#include "constants.h"
#include "reftable-merged.h"
#include "reftable-error.h"
20 struct merged_subiter
{
21 struct reftable_iterator iter
;
22 struct reftable_record rec
;
26 struct merged_subiter
*subiters
;
27 struct merged_iter_pqueue pq
;
29 int suppress_deletions
;
30 ssize_t advance_index
;
33 static void merged_iter_close(void *p
)
35 struct merged_iter
*mi
= p
;
37 merged_iter_pqueue_release(&mi
->pq
);
38 for (size_t i
= 0; i
< mi
->subiters_len
; i
++) {
39 reftable_iterator_destroy(&mi
->subiters
[i
].iter
);
40 reftable_record_release(&mi
->subiters
[i
].rec
);
42 reftable_free(mi
->subiters
);
45 static int merged_iter_advance_subiter(struct merged_iter
*mi
, size_t idx
)
49 .rec
= &mi
->subiters
[idx
].rec
,
53 err
= iterator_next(&mi
->subiters
[idx
].iter
, &mi
->subiters
[idx
].rec
);
57 err
= merged_iter_pqueue_add(&mi
->pq
, &e
);
64 static int merged_iter_seek(struct merged_iter
*mi
, struct reftable_record
*want
)
68 mi
->advance_index
= -1;
70 for (size_t i
= 0; i
< mi
->subiters_len
; i
++) {
71 err
= iterator_seek(&mi
->subiters
[i
].iter
, want
);
77 err
= merged_iter_advance_subiter(mi
, i
);
85 static int merged_iter_next_entry(struct merged_iter
*mi
,
86 struct reftable_record
*rec
)
88 struct pq_entry entry
= { 0 };
91 empty
= merged_iter_pqueue_is_empty(mi
->pq
);
93 if (mi
->advance_index
>= 0) {
95 * When there are no pqueue entries then we only have a single
96 * subiter left. There is no need to use the pqueue in that
97 * case anymore as we know that the subiter will return entries
98 * in the correct order already.
100 * While this may sound like a very specific edge case, it may
101 * happen more frequently than you think. Most repositories
102 * will end up having a single large base table that contains
103 * most of the refs. It's thus likely that we exhaust all
104 * subiters but the one from that base ref.
107 return iterator_next(&mi
->subiters
[mi
->advance_index
].iter
,
110 err
= merged_iter_advance_subiter(mi
, mi
->advance_index
);
115 mi
->advance_index
= -1;
121 entry
= merged_iter_pqueue_remove(&mi
->pq
);
124 One can also use reftable as datacenter-local storage, where the ref
125 database is maintained in globally consistent database (eg.
126 CockroachDB or Spanner). In this scenario, replication delays together
127 with compaction may cause newer tables to contain older entries. In
128 such a deployment, the loop below must be changed to collect all
129 entries for the same key, and return new the newest one.
131 while (!merged_iter_pqueue_is_empty(mi
->pq
)) {
132 struct pq_entry top
= merged_iter_pqueue_top(mi
->pq
);
135 cmp
= reftable_record_cmp(top
.rec
, entry
.rec
);
139 merged_iter_pqueue_remove(&mi
->pq
);
140 err
= merged_iter_advance_subiter(mi
, top
.index
);
145 mi
->advance_index
= entry
.index
;
146 SWAP(*rec
, *entry
.rec
);
/* Iterator vtable adapter: untyped `seek` entry point. */
static int merged_iter_seek_void(void *it, struct reftable_record *want)
{
	return merged_iter_seek(it, want);
}
155 static int merged_iter_next_void(void *p
, struct reftable_record
*rec
)
157 struct merged_iter
*mi
= p
;
159 int err
= merged_iter_next_entry(mi
, rec
);
162 if (mi
->suppress_deletions
&& reftable_record_is_deletion(rec
))
168 static struct reftable_iterator_vtable merged_iter_vtable
= {
169 .seek
= merged_iter_seek_void
,
170 .next
= &merged_iter_next_void
,
171 .close
= &merged_iter_close
,
174 static void iterator_from_merged_iter(struct reftable_iterator
*it
,
175 struct merged_iter
*mi
)
179 it
->ops
= &merged_iter_vtable
;
182 int reftable_merged_table_new(struct reftable_merged_table
**dest
,
183 struct reftable_reader
**readers
, size_t n
,
186 struct reftable_merged_table
*m
= NULL
;
187 uint64_t last_max
= 0;
188 uint64_t first_min
= 0;
190 for (size_t i
= 0; i
< n
; i
++) {
191 uint64_t min
= reftable_reader_min_update_index(readers
[i
]);
192 uint64_t max
= reftable_reader_max_update_index(readers
[i
]);
194 if (reftable_reader_hash_id(readers
[i
]) != hash_id
) {
195 return REFTABLE_FORMAT_ERROR
;
197 if (i
== 0 || min
< first_min
) {
200 if (i
== 0 || max
> last_max
) {
205 REFTABLE_CALLOC_ARRAY(m
, 1);
207 return REFTABLE_OUT_OF_MEMORY_ERROR
;
209 m
->readers
= readers
;
213 m
->hash_id
= hash_id
;
218 void reftable_merged_table_free(struct reftable_merged_table
*mt
)
226 reftable_merged_table_max_update_index(struct reftable_merged_table
*mt
)
232 reftable_merged_table_min_update_index(struct reftable_merged_table
*mt
)
237 int merged_table_init_iter(struct reftable_merged_table
*mt
,
238 struct reftable_iterator
*it
,
241 struct merged_subiter
*subiters
;
242 struct merged_iter
*mi
= NULL
;
245 REFTABLE_CALLOC_ARRAY(subiters
, mt
->readers_len
);
247 ret
= REFTABLE_OUT_OF_MEMORY_ERROR
;
251 for (size_t i
= 0; i
< mt
->readers_len
; i
++) {
252 reftable_record_init(&subiters
[i
].rec
, typ
);
253 ret
= reader_init_iter(mt
->readers
[i
], &subiters
[i
].iter
, typ
);
258 REFTABLE_CALLOC_ARRAY(mi
, 1);
260 ret
= REFTABLE_OUT_OF_MEMORY_ERROR
;
263 mi
->advance_index
= -1;
264 mi
->suppress_deletions
= mt
->suppress_deletions
;
265 mi
->subiters
= subiters
;
266 mi
->subiters_len
= mt
->readers_len
;
268 iterator_from_merged_iter(it
, mi
);
273 for (size_t i
= 0; subiters
&& i
< mt
->readers_len
; i
++) {
274 reftable_iterator_destroy(&subiters
[i
].iter
);
275 reftable_record_release(&subiters
[i
].rec
);
277 reftable_free(subiters
);
284 int reftable_merged_table_init_ref_iterator(struct reftable_merged_table
*mt
,
285 struct reftable_iterator
*it
)
287 return merged_table_init_iter(mt
, it
, BLOCK_TYPE_REF
);
290 int reftable_merged_table_init_log_iterator(struct reftable_merged_table
*mt
,
291 struct reftable_iterator
*it
)
293 return merged_table_init_iter(mt
, it
, BLOCK_TYPE_LOG
);
296 uint32_t reftable_merged_table_hash_id(struct reftable_merged_table
*mt
)