1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
5 #include "db/log_reader.h"
6 #include "db/log_writer.h"
7 #include "leveldb/env.h"
8 #include "util/coding.h"
9 #include "util/crc32c.h"
10 #include "util/random.h"
11 #include "util/testharness.h"
16 // Construct a string of the specified length made out of the supplied
// Construct a string of exactly n bytes by repeating the supplied
// partial string (the final repetition is truncated to fit).
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  while (result.size() < n) {
    result.append(partial_string);
  }
  result.resize(n);  // trim overshoot so the result is exactly n bytes
  return result;
}
// Construct a string from a number, formatted as "<n>." (trailing dot
// acts as a record separator in the round-trip tests below).
static std::string NumberString(int n) {
  char buf[50];
  snprintf(buf, sizeof(buf), "%d.", n);
  return std::string(buf);
}
34 // Return a skewed potentially long string
35 static std::string
RandomSkewedString(int i
, Random
* rnd
) {
36 return BigString(NumberString(i
), rnd
->Skewed(17));
41 class StringDest
: public WritableFile
{
43 std::string contents_
;
45 virtual Status
Close() { return Status::OK(); }
46 virtual Status
Flush() { return Status::OK(); }
47 virtual Status
Sync() { return Status::OK(); }
48 virtual Status
Append(const Slice
& slice
) {
49 contents_
.append(slice
.data(), slice
.size());
54 class StringSource
: public SequentialFile
{
58 bool returned_partial_
;
59 StringSource() : force_error_(false), returned_partial_(false) { }
61 virtual Status
Read(size_t n
, Slice
* result
, char* scratch
) {
62 ASSERT_TRUE(!returned_partial_
) << "must not Read() after eof/error";
66 returned_partial_
= true;
67 return Status::Corruption("read error");
70 if (contents_
.size() < n
) {
72 returned_partial_
= true;
74 *result
= Slice(contents_
.data(), n
);
75 contents_
.remove_prefix(n
);
79 virtual Status
Skip(uint64_t n
) {
80 if (n
> contents_
.size()) {
82 return Status::NotFound("in-memory file skipped past end");
85 contents_
.remove_prefix(n
);
91 class ReportCollector
: public Reader::Reporter
{
93 size_t dropped_bytes_
;
96 ReportCollector() : dropped_bytes_(0) { }
97 virtual void Corruption(size_t bytes
, const Status
& status
) {
98 dropped_bytes_
+= bytes
;
99 message_
.append(status
.ToString());
104 StringSource source_
;
105 ReportCollector report_
;
110 // Record metadata for testing initial offset functionality
111 static size_t initial_offset_record_sizes_
[];
112 static uint64_t initial_offset_last_record_offsets_
[];
113 static int num_initial_offset_records_
;
116 LogTest() : reading_(false),
117 writer_(new Writer(&dest_
)),
118 reader_(new Reader(&source_
, &report_
, true/*checksum*/,
119 0/*initial_offset*/)) {
127 void ReopenForAppend() {
129 writer_
= new Writer(&dest_
, dest_
.contents_
.size());
132 void Write(const std::string
& msg
) {
133 ASSERT_TRUE(!reading_
) << "Write() after starting to read";
134 writer_
->AddRecord(Slice(msg
));
137 size_t WrittenBytes() const {
138 return dest_
.contents_
.size();
144 source_
.contents_
= Slice(dest_
.contents_
);
148 if (reader_
->ReadRecord(&record
, &scratch
)) {
149 return record
.ToString();
155 void IncrementByte(int offset
, int delta
) {
156 dest_
.contents_
[offset
] += delta
;
159 void SetByte(int offset
, char new_byte
) {
160 dest_
.contents_
[offset
] = new_byte
;
163 void ShrinkSize(int bytes
) {
164 dest_
.contents_
.resize(dest_
.contents_
.size() - bytes
);
167 void FixChecksum(int header_offset
, int len
) {
168 // Compute crc of type/len/data
169 uint32_t crc
= crc32c::Value(&dest_
.contents_
[header_offset
+6], 1 + len
);
170 crc
= crc32c::Mask(crc
);
171 EncodeFixed32(&dest_
.contents_
[header_offset
], crc
);
175 source_
.force_error_
= true;
178 size_t DroppedBytes() const {
179 return report_
.dropped_bytes_
;
182 std::string
ReportMessage() const {
183 return report_
.message_
;
186 // Returns OK iff recorded error message contains "msg"
187 std::string
MatchError(const std::string
& msg
) const {
188 if (report_
.message_
.find(msg
) == std::string::npos
) {
189 return report_
.message_
;
195 void WriteInitialOffsetLog() {
196 for (int i
= 0; i
< num_initial_offset_records_
; i
++) {
197 std::string
record(initial_offset_record_sizes_
[i
],
198 static_cast<char>('a' + i
));
203 void StartReadingAt(uint64_t initial_offset
) {
205 reader_
= new Reader(&source_
, &report_
, true/*checksum*/, initial_offset
);
208 void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end
) {
209 WriteInitialOffsetLog();
211 source_
.contents_
= Slice(dest_
.contents_
);
212 Reader
* offset_reader
= new Reader(&source_
, &report_
, true/*checksum*/,
213 WrittenBytes() + offset_past_end
);
216 ASSERT_TRUE(!offset_reader
->ReadRecord(&record
, &scratch
));
217 delete offset_reader
;
220 void CheckInitialOffsetRecord(uint64_t initial_offset
,
221 int expected_record_offset
) {
222 WriteInitialOffsetLog();
224 source_
.contents_
= Slice(dest_
.contents_
);
225 Reader
* offset_reader
= new Reader(&source_
, &report_
, true/*checksum*/,
228 // Read all records from expected_record_offset through the last one.
229 ASSERT_LT(expected_record_offset
, num_initial_offset_records_
);
230 for (; expected_record_offset
< num_initial_offset_records_
;
231 ++expected_record_offset
) {
234 ASSERT_TRUE(offset_reader
->ReadRecord(&record
, &scratch
));
235 ASSERT_EQ(initial_offset_record_sizes_
[expected_record_offset
],
237 ASSERT_EQ(initial_offset_last_record_offsets_
[expected_record_offset
],
238 offset_reader
->LastRecordOffset());
239 ASSERT_EQ((char)('a' + expected_record_offset
), record
.data()[0]);
241 delete offset_reader
;
245 size_t LogTest::initial_offset_record_sizes_
[] =
246 {10000, // Two sizable records in first block
248 2 * log::kBlockSize
- 1000, // Span three blocks
250 13716, // Consume all but two bytes of block 3.
251 log::kBlockSize
- kHeaderSize
, // Consume the entirety of block 4.
254 uint64_t LogTest::initial_offset_last_record_offsets_
[] =
257 2 * (kHeaderSize
+ 10000),
258 2 * (kHeaderSize
+ 10000) +
259 (2 * log::kBlockSize
- 1000) + 3 * kHeaderSize
,
260 2 * (kHeaderSize
+ 10000) +
261 (2 * log::kBlockSize
- 1000) + 3 * kHeaderSize
266 // LogTest::initial_offset_last_record_offsets_ must be defined before this.
267 int LogTest::num_initial_offset_records_
=
268 sizeof(LogTest::initial_offset_last_record_offsets_
)/sizeof(uint64_t);
270 TEST(LogTest
, Empty
) {
271 ASSERT_EQ("EOF", Read());
274 TEST(LogTest
, ReadWrite
) {
279 ASSERT_EQ("foo", Read());
280 ASSERT_EQ("bar", Read());
281 ASSERT_EQ("", Read());
282 ASSERT_EQ("xxxx", Read());
283 ASSERT_EQ("EOF", Read());
284 ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
287 TEST(LogTest
, ManyBlocks
) {
288 for (int i
= 0; i
< 100000; i
++) {
289 Write(NumberString(i
));
291 for (int i
= 0; i
< 100000; i
++) {
292 ASSERT_EQ(NumberString(i
), Read());
294 ASSERT_EQ("EOF", Read());
297 TEST(LogTest
, Fragmentation
) {
299 Write(BigString("medium", 50000));
300 Write(BigString("large", 100000));
301 ASSERT_EQ("small", Read());
302 ASSERT_EQ(BigString("medium", 50000), Read());
303 ASSERT_EQ(BigString("large", 100000), Read());
304 ASSERT_EQ("EOF", Read());
307 TEST(LogTest
, MarginalTrailer
) {
308 // Make a trailer that is exactly the same length as an empty record.
309 const int n
= kBlockSize
- 2*kHeaderSize
;
310 Write(BigString("foo", n
));
311 ASSERT_EQ(kBlockSize
- kHeaderSize
, WrittenBytes());
314 ASSERT_EQ(BigString("foo", n
), Read());
315 ASSERT_EQ("", Read());
316 ASSERT_EQ("bar", Read());
317 ASSERT_EQ("EOF", Read());
320 TEST(LogTest
, MarginalTrailer2
) {
321 // Make a trailer that is exactly the same length as an empty record.
322 const int n
= kBlockSize
- 2*kHeaderSize
;
323 Write(BigString("foo", n
));
324 ASSERT_EQ(kBlockSize
- kHeaderSize
, WrittenBytes());
326 ASSERT_EQ(BigString("foo", n
), Read());
327 ASSERT_EQ("bar", Read());
328 ASSERT_EQ("EOF", Read());
329 ASSERT_EQ(0, DroppedBytes());
330 ASSERT_EQ("", ReportMessage());
333 TEST(LogTest
, ShortTrailer
) {
334 const int n
= kBlockSize
- 2*kHeaderSize
+ 4;
335 Write(BigString("foo", n
));
336 ASSERT_EQ(kBlockSize
- kHeaderSize
+ 4, WrittenBytes());
339 ASSERT_EQ(BigString("foo", n
), Read());
340 ASSERT_EQ("", Read());
341 ASSERT_EQ("bar", Read());
342 ASSERT_EQ("EOF", Read());
345 TEST(LogTest
, AlignedEof
) {
346 const int n
= kBlockSize
- 2*kHeaderSize
+ 4;
347 Write(BigString("foo", n
));
348 ASSERT_EQ(kBlockSize
- kHeaderSize
+ 4, WrittenBytes());
349 ASSERT_EQ(BigString("foo", n
), Read());
350 ASSERT_EQ("EOF", Read());
353 TEST(LogTest
, OpenForAppend
) {
357 ASSERT_EQ("hello", Read());
358 ASSERT_EQ("world", Read());
359 ASSERT_EQ("EOF", Read());
362 TEST(LogTest
, RandomRead
) {
364 Random
write_rnd(301);
365 for (int i
= 0; i
< N
; i
++) {
366 Write(RandomSkewedString(i
, &write_rnd
));
368 Random
read_rnd(301);
369 for (int i
= 0; i
< N
; i
++) {
370 ASSERT_EQ(RandomSkewedString(i
, &read_rnd
), Read());
372 ASSERT_EQ("EOF", Read());
375 // Tests of all the error paths in log_reader.cc follow:
377 TEST(LogTest
, ReadError
) {
380 ASSERT_EQ("EOF", Read());
381 ASSERT_EQ(kBlockSize
, DroppedBytes());
382 ASSERT_EQ("OK", MatchError("read error"));
385 TEST(LogTest
, BadRecordType
) {
387 // Type is stored in header[6]
388 IncrementByte(6, 100);
390 ASSERT_EQ("EOF", Read());
391 ASSERT_EQ(3, DroppedBytes());
392 ASSERT_EQ("OK", MatchError("unknown record type"));
395 TEST(LogTest
, TruncatedTrailingRecordIsIgnored
) {
397 ShrinkSize(4); // Drop all payload as well as a header byte
398 ASSERT_EQ("EOF", Read());
399 // Truncated last record is ignored, not treated as an error.
400 ASSERT_EQ(0, DroppedBytes());
401 ASSERT_EQ("", ReportMessage());
404 TEST(LogTest
, BadLength
) {
405 const int kPayloadSize
= kBlockSize
- kHeaderSize
;
406 Write(BigString("bar", kPayloadSize
));
408 // Least significant size byte is stored in header[4].
410 ASSERT_EQ("foo", Read());
411 ASSERT_EQ(kBlockSize
, DroppedBytes());
412 ASSERT_EQ("OK", MatchError("bad record length"));
415 TEST(LogTest
, BadLengthAtEndIsIgnored
) {
418 ASSERT_EQ("EOF", Read());
419 ASSERT_EQ(0, DroppedBytes());
420 ASSERT_EQ("", ReportMessage());
423 TEST(LogTest
, ChecksumMismatch
) {
425 IncrementByte(0, 10);
426 ASSERT_EQ("EOF", Read());
427 ASSERT_EQ(10, DroppedBytes());
428 ASSERT_EQ("OK", MatchError("checksum mismatch"));
431 TEST(LogTest
, UnexpectedMiddleType
) {
433 SetByte(6, kMiddleType
);
435 ASSERT_EQ("EOF", Read());
436 ASSERT_EQ(3, DroppedBytes());
437 ASSERT_EQ("OK", MatchError("missing start"));
440 TEST(LogTest
, UnexpectedLastType
) {
442 SetByte(6, kLastType
);
444 ASSERT_EQ("EOF", Read());
445 ASSERT_EQ(3, DroppedBytes());
446 ASSERT_EQ("OK", MatchError("missing start"));
449 TEST(LogTest
, UnexpectedFullType
) {
452 SetByte(6, kFirstType
);
454 ASSERT_EQ("bar", Read());
455 ASSERT_EQ("EOF", Read());
456 ASSERT_EQ(3, DroppedBytes());
457 ASSERT_EQ("OK", MatchError("partial record without end"));
460 TEST(LogTest
, UnexpectedFirstType
) {
462 Write(BigString("bar", 100000));
463 SetByte(6, kFirstType
);
465 ASSERT_EQ(BigString("bar", 100000), Read());
466 ASSERT_EQ("EOF", Read());
467 ASSERT_EQ(3, DroppedBytes());
468 ASSERT_EQ("OK", MatchError("partial record without end"));
471 TEST(LogTest
, MissingLastIsIgnored
) {
472 Write(BigString("bar", kBlockSize
));
473 // Remove the LAST block, including header.
475 ASSERT_EQ("EOF", Read());
476 ASSERT_EQ("", ReportMessage());
477 ASSERT_EQ(0, DroppedBytes());
480 TEST(LogTest
, PartialLastIsIgnored
) {
481 Write(BigString("bar", kBlockSize
));
482 // Cause a bad record length in the LAST block.
484 ASSERT_EQ("EOF", Read());
485 ASSERT_EQ("", ReportMessage());
486 ASSERT_EQ(0, DroppedBytes());
489 TEST(LogTest
, SkipIntoMultiRecord
) {
490 // Consider a fragmented record:
491 // first(R1), middle(R1), last(R1), first(R2)
492 // If initial_offset points to a record after first(R1) but before first(R2)
493 // incomplete fragment errors are not actual errors, and must be suppressed
494 // until a new first or full record is encountered.
495 Write(BigString("foo", 3*kBlockSize
));
497 StartReadingAt(kBlockSize
);
499 ASSERT_EQ("correct", Read());
500 ASSERT_EQ("", ReportMessage());
501 ASSERT_EQ(0, DroppedBytes());
502 ASSERT_EQ("EOF", Read());
505 TEST(LogTest
, ErrorJoinsRecords
) {
506 // Consider two fragmented records:
507 // first(R1) last(R1) first(R2) last(R2)
508 // where the middle two fragments disappear. We do not want
509 // first(R1),last(R2) to get joined and returned as a valid record.
511 // Write records that span two blocks
512 Write(BigString("foo", kBlockSize
));
513 Write(BigString("bar", kBlockSize
));
516 // Wipe the middle block
517 for (int offset
= kBlockSize
; offset
< 2*kBlockSize
; offset
++) {
518 SetByte(offset
, 'x');
521 ASSERT_EQ("correct", Read());
522 ASSERT_EQ("EOF", Read());
523 const size_t dropped
= DroppedBytes();
524 ASSERT_LE(dropped
, 2*kBlockSize
+ 100);
525 ASSERT_GE(dropped
, 2*kBlockSize
);
528 TEST(LogTest
, ReadStart
) {
529 CheckInitialOffsetRecord(0, 0);
532 TEST(LogTest
, ReadSecondOneOff
) {
533 CheckInitialOffsetRecord(1, 1);
536 TEST(LogTest
, ReadSecondTenThousand
) {
537 CheckInitialOffsetRecord(10000, 1);
540 TEST(LogTest
, ReadSecondStart
) {
541 CheckInitialOffsetRecord(10007, 1);
544 TEST(LogTest
, ReadThirdOneOff
) {
545 CheckInitialOffsetRecord(10008, 2);
548 TEST(LogTest
, ReadThirdStart
) {
549 CheckInitialOffsetRecord(20014, 2);
552 TEST(LogTest
, ReadFourthOneOff
) {
553 CheckInitialOffsetRecord(20015, 3);
556 TEST(LogTest
, ReadFourthFirstBlockTrailer
) {
557 CheckInitialOffsetRecord(log::kBlockSize
- 4, 3);
560 TEST(LogTest
, ReadFourthMiddleBlock
) {
561 CheckInitialOffsetRecord(log::kBlockSize
+ 1, 3);
564 TEST(LogTest
, ReadFourthLastBlock
) {
565 CheckInitialOffsetRecord(2 * log::kBlockSize
+ 1, 3);
568 TEST(LogTest
, ReadFourthStart
) {
569 CheckInitialOffsetRecord(
570 2 * (kHeaderSize
+ 1000) + (2 * log::kBlockSize
- 1000) + 3 * kHeaderSize
,
574 TEST(LogTest
, ReadInitialOffsetIntoBlockPadding
) {
575 CheckInitialOffsetRecord(3 * log::kBlockSize
- 3, 5);
578 TEST(LogTest
, ReadEnd
) {
579 CheckOffsetPastEndReturnsNoRecords(0);
582 TEST(LogTest
, ReadPastEnd
) {
583 CheckOffsetPastEndReturnsNoRecords(5);
587 } // namespace leveldb
589 int main(int argc
, char** argv
) {
590 return leveldb::test::RunAllTests();