4 ** The author disclaims copyright to this source code. In place of
5 ** a legal notice, here is a blessing:
7 ** May you do good and not evil.
8 ** May you find forgiveness for yourself and forgive others.
9 ** May you share freely, never taking more than you give.
11 *************************************************************************
15 ** The maximum page size is 65536 bytes.
17 ** Since every record is at least 2 bytes in size, and
18 ** some space within the page is consumed by the page footer, there must
19 ** be fewer than 2^15 records on each page.
21 ** Each page ends with a footer that describes the page's contents. This
22 ** footer serves a similar purpose to the page header in an SQLite database.
23 ** A footer is used instead of a header because it makes it easier to
24 ** populate a new page based on a sorted list of key/value pairs.
26 ** The footer consists of the following values (starting at the end of
27 ** the page and continuing backwards towards the start). All values are
28 ** stored as unsigned big-endian integers.
30 ** * Number of records on page (2 bytes).
31 ** * Flags field (2 bytes).
32 ** * Left-hand pointer value (8 bytes).
33 ** * The starting offset of each record (2 bytes per record).
35 ** Records may span pages. Unless it happens to be an exact fit, the part
36 ** of the final record that starts on page X that does not fit on page X
37 ** is stored at the start of page (X+1). This means there may be pages where
38 ** (N==0). And on most pages the first record that starts on the page will
39 ** not start at byte offset 0. For example:
41 ** aaaaa bbbbb ccc <footer> cc eeeee fffff g <footer> gggg....
45 ** The first byte of the record is a flags byte. It is a combination
46 ** of the following flags (defined in lsmInt.h):
55 ** Immediately following the type byte is a pointer to the smallest key
56 ** in the next file that is larger than the key in the current record. The
57 ** pointer is encoded as a varint. When added to the 8-byte left-hand pointer
58 ** stored in the footer, it is the page number of the page that contains the
59 ** smallest key in the next sorted file that is larger than this key.
61 ** Next is the number of bytes in the key, encoded as a varint.
63 ** If the LSM_INSERT flag is set, the number of bytes in the value follows, also encoded as a varint.
66 ** Finally, the blob of data containing the key, and for LSM_INSERT
67 ** records, the value as well.
74 #define LSM_LOG_STRUCTURE 0
75 #define LSM_LOG_DATA 0
/*
** Macros to help decode record types. Each takes the flags byte (eType)
** stored at the start of a record.
**
**   rtTopic(e)        The LSM_SYSTEMKEY bit of e (zero for user keys).
**   rtIsDelete(e)     True if the low four bits of e equal LSM_POINT_DELETE.
**   rtIsSeparator(e)  True if the LSM_SEPARATOR bit is set.
**   rtIsWrite(e)      True if the LSM_INSERT bit is set.
**   rtIsSystem(e)     True if the LSM_SYSTEMKEY bit is set.
*/
#define rtTopic(eType)       ((eType) & LSM_SYSTEMKEY)
#define rtIsDelete(eType)    (((eType) & 0x0F)==LSM_POINT_DELETE)

#define rtIsSeparator(eType) (((eType) & LSM_SEPARATOR)!=0)
#define rtIsWrite(eType)     (((eType) & LSM_INSERT)!=0)
#define rtIsSystem(eType)    (((eType) & LSM_SYSTEMKEY)!=0)
/*
** The following macros are used to access a page footer. Each evaluates to
** a byte offset within a page that is pgsz bytes in size:
**
**   SEGMENT_NRECORD_OFFSET   2-byte record count (last two bytes of page).
**   SEGMENT_FLAGS_OFFSET     2-byte flags field.
**   SEGMENT_POINTER_OFFSET   8-byte left-hand pointer value.
**   SEGMENT_CELLPTR_OFFSET   2-byte starting offset of record iCell. Entries
**                            for successive records are stored at
**                            successively lower offsets.
**
** SEGMENT_EOF(pgsz, nEntry) is the offset of the cell-pointer entry for the
** last of nEntry records, i.e. the first byte of the footer. The nEntry
** argument is parenthesized so that expressions built from operators with
** lower precedence than '-' (e.g. "a<<b") expand correctly.
*/
#define SEGMENT_NRECORD_OFFSET(pgsz)        ((pgsz) - 2)
#define SEGMENT_FLAGS_OFFSET(pgsz)          ((pgsz) - 2 - 2)
#define SEGMENT_POINTER_OFFSET(pgsz)        ((pgsz) - 2 - 2 - 8)
#define SEGMENT_CELLPTR_OFFSET(pgsz, iCell) ((pgsz) - 2 - 2 - 8 - 2 - (iCell)*2)

#define SEGMENT_EOF(pgsz, nEntry) SEGMENT_CELLPTR_OFFSET(pgsz, (nEntry)-1)
/*
** Bits of the 2-byte flags field stored in each page footer.
** SEGMENT_BTREE_FLAG is set on pages that belong to a segment's b-tree
** hierarchy and clear on pages holding ordinary records.
** NOTE(review): the two PGFTR_SKIP_* bits are not referenced in this part
** of the file - confirm their meaning against the code that sets them.
*/
#define SEGMENT_BTREE_FLAG     0x0001
#define PGFTR_SKIP_NEXT_FLAG   0x0002
#define PGFTR_SKIP_THIS_FLAG   0x0004

