1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
5 #include "db/version_set.h"
9 #include "db/filename.h"
10 #include "db/log_reader.h"
11 #include "db/log_writer.h"
12 #include "db/memtable.h"
13 #include "db/table_cache.h"
14 #include "leveldb/env.h"
15 #include "leveldb/table_builder.h"
16 #include "table/merger.h"
17 #include "table/two_level_iterator.h"
18 #include "util/coding.h"
19 #include "util/logging.h"
23 static int TargetFileSize(const Options
* options
) {
24 return options
->max_file_size
;
27 // Maximum bytes of overlaps in grandparent (i.e., level+2) before we
28 // stop building a single file in a level->level+1 compaction.
29 static int64_t MaxGrandParentOverlapBytes(const Options
* options
) {
30 return 10 * TargetFileSize(options
);
33 // Maximum number of bytes in all compacted files. We avoid expanding
34 // the lower level file set of a compaction if it would make the
35 // total compaction cover more than this many bytes.
36 static int64_t ExpandedCompactionByteSizeLimit(const Options
* options
) {
37 return 25 * TargetFileSize(options
);
40 static double MaxBytesForLevel(const Options
* options
, int level
) {
41 // Note: the result for level zero is not really used since we set
42 // the level-0 compaction threshold based on number of files.
44 // Result for both level-0 and level-1
45 double result
= 10. * 1048576.0;
53 static uint64_t MaxFileSizeForLevel(const Options
* options
, int level
) {
54 // We could vary per level to reduce number of files?
55 return TargetFileSize(options
);
58 static int64_t TotalFileSize(const std::vector
<FileMetaData
*>& files
) {
60 for (size_t i
= 0; i
< files
.size(); i
++) {
61 sum
+= files
[i
]->file_size
;
69 // Remove from linked list
73 // Drop references to files
74 for (int level
= 0; level
< config::kNumLevels
; level
++) {
75 for (size_t i
= 0; i
< files_
[level
].size(); i
++) {
76 FileMetaData
* f
= files_
[level
][i
];
86 int FindFile(const InternalKeyComparator
& icmp
,
87 const std::vector
<FileMetaData
*>& files
,
90 uint32_t right
= files
.size();
91 while (left
< right
) {
92 uint32_t mid
= (left
+ right
) / 2;
93 const FileMetaData
* f
= files
[mid
];
94 if (icmp
.InternalKeyComparator::Compare(f
->largest
.Encode(), key
) < 0) {
95 // Key at "mid.largest" is < "target". Therefore all
96 // files at or before "mid" are uninteresting.
99 // Key at "mid.largest" is >= "target". Therefore all files
100 // after "mid" are uninteresting.
107 static bool AfterFile(const Comparator
* ucmp
,
108 const Slice
* user_key
, const FileMetaData
* f
) {
109 // NULL user_key occurs before all keys and is therefore never after *f
110 return (user_key
!= NULL
&&
111 ucmp
->Compare(*user_key
, f
->largest
.user_key()) > 0);
114 static bool BeforeFile(const Comparator
* ucmp
,
115 const Slice
* user_key
, const FileMetaData
* f
) {
116 // NULL user_key occurs after all keys and is therefore never before *f
117 return (user_key
!= NULL
&&
118 ucmp
->Compare(*user_key
, f
->smallest
.user_key()) < 0);
121 bool SomeFileOverlapsRange(
122 const InternalKeyComparator
& icmp
,
123 bool disjoint_sorted_files
,
124 const std::vector
<FileMetaData
*>& files
,
125 const Slice
* smallest_user_key
,
126 const Slice
* largest_user_key
) {
127 const Comparator
* ucmp
= icmp
.user_comparator();
128 if (!disjoint_sorted_files
) {
129 // Need to check against all files
130 for (size_t i
= 0; i
< files
.size(); i
++) {
131 const FileMetaData
* f
= files
[i
];
132 if (AfterFile(ucmp
, smallest_user_key
, f
) ||
133 BeforeFile(ucmp
, largest_user_key
, f
)) {
136 return true; // Overlap
142 // Binary search over file list
144 if (smallest_user_key
!= NULL
) {
145 // Find the earliest possible internal key for smallest_user_key
146 InternalKey
small(*smallest_user_key
, kMaxSequenceNumber
,kValueTypeForSeek
);
147 index
= FindFile(icmp
, files
, small
.Encode());
150 if (index
>= files
.size()) {
151 // beginning of range is after all files, so no overlap.
155 return !BeforeFile(ucmp
, largest_user_key
, files
[index
]);
158 // An internal iterator. For a given version/level pair, yields
159 // information about the files in the level. For a given entry, key()
160 // is the largest key that occurs in the file, and value() is an
161 // 16-byte value containing the file number and file size, both
162 // encoded using EncodeFixed64.
163 class Version::LevelFileNumIterator
: public Iterator
{
165 LevelFileNumIterator(const InternalKeyComparator
& icmp
,
166 const std::vector
<FileMetaData
*>* flist
)
169 index_(flist
->size()) { // Marks as invalid
171 virtual bool Valid() const {
172 return index_
< flist_
->size();
174 virtual void Seek(const Slice
& target
) {
175 index_
= FindFile(icmp_
, *flist_
, target
);
177 virtual void SeekToFirst() { index_
= 0; }
178 virtual void SeekToLast() {
179 index_
= flist_
->empty() ? 0 : flist_
->size() - 1;
181 virtual void Next() {
185 virtual void Prev() {
188 index_
= flist_
->size(); // Marks as invalid
195 return (*flist_
)[index_
]->largest
.Encode();
197 Slice
value() const {
199 EncodeFixed64(value_buf_
, (*flist_
)[index_
]->number
);
200 EncodeFixed64(value_buf_
+8, (*flist_
)[index_
]->file_size
);
201 return Slice(value_buf_
, sizeof(value_buf_
));
203 virtual Status
status() const { return Status::OK(); }
205 const InternalKeyComparator icmp_
;
206 const std::vector
<FileMetaData
*>* const flist_
;
209 // Backing store for value(). Holds the file number and size.
210 mutable char value_buf_
[16];
213 static Iterator
* GetFileIterator(void* arg
,
214 const ReadOptions
& options
,
215 const Slice
& file_value
) {
216 TableCache
* cache
= reinterpret_cast<TableCache
*>(arg
);
217 if (file_value
.size() != 16) {
218 return NewErrorIterator(
219 Status::Corruption("FileReader invoked with unexpected value"));
221 return cache
->NewIterator(options
,
222 DecodeFixed64(file_value
.data()),
223 DecodeFixed64(file_value
.data() + 8));
227 Iterator
* Version::NewConcatenatingIterator(const ReadOptions
& options
,
229 return NewTwoLevelIterator(
230 new LevelFileNumIterator(vset_
->icmp_
, &files_
[level
]),
231 &GetFileIterator
, vset_
->table_cache_
, options
);
234 void Version::AddIterators(const ReadOptions
& options
,
235 std::vector
<Iterator
*>* iters
) {
236 // Merge all level zero files together since they may overlap
237 for (size_t i
= 0; i
< files_
[0].size(); i
++) {
239 vset_
->table_cache_
->NewIterator(
240 options
, files_
[0][i
]->number
, files_
[0][i
]->file_size
));
243 // For levels > 0, we can use a concatenating iterator that sequentially
244 // walks through the non-overlapping files in the level, opening them
246 for (int level
= 1; level
< config::kNumLevels
; level
++) {
247 if (!files_
[level
].empty()) {
248 iters
->push_back(NewConcatenatingIterator(options
, level
));
253 // Callback from TableCache::Get()
263 const Comparator
* ucmp
;
268 static void SaveValue(void* arg
, const Slice
& ikey
, const Slice
& v
) {
269 Saver
* s
= reinterpret_cast<Saver
*>(arg
);
270 ParsedInternalKey parsed_key
;
271 if (!ParseInternalKey(ikey
, &parsed_key
)) {
274 if (s
->ucmp
->Compare(parsed_key
.user_key
, s
->user_key
) == 0) {
275 s
->state
= (parsed_key
.type
== kTypeValue
) ? kFound
: kDeleted
;
276 if (s
->state
== kFound
) {
277 s
->value
->assign(v
.data(), v
.size());
283 static bool NewestFirst(FileMetaData
* a
, FileMetaData
* b
) {
284 return a
->number
> b
->number
;
287 void Version::ForEachOverlapping(Slice user_key
, Slice internal_key
,
289 bool (*func
)(void*, int, FileMetaData
*)) {
290 // TODO(sanjay): Change Version::Get() to use this function.
291 const Comparator
* ucmp
= vset_
->icmp_
.user_comparator();
293 // Search level-0 in order from newest to oldest.
294 std::vector
<FileMetaData
*> tmp
;
295 tmp
.reserve(files_
[0].size());
296 for (uint32_t i
= 0; i
< files_
[0].size(); i
++) {
297 FileMetaData
* f
= files_
[0][i
];
298 if (ucmp
->Compare(user_key
, f
->smallest
.user_key()) >= 0 &&
299 ucmp
->Compare(user_key
, f
->largest
.user_key()) <= 0) {
304 std::sort(tmp
.begin(), tmp
.end(), NewestFirst
);
305 for (uint32_t i
= 0; i
< tmp
.size(); i
++) {
306 if (!(*func
)(arg
, 0, tmp
[i
])) {
312 // Search other levels.
313 for (int level
= 1; level
< config::kNumLevels
; level
++) {
314 size_t num_files
= files_
[level
].size();
315 if (num_files
== 0) continue;
317 // Binary search to find earliest index whose largest key >= internal_key.
318 uint32_t index
= FindFile(vset_
->icmp_
, files_
[level
], internal_key
);
319 if (index
< num_files
) {
320 FileMetaData
* f
= files_
[level
][index
];
321 if (ucmp
->Compare(user_key
, f
->smallest
.user_key()) < 0) {
322 // All of "f" is past any data for user_key
324 if (!(*func
)(arg
, level
, f
)) {
332 Status
Version::Get(const ReadOptions
& options
,
336 Slice ikey
= k
.internal_key();
337 Slice user_key
= k
.user_key();
338 const Comparator
* ucmp
= vset_
->icmp_
.user_comparator();
341 stats
->seek_file
= NULL
;
342 stats
->seek_file_level
= -1;
343 FileMetaData
* last_file_read
= NULL
;
344 int last_file_read_level
= -1;
346 // We can search level-by-level since entries never hop across
347 // levels. Therefore we are guaranteed that if we find data
348 // in an smaller level, later levels are irrelevant.
349 std::vector
<FileMetaData
*> tmp
;
351 for (int level
= 0; level
< config::kNumLevels
; level
++) {
352 size_t num_files
= files_
[level
].size();
353 if (num_files
== 0) continue;
355 // Get the list of files to search in this level
356 FileMetaData
* const* files
= &files_
[level
][0];
358 // Level-0 files may overlap each other. Find all files that
359 // overlap user_key and process them in order from newest to oldest.
360 tmp
.reserve(num_files
);
361 for (uint32_t i
= 0; i
< num_files
; i
++) {
362 FileMetaData
* f
= files
[i
];
363 if (ucmp
->Compare(user_key
, f
->smallest
.user_key()) >= 0 &&
364 ucmp
->Compare(user_key
, f
->largest
.user_key()) <= 0) {
368 if (tmp
.empty()) continue;
370 std::sort(tmp
.begin(), tmp
.end(), NewestFirst
);
372 num_files
= tmp
.size();
374 // Binary search to find earliest index whose largest key >= ikey.
375 uint32_t index
= FindFile(vset_
->icmp_
, files_
[level
], ikey
);
376 if (index
>= num_files
) {
381 if (ucmp
->Compare(user_key
, tmp2
->smallest
.user_key()) < 0) {
382 // All of "tmp2" is past any data for user_key
392 for (uint32_t i
= 0; i
< num_files
; ++i
) {
393 if (last_file_read
!= NULL
&& stats
->seek_file
== NULL
) {
394 // We have had more than one seek for this read. Charge the 1st file.
395 stats
->seek_file
= last_file_read
;
396 stats
->seek_file_level
= last_file_read_level
;
399 FileMetaData
* f
= files
[i
];
401 last_file_read_level
= level
;
404 saver
.state
= kNotFound
;
406 saver
.user_key
= user_key
;
408 s
= vset_
->table_cache_
->Get(options
, f
->number
, f
->file_size
,
409 ikey
, &saver
, SaveValue
);
413 switch (saver
.state
) {
415 break; // Keep searching in other files
419 s
= Status::NotFound(Slice()); // Use empty error message for speed
422 s
= Status::Corruption("corrupted key for ", user_key
);
428 return Status::NotFound(Slice()); // Use an empty error message for speed
431 bool Version::UpdateStats(const GetStats
& stats
) {
432 FileMetaData
* f
= stats
.seek_file
;
435 if (f
->allowed_seeks
<= 0 && file_to_compact_
== NULL
) {
436 file_to_compact_
= f
;
437 file_to_compact_level_
= stats
.seek_file_level
;
444 bool Version::RecordReadSample(Slice internal_key
) {
445 ParsedInternalKey ikey
;
446 if (!ParseInternalKey(internal_key
, &ikey
)) {
451 GetStats stats
; // Holds first matching file
454 static bool Match(void* arg
, int level
, FileMetaData
* f
) {
455 State
* state
= reinterpret_cast<State
*>(arg
);
457 if (state
->matches
== 1) {
458 // Remember first match.
459 state
->stats
.seek_file
= f
;
460 state
->stats
.seek_file_level
= level
;
462 // We can stop iterating once we have a second match.
463 return state
->matches
< 2;
469 ForEachOverlapping(ikey
.user_key
, internal_key
, &state
, &State::Match
);
471 // Must have at least two matches since we want to merge across
472 // files. But what if we have a single file that contains many
473 // overwrites and deletions? Should we have another mechanism for
474 // finding such files?
475 if (state
.matches
>= 2) {
476 // 1MB cost is about 1 seek (see comment in Builder::Apply).
477 return UpdateStats(state
.stats
);
482 void Version::Ref() {
486 void Version::Unref() {
487 assert(this != &vset_
->dummy_versions_
);
495 bool Version::OverlapInLevel(int level
,
496 const Slice
* smallest_user_key
,
497 const Slice
* largest_user_key
) {
498 return SomeFileOverlapsRange(vset_
->icmp_
, (level
> 0), files_
[level
],
499 smallest_user_key
, largest_user_key
);
502 int Version::PickLevelForMemTableOutput(
503 const Slice
& smallest_user_key
,
504 const Slice
& largest_user_key
) {
506 if (!OverlapInLevel(0, &smallest_user_key
, &largest_user_key
)) {
507 // Push to next level if there is no overlap in next level,
508 // and the #bytes overlapping in the level after that are limited.
509 InternalKey
start(smallest_user_key
, kMaxSequenceNumber
, kValueTypeForSeek
);
510 InternalKey
limit(largest_user_key
, 0, static_cast<ValueType
>(0));
511 std::vector
<FileMetaData
*> overlaps
;
512 while (level
< config::kMaxMemCompactLevel
) {
513 if (OverlapInLevel(level
+ 1, &smallest_user_key
, &largest_user_key
)) {
516 if (level
+ 2 < config::kNumLevels
) {
517 // Check that file does not overlap too many grandparent bytes.
518 GetOverlappingInputs(level
+ 2, &start
, &limit
, &overlaps
);
519 const int64_t sum
= TotalFileSize(overlaps
);
520 if (sum
> MaxGrandParentOverlapBytes(vset_
->options_
)) {
530 // Store in "*inputs" all files in "level" that overlap [begin,end]
531 void Version::GetOverlappingInputs(
533 const InternalKey
* begin
,
534 const InternalKey
* end
,
535 std::vector
<FileMetaData
*>* inputs
) {
537 assert(level
< config::kNumLevels
);
539 Slice user_begin
, user_end
;
541 user_begin
= begin
->user_key();
544 user_end
= end
->user_key();
546 const Comparator
* user_cmp
= vset_
->icmp_
.user_comparator();
547 for (size_t i
= 0; i
< files_
[level
].size(); ) {
548 FileMetaData
* f
= files_
[level
][i
++];
549 const Slice file_start
= f
->smallest
.user_key();
550 const Slice file_limit
= f
->largest
.user_key();
551 if (begin
!= NULL
&& user_cmp
->Compare(file_limit
, user_begin
) < 0) {
552 // "f" is completely before specified range; skip it
553 } else if (end
!= NULL
&& user_cmp
->Compare(file_start
, user_end
) > 0) {
554 // "f" is completely after specified range; skip it
556 inputs
->push_back(f
);
558 // Level-0 files may overlap each other. So check if the newly
559 // added file has expanded the range. If so, restart search.
560 if (begin
!= NULL
&& user_cmp
->Compare(file_start
, user_begin
) < 0) {
561 user_begin
= file_start
;
564 } else if (end
!= NULL
&& user_cmp
->Compare(file_limit
, user_end
) > 0) {
565 user_end
= file_limit
;
574 std::string
Version::DebugString() const {
576 for (int level
= 0; level
< config::kNumLevels
; level
++) {
579 // 17:123['a' .. 'd']
581 r
.append("--- level ");
582 AppendNumberTo(&r
, level
);
584 const std::vector
<FileMetaData
*>& files
= files_
[level
];
585 for (size_t i
= 0; i
< files
.size(); i
++) {
587 AppendNumberTo(&r
, files
[i
]->number
);
589 AppendNumberTo(&r
, files
[i
]->file_size
);
591 r
.append(files
[i
]->smallest
.DebugString());
593 r
.append(files
[i
]->largest
.DebugString());
600 // A helper class so we can efficiently apply a whole sequence
601 // of edits to a particular state without creating intermediate
602 // Versions that contain full copies of the intermediate state.
603 class VersionSet::Builder
{
605 // Helper to sort by v->files_[file_number].smallest
606 struct BySmallestKey
{
607 const InternalKeyComparator
* internal_comparator
;
609 bool operator()(FileMetaData
* f1
, FileMetaData
* f2
) const {
610 int r
= internal_comparator
->Compare(f1
->smallest
, f2
->smallest
);
614 // Break ties by file number
615 return (f1
->number
< f2
->number
);
620 typedef std::set
<FileMetaData
*, BySmallestKey
> FileSet
;
622 std::set
<uint64_t> deleted_files
;
623 FileSet
* added_files
;
628 LevelState levels_
[config::kNumLevels
];
631 // Initialize a builder with the files from *base and other info from *vset
632 Builder(VersionSet
* vset
, Version
* base
)
637 cmp
.internal_comparator
= &vset_
->icmp_
;
638 for (int level
= 0; level
< config::kNumLevels
; level
++) {
639 levels_
[level
].added_files
= new FileSet(cmp
);
644 for (int level
= 0; level
< config::kNumLevels
; level
++) {
645 const FileSet
* added
= levels_
[level
].added_files
;
646 std::vector
<FileMetaData
*> to_unref
;
647 to_unref
.reserve(added
->size());
648 for (FileSet::const_iterator it
= added
->begin();
649 it
!= added
->end(); ++it
) {
650 to_unref
.push_back(*it
);
653 for (uint32_t i
= 0; i
< to_unref
.size(); i
++) {
654 FileMetaData
* f
= to_unref
[i
];
664 // Apply all of the edits in *edit to the current state.
665 void Apply(VersionEdit
* edit
) {
666 // Update compaction pointers
667 for (size_t i
= 0; i
< edit
->compact_pointers_
.size(); i
++) {
668 const int level
= edit
->compact_pointers_
[i
].first
;
669 vset_
->compact_pointer_
[level
] =
670 edit
->compact_pointers_
[i
].second
.Encode().ToString();
674 const VersionEdit::DeletedFileSet
& del
= edit
->deleted_files_
;
675 for (VersionEdit::DeletedFileSet::const_iterator iter
= del
.begin();
678 const int level
= iter
->first
;
679 const uint64_t number
= iter
->second
;
680 levels_
[level
].deleted_files
.insert(number
);
684 for (size_t i
= 0; i
< edit
->new_files_
.size(); i
++) {
685 const int level
= edit
->new_files_
[i
].first
;
686 FileMetaData
* f
= new FileMetaData(edit
->new_files_
[i
].second
);
689 // We arrange to automatically compact this file after
690 // a certain number of seeks. Let's assume:
691 // (1) One seek costs 10ms
692 // (2) Writing or reading 1MB costs 10ms (100MB/s)
693 // (3) A compaction of 1MB does 25MB of IO:
694 // 1MB read from this level
695 // 10-12MB read from next level (boundaries may be misaligned)
696 // 10-12MB written to next level
697 // This implies that 25 seeks cost the same as the compaction
698 // of 1MB of data. I.e., one seek costs approximately the
699 // same as the compaction of 40KB of data. We are a little
700 // conservative and allow approximately one seek for every 16KB
701 // of data before triggering a compaction.
702 f
->allowed_seeks
= (f
->file_size
/ 16384);
703 if (f
->allowed_seeks
< 100) f
->allowed_seeks
= 100;
705 levels_
[level
].deleted_files
.erase(f
->number
);
706 levels_
[level
].added_files
->insert(f
);
710 // Save the current state in *v.
711 void SaveTo(Version
* v
) {
713 cmp
.internal_comparator
= &vset_
->icmp_
;
714 for (int level
= 0; level
< config::kNumLevels
; level
++) {
715 // Merge the set of added files with the set of pre-existing files.
716 // Drop any deleted files. Store the result in *v.
717 const std::vector
<FileMetaData
*>& base_files
= base_
->files_
[level
];
718 std::vector
<FileMetaData
*>::const_iterator base_iter
= base_files
.begin();
719 std::vector
<FileMetaData
*>::const_iterator base_end
= base_files
.end();
720 const FileSet
* added
= levels_
[level
].added_files
;
721 v
->files_
[level
].reserve(base_files
.size() + added
->size());
722 for (FileSet::const_iterator added_iter
= added
->begin();
723 added_iter
!= added
->end();
725 // Add all smaller files listed in base_
726 for (std::vector
<FileMetaData
*>::const_iterator bpos
727 = std::upper_bound(base_iter
, base_end
, *added_iter
, cmp
);
730 MaybeAddFile(v
, level
, *base_iter
);
733 MaybeAddFile(v
, level
, *added_iter
);
736 // Add remaining base files
737 for (; base_iter
!= base_end
; ++base_iter
) {
738 MaybeAddFile(v
, level
, *base_iter
);
742 // Make sure there is no overlap in levels > 0
744 for (uint32_t i
= 1; i
< v
->files_
[level
].size(); i
++) {
745 const InternalKey
& prev_end
= v
->files_
[level
][i
-1]->largest
;
746 const InternalKey
& this_begin
= v
->files_
[level
][i
]->smallest
;
747 if (vset_
->icmp_
.Compare(prev_end
, this_begin
) >= 0) {
748 fprintf(stderr
, "overlapping ranges in same level %s vs. %s\n",
749 prev_end
.DebugString().c_str(),
750 this_begin
.DebugString().c_str());
759 void MaybeAddFile(Version
* v
, int level
, FileMetaData
* f
) {
760 if (levels_
[level
].deleted_files
.count(f
->number
) > 0) {
761 // File is deleted: do nothing
763 std::vector
<FileMetaData
*>* files
= &v
->files_
[level
];
764 if (level
> 0 && !files
->empty()) {
766 assert(vset_
->icmp_
.Compare((*files
)[files
->size()-1]->largest
,
775 VersionSet::VersionSet(const std::string
& dbname
,
776 const Options
* options
,
777 TableCache
* table_cache
,
778 const InternalKeyComparator
* cmp
)
779 : env_(options
->env
),
782 table_cache_(table_cache
),
784 next_file_number_(2),
785 manifest_file_number_(0), // Filled by Recover()
789 descriptor_file_(NULL
),
790 descriptor_log_(NULL
),
791 dummy_versions_(this),
793 AppendVersion(new Version(this));
796 VersionSet::~VersionSet() {
798 assert(dummy_versions_
.next_
== &dummy_versions_
); // List must be empty
799 delete descriptor_log_
;
800 delete descriptor_file_
;
803 void VersionSet::AppendVersion(Version
* v
) {
805 assert(v
->refs_
== 0);
806 assert(v
!= current_
);
807 if (current_
!= NULL
) {
813 // Append to linked list
814 v
->prev_
= dummy_versions_
.prev_
;
815 v
->next_
= &dummy_versions_
;
820 Status
VersionSet::LogAndApply(VersionEdit
* edit
, port::Mutex
* mu
) {
821 if (edit
->has_log_number_
) {
822 assert(edit
->log_number_
>= log_number_
);
823 assert(edit
->log_number_
< next_file_number_
);
825 edit
->SetLogNumber(log_number_
);
828 if (!edit
->has_prev_log_number_
) {
829 edit
->SetPrevLogNumber(prev_log_number_
);
832 edit
->SetNextFile(next_file_number_
);
833 edit
->SetLastSequence(last_sequence_
);
835 Version
* v
= new Version(this);
837 Builder
builder(this, current_
);
843 // Initialize new descriptor log file if necessary by creating
844 // a temporary file that contains a snapshot of the current version.
845 std::string new_manifest_file
;
847 if (descriptor_log_
== NULL
) {
848 // No reason to unlock *mu here since we only hit this path in the
849 // first call to LogAndApply (when opening the database).
850 assert(descriptor_file_
== NULL
);
851 new_manifest_file
= DescriptorFileName(dbname_
, manifest_file_number_
);
852 edit
->SetNextFile(next_file_number_
);
853 s
= env_
->NewWritableFile(new_manifest_file
, &descriptor_file_
);
855 descriptor_log_
= new log::Writer(descriptor_file_
);
856 s
= WriteSnapshot(descriptor_log_
);
860 // Unlock during expensive MANIFEST log write
864 // Write new record to MANIFEST log
867 edit
->EncodeTo(&record
);
868 s
= descriptor_log_
->AddRecord(record
);
870 s
= descriptor_file_
->Sync();
873 Log(options_
->info_log
, "MANIFEST write: %s\n", s
.ToString().c_str());
877 // If we just created a new descriptor file, install it by writing a
878 // new CURRENT file that points to it.
879 if (s
.ok() && !new_manifest_file
.empty()) {
880 s
= SetCurrentFile(env_
, dbname_
, manifest_file_number_
);
886 // Install the new version
889 log_number_
= edit
->log_number_
;
890 prev_log_number_
= edit
->prev_log_number_
;
893 if (!new_manifest_file
.empty()) {
894 delete descriptor_log_
;
895 delete descriptor_file_
;
896 descriptor_log_
= NULL
;
897 descriptor_file_
= NULL
;
898 env_
->DeleteFile(new_manifest_file
);
905 Status
VersionSet::Recover(bool *save_manifest
) {
906 struct LogReporter
: public log::Reader::Reporter
{
908 virtual void Corruption(size_t bytes
, const Status
& s
) {
909 if (this->status
->ok()) *this->status
= s
;
913 // Read "CURRENT" file, which contains a pointer to the current manifest file
915 Status s
= ReadFileToString(env_
, CurrentFileName(dbname_
), ¤t
);
919 if (current
.empty() || current
[current
.size()-1] != '\n') {
920 return Status::Corruption("CURRENT file does not end with newline");
922 current
.resize(current
.size() - 1);
924 std::string dscname
= dbname_
+ "/" + current
;
925 SequentialFile
* file
;
926 s
= env_
->NewSequentialFile(dscname
, &file
);
931 bool have_log_number
= false;
932 bool have_prev_log_number
= false;
933 bool have_next_file
= false;
934 bool have_last_sequence
= false;
935 uint64_t next_file
= 0;
936 uint64_t last_sequence
= 0;
937 uint64_t log_number
= 0;
938 uint64_t prev_log_number
= 0;
939 Builder
builder(this, current_
);
942 LogReporter reporter
;
943 reporter
.status
= &s
;
944 log::Reader
reader(file
, &reporter
, true/*checksum*/, 0/*initial_offset*/);
947 while (reader
.ReadRecord(&record
, &scratch
) && s
.ok()) {
949 s
= edit
.DecodeFrom(record
);
951 if (edit
.has_comparator_
&&
952 edit
.comparator_
!= icmp_
.user_comparator()->Name()) {
953 s
= Status::InvalidArgument(
954 edit
.comparator_
+ " does not match existing comparator ",
955 icmp_
.user_comparator()->Name());
960 builder
.Apply(&edit
);
963 if (edit
.has_log_number_
) {
964 log_number
= edit
.log_number_
;
965 have_log_number
= true;
968 if (edit
.has_prev_log_number_
) {
969 prev_log_number
= edit
.prev_log_number_
;
970 have_prev_log_number
= true;
973 if (edit
.has_next_file_number_
) {
974 next_file
= edit
.next_file_number_
;
975 have_next_file
= true;
978 if (edit
.has_last_sequence_
) {
979 last_sequence
= edit
.last_sequence_
;
980 have_last_sequence
= true;
988 if (!have_next_file
) {
989 s
= Status::Corruption("no meta-nextfile entry in descriptor");
990 } else if (!have_log_number
) {
991 s
= Status::Corruption("no meta-lognumber entry in descriptor");
992 } else if (!have_last_sequence
) {
993 s
= Status::Corruption("no last-sequence-number entry in descriptor");
996 if (!have_prev_log_number
) {
1000 MarkFileNumberUsed(prev_log_number
);
1001 MarkFileNumberUsed(log_number
);
1005 Version
* v
= new Version(this);
1007 // Install recovered version
1010 manifest_file_number_
= next_file
;
1011 next_file_number_
= next_file
+ 1;
1012 last_sequence_
= last_sequence
;
1013 log_number_
= log_number
;
1014 prev_log_number_
= prev_log_number
;
1016 // See if we can reuse the existing MANIFEST file.
1017 if (ReuseManifest(dscname
, current
)) {
1018 // No need to save new manifest
1020 *save_manifest
= true;
1027 bool VersionSet::ReuseManifest(const std::string
& dscname
,
1028 const std::string
& dscbase
) {
1029 if (!options_
->reuse_logs
) {
1032 FileType manifest_type
;
1033 uint64_t manifest_number
;
1034 uint64_t manifest_size
;
1035 if (!ParseFileName(dscbase
, &manifest_number
, &manifest_type
) ||
1036 manifest_type
!= kDescriptorFile
||
1037 !env_
->GetFileSize(dscname
, &manifest_size
).ok() ||
1038 // Make new compacted MANIFEST if old one is too big
1039 manifest_size
>= TargetFileSize(options_
)) {
1043 assert(descriptor_file_
== NULL
);
1044 assert(descriptor_log_
== NULL
);
1045 Status r
= env_
->NewAppendableFile(dscname
, &descriptor_file_
);
1047 Log(options_
->info_log
, "Reuse MANIFEST: %s\n", r
.ToString().c_str());
1048 assert(descriptor_file_
== NULL
);
1052 Log(options_
->info_log
, "Reusing MANIFEST %s\n", dscname
.c_str());
1053 descriptor_log_
= new log::Writer(descriptor_file_
, manifest_size
);
1054 manifest_file_number_
= manifest_number
;
1058 void VersionSet::MarkFileNumberUsed(uint64_t number
) {
1059 if (next_file_number_
<= number
) {
1060 next_file_number_
= number
+ 1;
1064 void VersionSet::Finalize(Version
* v
) {
1065 // Precomputed best level for next compaction
1066 int best_level
= -1;
1067 double best_score
= -1;
1069 for (int level
= 0; level
< config::kNumLevels
-1; level
++) {
1072 // We treat level-0 specially by bounding the number of files
1073 // instead of number of bytes for two reasons:
1075 // (1) With larger write-buffer sizes, it is nice not to do too
1076 // many level-0 compactions.
1078 // (2) The files in level-0 are merged on every read and
1079 // therefore we wish to avoid too many files when the individual
1080 // file size is small (perhaps because of a small write-buffer
1081 // setting, or very high compression ratios, or lots of
1082 // overwrites/deletions).
1083 score
= v
->files_
[level
].size() /
1084 static_cast<double>(config::kL0_CompactionTrigger
);
1086 // Compute the ratio of current size to size limit.
1087 const uint64_t level_bytes
= TotalFileSize(v
->files_
[level
]);
1089 static_cast<double>(level_bytes
) / MaxBytesForLevel(options_
, level
);
1092 if (score
> best_score
) {
1098 v
->compaction_level_
= best_level
;
1099 v
->compaction_score_
= best_score
;
1102 Status
VersionSet::WriteSnapshot(log::Writer
* log
) {
1103 // TODO: Break up into multiple records to reduce memory usage on recovery?
1107 edit
.SetComparatorName(icmp_
.user_comparator()->Name());
1109 // Save compaction pointers
1110 for (int level
= 0; level
< config::kNumLevels
; level
++) {
1111 if (!compact_pointer_
[level
].empty()) {
1113 key
.DecodeFrom(compact_pointer_
[level
]);
1114 edit
.SetCompactPointer(level
, key
);
1119 for (int level
= 0; level
< config::kNumLevels
; level
++) {
1120 const std::vector
<FileMetaData
*>& files
= current_
->files_
[level
];
1121 for (size_t i
= 0; i
< files
.size(); i
++) {
1122 const FileMetaData
* f
= files
[i
];
1123 edit
.AddFile(level
, f
->number
, f
->file_size
, f
->smallest
, f
->largest
);
1128 edit
.EncodeTo(&record
);
1129 return log
->AddRecord(record
);
1132 int VersionSet::NumLevelFiles(int level
) const {
1134 assert(level
< config::kNumLevels
);
1135 return current_
->files_
[level
].size();
1138 const char* VersionSet::LevelSummary(LevelSummaryStorage
* scratch
) const {
1139 // Update code if kNumLevels changes
1140 assert(config::kNumLevels
== 7);
1141 snprintf(scratch
->buffer
, sizeof(scratch
->buffer
),
1142 "files[ %d %d %d %d %d %d %d ]",
1143 int(current_
->files_
[0].size()),
1144 int(current_
->files_
[1].size()),
1145 int(current_
->files_
[2].size()),
1146 int(current_
->files_
[3].size()),
1147 int(current_
->files_
[4].size()),
1148 int(current_
->files_
[5].size()),
1149 int(current_
->files_
[6].size()));
1150 return scratch
->buffer
;
1153 uint64_t VersionSet::ApproximateOffsetOf(Version
* v
, const InternalKey
& ikey
) {
1154 uint64_t result
= 0;
1155 for (int level
= 0; level
< config::kNumLevels
; level
++) {
1156 const std::vector
<FileMetaData
*>& files
= v
->files_
[level
];
1157 for (size_t i
= 0; i
< files
.size(); i
++) {
1158 if (icmp_
.Compare(files
[i
]->largest
, ikey
) <= 0) {
1159 // Entire file is before "ikey", so just add the file size
1160 result
+= files
[i
]->file_size
;
1161 } else if (icmp_
.Compare(files
[i
]->smallest
, ikey
) > 0) {
1162 // Entire file is after "ikey", so ignore
1164 // Files other than level 0 are sorted by meta->smallest, so
1165 // no further files in this level will contain data for
1170 // "ikey" falls in the range for this table. Add the
1171 // approximate offset of "ikey" within the table.
1173 Iterator
* iter
= table_cache_
->NewIterator(
1174 ReadOptions(), files
[i
]->number
, files
[i
]->file_size
, &tableptr
);
1175 if (tableptr
!= NULL
) {
1176 result
+= tableptr
->ApproximateOffsetOf(ikey
.Encode());
1185 void VersionSet::AddLiveFiles(std::set
<uint64_t>* live
) {
1186 for (Version
* v
= dummy_versions_
.next_
;
1187 v
!= &dummy_versions_
;
1189 for (int level
= 0; level
< config::kNumLevels
; level
++) {
1190 const std::vector
<FileMetaData
*>& files
= v
->files_
[level
];
1191 for (size_t i
= 0; i
< files
.size(); i
++) {
1192 live
->insert(files
[i
]->number
);
1198 int64_t VersionSet::NumLevelBytes(int level
) const {
1200 assert(level
< config::kNumLevels
);
1201 return TotalFileSize(current_
->files_
[level
]);
1204 int64_t VersionSet::MaxNextLevelOverlappingBytes() {
1206 std::vector
<FileMetaData
*> overlaps
;
1207 for (int level
= 1; level
< config::kNumLevels
- 1; level
++) {
1208 for (size_t i
= 0; i
< current_
->files_
[level
].size(); i
++) {
1209 const FileMetaData
* f
= current_
->files_
[level
][i
];
1210 current_
->GetOverlappingInputs(level
+1, &f
->smallest
, &f
->largest
,
1212 const int64_t sum
= TotalFileSize(overlaps
);
1221 // Stores the minimal range that covers all entries in inputs in
1222 // *smallest, *largest.
1223 // REQUIRES: inputs is not empty
1224 void VersionSet::GetRange(const std::vector
<FileMetaData
*>& inputs
,
1225 InternalKey
* smallest
,
1226 InternalKey
* largest
) {
1227 assert(!inputs
.empty());
1230 for (size_t i
= 0; i
< inputs
.size(); i
++) {
1231 FileMetaData
* f
= inputs
[i
];
1233 *smallest
= f
->smallest
;
1234 *largest
= f
->largest
;
1236 if (icmp_
.Compare(f
->smallest
, *smallest
) < 0) {
1237 *smallest
= f
->smallest
;
1239 if (icmp_
.Compare(f
->largest
, *largest
) > 0) {
1240 *largest
= f
->largest
;
1246 // Stores the minimal range that covers all entries in inputs1 and inputs2
1247 // in *smallest, *largest.
1248 // REQUIRES: inputs is not empty
1249 void VersionSet::GetRange2(const std::vector
<FileMetaData
*>& inputs1
,
1250 const std::vector
<FileMetaData
*>& inputs2
,
1251 InternalKey
* smallest
,
1252 InternalKey
* largest
) {
1253 std::vector
<FileMetaData
*> all
= inputs1
;
1254 all
.insert(all
.end(), inputs2
.begin(), inputs2
.end());
1255 GetRange(all
, smallest
, largest
);
1258 Iterator
* VersionSet::MakeInputIterator(Compaction
* c
) {
1259 ReadOptions options
;
1260 options
.verify_checksums
= options_
->paranoid_checks
;
1261 options
.fill_cache
= false;
1263 // Level-0 files have to be merged together. For other levels,
1264 // we will make a concatenating iterator per level.
1265 // TODO(opt): use concatenating iterator for level-0 if there is no overlap
1266 const int space
= (c
->level() == 0 ? c
->inputs_
[0].size() + 1 : 2);
1267 Iterator
** list
= new Iterator
*[space
];
1269 for (int which
= 0; which
< 2; which
++) {
1270 if (!c
->inputs_
[which
].empty()) {
1271 if (c
->level() + which
== 0) {
1272 const std::vector
<FileMetaData
*>& files
= c
->inputs_
[which
];
1273 for (size_t i
= 0; i
< files
.size(); i
++) {
1274 list
[num
++] = table_cache_
->NewIterator(
1275 options
, files
[i
]->number
, files
[i
]->file_size
);
1278 // Create concatenating iterator for the files from this level
1279 list
[num
++] = NewTwoLevelIterator(
1280 new Version::LevelFileNumIterator(icmp_
, &c
->inputs_
[which
]),
1281 &GetFileIterator
, table_cache_
, options
);
1285 assert(num
<= space
);
1286 Iterator
* result
= NewMergingIterator(&icmp_
, list
, num
);
1291 Compaction
* VersionSet::PickCompaction() {
1295 // We prefer compactions triggered by too much data in a level over
1296 // the compactions triggered by seeks.
1297 const bool size_compaction
= (current_
->compaction_score_
>= 1);
1298 const bool seek_compaction
= (current_
->file_to_compact_
!= NULL
);
1299 if (size_compaction
) {
1300 level
= current_
->compaction_level_
;
1302 assert(level
+1 < config::kNumLevels
);
1303 c
= new Compaction(options_
, level
);
1305 // Pick the first file that comes after compact_pointer_[level]
1306 for (size_t i
= 0; i
< current_
->files_
[level
].size(); i
++) {
1307 FileMetaData
* f
= current_
->files_
[level
][i
];
1308 if (compact_pointer_
[level
].empty() ||
1309 icmp_
.Compare(f
->largest
.Encode(), compact_pointer_
[level
]) > 0) {
1310 c
->inputs_
[0].push_back(f
);
1314 if (c
->inputs_
[0].empty()) {
1315 // Wrap-around to the beginning of the key space
1316 c
->inputs_
[0].push_back(current_
->files_
[level
][0]);
1318 } else if (seek_compaction
) {
1319 level
= current_
->file_to_compact_level_
;
1320 c
= new Compaction(options_
, level
);
1321 c
->inputs_
[0].push_back(current_
->file_to_compact_
);
1326 c
->input_version_
= current_
;
1327 c
->input_version_
->Ref();
1329 // Files in level 0 may overlap each other, so pick up all overlapping ones
1331 InternalKey smallest
, largest
;
1332 GetRange(c
->inputs_
[0], &smallest
, &largest
);
1333 // Note that the next call will discard the file we placed in
1334 // c->inputs_[0] earlier and replace it with an overlapping set
1335 // which will include the picked file.
1336 current_
->GetOverlappingInputs(0, &smallest
, &largest
, &c
->inputs_
[0]);
1337 assert(!c
->inputs_
[0].empty());
1340 SetupOtherInputs(c
);
1345 void VersionSet::SetupOtherInputs(Compaction
* c
) {
1346 const int level
= c
->level();
1347 InternalKey smallest
, largest
;
1348 GetRange(c
->inputs_
[0], &smallest
, &largest
);
1350 current_
->GetOverlappingInputs(level
+1, &smallest
, &largest
, &c
->inputs_
[1]);
1352 // Get entire range covered by compaction
1353 InternalKey all_start
, all_limit
;
1354 GetRange2(c
->inputs_
[0], c
->inputs_
[1], &all_start
, &all_limit
);
1356 // See if we can grow the number of inputs in "level" without
1357 // changing the number of "level+1" files we pick up.
1358 if (!c
->inputs_
[1].empty()) {
1359 std::vector
<FileMetaData
*> expanded0
;
1360 current_
->GetOverlappingInputs(level
, &all_start
, &all_limit
, &expanded0
);
1361 const int64_t inputs0_size
= TotalFileSize(c
->inputs_
[0]);
1362 const int64_t inputs1_size
= TotalFileSize(c
->inputs_
[1]);
1363 const int64_t expanded0_size
= TotalFileSize(expanded0
);
1364 if (expanded0
.size() > c
->inputs_
[0].size() &&
1365 inputs1_size
+ expanded0_size
<
1366 ExpandedCompactionByteSizeLimit(options_
)) {
1367 InternalKey new_start
, new_limit
;
1368 GetRange(expanded0
, &new_start
, &new_limit
);
1369 std::vector
<FileMetaData
*> expanded1
;
1370 current_
->GetOverlappingInputs(level
+1, &new_start
, &new_limit
,
1372 if (expanded1
.size() == c
->inputs_
[1].size()) {
1373 Log(options_
->info_log
,
1374 "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
1376 int(c
->inputs_
[0].size()),
1377 int(c
->inputs_
[1].size()),
1378 long(inputs0_size
), long(inputs1_size
),
1379 int(expanded0
.size()),
1380 int(expanded1
.size()),
1381 long(expanded0_size
), long(inputs1_size
));
1382 smallest
= new_start
;
1383 largest
= new_limit
;
1384 c
->inputs_
[0] = expanded0
;
1385 c
->inputs_
[1] = expanded1
;
1386 GetRange2(c
->inputs_
[0], c
->inputs_
[1], &all_start
, &all_limit
);
1391 // Compute the set of grandparent files that overlap this compaction
1392 // (parent == level+1; grandparent == level+2)
1393 if (level
+ 2 < config::kNumLevels
) {
1394 current_
->GetOverlappingInputs(level
+ 2, &all_start
, &all_limit
,
1399 Log(options_
->info_log
, "Compacting %d '%s' .. '%s'",
1401 smallest
.DebugString().c_str(),
1402 largest
.DebugString().c_str());
1405 // Update the place where we will do the next compaction for this level.
1406 // We update this immediately instead of waiting for the VersionEdit
1407 // to be applied so that if the compaction fails, we will try a different
1408 // key range next time.
1409 compact_pointer_
[level
] = largest
.Encode().ToString();
1410 c
->edit_
.SetCompactPointer(level
, largest
);
1413 Compaction
* VersionSet::CompactRange(
1415 const InternalKey
* begin
,
1416 const InternalKey
* end
) {
1417 std::vector
<FileMetaData
*> inputs
;
1418 current_
->GetOverlappingInputs(level
, begin
, end
, &inputs
);
1419 if (inputs
.empty()) {
1423 // Avoid compacting too much in one shot in case the range is large.
1424 // But we cannot do this for level-0 since level-0 files can overlap
1425 // and we must not pick one file and drop another older file if the
1426 // two files overlap.
1428 const uint64_t limit
= MaxFileSizeForLevel(options_
, level
);
1430 for (size_t i
= 0; i
< inputs
.size(); i
++) {
1431 uint64_t s
= inputs
[i
]->file_size
;
1433 if (total
>= limit
) {
1434 inputs
.resize(i
+ 1);
1440 Compaction
* c
= new Compaction(options_
, level
);
1441 c
->input_version_
= current_
;
1442 c
->input_version_
->Ref();
1443 c
->inputs_
[0] = inputs
;
1444 SetupOtherInputs(c
);
1448 Compaction::Compaction(const Options
* options
, int level
)
1450 max_output_file_size_(MaxFileSizeForLevel(options
, level
)),
1451 input_version_(NULL
),
1452 grandparent_index_(0),
1454 overlapped_bytes_(0) {
1455 for (int i
= 0; i
< config::kNumLevels
; i
++) {
1460 Compaction::~Compaction() {
1461 if (input_version_
!= NULL
) {
1462 input_version_
->Unref();
1466 bool Compaction::IsTrivialMove() const {
1467 const VersionSet
* vset
= input_version_
->vset_
;
1468 // Avoid a move if there is lots of overlapping grandparent data.
1469 // Otherwise, the move could create a parent file that will require
1470 // a very expensive merge later on.
1471 return (num_input_files(0) == 1 && num_input_files(1) == 0 &&
1472 TotalFileSize(grandparents_
) <=
1473 MaxGrandParentOverlapBytes(vset
->options_
));
1476 void Compaction::AddInputDeletions(VersionEdit
* edit
) {
1477 for (int which
= 0; which
< 2; which
++) {
1478 for (size_t i
= 0; i
< inputs_
[which
].size(); i
++) {
1479 edit
->DeleteFile(level_
+ which
, inputs_
[which
][i
]->number
);
1484 bool Compaction::IsBaseLevelForKey(const Slice
& user_key
) {
1485 // Maybe use binary search to find right entry instead of linear search?
1486 const Comparator
* user_cmp
= input_version_
->vset_
->icmp_
.user_comparator();
1487 for (int lvl
= level_
+ 2; lvl
< config::kNumLevels
; lvl
++) {
1488 const std::vector
<FileMetaData
*>& files
= input_version_
->files_
[lvl
];
1489 for (; level_ptrs_
[lvl
] < files
.size(); ) {
1490 FileMetaData
* f
= files
[level_ptrs_
[lvl
]];
1491 if (user_cmp
->Compare(user_key
, f
->largest
.user_key()) <= 0) {
1492 // We've advanced far enough
1493 if (user_cmp
->Compare(user_key
, f
->smallest
.user_key()) >= 0) {
1494 // Key falls in this file's range, so definitely not base level
1505 bool Compaction::ShouldStopBefore(const Slice
& internal_key
) {
1506 const VersionSet
* vset
= input_version_
->vset_
;
1507 // Scan to find earliest grandparent file that contains key.
1508 const InternalKeyComparator
* icmp
= &vset
->icmp_
;
1509 while (grandparent_index_
< grandparents_
.size() &&
1510 icmp
->Compare(internal_key
,
1511 grandparents_
[grandparent_index_
]->largest
.Encode()) > 0) {
1513 overlapped_bytes_
+= grandparents_
[grandparent_index_
]->file_size
;
1515 grandparent_index_
++;
1519 if (overlapped_bytes_
> MaxGrandParentOverlapBytes(vset
->options_
)) {
1520 // Too much overlap for current output; start new output
1521 overlapped_bytes_
= 0;
1528 void Compaction::ReleaseInputs() {
1529 if (input_version_
!= NULL
) {
1530 input_version_
->Unref();
1531 input_version_
= NULL
;
1535 } // namespace leveldb