Fix the ".lint fkey-indexes" shell command so that it works with WITHOUT ROWID
[sqlite.git] / ext / lsm1 / lsm_log.c
bloba66e40bccd58bb2a61ef6f4be6ab1fee7d76d770
1 /*
2 ** 2011-08-13
3 **
4 ** The author disclaims copyright to this source code. In place of
5 ** a legal notice, here is a blessing:
6 **
7 ** May you do good and not evil.
8 ** May you find forgiveness for yourself and forgive others.
9 ** May you share freely, never taking more than you give.
11 *************************************************************************
13 ** This file contains the implementation of LSM database logging. Logging
14 ** has one purpose in LSM - to make transactions durable.
16 ** When data is written to an LSM database, it is initially stored in an
17 ** in-memory tree structure. Since this structure is in volatile memory,
18 ** if a power failure or application crash occurs it may be lost. To
19 ** prevent loss of data in this case, each time a record is written to the
20 ** in-memory tree an equivalent record is appended to the log on disk.
21 ** If a power failure or application crash does occur, data can be recovered
22 ** by reading the log.
24 ** A log file consists of the following types of records representing data
25 ** written into the database:
27 ** LOG_WRITE: A key-value pair written to the database.
28 ** LOG_DELETE: A delete key issued to the database.
29 ** LOG_COMMIT: A transaction commit.
31 ** And the following types of records for ancillary purposes..
33 ** LOG_EOF: A record indicating the end of a log file.
34 ** LOG_PAD1: A single byte padding record.
35 ** LOG_PAD2: An N byte padding record (N>1).
36 ** LOG_JUMP: A pointer to another offset within the log file.
38 ** Each transaction written to the log contains one or more LOG_WRITE and/or
39 ** LOG_DELETE records, followed by a LOG_COMMIT record. The LOG_COMMIT record
40 ** contains an 8-byte checksum based on all previous data written to the
41 ** log file.
43 ** LOG CHECKSUMS & RECOVERY
45 ** Checksums are found in two types of log records: LOG_COMMIT and
46 ** LOG_CKSUM records. In order to recover content from a log, a client
47 ** reads each record from the start of the log, calculating a checksum as
48 ** it does. Each time a LOG_COMMIT or LOG_CKSUM is encountered, the
49 ** recovery process verifies that the checksum stored in the log
50 ** matches the calculated checksum. If it does not, the recovery process
51 ** can stop reading the log.
53 ** If a recovery process reads records (other than COMMIT or CKSUM)
54 ** consisting of at least LSM_CKSUM_MAXDATA bytes, then the next record in
55 ** the log must be either a LOG_CKSUM or LOG_COMMIT record. If it is
56 ** not, the recovery process also stops reading the log.
58 ** To recover the log file, it must be read twice. The first time to
59 ** determine the location of the last valid commit record. And the second
60 ** time to load data into the in-memory tree.
62 ** Todo: Surely there is a better way...
64 ** LOG WRAPPING
66 ** If the log file were never deleted or wrapped, it would be possible to
67 ** read it from start to end each time is required recovery (i.e each time
68 ** the number of database clients changes from 0 to 1). Effectively reading
69 ** the entire history of the database each time. This would quickly become
70 ** inefficient. Additionally, since the log file would grow without bound,
71 ** it wastes storage space.
73 ** Instead, part of each checkpoint written into the database file contains
74 ** a log offset (and other information required to read the log starting at
75 ** at this offset) at which to begin recovery. Offset $O.
77 ** Once a checkpoint has been written and synced into the database file, it
78 ** is guaranteed that no recovery process will need to read any data before
79 ** offset $O of the log file. It is therefore safe to begin overwriting
80 ** any data that occurs before offset $O.
82 ** This implementation separates the log into three regions mapped into
83 ** the log file - regions 0, 1 and 2. During recovery, regions are read
84 ** in ascending order (i.e. 0, then 1, then 2). Each region is zero or
85 ** more bytes in size.
87 ** |---1---|..|--0--|.|--2--|....
89 ** New records are always appended to the end of region 2.
91 ** Initially (when it is empty), all three regions are zero bytes in size.
92 ** Each of them are located at the beginning of the file. As records are
93 ** added to the log, region 2 grows, so that the log consists of a zero
94 ** byte region 1, followed by a zero byte region 0, followed by an N byte
95 ** region 2. After one or more checkpoints have been written to disk,
96 ** the start point of region 2 is moved to $O. For example:
98 ** A) ||.........|--2--|....
99 **
100 ** (both regions 0 and 1 are 0 bytes in size at offset 0).
102 ** Eventually, the log wraps around to write new records into the start.
103 ** At this point, region 2 is renamed to region 0. Region 0 is renamed
104 ** to region 2. After appending a few records to the new region 2, the
105 ** log file looks like this:
107 ** B) ||--2--|...|--0--|....
109 ** (region 1 is still 0 bytes in size, located at offset 0).
111 ** Any checkpoints made at this point may reduce the size of region 0.
112 ** However, if they do not, and region 2 expands so that it is about to
113 ** overwrite the start of region 0, then region 2 is renamed to region 1,
114 ** and a new region 2 created at the end of the file following the existing
115 ** region 0.
117 ** C) |---1---|..|--0--|.|-2-|
119 ** In this state records are appended to region 2 until checkpoints have
120 ** contracted regions 0 AND 1 UNTil they are both zero bytes in size. They
121 ** are then shifted to the start of the log file, leaving the system in
122 ** the equivalent of state A above.
124 ** Alternatively, state B may transition directly to state A if the size
125 ** of region 0 is reduced to zero bytes before region 2 threatens to
126 ** encroach upon it.
128 ** LOG_PAD1 & LOG_PAD2 RECORDS
130 ** PAD1 and PAD2 records may appear in a log file at any point. They allow
131 ** a process writing the log file align the beginning of transactions with
132 ** the beginning of disk sectors, which increases robustness.
134 ** RECORD FORMATS:
136 ** LOG_EOF: * A single 0x00 byte.
138 ** LOG_PAD1: * A single 0x01 byte.
140 ** LOG_PAD2: * A single 0x02 byte, followed by
141 ** * The number of unused bytes (N) as a varint,
142 ** * An N byte block of unused space.
144 ** LOG_COMMIT: * A single 0x03 byte.
145 ** * An 8-byte checksum.
147 ** LOG_JUMP: * A single 0x04 byte.
148 ** * Absolute file offset to jump to, encoded as a varint.
150 ** LOG_WRITE: * A single 0x06 or 0x07 byte,
151 ** * The number of bytes in the key, encoded as a varint,
152 ** * The number of bytes in the value, encoded as a varint,
153 ** * If the first byte was 0x07, an 8 byte checksum.
154 ** * The key data,
155 ** * The value data.
157 ** LOG_DELETE: * A single 0x08 or 0x09 byte,
158 ** * The number of bytes in the key, encoded as a varint,
159 ** * If the first byte was 0x09, an 8 byte checksum.
160 ** * The key data.
162 ** Varints are as described in lsm_varint.c (SQLite 4 format).
164 ** CHECKSUMS:
166 ** The checksum is calculated using two 32-bit unsigned integers, s0 and
167 ** s1. The initial value for both is 42. It is updated each time a record
168 ** is written into the log file by treating the encoded (binary) record as
169 ** an array of 32-bit little-endian integers. Then, if x[] is the integer
170 ** array, updating the checksum accumulators as follows:
172 ** for i from 0 to n-1 step 2:
173 ** s0 += x[i] + s1;
174 ** s1 += x[i+1] + s0;
175 ** endfor
177 ** If the record is not an even multiple of 8-bytes in size it is padded
178 ** with zeroes to make it so before the checksum is updated.
180 ** The checksum stored in a COMMIT, WRITE or DELETE is based on all bytes
181 ** up to the start of the 8-byte checksum itself, including the COMMIT,
182 ** WRITE or DELETE fields that appear before the checksum in the record.
184 ** VARINT FORMAT
186 ** See lsm_varint.c.
189 #ifndef _LSM_INT_H
190 # include "lsmInt.h"
191 #endif
193 /* Log record types */
194 #define LSM_LOG_EOF 0x00
195 #define LSM_LOG_PAD1 0x01
196 #define LSM_LOG_PAD2 0x02
197 #define LSM_LOG_COMMIT 0x03
198 #define LSM_LOG_JUMP 0x04
200 #define LSM_LOG_WRITE 0x06
201 #define LSM_LOG_WRITE_CKSUM 0x07
203 #define LSM_LOG_DELETE 0x08
204 #define LSM_LOG_DELETE_CKSUM 0x09
206 #define LSM_LOG_DRANGE 0x0A
207 #define LSM_LOG_DRANGE_CKSUM 0x0B
209 /* Require a checksum every 32KB. */
210 #define LSM_CKSUM_MAXDATA (32*1024)
212 /* Do not wrap a log file smaller than this in bytes. */
213 #define LSM_MIN_LOGWRAP (128*1024)
216 ** szSector:
217 ** Commit records must be aligned to end on szSector boundaries. If
218 ** the safety-mode is set to NORMAL or OFF, this value is 1. Otherwise,
219 ** if the safety-mode is set to FULL, it is the size of the file-system
220 ** sectors as reported by lsmFsSectorSize().
222 struct LogWriter {
223 u32 cksum0; /* Checksum 0 at offset iOff */
224 u32 cksum1; /* Checksum 1 at offset iOff */
225 int iCksumBuf; /* Bytes of buf that have been checksummed */
226 i64 iOff; /* Offset at start of buffer buf */
227 int szSector; /* Sector size for this transaction */
228 LogRegion jump; /* Avoid writing to this region */
229 i64 iRegion1End; /* End of first region written by trans */
230 i64 iRegion2Start; /* Start of second regions written by trans */
231 LsmString buf; /* Buffer containing data not yet written */
235 ** Return the result of interpreting the first 4 bytes in buffer aIn as
236 ** a 32-bit unsigned little-endian integer.
238 static u32 getU32le(u8 *aIn){
239 return ((u32)aIn[3] << 24)
240 + ((u32)aIn[2] << 16)
241 + ((u32)aIn[1] << 8)
242 + ((u32)aIn[0]);
247 ** This function is the same as logCksum(), except that pointer "a" need
248 ** not be aligned to an 8-byte boundary or padded with zero bytes. This
249 ** version is slower, but sometimes more convenient to use.
251 static void logCksumUnaligned(
252 char *z, /* Input buffer */
253 int n, /* Size of input buffer in bytes */
254 u32 *pCksum0, /* IN/OUT: Checksum value 1 */
255 u32 *pCksum1 /* IN/OUT: Checksum value 2 */
257 u8 *a = (u8 *)z;
258 u32 cksum0 = *pCksum0;
259 u32 cksum1 = *pCksum1;
260 int nIn = (n/8) * 8;
261 int i;
263 assert( n>0 );
264 for(i=0; i<nIn; i+=8){
265 cksum0 += getU32le(&a[i]) + cksum1;
266 cksum1 += getU32le(&a[i+4]) + cksum0;
269 if( nIn!=n ){
270 u8 aBuf[8] = {0, 0, 0, 0, 0, 0, 0, 0};
271 assert( (n-nIn)<8 && n>nIn );
272 memcpy(aBuf, &a[nIn], n-nIn);
273 cksum0 += getU32le(aBuf) + cksum1;
274 cksum1 += getU32le(&aBuf[4]) + cksum0;
277 *pCksum0 = cksum0;
278 *pCksum1 = cksum1;
282 ** Update pLog->cksum0 and pLog->cksum1 so that the first nBuf bytes in the
283 ** write buffer (pLog->buf) are included in the checksum.
285 static void logUpdateCksum(LogWriter *pLog, int nBuf){
286 assert( (pLog->iCksumBuf % 8)==0 );
287 assert( pLog->iCksumBuf<=nBuf );
288 assert( (nBuf % 8)==0 || nBuf==pLog->buf.n );
289 if( nBuf>pLog->iCksumBuf ){
290 logCksumUnaligned(
291 &pLog->buf.z[pLog->iCksumBuf], nBuf-pLog->iCksumBuf,
292 &pLog->cksum0, &pLog->cksum1
295 pLog->iCksumBuf = nBuf;
298 static i64 firstByteOnSector(LogWriter *pLog, i64 iOff){
299 return (iOff / pLog->szSector) * pLog->szSector;
301 static i64 lastByteOnSector(LogWriter *pLog, i64 iOff){
302 return firstByteOnSector(pLog, iOff) + pLog->szSector - 1;
306 ** If possible, reclaim log file space. Log file space is reclaimed after
307 ** a snapshot that points to the same data in the database file is synced
308 ** into the db header.
310 static int logReclaimSpace(lsm_db *pDb){
311 int rc;
312 int iMeta;
313 int bRotrans; /* True if there exists some ro-trans */
315 /* Test if there exists some other connection with a read-only transaction
316 ** open. If there does, then log file space may not be reclaimed. */
317 rc = lsmDetectRoTrans(pDb, &bRotrans);
318 if( rc!=LSM_OK || bRotrans ) return rc;
320 iMeta = (int)pDb->pShmhdr->iMetaPage;
321 if( iMeta==1 || iMeta==2 ){
322 DbLog *pLog = &pDb->treehdr.log;
323 i64 iSyncedId;
325 /* Read the snapshot-id of the snapshot stored on meta-page iMeta. Note
326 ** that in theory, the value read is untrustworthy (due to a race
327 ** condition - see comments above lsmFsReadSyncedId()). So it is only
328 ** ever used to conclude that no log space can be reclaimed. If it seems
329 ** to indicate that it may be possible to reclaim log space, a
330 ** second call to lsmCheckpointSynced() (which does return trustworthy
331 ** values) is made below to confirm. */
332 rc = lsmFsReadSyncedId(pDb, iMeta, &iSyncedId);
334 if( rc==LSM_OK && pLog->iSnapshotId!=iSyncedId ){
335 i64 iSnapshotId = 0;
336 i64 iOff = 0;
337 rc = lsmCheckpointSynced(pDb, &iSnapshotId, &iOff, 0);
338 if( rc==LSM_OK && pLog->iSnapshotId<iSnapshotId ){
339 int iRegion;
340 for(iRegion=0; iRegion<3; iRegion++){
341 LogRegion *p = &pLog->aRegion[iRegion];
342 if( iOff>=p->iStart && iOff<=p->iEnd ) break;
343 p->iStart = 0;
344 p->iEnd = 0;
346 assert( iRegion<3 );
347 pLog->aRegion[iRegion].iStart = iOff;
348 pLog->iSnapshotId = iSnapshotId;
352 return rc;
356 ** This function is called when a write-transaction is first opened. It
357 ** is assumed that the caller is holding the client-mutex when it is
358 ** called.
360 ** Before returning, this function allocates the LogWriter object that
361 ** will be used to write to the log file during the write transaction.
362 ** LSM_OK is returned if no error occurs, otherwise an LSM error code.
364 int lsmLogBegin(lsm_db *pDb){
365 int rc = LSM_OK;
366 LogWriter *pNew;
367 LogRegion *aReg;
369 if( pDb->bUseLog==0 ) return LSM_OK;
371 /* If the log file has not yet been opened, open it now. Also allocate
372 ** the LogWriter structure, if it has not already been allocated. */
373 rc = lsmFsOpenLog(pDb, 0);
374 if( pDb->pLogWriter==0 ){
375 pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(LogWriter), &rc);
376 if( pNew ){
377 lsmStringInit(&pNew->buf, pDb->pEnv);
378 rc = lsmStringExtend(&pNew->buf, 2);
380 pDb->pLogWriter = pNew;
381 }else{
382 pNew = pDb->pLogWriter;
383 assert( (u8 *)(&pNew[1])==(u8 *)(&((&pNew->buf)[1])) );
384 memset(pNew, 0, ((u8 *)&pNew->buf) - (u8 *)pNew);
385 pNew->buf.n = 0;
388 if( rc==LSM_OK ){
389 /* The following call detects whether or not a new snapshot has been
390 ** synced into the database file. If so, it updates the contents of
391 ** the pDb->treehdr.log structure to reclaim any space in the log
392 ** file that is no longer required.
394 ** TODO: Calling this every transaction is overkill. And since the
395 ** call has to read and checksum a snapshot from the database file,
396 ** it is expensive. It would be better to figure out a way so that
397 ** this is only called occasionally - say for every 32KB written to
398 ** the log file.
400 rc = logReclaimSpace(pDb);
402 if( rc!=LSM_OK ){
403 lsmLogClose(pDb);
404 return rc;
407 /* Set the effective sector-size for this transaction. Sectors are assumed
408 ** to be one byte in size if the safety-mode is OFF or NORMAL, or as
409 ** reported by lsmFsSectorSize if it is FULL. */
410 if( pDb->eSafety==LSM_SAFETY_FULL ){
411 pNew->szSector = lsmFsSectorSize(pDb->pFS);
412 assert( pNew->szSector>0 );
413 }else{
414 pNew->szSector = 1;
417 /* There are now three scenarios:
419 ** 1) Regions 0 and 1 are both zero bytes in size and region 2 begins
420 ** at a file offset greater than LSM_MIN_LOGWRAP. In this case, wrap
421 ** around to the start and write data into the start of the log file.
423 ** 2) Region 1 is zero bytes in size and region 2 occurs earlier in the
424 ** file than region 0. In this case, append data to region 2, but
425 ** remember to jump over region 1 if required.
427 ** 3) Region 2 is the last in the file. Append to it.
429 aReg = &pDb->treehdr.log.aRegion[0];
431 assert( aReg[0].iEnd==0 || aReg[0].iEnd>aReg[0].iStart );
432 assert( aReg[1].iEnd==0 || aReg[1].iEnd>aReg[1].iStart );
434 pNew->cksum0 = pDb->treehdr.log.cksum0;
435 pNew->cksum1 = pDb->treehdr.log.cksum1;
437 if( aReg[0].iEnd==0 && aReg[1].iEnd==0 && aReg[2].iStart>=LSM_MIN_LOGWRAP ){
438 /* Case 1. Wrap around to the start of the file. Write an LSM_LOG_JUMP
439 ** into the log file in this case. Pad it out to 8 bytes using a PAD2
440 ** record so that the checksums can be updated immediately. */
441 u8 aJump[] = {
442 LSM_LOG_PAD2, 0x04, 0x00, 0x00, 0x00, 0x00, LSM_LOG_JUMP, 0x00
445 lsmStringBinAppend(&pNew->buf, aJump, sizeof(aJump));
446 logUpdateCksum(pNew, pNew->buf.n);
447 rc = lsmFsWriteLog(pDb->pFS, aReg[2].iEnd, &pNew->buf);
448 pNew->iCksumBuf = pNew->buf.n = 0;
450 aReg[2].iEnd += 8;
451 pNew->jump = aReg[0] = aReg[2];
452 aReg[2].iStart = aReg[2].iEnd = 0;
453 }else if( aReg[1].iEnd==0 && aReg[2].iEnd<aReg[0].iEnd ){
454 /* Case 2. */
455 pNew->iOff = aReg[2].iEnd;
456 pNew->jump = aReg[0];
457 }else{
458 /* Case 3. */
459 assert( aReg[2].iStart>=aReg[0].iEnd && aReg[2].iStart>=aReg[1].iEnd );
460 pNew->iOff = aReg[2].iEnd;
463 if( pNew->jump.iStart ){
464 i64 iRound;
465 assert( pNew->jump.iStart>pNew->iOff );
467 iRound = firstByteOnSector(pNew, pNew->jump.iStart);
468 if( iRound>pNew->iOff ) pNew->jump.iStart = iRound;
469 pNew->jump.iEnd = lastByteOnSector(pNew, pNew->jump.iEnd);
472 assert( pDb->pLogWriter==pNew );
473 return rc;
477 ** This function is called when a write-transaction is being closed.
478 ** Parameter bCommit is true if the transaction is being committed,
479 ** or false otherwise. The caller must hold the client-mutex to call
480 ** this function.
482 ** A call to this function deletes the LogWriter object allocated by
483 ** lsmLogBegin(). If the transaction is being committed, the shared state
484 ** in *pLog is updated before returning.
486 void lsmLogEnd(lsm_db *pDb, int bCommit){
487 DbLog *pLog;
488 LogWriter *p;
489 p = pDb->pLogWriter;
491 if( p==0 ) return;
492 pLog = &pDb->treehdr.log;
494 if( bCommit ){
495 pLog->aRegion[2].iEnd = p->iOff;
496 pLog->cksum0 = p->cksum0;
497 pLog->cksum1 = p->cksum1;
498 if( p->iRegion1End ){
499 /* This happens when the transaction had to jump over some other
500 ** part of the log. */
501 assert( pLog->aRegion[1].iEnd==0 );
502 assert( pLog->aRegion[2].iStart<p->iRegion1End );
503 pLog->aRegion[1].iStart = pLog->aRegion[2].iStart;
504 pLog->aRegion[1].iEnd = p->iRegion1End;
505 pLog->aRegion[2].iStart = p->iRegion2Start;
510 static int jumpIfRequired(
511 lsm_db *pDb,
512 LogWriter *pLog,
513 int nReq,
514 int *pbJump
516 /* Determine if it is necessary to add an LSM_LOG_JUMP to jump over the
517 ** jump region before writing the LSM_LOG_WRITE or DELETE record. This
518 ** is necessary if there is insufficient room between the current offset
519 ** and the jump region to fit the new WRITE/DELETE record and the largest
520 ** possible JUMP record with up to 7 bytes of padding (a total of 17
521 ** bytes). */
522 if( (pLog->jump.iStart > (pLog->iOff + pLog->buf.n))
523 && (pLog->jump.iStart < (pLog->iOff + pLog->buf.n + (nReq + 17)))
525 int rc; /* Return code */
526 i64 iJump; /* Offset to jump to */
527 u8 aJump[10]; /* Encoded jump record */
528 int nJump; /* Valid bytes in aJump[] */
529 int nPad; /* Bytes of padding required */
531 /* Serialize the JUMP record */
532 iJump = pLog->jump.iEnd+1;
533 aJump[0] = LSM_LOG_JUMP;
534 nJump = 1 + lsmVarintPut64(&aJump[1], iJump);
536 /* Adding padding to the contents of the buffer so that it will be a
537 ** multiple of 8 bytes in size after the JUMP record is appended. This
538 ** is not strictly required, it just makes the keeping the running
539 ** checksum up to date in this file a little simpler. */
540 nPad = (pLog->buf.n + nJump) % 8;
541 if( nPad ){
542 u8 aPad[7] = {0,0,0,0,0,0,0};
543 nPad = 8-nPad;
544 if( nPad==1 ){
545 aPad[0] = LSM_LOG_PAD1;
546 }else{
547 aPad[0] = LSM_LOG_PAD2;
548 aPad[1] = (u8)(nPad-2);
550 rc = lsmStringBinAppend(&pLog->buf, aPad, nPad);
551 if( rc!=LSM_OK ) return rc;
554 /* Append the JUMP record to the buffer. Then flush the buffer to disk
555 ** and update the checksums. The next write to the log file (assuming
556 ** there is no transaction rollback) will be to offset iJump (just past
557 ** the jump region). */
558 rc = lsmStringBinAppend(&pLog->buf, aJump, nJump);
559 if( rc!=LSM_OK ) return rc;
560 assert( (pLog->buf.n % 8)==0 );
561 rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
562 if( rc!=LSM_OK ) return rc;
563 logUpdateCksum(pLog, pLog->buf.n);
564 pLog->iRegion1End = (pLog->iOff + pLog->buf.n);
565 pLog->iRegion2Start = iJump;
566 pLog->iOff = iJump;
567 pLog->iCksumBuf = pLog->buf.n = 0;
568 if( pbJump ) *pbJump = 1;
571 return LSM_OK;
574 static int logCksumAndFlush(lsm_db *pDb){
575 int rc; /* Return code */
576 LogWriter *pLog = pDb->pLogWriter;
578 /* Calculate the checksum value. Append it to the buffer. */
579 logUpdateCksum(pLog, pLog->buf.n);
580 lsmPutU32((u8 *)&pLog->buf.z[pLog->buf.n], pLog->cksum0);
581 pLog->buf.n += 4;
582 lsmPutU32((u8 *)&pLog->buf.z[pLog->buf.n], pLog->cksum1);
583 pLog->buf.n += 4;
585 /* Write the contents of the buffer to disk. */
586 rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
587 pLog->iOff += pLog->buf.n;
588 pLog->iCksumBuf = pLog->buf.n = 0;
590 return rc;
594 ** Write the contents of the log-buffer to disk. Then write either a CKSUM
595 ** or COMMIT record, depending on the value of parameter eType.
597 static int logFlush(lsm_db *pDb, int eType){
598 int rc;
599 int nReq;
600 LogWriter *pLog = pDb->pLogWriter;
602 assert( eType==LSM_LOG_COMMIT );
603 assert( pLog );
605 /* Commit record is always 9 bytes in size. */
606 nReq = 9;
607 if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ) nReq += pLog->szSector + 17;
608 rc = jumpIfRequired(pDb, pLog, nReq, 0);
610 /* If this is a COMMIT, add padding to the log so that the COMMIT record
611 ** is aligned against the end of a disk sector. In other words, add padding
612 ** so that the first byte following the COMMIT record lies on a different
613 ** sector. */
614 if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ){
615 int nPad; /* Bytes of padding to add */
617 /* Determine the value of nPad. */
618 nPad = ((pLog->iOff + pLog->buf.n + 9) % pLog->szSector);
619 if( nPad ) nPad = pLog->szSector - nPad;
620 rc = lsmStringExtend(&pLog->buf, nPad);
621 if( rc!=LSM_OK ) return rc;
623 while( nPad ){
624 if( nPad==1 ){
625 pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD1;
626 nPad = 0;
627 }else{
628 int n = LSM_MIN(200, nPad-2);
629 pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD2;
630 pLog->buf.z[pLog->buf.n++] = (char)n;
631 nPad -= 2;
632 memset(&pLog->buf.z[pLog->buf.n], 0x2B, n);
633 pLog->buf.n += n;
634 nPad -= n;
639 /* Make sure there is room in the log-buffer to add the CKSUM or COMMIT
640 ** record. Then add the first byte of it. */
641 rc = lsmStringExtend(&pLog->buf, 9);
642 if( rc!=LSM_OK ) return rc;
643 pLog->buf.z[pLog->buf.n++] = (char)eType;
644 memset(&pLog->buf.z[pLog->buf.n], 0, 8);
646 rc = logCksumAndFlush(pDb);
648 /* If this is a commit and synchronous=full, sync the log to disk. */
649 if( rc==LSM_OK && eType==LSM_LOG_COMMIT && pDb->eSafety==LSM_SAFETY_FULL ){
650 rc = lsmFsSyncLog(pDb->pFS);
652 return rc;
656 ** Append an LSM_LOG_WRITE (if nVal>=0) or LSM_LOG_DELETE (if nVal<0)
657 ** record to the database log.
659 int lsmLogWrite(
660 lsm_db *pDb, /* Database handle */
661 int eType,
662 void *pKey, int nKey, /* Database key to write to log */
663 void *pVal, int nVal /* Database value (or nVal<0) to write */
665 int rc = LSM_OK;
666 LogWriter *pLog; /* Log object to write to */
667 int nReq; /* Bytes of space required in log */
668 int bCksum = 0; /* True to embed a checksum in this record */
670 assert( eType==LSM_WRITE || eType==LSM_DELETE || eType==LSM_DRANGE );
671 assert( LSM_LOG_WRITE==LSM_WRITE );
672 assert( LSM_LOG_DELETE==LSM_DELETE );
673 assert( LSM_LOG_DRANGE==LSM_DRANGE );
674 assert( (eType==LSM_LOG_DELETE)==(nVal<0) );
676 if( pDb->bUseLog==0 ) return LSM_OK;
677 pLog = pDb->pLogWriter;
679 /* Determine how many bytes of space are required, assuming that a checksum
680 ** will be embedded in this record (even though it may not be). */
681 nReq = 1 + lsmVarintLen32(nKey) + 8 + nKey;
682 if( eType!=LSM_LOG_DELETE ) nReq += lsmVarintLen32(nVal) + nVal;
684 /* Jump over the jump region if required. Set bCksum to true to tell the
685 ** code below to include a checksum in the record if either (a) writing
686 ** this record would mean that more than LSM_CKSUM_MAXDATA bytes of data
687 ** have been written to the log since the last checksum, or (b) the jump
688 ** is taken. */
689 rc = jumpIfRequired(pDb, pLog, nReq, &bCksum);
690 if( (pLog->buf.n+nReq) > LSM_CKSUM_MAXDATA ) bCksum = 1;
692 if( rc==LSM_OK ){
693 rc = lsmStringExtend(&pLog->buf, nReq);
695 if( rc==LSM_OK ){
696 u8 *a = (u8 *)&pLog->buf.z[pLog->buf.n];
698 /* Write the record header - the type byte followed by either 1 (for
699 ** DELETE) or 2 (for WRITE) varints. */
700 assert( LSM_LOG_WRITE_CKSUM == (LSM_LOG_WRITE | 0x0001) );
701 assert( LSM_LOG_DELETE_CKSUM == (LSM_LOG_DELETE | 0x0001) );
702 assert( LSM_LOG_DRANGE_CKSUM == (LSM_LOG_DRANGE | 0x0001) );
703 *(a++) = (u8)eType | (u8)bCksum;
704 a += lsmVarintPut32(a, nKey);
705 if( eType!=LSM_LOG_DELETE ) a += lsmVarintPut32(a, nVal);
707 if( bCksum ){
708 pLog->buf.n = (a - (u8 *)pLog->buf.z);
709 rc = logCksumAndFlush(pDb);
710 a = (u8 *)&pLog->buf.z[pLog->buf.n];
713 memcpy(a, pKey, nKey);
714 a += nKey;
715 if( eType!=LSM_LOG_DELETE ){
716 memcpy(a, pVal, nVal);
717 a += nVal;
719 pLog->buf.n = a - (u8 *)pLog->buf.z;
720 assert( pLog->buf.n<=pLog->buf.nAlloc );
723 return rc;
727 ** Append an LSM_LOG_COMMIT record to the database log.
729 int lsmLogCommit(lsm_db *pDb){
730 if( pDb->bUseLog==0 ) return LSM_OK;
731 return logFlush(pDb, LSM_LOG_COMMIT);
735 ** Store the current offset and other checksum related information in the
736 ** structure *pMark. Later, *pMark can be passed to lsmLogSeek() to "rewind"
737 ** the LogWriter object to the current log file offset. This is used when
738 ** rolling back savepoint transactions.
740 void lsmLogTell(
741 lsm_db *pDb, /* Database handle */
742 LogMark *pMark /* Populate this object with current offset */
744 LogWriter *pLog;
745 int nCksum;
747 if( pDb->bUseLog==0 ) return;
748 pLog = pDb->pLogWriter;
749 nCksum = pLog->buf.n & 0xFFFFFFF8;
750 logUpdateCksum(pLog, nCksum);
751 assert( pLog->iCksumBuf==nCksum );
752 pMark->nBuf = pLog->buf.n - nCksum;
753 memcpy(pMark->aBuf, &pLog->buf.z[nCksum], pMark->nBuf);
755 pMark->iOff = pLog->iOff + pLog->buf.n;
756 pMark->cksum0 = pLog->cksum0;
757 pMark->cksum1 = pLog->cksum1;
761 ** Seek (rewind) back to the log file offset stored by an ealier call to
762 ** lsmLogTell() in *pMark.
764 void lsmLogSeek(
765 lsm_db *pDb, /* Database handle */
766 LogMark *pMark /* Object containing log offset to seek to */
768 LogWriter *pLog;
770 if( pDb->bUseLog==0 ) return;
771 pLog = pDb->pLogWriter;
773 assert( pMark->iOff<=pLog->iOff+pLog->buf.n );
774 if( (pMark->iOff & 0xFFFFFFF8)>=pLog->iOff ){
775 pLog->buf.n = (int)(pMark->iOff - pLog->iOff);
776 pLog->iCksumBuf = (pLog->buf.n & 0xFFFFFFF8);
777 }else{
778 pLog->buf.n = pMark->nBuf;
779 memcpy(pLog->buf.z, pMark->aBuf, pMark->nBuf);
780 pLog->iCksumBuf = 0;
781 pLog->iOff = pMark->iOff - pMark->nBuf;
783 pLog->cksum0 = pMark->cksum0;
784 pLog->cksum1 = pMark->cksum1;
786 if( pMark->iOff > pLog->iRegion1End ) pLog->iRegion1End = 0;
787 if( pMark->iOff > pLog->iRegion2Start ) pLog->iRegion2Start = 0;
791 ** This function does the work for an lsm_info(LOG_STRUCTURE) request.
793 int lsmInfoLogStructure(lsm_db *pDb, char **pzVal){
794 int rc = LSM_OK;
795 char *zVal = 0;
797 /* If there is no read or write transaction open, read the latest
798 ** tree-header from shared-memory to report on. If necessary, update
799 ** it based on the contents of the database header.
801 ** No locks are taken here - these are passive read operations only.
803 if( pDb->pCsr==0 && pDb->nTransOpen==0 ){
804 rc = lsmTreeLoadHeader(pDb, 0);
805 if( rc==LSM_OK ) rc = logReclaimSpace(pDb);
808 if( rc==LSM_OK ){
809 DbLog *pLog = &pDb->treehdr.log;
810 zVal = lsmMallocPrintf(pDb->pEnv,
811 "%d %d %d %d %d %d",
812 (int)pLog->aRegion[0].iStart, (int)pLog->aRegion[0].iEnd,
813 (int)pLog->aRegion[1].iStart, (int)pLog->aRegion[1].iEnd,
814 (int)pLog->aRegion[2].iStart, (int)pLog->aRegion[2].iEnd
816 if( !zVal ) rc = LSM_NOMEM_BKPT;
819 *pzVal = zVal;
820 return rc;
823 /*************************************************************************
824 ** Begin code for log recovery.
827 typedef struct LogReader LogReader;
828 struct LogReader {
829 FileSystem *pFS; /* File system to read from */
830 i64 iOff; /* File offset at end of buf content */
831 int iBuf; /* Current read offset in buf */
832 LsmString buf; /* Buffer containing file content */
834 int iCksumBuf; /* Offset in buf corresponding to cksum[01] */
835 u32 cksum0; /* Checksum 0 at offset iCksumBuf */
836 u32 cksum1; /* Checksum 1 at offset iCksumBuf */
839 static void logReaderBlob(
840 LogReader *p, /* Log reader object */
841 LsmString *pBuf, /* Dynamic storage, if required */
842 int nBlob, /* Number of bytes to read */
843 u8 **ppBlob, /* OUT: Pointer to blob read */
844 int *pRc /* IN/OUT: Error code */
846 static const int LOG_READ_SIZE = 512;
847 int rc = *pRc; /* Return code */
848 int nReq = nBlob; /* Bytes required */
850 while( rc==LSM_OK && nReq>0 ){
851 int nAvail; /* Bytes of data available in p->buf */
852 if( p->buf.n==p->iBuf ){
853 int nCksum; /* Total bytes requiring checksum */
854 int nCarry = 0; /* Total bytes requiring checksum */
856 nCksum = p->iBuf - p->iCksumBuf;
857 if( nCksum>0 ){
858 nCarry = nCksum % 8;
859 nCksum = ((nCksum / 8) * 8);
860 if( nCksum>0 ){
861 logCksumUnaligned(
862 &p->buf.z[p->iCksumBuf], nCksum, &p->cksum0, &p->cksum1
866 if( nCarry>0 ) memcpy(p->buf.z, &p->buf.z[p->iBuf-nCarry], nCarry);
867 p->buf.n = nCarry;
868 p->iBuf = nCarry;
870 rc = lsmFsReadLog(p->pFS, p->iOff, LOG_READ_SIZE, &p->buf);
871 if( rc!=LSM_OK ) break;
872 p->iCksumBuf = 0;
873 p->iOff += LOG_READ_SIZE;
876 nAvail = p->buf.n - p->iBuf;
877 if( ppBlob && nReq==nBlob && nBlob<=nAvail ){
878 *ppBlob = (u8 *)&p->buf.z[p->iBuf];
879 p->iBuf += nBlob;
880 nReq = 0;
881 }else{
882 int nCopy = LSM_MIN(nAvail, nReq);
883 if( nBlob==nReq ){
884 pBuf->n = 0;
886 rc = lsmStringBinAppend(pBuf, (u8 *)&p->buf.z[p->iBuf], nCopy);
887 nReq -= nCopy;
888 p->iBuf += nCopy;
889 if( nReq==0 && ppBlob ){
890 *ppBlob = (u8*)pBuf->z;
895 *pRc = rc;
898 static void logReaderVarint(
899 LogReader *p,
900 LsmString *pBuf,
901 int *piVal, /* OUT: Value read from log */
902 int *pRc /* IN/OUT: Error code */
904 if( *pRc==LSM_OK ){
905 u8 *aVarint;
906 if( p->buf.n==p->iBuf ){
907 logReaderBlob(p, 0, 10, &aVarint, pRc);
908 if( LSM_OK==*pRc ) p->iBuf -= (10 - lsmVarintGet32(aVarint, piVal));
909 }else{
910 logReaderBlob(p, pBuf, lsmVarintSize(p->buf.z[p->iBuf]), &aVarint, pRc);
911 if( LSM_OK==*pRc ) lsmVarintGet32(aVarint, piVal);
916 static void logReaderByte(LogReader *p, u8 *pByte, int *pRc){
917 u8 *pPtr = 0;
918 logReaderBlob(p, 0, 1, &pPtr, pRc);
919 if( pPtr ) *pByte = *pPtr;
922 static void logReaderCksum(LogReader *p, LsmString *pBuf, int *pbEof, int *pRc){
923 if( *pRc==LSM_OK ){
924 u8 *pPtr = 0;
925 u32 cksum0, cksum1;
926 int nCksum = p->iBuf - p->iCksumBuf;
928 /* Update in-memory (expected) checksums */
929 assert( nCksum>=0 );
930 logCksumUnaligned(&p->buf.z[p->iCksumBuf], nCksum, &p->cksum0, &p->cksum1);
931 p->iCksumBuf = p->iBuf + 8;
932 logReaderBlob(p, pBuf, 8, &pPtr, pRc);
933 assert( pPtr || *pRc );
935 /* Read the checksums from the log file. Set *pbEof if they do not match. */
936 if( pPtr ){
937 cksum0 = lsmGetU32(pPtr);
938 cksum1 = lsmGetU32(&pPtr[4]);
939 *pbEof = (cksum0!=p->cksum0 || cksum1!=p->cksum1);
940 p->iCksumBuf = p->iBuf;
945 static void logReaderInit(
946 lsm_db *pDb, /* Database handle */
947 DbLog *pLog, /* Log object associated with pDb */
948 int bInitBuf, /* True if p->buf is uninitialized */
949 LogReader *p /* Initialize this LogReader object */
951 p->pFS = pDb->pFS;
952 p->iOff = pLog->aRegion[2].iStart;
953 p->cksum0 = pLog->cksum0;
954 p->cksum1 = pLog->cksum1;
955 if( bInitBuf ){ lsmStringInit(&p->buf, pDb->pEnv); }
956 p->buf.n = 0;
957 p->iCksumBuf = 0;
958 p->iBuf = 0;
962 ** This function is called after reading the header of a LOG_DELETE or
963 ** LOG_WRITE record. Parameter nByte is the total size of the key and
964 ** value that follow the header just read. Return true if the size and
965 ** position of the record indicate that it should contain a checksum.
967 static int logRequireCksum(LogReader *p, int nByte){
968 return ((p->iBuf + nByte - p->iCksumBuf) > LSM_CKSUM_MAXDATA);
972 ** Recover the contents of the log file.
974 int lsmLogRecover(lsm_db *pDb){
975 LsmString buf1; /* Key buffer */
976 LsmString buf2; /* Value buffer */
977 LogReader reader; /* Log reader object */
978 int rc = LSM_OK; /* Return code */
979 int nCommit = 0; /* Number of transactions to recover */
980 int iPass;
981 int nJump = 0; /* Number of LSM_LOG_JUMP records in pass 0 */
982 DbLog *pLog;
983 int bOpen;
985 rc = lsmFsOpenLog(pDb, &bOpen);
986 if( rc!=LSM_OK ) return rc;
988 rc = lsmTreeInit(pDb);
989 if( rc!=LSM_OK ) return rc;
991 pLog = &pDb->treehdr.log;
992 lsmCheckpointLogoffset(pDb->pShmhdr->aSnap2, pLog);
994 logReaderInit(pDb, pLog, 1, &reader);
995 lsmStringInit(&buf1, pDb->pEnv);
996 lsmStringInit(&buf2, pDb->pEnv);
998 /* The outer for() loop runs at most twice. The first iteration is to
999 ** count the number of committed transactions in the log. The second
1000 ** iterates through those transactions and updates the in-memory tree
1001 ** structure with their contents. */
1002 if( bOpen ){
1003 for(iPass=0; iPass<2 && rc==LSM_OK; iPass++){
1004 int bEof = 0;
1006 while( rc==LSM_OK && !bEof ){
1007 u8 eType = 0;
1008 logReaderByte(&reader, &eType, &rc);
1010 switch( eType ){
1011 case LSM_LOG_PAD1:
1012 break;
1014 case LSM_LOG_PAD2: {
1015 int nPad;
1016 logReaderVarint(&reader, &buf1, &nPad, &rc);
1017 logReaderBlob(&reader, &buf1, nPad, 0, &rc);
1018 break;
1021 case LSM_LOG_DRANGE:
1022 case LSM_LOG_DRANGE_CKSUM:
1023 case LSM_LOG_WRITE:
1024 case LSM_LOG_WRITE_CKSUM: {
1025 int nKey;
1026 int nVal;
1027 u8 *aVal;
1028 logReaderVarint(&reader, &buf1, &nKey, &rc);
1029 logReaderVarint(&reader, &buf2, &nVal, &rc);
1031 if( eType==LSM_LOG_WRITE_CKSUM || eType==LSM_LOG_DRANGE_CKSUM ){
1032 logReaderCksum(&reader, &buf1, &bEof, &rc);
1033 }else{
1034 bEof = logRequireCksum(&reader, nKey+nVal);
1036 if( bEof ) break;
1038 logReaderBlob(&reader, &buf1, nKey, 0, &rc);
1039 logReaderBlob(&reader, &buf2, nVal, &aVal, &rc);
1040 if( iPass==1 && rc==LSM_OK ){
1041 if( eType==LSM_LOG_WRITE || eType==LSM_LOG_WRITE_CKSUM ){
1042 rc = lsmTreeInsert(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
1043 }else{
1044 rc = lsmTreeDelete(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
1047 break;
1050 case LSM_LOG_DELETE:
1051 case LSM_LOG_DELETE_CKSUM: {
1052 int nKey; u8 *aKey;
1053 logReaderVarint(&reader, &buf1, &nKey, &rc);
1055 if( eType==LSM_LOG_DELETE_CKSUM ){
1056 logReaderCksum(&reader, &buf1, &bEof, &rc);
1057 }else{
1058 bEof = logRequireCksum(&reader, nKey);
1060 if( bEof ) break;
1062 logReaderBlob(&reader, &buf1, nKey, &aKey, &rc);
1063 if( iPass==1 && rc==LSM_OK ){
1064 rc = lsmTreeInsert(pDb, aKey, nKey, NULL, -1);
1066 break;
1069 case LSM_LOG_COMMIT:
1070 logReaderCksum(&reader, &buf1, &bEof, &rc);
1071 if( bEof==0 ){
1072 nCommit++;
1073 assert( nCommit>0 || iPass==1 );
1074 if( nCommit==0 ) bEof = 1;
1076 break;
1078 case LSM_LOG_JUMP: {
1079 int iOff = 0;
1080 logReaderVarint(&reader, &buf1, &iOff, &rc);
1081 if( rc==LSM_OK ){
1082 if( iPass==1 ){
1083 if( pLog->aRegion[2].iStart==0 ){
1084 assert( pLog->aRegion[1].iStart==0 );
1085 pLog->aRegion[1].iEnd = reader.iOff;
1086 }else{
1087 assert( pLog->aRegion[0].iStart==0 );
1088 pLog->aRegion[0].iStart = pLog->aRegion[2].iStart;
1089 pLog->aRegion[0].iEnd = reader.iOff-reader.buf.n+reader.iBuf;
1091 pLog->aRegion[2].iStart = iOff;
1092 }else{
1093 if( (nJump++)==2 ){
1094 bEof = 1;
1098 reader.iOff = iOff;
1099 reader.buf.n = reader.iBuf;
1101 break;
1104 default:
1105 /* Including LSM_LOG_EOF */
1106 bEof = 1;
1107 break;
1111 if( rc==LSM_OK && iPass==0 ){
1112 if( nCommit==0 ){
1113 if( pLog->aRegion[2].iStart==0 ){
1114 iPass = 1;
1115 }else{
1116 pLog->aRegion[2].iStart = 0;
1117 iPass = -1;
1118 lsmCheckpointZeroLogoffset(pDb);
1121 logReaderInit(pDb, pLog, 0, &reader);
1122 nCommit = nCommit * -1;
1127 /* Initialize DbLog object */
1128 if( rc==LSM_OK ){
1129 pLog->aRegion[2].iEnd = reader.iOff - reader.buf.n + reader.iBuf;
1130 pLog->cksum0 = reader.cksum0;
1131 pLog->cksum1 = reader.cksum1;
1134 if( rc==LSM_OK ){
1135 rc = lsmFinishRecovery(pDb);
1136 }else{
1137 lsmFinishRecovery(pDb);
1140 if( pDb->bRoTrans ){
1141 lsmFsCloseLog(pDb);
1144 lsmStringClear(&buf1);
1145 lsmStringClear(&buf2);
1146 lsmStringClear(&reader.buf);
1147 return rc;
1150 void lsmLogClose(lsm_db *db){
1151 if( db->pLogWriter ){
1152 lsmFree(db->pEnv, db->pLogWriter->buf.z);
1153 lsmFree(db->pEnv, db->pLogWriter);
1154 db->pLogWriter = 0;