/*
** Threshold passed to segmentPtrReset(): key/value buffers whose allocation
** is at least this many bytes are freed when a segment pointer is reset.
** May be overridden at compile time.
*/
#ifndef LSM_SEGMENTPTR_FREE_THRESHOLD
# define LSM_SEGMENTPTR_FREE_THRESHOLD 1024
#endif
106 typedef struct SegmentPtr SegmentPtr
;
107 typedef struct LsmBlob LsmBlob
;
117 ** A SegmentPtr object may be used for one of two purposes:
119 ** * To iterate and/or seek within a single Segment (the combination of a
120 ** main run and an optional sorted run).
122 ** * To iterate through the separators array of a segment.
125 Level
*pLevel
; /* Level object segment is part of */
126 Segment
*pSeg
; /* Segment to access */
128 /* Current page. See segmentPtrLoadPage(). */
129 Page
*pPg
; /* Current page */
130 u16 flags
; /* Copy of page flags field */
131 int nCell
; /* Number of cells on pPg */
132 LsmPgno iPtr
; /* Base cascade pointer */
134 /* Current cell. See segmentPtrLoadCell() */
135 int iCell
; /* Current record within page pPg */
136 int eType
; /* Type of current record */
137 LsmPgno iPgPtr
; /* Cascade pointer offset */
138 void *pKey
; int nKey
; /* Key associated with current record */
139 void *pVal
; int nVal
; /* Current record value (eType==WRITE only) */
141 /* Blobs used to allocate buffers for pKey and pVal as required */
147 ** Used to iterate through the keys stored in a b-tree hierarchy from start
148 ** to finish. Only First() and Next() operations are required.
151 ** btreeCursorFirst()
154 ** btreeCursorPosition()
155 ** btreeCursorRestore()
157 typedef struct BtreePg BtreePg
;
158 typedef struct BtreeCursor BtreeCursor
;
164 Segment
*pSeg
; /* Iterate through this segments btree */
165 FileSystem
*pFS
; /* File system to read pages from */
166 int nDepth
; /* Allocated size of aPg[] */
167 int iPg
; /* Current entry in aPg[]. -1 -> EOF. */
168 BtreePg
*aPg
; /* Pages from root to current location */
170 /* Cache of current entry. pKey==0 for EOF. */
176 /* Storage for key, if not local */
182 ** A cursor used for merged searches or iterations through up to one
183 ** Tree structure and any number of sorted files.
196 ** This variable is only used by cursors providing input data for a
197 ** new top-level segment. Such cursors only ever iterate forwards, not
201 lsm_db
*pDb
; /* Connection that owns this cursor */
202 MultiCursor
*pNext
; /* Next cursor owned by connection pDb */
203 int flags
; /* Mask of CURSOR_XXX flags */
205 int eType
; /* Cache of current key type */
206 LsmBlob key
; /* Cache of current key (or NULL) */
207 LsmBlob val
; /* Cache of current value */
209 /* All the component cursors: */
210 TreeCursor
*apTreeCsr
[2]; /* Up to two tree cursors */
211 int iFree
; /* Next element of free-list (-ve for eof) */
212 SegmentPtr
*aPtr
; /* Array of segment pointers */
213 int nPtr
; /* Size of array aPtr[] */
214 BtreeCursor
*pBtCsr
; /* b-tree cursor (db writes only) */
216 /* Comparison results */
217 int nTree
; /* Size of aTree[] array */
218 int *aTree
; /* Array of comparison results */
220 /* Used by cursors flushing the in-memory tree only */
221 void *pSystemVal
; /* Pointer to buffer to free */
223 /* Used by worker cursors only */
224 LsmPgno
*pPrevMergePtr
;
228 ** The following constants are used to assign integers to each component
229 ** cursor of a multi-cursor.
231 #define CURSOR_DATA_TREE0 0 /* Current tree cursor (apTreeCsr[0]) */
232 #define CURSOR_DATA_TREE1 1 /* The "old" tree, if any (apTreeCsr[1]) */
233 #define CURSOR_DATA_SYSTEM 2 /* Free-list entries (new-toplevel only) */
234 #define CURSOR_DATA_SEGMENT 3 /* First segment pointer (aPtr[0]) */
237 ** CURSOR_IGNORE_DELETE
238 ** If set, this cursor will not visit SORTED_DELETE keys.
240 ** CURSOR_FLUSH_FREELIST
241 ** This cursor is being used to create a new toplevel. It should also
242 ** iterate through the contents of the in-memory free block list.
244 ** CURSOR_IGNORE_SYSTEM
245 ** If set, this cursor ignores system keys.
248 ** Set if it is Ok to call lsm_csr_next().
251 ** Set if it is Ok to call lsm_csr_prev().
253 ** CURSOR_READ_SEPARATORS
254 ** Set if this cursor should visit the separator keys in segment
258 ** Cursor has undergone a successful lsm_csr_seek(LSM_SEEK_EQ) operation.
259 ** The key and value are stored in MultiCursor.key and MultiCursor.val
262 #define CURSOR_IGNORE_DELETE 0x00000001
263 #define CURSOR_FLUSH_FREELIST 0x00000002
264 #define CURSOR_IGNORE_SYSTEM 0x00000010
265 #define CURSOR_NEXT_OK 0x00000020
266 #define CURSOR_PREV_OK 0x00000040
267 #define CURSOR_READ_SEPARATORS 0x00000080
268 #define CURSOR_SEEK_EQ 0x00000100
270 typedef struct MergeWorker MergeWorker
;
271 typedef struct Hierarchy Hierarchy
;
280 ** When mergeWorkerNextPage() is called to advance to the next page in
281 ** the output segment, if the bStore flag for an element of aSave[] is
282 ** true, it is cleared and the corresponding iPgno value is set to the
283 ** page number of the page just completed.
285 ** aSave[0] is used to record the pointer value to be pushed into the
286 ** b-tree hierarchy. aSave[1] is used to save the page number of the
287 ** page containing the indirect key most recently written to the b-tree.
288 ** see mergeWorkerPushHierarchy() for details.
291 lsm_db
*pDb
; /* Database handle */
292 Level
*pLevel
; /* Worker snapshot Level being merged */
293 MultiCursor
*pCsr
; /* Cursor to read new segment contents from */
294 int bFlush
; /* True if this is an in-memory tree flush */
295 Hierarchy hier
; /* B-tree hierarchy under construction */
296 Page
*pPage
; /* Current output page */
297 int nWork
; /* Number of calls to mergeWorkerNextPage() */
298 LsmPgno
*aGobble
; /* Gobble point for each input segment */
307 #ifdef LSM_DEBUG_EXPENSIVE
308 static int assertPointersOk(lsm_db
*, Segment
*, Segment
*, int);
309 static int assertBtreeOk(lsm_db
*, Segment
*);
310 static void assertRunInOrder(lsm_db
*pDb
, Segment
*pSeg
);
312 #define assertRunInOrder(x,y)
313 #define assertBtreeOk(x,y)
317 struct FilePage
{ u8
*aData
; int nData
; };
318 static u8
*fsPageData(Page
*pPg
, int *pnData
){
319 *pnData
= ((struct FilePage
*)(pPg
))->nData
;
320 return ((struct FilePage
*)(pPg
))->aData
;
322 /*UNUSED static u8 *fsPageDataPtr(Page *pPg){
323 return ((struct FilePage *)(pPg))->aData;
327 ** Write nVal as a 16-bit unsigned big-endian integer into buffer aOut.
329 void lsmPutU16(u8
*aOut
, u16 nVal
){
330 aOut
[0] = (u8
)((nVal
>>8) & 0xFF);
331 aOut
[1] = (u8
)(nVal
& 0xFF);
334 void lsmPutU32(u8
*aOut
, u32 nVal
){
335 aOut
[0] = (u8
)((nVal
>>24) & 0xFF);
336 aOut
[1] = (u8
)((nVal
>>16) & 0xFF);
337 aOut
[2] = (u8
)((nVal
>> 8) & 0xFF);
338 aOut
[3] = (u8
)((nVal
) & 0xFF);
341 int lsmGetU16(u8
*aOut
){
342 return (aOut
[0] << 8) + aOut
[1];
345 u32
lsmGetU32(u8
*aOut
){
346 return ((u32
)aOut
[0] << 24)
347 + ((u32
)aOut
[1] << 16)
348 + ((u32
)aOut
[2] << 8)
352 u64
lsmGetU64(u8
*aOut
){
353 return ((u64
)aOut
[0] << 56)
354 + ((u64
)aOut
[1] << 48)
355 + ((u64
)aOut
[2] << 40)
356 + ((u64
)aOut
[3] << 32)
357 + ((u64
)aOut
[4] << 24)
358 + ((u32
)aOut
[5] << 16)
359 + ((u32
)aOut
[6] << 8)
363 void lsmPutU64(u8
*aOut
, u64 nVal
){
364 aOut
[0] = (u8
)((nVal
>>56) & 0xFF);
365 aOut
[1] = (u8
)((nVal
>>48) & 0xFF);
366 aOut
[2] = (u8
)((nVal
>>40) & 0xFF);
367 aOut
[3] = (u8
)((nVal
>>32) & 0xFF);
368 aOut
[4] = (u8
)((nVal
>>24) & 0xFF);
369 aOut
[5] = (u8
)((nVal
>>16) & 0xFF);
370 aOut
[6] = (u8
)((nVal
>> 8) & 0xFF);
371 aOut
[7] = (u8
)((nVal
) & 0xFF);
374 static int sortedBlobGrow(lsm_env
*pEnv
, LsmBlob
*pBlob
, int nData
){
375 assert( pBlob
->pEnv
==pEnv
|| (pBlob
->pEnv
==0 && pBlob
->pData
==0) );
376 if( pBlob
->nAlloc
<nData
){
377 pBlob
->pData
= lsmReallocOrFree(pEnv
, pBlob
->pData
, nData
);
378 if( !pBlob
->pData
) return LSM_NOMEM_BKPT
;
379 pBlob
->nAlloc
= nData
;
385 static int sortedBlobSet(lsm_env
*pEnv
, LsmBlob
*pBlob
, void *pData
, int nData
){
386 if( sortedBlobGrow(pEnv
, pBlob
, nData
) ) return LSM_NOMEM
;
387 memcpy(pBlob
->pData
, pData
, nData
);
388 pBlob
->nData
= nData
;
393 static int sortedBlobCopy(LsmBlob
*pDest
, LsmBlob
*pSrc
){
394 return sortedBlobSet(pDest
, pSrc
->pData
, pSrc
->nData
);
398 static void sortedBlobFree(LsmBlob
*pBlob
){
399 assert( pBlob
->pEnv
|| pBlob
->pData
==0 );
400 if( pBlob
->pData
) lsmFree(pBlob
->pEnv
, pBlob
->pData
);
401 memset(pBlob
, 0, sizeof(LsmBlob
));
404 static int sortedReadData(
418 aData
= fsPageData(pPg
, &nData
);
419 nCell
= lsmGetU16(&aData
[SEGMENT_NRECORD_OFFSET(nData
)]);
420 iEnd
= SEGMENT_EOF(nData
, nCell
);
421 assert( iEnd
>0 && iEnd
<nData
);
423 if( iOff
+nByte
<=iEnd
){
424 *ppData
= (void *)&aData
[iOff
];
430 /* Make sure the blob is big enough to store the value being loaded. */
431 rc
= sortedBlobGrow(lsmPageEnv(pPg
), pBlob
, nByte
);
432 if( rc
!=LSM_OK
) return rc
;
433 pBlob
->nData
= nByte
;
434 aDest
= (u8
*)pBlob
->pData
;
435 *ppData
= pBlob
->pData
;
437 /* Increment the pointer pages ref-count. */
444 /* Copy data from pPg into the output buffer. */
445 int nCopy
= LSM_MIN(nRem
, iEnd
-i
);
447 memcpy(&aDest
[nByte
-nRem
], &aData
[i
], nCopy
);
450 assert( nRem
==0 || i
==iEnd
);
456 /* Grab the next page in the segment */
459 rc
= lsmFsDbPageNext(pSeg
, pPg
, 1, &pNext
);
460 if( rc
==LSM_OK
&& pNext
==0 ){
461 rc
= LSM_CORRUPT_BKPT
;
464 lsmFsPageRelease(pPg
);
466 aData
= fsPageData(pPg
, &nData
);
467 flags
= lsmGetU16(&aData
[SEGMENT_FLAGS_OFFSET(nData
)]);
468 }while( flags
&SEGMENT_BTREE_FLAG
);
470 iEnd
= SEGMENT_EOF(nData
, lsmGetU16(&aData
[nData
-2]));
471 assert( iEnd
>0 && iEnd
<nData
);
474 lsmFsPageRelease(pPg
);
480 static int pageGetNRec(u8
*aData
, int nData
){
481 return (int)lsmGetU16(&aData
[SEGMENT_NRECORD_OFFSET(nData
)]);
484 static LsmPgno
pageGetPtr(u8
*aData
, int nData
){
485 return (LsmPgno
)lsmGetU64(&aData
[SEGMENT_POINTER_OFFSET(nData
)]);
488 static int pageGetFlags(u8
*aData
, int nData
){
489 return (int)lsmGetU16(&aData
[SEGMENT_FLAGS_OFFSET(nData
)]);
492 static u8
*pageGetCell(u8
*aData
, int nData
, int iCell
){
493 return &aData
[lsmGetU16(&aData
[SEGMENT_CELLPTR_OFFSET(nData
, iCell
)])];
497 ** Return the number of cells on page pPg.
499 static int pageObjGetNRec(Page
*pPg
){
501 u8
*aData
= lsmFsPageData(pPg
, &nData
);
502 return pageGetNRec(aData
, nData
);
506 ** Return the decoded (possibly relative) pointer value stored in cell
507 ** iCell from page aData/nData.
509 static LsmPgno
pageGetRecordPtr(u8
*aData
, int nData
, int iCell
){
510 LsmPgno iRet
; /* Return value */
511 u8
*aCell
; /* Pointer to cell iCell */
513 assert( iCell
<pageGetNRec(aData
, nData
) && iCell
>=0 );
514 aCell
= pageGetCell(aData
, nData
, iCell
);
515 lsmVarintGet64(&aCell
[1], &iRet
);
519 static u8
*pageGetKey(
520 Segment
*pSeg
, /* Segment pPg belongs to */
521 Page
*pPg
, /* Page to read from */
522 int iCell
, /* Index of cell on page to read */
523 int *piTopic
, /* OUT: Topic associated with this key */
524 int *pnKey
, /* OUT: Size of key in bytes */
525 LsmBlob
*pBlob
/* If required, use this for dynamic memory */
533 aData
= fsPageData(pPg
, &nData
);
535 assert( !(pageGetFlags(aData
, nData
) & SEGMENT_BTREE_FLAG
) );
536 assert( iCell
<pageGetNRec(aData
, nData
) );
538 pKey
= pageGetCell(aData
, nData
, iCell
);
540 pKey
+= lsmVarintGet32(pKey
, &nDummy
);
541 pKey
+= lsmVarintGet32(pKey
, pnKey
);
542 if( rtIsWrite(eType
) ){
543 pKey
+= lsmVarintGet32(pKey
, &nDummy
);
545 *piTopic
= rtTopic(eType
);
547 sortedReadData(pSeg
, pPg
, pKey
-aData
, *pnKey
, (void **)&pKey
, pBlob
);
551 static int pageGetKeyCopy(
552 lsm_env
*pEnv
, /* Environment handle */
553 Segment
*pSeg
, /* Segment pPg belongs to */
554 Page
*pPg
, /* Page to read from */
555 int iCell
, /* Index of cell on page to read */
556 int *piTopic
, /* OUT: Topic associated with this key */
557 LsmBlob
*pBlob
/* If required, use this for dynamic memory */
563 aKey
= pageGetKey(pSeg
, pPg
, iCell
, piTopic
, &nKey
, pBlob
);
564 assert( (void *)aKey
!=pBlob
->pData
|| nKey
==pBlob
->nData
);
565 if( (void *)aKey
!=pBlob
->pData
){
566 rc
= sortedBlobSet(pEnv
, pBlob
, aKey
, nKey
);
572 static LsmPgno
pageGetBtreeRef(Page
*pPg
, int iKey
){
578 aData
= fsPageData(pPg
, &nData
);
579 aCell
= pageGetCell(aData
, nData
, iKey
);
580 assert( aCell
[0]==0 );
582 aCell
+= lsmVarintGet64(aCell
, &iRef
);
583 lsmVarintGet64(aCell
, &iRef
);
588 #define GETVARINT64(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet64((a), &(i)))
589 #define GETVARINT32(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet32((a), &(i)))
591 static int pageGetBtreeKey(
592 Segment
*pSeg
, /* Segment page pPg belongs to */
606 aData
= fsPageData(pPg
, &nData
);
607 assert( SEGMENT_BTREE_FLAG
& pageGetFlags(aData
, nData
) );
608 assert( iKey
>=0 && iKey
<pageGetNRec(aData
, nData
) );
610 aCell
= pageGetCell(aData
, nData
, iKey
);
612 aCell
+= GETVARINT64(aCell
, *piPtr
);
616 LsmPgno iRef
; /* Page number of referenced page */
618 aCell
+= GETVARINT64(aCell
, iRef
);
619 rc
= lsmFsDbPageGet(lsmPageFS(pPg
), pSeg
, iRef
, &pRef
);
620 if( rc
!=LSM_OK
) return rc
;
621 pageGetKeyCopy(lsmPageEnv(pPg
), pSeg
, pRef
, 0, &eType
, pBlob
);
622 lsmFsPageRelease(pRef
);
623 *ppKey
= pBlob
->pData
;
624 *pnKey
= pBlob
->nData
;
626 aCell
+= GETVARINT32(aCell
, *pnKey
);
629 if( piTopic
) *piTopic
= rtTopic(eType
);
634 static int btreeCursorLoadKey(BtreeCursor
*pCsr
){
643 int iCell
= pCsr
->aPg
[iPg
].iCell
;
644 while( iCell
<0 && (--iPg
)>=0 ){
645 iCell
= pCsr
->aPg
[iPg
].iCell
-1;
647 if( iPg
<0 || iCell
<0 ) return LSM_CORRUPT_BKPT
;
649 rc
= pageGetBtreeKey(
651 pCsr
->aPg
[iPg
].pPage
, iCell
,
652 &dummy
, &pCsr
->eType
, &pCsr
->pKey
, &pCsr
->nKey
, &pCsr
->blob
654 pCsr
->eType
|= LSM_SEPARATOR
;
660 static int btreeCursorPtr(u8
*aData
, int nData
, int iCell
){
663 nCell
= pageGetNRec(aData
, nData
);
665 return (int)pageGetPtr(aData
, nData
);
667 return (int)pageGetRecordPtr(aData
, nData
, iCell
);
670 static int btreeCursorNext(BtreeCursor
*pCsr
){
673 BtreePg
*pPg
= &pCsr
->aPg
[pCsr
->iPg
];
678 assert( pCsr
->iPg
>=0 );
679 assert( pCsr
->iPg
==pCsr
->nDepth
-1 );
681 aData
= fsPageData(pPg
->pPage
, &nData
);
682 nCell
= pageGetNRec(aData
, nData
);
683 assert( pPg
->iCell
<=nCell
);
685 if( pPg
->iCell
==nCell
){
689 lsmFsPageRelease(pPg
->pPage
);
692 while( pCsr
->iPg
>=0 ){
693 pPg
= &pCsr
->aPg
[pCsr
->iPg
];
694 aData
= fsPageData(pPg
->pPage
, &nData
);
695 if( pPg
->iCell
<pageGetNRec(aData
, nData
) ) break;
696 lsmFsPageRelease(pPg
->pPage
);
701 rc
= btreeCursorLoadKey(pCsr
);
703 /* Unless the cursor is at EOF, descend to cell -1 (yes, negative one) of
704 ** the left-most descendant. */
706 pCsr
->aPg
[pCsr
->iPg
].iCell
++;
708 iLoad
= btreeCursorPtr(aData
, nData
, pPg
->iCell
);
712 rc
= lsmFsDbPageGet(pCsr
->pFS
, pCsr
->pSeg
, iLoad
, &pLoad
);
713 pCsr
->aPg
[pCsr
->iPg
].pPage
= pLoad
;
714 pCsr
->aPg
[pCsr
->iPg
].iCell
= 0;
716 if( pCsr
->iPg
==(pCsr
->nDepth
-1) ) break;
717 aData
= fsPageData(pLoad
, &nData
);
718 iLoad
= btreeCursorPtr(aData
, nData
, 0);
720 }while( rc
==LSM_OK
&& pCsr
->iPg
<(pCsr
->nDepth
-1) );
721 pCsr
->aPg
[pCsr
->iPg
].iCell
= -1;
725 rc
= btreeCursorLoadKey(pCsr
);
728 if( rc
==LSM_OK
&& pCsr
->iPg
>=0 ){
729 aData
= fsPageData(pCsr
->aPg
[pCsr
->iPg
].pPage
, &nData
);
730 pCsr
->iPtr
= btreeCursorPtr(aData
, nData
, pCsr
->aPg
[pCsr
->iPg
].iCell
+1);
736 static void btreeCursorFree(BtreeCursor
*pCsr
){
739 lsm_env
*pEnv
= lsmFsEnv(pCsr
->pFS
);
740 for(i
=0; i
<=pCsr
->iPg
; i
++){
741 lsmFsPageRelease(pCsr
->aPg
[i
].pPage
);
743 sortedBlobFree(&pCsr
->blob
);
744 lsmFree(pEnv
, pCsr
->aPg
);
749 static int btreeCursorFirst(BtreeCursor
*pCsr
){
753 FileSystem
*pFS
= pCsr
->pFS
;
754 int iPg
= (int)pCsr
->pSeg
->iRoot
;
757 rc
= lsmFsDbPageGet(pFS
, pCsr
->pSeg
, iPg
, &pPg
);
758 assert( (rc
==LSM_OK
)==(pPg
!=0) );
764 aData
= fsPageData(pPg
, &nData
);
765 flags
= pageGetFlags(aData
, nData
);
766 if( (flags
& SEGMENT_BTREE_FLAG
)==0 ) break;
768 if( (pCsr
->nDepth
% 8)==0 ){
769 int nNew
= pCsr
->nDepth
+ 8;
770 pCsr
->aPg
= (BtreePg
*)lsmReallocOrFreeRc(
771 lsmFsEnv(pFS
), pCsr
->aPg
, sizeof(BtreePg
) * nNew
, &rc
774 memset(&pCsr
->aPg
[pCsr
->nDepth
], 0, sizeof(BtreePg
) * 8);
779 assert( pCsr
->aPg
[pCsr
->nDepth
].iCell
==0 );
780 pCsr
->aPg
[pCsr
->nDepth
].pPage
= pPg
;
782 iPg
= (int)pageGetRecordPtr(aData
, nData
, 0);
785 }while( rc
==LSM_OK
);
786 lsmFsPageRelease(pPg
);
787 pCsr
->iPg
= pCsr
->nDepth
-1;
789 if( rc
==LSM_OK
&& pCsr
->nDepth
){
790 pCsr
->aPg
[pCsr
->iPg
].iCell
= -1;
791 rc
= btreeCursorNext(pCsr
);
797 static void btreeCursorPosition(BtreeCursor
*pCsr
, MergeInput
*p
){
799 p
->iPg
= lsmFsPageNumber(pCsr
->aPg
[pCsr
->iPg
].pPage
);
800 p
->iCell
= ((pCsr
->aPg
[pCsr
->iPg
].iCell
+ 1) << 8) + pCsr
->nDepth
;
807 static void btreeCursorSplitkey(BtreeCursor
*pCsr
, MergeInput
*p
){
808 int iCell
= pCsr
->aPg
[pCsr
->iPg
].iCell
;
811 p
->iPg
= lsmFsPageNumber(pCsr
->aPg
[pCsr
->iPg
].pPage
);
814 for(i
=pCsr
->iPg
-1; i
>=0; i
--){
815 if( pCsr
->aPg
[i
].iCell
>0 ) break;
818 p
->iCell
= pCsr
->aPg
[i
].iCell
-1;
819 p
->iPg
= lsmFsPageNumber(pCsr
->aPg
[i
].pPage
);
/*
** Compare two keys, each of which consists of a topic and a blob.
**
** Keys are ordered by topic first. Only if the two topics are equal is the
** comparison function xCmp invoked to compare the key blobs.
**
** Returns a negative value, zero or a positive value if the left-hand key
** is less than, equal to or greater than the right-hand key, respectively.
*/
static int sortedKeyCompare(
  int (*xCmp)(void *, int, void *, int),
  int iLhsTopic, void *pLhsKey, int nLhsKey,
  int iRhsTopic, void *pRhsKey, int nRhsKey
){
  int res = iLhsTopic - iRhsTopic;
  if( res==0 ){
    res = xCmp(pLhsKey, nLhsKey, pRhsKey, nRhsKey);
  }
  return res;
}
835 static int btreeCursorRestore(
837 int (*xCmp
)(void *, int, void *, int),
843 lsm_env
*pEnv
= lsmFsEnv(pCsr
->pFS
);
844 int iCell
; /* Current cell number on leaf page */
845 LsmPgno iLeaf
; /* Page number of current leaf page */
846 int nDepth
; /* Depth of b-tree structure */
847 Segment
*pSeg
= pCsr
->pSeg
;
849 /* Decode the MergeInput structure */
851 nDepth
= (p
->iCell
& 0x00FF);
852 iCell
= (p
->iCell
>> 8) - 1;
854 /* Allocate the BtreeCursor.aPg[] array */
855 assert( pCsr
->aPg
==0 );
856 pCsr
->aPg
= (BtreePg
*)lsmMallocZeroRc(pEnv
, sizeof(BtreePg
) * nDepth
, &rc
);
858 /* Populate the last entry of the aPg[] array */
860 Page
**pp
= &pCsr
->aPg
[nDepth
-1].pPage
;
861 pCsr
->iPg
= nDepth
-1;
862 pCsr
->nDepth
= nDepth
;
863 pCsr
->aPg
[pCsr
->iPg
].iCell
= iCell
;
864 rc
= lsmFsDbPageGet(pCsr
->pFS
, pSeg
, iLeaf
, pp
);
867 /* Populate any other aPg[] array entries */
868 if( rc
==LSM_OK
&& nDepth
>1 ){
869 LsmBlob blob
= {0,0,0};
874 int iLoad
= (int)pSeg
->iRoot
;
875 Page
*pPg
= pCsr
->aPg
[nDepth
-1].pPage
;
877 if( pageObjGetNRec(pPg
)==0 ){
878 /* This can happen when pPg is the right-most leaf in the b-tree.
879 ** In this case, set the iTopicSeek/pSeek/nSeek key to a value
880 ** greater than any real key. */
887 rc
= pageGetBtreeKey(pSeg
, pPg
,
888 0, &dummy
, &iTopicSeek
, &pSeek
, &nSeek
, &pCsr
->blob
894 rc
= lsmFsDbPageGet(pCsr
->pFS
, pSeg
, iLoad
, &pPg2
);
895 assert( rc
==LSM_OK
|| pPg2
==0 );
897 u8
*aData
; /* Buffer containing page data */
898 int nData
; /* Size of aData[] in bytes */
903 aData
= fsPageData(pPg2
, &nData
);
904 assert( (pageGetFlags(aData
, nData
) & SEGMENT_BTREE_FLAG
) );
906 iLoad
= (int)pageGetPtr(aData
, nData
);
907 iCell2
= pageGetNRec(aData
, nData
);
912 int iTry
= (iMin
+iMax
)/2;
913 void *pKey
; int nKey
; /* Key for cell iTry */
914 int iTopic
; /* Topic for key pKeyT/nKeyT */
915 LsmPgno iPtr
; /* Pointer for cell iTry */
916 int res
; /* (pSeek - pKeyT) */
918 rc
= pageGetBtreeKey(
919 pSeg
, pPg2
, iTry
, &iPtr
, &iTopic
, &pKey
, &nKey
, &blob
921 if( rc
!=LSM_OK
) break;
923 res
= sortedKeyCompare(
924 xCmp
, iTopicSeek
, pSeek
, nSeek
, iTopic
, pKey
, nKey
937 pCsr
->aPg
[iPg
].pPage
= pPg2
;
938 pCsr
->aPg
[iPg
].iCell
= iCell2
;
940 assert( iPg
!=nDepth
-1
941 || lsmFsRedirectPage(pCsr
->pFS
, pSeg
->pRedirect
, iLoad
)==iLeaf
944 }while( rc
==LSM_OK
&& iPg
<(nDepth
-1) );
945 sortedBlobFree(&blob
);
948 /* Load the current key and pointer */
954 pBtreePg
= &pCsr
->aPg
[pCsr
->iPg
];
955 aData
= fsPageData(pBtreePg
->pPage
, &nData
);
956 pCsr
->iPtr
= btreeCursorPtr(aData
, nData
, pBtreePg
->iCell
+1);
957 if( pBtreePg
->iCell
<0 ){
960 for(i
=pCsr
->iPg
-1; i
>=0; i
--){
961 if( pCsr
->aPg
[i
].iCell
>0 ) break;
964 rc
= pageGetBtreeKey(pSeg
,
965 pCsr
->aPg
[i
].pPage
, pCsr
->aPg
[i
].iCell
-1,
966 &dummy
, &pCsr
->eType
, &pCsr
->pKey
, &pCsr
->nKey
, &pCsr
->blob
968 pCsr
->eType
|= LSM_SEPARATOR
;
971 rc
= btreeCursorLoadKey(pCsr
);
978 static int btreeCursorNew(
986 assert( pSeg
->iRoot
);
987 pCsr
= lsmMallocZeroRc(pDb
->pEnv
, sizeof(BtreeCursor
), &rc
);
989 pCsr
->pFS
= pDb
->pFS
;
998 static void segmentPtrSetPage(SegmentPtr
*pPtr
, Page
*pNext
){
999 lsmFsPageRelease(pPtr
->pPg
);
1002 u8
*aData
= fsPageData(pNext
, &nData
);
1003 pPtr
->nCell
= pageGetNRec(aData
, nData
);
1004 pPtr
->flags
= (u16
)pageGetFlags(aData
, nData
);
1005 pPtr
->iPtr
= pageGetPtr(aData
, nData
);
1011 ** Load a new page into the SegmentPtr object pPtr.
1013 static int segmentPtrLoadPage(
1015 SegmentPtr
*pPtr
, /* Load page into this SegmentPtr object */
1016 int iNew
/* Page number of new page */
1018 Page
*pPg
= 0; /* The new page */
1019 int rc
; /* Return Code */
1021 rc
= lsmFsDbPageGet(pFS
, pPtr
->pSeg
, iNew
, &pPg
);
1022 assert( rc
==LSM_OK
|| pPg
==0 );
1023 segmentPtrSetPage(pPtr
, pPg
);
1028 static int segmentPtrReadData(
1035 return sortedReadData(pPtr
->pSeg
, pPtr
->pPg
, iOff
, nByte
, ppData
, pBlob
);
1038 static int segmentPtrNextPage(
1039 SegmentPtr
*pPtr
, /* Load page into this SegmentPtr object */
1040 int eDir
/* +1 for next(), -1 for prev() */
1042 Page
*pNext
; /* New page to load */
1043 int rc
; /* Return code */
1045 assert( eDir
==1 || eDir
==-1 );
1046 assert( pPtr
->pPg
);
1047 assert( pPtr
->pSeg
|| eDir
>0 );
1049 rc
= lsmFsDbPageNext(pPtr
->pSeg
, pPtr
->pPg
, eDir
, &pNext
);
1050 assert( rc
==LSM_OK
|| pNext
==0 );
1051 segmentPtrSetPage(pPtr
, pNext
);
1055 static int segmentPtrLoadCell(
1056 SegmentPtr
*pPtr
, /* Load page into this SegmentPtr object */
1057 int iNew
/* Cell number of new cell */
1061 u8
*aData
; /* Pointer to page data buffer */
1062 int iOff
; /* Offset in aData[] to read from */
1063 int nPgsz
; /* Size of page (aData[]) in bytes */
1065 assert( iNew
<pPtr
->nCell
);
1067 aData
= fsPageData(pPtr
->pPg
, &nPgsz
);
1068 iOff
= lsmGetU16(&aData
[SEGMENT_CELLPTR_OFFSET(nPgsz
, pPtr
->iCell
)]);
1069 pPtr
->eType
= aData
[iOff
];
1071 iOff
+= GETVARINT64(&aData
[iOff
], pPtr
->iPgPtr
);
1072 iOff
+= GETVARINT32(&aData
[iOff
], pPtr
->nKey
);
1073 if( rtIsWrite(pPtr
->eType
) ){
1074 iOff
+= GETVARINT32(&aData
[iOff
], pPtr
->nVal
);
1076 assert( pPtr
->nKey
>=0 );
1078 rc
= segmentPtrReadData(
1079 pPtr
, iOff
, pPtr
->nKey
, &pPtr
->pKey
, &pPtr
->blob1
1081 if( rc
==LSM_OK
&& rtIsWrite(pPtr
->eType
) ){
1082 rc
= segmentPtrReadData(
1083 pPtr
, iOff
+pPtr
->nKey
, pPtr
->nVal
, &pPtr
->pVal
, &pPtr
->blob2
1095 static Segment
*sortedSplitkeySegment(Level
*pLevel
){
1096 Merge
*pMerge
= pLevel
->pMerge
;
1097 MergeInput
*p
= &pMerge
->splitkey
;
1101 for(i
=0; i
<pMerge
->nInput
; i
++){
1102 if( p
->iPg
==pMerge
->aInput
[i
].iPg
) break;
1104 if( pMerge
->nInput
==(pLevel
->nRight
+1) && i
>=(pMerge
->nInput
-1) ){
1105 pSeg
= &pLevel
->pNext
->lhs
;
1107 pSeg
= &pLevel
->aRhs
[i
];
1113 static void sortedSplitkey(lsm_db
*pDb
, Level
*pLevel
, int *pRc
){
1116 lsm_env
*pEnv
= pDb
->pEnv
; /* Environment handle */
1118 Merge
*pMerge
= pLevel
->pMerge
;
1120 pSeg
= sortedSplitkeySegment(pLevel
);
1122 rc
= lsmFsDbPageGet(pDb
->pFS
, pSeg
, pMerge
->splitkey
.iPg
, &pPg
);
1126 LsmBlob blob
= {0, 0, 0, 0};
1130 aData
= lsmFsPageData(pPg
, &nData
);
1131 if( pageGetFlags(aData
, nData
) & SEGMENT_BTREE_FLAG
){
1135 rc
= pageGetBtreeKey(pSeg
,
1136 pPg
, pMerge
->splitkey
.iCell
, &dummy
, &iTopic
, &pKey
, &nKey
, &blob
1138 if( rc
==LSM_OK
&& blob
.pData
!=pKey
){
1139 rc
= sortedBlobSet(pEnv
, &blob
, pKey
, nKey
);
1142 rc
= pageGetKeyCopy(
1143 pEnv
, pSeg
, pPg
, pMerge
->splitkey
.iCell
, &iTopic
, &blob
1147 pLevel
->iSplitTopic
= iTopic
;
1148 pLevel
->pSplitKey
= blob
.pData
;
1149 pLevel
->nSplitKey
= blob
.nData
;
1150 lsmFsPageRelease(pPg
);
1157 ** Reset a segment cursor. Also free its buffers if they are nThreshold
1158 ** bytes or larger in size.
1160 static void segmentPtrReset(SegmentPtr
*pPtr
, int nThreshold
){
1161 lsmFsPageRelease(pPtr
->pPg
);
1170 if( pPtr
->blob1
.nAlloc
>=nThreshold
) sortedBlobFree(&pPtr
->blob1
);
1171 if( pPtr
->blob2
.nAlloc
>=nThreshold
) sortedBlobFree(&pPtr
->blob2
);
1174 static int segmentPtrIgnoreSeparators(MultiCursor
*pCsr
, SegmentPtr
*pPtr
){
1175 return (pCsr
->flags
& CURSOR_READ_SEPARATORS
)==0
1176 || (pPtr
!=&pCsr
->aPtr
[pCsr
->nPtr
-1]);
1179 static int segmentPtrAdvance(
1184 int eDir
= (bReverse
? -1 : 1);
1185 Level
*pLvl
= pPtr
->pLevel
;
1188 int iCell
; /* Number of new cell in page */
1189 int svFlags
= 0; /* SegmentPtr.eType before advance */
1191 iCell
= pPtr
->iCell
+ eDir
;
1192 assert( pPtr
->pPg
);
1193 assert( iCell
<=pPtr
->nCell
&& iCell
>=-1 );
1195 if( bReverse
&& pPtr
->pSeg
!=&pPtr
->pLevel
->lhs
){
1196 svFlags
= pPtr
->eType
;
1200 if( iCell
>=pPtr
->nCell
|| iCell
<0 ){
1202 rc
= segmentPtrNextPage(pPtr
, eDir
);
1205 && (pPtr
->nCell
==0 || (pPtr
->flags
& SEGMENT_BTREE_FLAG
) )
1207 if( rc
!=LSM_OK
) return rc
;
1208 iCell
= bReverse
? (pPtr
->nCell
-1) : 0;
1210 rc
= segmentPtrLoadCell(pPtr
, iCell
);
1211 if( rc
!=LSM_OK
) return rc
;
1213 if( svFlags
&& pPtr
->pPg
){
1214 int res
= sortedKeyCompare(pCsr
->pDb
->xCmp
,
1215 rtTopic(pPtr
->eType
), pPtr
->pKey
, pPtr
->nKey
,
1216 pLvl
->iSplitTopic
, pLvl
->pSplitKey
, pLvl
->nSplitKey
1218 if( res
<0 ) segmentPtrReset(pPtr
, LSM_SEGMENTPTR_FREE_THRESHOLD
);
1221 if( pPtr
->pPg
==0 && (svFlags
& LSM_END_DELETE
) ){
1222 Segment
*pSeg
= pPtr
->pSeg
;
1223 rc
= lsmFsDbPageGet(pCsr
->pDb
->pFS
, pSeg
, pSeg
->iFirst
, &pPtr
->pPg
);
1224 if( rc
!=LSM_OK
) return rc
;
1225 pPtr
->eType
= LSM_START_DELETE
| LSM_POINT_DELETE
;
1226 pPtr
->eType
|= (pLvl
->iSplitTopic
? LSM_SYSTEMKEY
: 0);
1227 pPtr
->pKey
= pLvl
->pSplitKey
;
1228 pPtr
->nKey
= pLvl
->nSplitKey
;
1233 && segmentPtrIgnoreSeparators(pCsr
, pPtr
)
1234 && rtIsSeparator(pPtr
->eType
)
1240 static void segmentPtrEndPage(
1247 Segment
*pSeg
= pPtr
->pSeg
;
1250 *pRc
= lsmFsDbPageLast(pFS
, pSeg
, &pNew
);
1252 *pRc
= lsmFsDbPageGet(pFS
, pSeg
, pSeg
->iFirst
, &pNew
);
1254 segmentPtrSetPage(pPtr
, pNew
);
1260 ** Try to move the segment pointer passed as the second argument so that it
1261 ** points at either the first (bLast==0) or last (bLast==1) cell in the valid
1262 ** region of the segment defined by pPtr->iFirst and pPtr->iLast.
1264 ** Return LSM_OK if successful or an lsm error code if something goes
1265 ** wrong (IO error, OOM etc.).
1267 static int segmentPtrEnd(MultiCursor
*pCsr
, SegmentPtr
*pPtr
, int bLast
){
1268 Level
*pLvl
= pPtr
->pLevel
;
1270 FileSystem
*pFS
= pCsr
->pDb
->pFS
;
1273 segmentPtrEndPage(pFS
, pPtr
, bLast
, &rc
);
1274 while( rc
==LSM_OK
&& pPtr
->pPg
1275 && (pPtr
->nCell
==0 || (pPtr
->flags
& SEGMENT_BTREE_FLAG
))
1277 rc
= segmentPtrNextPage(pPtr
, (bLast
? -1 : 1));
1280 if( rc
==LSM_OK
&& pPtr
->pPg
){
1281 rc
= segmentPtrLoadCell(pPtr
, bLast
? (pPtr
->nCell
-1) : 0);
1282 if( rc
==LSM_OK
&& bLast
&& pPtr
->pSeg
!=&pLvl
->lhs
){
1283 int res
= sortedKeyCompare(pCsr
->pDb
->xCmp
,
1284 rtTopic(pPtr
->eType
), pPtr
->pKey
, pPtr
->nKey
,
1285 pLvl
->iSplitTopic
, pLvl
->pSplitKey
, pLvl
->nSplitKey
1287 if( res
<0 ) segmentPtrReset(pPtr
, LSM_SEGMENTPTR_FREE_THRESHOLD
);
1291 bIgnore
= segmentPtrIgnoreSeparators(pCsr
, pPtr
);
1292 if( rc
==LSM_OK
&& pPtr
->pPg
&& bIgnore
&& rtIsSeparator(pPtr
->eType
) ){
1293 rc
= segmentPtrAdvance(pCsr
, pPtr
, bLast
);
1297 if( bLast
&& rc
==LSM_OK
&& pPtr
->pPg
1298 && pPtr
->pSeg
==&pLvl
->lhs
1299 && pLvl
->nRight
&& (pPtr
->eType
& LSM_START_DELETE
)
1302 pPtr
->eType
= LSM_END_DELETE
| (pLvl
->iSplitTopic
);
1303 pPtr
->pKey
= pLvl
->pSplitKey
;
1304 pPtr
->nKey
= pLvl
->nSplitKey
;
1313 static void segmentPtrKey(SegmentPtr
*pPtr
, void **ppKey
, int *pnKey
){
1314 assert( pPtr
->pPg
);
1315 *ppKey
= pPtr
->pKey
;
1316 *pnKey
= pPtr
->nKey
;
1319 #if 0 /* NOT USED */
1320 static char *keyToString(lsm_env
*pEnv
, void *pKey
, int nKey
){
1322 u8
*aKey
= (u8
*)pKey
;
1323 char *zRet
= (char *)lsmMalloc(pEnv
, nKey
+1);
1325 for(i
=0; i
<nKey
; i
++){
1326 zRet
[i
] = (char)(isalnum(aKey
[i
]) ? aKey
[i
] : '.');
1333 #if 0 /* NOT USED */
1335 ** Check that the page that pPtr currently has loaded is the correct page
1336 ** to search for key (pKey/nKey). If it is, return 1. Otherwise, an assert
1337 ** fails and this function does not return.
1339 static int assertKeyLocation(
1342 void *pKey
, int nKey
1344 lsm_env
*pEnv
= lsmFsEnv(pCsr
->pDb
->pFS
);
1345 LsmBlob blob
= {0, 0, 0};
1347 int iTopic
= 0; /* TODO: Fix me */
1349 for(eDir
=-1; eDir
<=1; eDir
+=2){
1350 Page
*pTest
= pPtr
->pPg
;
1352 lsmFsPageRef(pTest
);
1354 Segment
*pSeg
= pPtr
->pSeg
;
1357 int rc
= lsmFsDbPageNext(pSeg
, pTest
, eDir
, &pNext
);
1358 lsmFsPageRelease(pTest
);
1364 u8
*aData
= fsPageData(pTest
, &nData
);
1365 int nCell
= pageGetNRec(aData
, nData
);
1366 int flags
= pageGetFlags(aData
, nData
);
1367 if( nCell
&& 0==(flags
&SEGMENT_BTREE_FLAG
) ){
1374 iCell
= ((eDir
< 0) ? (nCell
-1) : 0);
1375 pPgKey
= pageGetKey(pSeg
, pTest
, iCell
, &iPgTopic
, &nPgKey
, &blob
);
1376 res
= iTopic
- iPgTopic
;
1377 if( res
==0 ) res
= pCsr
->pDb
->xCmp(pKey
, nKey
, pPgKey
, nPgKey
);
1378 if( (eDir
==1 && res
>0) || (eDir
==-1 && res
<0) ){
1379 /* Taking this branch means something has gone wrong. */
1380 char *zMsg
= lsmMallocPrintf(pEnv
, "Key \"%s\" is not on page %d",
1381 keyToString(pEnv
, pKey
, nKey
), lsmFsPageNumber(pPtr
->pPg
)
1383 fprintf(stderr
, "%s\n", zMsg
);
1384 assert( !"assertKeyLocation() failed" );
1386 lsmFsPageRelease(pTest
);
1393 sortedBlobFree(&blob
);
1399 static int assertSeekResult(
1409 res
= sortedKeyCompare(pCsr
->pDb
->xCmp
, iTopic
, pKey
, nKey
,
1410 rtTopic(pPtr
->eType
), pPtr
->pKey
, pPtr
->nKey
1413 if( eSeek
==LSM_SEEK_EQ
) return (res
==0);
1414 if( eSeek
==LSM_SEEK_LE
) return (res
>=0);
1415 if( eSeek
==LSM_SEEK_GE
) return (res
<=0);
1422 static int segmentPtrSearchOversized(
1423 MultiCursor
*pCsr
, /* Cursor context */
1424 SegmentPtr
*pPtr
, /* Pointer to seek */
1425 int iTopic
, /* Topic of key to search for */
1426 void *pKey
, int nKey
/* Key to seek to */
1428 int (*xCmp
)(void *, int, void *, int) = pCsr
->pDb
->xCmp
;
1431 /* If the OVERSIZED flag is set, then there is no pointer in the
1432 ** upper level to the next page in the segment that contains at least
1433 ** one key. So compare the largest key on the current page with the
1434 ** key being sought (pKey/nKey). If (pKey/nKey) is larger, advance
1435 ** to the next page in the segment that contains at least one key.
1437 while( rc
==LSM_OK
&& (pPtr
->flags
& PGFTR_SKIP_NEXT_FLAG
) ){
1441 int res
; /* Result of comparison */
1444 /* Load the last key on the current page. */
1445 pLastKey
= pageGetKey(pPtr
->pSeg
,
1446 pPtr
->pPg
, pPtr
->nCell
-1, &iLastTopic
, &nLastKey
, &pPtr
->blob1
1449 /* If the loaded key is >= (pKey/nKey), break out of the loop.
1450 ** If (pKey/nKey) is present in this array, it must be on the current
1452 res
= sortedKeyCompare(
1453 xCmp
, iLastTopic
, pLastKey
, nLastKey
, iTopic
, pKey
, nKey
1457 /* Advance to the next page that contains at least one key. */
1459 lsmFsPageRef(pNext
);
1462 u8
*aData
; int nData
;
1464 rc
= lsmFsDbPageNext(pPtr
->pSeg
, pNext
, 1, &pLoad
);
1465 lsmFsPageRelease(pNext
);
1467 if( pNext
==0 ) break;
1469 assert( rc
==LSM_OK
);
1470 aData
= lsmFsPageData(pNext
, &nData
);
1471 if( (pageGetFlags(aData
, nData
) & SEGMENT_BTREE_FLAG
)==0
1472 && pageGetNRec(aData
, nData
)>0
1477 if( pNext
==0 ) break;
1478 segmentPtrSetPage(pPtr
, pNext
);
1480 /* This should probably be an LSM_CORRUPT error. */
1481 assert( rc
!=LSM_OK
|| (pPtr
->flags
& PGFTR_SKIP_THIS_FLAG
) );
1487 static int ptrFwdPointer(
1503 aData
= lsmFsPageData(pPg
, &nData
);
1504 if( (pageGetFlags(aData
, nData
) & SEGMENT_BTREE_FLAG
)==0 ){
1506 int nCell
= pageGetNRec(aData
, nData
);
1507 for(i
=iFirst
; i
<nCell
; i
++){
1508 u8 eType
= *pageGetCell(aData
, nData
, i
);
1509 if( (eType
& LSM_START_DELETE
)==0 ){
1511 *piPtr
= pageGetRecordPtr(aData
, nData
, i
) + pageGetPtr(aData
, nData
);
1512 lsmFsPageRelease(pPg
);
1518 rc
= lsmFsDbPageNext(pSeg
, pPg
, 1, &pNext
);
1519 lsmFsPageRelease(pPg
);
1522 }while( pPg
&& rc
==LSM_OK
);
1523 lsmFsPageRelease(pPg
);
1529 static int sortedRhsFirst(MultiCursor
*pCsr
, Level
*pLvl
, SegmentPtr
*pPtr
){
1531 rc
= segmentPtrEnd(pCsr
, pPtr
, 0);
1532 while( pPtr
->pPg
&& rc
==LSM_OK
){
1533 int res
= sortedKeyCompare(pCsr
->pDb
->xCmp
,
1534 pLvl
->iSplitTopic
, pLvl
->pSplitKey
, pLvl
->nSplitKey
,
1535 rtTopic(pPtr
->eType
), pPtr
->pKey
, pPtr
->nKey
1538 rc
= segmentPtrAdvance(pCsr
, pPtr
, 0);
1545 ** This function is called as part of a SEEK_GE op on a multi-cursor if the
1546 ** FC pointer read from segment *pPtr comes from an entry with the
1547 ** LSM_START_DELETE flag set. In this case the pointer value cannot be
1548 ** trusted. Instead, the pointer that should be followed is that associated
1549 ** with the next entry in *pPtr that does not have LSM_START_DELETE set.
1551 ** Why the pointers can't be trusted:
1555 ** TODO: This is a stop-gap solution:
1557 ** At the moment, this function is called from within segmentPtrSeek(),
1558 ** as part of the initial lsmMCursorSeek() call. However, consider a
1559 ** database where the following has occurred:
1561 ** 1. A range delete removes keys 1..9999.
1562 ** 2. Keys 1 through 9999 are reinserted.
1563 ** 3. The levels containing the ops in 1. and 2. above are merged. Call
1564 ** this level N. Level N contains FC pointers to level N+1.
1566 ** Then, if the user attempts to query for (key>=2 LIMIT 10), the
1567 ** lsmMCursorSeek() call will iterate through 9998 entries searching for a
1568 ** pointer down to the level N+1 that is never actually used. It would be
1569 ** much better if the multi-cursor could do this lazily - only seek to the
1570 ** level (N+1) page after the user has moved the cursor on level N past
1571 ** the big range-delete.
1573 static int segmentPtrFwdPointer(
1574 MultiCursor
*pCsr
, /* Multi-cursor pPtr belongs to */
1575 SegmentPtr
*pPtr
, /* Segment-pointer to extract FC ptr from */
1576 LsmPgno
*piPtr
/* OUT: FC pointer value */
1578 Level
*pLvl
= pPtr
->pLevel
;
1579 Level
*pNext
= pLvl
->pNext
;
1580 Page
*pPg
= pPtr
->pPg
;
1585 if( pPtr
->pSeg
==&pLvl
->lhs
|| pPtr
->pSeg
==&pLvl
->aRhs
[pLvl
->nRight
-1] ){
1587 || (pNext
->nRight
==0 && pNext
->lhs
.iRoot
)
1588 || (pNext
->nRight
!=0 && pNext
->aRhs
[0].iRoot
)
1590 /* Do nothing. The pointer will not be used anyway. */
1594 if( pPtr
[1].pSeg
->iRoot
){
1599 /* Search for a pointer within the current segment. */
1601 rc
= ptrFwdPointer(pPg
, pPtr
->iCell
, pPtr
->pSeg
, &iOut
, &bFound
);
1603 if( rc
==LSM_OK
&& bFound
==0 ){
1604 /* This case happens when pPtr points to the left-hand-side of a segment
1605 ** currently undergoing an incremental merge. In this case, jump to the
1606 ** oldest segment in the right-hand-side of the same level and continue
1607 ** searching. But - do not consider any keys smaller than the levels
1611 if( pPtr
->pLevel
->nRight
==0 || pPtr
->pSeg
!=&pPtr
->pLevel
->lhs
){
1612 return LSM_CORRUPT_BKPT
;
1615 memset(&ptr
, 0, sizeof(SegmentPtr
));
1616 ptr
.pLevel
= pPtr
->pLevel
;
1617 ptr
.pSeg
= &ptr
.pLevel
->aRhs
[ptr
.pLevel
->nRight
-1];
1618 rc
= sortedRhsFirst(pCsr
, ptr
.pLevel
, &ptr
);
1620 rc
= ptrFwdPointer(ptr
.pPg
, ptr
.iCell
, ptr
.pSeg
, &iOut
, &bFound
);
1623 segmentPtrReset(&ptr
, 0);
1630 static int segmentPtrSeek(
1631 MultiCursor
*pCsr
, /* Cursor context */
1632 SegmentPtr
*pPtr
, /* Pointer to seek */
1633 int iTopic
, /* Key topic to seek to */
1634 void *pKey
, int nKey
, /* Key to seek to */
1635 int eSeek
, /* Search bias - see above */
1636 int *piPtr
, /* OUT: FC pointer */
1639 int (*xCmp
)(void *, int, void *, int) = pCsr
->pDb
->xCmp
;
1640 int res
= 0; /* Result of comparison operation */
1644 LsmPgno iPtrOut
= 0;
1646 /* If the current page contains an oversized entry, then there are no
1647 ** pointers to one or more of the subsequent pages in the sorted run.
1648 ** The following call ensures that the segment-ptr points to the correct
1649 ** page in this case. */
1650 rc
= segmentPtrSearchOversized(pCsr
, pPtr
, iTopic
, pKey
, nKey
);
1651 iPtrOut
= pPtr
->iPtr
;
1653 /* Assert that this page is the right page of this segment for the key
1654 ** that we are searching for. Do this by loading page (iPg-1) and testing
1655 ** that pKey/nKey is greater than all keys on that page, and then by
1656 ** loading (iPg+1) and testing that pKey/nKey is smaller than all
1657 ** the keys it houses.
1659 ** TODO: With range-deletes in the tree, the test described above may fail.
1662 assert( assertKeyLocation(pCsr
, pPtr
, pKey
, nKey
) );
1665 assert( pPtr
->nCell
>0
1666 || pPtr
->pSeg
->nSize
==1
1667 || lsmFsDbPageIsLast(pPtr
->pSeg
, pPtr
->pPg
)
1669 if( pPtr
->nCell
==0 ){
1670 segmentPtrReset(pPtr
, LSM_SEGMENTPTR_FREE_THRESHOLD
);
1673 iMax
= pPtr
->nCell
-1;
1676 int iTry
= (iMin
+iMax
)/2;
1677 void *pKeyT
; int nKeyT
; /* Key for cell iTry */
1680 assert( iTry
<iMax
|| iMin
==iMax
);
1682 rc
= segmentPtrLoadCell(pPtr
, iTry
);
1683 if( rc
!=LSM_OK
) break;
1685 segmentPtrKey(pPtr
, &pKeyT
, &nKeyT
);
1686 iTopicT
= rtTopic(pPtr
->eType
);
1688 res
= sortedKeyCompare(xCmp
, iTopicT
, pKeyT
, nKeyT
, iTopic
, pKey
, nKey
);
1690 iPtrOut
= pPtr
->iPtr
+ pPtr
->iPgPtr
;
1693 if( res
==0 || iMin
==iMax
){
1696 iMax
= LSM_MAX(iTry
-1, iMin
);
1703 assert( res
==0 || (iMin
==iMax
&& iMin
>=0 && iMin
<pPtr
->nCell
) );
1705 rc
= segmentPtrLoadCell(pPtr
, iMin
);
1707 assert( rc
!=LSM_OK
|| res
>0 || iPtrOut
==(pPtr
->iPtr
+ pPtr
->iPgPtr
) );
1712 int eType
= pPtr
->eType
;
1713 if( (res
<0 && (eType
& LSM_START_DELETE
))
1714 || (res
>0 && (eType
& LSM_END_DELETE
))
1715 || (res
==0 && (eType
& LSM_POINT_DELETE
))
1718 }else if( res
==0 && (eType
& LSM_INSERT
) ){
1719 lsm_env
*pEnv
= pCsr
->pDb
->pEnv
;
1721 pCsr
->eType
= pPtr
->eType
;
1722 rc
= sortedBlobSet(pEnv
, &pCsr
->key
, pPtr
->pKey
, pPtr
->nKey
);
1724 rc
= sortedBlobSet(pEnv
, &pCsr
->val
, pPtr
->pVal
, pPtr
->nVal
);
1726 pCsr
->flags
|= CURSOR_SEEK_EQ
;
1728 segmentPtrReset(pPtr
, LSM_SEGMENTPTR_FREE_THRESHOLD
);
1732 if( res
>0 ) rc
= segmentPtrAdvance(pCsr
, pPtr
, 1);
1735 /* Figure out if we need to 'skip' the pointer forward or not */
1736 if( (res
<=0 && (pPtr
->eType
& LSM_START_DELETE
))
1737 || (res
>0 && (pPtr
->eType
& LSM_END_DELETE
))
1739 rc
= segmentPtrFwdPointer(pCsr
, pPtr
, &iPtrOut
);
1741 if( res
<0 && rc
==LSM_OK
){
1742 rc
= segmentPtrAdvance(pCsr
, pPtr
, 0);
1750 /* If the cursor seek has found a separator key, and this cursor is
1751 ** supposed to ignore separator keys, advance to the next entry. */
1752 if( rc
==LSM_OK
&& pPtr
->pPg
1753 && segmentPtrIgnoreSeparators(pCsr
, pPtr
)
1754 && rtIsSeparator(pPtr
->eType
)
1756 assert( eSeek
!=LSM_SEEK_EQ
);
1757 rc
= segmentPtrAdvance(pCsr
, pPtr
, eSeek
==LSM_SEEK_LE
);
1761 assert( rc
!=LSM_OK
|| assertSeekResult(pCsr
,pPtr
,iTopic
,pKey
,nKey
,eSeek
) );
1762 *piPtr
= (int)iPtrOut
;
1766 static int seekInBtree(
1767 MultiCursor
*pCsr
, /* Multi-cursor object */
1768 Segment
*pSeg
, /* Seek within this segment */
1770 void *pKey
, int nKey
, /* Key to seek to */
1771 LsmPgno
*aPg
, /* OUT: Page numbers */
1772 Page
**ppPg
/* OUT: Leaf (sorted-run) page reference */
1778 LsmBlob blob
= {0, 0, 0};
1780 iPg
= (int)pSeg
->iRoot
;
1782 LsmPgno
*piFirst
= 0;
1788 rc
= lsmFsDbPageGet(pCsr
->pDb
->pFS
, pSeg
, iPg
, &pPg
);
1789 assert( rc
==LSM_OK
|| pPg
==0 );
1791 u8
*aData
; /* Buffer containing page data */
1792 int nData
; /* Size of aData[] in bytes */
1798 aData
= fsPageData(pPg
, &nData
);
1799 flags
= pageGetFlags(aData
, nData
);
1800 if( (flags
& SEGMENT_BTREE_FLAG
)==0 ) break;
1802 iPg
= (int)pageGetPtr(aData
, nData
);
1803 nRec
= pageGetNRec(aData
, nData
);
1807 while( iMax
>=iMin
){
1808 int iTry
= (iMin
+iMax
)/2;
1809 void *pKeyT
; int nKeyT
; /* Key for cell iTry */
1810 int iTopicT
; /* Topic for key pKeyT/nKeyT */
1811 LsmPgno iPtr
; /* Pointer associated with cell iTry */
1812 int res
; /* (pKey - pKeyT) */
1814 rc
= pageGetBtreeKey(
1815 pSeg
, pPg
, iTry
, &iPtr
, &iTopicT
, &pKeyT
, &nKeyT
, &blob
1817 if( rc
!=LSM_OK
) break;
1818 if( piFirst
&& pKeyT
==blob
.pData
){
1819 *piFirst
= pageGetBtreeRef(pPg
, iTry
);
1824 res
= sortedKeyCompare(
1825 pCsr
->pDb
->xCmp
, iTopic
, pKey
, nKey
, iTopicT
, pKeyT
, nKeyT
1834 lsmFsPageRelease(pPg
);
1837 }while( rc
==LSM_OK
);
1839 sortedBlobFree(&blob
);
1840 assert( (rc
==LSM_OK
)==(pPg
!=0) );
1844 lsmFsPageRelease(pPg
);
1849 static int seekInSegment(
1853 void *pKey
, int nKey
,
1854 int iPg
, /* Page to search */
1855 int eSeek
, /* Search bias - see above */
1856 int *piPtr
, /* OUT: FC pointer */
1857 int *pbStop
/* OUT: Stop search flag */
1862 if( pPtr
->pSeg
->iRoot
){
1864 assert( pPtr
->pSeg
->iRoot
!=0 );
1865 rc
= seekInBtree(pCsr
, pPtr
->pSeg
, iTopic
, pKey
, nKey
, 0, &pPg
);
1866 if( rc
==LSM_OK
) segmentPtrSetPage(pPtr
, pPg
);
1869 iPtr
= (int)pPtr
->pSeg
->iFirst
;
1872 rc
= segmentPtrLoadPage(pCsr
->pDb
->pFS
, pPtr
, iPtr
);
1877 rc
= segmentPtrSeek(pCsr
, pPtr
, iTopic
, pKey
, nKey
, eSeek
, piPtr
, pbStop
);
1883 ** Seek each segment pointer in the array of (pLvl->nRight+1) at aPtr[].
1886 ** This parameter is only significant if parameter eSeek is set to
1887 ** LSM_SEEK_EQ. In this case, it is set to true before returning if
1888 ** the seek operation is finished. This can happen in two ways:
1890 ** a) A key matching (pKey/nKey) is found, or
1891 ** b) A point-delete or range-delete deleting the key is found.
1893 ** In case (a), the multi-cursor CURSOR_SEEK_EQ flag is set and the pCsr->key
1894 ** and pCsr->val blobs populated before returning.
1896 static int seekInLevel(
1897 MultiCursor
*pCsr
, /* Sorted cursor object to seek */
1898 SegmentPtr
*aPtr
, /* Pointer to array of (nRhs+1) SPs */
1899 int eSeek
, /* Search bias - see above */
1900 int iTopic
, /* Key topic to search for */
1901 void *pKey
, int nKey
, /* Key to search for */
1902 LsmPgno
*piPgno
, /* IN/OUT: fraction cascade pointer (or 0) */
1903 int *pbStop
/* OUT: See above */
1905 Level
*pLvl
= aPtr
[0].pLevel
; /* Level to seek within */
1906 int rc
= LSM_OK
; /* Return code */
1907 int iOut
= 0; /* Pointer to return to caller */
1908 int res
= -1; /* Result of xCmp(pKey, split) */
1909 int nRhs
= pLvl
->nRight
; /* Number of right-hand-side segments */
1912 /* If this is a composite level (one currently undergoing an incremental
1913 ** merge), figure out if the search key is larger or smaller than the
1914 ** level's split-key. */
1916 res
= sortedKeyCompare(pCsr
->pDb
->xCmp
, iTopic
, pKey
, nKey
,
1917 pLvl
->iSplitTopic
, pLvl
->pSplitKey
, pLvl
->nSplitKey
1921 /* If (res<0), then key pKey/nKey is smaller than the split-key (or this
1922 ** is not a composite level and there is no split-key). Search the
1923 ** left-hand-side of the level in this case. */
1927 if( nRhs
==0 ) iPtr
= (int)*piPgno
;
1930 pCsr
, &aPtr
[0], iTopic
, pKey
, nKey
, iPtr
, eSeek
, &iOut
, &bStop
1932 if( rc
==LSM_OK
&& nRhs
>0 && eSeek
==LSM_SEEK_GE
&& aPtr
[0].pPg
==0 ){
1935 for(i
=1; i
<=nRhs
; i
++){
1936 segmentPtrReset(&aPtr
[i
], LSM_SEGMENTPTR_FREE_THRESHOLD
);
1941 int bHit
= 0; /* True if at least one rhs is not EOF */
1942 int iPtr
= (int)*piPgno
;
1944 segmentPtrReset(&aPtr
[0], LSM_SEGMENTPTR_FREE_THRESHOLD
);
1945 for(i
=1; rc
==LSM_OK
&& i
<=nRhs
&& bStop
==0; i
++){
1946 SegmentPtr
*pPtr
= &aPtr
[i
];
1949 pCsr
, pPtr
, iTopic
, pKey
, nKey
, iPtr
, eSeek
, &iOut
, &bStop
1953 /* If the segment-pointer has settled on a key that is smaller than
1954 ** the splitkey, invalidate the segment-pointer. */
1956 res
= sortedKeyCompare(pCsr
->pDb
->xCmp
,
1957 rtTopic(pPtr
->eType
), pPtr
->pKey
, pPtr
->nKey
,
1958 pLvl
->iSplitTopic
, pLvl
->pSplitKey
, pLvl
->nSplitKey
1961 if( pPtr
->eType
& LSM_START_DELETE
){
1962 pPtr
->eType
&= ~LSM_INSERT
;
1963 pPtr
->pKey
= pLvl
->pSplitKey
;
1964 pPtr
->nKey
= pLvl
->nSplitKey
;
1968 segmentPtrReset(pPtr
, LSM_SEGMENTPTR_FREE_THRESHOLD
);
1973 if( aPtr
[i
].pKey
) bHit
= 1;
1976 if( rc
==LSM_OK
&& eSeek
==LSM_SEEK_LE
&& bHit
==0 ){
1977 rc
= segmentPtrEnd(pCsr
, &aPtr
[0], 1);
1981 assert( eSeek
==LSM_SEEK_EQ
|| bStop
==0 );
1987 static void multiCursorGetKey(
1990 int *peType
, /* OUT: Key type (SORTED_WRITE etc.) */
1991 void **ppKey
, /* OUT: Pointer to buffer containing key */
1992 int *pnKey
/* OUT: Size of *ppKey in bytes */
1999 case CURSOR_DATA_TREE0
:
2000 case CURSOR_DATA_TREE1
: {
2001 TreeCursor
*pTreeCsr
= pCsr
->apTreeCsr
[iKey
-CURSOR_DATA_TREE0
];
2002 if( lsmTreeCursorValid(pTreeCsr
) ){
2003 lsmTreeCursorKey(pTreeCsr
, &eType
, &pKey
, &nKey
);
2008 case CURSOR_DATA_SYSTEM
: {
2009 Snapshot
*pWorker
= pCsr
->pDb
->pWorker
;
2010 if( pWorker
&& (pCsr
->flags
& CURSOR_FLUSH_FREELIST
) ){
2011 int nEntry
= pWorker
->freelist
.nEntry
;
2012 if( pCsr
->iFree
< (nEntry
*2) ){
2013 FreelistEntry
*aEntry
= pWorker
->freelist
.aEntry
;
2014 int i
= nEntry
- 1 - (pCsr
->iFree
/ 2);
2017 if( (pCsr
->iFree
% 2) ){
2018 eType
= LSM_END_DELETE
|LSM_SYSTEMKEY
;
2019 iKey2
= aEntry
[i
].iBlk
-1;
2020 }else if( aEntry
[i
].iId
>=0 ){
2021 eType
= LSM_INSERT
|LSM_SYSTEMKEY
;
2022 iKey2
= aEntry
[i
].iBlk
;
2024 /* If the in-memory entry immediately before this one was a
2025 ** DELETE, and the block number is one greater than the current
2026 ** block number, mark this entry as an "end-delete-range". */
2027 if( i
<(nEntry
-1) && aEntry
[i
+1].iBlk
==iKey2
+1 && aEntry
[i
+1].iId
<0 ){
2028 eType
|= LSM_END_DELETE
;
2032 eType
= LSM_START_DELETE
|LSM_SYSTEMKEY
;
2033 iKey2
= aEntry
[i
].iBlk
+ 1;
2036 /* If the in-memory entry immediately after this one is a
2037 ** DELETE, and the block number is one less than the current
2038 ** key, mark this entry as a "start-delete-range". */
2039 if( i
>0 && aEntry
[i
-1].iBlk
==iKey2
-1 && aEntry
[i
-1].iId
<0 ){
2040 eType
|= LSM_START_DELETE
;
2043 pKey
= pCsr
->pSystemVal
;
2045 lsmPutU32(pKey
, ~iKey2
);
2052 int iPtr
= iKey
- CURSOR_DATA_SEGMENT
;
2054 if( iPtr
==pCsr
->nPtr
){
2056 pKey
= pCsr
->pBtCsr
->pKey
;
2057 nKey
= pCsr
->pBtCsr
->nKey
;
2058 eType
= pCsr
->pBtCsr
->eType
;
2060 }else if( iPtr
<pCsr
->nPtr
){
2061 SegmentPtr
*pPtr
= &pCsr
->aPtr
[iPtr
];
2065 eType
= pPtr
->eType
;
2072 if( peType
) *peType
= eType
;
2073 if( pnKey
) *pnKey
= nKey
;
2074 if( ppKey
) *ppKey
= pKey
;
2077 static int sortedDbKeyCompare(
2079 int iLhsFlags
, void *pLhsKey
, int nLhsKey
,
2080 int iRhsFlags
, void *pRhsKey
, int nRhsKey
2082 int (*xCmp
)(void *, int, void *, int) = pCsr
->pDb
->xCmp
;
2085 /* Compare the keys, including the system flag. */
2086 res
= sortedKeyCompare(xCmp
,
2087 rtTopic(iLhsFlags
), pLhsKey
, nLhsKey
,
2088 rtTopic(iRhsFlags
), pRhsKey
, nRhsKey
2091 /* If a key has the LSM_START_DELETE flag set, but not the LSM_INSERT or
2092 ** LSM_POINT_DELETE flags, it is considered a delta larger. This prevents
2093 ** the beginning of an open-ended set from masking a database entry or
2094 ** delete at a lower level. */
2095 if( res
==0 && (pCsr
->flags
& CURSOR_IGNORE_DELETE
) ){
2096 const int m
= LSM_POINT_DELETE
|LSM_INSERT
|LSM_END_DELETE
|LSM_START_DELETE
;
2100 if( LSM_START_DELETE
==(iLhsFlags
& m
) ) iDel1
= +1;
2101 if( LSM_END_DELETE
==(iLhsFlags
& m
) ) iDel1
= -1;
2102 if( LSM_START_DELETE
==(iRhsFlags
& m
) ) iDel2
= +1;
2103 if( LSM_END_DELETE
==(iRhsFlags
& m
) ) iDel2
= -1;
2105 res
= (iDel1
- iDel2
);
2111 static void multiCursorDoCompare(MultiCursor
*pCsr
, int iOut
, int bReverse
){
2115 void *pKey1
; int nKey1
; int eType1
;
2116 void *pKey2
; int nKey2
; int eType2
;
2117 const int mul
= (bReverse
? -1 : 1);
2119 assert( pCsr
->aTree
&& iOut
<pCsr
->nTree
);
2120 if( iOut
>=(pCsr
->nTree
/2) ){
2121 i1
= (iOut
- pCsr
->nTree
/2) * 2;
2124 i1
= pCsr
->aTree
[iOut
*2];
2125 i2
= pCsr
->aTree
[iOut
*2+1];
2128 multiCursorGetKey(pCsr
, i1
, &eType1
, &pKey1
, &nKey1
);
2129 multiCursorGetKey(pCsr
, i2
, &eType2
, &pKey2
, &nKey2
);
2133 }else if( pKey2
==0 ){
2138 /* Compare the keys */
2139 res
= sortedDbKeyCompare(pCsr
,
2140 eType1
, pKey1
, nKey1
, eType2
, pKey2
, nKey2
2145 /* The two keys are identical. Normally, this means that the key from
2146 ** the newer run clobbers the old. However, if the newer key is a
2147 ** separator key, or a range-delete-boundary only, do not allow it
2148 ** to clobber an older entry. */
2149 int nc1
= (eType1
& (LSM_INSERT
|LSM_POINT_DELETE
))==0;
2150 int nc2
= (eType2
& (LSM_INSERT
|LSM_POINT_DELETE
))==0;
2151 iRes
= (nc1
> nc2
) ? i2
: i1
;
2159 pCsr
->aTree
[iOut
] = iRes
;
2163 ** This function advances segment pointer iPtr belonging to multi-cursor
2164 ** pCsr forward (bReverse==0) or backward (bReverse!=0).
2166 ** If the segment pointer points to a segment that is part of a composite
2167 ** level, then the following special case is handled.
2169 ** * If iPtr is the lhs of a composite level, and the cursor is being
2170 ** advanced forwards, and segment iPtr is at EOF, move all pointers
2171 ** that correspond to rhs segments of the same level to the first
2172 ** key in their respective data.
2174 static int segmentCursorAdvance(
2180 SegmentPtr
*pPtr
= &pCsr
->aPtr
[iPtr
];
2181 Level
*pLvl
= pPtr
->pLevel
;
2182 int bComposite
; /* True if pPtr is part of composite level */
2184 /* Advance the segment-pointer object. */
2185 rc
= segmentPtrAdvance(pCsr
, pPtr
, bReverse
);
2186 if( rc
!=LSM_OK
) return rc
;
2188 bComposite
= (pLvl
->nRight
>0 && pCsr
->nPtr
>pLvl
->nRight
);
2189 if( bComposite
&& pPtr
->pPg
==0 ){
2191 if( (bReverse
==0)==(pPtr
->pSeg
==&pLvl
->lhs
) ){
2194 SegmentPtr
*pLhs
= &pCsr
->aPtr
[iPtr
- 1 - (pPtr
->pSeg
- pLvl
->aRhs
)];
2195 for(i
=0; i
<pLvl
->nRight
; i
++){
2196 if( pLhs
[i
+1].pPg
) break;
2198 if( i
==pLvl
->nRight
){
2200 rc
= segmentPtrEnd(pCsr
, pLhs
, 1);
2204 for(i
=0; rc
==LSM_OK
&& i
<pLvl
->nRight
; i
++){
2205 rc
= sortedRhsFirst(pCsr
, pLvl
, &pCsr
->aPtr
[iPtr
+1+i
]);
2212 for(i
=pCsr
->nTree
-1; i
>0; i
--){
2213 multiCursorDoCompare(pCsr
, i
, bReverse
);
2219 if( bComposite
&& pPtr
->pSeg
==&pLvl
->lhs
/* lhs of composite level */
2220 && bReverse
==0 /* csr advanced forwards */
2221 && pPtr
->pPg
==0 /* segment at EOF */
2224 for(i
=0; rc
==LSM_OK
&& i
<pLvl
->nRight
; i
++){
2225 rc
= sortedRhsFirst(pCsr
, pLvl
, &pCsr
->aPtr
[iPtr
+1+i
]);
2227 for(i
=pCsr
->nTree
-1; i
>0; i
--){
2228 multiCursorDoCompare(pCsr
, i
, 0);
2236 static void mcursorFreeComponents(MultiCursor
*pCsr
){
2238 lsm_env
*pEnv
= pCsr
->pDb
->pEnv
;
2240 /* Close the tree cursor, if any. */
2241 lsmTreeCursorDestroy(pCsr
->apTreeCsr
[0]);
2242 lsmTreeCursorDestroy(pCsr
->apTreeCsr
[1]);
2244 /* Reset the segment pointers */
2245 for(i
=0; i
<pCsr
->nPtr
; i
++){
2246 segmentPtrReset(&pCsr
->aPtr
[i
], 0);
2249 /* And the b-tree cursor, if any */
2250 btreeCursorFree(pCsr
->pBtCsr
);
2252 /* Free allocations */
2253 lsmFree(pEnv
, pCsr
->aPtr
);
2254 lsmFree(pEnv
, pCsr
->aTree
);
2255 lsmFree(pEnv
, pCsr
->pSystemVal
);
2262 pCsr
->pSystemVal
= 0;
2263 pCsr
->apTreeCsr
[0] = 0;
2264 pCsr
->apTreeCsr
[1] = 0;
2268 void lsmMCursorFreeCache(lsm_db
*pDb
){
2271 for(p
=pDb
->pCsrCache
; p
; p
=pNext
){
2273 lsmMCursorClose(p
, 0);
2279 ** Close the cursor passed as the first argument.
2281 ** If the bCache parameter is true, then shift the cursor to the pCsrCache
2282 ** list for possible reuse instead of actually deleting it.
2284 void lsmMCursorClose(MultiCursor
*pCsr
, int bCache
){
2286 lsm_db
*pDb
= pCsr
->pDb
;
2287 MultiCursor
**pp
; /* Iterator variable */
2289 /* The cursor may or may not be currently part of the linked list
2290 ** starting at lsm_db.pCsr. If it is, extract it. */
2291 for(pp
=&pDb
->pCsr
; *pp
; pp
=&((*pp
)->pNext
)){
2299 int i
; /* Used to iterate through segment-pointers */
2301 /* Release any page references held by this cursor. */
2302 assert( !pCsr
->pBtCsr
);
2303 for(i
=0; i
<pCsr
->nPtr
; i
++){
2304 SegmentPtr
*pPtr
= &pCsr
->aPtr
[i
];
2305 lsmFsPageRelease(pPtr
->pPg
);
2309 /* Reset the tree cursors */
2310 lsmTreeCursorReset(pCsr
->apTreeCsr
[0]);
2311 lsmTreeCursorReset(pCsr
->apTreeCsr
[1]);
2313 /* Add the cursor to the pCsrCache list */
2314 pCsr
->pNext
= pDb
->pCsrCache
;
2315 pDb
->pCsrCache
= pCsr
;
2317 /* Free the allocation used to cache the current key, if any. */
2318 sortedBlobFree(&pCsr
->key
);
2319 sortedBlobFree(&pCsr
->val
);
2321 /* Free the component cursors */
2322 mcursorFreeComponents(pCsr
);
2324 /* Free the cursor structure itself */
2325 lsmFree(pDb
->pEnv
, pCsr
);
2335 ** Parameter eTree is one of TREE_OLD or TREE_BOTH.
2337 static int multiCursorAddTree(MultiCursor
*pCsr
, Snapshot
*pSnap
, int eTree
){
2339 lsm_db
*db
= pCsr
->pDb
;
2341 /* Add a tree cursor on the 'old' tree, if it exists. */
2342 if( eTree
!=TREE_NONE
2343 && lsmTreeHasOld(db
)
2344 && db
->treehdr
.iOldLog
!=pSnap
->iLogOff
2346 rc
= lsmTreeCursorNew(db
, 1, &pCsr
->apTreeCsr
[1]);
2349 /* Add a tree cursor on the 'current' tree, if required. */
2350 if( rc
==LSM_OK
&& eTree
==TREE_BOTH
){
2351 rc
= lsmTreeCursorNew(db
, 0, &pCsr
->apTreeCsr
[0]);
2357 static int multiCursorAddRhs(MultiCursor
*pCsr
, Level
*pLvl
){
2359 int nRhs
= pLvl
->nRight
;
2361 assert( pLvl
->nRight
>0 );
2362 assert( pCsr
->aPtr
==0 );
2363 pCsr
->aPtr
= lsmMallocZero(pCsr
->pDb
->pEnv
, sizeof(SegmentPtr
) * nRhs
);
2364 if( !pCsr
->aPtr
) return LSM_NOMEM_BKPT
;
2367 for(i
=0; i
<nRhs
; i
++){
2368 pCsr
->aPtr
[i
].pSeg
= &pLvl
->aRhs
[i
];
2369 pCsr
->aPtr
[i
].pLevel
= pLvl
;
2375 static void multiCursorAddOne(MultiCursor
*pCsr
, Level
*pLvl
, int *pRc
){
2377 int iPtr
= pCsr
->nPtr
;
2379 pCsr
->aPtr
[iPtr
].pLevel
= pLvl
;
2380 pCsr
->aPtr
[iPtr
].pSeg
= &pLvl
->lhs
;
2382 for(i
=0; i
<pLvl
->nRight
; i
++){
2383 pCsr
->aPtr
[iPtr
].pLevel
= pLvl
;
2384 pCsr
->aPtr
[iPtr
].pSeg
= &pLvl
->aRhs
[i
];
2388 if( pLvl
->nRight
&& pLvl
->pSplitKey
==0 ){
2389 sortedSplitkey(pCsr
->pDb
, pLvl
, pRc
);
2395 static int multiCursorAddAll(MultiCursor
*pCsr
, Snapshot
*pSnap
){
2400 for(pLvl
=pSnap
->pLevel
; pLvl
; pLvl
=pLvl
->pNext
){
2401 /* If the LEVEL_INCOMPLETE flag is set, then this function is being
2402 ** called (indirectly) from within a sortedNewToplevel() call to
2403 ** construct pLvl. In this case ignore pLvl - this cursor is going to
2404 ** be used to retrieve a freelist entry from the LSM, and the partially
2405 ** complete level may confuse it. */
2406 if( pLvl
->flags
& LEVEL_INCOMPLETE
) continue;
2407 nPtr
+= (1 + pLvl
->nRight
);
2410 assert( pCsr
->aPtr
==0 );
2411 pCsr
->aPtr
= lsmMallocZeroRc(pCsr
->pDb
->pEnv
, sizeof(SegmentPtr
) * nPtr
, &rc
);
2413 for(pLvl
=pSnap
->pLevel
; pLvl
; pLvl
=pLvl
->pNext
){
2414 if( (pLvl
->flags
& LEVEL_INCOMPLETE
)==0 ){
2415 multiCursorAddOne(pCsr
, pLvl
, &rc
);
2422 static int multiCursorInit(MultiCursor
*pCsr
, Snapshot
*pSnap
){
2424 rc
= multiCursorAddAll(pCsr
, pSnap
);
2426 rc
= multiCursorAddTree(pCsr
, pSnap
, TREE_BOTH
);
2428 pCsr
->flags
|= (CURSOR_IGNORE_SYSTEM
| CURSOR_IGNORE_DELETE
);
2432 static MultiCursor
*multiCursorNew(lsm_db
*db
, int *pRc
){
2434 pCsr
= (MultiCursor
*)lsmMallocZeroRc(db
->pEnv
, sizeof(MultiCursor
), pRc
);
2436 pCsr
->pNext
= db
->pCsr
;
2444 void lsmSortedRemap(lsm_db
*pDb
){
2446 for(pCsr
=pDb
->pCsr
; pCsr
; pCsr
=pCsr
->pNext
){
2449 btreeCursorLoadKey(pCsr
->pBtCsr
);
2451 for(iPtr
=0; iPtr
<pCsr
->nPtr
; iPtr
++){
2452 segmentPtrLoadCell(&pCsr
->aPtr
[iPtr
], pCsr
->aPtr
[iPtr
].iCell
);
2457 static void multiCursorReadSeparators(MultiCursor
*pCsr
){
2459 pCsr
->flags
|= CURSOR_READ_SEPARATORS
;
2464 ** Have this cursor skip over SORTED_DELETE entries.
2466 static void multiCursorIgnoreDelete(MultiCursor
*pCsr
){
2467 if( pCsr
) pCsr
->flags
|= CURSOR_IGNORE_DELETE
;
2471 ** If the free-block list is not empty, then have this cursor visit a key
2472 ** with (a) the system bit set, and (b) the key "FREELIST" and (c) a value
2473 ** blob containing the serialized free-block list.
2475 static int multiCursorVisitFreelist(MultiCursor
*pCsr
){
2477 pCsr
->flags
|= CURSOR_FLUSH_FREELIST
;
2478 pCsr
->pSystemVal
= lsmMallocRc(pCsr
->pDb
->pEnv
, 4 + 8, &rc
);
2483 ** Allocate and return a new database cursor.
2485 ** This method should only be called to allocate user cursors, as it may
2486 ** recycle a cursor from lsm_db.pCsrCache.
2489 lsm_db
*pDb
, /* Database handle */
2490 MultiCursor
**ppCsr
/* OUT: Allocated cursor */
2492 MultiCursor
*pCsr
= 0;
2495 if( pDb
->pCsrCache
){
2496 int bOld
; /* True if there is an old in-memory tree */
2498 /* Remove a cursor from the pCsrCache list and add it to the open list. */
2499 pCsr
= pDb
->pCsrCache
;
2500 pDb
->pCsrCache
= pCsr
->pNext
;
2501 pCsr
->pNext
= pDb
->pCsr
;
2504 /* The cursor can almost be used as is, except that the old in-memory
2505 ** tree cursor may be present and not required, or required and not
2506 ** present. Fix this if required. */
2507 bOld
= (lsmTreeHasOld(pDb
) && pDb
->treehdr
.iOldLog
!=pDb
->pClient
->iLogOff
);
2508 if( !bOld
&& pCsr
->apTreeCsr
[1] ){
2509 lsmTreeCursorDestroy(pCsr
->apTreeCsr
[1]);
2510 pCsr
->apTreeCsr
[1] = 0;
2511 }else if( bOld
&& !pCsr
->apTreeCsr
[1] ){
2512 rc
= lsmTreeCursorNew(pDb
, 1, &pCsr
->apTreeCsr
[1]);
2515 pCsr
->flags
= (CURSOR_IGNORE_SYSTEM
| CURSOR_IGNORE_DELETE
);
2518 pCsr
= multiCursorNew(pDb
, &rc
);
2519 if( rc
==LSM_OK
) rc
= multiCursorInit(pCsr
, pDb
->pClient
);
2523 lsmMCursorClose(pCsr
, 0);
2526 assert( (rc
==LSM_OK
)==(pCsr
!=0) );
2531 static int multiCursorGetVal(
2543 case CURSOR_DATA_TREE0
:
2544 case CURSOR_DATA_TREE1
: {
2545 TreeCursor
*pTreeCsr
= pCsr
->apTreeCsr
[iVal
-CURSOR_DATA_TREE0
];
2546 if( lsmTreeCursorValid(pTreeCsr
) ){
2547 lsmTreeCursorValue(pTreeCsr
, ppVal
, pnVal
);
2555 case CURSOR_DATA_SYSTEM
: {
2556 Snapshot
*pWorker
= pCsr
->pDb
->pWorker
;
2558 && (pCsr
->iFree
% 2)==0
2559 && pCsr
->iFree
< (pWorker
->freelist
.nEntry
*2)
2561 int iEntry
= pWorker
->freelist
.nEntry
- 1 - (pCsr
->iFree
/ 2);
2562 u8
*aVal
= &((u8
*)(pCsr
->pSystemVal
))[4];
2563 lsmPutU64(aVal
, pWorker
->freelist
.aEntry
[iEntry
].iId
);
2571 int iPtr
= iVal
-CURSOR_DATA_SEGMENT
;
2572 if( iPtr
<pCsr
->nPtr
){
2573 SegmentPtr
*pPtr
= &pCsr
->aPtr
[iPtr
];
2575 *ppVal
= pPtr
->pVal
;
2576 *pnVal
= pPtr
->nVal
;
2582 assert( rc
==LSM_OK
|| (*ppVal
==0 && *pnVal
==0) );
2586 static int multiCursorAdvance(MultiCursor
*pCsr
, int bReverse
);
2589 ** This function is called by worker connections to walk the part of the
2590 ** free-list stored within the LSM data structure.
2592 int lsmSortedWalkFreelist(
2593 lsm_db
*pDb
, /* Database handle */
2594 int bReverse
, /* True to iterate from largest to smallest */
2595 int (*x
)(void *, int, i64
), /* Callback function */
2596 void *pCtx
/* First argument to pass to callback */
2598 MultiCursor
*pCsr
; /* Cursor used to read db */
2599 int rc
= LSM_OK
; /* Return Code */
2600 Snapshot
*pSnap
= 0;
2602 assert( pDb
->pWorker
);
2603 if( pDb
->bIncrMerge
){
2604 rc
= lsmCheckpointDeserialize(pDb
, 0, pDb
->pShmhdr
->aSnap1
, &pSnap
);
2605 if( rc
!=LSM_OK
) return rc
;
2607 pSnap
= pDb
->pWorker
;
2610 pCsr
= multiCursorNew(pDb
, &rc
);
2612 rc
= multiCursorAddAll(pCsr
, pSnap
);
2613 pCsr
->flags
|= CURSOR_IGNORE_DELETE
;
2618 rc
= lsmMCursorLast(pCsr
);
2620 rc
= lsmMCursorSeek(pCsr
, 1, "", 0, LSM_SEEK_GE
);
2623 while( rc
==LSM_OK
&& lsmMCursorValid(pCsr
) && rtIsSystem(pCsr
->eType
) ){
2624 void *pKey
; int nKey
;
2625 void *pVal
= 0; int nVal
= 0;
2627 rc
= lsmMCursorKey(pCsr
, &pKey
, &nKey
);
2628 if( rc
==LSM_OK
) rc
= lsmMCursorValue(pCsr
, &pVal
, &nVal
);
2629 if( rc
==LSM_OK
&& (nKey
!=4 || nVal
!=8) ) rc
= LSM_CORRUPT_BKPT
;
2634 iBlk
= (int)(~(lsmGetU32((u8
*)pKey
)));
2635 iSnap
= (i64
)lsmGetU64((u8
*)pVal
);
2636 if( x(pCtx
, iBlk
, iSnap
) ) break;
2637 rc
= multiCursorAdvance(pCsr
, !bReverse
);
2642 lsmMCursorClose(pCsr
, 0);
2643 if( pSnap
!=pDb
->pWorker
){
2644 lsmFreeSnapshot(pDb
->pEnv
, pSnap
);
2650 int lsmSortedLoadFreelist(
2651 lsm_db
*pDb
, /* Database handle (must be worker) */
2652 void **ppVal
, /* OUT: Blob containing LSM free-list */
2653 int *pnVal
/* OUT: Size of *ppVal blob in bytes */
2655 MultiCursor
*pCsr
; /* Cursor used to retreive free-list */
2656 int rc
= LSM_OK
; /* Return Code */
2658 assert( pDb
->pWorker
);
2659 assert( *ppVal
==0 && *pnVal
==0 );
2661 pCsr
= multiCursorNew(pDb
, &rc
);
2663 rc
= multiCursorAddAll(pCsr
, pDb
->pWorker
);
2664 pCsr
->flags
|= CURSOR_IGNORE_DELETE
;
2668 rc
= lsmMCursorLast(pCsr
);
2670 && rtIsWrite(pCsr
->eType
) && rtIsSystem(pCsr
->eType
)
2671 && pCsr
->key
.nData
==8
2672 && 0==memcmp(pCsr
->key
.pData
, "FREELIST", 8)
2674 void *pVal
; int nVal
; /* Value read from database */
2675 rc
= lsmMCursorValue(pCsr
, &pVal
, &nVal
);
2677 *ppVal
= lsmMallocRc(pDb
->pEnv
, nVal
, &rc
);
2679 memcpy(*ppVal
, pVal
, nVal
);
2685 lsmMCursorClose(pCsr
, 0);
2691 static int multiCursorAllocTree(MultiCursor
*pCsr
){
2693 if( pCsr
->aTree
==0 ){
2694 int nByte
; /* Bytes of space to allocate */
2695 int nMin
; /* Total number of cursors being merged */
2697 nMin
= CURSOR_DATA_SEGMENT
+ pCsr
->nPtr
+ (pCsr
->pBtCsr
!=0);
2699 while( pCsr
->nTree
<nMin
){
2700 pCsr
->nTree
= pCsr
->nTree
*2;
2703 nByte
= sizeof(int)*pCsr
->nTree
*2;
2704 pCsr
->aTree
= (int *)lsmMallocZeroRc(pCsr
->pDb
->pEnv
, nByte
, &rc
);
2709 static void multiCursorCacheKey(MultiCursor
*pCsr
, int *pRc
){
2713 multiCursorGetKey(pCsr
, pCsr
->aTree
[1], &pCsr
->eType
, &pKey
, &nKey
);
2714 *pRc
= sortedBlobSet(pCsr
->pDb
->pEnv
, &pCsr
->key
, pKey
, nKey
);
#ifdef LSM_DEBUG_EXPENSIVE
/*
** Debugging aid, compiled in only when LSM_DEBUG_EXPENSIVE is defined.
**
** Rebuild the comparison tree from scratch in a temporary array and
** assert() that it has the same size as, and the same first nTree
** entries as, the tree currently stored in pCsr->aTree[]. The original
** array and size are put back before returning. If the temporary
** allocation fails the check is skipped.
*/
static void assertCursorTree(MultiCursor *pCsr){
  int *aSave = pCsr->aTree;
  int nSave = pCsr->nTree;
  int bRev = !!(pCsr->flags & CURSOR_PREV_OK);

  /* Detach the live tree so that multiCursorAllocTree() builds a new one. */
  pCsr->aTree = 0;
  pCsr->nTree = 0;
  if( multiCursorAllocTree(pCsr)==LSM_OK ){
    int i = pCsr->nTree;
    while( --i>0 ){
      multiCursorDoCompare(pCsr, i, bRev);
    }

    assert( nSave==pCsr->nTree
        && 0==memcmp(aSave, pCsr->aTree, sizeof(int)*nSave)
    );

    lsmFree(pCsr->pDb->pEnv, pCsr->aTree);
  }

  pCsr->aTree = aSave;
  pCsr->nTree = nSave;
}
#else
# define assertCursorTree(x)
#endif
2748 static int mcursorLocationOk(MultiCursor
*pCsr
, int bDeleteOk
){
2749 int eType
= pCsr
->eType
;
2754 assert( pCsr
->flags
& (CURSOR_NEXT_OK
|CURSOR_PREV_OK
) );
2755 assertCursorTree(pCsr
);
2757 rdmask
= (pCsr
->flags
& CURSOR_NEXT_OK
) ? LSM_END_DELETE
: LSM_START_DELETE
;
2759 /* If the cursor does not currently point to an actual database key (i.e.
2760 ** it points to a delete key, or the start or end of a range-delete), and
2761 ** the CURSOR_IGNORE_DELETE flag is set, skip past this entry. */
2762 if( (pCsr
->flags
& CURSOR_IGNORE_DELETE
) && bDeleteOk
==0 ){
2763 if( (eType
& LSM_INSERT
)==0 ) return 0;
2766 /* If the cursor points to a system key (free-list entry), and the
2767 ** CURSOR_IGNORE_SYSTEM flag is set, skip thie entry. */
2768 if( (pCsr
->flags
& CURSOR_IGNORE_SYSTEM
) && rtTopic(eType
)!=0 ){
2773 /* This block fires assert() statements to check one of the assumptions
2774 ** in the comment below - that if the lhs sub-cursor of a level undergoing
2775 ** a merge is valid, then all the rhs sub-cursors must be at EOF.
2777 ** Also assert that all rhs sub-cursors are either at EOF or point to
2778 ** a key that is not less than the level split-key. */
2779 for(i
=0; i
<pCsr
->nPtr
; i
++){
2780 SegmentPtr
*pPtr
= &pCsr
->aPtr
[i
];
2781 Level
*pLvl
= pPtr
->pLevel
;
2782 if( pLvl
->nRight
&& pPtr
->pPg
){
2783 if( pPtr
->pSeg
==&pLvl
->lhs
){
2785 for(j
=0; j
<pLvl
->nRight
; j
++) assert( pPtr
[j
+1].pPg
==0 );
2787 int res
= sortedKeyCompare(pCsr
->pDb
->xCmp
,
2788 rtTopic(pPtr
->eType
), pPtr
->pKey
, pPtr
->nKey
,
2789 pLvl
->iSplitTopic
, pLvl
->pSplitKey
, pLvl
->nSplitKey
2797 /* Now check if this key has already been deleted by a range-delete. If
2798 ** so, skip past it.
2800 ** Assume, for the moment, that the tree contains no levels currently
2801 ** undergoing incremental merge, and that this cursor is iterating forwards
2802 ** through the database keys. The cursor currently points to a key in
2803 ** level L. This key has already been deleted if any of the sub-cursors
2804 ** that point to levels newer than L (or to the in-memory tree) point to
2805 ** a key greater than the current key with the LSM_END_DELETE flag set.
2807 ** Or, if the cursor is iterating backwards through data keys, if any
2808 ** such sub-cursor points to a key smaller than the current key with the
2809 ** LSM_START_DELETE flag set.
2811 ** Why it works with levels undergoing a merge too:
2813 ** When a cursor iterates forwards, the sub-cursors for the rhs of a
2814 ** level are only activated once the lhs reaches EOF. So when iterating
2815 ** forwards, the keys visited are the same as if the level was completely
2818 ** If the cursor is iterating backwards, then the lhs sub-cursor is not
2819 ** initialized until the last of the rhs sub-cursors has reached EOF.
2820 ** Additionally, if the START_DELETE flag is set on the last entry (in
2821 ** reverse order - so the entry with the smallest key) of a rhs sub-cursor,
2822 ** then a pseudo-key equal to the levels split-key with the END_DELETE
2823 ** flag set is visited by the sub-cursor.
2825 iKey
= pCsr
->aTree
[1];
2826 for(i
=0; i
<iKey
; i
++){
2828 multiCursorGetKey(pCsr
, i
, &csrflags
, 0, 0);
2829 if( (rdmask
& csrflags
) ){
2830 const int SD_ED
= (LSM_START_DELETE
|LSM_END_DELETE
);
2831 if( (csrflags
& SD_ED
)==SD_ED
2832 || (pCsr
->flags
& CURSOR_IGNORE_DELETE
)==0
2834 void *pKey
; int nKey
;
2835 multiCursorGetKey(pCsr
, i
, 0, &pKey
, &nKey
);
2836 if( 0==sortedKeyCompare(pCsr
->pDb
->xCmp
,
2837 rtTopic(eType
), pCsr
->key
.pData
, pCsr
->key
.nData
,
2838 rtTopic(csrflags
), pKey
, nKey
2847 /* The current cursor position is one this cursor should visit. Return 1. */
2851 static int multiCursorSetupTree(MultiCursor
*pCsr
, int bRev
){
2854 rc
= multiCursorAllocTree(pCsr
);
2857 for(i
=pCsr
->nTree
-1; i
>0; i
--){
2858 multiCursorDoCompare(pCsr
, i
, bRev
);
2862 assertCursorTree(pCsr
);
2863 multiCursorCacheKey(pCsr
, &rc
);
2865 if( rc
==LSM_OK
&& mcursorLocationOk(pCsr
, 0)==0 ){
2866 rc
= multiCursorAdvance(pCsr
, bRev
);
2872 static int multiCursorEnd(MultiCursor
*pCsr
, int bLast
){
2876 pCsr
->flags
&= ~(CURSOR_NEXT_OK
| CURSOR_PREV_OK
| CURSOR_SEEK_EQ
);
2877 pCsr
->flags
|= (bLast
? CURSOR_PREV_OK
: CURSOR_NEXT_OK
);
2880 /* Position the two in-memory tree cursors */
2881 for(i
=0; rc
==LSM_OK
&& i
<2; i
++){
2882 if( pCsr
->apTreeCsr
[i
] ){
2883 rc
= lsmTreeCursorEnd(pCsr
->apTreeCsr
[i
], bLast
);
2887 for(i
=0; rc
==LSM_OK
&& i
<pCsr
->nPtr
; i
++){
2888 SegmentPtr
*pPtr
= &pCsr
->aPtr
[i
];
2889 Level
*pLvl
= pPtr
->pLevel
;
2894 for(iRhs
=0; iRhs
<pLvl
->nRight
&& rc
==LSM_OK
; iRhs
++){
2895 rc
= segmentPtrEnd(pCsr
, &pPtr
[iRhs
+1], 1);
2896 if( pPtr
[iRhs
+1].pPg
) bHit
= 1;
2898 if( bHit
==0 && rc
==LSM_OK
){
2899 rc
= segmentPtrEnd(pCsr
, pPtr
, 1);
2901 segmentPtrReset(pPtr
, LSM_SEGMENTPTR_FREE_THRESHOLD
);
2904 int bLhs
= (pPtr
->pSeg
==&pLvl
->lhs
);
2905 assert( pPtr
->pSeg
==&pLvl
->lhs
|| pPtr
->pSeg
==&pLvl
->aRhs
[0] );
2908 rc
= segmentPtrEnd(pCsr
, pPtr
, 0);
2909 if( pPtr
->pKey
) bHit
= 1;
2911 for(iRhs
=0; iRhs
<pLvl
->nRight
&& rc
==LSM_OK
; iRhs
++){
2913 segmentPtrReset(&pPtr
[iRhs
+1], LSM_SEGMENTPTR_FREE_THRESHOLD
);
2915 rc
= sortedRhsFirst(pCsr
, pLvl
, &pPtr
[iRhs
+bLhs
]);
2922 /* And the b-tree cursor, if applicable */
2923 if( rc
==LSM_OK
&& pCsr
->pBtCsr
){
2925 rc
= btreeCursorFirst(pCsr
->pBtCsr
);
2929 rc
= multiCursorSetupTree(pCsr
, bLast
);
2936 int mcursorSave(MultiCursor
*pCsr
){
2939 int iTree
= pCsr
->aTree
[1];
2940 if( iTree
==CURSOR_DATA_TREE0
|| iTree
==CURSOR_DATA_TREE1
){
2941 multiCursorCacheKey(pCsr
, &rc
);
2944 mcursorFreeComponents(pCsr
);
2948 int mcursorRestore(lsm_db
*pDb
, MultiCursor
*pCsr
){
2950 rc
= multiCursorInit(pCsr
, pDb
->pClient
);
2951 if( rc
==LSM_OK
&& pCsr
->key
.pData
){
2952 rc
= lsmMCursorSeek(pCsr
,
2953 rtTopic(pCsr
->eType
), pCsr
->key
.pData
, pCsr
->key
.nData
, +1
2959 int lsmSaveCursors(lsm_db
*pDb
){
2963 for(pCsr
=pDb
->pCsr
; rc
==LSM_OK
&& pCsr
; pCsr
=pCsr
->pNext
){
2964 rc
= mcursorSave(pCsr
);
2969 int lsmRestoreCursors(lsm_db
*pDb
){
2973 for(pCsr
=pDb
->pCsr
; rc
==LSM_OK
&& pCsr
; pCsr
=pCsr
->pNext
){
2974 rc
= mcursorRestore(pDb
, pCsr
);
2979 int lsmMCursorFirst(MultiCursor
*pCsr
){
2980 return multiCursorEnd(pCsr
, 0);
2983 int lsmMCursorLast(MultiCursor
*pCsr
){
2984 return multiCursorEnd(pCsr
, 1);
2987 lsm_db
*lsmMCursorDb(MultiCursor
*pCsr
){
2991 void lsmMCursorReset(MultiCursor
*pCsr
){
2993 lsmTreeCursorReset(pCsr
->apTreeCsr
[0]);
2994 lsmTreeCursorReset(pCsr
->apTreeCsr
[1]);
2995 for(i
=0; i
<pCsr
->nPtr
; i
++){
2996 segmentPtrReset(&pCsr
->aPtr
[i
], LSM_SEGMENTPTR_FREE_THRESHOLD
);
2998 pCsr
->key
.nData
= 0;
3001 static int treeCursorSeek(
3003 TreeCursor
*pTreeCsr
,
3004 void *pKey
, int nKey
,
3011 lsmTreeCursorSeek(pTreeCsr
, pKey
, nKey
, &res
);
3014 int eType
= lsmTreeCursorFlags(pTreeCsr
);
3015 if( (res
<0 && (eType
& LSM_START_DELETE
))
3016 || (res
>0 && (eType
& LSM_END_DELETE
))
3017 || (res
==0 && (eType
& LSM_POINT_DELETE
))
3020 }else if( res
==0 && (eType
& LSM_INSERT
) ){
3021 lsm_env
*pEnv
= pCsr
->pDb
->pEnv
;
3022 void *p
; int n
; /* Key/value from tree-cursor */
3024 pCsr
->flags
|= CURSOR_SEEK_EQ
;
3025 rc
= lsmTreeCursorKey(pTreeCsr
, &pCsr
->eType
, &p
, &n
);
3026 if( rc
==LSM_OK
) rc
= sortedBlobSet(pEnv
, &pCsr
->key
, p
, n
);
3027 if( rc
==LSM_OK
) rc
= lsmTreeCursorValue(pTreeCsr
, &p
, &n
);
3028 if( rc
==LSM_OK
) rc
= sortedBlobSet(pEnv
, &pCsr
->val
, p
, n
);
3030 lsmTreeCursorReset(pTreeCsr
);
3034 if( res
<0 && lsmTreeCursorValid(pTreeCsr
) ){
3035 lsmTreeCursorNext(pTreeCsr
);
3040 assert( lsmTreeCursorValid(pTreeCsr
) );
3041 lsmTreeCursorPrev(pTreeCsr
);
3056 void *pKey
, int nKey
,
3059 int eESeek
= eSeek
; /* Effective eSeek parameter */
3060 int bStop
= 0; /* Set to true to halt search operation */
3061 int rc
= LSM_OK
; /* Return code */
3062 int iPtr
= 0; /* Used to iterate through pCsr->aPtr[] */
3063 LsmPgno iPgno
= 0; /* FC pointer value */
3065 assert( pCsr
->apTreeCsr
[0]==0 || iTopic
==0 );
3066 assert( pCsr
->apTreeCsr
[1]==0 || iTopic
==0 );
3068 if( eESeek
==LSM_SEEK_LEFAST
) eESeek
= LSM_SEEK_LE
;
3070 assert( eESeek
==LSM_SEEK_EQ
|| eESeek
==LSM_SEEK_LE
|| eESeek
==LSM_SEEK_GE
);
3071 assert( (pCsr
->flags
& CURSOR_FLUSH_FREELIST
)==0 );
3072 assert( pCsr
->nPtr
==0 || pCsr
->aPtr
[0].pLevel
);
3074 pCsr
->flags
&= ~(CURSOR_NEXT_OK
| CURSOR_PREV_OK
| CURSOR_SEEK_EQ
);
3075 rc
= treeCursorSeek(pCsr
, pCsr
->apTreeCsr
[0], pKey
, nKey
, eESeek
, &bStop
);
3076 if( rc
==LSM_OK
&& bStop
==0 ){
3077 rc
= treeCursorSeek(pCsr
, pCsr
->apTreeCsr
[1], pKey
, nKey
, eESeek
, &bStop
);
3080 /* Seek all segment pointers. */
3081 for(iPtr
=0; iPtr
<pCsr
->nPtr
&& rc
==LSM_OK
&& bStop
==0; iPtr
++){
3082 SegmentPtr
*pPtr
= &pCsr
->aPtr
[iPtr
];
3083 assert( pPtr
->pSeg
==&pPtr
->pLevel
->lhs
);
3084 rc
= seekInLevel(pCsr
, pPtr
, eESeek
, iTopic
, pKey
, nKey
, &iPgno
, &bStop
);
3085 iPtr
+= pPtr
->pLevel
->nRight
;
3088 if( eSeek
!=LSM_SEEK_EQ
){
3090 rc
= multiCursorAllocTree(pCsr
);
3094 for(i
=pCsr
->nTree
-1; i
>0; i
--){
3095 multiCursorDoCompare(pCsr
, i
, eESeek
==LSM_SEEK_LE
);
3097 if( eSeek
==LSM_SEEK_GE
) pCsr
->flags
|= CURSOR_NEXT_OK
;
3098 if( eSeek
==LSM_SEEK_LE
) pCsr
->flags
|= CURSOR_PREV_OK
;
3101 multiCursorCacheKey(pCsr
, &rc
);
3102 if( rc
==LSM_OK
&& eSeek
!=LSM_SEEK_LEFAST
&& 0==mcursorLocationOk(pCsr
, 0) ){
3105 lsmMCursorReset(pCsr
);
3108 rc
= lsmMCursorNext(pCsr
);
3111 rc
= lsmMCursorPrev(pCsr
);
3120 int lsmMCursorValid(MultiCursor
*pCsr
){
3122 if( pCsr
->flags
& CURSOR_SEEK_EQ
){
3124 }else if( pCsr
->aTree
){
3125 int iKey
= pCsr
->aTree
[1];
3126 if( iKey
==CURSOR_DATA_TREE0
|| iKey
==CURSOR_DATA_TREE1
){
3127 res
= lsmTreeCursorValid(pCsr
->apTreeCsr
[iKey
-CURSOR_DATA_TREE0
]);
3130 multiCursorGetKey(pCsr
, iKey
, 0, &pKey
, 0);
3137 static int mcursorAdvanceOk(
3142 void *pNew
; /* Pointer to buffer containing new key */
3143 int nNew
; /* Size of buffer pNew in bytes */
3144 int eNewType
; /* Type of new record */
3146 if( *pRc
) return 1;
3148 /* Check the current key value. If it is not greater than (if bReverse==0)
3149 ** or less than (if bReverse!=0) the key currently cached in pCsr->key,
3150 ** then the cursor has not yet been successfully advanced.
3152 multiCursorGetKey(pCsr
, pCsr
->aTree
[1], &eNewType
, &pNew
, &nNew
);
3154 int typemask
= (pCsr
->flags
& CURSOR_IGNORE_DELETE
) ? ~(0) : LSM_SYSTEMKEY
;
3155 int res
= sortedDbKeyCompare(pCsr
,
3156 eNewType
& typemask
, pNew
, nNew
,
3157 pCsr
->eType
& typemask
, pCsr
->key
.pData
, pCsr
->key
.nData
3160 if( (bReverse
==0 && res
<=0) || (bReverse
!=0 && res
>=0) ){
3164 multiCursorCacheKey(pCsr
, pRc
);
3165 assert( pCsr
->eType
==eNewType
);
3167 /* If this cursor is configured to skip deleted keys, and the current
3168 ** cursor points to a SORTED_DELETE entry, then the cursor has not been
3169 ** successfully advanced.
3171 ** Similarly, if the cursor is configured to skip system keys and the
3172 ** current cursor points to a system key, it has not yet been advanced.
3174 if( *pRc
==LSM_OK
&& 0==mcursorLocationOk(pCsr
, 0) ) return 0;
3179 static void flCsrAdvance(MultiCursor
*pCsr
){
3180 assert( pCsr
->flags
& CURSOR_FLUSH_FREELIST
);
3181 if( pCsr
->iFree
% 2 ){
3184 int nEntry
= pCsr
->pDb
->pWorker
->freelist
.nEntry
;
3185 FreelistEntry
*aEntry
= pCsr
->pDb
->pWorker
->freelist
.aEntry
;
3187 int i
= nEntry
- 1 - (pCsr
->iFree
/ 2);
3189 /* If the current entry is a delete and the "end-delete" key will not
3190 ** be attached to the next entry, increment iFree by 1 only. */
3191 if( aEntry
[i
].iId
<0 ){
3193 if( i
==0 || aEntry
[i
-1].iBlk
!=aEntry
[i
].iBlk
-1 ){
3197 if( aEntry
[i
-1].iId
>=0 ) break;
3206 static int multiCursorAdvance(MultiCursor
*pCsr
, int bReverse
){
3207 int rc
= LSM_OK
; /* Return Code */
3208 if( lsmMCursorValid(pCsr
) ){
3210 int iKey
= pCsr
->aTree
[1];
3212 assertCursorTree(pCsr
);
3214 /* If this multi-cursor is advancing forwards, and the sub-cursor
3215 ** being advanced is the one that separator keys may be being read
3216 ** from, record the current absolute pointer value. */
3217 if( pCsr
->pPrevMergePtr
){
3218 if( iKey
==(CURSOR_DATA_SEGMENT
+pCsr
->nPtr
) ){
3219 assert( pCsr
->pBtCsr
);
3220 *pCsr
->pPrevMergePtr
= pCsr
->pBtCsr
->iPtr
;
3221 }else if( pCsr
->pBtCsr
==0 && pCsr
->nPtr
>0
3222 && iKey
==(CURSOR_DATA_SEGMENT
+pCsr
->nPtr
-1)
3224 SegmentPtr
*pPtr
= &pCsr
->aPtr
[iKey
-CURSOR_DATA_SEGMENT
];
3225 *pCsr
->pPrevMergePtr
= pPtr
->iPtr
+pPtr
->iPgPtr
;
3229 if( iKey
==CURSOR_DATA_TREE0
|| iKey
==CURSOR_DATA_TREE1
){
3230 TreeCursor
*pTreeCsr
= pCsr
->apTreeCsr
[iKey
-CURSOR_DATA_TREE0
];
3232 rc
= lsmTreeCursorPrev(pTreeCsr
);
3234 rc
= lsmTreeCursorNext(pTreeCsr
);
3236 }else if( iKey
==CURSOR_DATA_SYSTEM
){
3237 assert( pCsr
->flags
& CURSOR_FLUSH_FREELIST
);
3238 assert( bReverse
==0 );
3240 }else if( iKey
==(CURSOR_DATA_SEGMENT
+pCsr
->nPtr
) ){
3241 assert( bReverse
==0 && pCsr
->pBtCsr
);
3242 rc
= btreeCursorNext(pCsr
->pBtCsr
);
3244 rc
= segmentCursorAdvance(pCsr
, iKey
-CURSOR_DATA_SEGMENT
, bReverse
);
3248 for(i
=(iKey
+pCsr
->nTree
)/2; i
>0; i
=i
/2){
3249 multiCursorDoCompare(pCsr
, i
, bReverse
);
3251 assertCursorTree(pCsr
);
3253 }while( mcursorAdvanceOk(pCsr
, bReverse
, &rc
)==0 );
3258 int lsmMCursorNext(MultiCursor
*pCsr
){
3259 if( (pCsr
->flags
& CURSOR_NEXT_OK
)==0 ) return LSM_MISUSE_BKPT
;
3260 return multiCursorAdvance(pCsr
, 0);
3263 int lsmMCursorPrev(MultiCursor
*pCsr
){
3264 if( (pCsr
->flags
& CURSOR_PREV_OK
)==0 ) return LSM_MISUSE_BKPT
;
3265 return multiCursorAdvance(pCsr
, 1);
3268 int lsmMCursorKey(MultiCursor
*pCsr
, void **ppKey
, int *pnKey
){
3269 if( (pCsr
->flags
& CURSOR_SEEK_EQ
) || pCsr
->aTree
==0 ){
3270 *pnKey
= pCsr
->key
.nData
;
3271 *ppKey
= pCsr
->key
.pData
;
3273 int iKey
= pCsr
->aTree
[1];
3275 if( iKey
==CURSOR_DATA_TREE0
|| iKey
==CURSOR_DATA_TREE1
){
3276 TreeCursor
*pTreeCsr
= pCsr
->apTreeCsr
[iKey
-CURSOR_DATA_TREE0
];
3277 lsmTreeCursorKey(pTreeCsr
, 0, ppKey
, pnKey
);
3284 multiCursorGetKey(pCsr
, iKey
, &eType
, &pKey
, &nKey
);
3285 assert( eType
==pCsr
->eType
);
3286 assert( nKey
==pCsr
->key
.nData
);
3287 assert( memcmp(pKey
, pCsr
->key
.pData
, nKey
)==0 );
3290 nKey
= pCsr
->key
.nData
;
3294 *ppKey
= pCsr
->key
.pData
;
3303 ** Compare the current key that cursor csr points to with pKey/nKey. Set
3304 ** *piRes to the result and return LSM_OK.
3306 int lsm_csr_cmp(lsm_cursor
*csr
, const void *pKey
, int nKey
, int *piRes
){
3307 MultiCursor
*pCsr
= (MultiCursor
*)csr
;
3308 void *pCsrkey
; int nCsrkey
;
3310 rc
= lsmMCursorKey(pCsr
, &pCsrkey
, &nCsrkey
);
3312 int (*xCmp
)(void *, int, void *, int) = pCsr
->pDb
->xCmp
;
3313 *piRes
= sortedKeyCompare(xCmp
, 0, pCsrkey
, nCsrkey
, 0, (void *)pKey
, nKey
);
3318 int lsmMCursorValue(MultiCursor
*pCsr
, void **ppVal
, int *pnVal
){
3322 if( (pCsr
->flags
& CURSOR_SEEK_EQ
) || pCsr
->aTree
==0 ){
3324 nVal
= pCsr
->val
.nData
;
3325 pVal
= pCsr
->val
.pData
;
3328 assert( pCsr
->aTree
);
3329 assert( mcursorLocationOk(pCsr
, (pCsr
->flags
& CURSOR_IGNORE_DELETE
)) );
3331 rc
= multiCursorGetVal(pCsr
, pCsr
->aTree
[1], &pVal
, &nVal
);
3332 if( pVal
&& rc
==LSM_OK
){
3333 rc
= sortedBlobSet(pCsr
->pDb
->pEnv
, &pCsr
->val
, pVal
, nVal
);
3334 pVal
= pCsr
->val
.pData
;
3347 int lsmMCursorType(MultiCursor
*pCsr
, int *peType
){
3348 assert( pCsr
->aTree
);
3349 multiCursorGetKey(pCsr
, pCsr
->aTree
[1], peType
, 0, 0);
3354 ** Buffer aData[], size nData, is assumed to contain a valid b-tree
3355 ** hierarchy page image. Return the offset in aData[] of the next free
3356 ** byte in the data area (where a new cell may be written if there is
3359 static int mergeWorkerPageOffset(u8
*aData
, int nData
){
3365 nRec
= lsmGetU16(&aData
[SEGMENT_NRECORD_OFFSET(nData
)]);
3366 iOff
= lsmGetU16(&aData
[SEGMENT_CELLPTR_OFFSET(nData
, nRec
-1)]);
3367 eType
= aData
[iOff
++];
3369 || eType
==(LSM_SYSTEMKEY
|LSM_SEPARATOR
)
3370 || eType
==(LSM_SEPARATOR
)
3373 iOff
+= lsmVarintGet32(&aData
[iOff
], &nKey
);
3374 iOff
+= lsmVarintGet32(&aData
[iOff
], &nKey
);
3376 return iOff
+ (eType
? nKey
: 0);
3380 ** Following a checkpoint operation, database pages that are part of the
3381 ** checkpointed state of the LSM are deemed read-only. This includes the
3382 ** right-most page of the b-tree hierarchy of any separators array under
3383 ** construction, and all pages between it and the b-tree root, inclusive.
3384 ** This is a problem, as when further pages are appended to the separators
3385 ** array, entries must be added to the indicated b-tree hierarchy pages.
3387 ** This function copies all such b-tree pages to new locations, so that
3388 ** they can be modified as required.
3390 ** The complication is that not all database pages are the same size - due
3391 ** to the way the file.c module works some (the first and last in each block)
3392 ** are 4 bytes smaller than the others.
3394 static int mergeWorkerMoveHierarchy(
3395 MergeWorker
*pMW
, /* Merge worker */
3396 int bSep
/* True for separators run */
3398 lsm_db
*pDb
= pMW
->pDb
; /* Database handle */
3399 int rc
= LSM_OK
; /* Return code */
3401 Page
**apHier
= pMW
->hier
.apHier
;
3402 int nHier
= pMW
->hier
.nHier
;
3404 for(i
=0; rc
==LSM_OK
&& i
<nHier
; i
++){
3406 rc
= lsmFsSortedAppend(pDb
->pFS
, pDb
->pWorker
, pMW
->pLevel
, 1, &pNew
);
3407 assert( rc
==LSM_OK
);
3413 a1
= fsPageData(pNew
, &n1
);
3414 a2
= fsPageData(apHier
[i
], &n2
);
3416 assert( n1
==n2
|| n1
+4==n2
);
3421 int nEntry
= pageGetNRec(a2
, n2
);
3422 int iEof1
= SEGMENT_EOF(n1
, nEntry
);
3423 int iEof2
= SEGMENT_EOF(n2
, nEntry
);
3425 memcpy(a1
, a2
, iEof2
- 4);
3426 memcpy(&a1
[iEof1
], &a2
[iEof2
], n2
- iEof2
);
3429 lsmFsPageRelease(apHier
[i
]);
3433 assert( n1
==n2
|| n1
+4==n2
|| n2
+4==n1
);
3435 /* If n1 (size of the new page) is equal to or greater than n2 (the
3436 ** size of the old page), then copy the data into the new page. If
3437 ** n1==n2, this could be done with a single memcpy(). However,
3438 ** since sometimes n1>n2, the page content and footer must be copied
3440 int nEntry
= pageGetNRec(a2
, n2
);
3441 int iEof1
= SEGMENT_EOF(n1
, nEntry
);
3442 int iEof2
= SEGMENT_EOF(n2
, nEntry
);
3443 memcpy(a1
, a2
, iEof2
);
3444 memcpy(&a1
[iEof1
], &a2
[iEof2
], n2
- iEof2
);
3445 lsmFsPageRelease(apHier
[i
]);
3448 lsmPutU16(&a1
[SEGMENT_FLAGS_OFFSET(n1
)], SEGMENT_BTREE_FLAG
);
3449 lsmPutU16(&a1
[SEGMENT_NRECORD_OFFSET(n1
)], 0);
3450 lsmPutU64(&a1
[SEGMENT_POINTER_OFFSET(n1
)], 0);
3452 lsmFsPageRelease(pNew
);
3460 for(i
=0; i
<nHier
; i
++) assert( lsmFsPageWritable(apHier
[i
]) );
3468 ** Allocate and populate the MergeWorker.apHier[] array.
3470 static int mergeWorkerLoadHierarchy(MergeWorker
*pMW
){
3475 pSeg
= &pMW
->pLevel
->lhs
;
3478 if( p
->apHier
==0 && pSeg
->iRoot
!=0 ){
3479 FileSystem
*pFS
= pMW
->pDb
->pFS
;
3480 lsm_env
*pEnv
= pMW
->pDb
->pEnv
;
3483 int iPg
= (int)pSeg
->iRoot
;
3491 rc
= lsmFsDbPageGet(pFS
, pSeg
, iPg
, &pPg
);
3492 if( rc
!=LSM_OK
) break;
3494 aData
= fsPageData(pPg
, &nData
);
3495 flags
= pageGetFlags(aData
, nData
);
3496 if( flags
&SEGMENT_BTREE_FLAG
){
3497 Page
**apNew
= (Page
**)lsmRealloc(
3498 pEnv
, apHier
, sizeof(Page
*)*(nHier
+1)
3501 rc
= LSM_NOMEM_BKPT
;
3505 memmove(&apHier
[1], &apHier
[0], sizeof(Page
*) * nHier
);
3509 iPg
= (int)pageGetPtr(aData
, nData
);
3511 lsmFsPageRelease(pPg
);
3519 aData
= fsPageData(apHier
[0], &nData
);
3520 pMW
->aSave
[0].iPgno
= pageGetPtr(aData
, nData
);
3523 rc
= mergeWorkerMoveHierarchy(pMW
, 0);
3526 for(i
=0; i
<nHier
; i
++){
3527 lsmFsPageRelease(apHier
[i
]);
3529 lsmFree(pEnv
, apHier
);
3537 ** B-tree pages use almost the same format as regular pages. The
3540 ** 1. The record format is (usually, see below) as follows:
3542 ** + Type byte (always SORTED_SEPARATOR or SORTED_SYSTEM_SEPARATOR),
3543 ** + Absolute pointer value (varint),
3544 ** + Number of bytes in key (varint),
3545 ** + LsmBlob containing key data.
3547 ** 2. All pointer values are stored as absolute values (not offsets
3548 ** relative to the footer pointer value).
3550 ** 3. Each pointer that is part of a record points to a page that
3551 ** contains keys smaller than the records key (note: not "equal to or
3552 ** smaller than - smaller than").
3554 ** 4. The pointer in the page footer of a b-tree page points to a page
3555 ** that contains keys equal to or larger than the largest key on the
3558 ** The reason for having the page footer pointer point to the right-child
3559 ** (instead of the left) is that doing things this way makes the
3560 ** mergeWorkerMoveHierarchy() operation less complicated (since the pointers
3561 ** that need to be updated are all stored as fixed-size integers within the
3562 ** page footer, not varints in page records).
3564 ** Records may not span b-tree pages. If this function is called to add a
3565 ** record larger than (page-size / 4) bytes, then a pointer to the indexed
3566 ** array page that contains the main record is added to the b-tree instead.
3567 ** In this case the record format is:
3569 ** + 0x00 byte (1 byte)
3570 ** + Absolute pointer value (varint),
3571 ** + Absolute page number of page containing key (varint).
3573 ** See function seekInBtree() for the code that traverses b-tree pages.
3576 static int mergeWorkerBtreeWrite(
3584 Hierarchy
*p
= &pMW
->hier
;
3585 lsm_db
*pDb
= pMW
->pDb
; /* Database handle */
3586 int rc
= LSM_OK
; /* Return Code */
3587 int iLevel
; /* Level of b-tree hierachy to write to */
3588 int nData
; /* Size of aData[] in bytes */
3589 u8
*aData
; /* Page data for level iLevel */
3590 int iOff
; /* Offset on b-tree page to write record to */
3591 int nRec
; /* Initial number of records on b-tree page */
3593 /* iKeyPg should be zero for an ordinary b-tree key, or non-zero for an
3594 ** indirect key. The flags byte for an indirect key is 0x00. */
3595 assert( (eType
==0)==(iKeyPg
!=0) );
3597 /* The MergeWorker.apHier[] array contains the right-most leaf of the b-tree
3598 ** hierarchy, the root node, and all nodes that lie on the path between.
3599 ** apHier[0] is the right-most leaf and apHier[pMW->nHier-1] is the current
3602 ** This loop searches for a node with enough space to store the key on,
3603 ** starting with the leaf and iterating up towards the root. When the loop
3604 ** exits, the key may be written to apHier[iLevel]. */
3605 for(iLevel
=0; iLevel
<=p
->nHier
; iLevel
++){
3606 int nByte
; /* Number of free bytes required */
3608 if( iLevel
==p
->nHier
){
3609 /* Extend the array and allocate a new root page. */
3611 aNew
= (Page
**)lsmRealloc(
3612 pMW
->pDb
->pEnv
, p
->apHier
, sizeof(Page
*)*(p
->nHier
+1)
3615 return LSM_NOMEM_BKPT
;
3622 /* If the key will fit on this page, break out of the loop here.
3623 ** The new entry will be written to page apHier[iLevel]. */
3624 pOld
= p
->apHier
[iLevel
];
3625 assert( lsmFsPageWritable(pOld
) );
3626 aData
= fsPageData(pOld
, &nData
);
3628 nByte
= 2 + 1 + lsmVarintLen32((int)iPtr
) + lsmVarintLen32((int)iKeyPg
);
3630 nByte
= 2 + 1 + lsmVarintLen32((int)iPtr
) + lsmVarintLen32(nKey
) + nKey
;
3632 nRec
= pageGetNRec(aData
, nData
);
3633 nFree
= SEGMENT_EOF(nData
, nRec
) - mergeWorkerPageOffset(aData
, nData
);
3634 if( nByte
<=nFree
) break;
3636 /* Otherwise, this page is full. Set the right-hand-child pointer
3637 ** to iPtr and release it. */
3638 lsmPutU64(&aData
[SEGMENT_POINTER_OFFSET(nData
)], iPtr
);
3639 assert( lsmFsPageNumber(pOld
)==0 );
3640 rc
= lsmFsPagePersist(pOld
);
3642 iPtr
= lsmFsPageNumber(pOld
);
3643 lsmFsPageRelease(pOld
);
3647 /* Allocate a new page for apHier[iLevel]. */
3648 p
->apHier
[iLevel
] = 0;
3650 rc
= lsmFsSortedAppend(
3651 pDb
->pFS
, pDb
->pWorker
, pMW
->pLevel
, 1, &p
->apHier
[iLevel
]
3654 if( rc
!=LSM_OK
) return rc
;
3656 aData
= fsPageData(p
->apHier
[iLevel
], &nData
);
3657 memset(aData
, 0, nData
);
3658 lsmPutU16(&aData
[SEGMENT_FLAGS_OFFSET(nData
)], SEGMENT_BTREE_FLAG
);
3659 lsmPutU16(&aData
[SEGMENT_NRECORD_OFFSET(nData
)], 0);
3661 if( iLevel
==p
->nHier
){
3667 /* Write the key into page apHier[iLevel]. */
3668 aData
= fsPageData(p
->apHier
[iLevel
], &nData
);
3669 iOff
= mergeWorkerPageOffset(aData
, nData
);
3670 nRec
= pageGetNRec(aData
, nData
);
3671 lsmPutU16(&aData
[SEGMENT_CELLPTR_OFFSET(nData
, nRec
)], (u16
)iOff
);
3672 lsmPutU16(&aData
[SEGMENT_NRECORD_OFFSET(nData
)], (u16
)(nRec
+1));
3674 aData
[iOff
++] = 0x00;
3675 iOff
+= lsmVarintPut32(&aData
[iOff
], (int)iPtr
);
3676 iOff
+= lsmVarintPut32(&aData
[iOff
], (int)iKeyPg
);
3678 aData
[iOff
++] = eType
;
3679 iOff
+= lsmVarintPut32(&aData
[iOff
], (int)iPtr
);
3680 iOff
+= lsmVarintPut32(&aData
[iOff
], nKey
);
3681 memcpy(&aData
[iOff
], pKey
, nKey
);
3687 static int mergeWorkerBtreeIndirect(MergeWorker
*pMW
){
3689 if( pMW
->iIndirect
){
3690 LsmPgno iKeyPg
= pMW
->aSave
[1].iPgno
;
3691 rc
= mergeWorkerBtreeWrite(pMW
, 0, pMW
->iIndirect
, iKeyPg
, 0, 0);
3698 ** Append the database key (iTopic/pKey/nKey) to the b-tree under
3699 ** construction. This key has not yet been written to a segment page.
3700 ** The pointer that will accompany the new key in the b-tree - that
3701 ** points to the completed segment page that contains keys smaller than
3702 ** (pKey/nKey) is currently stored in pMW->aSave[0].iPgno.
3704 static int mergeWorkerPushHierarchy(
3705 MergeWorker
*pMW
, /* Merge worker object */
3706 int iTopic
, /* Topic value for this key */
3707 void *pKey
, /* Pointer to key buffer */
3708 int nKey
/* Size of pKey buffer in bytes */
3710 int rc
= LSM_OK
; /* Return Code */
3711 LsmPgno iPtr
; /* Pointer value to accompany pKey/nKey */
3713 assert( pMW
->aSave
[0].bStore
==0 );
3714 assert( pMW
->aSave
[1].bStore
==0 );
3715 rc
= mergeWorkerBtreeIndirect(pMW
);
3717 /* Obtain the absolute pointer value to store along with the key in the
3718 ** page body. This pointer points to a page that contains keys that are
3719 ** smaller than pKey/nKey. */
3720 iPtr
= pMW
->aSave
[0].iPgno
;
3723 /* Determine if the indirect format should be used. */
3724 if( (nKey
*4 > lsmFsPageSize(pMW
->pDb
->pFS
)) ){
3725 pMW
->iIndirect
= iPtr
;
3726 pMW
->aSave
[1].bStore
= 1;
3728 rc
= mergeWorkerBtreeWrite(
3729 pMW
, (u8
)(iTopic
| LSM_SEPARATOR
), iPtr
, 0, pKey
, nKey
3733 /* Ensure that the SortedRun.iRoot field is correct. */
3737 static int mergeWorkerFinishHierarchy(
3738 MergeWorker
*pMW
/* Merge worker object */
3740 int i
; /* Used to loop through apHier[] */
3741 int rc
= LSM_OK
; /* Return code */
3742 LsmPgno iPtr
; /* New right-hand-child pointer value */
3744 iPtr
= pMW
->aSave
[0].iPgno
;
3745 for(i
=0; i
<pMW
->hier
.nHier
&& rc
==LSM_OK
; i
++){
3746 Page
*pPg
= pMW
->hier
.apHier
[i
];
3747 int nData
; /* Size of aData[] in bytes */
3748 u8
*aData
; /* Page data for pPg */
3750 aData
= fsPageData(pPg
, &nData
);
3751 lsmPutU64(&aData
[SEGMENT_POINTER_OFFSET(nData
)], iPtr
);
3753 rc
= lsmFsPagePersist(pPg
);
3754 iPtr
= lsmFsPageNumber(pPg
);
3755 lsmFsPageRelease(pPg
);
3758 if( pMW
->hier
.nHier
){
3759 pMW
->pLevel
->lhs
.iRoot
= iPtr
;
3760 lsmFree(pMW
->pDb
->pEnv
, pMW
->hier
.apHier
);
3761 pMW
->hier
.apHier
= 0;
3762 pMW
->hier
.nHier
= 0;
3768 static int mergeWorkerAddPadding(
3769 MergeWorker
*pMW
/* Merge worker object */
3771 FileSystem
*pFS
= pMW
->pDb
->pFS
;
3772 return lsmFsSortedPadding(pFS
, pMW
->pDb
->pWorker
, &pMW
->pLevel
->lhs
);
3776 ** Release all page references currently held by the merge-worker passed
3777 ** as the only argument. Unless an error has occurred, all pages have
3778 ** already been released.
3780 static void mergeWorkerReleaseAll(MergeWorker
*pMW
){
3782 lsmFsPageRelease(pMW
->pPage
);
3785 for(i
=0; i
<pMW
->hier
.nHier
; i
++){
3786 lsmFsPageRelease(pMW
->hier
.apHier
[i
]);
3787 pMW
->hier
.apHier
[i
] = 0;
3789 lsmFree(pMW
->pDb
->pEnv
, pMW
->hier
.apHier
);
3790 pMW
->hier
.apHier
= 0;
3791 pMW
->hier
.nHier
= 0;
3794 static int keyszToSkip(FileSystem
*pFS
, int nKey
){
3795 int nPgsz
; /* Nominal database page size */
3796 nPgsz
= lsmFsPageSize(pFS
);
3797 return LSM_MIN(((nKey
* 4) / nPgsz
), 3);
3801 ** Release the reference to the current output page of merge-worker *pMW
3802 ** (reference pMW->pPage). Set the page number values in aSave[] as
3803 ** required (see comments above struct MergeWorker for details).
3805 static int mergeWorkerPersistAndRelease(MergeWorker
*pMW
){
3809 assert( pMW
->pPage
|| (pMW
->aSave
[0].bStore
==0 && pMW
->aSave
[1].bStore
==0) );
3811 /* Persist the page */
3812 rc
= lsmFsPagePersist(pMW
->pPage
);
3814 /* If required, save the page number. */
3816 if( pMW
->aSave
[i
].bStore
){
3817 pMW
->aSave
[i
].iPgno
= lsmFsPageNumber(pMW
->pPage
);
3818 pMW
->aSave
[i
].bStore
= 0;
3822 /* Release the completed output page. */
3823 lsmFsPageRelease(pMW
->pPage
);
3829 ** Advance to the next page of an output run being populated by merge-worker
3830 ** pMW. The footer of the new page is initialized to indicate that it contains
3831 ** zero records. The flags field is cleared. The page footer pointer field
3834 ** If successful, LSM_OK is returned. Otherwise, an error code.
3836 static int mergeWorkerNextPage(
3837 MergeWorker
*pMW
, /* Merge worker object to append page to */
3838 LsmPgno iFPtr
/* Pointer value for footer of new page */
3840 int rc
= LSM_OK
; /* Return code */
3841 Page
*pNext
= 0; /* New page appended to run */
3842 lsm_db
*pDb
= pMW
->pDb
; /* Database handle */
3844 rc
= lsmFsSortedAppend(pDb
->pFS
, pDb
->pWorker
, pMW
->pLevel
, 0, &pNext
);
3845 assert( rc
|| pMW
->pLevel
->lhs
.iFirst
>0 || pMW
->pDb
->compress
.xCompress
);
3848 u8
*aData
; /* Data buffer belonging to page pNext */
3849 int nData
; /* Size of aData[] in bytes */
3851 rc
= mergeWorkerPersistAndRelease(pMW
);
3854 pMW
->pLevel
->pMerge
->iOutputOff
= 0;
3855 aData
= fsPageData(pNext
, &nData
);
3856 lsmPutU16(&aData
[SEGMENT_NRECORD_OFFSET(nData
)], 0);
3857 lsmPutU16(&aData
[SEGMENT_FLAGS_OFFSET(nData
)], 0);
3858 lsmPutU64(&aData
[SEGMENT_POINTER_OFFSET(nData
)], iFPtr
);
3866 ** Write a blob of data into an output segment being populated by a
3867 ** merge-worker object. If argument bSep is true, write into the separators
3868 ** array. Otherwise, the main array.
3870 ** This function is used to write the blobs of data for keys and values.
3872 static int mergeWorkerData(
3873 MergeWorker
*pMW
, /* Merge worker object */
3874 int bSep
, /* True to write to separators run */
3875 int iFPtr
, /* Footer ptr for new pages */
3876 u8
*aWrite
, /* Write data from this buffer */
3877 int nWrite
/* Size of aWrite[] in bytes */
3879 int rc
= LSM_OK
; /* Return code */
3880 int nRem
= nWrite
; /* Number of bytes still to write */
3882 while( rc
==LSM_OK
&& nRem
>0 ){
3883 Merge
*pMerge
= pMW
->pLevel
->pMerge
;
3884 int nCopy
; /* Number of bytes to copy */
3885 u8
*aData
; /* Pointer to buffer of current output page */
3886 int nData
; /* Size of aData[] in bytes */
3887 int nRec
; /* Number of records on current output page */
3888 int iOff
; /* Offset in aData[] to write to */
3890 assert( lsmFsPageWritable(pMW
->pPage
) );
3892 aData
= fsPageData(pMW
->pPage
, &nData
);
3893 nRec
= pageGetNRec(aData
, nData
);
3894 iOff
= pMerge
->iOutputOff
;
3895 nCopy
= LSM_MIN(nRem
, SEGMENT_EOF(nData
, nRec
) - iOff
);
3897 memcpy(&aData
[iOff
], &aWrite
[nWrite
-nRem
], nCopy
);
3901 rc
= mergeWorkerNextPage(pMW
, iFPtr
);
3903 pMerge
->iOutputOff
= iOff
+ nCopy
;
3912 ** The MergeWorker passed as the only argument is working to merge two or
3913 ** more existing segments together (not to flush an in-memory tree). It
3914 ** has not yet written the first key to the first page of the output.
3916 static int mergeWorkerFirstPage(MergeWorker
*pMW
){
3917 int rc
= LSM_OK
; /* Return code */
3918 Page
*pPg
= 0; /* First page of run pSeg */
3919 int iFPtr
= 0; /* Pointer value read from footer of pPg */
3920 MultiCursor
*pCsr
= pMW
->pCsr
;
3922 assert( pMW
->pPage
==0 );
3926 iFPtr
= (int)pMW
->pLevel
->pNext
->lhs
.iFirst
;
3927 }else if( pCsr
->nPtr
>0 ){
3929 pSeg
= pCsr
->aPtr
[pCsr
->nPtr
-1].pSeg
;
3930 rc
= lsmFsDbPageGet(pMW
->pDb
->pFS
, pSeg
, pSeg
->iFirst
, &pPg
);
3932 u8
*aData
; /* Buffer for page pPg */
3933 int nData
; /* Size of aData[] in bytes */
3934 aData
= fsPageData(pPg
, &nData
);
3935 iFPtr
= (int)pageGetPtr(aData
, nData
);
3936 lsmFsPageRelease(pPg
);
3941 rc
= mergeWorkerNextPage(pMW
, iFPtr
);
3942 if( pCsr
->pPrevMergePtr
) *pCsr
->pPrevMergePtr
= iFPtr
;
3943 pMW
->aSave
[0].bStore
= 1;
3949 static int mergeWorkerWrite(
3950 MergeWorker
*pMW
, /* Merge worker object to write into */
3951 int eType
, /* One of SORTED_SEPARATOR, WRITE or DELETE */
3952 void *pKey
, int nKey
, /* Key value */
3953 void *pVal
, int nVal
, /* Value value */
3954 int iPtr
/* Absolute value of page pointer, or 0 */
3956 int rc
= LSM_OK
; /* Return code */
3957 Merge
*pMerge
; /* Persistent part of level merge state */
3958 int nHdr
; /* Space required for this record header */
3959 Page
*pPg
; /* Page to write to */
3960 u8
*aData
; /* Data buffer for page pWriter->pPage */
3961 int nData
= 0; /* Size of buffer aData[] in bytes */
3962 int nRec
= 0; /* Number of records on page pPg */
3963 int iFPtr
= 0; /* Value of pointer in footer of pPg */
3964 int iRPtr
= 0; /* Value of pointer written into record */
3965 int iOff
= 0; /* Current write offset within page pPg */
3966 Segment
*pSeg
; /* Segment being written */
3967 int flags
= 0; /* If != 0, flags value for page footer */
3968 int bFirst
= 0; /* True for first key of output run */
3970 pMerge
= pMW
->pLevel
->pMerge
;
3971 pSeg
= &pMW
->pLevel
->lhs
;
3973 if( pSeg
->iFirst
==0 && pMW
->pPage
==0 ){
3974 rc
= mergeWorkerFirstPage(pMW
);
3979 aData
= fsPageData(pPg
, &nData
);
3980 nRec
= pageGetNRec(aData
, nData
);
3981 iFPtr
= (int)pageGetPtr(aData
, nData
);
3982 iRPtr
= iPtr
- iFPtr
;
3985 /* Figure out how much space is required by the new record. The space
3986 ** required is divided into two sections: the header and the body. The
3987 ** header consists of the intial varint fields. The body are the blobs
3988 ** of data that correspond to the key and value data. The entire header
3989 ** must be stored on the page. The body may overflow onto the next and
3990 ** subsequent pages.
3992 ** The header space is:
3994 ** 1) record type - 1 byte.
3995 ** 2) Page-pointer-offset - 1 varint
3996 ** 3) Key size - 1 varint
3997 ** 4) Value size - 1 varint (only if LSM_INSERT flag is set)
4000 nHdr
= 1 + lsmVarintLen32(iRPtr
) + lsmVarintLen32(nKey
);
4001 if( rtIsWrite(eType
) ) nHdr
+= lsmVarintLen32(nVal
);
4003 /* If the entire header will not fit on page pPg, or if page pPg is
4004 ** marked read-only, advance to the next page of the output run. */
4005 iOff
= pMerge
->iOutputOff
;
4006 if( iOff
<0 || pPg
==0 || iOff
+nHdr
> SEGMENT_EOF(nData
, nRec
+1) ){
4007 if( iOff
>=0 && pPg
){
4008 /* Zero any free space on the page */
4010 memset(&aData
[iOff
], 0, SEGMENT_EOF(nData
, nRec
)-iOff
);
4012 iFPtr
= (int)*pMW
->pCsr
->pPrevMergePtr
;
4013 iRPtr
= iPtr
- iFPtr
;
4016 rc
= mergeWorkerNextPage(pMW
, iFPtr
);
4021 /* If this record header will be the first on the page, and the page is
4022 ** not the very first in the entire run, add a copy of the key to the
4023 ** b-tree hierarchy.
4025 if( rc
==LSM_OK
&& nRec
==0 && bFirst
==0 ){
4026 assert( pMerge
->nSkip
>=0 );
4028 if( pMerge
->nSkip
==0 ){
4029 rc
= mergeWorkerPushHierarchy(pMW
, rtTopic(eType
), pKey
, nKey
);
4030 assert( pMW
->aSave
[0].bStore
==0 );
4031 pMW
->aSave
[0].bStore
= 1;
4032 pMerge
->nSkip
= keyszToSkip(pMW
->pDb
->pFS
, nKey
);
4035 flags
= PGFTR_SKIP_THIS_FLAG
;
4038 if( pMerge
->nSkip
) flags
|= PGFTR_SKIP_NEXT_FLAG
;
4041 /* Update the output segment */
4043 aData
= fsPageData(pPg
, &nData
);
4045 /* Update the page footer. */
4046 lsmPutU16(&aData
[SEGMENT_NRECORD_OFFSET(nData
)], (u16
)(nRec
+1));
4047 lsmPutU16(&aData
[SEGMENT_CELLPTR_OFFSET(nData
, nRec
)], (u16
)iOff
);
4048 if( flags
) lsmPutU16(&aData
[SEGMENT_FLAGS_OFFSET(nData
)], (u16
)flags
);
4050 /* Write the entry header into the current page. */
4051 aData
[iOff
++] = (u8
)eType
; /* 1 */
4052 iOff
+= lsmVarintPut32(&aData
[iOff
], iRPtr
); /* 2 */
4053 iOff
+= lsmVarintPut32(&aData
[iOff
], nKey
); /* 3 */
4054 if( rtIsWrite(eType
) ) iOff
+= lsmVarintPut32(&aData
[iOff
], nVal
); /* 4 */
4055 pMerge
->iOutputOff
= iOff
;
4057 /* Write the key and data into the segment. */
4058 assert( iFPtr
==pageGetPtr(aData
, nData
) );
4059 rc
= mergeWorkerData(pMW
, 0, iFPtr
+iRPtr
, pKey
, nKey
);
4060 if( rc
==LSM_OK
&& rtIsWrite(eType
) ){
4062 rc
= mergeWorkerData(pMW
, 0, iFPtr
+iRPtr
, pVal
, nVal
);
4072 ** Free all resources allocated by mergeWorkerInit().
4074 static void mergeWorkerShutdown(MergeWorker
*pMW
, int *pRc
){
4075 int i
; /* Iterator variable */
4077 MultiCursor
*pCsr
= pMW
->pCsr
;
4079 /* Unless the merge has finished, save the cursor position in the
4080 ** Merge.aInput[] array. See function mergeWorkerInit() for the
4081 ** code to restore a cursor position based on aInput[]. */
4082 if( rc
==LSM_OK
&& pCsr
){
4083 Merge
*pMerge
= pMW
->pLevel
->pMerge
;
4084 if( lsmMCursorValid(pCsr
) ){
4085 int bBtree
= (pCsr
->pBtCsr
!=0);
4088 /* pMerge->nInput==0 indicates that this is a FlushTree() operation. */
4089 assert( pMerge
->nInput
==0 || pMW
->pLevel
->nRight
>0 );
4090 assert( pMerge
->nInput
==0 || pMerge
->nInput
==(pCsr
->nPtr
+bBtree
) );
4092 for(i
=0; i
<(pMerge
->nInput
-bBtree
); i
++){
4093 SegmentPtr
*pPtr
= &pCsr
->aPtr
[i
];
4095 pMerge
->aInput
[i
].iPg
= lsmFsPageNumber(pPtr
->pPg
);
4096 pMerge
->aInput
[i
].iCell
= pPtr
->iCell
;
4098 pMerge
->aInput
[i
].iPg
= 0;
4099 pMerge
->aInput
[i
].iCell
= 0;
4102 if( bBtree
&& pMerge
->nInput
){
4103 assert( i
==pCsr
->nPtr
);
4104 btreeCursorPosition(pCsr
->pBtCsr
, &pMerge
->aInput
[i
]);
4107 /* Store the location of the split-key */
4108 iPtr
= pCsr
->aTree
[1] - CURSOR_DATA_SEGMENT
;
4109 if( iPtr
<pCsr
->nPtr
){
4110 pMerge
->splitkey
= pMerge
->aInput
[iPtr
];
4112 btreeCursorSplitkey(pCsr
->pBtCsr
, &pMerge
->splitkey
);
4116 /* Zero any free space left on the final page. This helps with
4117 ** compression if using a compression hook. And prevents valgrind
4118 ** from complaining about uninitialized byte passed to write(). */
4121 u8
*aData
= fsPageData(pMW
->pPage
, &nData
);
4122 int iOff
= pMerge
->iOutputOff
;
4123 int iEof
= SEGMENT_EOF(nData
, pageGetNRec(aData
, nData
));
4124 memset(&aData
[iOff
], 0, iEof
- iOff
);
4127 pMerge
->iOutputOff
= -1;
4130 lsmMCursorClose(pCsr
, 0);
4132 /* Persist and release the output page. */
4133 if( rc
==LSM_OK
) rc
= mergeWorkerPersistAndRelease(pMW
);
4134 if( rc
==LSM_OK
) rc
= mergeWorkerBtreeIndirect(pMW
);
4135 if( rc
==LSM_OK
) rc
= mergeWorkerFinishHierarchy(pMW
);
4136 if( rc
==LSM_OK
) rc
= mergeWorkerAddPadding(pMW
);
4137 lsmFsFlushWaiting(pMW
->pDb
->pFS
, &rc
);
4138 mergeWorkerReleaseAll(pMW
);
4140 lsmFree(pMW
->pDb
->pEnv
, pMW
->aGobble
);
4148 ** The cursor passed as the first argument is being used as the input for
4149 ** a merge operation. When this function is called, *piFlags contains the
4150 ** database entry flags for the current entry. The entry about to be written
4153 ** Note that this function only has to work for cursors configured to
4154 ** iterate forwards (not backwards).
4156 static void mergeRangeDeletes(MultiCursor
*pCsr
, int *piVal
, int *piFlags
){
4158 int iKey
= pCsr
->aTree
[1];
4161 assert( pCsr
->flags
& CURSOR_NEXT_OK
);
4162 if( pCsr
->flags
& CURSOR_IGNORE_DELETE
){
4163 /* The ignore-delete flag is set when the output of the merge will form
4164 ** the oldest level in the database. In this case there is no point in
4165 ** retaining any range-delete flags. */
4166 assert( (f
& LSM_POINT_DELETE
)==0 );
4167 f
&= ~(LSM_START_DELETE
|LSM_END_DELETE
);
4169 for(i
=0; i
<(CURSOR_DATA_SEGMENT
+ pCsr
->nPtr
); i
++){
4175 multiCursorGetKey(pCsr
, i
, &eType
, &pKey
, &nKey
);
4178 res
= sortedKeyCompare(pCsr
->pDb
->xCmp
,
4179 rtTopic(pCsr
->eType
), pCsr
->key
.pData
, pCsr
->key
.nData
,
4180 rtTopic(eType
), pKey
, nKey
4184 if( (f
& (LSM_INSERT
|LSM_POINT_DELETE
))==0 ){
4185 if( eType
& LSM_INSERT
){
4189 else if( eType
& LSM_POINT_DELETE
){
4190 f
|= LSM_POINT_DELETE
;
4193 f
|= (eType
& (LSM_END_DELETE
|LSM_START_DELETE
));
4196 if( i
>iKey
&& (eType
& LSM_END_DELETE
) && res
<0 ){
4197 if( f
& (LSM_INSERT
|LSM_POINT_DELETE
) ){
4198 f
|= (LSM_END_DELETE
|LSM_START_DELETE
);
4208 assert( (f
& LSM_INSERT
)==0 || (f
& LSM_POINT_DELETE
)==0 );
4209 if( (f
& LSM_START_DELETE
)
4210 && (f
& LSM_END_DELETE
)
4211 && (f
& LSM_POINT_DELETE
)
4220 static int mergeWorkerStep(MergeWorker
*pMW
){
4221 lsm_db
*pDb
= pMW
->pDb
; /* Database handle */
4222 MultiCursor
*pCsr
; /* Cursor to read input data from */
4223 int rc
= LSM_OK
; /* Return code */
4224 int eType
; /* SORTED_SEPARATOR, WRITE or DELETE */
4225 void *pKey
; int nKey
; /* Key */
4231 /* Pull the next record out of the source cursor. */
4232 lsmMCursorKey(pCsr
, &pKey
, &nKey
);
4233 eType
= pCsr
->eType
;
4235 /* Figure out if the output record may have a different pointer value
4236 ** than the previous. This is the case if the current key is identical to
4237 ** a key that appears in the lowest level run being merged. If so, set
4238 ** iPtr to the absolute pointer value. If not, leave iPtr set to zero,
4239 ** indicating that the output pointer value should be a copy of the pointer
4240 ** value written with the previous key. */
4241 iPtr
= (pCsr
->pPrevMergePtr
? *pCsr
->pPrevMergePtr
: 0);
4243 BtreeCursor
*pBtCsr
= pCsr
->pBtCsr
;
4245 int res
= rtTopic(pBtCsr
->eType
) - rtTopic(eType
);
4246 if( res
==0 ) res
= pDb
->xCmp(pBtCsr
->pKey
, pBtCsr
->nKey
, pKey
, nKey
);
4247 if( 0==res
) iPtr
= pBtCsr
->iPtr
;
4250 }else if( pCsr
->nPtr
){
4251 SegmentPtr
*pPtr
= &pCsr
->aPtr
[pCsr
->nPtr
-1];
4253 && 0==pDb
->xCmp(pPtr
->pKey
, pPtr
->nKey
, pKey
, nKey
)
4255 iPtr
= pPtr
->iPtr
+pPtr
->iPgPtr
;
4259 iVal
= pCsr
->aTree
[1];
4260 mergeRangeDeletes(pCsr
, &iVal
, &eType
);
4264 int iGobble
= pCsr
->aTree
[1] - CURSOR_DATA_SEGMENT
;
4265 if( iGobble
<pCsr
->nPtr
&& iGobble
>=0 ){
4266 SegmentPtr
*pGobble
= &pCsr
->aPtr
[iGobble
];
4267 if( (pGobble
->flags
& PGFTR_SKIP_THIS_FLAG
)==0 ){
4268 pMW
->aGobble
[iGobble
] = lsmFsPageNumber(pGobble
->pPg
);
4273 /* If this is a separator key and we know that the output pointer has not
4274 ** changed, there is no point in writing an output record. Otherwise,
4276 if( rc
==LSM_OK
&& (rtIsSeparator(eType
)==0 || iPtr
!=0) ){
4277 /* Write the record into the main run. */
4278 void *pVal
; int nVal
;
4279 rc
= multiCursorGetVal(pCsr
, iVal
, &pVal
, &nVal
);
4280 if( pVal
&& rc
==LSM_OK
){
4282 rc
= sortedBlobSet(pDb
->pEnv
, &pCsr
->val
, pVal
, nVal
);
4283 pVal
= pCsr
->val
.pData
;
4286 rc
= mergeWorkerWrite(pMW
, eType
, pKey
, nKey
, pVal
, nVal
, (int)iPtr
);
4291 /* Advance the cursor to the next input record (assuming one exists). */
4292 assert( lsmMCursorValid(pMW
->pCsr
) );
4293 if( rc
==LSM_OK
) rc
= lsmMCursorNext(pMW
->pCsr
);
4298 static int mergeWorkerDone(MergeWorker
*pMW
){
4299 return pMW
->pCsr
==0 || !lsmMCursorValid(pMW
->pCsr
);
4302 static void sortedFreeLevel(lsm_env
*pEnv
, Level
*p
){
4304 lsmFree(pEnv
, p
->pSplitKey
);
4305 lsmFree(pEnv
, p
->pMerge
);
4306 lsmFree(pEnv
, p
->aRhs
);
4311 static void sortedInvokeWorkHook(lsm_db
*pDb
){
4313 pDb
->xWork(pDb
, pDb
->pWorkCtx
);
4317 static int sortedNewToplevel(
4318 lsm_db
*pDb
, /* Connection handle */
4319 int eTree
, /* One of the TREE_XXX constants */
4320 int *pnWrite
/* OUT: Number of database pages written */
4322 int rc
= LSM_OK
; /* Return Code */
4323 MultiCursor
*pCsr
= 0;
4324 Level
*pNext
= 0; /* The current top level */
4325 Level
*pNew
; /* The new level itself */
4326 Segment
*pLinked
= 0; /* Delete separators from this segment */
4327 Level
*pDel
= 0; /* Delete this entire level */
4328 int nWrite
= 0; /* Number of database pages written */
4331 if( eTree
!=TREE_NONE
){
4332 rc
= lsmShmCacheChunks(pDb
, pDb
->treehdr
.nChunk
);
4335 assert( pDb
->bUseFreelist
==0 );
4336 pDb
->pFreelist
= &freelist
;
4337 pDb
->bUseFreelist
= 1;
4338 memset(&freelist
, 0, sizeof(freelist
));
4340 /* Allocate the new level structure to write to. */
4341 pNext
= lsmDbSnapshotLevel(pDb
->pWorker
);
4342 pNew
= (Level
*)lsmMallocZeroRc(pDb
->pEnv
, sizeof(Level
), &rc
);
4344 pNew
->pNext
= pNext
;
4345 lsmDbSnapshotSetLevel(pDb
->pWorker
, pNew
);
4348 /* Create a cursor to gather the data required by the new segment. The new
4349 ** segment contains everything in the tree and pointers to the next segment
4350 ** in the database (if any). */
4351 pCsr
= multiCursorNew(pDb
, &rc
);
4354 rc
= multiCursorVisitFreelist(pCsr
);
4356 rc
= multiCursorAddTree(pCsr
, pDb
->pWorker
, eTree
);
4358 if( rc
==LSM_OK
&& pNext
&& pNext
->pMerge
==0 ){
4359 if( (pNext
->flags
& LEVEL_FREELIST_ONLY
) ){
4361 pCsr
->aPtr
= lsmMallocZeroRc(pDb
->pEnv
, sizeof(SegmentPtr
), &rc
);
4362 multiCursorAddOne(pCsr
, pNext
, &rc
);
4363 }else if( eTree
!=TREE_NONE
&& pNext
->lhs
.iRoot
){
4364 pLinked
= &pNext
->lhs
;
4365 rc
= btreeCursorNew(pDb
, pLinked
, &pCsr
->pBtCsr
);
4369 /* If this will be the only segment in the database, discard any delete
4370 ** markers present in the in-memory tree. */
4372 multiCursorIgnoreDelete(pCsr
);
4377 lsmMCursorClose(pCsr
, 0);
4379 LsmPgno iLeftPtr
= 0;
4380 Merge merge
; /* Merge object used to create new level */
4381 MergeWorker mergeworker
; /* MergeWorker object for the same purpose */
4383 memset(&merge
, 0, sizeof(Merge
));
4384 memset(&mergeworker
, 0, sizeof(MergeWorker
));
4386 pNew
->pMerge
= &merge
;
4387 pNew
->flags
|= LEVEL_INCOMPLETE
;
4388 mergeworker
.pDb
= pDb
;
4389 mergeworker
.pLevel
= pNew
;
4390 mergeworker
.pCsr
= pCsr
;
4391 pCsr
->pPrevMergePtr
= &iLeftPtr
;
4393 /* Mark the separators array for the new level as a "phantom". */
4394 mergeworker
.bFlush
= 1;
4396 /* Do the work to create the new merged segment on disk */
4397 if( rc
==LSM_OK
) rc
= lsmMCursorFirst(pCsr
);
4398 while( rc
==LSM_OK
&& mergeWorkerDone(&mergeworker
)==0 ){
4399 rc
= mergeWorkerStep(&mergeworker
);
4401 mergeWorkerShutdown(&mergeworker
, &rc
);
4402 assert( rc
!=LSM_OK
|| mergeworker
.nWork
==0 || pNew
->lhs
.iFirst
);
4403 if( rc
==LSM_OK
&& pNew
->lhs
.iFirst
){
4404 rc
= lsmFsSortedFinish(pDb
->pFS
, &pNew
->lhs
);
4406 nWrite
= mergeworker
.nWork
;
4407 pNew
->flags
&= ~LEVEL_INCOMPLETE
;
4408 if( eTree
==TREE_NONE
){
4409 pNew
->flags
|= LEVEL_FREELIST_ONLY
;
4414 if( rc
!=LSM_OK
|| pNew
->lhs
.iFirst
==0 ){
4415 assert( rc
!=LSM_OK
|| pDb
->pWorker
->freelist
.nEntry
==0 );
4416 lsmDbSnapshotSetLevel(pDb
->pWorker
, pNext
);
4417 sortedFreeLevel(pDb
->pEnv
, pNew
);
4422 assert( pNew
->pNext
==pDel
);
4423 pNew
->pNext
= pDel
->pNext
;
4424 lsmFsSortedDelete(pDb
->pFS
, pDb
->pWorker
, 1, &pDel
->lhs
);
4425 sortedFreeLevel(pDb
->pEnv
, pDel
);
4428 #if LSM_LOG_STRUCTURE
4429 lsmSortedDumpStructure(pDb
, pDb
->pWorker
, LSM_LOG_DATA
, 0, "new-toplevel");
4432 if( freelist
.nEntry
){
4433 Freelist
*p
= &pDb
->pWorker
->freelist
;
4434 lsmFree(pDb
->pEnv
, p
->aEntry
);
4435 memcpy(p
, &freelist
, sizeof(freelist
));
4436 freelist
.aEntry
= 0;
4438 pDb
->pWorker
->freelist
.nEntry
= 0;
4441 assertBtreeOk(pDb
, &pNew
->lhs
);
4442 sortedInvokeWorkHook(pDb
);
4445 if( pnWrite
) *pnWrite
= nWrite
;
4446 pDb
->pWorker
->nWrite
+= nWrite
;
4448 pDb
->bUseFreelist
= 0;
4449 lsmFree(pDb
->pEnv
, freelist
.aEntry
);
4454 ** The nMerge levels in the LSM beginning with pLevel consist of a
4455 ** left-hand-side segment only. Replace these levels with a single new
4456 ** level consisting of a new empty segment on the left-hand-side and the
4457 ** nMerge segments from the replaced levels on the right-hand-side.
4459 ** Also, allocate and populate a Merge object and set Level.pMerge to
4462 static int sortedMergeSetup(
4463 lsm_db
*pDb
, /* Database handle */
4464 Level
*pLevel
, /* First level to merge */
4465 int nMerge
, /* Merge this many levels together */
4466 Level
**ppNew
/* New, merged, level */
4468 int rc
= LSM_OK
; /* Return Code */
4469 Level
*pNew
; /* New Level object */
4470 int bUseNext
= 0; /* True to link in next separators */
4471 Merge
*pMerge
; /* New Merge object */
4472 int nByte
; /* Bytes of space allocated at pMerge */
4477 for(iLevel
=0; iLevel
<nMerge
; iLevel
++){
4478 assert( pX
->nRight
==0 );
4483 /* Allocate the new Level object */
4484 pNew
= (Level
*)lsmMallocZeroRc(pDb
->pEnv
, sizeof(Level
), &rc
);
4486 pNew
->aRhs
= (Segment
*)lsmMallocZeroRc(pDb
->pEnv
,
4487 nMerge
* sizeof(Segment
), &rc
);
4490 /* Populate the new Level object */
4492 Level
*pNext
= 0; /* Level following pNew */
4498 pNew
->nRight
= nMerge
;
4499 pNew
->iAge
= pLevel
->iAge
+1;
4500 for(i
=0; i
<nMerge
; i
++){
4501 assert( p
->nRight
==0 );
4503 pNew
->aRhs
[i
] = p
->lhs
;
4504 if( (p
->flags
& LEVEL_FREELIST_ONLY
)==0 ) bFreeOnly
= 0;
4505 sortedFreeLevel(pDb
->pEnv
, p
);
4509 if( bFreeOnly
) pNew
->flags
|= LEVEL_FREELIST_ONLY
;
4511 /* Replace the old levels with the new. */
4512 pTopLevel
= lsmDbSnapshotLevel(pDb
->pWorker
);
4514 for(pp
=&pTopLevel
; *pp
!=pLevel
; pp
=&((*pp
)->pNext
));
4516 lsmDbSnapshotSetLevel(pDb
->pWorker
, pTopLevel
);
4518 /* Determine whether or not the next separators will be linked in */
4519 if( pNext
&& pNext
->pMerge
==0 && pNext
->lhs
.iRoot
&& pNext
4520 && (bFreeOnly
==0 || (pNext
->flags
& LEVEL_FREELIST_ONLY
))
4526 /* Allocate the merge object */
4527 nByte
= sizeof(Merge
) + sizeof(MergeInput
) * (nMerge
+ bUseNext
);
4528 pMerge
= (Merge
*)lsmMallocZeroRc(pDb
->pEnv
, nByte
, &rc
);
4530 pMerge
->aInput
= (MergeInput
*)&pMerge
[1];
4531 pMerge
->nInput
= nMerge
+ bUseNext
;
4532 pNew
->pMerge
= pMerge
;
4539 static int mergeWorkerInit(
4540 lsm_db
*pDb
, /* Db connection to do merge work */
4541 Level
*pLevel
, /* Level to work on merging */
4542 MergeWorker
*pMW
/* Object to initialize */
4544 int rc
= LSM_OK
; /* Return code */
4545 Merge
*pMerge
= pLevel
->pMerge
; /* Persistent part of merge state */
4546 MultiCursor
*pCsr
= 0; /* Cursor opened for pMW */
4547 Level
*pNext
= pLevel
->pNext
; /* Next level in LSM */
4549 assert( pDb
->pWorker
);
4550 assert( pLevel
->pMerge
);
4551 assert( pLevel
->nRight
>0 );
4553 memset(pMW
, 0, sizeof(MergeWorker
));
4555 pMW
->pLevel
= pLevel
;
4556 pMW
->aGobble
= lsmMallocZeroRc(pDb
->pEnv
, sizeof(LsmPgno
)*pLevel
->nRight
,&rc
);
4558 /* Create a multi-cursor to read the data to write to the new
4559 ** segment. The new segment contains:
4561 ** 1. Records from LHS of each of the nMerge levels being merged.
4562 ** 2. Separators from either the last level being merged, or the
4563 ** separators attached to the LHS of the following level, or neither.
4565 ** If the new level is the lowest (oldest) in the db, discard any
4566 ** delete keys. Key annihilation.
4568 pCsr
= multiCursorNew(pDb
, &rc
);
4570 pCsr
->flags
|= CURSOR_NEXT_OK
;
4571 rc
= multiCursorAddRhs(pCsr
, pLevel
);
4573 if( rc
==LSM_OK
&& pMerge
->nInput
> pLevel
->nRight
){
4574 rc
= btreeCursorNew(pDb
, &pNext
->lhs
, &pCsr
->pBtCsr
);
4576 multiCursorReadSeparators(pCsr
);
4578 multiCursorIgnoreDelete(pCsr
);
4581 assert( rc
!=LSM_OK
|| pMerge
->nInput
==(pCsr
->nPtr
+(pCsr
->pBtCsr
!=0)) );
4584 /* Load the b-tree hierarchy into memory. */
4585 if( rc
==LSM_OK
) rc
= mergeWorkerLoadHierarchy(pMW
);
4586 if( rc
==LSM_OK
&& pMW
->hier
.nHier
==0 ){
4587 pMW
->aSave
[0].iPgno
= pLevel
->lhs
.iFirst
;
4590 /* Position the cursor. */
4592 pCsr
->pPrevMergePtr
= &pMerge
->iCurrentPtr
;
4593 if( pLevel
->lhs
.iFirst
==0 ){
4594 /* The output array is still empty. So position the cursor at the very
4595 ** start of the input. */
4596 rc
= multiCursorEnd(pCsr
, 0);
4598 /* The output array is non-empty. Position the cursor based on the
4599 ** page/cell data saved in the Merge.aInput[] array. */
4601 for(i
=0; rc
==LSM_OK
&& i
<pCsr
->nPtr
; i
++){
4602 MergeInput
*pInput
= &pMerge
->aInput
[i
];
4605 assert( pCsr
->aPtr
[i
].pPg
==0 );
4606 pPtr
= &pCsr
->aPtr
[i
];
4607 rc
= segmentPtrLoadPage(pDb
->pFS
, pPtr
, (int)pInput
->iPg
);
4608 if( rc
==LSM_OK
&& pPtr
->nCell
>0 ){
4609 rc
= segmentPtrLoadCell(pPtr
, pInput
->iCell
);
4614 if( rc
==LSM_OK
&& pCsr
->pBtCsr
){
4615 int (*xCmp
)(void *, int, void *, int) = pCsr
->pDb
->xCmp
;
4616 assert( i
==pCsr
->nPtr
);
4617 rc
= btreeCursorRestore(pCsr
->pBtCsr
, xCmp
, &pMerge
->aInput
[i
]);
4621 rc
= multiCursorSetupTree(pCsr
, 0);
4624 pCsr
->flags
|= CURSOR_NEXT_OK
;
4630 static int sortedBtreeGobble(
4631 lsm_db
*pDb
, /* Worker connection */
4632 MultiCursor
*pCsr
, /* Multi-cursor being used for a merge */
4633 int iGobble
/* pCsr->aPtr[] entry to operate on */
4636 if( rtTopic(pCsr
->eType
)==0 ){
4637 Segment
*pSeg
= pCsr
->aPtr
[iGobble
].pSeg
;
4641 /* Seek from the root of the b-tree to the segment leaf that may contain
4642 ** a key equal to the one multi-cursor currently points to. Record the
4643 ** page number of each b-tree page and the leaf. The segment may be
4644 ** gobbled up to (but not including) the first of these page numbers.
4646 assert( pSeg
->iRoot
>0 );
4647 aPg
= lsmMallocZeroRc(pDb
->pEnv
, sizeof(LsmPgno
)*32, &rc
);
4649 rc
= seekInBtree(pCsr
, pSeg
,
4650 rtTopic(pCsr
->eType
), pCsr
->key
.pData
, pCsr
->key
.nData
, aPg
, 0
4655 for(nPg
=0; aPg
[nPg
]; nPg
++);
4656 lsmFsGobble(pDb
, pSeg
, aPg
, nPg
);
4659 lsmFree(pDb
->pEnv
, aPg
);
4665 ** Argument p points to a level of age N. Return the number of levels in
4666 ** the linked list starting at p that have age=N (always at least 1).
4668 static int sortedCountLevels(Level
*p
){
4674 }while( p
&& p
->iAge
==iAge
);
4678 static int sortedSelectLevel(lsm_db
*pDb
, int nMerge
, Level
**ppOut
){
4679 Level
*pTopLevel
= lsmDbSnapshotLevel(pDb
->pWorker
);
4681 Level
*pLevel
= 0; /* Output value */
4682 Level
*pBest
= 0; /* Best level to work on found so far */
4683 int nBest
; /* Number of segments merged at pBest */
4684 Level
*pThis
= 0; /* First in run of levels with age=iAge */
4685 int nThis
= 0; /* Number of levels starting at pThis */
4687 assert( nMerge
>=1 );
4688 nBest
= LSM_MAX(1, nMerge
-1);
4690 /* Find the longest contiguous run of levels not currently undergoing a
4691 ** merge with the same age in the structure. Or the level being merged
4692 ** with the largest number of right-hand segments. Work on it. */
4693 for(pLevel
=pTopLevel
; pLevel
; pLevel
=pLevel
->pNext
){
4694 if( pLevel
->nRight
==0 && pThis
&& pLevel
->iAge
==pThis
->iAge
){
4698 if( (pLevel
->iAge
!=pThis
->iAge
+1)
4699 || (pLevel
->nRight
==0 && sortedCountLevels(pLevel
)<=pDb
->nMerge
)
4705 if( pLevel
->nRight
){
4706 if( pLevel
->nRight
>nBest
){
4707 nBest
= pLevel
->nRight
;
4724 if( pBest
==0 && nMerge
==1 ){
4727 for(pLevel
=pTopLevel
; pLevel
; pLevel
=pLevel
->pNext
){
4728 assert( !pLevel
->nRight
);
4729 if( pLevel
->flags
& LEVEL_FREELIST_ONLY
){
4737 nBest
= nFree
+ nUsr
;
4742 if( pBest
->nRight
==0 ){
4743 rc
= sortedMergeSetup(pDb
, pBest
, nBest
, ppOut
);
4752 static int sortedDbIsFull(lsm_db
*pDb
){
4753 Level
*pTop
= lsmDbSnapshotLevel(pDb
->pWorker
);
4755 if( lsmDatabaseFull(pDb
) ) return 1;
4756 if( pTop
&& pTop
->iAge
==0
4757 && (pTop
->nRight
|| sortedCountLevels(pTop
)>=pDb
->nMerge
)
/*
** Context object passed to moveBlockCb() while walking the free-list.
*/
typedef struct MoveBlockCtx MoveBlockCtx;
struct MoveBlockCtx {
  int iSeen;                      /* Free block most recently visited */
  int iFrom;                      /* Set by callback to (iSeen-1); 0 if unset */
};
4770 static int moveBlockCb(void *pCtx
, int iBlk
, i64 iSnapshot
){
4771 MoveBlockCtx
*p
= (MoveBlockCtx
*)pCtx
;
4772 assert( p
->iFrom
==0 );
4773 if( iBlk
==(p
->iSeen
-1) ){
4777 p
->iFrom
= p
->iSeen
-1;
4782 ** This function is called to further compact a database for which all
4783 ** of the content has already been merged into a single segment. If
4784 ** possible, it moves the contents of a single block from the end of the
4785 ** file to a free-block that lies closer to the start of the file (allowing
4786 ** the file to be eventually truncated).
4788 static int sortedMoveBlock(lsm_db
*pDb
, int *pnWrite
){
4789 Snapshot
*p
= pDb
->pWorker
;
4790 Level
*pLvl
= lsmDbSnapshotLevel(p
);
4791 int iFrom
; /* Block to move */
4792 int iTo
; /* Destination to move block to */
4793 int rc
; /* Return code */
4797 assert( pLvl
->pNext
==0 && pLvl
->nRight
==0 );
4798 assert( p
->redirect
.n
<=LSM_MAX_BLOCK_REDIRECTS
);
4802 /* Check that the redirect array is not already full. If it is, return
4803 ** without moving any database content. */
4804 if( p
->redirect
.n
>=LSM_MAX_BLOCK_REDIRECTS
) return LSM_OK
;
4806 /* Find the last block of content in the database file. Do this by
4807 ** traversing the free-list in reverse (descending block number) order.
4808 ** The first block not on the free list is the one that will be moved.
4809 ** Since the db consists of a single segment, there is no ambiguity as
4810 ** to which segment the block belongs to. */
4811 sCtx
.iSeen
= p
->nBlock
+1;
4813 rc
= lsmWalkFreelist(pDb
, 1, moveBlockCb
, &sCtx
);
4814 if( rc
!=LSM_OK
|| sCtx
.iFrom
==0 ) return rc
;
4817 /* Find the first free block in the database, ignoring block 1. Block
4818 ** 1 is tricky as it is smaller than the other blocks. */
4819 rc
= lsmBlockAllocate(pDb
, iFrom
, &iTo
);
4820 if( rc
!=LSM_OK
|| iTo
==0 ) return rc
;
4821 assert( iTo
!=1 && iTo
<iFrom
);
4823 rc
= lsmFsMoveBlock(pDb
->pFS
, &pLvl
->lhs
, iTo
, iFrom
);
4825 if( p
->redirect
.a
==0 ){
4826 int nByte
= sizeof(struct RedirectEntry
) * LSM_MAX_BLOCK_REDIRECTS
;
4827 p
->redirect
.a
= lsmMallocZeroRc(pDb
->pEnv
, nByte
, &rc
);
4831 /* Check if the block just moved was already redirected. */
4833 for(i
=0; i
<p
->redirect
.n
; i
++){
4834 if( p
->redirect
.a
[i
].iTo
==iFrom
) break;
4837 if( i
==p
->redirect
.n
){
4838 /* Block iFrom was not already redirected. Add a new array entry. */
4839 memmove(&p
->redirect
.a
[1], &p
->redirect
.a
[0],
4840 sizeof(struct RedirectEntry
) * p
->redirect
.n
4842 p
->redirect
.a
[0].iFrom
= iFrom
;
4843 p
->redirect
.a
[0].iTo
= iTo
;
4846 /* Block iFrom was already redirected. Overwrite existing entry. */
4847 p
->redirect
.a
[i
].iTo
= iTo
;
4850 rc
= lsmBlockFree(pDb
, iFrom
);
4852 *pnWrite
= lsmFsBlockSize(pDb
->pFS
) / lsmFsPageSize(pDb
->pFS
);
4853 pLvl
->lhs
.pRedirect
= &p
->redirect
;
4857 #if LSM_LOG_STRUCTURE
4860 sprintf(aBuf
, "move-block %d/%d", p
->redirect
.n
-1, LSM_MAX_BLOCK_REDIRECTS
);
4861 lsmSortedDumpStructure(pDb
, pDb
->pWorker
, LSM_LOG_DATA
, 0, aBuf
);
4869 static int mergeInsertFreelistSegments(
4876 MultiCursor
*pCsr
= pMW
->pCsr
;
4877 Level
*pLvl
= pMW
->pLevel
;
4885 aNew1
= (SegmentPtr
*)lsmMallocZeroRc(
4886 pDb
->pEnv
, sizeof(SegmentPtr
) * (pCsr
->nPtr
+nFree
), &rc
4889 memcpy(&aNew1
[nFree
], pCsr
->aPtr
, sizeof(SegmentPtr
)*pCsr
->nPtr
);
4890 pCsr
->nPtr
+= nFree
;
4891 lsmFree(pDb
->pEnv
, pCsr
->aTree
);
4892 lsmFree(pDb
->pEnv
, pCsr
->aPtr
);
4896 aNew2
= (Segment
*)lsmMallocZeroRc(
4897 pDb
->pEnv
, sizeof(Segment
) * (pLvl
->nRight
+nFree
), &rc
4900 memcpy(&aNew2
[nFree
], pLvl
->aRhs
, sizeof(Segment
)*pLvl
->nRight
);
4901 pLvl
->nRight
+= nFree
;
4902 lsmFree(pDb
->pEnv
, pLvl
->aRhs
);
4905 for(pIter
=pDb
->pWorker
->pLevel
; rc
==LSM_OK
&& pIter
!=pLvl
; pIter
=pNext
){
4906 Segment
*pSeg
= &pLvl
->aRhs
[i
];
4907 memcpy(pSeg
, &pIter
->lhs
, sizeof(Segment
));
4909 pCsr
->aPtr
[i
].pSeg
= pSeg
;
4910 pCsr
->aPtr
[i
].pLevel
= pLvl
;
4911 rc
= segmentPtrEnd(pCsr
, &pCsr
->aPtr
[i
], 0);
4913 pDb
->pWorker
->pLevel
= pNext
= pIter
->pNext
;
4914 sortedFreeLevel(pDb
->pEnv
, pIter
);
4918 assert( rc
!=LSM_OK
|| pDb
->pWorker
->pLevel
==pLvl
);
4920 for(i
=nFree
; i
<pCsr
->nPtr
; i
++){
4921 pCsr
->aPtr
[i
].pSeg
= &pLvl
->aRhs
[i
];
4924 lsmFree(pDb
->pEnv
, pMW
->aGobble
);
4930 static int sortedWork(
4931 lsm_db
*pDb
, /* Database handle. Must be worker. */
4932 int nWork
, /* Number of pages of work to do */
4933 int nMerge
, /* Try to merge this many levels at once */
4934 int bFlush
, /* Set if call is to make room for a flush */
4935 int *pnWrite
/* OUT: Actual number of pages written */
4937 int rc
= LSM_OK
; /* Return Code */
4938 int nRemaining
= nWork
; /* Units of work to do before returning */
4939 Snapshot
*pWorker
= pDb
->pWorker
;
4942 if( lsmDbSnapshotLevel(pWorker
)==0 ) return LSM_OK
;
4944 while( nRemaining
>0 ){
4947 /* Find a level to work on. */
4948 rc
= sortedSelectLevel(pDb
, nMerge
, &pLevel
);
4949 assert( rc
==LSM_OK
|| pLevel
==0 );
4953 Level
*pTopLevel
= lsmDbSnapshotLevel(pDb
->pWorker
);
4954 if( bFlush
==0 && nMerge
==1 && pTopLevel
&& pTopLevel
->pNext
==0 ){
4955 rc
= sortedMoveBlock(pDb
, &nDone
);
4957 nRemaining
-= nDone
;
4959 /* Could not find any work to do. Finished. */
4960 if( nDone
==0 ) break;
4963 Freelist freelist
= {0, 0, 0};
4964 MergeWorker mergeworker
; /* State used to work on the level merge */
4966 assert( pDb
->bIncrMerge
==0 );
4967 assert( pDb
->pFreelist
==0 && pDb
->bUseFreelist
==0 );
4969 pDb
->bIncrMerge
= 1;
4970 rc
= mergeWorkerInit(pDb
, pLevel
, &mergeworker
);
4971 assert( mergeworker
.nWork
==0 );
4974 && 0==mergeWorkerDone(&mergeworker
)
4975 && (mergeworker
.nWork
<nRemaining
|| pDb
->bUseFreelist
)
4977 int eType
= rtTopic(mergeworker
.pCsr
->eType
);
4978 rc
= mergeWorkerStep(&mergeworker
);
4980 /* If the cursor now points at the first entry past the end of the
4981 ** user data (i.e. either to EOF or to the first free-list entry
4982 ** that will be added to the run), then check if it is possible to
4983 ** merge in any free-list entries that are either in-memory or in
4984 ** free-list-only blocks. */
4985 if( rc
==LSM_OK
&& nMerge
==1 && eType
==0
4986 && (rtTopic(mergeworker
.pCsr
->eType
) || mergeWorkerDone(&mergeworker
))
4988 int nFree
= 0; /* Number of free-list-only levels to merge */
4990 assert( pDb
->pFreelist
==0 && pDb
->bUseFreelist
==0 );
4992 /* Now check if all levels containing data newer than this one
4993 ** are single-segment free-list only levels. If so, they will be
4994 ** merged in now. */
4995 for(pLvl
=pDb
->pWorker
->pLevel
;
4996 pLvl
!=mergeworker
.pLevel
&& (pLvl
->flags
& LEVEL_FREELIST_ONLY
);
4999 assert( pLvl
->nRight
==0 );
5002 if( pLvl
==mergeworker
.pLevel
){
5004 rc
= mergeInsertFreelistSegments(pDb
, nFree
, &mergeworker
);
5006 rc
= multiCursorVisitFreelist(mergeworker
.pCsr
);
5009 rc
= multiCursorSetupTree(mergeworker
.pCsr
, 0);
5010 pDb
->pFreelist
= &freelist
;
5011 pDb
->bUseFreelist
= 1;
5016 nRemaining
-= LSM_MAX(mergeworker
.nWork
, 1);
5019 /* Check if the merge operation is completely finished. If not,
5020 ** gobble up (declare eligible for recycling) any pages from rhs
5021 ** segments for which the content has been completely merged into
5022 ** the lhs of the level. */
5023 if( mergeWorkerDone(&mergeworker
)==0 ){
5025 for(i
=0; i
<pLevel
->nRight
; i
++){
5026 SegmentPtr
*pGobble
= &mergeworker
.pCsr
->aPtr
[i
];
5027 if( pGobble
->pSeg
->iRoot
){
5028 rc
= sortedBtreeGobble(pDb
, mergeworker
.pCsr
, i
);
5029 }else if( mergeworker
.aGobble
[i
] ){
5030 lsmFsGobble(pDb
, pGobble
->pSeg
, &mergeworker
.aGobble
[i
], 1);
5036 mergeWorkerShutdown(&mergeworker
, &rc
);
5037 bEmpty
= (pLevel
->lhs
.iFirst
==0);
5039 if( bEmpty
==0 && rc
==LSM_OK
){
5040 rc
= lsmFsSortedFinish(pDb
->pFS
, &pLevel
->lhs
);
5043 if( pDb
->bUseFreelist
){
5044 Freelist
*p
= &pDb
->pWorker
->freelist
;
5045 lsmFree(pDb
->pEnv
, p
->aEntry
);
5046 memcpy(p
, &freelist
, sizeof(freelist
));
5047 pDb
->bUseFreelist
= 0;
5052 for(i
=0; i
<pLevel
->nRight
; i
++){
5053 lsmFsSortedDelete(pDb
->pFS
, pWorker
, 1, &pLevel
->aRhs
[i
]);
5057 /* If the new level is completely empty, remove it from the
5058 ** database snapshot. This can only happen if all input keys were
5059 ** annihilated. Since keys are only annihilated if the new level
5060 ** is the last in the linked list (contains the most ancient of
5061 ** database content), this guarantees that pLevel->pNext==0. */
5062 Level
*pTop
; /* Top level of worker snapshot */
5063 Level
**pp
; /* Read/write iterator for Level.pNext list */
5065 assert( pLevel
->pNext
==0 );
5067 /* Remove the level from the worker snapshot. */
5068 pTop
= lsmDbSnapshotLevel(pWorker
);
5069 for(pp
=&pTop
; *pp
!=pLevel
; pp
=&((*pp
)->pNext
));
5070 *pp
= pLevel
->pNext
;
5071 lsmDbSnapshotSetLevel(pWorker
, pTop
);
5073 /* Free the Level structure. */
5074 sortedFreeLevel(pDb
->pEnv
, pLevel
);
5077 /* Free the separators of the next level, if required. */
5078 if( pLevel
->pMerge
->nInput
> pLevel
->nRight
){
5079 assert( pLevel
->pNext
->lhs
.iRoot
);
5080 pLevel
->pNext
->lhs
.iRoot
= 0;
5083 /* Zero the right-hand-side of pLevel */
5084 lsmFree(pDb
->pEnv
, pLevel
->aRhs
);
5088 /* Free the Merge object */
5089 lsmFree(pDb
->pEnv
, pLevel
->pMerge
);
5093 if( bSave
&& rc
==LSM_OK
){
5094 pDb
->bIncrMerge
= 0;
5095 rc
= lsmSaveWorker(pDb
, 0);
5100 /* Clean up the MergeWorker object initialized above. If no error
5101 ** has occurred, invoke the work-hook to inform the application that
5102 ** the database structure has changed. */
5103 mergeWorkerShutdown(&mergeworker
, &rc
);
5104 pDb
->bIncrMerge
= 0;
5105 if( rc
==LSM_OK
) sortedInvokeWorkHook(pDb
);
5107 #if LSM_LOG_STRUCTURE
5108 lsmSortedDumpStructure(pDb
, pDb
->pWorker
, LSM_LOG_DATA
, 0, "work");
5110 assertBtreeOk(pDb
, &pLevel
->lhs
);
5111 assertRunInOrder(pDb
, &pLevel
->lhs
);
5113 /* If bFlush is true and the database is no longer considered "full",
5114 ** break out of the loop even if nRemaining is still greater than
5115 ** zero. The caller has an in-memory tree to flush to disk. */
5116 if( bFlush
&& sortedDbIsFull(pDb
)==0 ) break;
5120 if( pnWrite
) *pnWrite
= (nWork
- nRemaining
);
5121 pWorker
->nWrite
+= (nWork
- nRemaining
);
5124 lsmLogMessage(pDb
, rc
, "sortedWork(): %d pages", (nWork
-nRemaining
));
5130 ** The database connection passed as the first argument must be a worker
5131 ** connection. This function checks if there exists an "old" in-memory tree
5132 ** ready to be flushed to disk. If so, true is returned. Otherwise false.
5134 ** If an error occurs, *pRc is set to an LSM error code before returning.
5135 ** It is assumed that *pRc is set to LSM_OK when this function is called.
5137 static int sortedTreeHasOld(lsm_db
*pDb
, int *pRc
){
5141 assert( pDb
->pWorker
);
5144 && pDb
->treehdr
.iOldShmid
5145 && pDb
->treehdr
.iOldLog
!=pDb
->pWorker
->iLogOff
5153 assert( *pRc
==LSM_OK
|| bRet
==0 );
5158 ** Create a new free-list only top-level segment. Return LSM_OK if successful
5159 ** or an LSM error code if some error occurs.
5161 static int sortedNewFreelistOnly(lsm_db
*pDb
){
5162 return sortedNewToplevel(pDb
, TREE_NONE
, 0);
5165 int lsmSaveWorker(lsm_db
*pDb
, int bFlush
){
5166 Snapshot
*p
= pDb
->pWorker
;
5167 if( p
->freelist
.nEntry
>pDb
->nMaxFreelist
){
5168 int rc
= sortedNewFreelistOnly(pDb
);
5169 if( rc
!=LSM_OK
) return rc
;
5171 return lsmCheckpointSaveWorker(pDb
, bFlush
);
5174 static int doLsmSingleWork(
5177 int nMerge
, /* Minimum segments to merge together */
5178 int nPage
, /* Number of pages to write to disk */
5179 int *pnWrite
, /* OUT: Pages actually written to disk */
5180 int *pbCkpt
/* OUT: True if an auto-checkpoint is req. */
5182 Snapshot
*pWorker
; /* Worker snapshot */
5183 int rc
= LSM_OK
; /* Return code */
5185 int nMax
= nPage
; /* Maximum pages to write to disk */
5191 /* Open the worker 'transaction'. It will be closed before this function
5193 assert( pDb
->pWorker
==0 );
5194 rc
= lsmBeginWork(pDb
);
5195 if( rc
!=LSM_OK
) return rc
;
5196 pWorker
= pDb
->pWorker
;
5198 /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
5199 ** that this call stops writing when the auto-checkpoint is due. The
5200 ** caller will do the checkpoint, then possibly call this function again. */
5201 if( bShutdown
==0 && pDb
->nAutockpt
){
5206 lsmCheckpointSynced(pDb
, 0, 0, &nSync
);
5207 nUnsync
= lsmCheckpointNWrite(pDb
->pShmhdr
->aSnap1
, 0);
5208 nPgsz
= lsmCheckpointPgsz(pDb
->pShmhdr
->aSnap1
);
5210 nMax
= (int)LSM_MIN(nMax
, (pDb
->nAutockpt
/nPgsz
) - (int)(nUnsync
-nSync
));
5213 nRem
= LSM_MAX(nMax
, 0);
5217 /* If there exists in-memory data ready to be flushed to disk, attempt
5218 ** to flush it now. */
5219 if( pDb
->nTransOpen
==0 ){
5220 rc
= lsmTreeLoadHeader(pDb
, 0);
5222 if( sortedTreeHasOld(pDb
, &rc
) ){
5223 /* sortedDbIsFull() returns non-zero if either (a) there are too many
5224 ** levels in total in the db, or (b) there are too many levels with the
5225 ** the same age in the db. Either way, call sortedWork() to merge
5226 ** existing segments together until this condition is cleared. */
5227 if( sortedDbIsFull(pDb
) ){
5229 rc
= sortedWork(pDb
, nRem
, nMerge
, 1, &nPg
);
5231 assert( rc
!=LSM_OK
|| nRem
<=0 || !sortedDbIsFull(pDb
) );
5235 if( rc
==LSM_OK
&& nRem
>0 ){
5237 rc
= sortedNewToplevel(pDb
, TREE_OLD
, &nPg
);
5240 if( pDb
->nTransOpen
>0 ){
5241 lsmTreeDiscardOld(pDb
);
5243 rc
= lsmSaveWorker(pDb
, 1);
5249 /* If nPage is still greater than zero, do some merging. */
5250 if( rc
==LSM_OK
&& nRem
>0 && bShutdown
==0 ){
5252 rc
= sortedWork(pDb
, nRem
, nMerge
, 0, &nPg
);
5254 if( nPg
) bDirty
= 1;
5257 /* If the in-memory part of the free-list is too large, write a new
5258 ** top-level containing just the in-memory free-list entries to disk. */
5259 if( rc
==LSM_OK
&& pDb
->pWorker
->freelist
.nEntry
> pDb
->nMaxFreelist
){
5260 while( rc
==LSM_OK
&& lsmDatabaseFull(pDb
) ){
5262 rc
= sortedWork(pDb
, 16, nMerge
, 1, &nPg
);
5266 rc
= sortedNewFreelistOnly(pDb
);
5272 *pnWrite
= (nMax
- nRem
);
5273 *pbCkpt
= (bCkpt
&& nRem
<=0);
5274 if( nMerge
==1 && pDb
->nAutockpt
>0 && *pnWrite
>0
5276 && pWorker
->pLevel
->nRight
==0
5277 && pWorker
->pLevel
->pNext
==0
5283 if( rc
==LSM_OK
&& bDirty
){
5284 lsmFinishWork(pDb
, 0, &rc
);
5286 int rcdummy
= LSM_BUSY
;
5287 lsmFinishWork(pDb
, 0, &rcdummy
);
5290 assert( pDb
->pWorker
==0 );
5294 static int doLsmWork(lsm_db
*pDb
, int nMerge
, int nPage
, int *pnWrite
){
5295 int rc
= LSM_OK
; /* Return code */
5296 int nWrite
= 0; /* Number of pages written */
5298 assert( nMerge
>=1 );
5304 int nReq
= (nPage
>=0) ? (nPage
-nWrite
) : ((int)0x7FFFFFFF);
5307 rc
= doLsmSingleWork(pDb
, 0, nMerge
, nReq
, &nThis
, &bCkpt
);
5309 if( rc
==LSM_OK
&& bCkpt
){
5310 rc
= lsm_checkpoint(pDb
, 0);
5312 }while( rc
==LSM_OK
&& bCkpt
&& (nWrite
<nPage
|| nPage
<0) );
5326 ** Perform work to merge database segments together.
5328 int lsm_work(lsm_db
*pDb
, int nMerge
, int nKB
, int *pnWrite
){
5329 int rc
; /* Return code */
5330 int nPgsz
; /* Nominal page size in bytes */
5331 int nPage
; /* Equivalent of nKB in pages */
5332 int nWrite
= 0; /* Number of pages written */
5334 /* This function may not be called if pDb has an open read or write
5335 ** transaction. Return LSM_MISUSE if an application attempts this. */
5336 if( pDb
->nTransOpen
|| pDb
->pCsr
) return LSM_MISUSE_BKPT
;
5337 if( nMerge
<=0 ) nMerge
= pDb
->nMerge
;
5339 lsmFsPurgeCache(pDb
->pFS
);
5341 /* Convert from KB to pages */
5342 nPgsz
= lsmFsPageSize(pDb
->pFS
);
5344 nPage
= ((i64
)nKB
* 1024 + nPgsz
- 1) / nPgsz
;
5349 rc
= doLsmWork(pDb
, nMerge
, nPage
, &nWrite
);
5352 /* Convert back from pages to KB */
5353 *pnWrite
= (int)(((i64
)nWrite
* 1024 + nPgsz
- 1) / nPgsz
);
5358 int lsm_flush(lsm_db
*db
){
5361 if( db
->nTransOpen
>0 || db
->pCsr
){
5362 rc
= LSM_MISUSE_BKPT
;
5364 rc
= lsmBeginWriteTrans(db
);
5366 lsmFlushTreeToDisk(db
);
5367 lsmTreeDiscardOld(db
);
5369 lsmTreeDiscardOld(db
);
5373 rc
= lsmFinishWriteTrans(db
, 1);
5375 lsmFinishWriteTrans(db
, 0);
5377 lsmFinishReadTrans(db
);
5384 ** This function is called in auto-work mode to perform merging work on
5385 ** the data structure. It performs enough merging work to prevent the
5386 ** height of the tree from growing indefinitely assuming that roughly
5387 ** nUnit database pages worth of data have been written to the database
5388 ** (i.e. the in-memory tree) since the last call.
5390 int lsmSortedAutoWork(
5391 lsm_db
*pDb
, /* Database handle */
5392 int nUnit
/* Pages of data written to in-memory tree */
5394 int rc
= LSM_OK
; /* Return code */
5395 int nDepth
= 0; /* Current height of tree (longest path) */
5396 Level
*pLevel
; /* Used to iterate through levels */
5399 assert( pDb
->pWorker
==0 );
5400 assert( pDb
->nTransOpen
>0 );
5402 /* Determine how many units of work to do before returning. One unit of
5403 ** work is achieved by writing one page (~4KB) of merged data. */
5404 for(pLevel
=lsmDbSnapshotLevel(pDb
->pClient
); pLevel
; pLevel
=pLevel
->pNext
){
5405 /* nDepth += LSM_MAX(1, pLevel->nRight); */
5408 if( lsmTreeHasOld(pDb
) ){
5411 rc
= lsmSaveCursors(pDb
);
5412 if( rc
!=LSM_OK
) return rc
;
5416 int nRemaining
; /* Units of work to do before returning */
5418 nRemaining
= nUnit
* nDepth
;
5420 lsmLogMessage(pDb
, rc
, "lsmSortedAutoWork(): %d*%d = %d pages",
5421 nUnit
, nDepth
, nRemaining
);
5423 assert( nRemaining
>=0 );
5424 rc
= doLsmWork(pDb
, pDb
->nMerge
, nRemaining
, 0);
5425 if( rc
==LSM_BUSY
) rc
= LSM_OK
;
5427 if( bRestore
&& pDb
->pCsr
){
5428 lsmMCursorFreeCache(pDb
);
5429 lsmFreeSnapshot(pDb
->pEnv
, pDb
->pClient
);
5432 rc
= lsmCheckpointLoad(pDb
, 0);
5435 rc
= lsmCheckpointDeserialize(pDb
, 0, pDb
->aSnapshot
, &pDb
->pClient
);
5438 rc
= lsmRestoreCursors(pDb
);
5447 ** This function is only called during system shutdown. The contents of
5448 ** any in-memory trees present (old or current) are written out to disk.
5450 int lsmFlushTreeToDisk(lsm_db
*pDb
){
5453 rc
= lsmBeginWork(pDb
);
5454 while( rc
==LSM_OK
&& sortedDbIsFull(pDb
) ){
5455 rc
= sortedWork(pDb
, 256, pDb
->nMerge
, 1, 0);
5459 rc
= sortedNewToplevel(pDb
, TREE_BOTH
, 0);
5462 lsmFinishWork(pDb
, 1, &rc
);
5467 ** Return a string representation of the segment passed as the only argument.
5468 ** Space for the returned string is allocated using lsmMalloc(), and should
5469 ** be freed by the caller using lsmFree().
5471 static char *segToString(lsm_env
*pEnv
, Segment
*pSeg
, int nMin
){
5472 int nSize
= pSeg
->nSize
;
5473 LsmPgno iRoot
= pSeg
->iRoot
;
5474 LsmPgno iFirst
= pSeg
->iFirst
;
5475 LsmPgno iLast
= pSeg
->iLastPg
;
5482 z1
= lsmMallocPrintf(pEnv
, "%d.%d", iFirst
, iLast
);
5484 z2
= lsmMallocPrintf(pEnv
, "root=%d", iRoot
);
5486 z2
= lsmMallocPrintf(pEnv
, "size=%d", nSize
);
5489 nPad
= nMin
- 2 - strlen(z1
) - 1 - strlen(z2
);
5490 nPad
= LSM_MAX(0, nPad
);
5493 z
= lsmMallocPrintf(pEnv
, "/%s %*s%s\\", z1
, nPad
, "", z2
);
5495 z
= lsmMallocPrintf(pEnv
, "|%s %*s%s|", z1
, nPad
, "", z2
);
5503 static int fileToString(
5504 lsm_db
*pDb
, /* For xMalloc() */
5514 zSeg
= segToString(pDb
->pEnv
, pSeg
, nMin
);
5515 snprintf(&aBuf
[i
], nBuf
-i
, "%s", zSeg
);
5516 i
+= strlen(&aBuf
[i
]);
5517 lsmFree(pDb
->pEnv
, zSeg
);
5519 #ifdef LSM_LOG_FREELIST
5520 lsmInfoArrayStructure(pDb
, 1, pSeg
->iFirst
, &zSeg
);
5521 snprintf(&aBuf
[i
], nBuf
-1, " (%s)", zSeg
);
5522 i
+= strlen(&aBuf
[i
]);
5523 lsmFree(pDb
->pEnv
, zSeg
);
5533 void sortedDumpPage(lsm_db
*pDb
, Segment
*pRun
, Page
*pPg
, int bVals
){
5534 LsmBlob blob
= {0, 0, 0}; /* LsmBlob used for keys */
5544 aData
= fsPageData(pPg
, &nData
);
5546 nRec
= pageGetNRec(aData
, nData
);
5547 iPtr
= (int)pageGetPtr(aData
, nData
);
5548 flags
= pageGetFlags(aData
, nData
);
5550 lsmStringInit(&s
, pDb
->pEnv
);
5551 lsmStringAppendf(&s
,"nCell=%d iPtr=%d flags=%d {", nRec
, iPtr
, flags
);
5552 if( flags
&SEGMENT_BTREE_FLAG
) iPtr
= 0;
5554 for(i
=0; i
<nRec
; i
++){
5555 Page
*pRef
= 0; /* Pointer to page iRef */
5557 u8
*aKey
; int nKey
= 0; /* Key */
5558 u8
*aVal
= 0; int nVal
= 0; /* Value */
5564 aCell
= pageGetCell(aData
, nData
, i
);
5566 assert( (flags
& SEGMENT_BTREE_FLAG
) || eType
!=0 );
5567 aCell
+= lsmVarintGet32(aCell
, &iPgPtr
);
5570 LsmPgno iRef
; /* Page number of referenced page */
5571 aCell
+= lsmVarintGet64(aCell
, &iRef
);
5572 lsmFsDbPageGet(pDb
->pFS
, pRun
, iRef
, &pRef
);
5573 aKey
= pageGetKey(pRun
, pRef
, 0, &iTopic
, &nKey
, &blob
);
5575 aCell
+= lsmVarintGet32(aCell
, &nKey
);
5576 if( rtIsWrite(eType
) ) aCell
+= lsmVarintGet32(aCell
, &nVal
);
5577 sortedReadData(0, pPg
, (aCell
-aData
), nKey
+nVal
, (void **)&aKey
, &blob
);
5582 lsmStringAppendf(&s
, "%s%2X:", (i
==0?"":" "), iTopic
);
5583 for(iChar
=0; iChar
<nKey
; iChar
++){
5584 lsmStringAppendf(&s
, "%c", isalnum(aKey
[iChar
]) ? aKey
[iChar
] : '.');
5586 if( nVal
>0 && bVals
){
5587 lsmStringAppendf(&s
, "##");
5588 for(iChar
=0; iChar
<nVal
; iChar
++){
5589 lsmStringAppendf(&s
, "%c", isalnum(aVal
[iChar
]) ? aVal
[iChar
] : '.');
5593 lsmStringAppendf(&s
, " %d", iPgPtr
+iPtr
);
5594 lsmFsPageRelease(pRef
);
5596 lsmStringAppend(&s
, "}", 1);
5598 lsmLogMessage(pDb
, LSM_OK
, " Page %d: %s", lsmFsPageNumber(pPg
), s
.z
);
5601 sortedBlobFree(&blob
);
5604 static void infoCellDump(
5605 lsm_db
*pDb
, /* Database handle */
5606 Segment
*pSeg
, /* Segment page belongs to */
5607 int bIndirect
, /* True to follow indirect refs */
5612 u8
**paKey
, int *pnKey
,
5613 u8
**paVal
, int *pnVal
,
5616 u8
*aData
; int nData
; /* Page data */
5617 u8
*aKey
; int nKey
= 0; /* Key */
5618 u8
*aVal
= 0; int nVal
= 0; /* Value */
5621 Page
*pRef
= 0; /* Pointer to page iRef */
5624 aData
= fsPageData(pPg
, &nData
);
5626 aCell
= pageGetCell(aData
, nData
, iCell
);
5628 aCell
+= lsmVarintGet32(aCell
, &iPgPtr
);
5632 LsmPgno iRef
; /* Page number of referenced page */
5633 aCell
+= lsmVarintGet64(aCell
, &iRef
);
5635 lsmFsDbPageGet(pDb
->pFS
, pSeg
, iRef
, &pRef
);
5636 pageGetKeyCopy(pDb
->pEnv
, pSeg
, pRef
, 0, &dummy
, pBlob
);
5637 aKey
= (u8
*)pBlob
->pData
;
5638 nKey
= pBlob
->nData
;
5639 lsmFsPageRelease(pRef
);
5641 aKey
= (u8
*)"<indirect>";
5645 aCell
+= lsmVarintGet32(aCell
, &nKey
);
5646 if( rtIsWrite(eType
) ) aCell
+= lsmVarintGet32(aCell
, &nVal
);
5647 sortedReadData(pSeg
, pPg
, (aCell
-aData
), nKey
+nVal
, (void **)&aKey
, pBlob
);
5651 if( peType
) *peType
= eType
;
5652 if( piPgPtr
) *piPgPtr
= iPgPtr
;
5653 if( paKey
) *paKey
= aKey
;
5654 if( paVal
) *paVal
= aVal
;
5655 if( pnKey
) *pnKey
= nKey
;
5656 if( pnVal
) *pnVal
= nVal
;
5659 static int infoAppendBlob(LsmString
*pStr
, int bHex
, u8
*z
, int n
){
5661 for(iChar
=0; iChar
<n
; iChar
++){
5663 lsmStringAppendf(pStr
, "%02X", z
[iChar
]);
5665 lsmStringAppendf(pStr
, "%c", isalnum(z
[iChar
]) ?z
[iChar
] : '.');
5671 #define INFO_PAGE_DUMP_DATA 0x01
5672 #define INFO_PAGE_DUMP_VALUES 0x02
5673 #define INFO_PAGE_DUMP_HEX 0x04
5674 #define INFO_PAGE_DUMP_INDIRECT 0x08
5676 static int infoPageDump(
5677 lsm_db
*pDb
, /* Database handle */
5678 LsmPgno iPg
, /* Page number of page to dump */
5680 char **pzOut
/* OUT: lsmMalloc'd string */
5682 int rc
= LSM_OK
; /* Return code */
5683 Page
*pPg
= 0; /* Handle for page iPg */
5684 int i
, j
; /* Loop counters */
5685 const int perLine
= 16; /* Bytes per line in the raw hex dump */
5689 int bValues
= (flags
& INFO_PAGE_DUMP_VALUES
);
5690 int bHex
= (flags
& INFO_PAGE_DUMP_HEX
);
5691 int bData
= (flags
& INFO_PAGE_DUMP_DATA
);
5692 int bIndirect
= (flags
& INFO_PAGE_DUMP_INDIRECT
);
5695 if( iPg
==0 ) return LSM_ERROR
;
5697 assert( pDb
->pClient
|| pDb
->pWorker
);
5698 pSnap
= pDb
->pClient
;
5699 if( pSnap
==0 ) pSnap
= pDb
->pWorker
;
5700 if( pSnap
->redirect
.n
>0 ){
5703 for(pLvl
=pSnap
->pLevel
; pLvl
->pNext
; pLvl
=pLvl
->pNext
);
5704 pSeg
= (pLvl
->nRight
==0 ? &pLvl
->lhs
: &pLvl
->aRhs
[pLvl
->nRight
-1]);
5705 rc
= lsmFsSegmentContainsPg(pDb
->pFS
, pSeg
, iPg
, &bUse
);
5711 /* iPg is a real page number (not subject to redirection). So it is safe
5712 ** to pass a NULL in place of the segment pointer as the second argument
5713 ** to lsmFsDbPageGet() here. */
5715 rc
= lsmFsDbPageGet(pDb
->pFS
, 0, iPg
, &pPg
);
5719 LsmBlob blob
= {0, 0, 0, 0};
5726 u8
*aData
; int nData
; /* Page data and size thereof */
5728 aData
= fsPageData(pPg
, &nData
);
5729 nRec
= pageGetNRec(aData
, nData
);
5730 iPtr
= (int)pageGetPtr(aData
, nData
);
5731 flags2
= pageGetFlags(aData
, nData
);
5733 lsmStringInit(&str
, pDb
->pEnv
);
5734 lsmStringAppendf(&str
, "Page : %lld (%d bytes)\n", iPg
, nData
);
5735 lsmStringAppendf(&str
, "nRec : %d\n", nRec
);
5736 lsmStringAppendf(&str
, "iPtr : %d\n", iPtr
);
5737 lsmStringAppendf(&str
, "flags: %04x\n", flags2
);
5738 lsmStringAppendf(&str
, "\n");
5740 for(iCell
=0; iCell
<nRec
; iCell
++){
5743 pDb
, pSeg
, bIndirect
, pPg
, iCell
, 0, 0, 0, &nKey
, 0, 0, &blob
5745 if( nKey
>nKeyWidth
) nKeyWidth
= nKey
;
5747 if( bHex
) nKeyWidth
= nKeyWidth
* 2;
5749 for(iCell
=0; iCell
<nRec
; iCell
++){
5750 u8
*aKey
; int nKey
= 0; /* Key */
5751 u8
*aVal
; int nVal
= 0; /* Value */
5757 infoCellDump(pDb
, pSeg
, bIndirect
, pPg
, iCell
, &eType
, &iPgPtr
,
5758 &aKey
, &nKey
, &aVal
, &nVal
, &blob
5760 iAbsPtr
= iPgPtr
+ ((flags2
& SEGMENT_BTREE_FLAG
) ? 0 : iPtr
);
5762 lsmFlagsToString(eType
, zFlags
);
5763 lsmStringAppendf(&str
, "%s %d (%s) ",
5764 zFlags
, iAbsPtr
, (rtTopic(eType
) ? "sys" : "usr")
5766 infoAppendBlob(&str
, bHex
, aKey
, nKey
);
5767 if( nVal
>0 && bValues
){
5768 lsmStringAppendf(&str
, "%*s", nKeyWidth
- (nKey
*(1+bHex
)), "");
5769 lsmStringAppendf(&str
, " ");
5770 infoAppendBlob(&str
, bHex
, aVal
, nVal
);
5772 if( rtTopic(eType
) ){
5773 int iBlk
= (int)~lsmGetU32(aKey
);
5774 lsmStringAppendf(&str
, " (block=%d", iBlk
);
5776 i64 iSnap
= lsmGetU64(aVal
);
5777 lsmStringAppendf(&str
, " snapshot=%lld", iSnap
);
5779 lsmStringAppendf(&str
, ")");
5781 lsmStringAppendf(&str
, "\n");
5785 lsmStringAppendf(&str
, "\n-------------------"
5786 "-------------------------------------------------------------\n");
5787 lsmStringAppendf(&str
, "Page %d\n",
5788 iPg
, (iPg
-1)*nData
, iPg
*nData
- 1);
5789 for(i
=0; i
<nData
; i
+= perLine
){
5790 lsmStringAppendf(&str
, "%04x: ", i
);
5791 for(j
=0; j
<perLine
; j
++){
5793 lsmStringAppendf(&str
, " ");
5795 lsmStringAppendf(&str
, "%02x ", aData
[i
+j
]);
5798 lsmStringAppendf(&str
, " ");
5799 for(j
=0; j
<perLine
; j
++){
5801 lsmStringAppendf(&str
, " ");
5803 lsmStringAppendf(&str
,"%c", isprint(aData
[i
+j
]) ? aData
[i
+j
] : '.');
5806 lsmStringAppendf(&str
,"\n");
5811 sortedBlobFree(&blob
);
5812 lsmFsPageRelease(pPg
);
5818 int lsmInfoPageDump(
5819 lsm_db
*pDb
, /* Database handle */
5820 LsmPgno iPg
, /* Page number of page to dump */
5821 int bHex
, /* True to output key/value in hex form */
5822 char **pzOut
/* OUT: lsmMalloc'd string */
5824 int flags
= INFO_PAGE_DUMP_DATA
| INFO_PAGE_DUMP_VALUES
;
5825 if( bHex
) flags
|= INFO_PAGE_DUMP_HEX
;
5826 return infoPageDump(pDb
, iPg
, flags
, pzOut
);
5829 void sortedDumpSegment(lsm_db
*pDb
, Segment
*pRun
, int bVals
){
5830 assert( pDb
->xLog
);
5831 if( pRun
&& pRun
->iFirst
){
5832 int flags
= (bVals
? INFO_PAGE_DUMP_VALUES
: 0);
5836 zSeg
= segToString(pDb
->pEnv
, pRun
, 0);
5837 lsmLogMessage(pDb
, LSM_OK
, "Segment: %s", zSeg
);
5838 lsmFree(pDb
->pEnv
, zSeg
);
5840 lsmFsDbPageGet(pDb
->pFS
, pRun
, pRun
->iFirst
, &pPg
);
5844 infoPageDump(pDb
, lsmFsPageNumber(pPg
), flags
, &z
);
5845 lsmLogMessage(pDb
, LSM_OK
, "%s", z
);
5846 lsmFree(pDb
->pEnv
, z
);
5848 sortedDumpPage(pDb
, pRun
, pPg
, bVals
);
5850 lsmFsDbPageNext(pRun
, pPg
, 1, &pNext
);
5851 lsmFsPageRelease(pPg
);
5858 ** Invoke the log callback zero or more times with messages that describe
5859 ** the current database structure.
5861 void lsmSortedDumpStructure(
5862 lsm_db
*pDb
, /* Database handle (used for xLog callback) */
5863 Snapshot
*pSnap
, /* Snapshot to dump */
5864 int bKeys
, /* Output the keys from each segment */
5865 int bVals
, /* Output the values from each segment */
5866 const char *zWhy
/* Caption to print near top of dump */
5868 Snapshot
*pDump
= pSnap
;
5873 pTopLevel
= lsmDbSnapshotLevel(pDump
);
5874 if( pDb
->xLog
&& pTopLevel
){
5875 static int nCall
= 0;
5880 lsmLogMessage(pDb
, LSM_OK
, "Database structure %d (%s)", nCall
, zWhy
);
5883 if( nCall
==1031 || nCall
==1032 ) bKeys
=1;
5886 for(pLevel
=pTopLevel
; pLevel
; pLevel
=pLevel
->pNext
){
5892 Segment
*aRight
[24];
5897 Segment
*pSeg
= &pLevel
->lhs
;
5898 aLeft
[nLeft
++] = pSeg
;
5900 for(i
=0; i
<pLevel
->nRight
; i
++){
5901 aRight
[nRight
++] = &pLevel
->aRhs
[i
];
5904 #ifdef LSM_LOG_FREELIST
5906 memmove(&aRight
[1], aRight
, sizeof(aRight
[0])*nRight
);
5912 for(i
=0; i
<nLeft
|| i
<nRight
; i
++){
5919 fileToString(pDb
, zLeft
, sizeof(zLeft
), 24, aLeft
[i
]);
5922 fileToString(pDb
, zRight
, sizeof(zRight
), 24, aRight
[i
]);
5926 snprintf(zLevel
, sizeof(zLevel
), "L%d: (age=%d) (flags=%.4x)",
5927 iLevel
, (int)pLevel
->iAge
, (int)pLevel
->flags
5937 lsmLogMessage(pDb
, LSM_OK
, "% 25s % *s% -35s %s",
5938 zLevel
, iPad
, "", zLeft
, zRight
5946 for(pLevel
=pTopLevel
; pLevel
; pLevel
=pLevel
->pNext
){
5948 sortedDumpSegment(pDb
, &pLevel
->lhs
, bVals
);
5949 for(i
=0; i
<pLevel
->nRight
; i
++){
5950 sortedDumpSegment(pDb
, &pLevel
->aRhs
[i
], bVals
);
5956 lsmInfoFreelist(pDb
, &zFree
);
5957 lsmLogMessage(pDb
, LSM_OK
, "Freelist: %s", zFree
);
5958 lsmFree(pDb
->pEnv
, zFree
);
5960 assert( lsmFsIntegrityCheck(pDb
) );
5963 void lsmSortedFreeLevel(lsm_env
*pEnv
, Level
*pLevel
){
5967 for(p
=pLevel
; p
; p
=pNext
){
5969 sortedFreeLevel(pEnv
, p
);
5973 void lsmSortedSaveTreeCursors(lsm_db
*pDb
){
5975 for(pCsr
=pDb
->pCsr
; pCsr
; pCsr
=pCsr
->pNext
){
5976 lsmTreeCursorSave(pCsr
->apTreeCsr
[0]);
5977 lsmTreeCursorSave(pCsr
->apTreeCsr
[1]);
5981 void lsmSortedExpandBtreePage(Page
*pPg
, int nOrig
){
5987 aData
= lsmFsPageData(pPg
, &nData
);
5988 nEntry
= pageGetNRec(aData
, nOrig
);
5989 iHdr
= SEGMENT_EOF(nOrig
, nEntry
);
5990 memmove(&aData
[iHdr
+ (nData
-nOrig
)], &aData
[iHdr
], nOrig
-iHdr
);
#ifdef LSM_DEBUG_EXPENSIVE
/*
** This function is only included in the build if LSM_DEBUG_EXPENSIVE is
** defined. It walks every non-b-tree page of segment pSeg and asserts that
** the keys are in strictly ascending order, both within each page and
** across page boundaries.
*/
static void assertRunInOrder(lsm_db *pDb, Segment *pSeg){
  Page *pPg = 0;
  LsmBlob blob1 = {0, 0, 0, 0};
  LsmBlob blob2 = {0, 0, 0, 0};

  /* These live at function scope because iTopic2/blob2 carry the last key
  ** of one page over to the comparison made at cell 0 of the next page. */
  int iTopic1 = 0;
  int iTopic2 = 0;

  lsmFsDbPageGet(pDb->pFS, pSeg, pSeg->iFirst, &pPg);
  while( pPg ){
    u8 *aData; int nData;
    Page *pNext;

    aData = lsmFsPageData(pPg, &nData);
    if( 0==(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) ){
      int i;
      int nRec = pageGetNRec(aData, nData);
      for(i=0; i<nRec; i++){
        pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i, &iTopic1, &blob1);

        /* First key on this page must follow the last key seen so far. */
        if( i==0 && blob2.nData ){
          assert( sortedKeyCompare(
                pDb->xCmp, iTopic2, blob2.pData, blob2.nData,
                iTopic1, blob1.pData, blob1.nData
          )<0 );
        }

        /* Each key must be smaller than its successor on the same page. */
        if( i<(nRec-1) ){
          pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i+1, &iTopic2, &blob2);
          assert( sortedKeyCompare(
                pDb->xCmp, iTopic1, blob1.pData, blob1.nData,
                iTopic2, blob2.pData, blob2.nData
          )<0 );
        }
      }
    }

    lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
    lsmFsPageRelease(pPg);
    pPg = pNext;
  }

  sortedBlobFree(&blob1);
  sortedBlobFree(&blob2);
}
#endif
6039 #ifdef LSM_DEBUG_EXPENSIVE
6041 ** This function is only included in the build if LSM_DEBUG_EXPENSIVE is
6042 ** defined. Its only purpose is to evaluate various assert() statements to
6043 ** verify that the database is well formed in certain respects.
6045 ** More specifically, it checks that the array pOne contains the required
6046 ** pointers to pTwo. Array pTwo must be a main array. pOne may be either a
6047 ** separators array or another main array. If pOne does not contain the
6048 ** correct set of pointers, an assert() statement fails.
6050 static int assertPointersOk(
6051 lsm_db
*pDb
, /* Database handle */
6052 Segment
*pOne
, /* Segment containing pointers */
6053 Segment
*pTwo
, /* Segment containing pointer targets */
6054 int bRhs
/* True if pTwo may have been Gobble()d */
6056 int rc
= LSM_OK
; /* Error code */
6057 SegmentPtr ptr1
; /* Iterates through pOne */
6058 SegmentPtr ptr2
; /* Iterates through pTwo */
6061 assert( pOne
&& pTwo
);
6063 memset(&ptr1
, 0, sizeof(ptr1
));
6064 memset(&ptr2
, 0, sizeof(ptr1
));
6067 segmentPtrEndPage(pDb
->pFS
, &ptr1
, 0, &rc
);
6068 segmentPtrEndPage(pDb
->pFS
, &ptr2
, 0, &rc
);
6070 /* Check that the footer pointer of the first page of pOne points to
6071 ** the first page of pTwo. */
6072 iPrev
= pTwo
->iFirst
;
6073 if( ptr1
.iPtr
!=iPrev
&& !bRhs
){
6077 if( rc
==LSM_OK
&& ptr1
.nCell
>0 ){
6078 rc
= segmentPtrLoadCell(&ptr1
, 0);
6081 while( rc
==LSM_OK
&& ptr2
.pPg
){
6084 /* Advance to the next page of segment pTwo that contains at least
6085 ** one cell. Break out of the loop if the iterator reaches EOF. */
6087 rc
= segmentPtrNextPage(&ptr2
, 1);
6088 assert( rc
==LSM_OK
);
6089 }while( rc
==LSM_OK
&& ptr2
.pPg
&& ptr2
.nCell
==0 );
6090 if( rc
!=LSM_OK
|| ptr2
.pPg
==0 ) break;
6091 iThis
= lsmFsPageNumber(ptr2
.pPg
);
6093 if( (ptr2
.flags
& (PGFTR_SKIP_THIS_FLAG
|SEGMENT_BTREE_FLAG
))==0 ){
6095 /* Load the first cell in the array pTwo page. */
6096 rc
= segmentPtrLoadCell(&ptr2
, 0);
6098 /* Iterate forwards through pOne, searching for a key that matches the
6099 ** key ptr2.pKey/nKey. This key should have a pointer to the page that
6100 ** ptr2 currently points to. */
6101 while( rc
==LSM_OK
){
6102 int res
= rtTopic(ptr1
.eType
) - rtTopic(ptr2
.eType
);
6104 res
= pDb
->xCmp(ptr1
.pKey
, ptr1
.nKey
, ptr2
.pKey
, ptr2
.nKey
);
6108 assert( bRhs
|| ptr1
.iPtr
+ptr1
.iPgPtr
==iPrev
);
6112 assert( ptr1
.iPtr
+ptr1
.iPgPtr
==iThis
);
6117 rc
= segmentPtrAdvance(0, &ptr1
, 0);
6125 segmentPtrReset(&ptr1
, 0);
6126 segmentPtrReset(&ptr2
, 0);
6131 ** This function is only included in the build if LSM_DEBUG_EXPENSIVE is
6132 ** defined. Its only purpose is to evaluate various assert() statements to
6133 ** verify that the database is well formed in certain respects.
6135 ** More specifically, it checks that the b-tree embedded in array pRun
6136 ** contains the correct keys. If not, an assert() fails.
6138 static int assertBtreeOk(
6142 int rc
= LSM_OK
; /* Return code */
6144 LsmBlob blob
= {0, 0, 0}; /* Buffer used to cache overflow keys */
6145 FileSystem
*pFS
= pDb
->pFS
; /* File system to read from */
6146 Page
*pPg
= 0; /* Main run page */
6147 BtreeCursor
*pCsr
= 0; /* Btree cursor */
6149 rc
= btreeCursorNew(pDb
, pSeg
, &pCsr
);
6151 rc
= btreeCursorFirst(pCsr
);
6154 rc
= lsmFsDbPageGet(pFS
, pSeg
, pSeg
->iFirst
, &pPg
);
6157 while( rc
==LSM_OK
){
6163 rc
= lsmFsDbPageNext(pSeg
, pPg
, 1, &pNext
);
6164 lsmFsPageRelease(pPg
);
6167 aData
= fsPageData(pPg
, &nData
);
6168 flags
= pageGetFlags(aData
, nData
);
6170 && 0==((SEGMENT_BTREE_FLAG
|PGFTR_SKIP_THIS_FLAG
) & flags
)
6171 && 0!=pageGetNRec(aData
, nData
)
6176 pKey
= pageGetKey(pSeg
, pPg
, 0, &iTopic
, &nKey
, &blob
);
6177 assert( nKey
==pCsr
->nKey
&& 0==memcmp(pKey
, pCsr
->pKey
, nKey
) );
6178 assert( lsmFsPageNumber(pPg
)==pCsr
->iPtr
);
6179 rc
= btreeCursorNext(pCsr
);
6182 assert( rc
!=LSM_OK
|| pCsr
->pKey
==0 );
6184 if( pPg
) lsmFsPageRelease(pPg
);
6186 btreeCursorFree(pCsr
);
6187 sortedBlobFree(&blob
);
6192 #endif /* ifdef LSM_DEBUG_EXPENSIVE */