Update test file walprotocol.test to account for the changes in the wal
[sqlite.git] / ext / lsm1 / lsm_tree.c
blob1a199fc1ce52782c64d77217c5d36bee64622a64
1 /*
2 ** 2011-08-18
3 **
4 ** The author disclaims copyright to this source code. In place of
5 ** a legal notice, here is a blessing:
6 **
7 ** May you do good and not evil.
8 ** May you find forgiveness for yourself and forgive others.
9 ** May you share freely, never taking more than you give.
11 *************************************************************************
13 ** This file contains the implementation of an in-memory tree structure.
15 ** Technically the tree is a B-tree of order 4 (in the Knuth sense - each
16 ** node may have up to 4 children). Keys are stored within B-tree nodes by
17 ** reference. This may be slightly slower than a conventional red-black
18 ** tree, but it is simpler. It is also an easier structure to modify to
19 ** create a version that supports nested transaction rollback.
21 ** This tree does not currently support a delete operation. One is not
22 ** required. When LSM deletes a key from a database, it inserts a DELETE
23 ** marker into the data structure. As a result, although the value associated
24 ** with a key stored in the in-memory tree structure may be modified, no
25 ** keys are ever removed.
29 ** MVCC NOTES
31 ** The in-memory tree structure supports SQLite-style MVCC. This means
32 ** that while one client is writing to the tree structure, other clients
33 ** may still be querying an older snapshot of the tree.
35 ** One way to implement this is to use an append-only b-tree. In this
36 ** case instead of modifying nodes in-place, a copy of the node is made
37 ** and the required modifications made to the copy. The parent of the
38 ** node is then modified (to update the pointer so that it points to
39 ** the new copy), which causes a copy of the parent to be made, and so on.
40 ** This means that each time the tree is written to a new root node is
41 ** created. A snapshot is identified by the root node that it uses.
43 ** The problem with the above is that each time the tree is written to,
44 ** a copy of the node structure modified and all of its ancestor nodes
45 ** is made. This may prove excessive with large tree structures.
47 ** To reduce this overhead, the data structure used for a tree node is
48 ** designed so that it may be edited in place exactly once without
49 ** affecting existing users. In other words, the node structure is capable
50 ** of storing two separate versions of the node at the same time.
51 ** When a node is to be edited, if the node structure already contains
52 ** two versions, a copy is made as in the append-only approach. Or, if
53 ** it only contains a single version, it is edited in place.
55 ** This reduces the overhead so that, roughly, one new node structure
56 ** must be allocated for each write (on top of those allocations that
57 ** would have been required by a non-MVCC tree). Logic: Assume that at
58 ** any time, 50% of nodes in the tree already contain 2 versions. When
59 ** a new entry is written to a node, there is a 50% chance that a copy
60 ** of the node will be required. And a 25% chance that a copy of its
61 ** parent is required. And so on.
63 ** ROLLBACK
65 ** The in-memory tree also supports transaction and sub-transaction
66 ** rollback. In order to roll back to a point in time X, the following is
67 ** necessary:
69 ** 1. All memory allocated since X must be freed, and
70 ** 2. All "v2" data added to nodes that existed at X should be zeroed.
71 ** 3. The root node must be restored to its X value.
73 ** The Mempool object used to allocate memory for the tree supports
74 ** operation (1) - see the lsmPoolMark() and lsmPoolRevert() functions.
76 ** To support (2), all nodes that have v2 data are part of a singly linked
77 ** list, sorted by the age of the v2 data (nodes that have had data added
78 ** most recently are at the end of the list). So to zero all v2 data added
79 ** since X, the linked list is traversed from the first node added following
80 ** X onwards.
84 #ifndef _LSM_INT_H
85 # include "lsmInt.h"
86 #endif
88 #include <string.h>
90 #define MAX_DEPTH 32
92 typedef struct TreeKey TreeKey;
93 typedef struct TreeNode TreeNode;
94 typedef struct TreeLeaf TreeLeaf;
95 typedef struct NodeVersion NodeVersion;
97 struct TreeOld {
98 u32 iShmid; /* Last shared-memory chunk in use by old */
99 u32 iRoot; /* Offset of root node in shm file */
100 u32 nHeight; /* Height of tree structure */
#if 0
/*
** Sanity-check a TreeKey.flags value using assert(). Always returns 1 so
** that it may itself be wrapped in an assert():
**
**   assert( lsmAssertFlagsOk(pTreeKey->flags) );
*/
static int lsmAssertFlagsOk(u8 keyflags){
  /* A key with no flags at all serves no purpose. */
  assert( keyflags!=0 );

  /* POINT_DELETE and INSERT are mutually exclusive. */
  assert( !((keyflags & LSM_POINT_DELETE) && (keyflags & LSM_INSERT)) );

  /* The three DELETE flags (END_DELETE, START_DELETE and POINT_DELETE)
  ** may not all be set at once. */
  assert( !( (keyflags & LSM_END_DELETE)
          && (keyflags & LSM_START_DELETE)
          && (keyflags & LSM_POINT_DELETE)
  ));

  return 1;
}
#endif
127 static int assert_delete_ranges_match(lsm_db *);
128 static int treeCountEntries(lsm_db *db);
131 ** Container for a key-value pair. Within the *-shm file, each key/value
132 ** pair is stored in a single allocation (which may not actually be
133 ** contiguous in memory). Layout is the TreeKey structure, followed by
134 ** the nKey bytes of key blob, followed by the nValue bytes of value blob
135 ** (if nValue is non-negative).
137 struct TreeKey {
138 int nKey; /* Size of pKey in bytes */
139 int nValue; /* Size of pValue. Or negative. */
140 u8 flags; /* Various LSM_XXX flags */
143 #define TKV_KEY(p) ((void *)&(p)[1])
144 #define TKV_VAL(p) ((void *)(((u8 *)&(p)[1]) + (p)->nKey))
147 ** A single tree node. A node structure may contain up to 3 key/value
148 ** pairs. Internal (non-leaf) nodes have up to 4 children.
150 ** TODO: Update the format of this to be more compact. Get it working
151 ** first though...
153 struct TreeNode {
154 u32 aiKeyPtr[3]; /* Array of pointers to TreeKey objects */
156 /* The following fields are present for interior nodes only, not leaves. */
157 u32 aiChildPtr[4]; /* Array of pointers to child nodes */
159 /* The extra child pointer slot. */
160 u32 iV2; /* Transaction number of v2 */
161 u8 iV2Child; /* apChild[] entry replaced by pV2Ptr */
162 u32 iV2Ptr; /* Substitute pointer */
165 struct TreeLeaf {
166 u32 aiKeyPtr[3]; /* Array of pointers to TreeKey objects */
169 typedef struct TreeBlob TreeBlob;
170 struct TreeBlob {
171 int n;
172 u8 *a;
176 ** Cursor for searching a tree structure.
178 ** If a cursor does not point to any element (a.k.a. EOF), then the
179 ** TreeCursor.iNode variable is set to a negative value. Otherwise, the
180 ** cursor currently points to key aiCell[iNode] on node apTreeNode[iNode].
182 ** Entries in the apTreeNode[] and aiCell[] arrays contain the node and
183 ** index of the TreeNode.apChild[] pointer followed to descend to the
184 ** current element. Hence apTreeNode[0] always contains the root node of
185 ** the tree.
187 struct TreeCursor {
188 lsm_db *pDb; /* Database handle for this cursor */
189 TreeRoot *pRoot; /* Root node and height of tree to access */
190 int iNode; /* Cursor points at apTreeNode[iNode] */
191 TreeNode *apTreeNode[MAX_DEPTH];/* Current position in tree */
192 u8 aiCell[MAX_DEPTH]; /* Current position in tree */
193 TreeKey *pSave; /* Saved key */
194 TreeBlob blob; /* Dynamic storage for a key */
198 ** A value guaranteed to be larger than the largest possible transaction
199 ** id (TreeHeader.iTransId).
201 #define WORKING_VERSION (1<<30)
203 static int tblobGrow(lsm_db *pDb, TreeBlob *p, int n, int *pRc){
204 if( n>p->n ){
205 lsmFree(pDb->pEnv, p->a);
206 p->a = lsmMallocRc(pDb->pEnv, n, pRc);
207 p->n = n;
209 return (p->a==0);
211 static void tblobFree(lsm_db *pDb, TreeBlob *p){
212 lsmFree(pDb->pEnv, p->a);
216 /***********************************************************************
217 ** Start of IntArray methods. */
219 ** Append value iVal to the contents of IntArray *p. Return LSM_OK if
220 ** successful, or LSM_NOMEM if an OOM condition is encountered.
222 static int intArrayAppend(lsm_env *pEnv, IntArray *p, u32 iVal){
223 assert( p->nArray<=p->nAlloc );
224 if( p->nArray>=p->nAlloc ){
225 u32 *aNew;
226 int nNew = p->nArray ? p->nArray*2 : 128;
227 aNew = lsmRealloc(pEnv, p->aArray, nNew*sizeof(u32));
228 if( !aNew ) return LSM_NOMEM_BKPT;
229 p->aArray = aNew;
230 p->nAlloc = nNew;
233 p->aArray[p->nArray++] = iVal;
234 return LSM_OK;
238 ** Zero the IntArray object.
240 static void intArrayFree(lsm_env *pEnv, IntArray *p){
241 p->nArray = 0;
245 ** Return the number of entries currently in the int-array object.
247 static int intArraySize(IntArray *p){
248 return p->nArray;
252 ** Return a copy of the iIdx'th entry in the int-array.
254 static u32 intArrayEntry(IntArray *p, int iIdx){
255 return p->aArray[iIdx];
259 ** Truncate the int-array so that all but the first nVal values are
260 ** discarded.
262 static void intArrayTruncate(IntArray *p, int nVal){
263 p->nArray = nVal;
265 /* End of IntArray methods.
266 ***********************************************************************/
/*
** Compare key (p1/n1) with key (p2/n2). The shared prefix is compared
** with memcmp(); if it is identical the shorter key sorts first. Returns
** a negative, zero or positive value if the first key is less than,
** equal to or greater than the second.
*/
static int treeKeycmp(void *p1, int n1, void *p2, int n2){
  int nCommon = (n1<n2) ? n1 : n2;
  int res = memcmp(p1, p2, nCommon);
  return res ? res : (n1-n2);
}
276 ** The pointer passed as the first argument points to an interior node,
277 ** not a leaf. This function returns the offset of the iCell'th child
278 ** sub-tree of the node.
280 static u32 getChildPtr(TreeNode *p, int iVersion, int iCell){
281 assert( iVersion>=0 );
282 assert( iCell>=0 && iCell<=array_size(p->aiChildPtr) );
283 if( p->iV2 && p->iV2<=(u32)iVersion && iCell==p->iV2Child ) return p->iV2Ptr;
284 return p->aiChildPtr[iCell];
288 ** Given an offset within the *-shm file, return the associated chunk number.
290 static int treeOffsetToChunk(u32 iOff){
291 assert( LSM_SHM_CHUNK_SIZE==(1<<15) );
292 return (int)(iOff>>15);
295 #define treeShmptrUnsafe(pDb, iPtr) \
296 (&((u8*)((pDb)->apShm[(iPtr)>>15]))[(iPtr) & (LSM_SHM_CHUNK_SIZE-1)])
299 ** Return a pointer to the mapped memory location associated with *-shm
300 ** file offset iPtr.
302 static void *treeShmptr(lsm_db *pDb, u32 iPtr){
304 assert( (iPtr>>15)<(u32)pDb->nShm );
305 assert( pDb->apShm[iPtr>>15] );
307 return iPtr ? treeShmptrUnsafe(pDb, iPtr) : 0;
310 static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){
311 return (ShmChunk *)(pDb->apShm[iChunk]);
314 static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){
315 assert( *pRc==LSM_OK );
316 if( iChunk<pDb->nShm || LSM_OK==(*pRc = lsmShmCacheChunks(pDb, iChunk+1)) ){
317 return (ShmChunk *)(pDb->apShm[iChunk]);
319 return 0;
323 #ifndef NDEBUG
324 static void assertIsWorkingChild(
325 lsm_db *db,
326 TreeNode *pNode,
327 TreeNode *pParent,
328 int iCell
330 TreeNode *p;
331 u32 iPtr = getChildPtr(pParent, WORKING_VERSION, iCell);
332 p = treeShmptr(db, iPtr);
333 assert( p==pNode );
335 #else
336 # define assertIsWorkingChild(w,x,y,z)
337 #endif
339 /* Values for the third argument to treeShmkey(). */
340 #define TKV_LOADKEY 1
341 #define TKV_LOADVAL 2
343 static TreeKey *treeShmkey(
344 lsm_db *pDb, /* Database handle */
345 u32 iPtr, /* Shmptr to TreeKey struct */
346 int eLoad, /* Either zero or a TREEKEY_LOADXXX value */
347 TreeBlob *pBlob, /* Used if dynamic memory is required */
348 int *pRc /* IN/OUT: Error code */
350 TreeKey *pRet;
352 assert( eLoad==TKV_LOADKEY || eLoad==TKV_LOADVAL );
353 pRet = (TreeKey *)treeShmptr(pDb, iPtr);
354 if( pRet ){
355 int nReq; /* Bytes of space required at pRet */
356 int nAvail; /* Bytes of space available at pRet */
358 nReq = sizeof(TreeKey) + pRet->nKey;
359 if( eLoad==TKV_LOADVAL && pRet->nValue>0 ){
360 nReq += pRet->nValue;
362 assert( LSM_SHM_CHUNK_SIZE==(1<<15) );
363 nAvail = LSM_SHM_CHUNK_SIZE - (iPtr & (LSM_SHM_CHUNK_SIZE-1));
365 if( nAvail<nReq ){
366 if( tblobGrow(pDb, pBlob, nReq, pRc)==0 ){
367 int nLoad = 0;
368 while( *pRc==LSM_OK ){
369 ShmChunk *pChunk;
370 void *p = treeShmptr(pDb, iPtr);
371 int n = LSM_MIN(nAvail, nReq-nLoad);
373 memcpy(&pBlob->a[nLoad], p, n);
374 nLoad += n;
375 if( nLoad==nReq ) break;
377 pChunk = treeShmChunk(pDb, treeOffsetToChunk(iPtr));
378 assert( pChunk );
379 iPtr = (pChunk->iNext * LSM_SHM_CHUNK_SIZE) + LSM_SHM_CHUNK_HDR;
380 nAvail = LSM_SHM_CHUNK_SIZE - LSM_SHM_CHUNK_HDR;
383 pRet = (TreeKey *)(pBlob->a);
387 return pRet;
#if defined(LSM_DEBUG) && defined(LSM_EXPENSIVE_ASSERT)
/*
** Assert that a leaf has its middle key slot populated.
*/
void assert_leaf_looks_ok(TreeNode *pNode){
  assert( pNode->aiKeyPtr[1] );
}

/*
** Assert that node pNode (if not NULL) has its middle key slot populated
** and, when nHeight>1, that its two middle child pointers are set in the
** working version; then descend into each child.
**
** NOTE(review): getChildPtr() returns a u32 shm offset, yet the recursive
** call below passes that value where a TreeNode* is expected. Translating
** the offset needs a database handle, which this signature does not
** provide - confirm the intended interface before enabling
** LSM_EXPENSIVE_ASSERT.
*/
void assert_node_looks_ok(TreeNode *pNode, int nHeight){
  if( pNode ){
    assert( pNode->aiKeyPtr[1] );
    if( nHeight>1 ){
      int i;
      assert( getChildPtr(pNode, WORKING_VERSION, 1) );
      assert( getChildPtr(pNode, WORKING_VERSION, 2) );
      for(i=0; i<4; i++){
        assert_node_looks_ok(getChildPtr(pNode, WORKING_VERSION, i), nHeight-1);
      }
    }
  }
}

/*
** Run various assert() statements to check that the working-version of the
** tree is correct in the following respects:
**
**   * todo...
*/
void assert_tree_looks_ok(int rc, Tree *pTree){
}
#else
# define assert_tree_looks_ok(x,y)
#endif
421 void lsmFlagsToString(int flags, char *zFlags){
423 zFlags[0] = (flags & LSM_END_DELETE) ? ']' : '.';
425 /* Only one of LSM_POINT_DELETE, LSM_INSERT and LSM_SEPARATOR should ever
426 ** be set. If this is not true, write a '?' to the output. */
427 switch( flags & (LSM_POINT_DELETE|LSM_INSERT|LSM_SEPARATOR) ){
428 case 0: zFlags[1] = '.'; break;
429 case LSM_POINT_DELETE: zFlags[1] = '-'; break;
430 case LSM_INSERT: zFlags[1] = '+'; break;
431 case LSM_SEPARATOR: zFlags[1] = '^'; break;
432 default: zFlags[1] = '?'; break;
435 zFlags[2] = (flags & LSM_SYSTEMKEY) ? '*' : '.';
436 zFlags[3] = (flags & LSM_START_DELETE) ? '[' : '.';
437 zFlags[4] = '\0';
440 #ifdef LSM_DEBUG
443 ** Pointer pBlob points to a buffer containing a blob of binary data
444 ** nBlob bytes long. Append the contents of this blob to *pStr, with
445 ** each octet represented by a 2-digit hexadecimal number. For example,
446 ** if the input blob is three bytes in size and contains {0x01, 0x44, 0xFF},
447 ** then "0144ff" is appended to *pStr.
449 static void lsmAppendStrBlob(LsmString *pStr, void *pBlob, int nBlob){
450 int i;
451 lsmStringExtend(pStr, nBlob*2);
452 if( pStr->nAlloc==0 ) return;
453 for(i=0; i<nBlob; i++){
454 u8 c = ((u8*)pBlob)[i];
455 if( c>='a' && c<='z' ){
456 pStr->z[pStr->n++] = c;
457 }else if( c!=0 || nBlob==1 || i!=(nBlob-1) ){
458 pStr->z[pStr->n++] = "0123456789abcdef"[(c>>4)&0xf];
459 pStr->z[pStr->n++] = "0123456789abcdef"[c&0xf];
462 pStr->z[pStr->n] = 0;
#if 0 /* NOT USED */
/*
** Append nIndent space (0x20) characters to string *pStr.
*/
static void lsmAppendIndent(LsmString *pStr, int nIndent){
  int nLeft = nIndent;
  lsmStringExtend(pStr, nIndent);
  while( nLeft>0 ){
    lsmStringAppend(pStr, " ", 1);
    nLeft--;
  }
}
#endif
476 static void strAppendFlags(LsmString *pStr, u8 flags){
477 char zFlags[8];
479 lsmFlagsToString(flags, zFlags);
480 zFlags[4] = ':';
482 lsmStringAppend(pStr, zFlags, 5);
485 void dump_node_contents(
486 lsm_db *pDb,
487 u32 iNode, /* Print out the contents of this node */
488 char *zPath, /* Path from root to this node */
489 int nPath, /* Number of bytes in zPath */
490 int nHeight /* Height: (0==leaf) (1==parent-of-leaf) */
492 const char *zSpace = " ";
493 int i;
494 int rc = LSM_OK;
495 LsmString s;
496 TreeNode *pNode;
497 TreeBlob b = {0, 0};
499 pNode = (TreeNode *)treeShmptr(pDb, iNode);
501 if( nHeight==0 ){
502 /* Append the nIndent bytes of space to string s. */
503 lsmStringInit(&s, pDb->pEnv);
505 /* Append each key to string s. */
506 for(i=0; i<3; i++){
507 u32 iPtr = pNode->aiKeyPtr[i];
508 if( iPtr ){
509 TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i],TKV_LOADKEY, &b,&rc);
510 strAppendFlags(&s, pKey->flags);
511 lsmAppendStrBlob(&s, TKV_KEY(pKey), pKey->nKey);
512 lsmStringAppend(&s, " ", -1);
516 printf("% 6d %.*sleaf%.*s: %s\n",
517 iNode, nPath, zPath, 20-nPath-4, zSpace, s.z
519 lsmStringClear(&s);
520 }else{
521 for(i=0; i<4 && nHeight>0; i++){
522 u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i);
523 zPath[nPath] = (char)(i+'0');
524 zPath[nPath+1] = '/';
526 if( iPtr ){
527 dump_node_contents(pDb, iPtr, zPath, nPath+2, nHeight-1);
529 if( i!=3 && pNode->aiKeyPtr[i] ){
530 TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TKV_LOADKEY,&b,&rc);
531 lsmStringInit(&s, pDb->pEnv);
532 strAppendFlags(&s, pKey->flags);
533 lsmAppendStrBlob(&s, TKV_KEY(pKey), pKey->nKey);
534 printf("% 6d %.*s%.*s: %s\n",
535 iNode, nPath+1, zPath, 20-nPath-1, zSpace, s.z);
536 lsmStringClear(&s);
541 tblobFree(pDb, &b);
544 void dump_tree_contents(lsm_db *pDb, const char *zCaption){
545 char zPath[64];
546 TreeRoot *p = &pDb->treehdr.root;
547 printf("\n%s\n", zCaption);
548 zPath[0] = '/';
549 if( p->iRoot ){
550 dump_node_contents(pDb, p->iRoot, zPath, 1, p->nHeight-1);
552 fflush(stdout);
555 #endif
558 ** Initialize a cursor object, the space for which has already been
559 ** allocated.
561 static void treeCursorInit(lsm_db *pDb, int bOld, TreeCursor *pCsr){
562 memset(pCsr, 0, sizeof(TreeCursor));
563 pCsr->pDb = pDb;
564 if( bOld ){
565 pCsr->pRoot = &pDb->treehdr.oldroot;
566 }else{
567 pCsr->pRoot = &pDb->treehdr.root;
569 pCsr->iNode = -1;
573 ** Return a pointer to the mapping of the TreeKey object that the cursor
574 ** is pointing to.
576 static TreeKey *csrGetKey(TreeCursor *pCsr, TreeBlob *pBlob, int *pRc){
577 TreeKey *pRet;
578 lsm_db *pDb = pCsr->pDb;
579 u32 iPtr = pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]];
581 assert( iPtr );
582 pRet = (TreeKey*)treeShmptrUnsafe(pDb, iPtr);
583 if( !(pRet->flags & LSM_CONTIGUOUS) ){
584 pRet = treeShmkey(pDb, iPtr, TKV_LOADVAL, pBlob, pRc);
587 return pRet;
591 ** Save the current position of tree cursor pCsr.
593 int lsmTreeCursorSave(TreeCursor *pCsr){
594 int rc = LSM_OK;
595 if( pCsr && pCsr->pSave==0 ){
596 int iNode = pCsr->iNode;
597 if( iNode>=0 ){
598 pCsr->pSave = csrGetKey(pCsr, &pCsr->blob, &rc);
600 pCsr->iNode = -1;
602 return rc;
606 ** Restore the position of a saved tree cursor.
608 static int treeCursorRestore(TreeCursor *pCsr, int *pRes){
609 int rc = LSM_OK;
610 if( pCsr->pSave ){
611 TreeKey *pKey = pCsr->pSave;
612 pCsr->pSave = 0;
613 if( pRes ){
614 rc = lsmTreeCursorSeek(pCsr, TKV_KEY(pKey), pKey->nKey, pRes);
617 return rc;
621 ** Allocate nByte bytes of space within the *-shm file. If successful,
622 ** return LSM_OK and set *piPtr to the offset within the file at which
623 ** the allocated space is located.
625 static u32 treeShmalloc(lsm_db *pDb, int bAlign, int nByte, int *pRc){
626 u32 iRet = 0;
627 if( *pRc==LSM_OK ){
628 const static int CHUNK_SIZE = LSM_SHM_CHUNK_SIZE;
629 const static int CHUNK_HDR = LSM_SHM_CHUNK_HDR;
630 u32 iWrite; /* Current write offset */
631 u32 iEof; /* End of current chunk */
632 int iChunk; /* Current chunk */
634 assert( nByte <= (CHUNK_SIZE-CHUNK_HDR) );
636 /* Check if there is enough space on the current chunk to fit the
637 ** new allocation. If not, link in a new chunk and put the new
638 ** allocation at the start of it. */
639 iWrite = pDb->treehdr.iWrite;
640 if( bAlign ){
641 iWrite = (iWrite + 3) & ~0x0003;
642 assert( (iWrite % 4)==0 );
645 assert( iWrite );
646 iChunk = treeOffsetToChunk(iWrite-1);
647 iEof = (iChunk+1) * CHUNK_SIZE;
648 assert( iEof>=iWrite && (iEof-iWrite)<(u32)CHUNK_SIZE );
649 if( (iWrite+nByte)>iEof ){
650 ShmChunk *pHdr; /* Header of chunk just finished (iChunk) */
651 ShmChunk *pFirst; /* Header of chunk treehdr.iFirst */
652 ShmChunk *pNext; /* Header of new chunk */
653 int iNext = 0; /* Next chunk */
654 int rc = LSM_OK;
656 pFirst = treeShmChunk(pDb, pDb->treehdr.iFirst);
658 assert( shm_sequence_ge(pDb->treehdr.iUsedShmid, pFirst->iShmid) );
659 assert( (pDb->treehdr.iNextShmid+1-pDb->treehdr.nChunk)==pFirst->iShmid );
661 /* Check if the chunk at the start of the linked list is still in
662 ** use. If not, reuse it. If so, allocate a new chunk by appending
663 ** to the *-shm file. */
664 if( pDb->treehdr.iUsedShmid!=pFirst->iShmid ){
665 int bInUse;
666 rc = lsmTreeInUse(pDb, pFirst->iShmid, &bInUse);
667 if( rc!=LSM_OK ){
668 *pRc = rc;
669 return 0;
671 if( bInUse==0 ){
672 iNext = pDb->treehdr.iFirst;
673 pDb->treehdr.iFirst = pFirst->iNext;
674 assert( pDb->treehdr.iFirst );
677 if( iNext==0 ) iNext = pDb->treehdr.nChunk++;
679 /* Set the header values for the new chunk */
680 pNext = treeShmChunkRc(pDb, iNext, &rc);
681 if( pNext ){
682 pNext->iNext = 0;
683 pNext->iShmid = (pDb->treehdr.iNextShmid++);
684 }else{
685 *pRc = rc;
686 return 0;
689 /* Set the header values for the chunk just finished */
690 pHdr = (ShmChunk *)treeShmptr(pDb, iChunk*CHUNK_SIZE);
691 pHdr->iNext = iNext;
693 /* Advance to the next chunk */
694 iWrite = iNext * CHUNK_SIZE + CHUNK_HDR;
697 /* Allocate space at iWrite. */
698 iRet = iWrite;
699 pDb->treehdr.iWrite = iWrite + nByte;
700 pDb->treehdr.root.nByte += nByte;
702 return iRet;
706 ** Allocate and zero nByte bytes of space within the *-shm file.
708 static void *treeShmallocZero(lsm_db *pDb, int nByte, u32 *piPtr, int *pRc){
709 u32 iPtr;
710 void *p;
711 iPtr = treeShmalloc(pDb, 1, nByte, pRc);
712 p = treeShmptr(pDb, iPtr);
713 if( p ){
714 assert( *pRc==LSM_OK );
715 memset(p, 0, nByte);
716 *piPtr = iPtr;
718 return p;
721 static TreeNode *newTreeNode(lsm_db *pDb, u32 *piPtr, int *pRc){
722 return treeShmallocZero(pDb, sizeof(TreeNode), piPtr, pRc);
725 static TreeLeaf *newTreeLeaf(lsm_db *pDb, u32 *piPtr, int *pRc){
726 return treeShmallocZero(pDb, sizeof(TreeLeaf), piPtr, pRc);
729 static TreeKey *newTreeKey(
730 lsm_db *pDb,
731 u32 *piPtr,
732 void *pKey, int nKey, /* Key data */
733 void *pVal, int nVal, /* Value data (or nVal<0 for delete) */
734 int *pRc
736 TreeKey *p;
737 u32 iPtr;
738 u32 iEnd;
739 int nRem;
740 u8 *a;
741 int n;
743 /* Allocate space for the TreeKey structure itself */
744 *piPtr = iPtr = treeShmalloc(pDb, 1, sizeof(TreeKey), pRc);
745 p = treeShmptr(pDb, iPtr);
746 if( *pRc ) return 0;
747 p->nKey = nKey;
748 p->nValue = nVal;
750 /* Allocate and populate the space required for the key and value. */
751 n = nRem = nKey;
752 a = (u8 *)pKey;
753 while( a ){
754 while( nRem>0 ){
755 u8 *aAlloc;
756 int nAlloc;
757 u32 iWrite;
759 iWrite = (pDb->treehdr.iWrite & (LSM_SHM_CHUNK_SIZE-1));
760 iWrite = LSM_MAX(iWrite, LSM_SHM_CHUNK_HDR);
761 nAlloc = LSM_MIN((LSM_SHM_CHUNK_SIZE-iWrite), (u32)nRem);
763 aAlloc = treeShmptr(pDb, treeShmalloc(pDb, 0, nAlloc, pRc));
764 if( aAlloc==0 ) break;
765 memcpy(aAlloc, &a[n-nRem], nAlloc);
766 nRem -= nAlloc;
768 a = pVal;
769 n = nRem = nVal;
770 pVal = 0;
773 iEnd = iPtr + sizeof(TreeKey) + nKey + LSM_MAX(0, nVal);
774 if( (iPtr & ~(LSM_SHM_CHUNK_SIZE-1))!=(iEnd & ~(LSM_SHM_CHUNK_SIZE-1)) ){
775 p->flags = 0;
776 }else{
777 p->flags = LSM_CONTIGUOUS;
780 if( *pRc ) return 0;
781 #if 0
782 printf("store: %d %s\n", (int)iPtr, (char *)pKey);
783 #endif
784 return p;
787 static TreeNode *copyTreeNode(
788 lsm_db *pDb,
789 TreeNode *pOld,
790 u32 *piNew,
791 int *pRc
793 TreeNode *pNew;
795 pNew = newTreeNode(pDb, piNew, pRc);
796 if( pNew ){
797 memcpy(pNew->aiKeyPtr, pOld->aiKeyPtr, sizeof(pNew->aiKeyPtr));
798 memcpy(pNew->aiChildPtr, pOld->aiChildPtr, sizeof(pNew->aiChildPtr));
799 if( pOld->iV2 ) pNew->aiChildPtr[pOld->iV2Child] = pOld->iV2Ptr;
801 return pNew;
804 static TreeNode *copyTreeLeaf(
805 lsm_db *pDb,
806 TreeLeaf *pOld,
807 u32 *piNew,
808 int *pRc
810 TreeLeaf *pNew;
811 pNew = newTreeLeaf(pDb, piNew, pRc);
812 if( pNew ){
813 memcpy(pNew, pOld, sizeof(TreeLeaf));
815 return (TreeNode *)pNew;
819 ** The tree cursor passed as the second argument currently points to an
820 ** internal node (not a leaf). Specifically, to a sub-tree pointer. This
821 ** function replaces the sub-tree that the cursor currently points to
822 ** with sub-tree pNew.
824 ** The sub-tree may be replaced either by writing the "v2 data" on the
825 ** internal node, or by allocating a new TreeNode structure and then
826 ** calling this function on the parent of the internal node.
828 static int treeUpdatePtr(lsm_db *pDb, TreeCursor *pCsr, u32 iNew){
829 int rc = LSM_OK;
830 if( pCsr->iNode<0 ){
831 /* iNew is the new root node */
832 pDb->treehdr.root.iRoot = iNew;
833 }else{
834 /* If this node already has version 2 content, allocate a copy and
835 ** update the copy with the new pointer value. Otherwise, store the
836 ** new pointer as v2 data within the current node structure. */
838 TreeNode *p; /* The node to be modified */
839 int iChildPtr; /* apChild[] entry to modify */
841 p = pCsr->apTreeNode[pCsr->iNode];
842 iChildPtr = pCsr->aiCell[pCsr->iNode];
844 if( p->iV2 ){
845 /* The "allocate new TreeNode" option */
846 u32 iCopy;
847 TreeNode *pCopy;
848 pCopy = copyTreeNode(pDb, p, &iCopy, &rc);
849 if( pCopy ){
850 assert( rc==LSM_OK );
851 pCopy->aiChildPtr[iChildPtr] = iNew;
852 pCsr->iNode--;
853 rc = treeUpdatePtr(pDb, pCsr, iCopy);
855 }else{
856 /* The "v2 data" option */
857 u32 iPtr;
858 assert( pDb->treehdr.root.iTransId>0 );
860 if( pCsr->iNode ){
861 iPtr = getChildPtr(
862 pCsr->apTreeNode[pCsr->iNode-1],
863 pDb->treehdr.root.iTransId, pCsr->aiCell[pCsr->iNode-1]
865 }else{
866 iPtr = pDb->treehdr.root.iRoot;
868 rc = intArrayAppend(pDb->pEnv, &pDb->rollback, iPtr);
870 if( rc==LSM_OK ){
871 p->iV2 = pDb->treehdr.root.iTransId;
872 p->iV2Child = (u8)iChildPtr;
873 p->iV2Ptr = iNew;
878 return rc;
882 ** Cursor pCsr points at a node that is part of pTree. This function
883 ** inserts a new key and optionally child node pointer into that node.
885 ** The position into which the new key and pointer are inserted is
886 ** determined by the iSlot parameter. The new key will be inserted to
887 ** the left of the key currently stored in apKey[iSlot]. Or, if iSlot is
888 ** greater than the index of the rightmost key in the node.
890 ** Pointer pLeftPtr points to a child tree that contains keys that are
891 ** smaller than pTreeKey.
893 static int treeInsert(
894 lsm_db *pDb, /* Database handle */
895 TreeCursor *pCsr, /* Cursor indicating path to insert at */
896 u32 iLeftPtr, /* Left child pointer */
897 u32 iTreeKey, /* Location of key to insert */
898 u32 iRightPtr, /* Right child pointer */
899 int iSlot /* Position to insert key into */
901 int rc = LSM_OK;
902 TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
904 /* Check if the node is currently full. If so, split pNode in two and
905 ** call this function recursively to add a key to the parent. Otherwise,
906 ** insert the new key directly into pNode. */
907 assert( pNode->aiKeyPtr[1] );
908 if( pNode->aiKeyPtr[0] && pNode->aiKeyPtr[2] ){
909 u32 iLeft; TreeNode *pLeft; /* New left-hand sibling node */
910 u32 iRight; TreeNode *pRight; /* New right-hand sibling node */
912 pLeft = newTreeNode(pDb, &iLeft, &rc);
913 pRight = newTreeNode(pDb, &iRight, &rc);
914 if( rc ) return rc;
916 pLeft->aiChildPtr[1] = getChildPtr(pNode, WORKING_VERSION, 0);
917 pLeft->aiKeyPtr[1] = pNode->aiKeyPtr[0];
918 pLeft->aiChildPtr[2] = getChildPtr(pNode, WORKING_VERSION, 1);
920 pRight->aiChildPtr[1] = getChildPtr(pNode, WORKING_VERSION, 2);
921 pRight->aiKeyPtr[1] = pNode->aiKeyPtr[2];
922 pRight->aiChildPtr[2] = getChildPtr(pNode, WORKING_VERSION, 3);
924 if( pCsr->iNode==0 ){
925 /* pNode is the root of the tree. Grow the tree by one level. */
926 u32 iRoot; TreeNode *pRoot; /* New root node */
928 pRoot = newTreeNode(pDb, &iRoot, &rc);
929 pRoot->aiKeyPtr[1] = pNode->aiKeyPtr[1];
930 pRoot->aiChildPtr[1] = iLeft;
931 pRoot->aiChildPtr[2] = iRight;
933 pDb->treehdr.root.iRoot = iRoot;
934 pDb->treehdr.root.nHeight++;
935 }else{
937 pCsr->iNode--;
938 rc = treeInsert(pDb, pCsr,
939 iLeft, pNode->aiKeyPtr[1], iRight, pCsr->aiCell[pCsr->iNode]
943 assert( pLeft->iV2==0 );
944 assert( pRight->iV2==0 );
945 switch( iSlot ){
946 case 0:
947 pLeft->aiKeyPtr[0] = iTreeKey;
948 pLeft->aiChildPtr[0] = iLeftPtr;
949 if( iRightPtr ) pLeft->aiChildPtr[1] = iRightPtr;
950 break;
951 case 1:
952 pLeft->aiChildPtr[3] = (iRightPtr ? iRightPtr : pLeft->aiChildPtr[2]);
953 pLeft->aiKeyPtr[2] = iTreeKey;
954 pLeft->aiChildPtr[2] = iLeftPtr;
955 break;
956 case 2:
957 pRight->aiKeyPtr[0] = iTreeKey;
958 pRight->aiChildPtr[0] = iLeftPtr;
959 if( iRightPtr ) pRight->aiChildPtr[1] = iRightPtr;
960 break;
961 case 3:
962 pRight->aiChildPtr[3] = (iRightPtr ? iRightPtr : pRight->aiChildPtr[2]);
963 pRight->aiKeyPtr[2] = iTreeKey;
964 pRight->aiChildPtr[2] = iLeftPtr;
965 break;
968 }else{
969 TreeNode *pNew;
970 u32 *piKey;
971 u32 *piChild;
972 u32 iStore = 0;
973 u32 iNew = 0;
974 int i;
976 /* Allocate a new version of node pNode. */
977 pNew = newTreeNode(pDb, &iNew, &rc);
978 if( rc ) return rc;
980 piKey = pNew->aiKeyPtr;
981 piChild = pNew->aiChildPtr;
983 for(i=0; i<iSlot; i++){
984 if( pNode->aiKeyPtr[i] ){
985 *(piKey++) = pNode->aiKeyPtr[i];
986 *(piChild++) = getChildPtr(pNode, WORKING_VERSION, i);
990 *piKey++ = iTreeKey;
991 *piChild++ = iLeftPtr;
993 iStore = iRightPtr;
994 for(i=iSlot; i<3; i++){
995 if( pNode->aiKeyPtr[i] ){
996 *(piKey++) = pNode->aiKeyPtr[i];
997 *(piChild++) = iStore ? iStore : getChildPtr(pNode, WORKING_VERSION, i);
998 iStore = 0;
1002 if( iStore ){
1003 *piChild = iStore;
1004 }else{
1005 *piChild = getChildPtr(pNode, WORKING_VERSION,
1006 (pNode->aiKeyPtr[2] ? 3 : 2)
1009 pCsr->iNode--;
1010 rc = treeUpdatePtr(pDb, pCsr, iNew);
1013 return rc;
1016 static int treeInsertLeaf(
1017 lsm_db *pDb, /* Database handle */
1018 TreeCursor *pCsr, /* Cursor structure */
1019 u32 iTreeKey, /* Key pointer to insert */
1020 int iSlot /* Insert key to the left of this */
1022 int rc = LSM_OK; /* Return code */
1023 TreeNode *pLeaf = pCsr->apTreeNode[pCsr->iNode];
1024 TreeLeaf *pNew;
1025 u32 iNew;
1027 assert( iSlot>=0 && iSlot<=4 );
1028 assert( pCsr->iNode>0 );
1029 assert( pLeaf->aiKeyPtr[1] );
1031 pCsr->iNode--;
1033 pNew = newTreeLeaf(pDb, &iNew, &rc);
1034 if( pNew ){
1035 if( pLeaf->aiKeyPtr[0] && pLeaf->aiKeyPtr[2] ){
1036 /* The leaf is full. Split it in two. */
1037 TreeLeaf *pRight;
1038 u32 iRight;
1039 pRight = newTreeLeaf(pDb, &iRight, &rc);
1040 if( pRight ){
1041 assert( rc==LSM_OK );
1042 pNew->aiKeyPtr[1] = pLeaf->aiKeyPtr[0];
1043 pRight->aiKeyPtr[1] = pLeaf->aiKeyPtr[2];
1044 switch( iSlot ){
1045 case 0: pNew->aiKeyPtr[0] = iTreeKey; break;
1046 case 1: pNew->aiKeyPtr[2] = iTreeKey; break;
1047 case 2: pRight->aiKeyPtr[0] = iTreeKey; break;
1048 case 3: pRight->aiKeyPtr[2] = iTreeKey; break;
1051 rc = treeInsert(pDb, pCsr, iNew, pLeaf->aiKeyPtr[1], iRight,
1052 pCsr->aiCell[pCsr->iNode]
1055 }else{
1056 int iOut = 0;
1057 int i;
1058 for(i=0; i<4; i++){
1059 if( i==iSlot ) pNew->aiKeyPtr[iOut++] = iTreeKey;
1060 if( i<3 && pLeaf->aiKeyPtr[i] ){
1061 pNew->aiKeyPtr[iOut++] = pLeaf->aiKeyPtr[i];
1064 rc = treeUpdatePtr(pDb, pCsr, iNew);
1068 return rc;
1071 void lsmTreeMakeOld(lsm_db *pDb){
1073 /* A write transaction must be open. Otherwise the code below that
1074 ** assumes (pDb->pClient->iLogOff) is current may malfunction.
1076 ** Update: currently this assert fails due to lsm_flush(), which does
1077 ** not set nTransOpen.
1079 assert( /* pDb->nTransOpen>0 && */ pDb->iReader>=0 );
1081 if( pDb->treehdr.iOldShmid==0 ){
1082 pDb->treehdr.iOldLog = (pDb->treehdr.log.aRegion[2].iEnd << 1);
1083 pDb->treehdr.iOldLog |= (~(pDb->pClient->iLogOff) & (i64)0x0001);
1085 pDb->treehdr.oldcksum0 = pDb->treehdr.log.cksum0;
1086 pDb->treehdr.oldcksum1 = pDb->treehdr.log.cksum1;
1087 pDb->treehdr.iOldShmid = pDb->treehdr.iNextShmid-1;
1088 memcpy(&pDb->treehdr.oldroot, &pDb->treehdr.root, sizeof(TreeRoot));
1090 pDb->treehdr.root.iTransId = 1;
1091 pDb->treehdr.root.iRoot = 0;
1092 pDb->treehdr.root.nHeight = 0;
1093 pDb->treehdr.root.nByte = 0;
1097 void lsmTreeDiscardOld(lsm_db *pDb){
1098 assert( lsmShmAssertLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL)
1099 || lsmShmAssertLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL)
1101 pDb->treehdr.iUsedShmid = pDb->treehdr.iOldShmid;
1102 pDb->treehdr.iOldShmid = 0;
1105 int lsmTreeHasOld(lsm_db *pDb){
1106 return pDb->treehdr.iOldShmid!=0;
1110 ** This function is called during recovery to initialize the
1111 ** tree header. Only the database connections private copy of the tree-header
1112 ** is initialized here - it will be copied into shared memory if log file
1113 ** recovery is successful.
1115 int lsmTreeInit(lsm_db *pDb){
1116 ShmChunk *pOne;
1117 int rc = LSM_OK;
1119 memset(&pDb->treehdr, 0, sizeof(TreeHeader));
1120 pDb->treehdr.root.iTransId = 1;
1121 pDb->treehdr.iFirst = 1;
1122 pDb->treehdr.nChunk = 2;
1123 pDb->treehdr.iWrite = LSM_SHM_CHUNK_SIZE + LSM_SHM_CHUNK_HDR;
1124 pDb->treehdr.iNextShmid = 2;
1125 pDb->treehdr.iUsedShmid = 1;
1127 pOne = treeShmChunkRc(pDb, 1, &rc);
1128 if( pOne ){
1129 pOne->iNext = 0;
1130 pOne->iShmid = 1;
1132 return rc;
1135 static void treeHeaderChecksum(
1136 TreeHeader *pHdr,
1137 u32 *aCksum
1139 u32 cksum1 = 0x12345678;
1140 u32 cksum2 = 0x9ABCDEF0;
1141 u32 *a = (u32 *)pHdr;
1142 int i;
1144 assert( (offsetof(TreeHeader, aCksum) + sizeof(u32)*2)==sizeof(TreeHeader) );
1145 assert( (sizeof(TreeHeader) % (sizeof(u32)*2))==0 );
1147 for(i=0; i<(offsetof(TreeHeader, aCksum) / sizeof(u32)); i+=2){
1148 cksum1 += a[i];
1149 cksum2 += (cksum1 + a[i+1]);
1151 aCksum[0] = cksum1;
1152 aCksum[1] = cksum2;
1156 ** Return true if the checksum stored in TreeHeader object *pHdr is
1157 ** consistent with the contents of its other fields.
1159 static int treeHeaderChecksumOk(TreeHeader *pHdr){
1160 u32 aCksum[2];
1161 treeHeaderChecksum(pHdr, aCksum);
1162 return (0==memcmp(aCksum, pHdr->aCksum, sizeof(aCksum)));
1166 ** This type is used by functions lsmTreeRepair() and treeSortByShmid() to
1167 ** make relinking the linked list of shared-memory chunks easier.
1169 typedef struct ShmChunkLoc ShmChunkLoc;
1170 struct ShmChunkLoc {
1171 ShmChunk *pShm;
1172 u32 iLoc;
1176 ** This function checks that the linked list of shared memory chunks
1177 ** that starts at chunk db->treehdr.iFirst:
1179 ** 1) Includes all chunks in the shared-memory region, and
1180 ** 2) Links them together in order of ascending shm-id.
1182 ** If no error occurs and the conditions above are met, LSM_OK is returned.
1184 ** If either of the conditions are untrue, LSM_CORRUPT is returned. Or, if
1185 ** an error is encountered before the checks are completed, another LSM error
1186 ** code (i.e. LSM_IOERR or LSM_NOMEM) may be returned.
1188 static int treeCheckLinkedList(lsm_db *db){
1189 int rc = LSM_OK;
1190 int nVisit = 0;
1191 ShmChunk *p;
1193 p = treeShmChunkRc(db, db->treehdr.iFirst, &rc);
1194 while( rc==LSM_OK && p ){
1195 if( p->iNext ){
1196 if( p->iNext>=db->treehdr.nChunk ){
1197 rc = LSM_CORRUPT_BKPT;
1198 }else{
1199 ShmChunk *pNext = treeShmChunkRc(db, p->iNext, &rc);
1200 if( rc==LSM_OK ){
1201 if( pNext->iShmid!=p->iShmid+1 ){
1202 rc = LSM_CORRUPT_BKPT;
1204 p = pNext;
1207 }else{
1208 p = 0;
1210 nVisit++;
1213 if( rc==LSM_OK && (u32)nVisit!=db->treehdr.nChunk-1 ){
1214 rc = LSM_CORRUPT_BKPT;
1216 return rc;
1220 ** Iterate through the current in-memory tree. If there are any v2-pointers
1221 ** with transaction ids larger than db->treehdr.iTransId, zero them.
1223 static int treeRepairPtrs(lsm_db *db){
1224 int rc = LSM_OK;
1226 if( db->treehdr.root.nHeight>1 ){
1227 TreeCursor csr; /* Cursor used to iterate through tree */
1228 u32 iTransId = db->treehdr.root.iTransId;
1230 /* Initialize the cursor structure. Also decrement the nHeight variable
1231 ** in the tree-header. This will prevent the cursor from visiting any
1232 ** leaf nodes. */
1233 db->treehdr.root.nHeight--;
1234 treeCursorInit(db, 0, &csr);
1236 rc = lsmTreeCursorEnd(&csr, 0);
1237 while( rc==LSM_OK && lsmTreeCursorValid(&csr) ){
1238 TreeNode *pNode = csr.apTreeNode[csr.iNode];
1239 if( pNode->iV2>iTransId ){
1240 pNode->iV2Child = 0;
1241 pNode->iV2Ptr = 0;
1242 pNode->iV2 = 0;
1244 rc = lsmTreeCursorNext(&csr);
1246 tblobFree(csr.pDb, &csr.blob);
1248 db->treehdr.root.nHeight++;
1251 return rc;
1254 static int treeRepairList(lsm_db *db){
1255 int rc = LSM_OK;
1256 int i;
1257 ShmChunk *p;
1258 ShmChunk *pMin = 0;
1259 u32 iMin = 0;
1261 /* Iterate through all shm chunks. Find the smallest shm-id present in
1262 ** the shared-memory region. */
1263 for(i=1; rc==LSM_OK && (u32)i<db->treehdr.nChunk; i++){
1264 p = treeShmChunkRc(db, i, &rc);
1265 if( p && (pMin==0 || shm_sequence_ge(pMin->iShmid, p->iShmid)) ){
1266 pMin = p;
1267 iMin = i;
1271 /* Fix the shm-id values on any chunks with a shm-id greater than or
1272 ** equal to treehdr.iNextShmid. Then do a merge-sort of all chunks to
1273 ** fix the ShmChunk.iNext pointers.
1275 if( rc==LSM_OK ){
1276 int nSort;
1277 int nByte;
1278 u32 iPrevShmid;
1279 ShmChunkLoc *aSort;
1281 /* Allocate space for a merge sort. */
1282 nSort = 1;
1283 while( (u32)nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
1284 nByte = sizeof(ShmChunkLoc) * nSort * 2;
1285 aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);
1286 iPrevShmid = pMin->iShmid;
1288 /* Fix all shm-ids, if required. */
1289 if( rc==LSM_OK ){
1290 iPrevShmid = pMin->iShmid-1;
1291 for(i=1; (u32)i<db->treehdr.nChunk; i++){
1292 p = treeShmChunk(db, i);
1293 aSort[i-1].pShm = p;
1294 aSort[i-1].iLoc = i;
1295 if( (u32)i!=db->treehdr.iFirst ){
1296 if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
1297 p->iShmid = iPrevShmid--;
1301 if( iMin!=db->treehdr.iFirst ){
1302 p = treeShmChunk(db, db->treehdr.iFirst);
1303 p->iShmid = iPrevShmid;
1307 if( rc==LSM_OK ){
1308 ShmChunkLoc *aSpace = &aSort[nSort];
1309 for(i=0; i<nSort; i++){
1310 if( aSort[i].pShm ){
1311 assert( shm_sequence_ge(aSort[i].pShm->iShmid, iPrevShmid) );
1312 assert( aSpace[aSort[i].pShm->iShmid - iPrevShmid].pShm==0 );
1313 aSpace[aSort[i].pShm->iShmid - iPrevShmid] = aSort[i];
1317 if( aSpace[nSort-1].pShm ) aSpace[nSort-1].pShm->iNext = 0;
1318 for(i=0; i<nSort-1; i++){
1319 if( aSpace[i].pShm ){
1320 aSpace[i].pShm->iNext = aSpace[i+1].iLoc;
1324 rc = treeCheckLinkedList(db);
1325 lsmFree(db->pEnv, aSort);
1329 return rc;
1333 ** This function is called as part of opening a write-transaction if the
1334 ** writer-flag is already set - indicating that the previous writer
1335 ** failed before ending its transaction.
1337 int lsmTreeRepair(lsm_db *db){
1338 int rc = LSM_OK;
1339 TreeHeader hdr;
1340 ShmHeader *pHdr = db->pShmhdr;
1342 /* Ensure that the two tree-headers are consistent. Copy one over the other
1343 ** if necessary. Prefer the data from a tree-header for which the checksum
1344 ** computes. Or, if they both compute, prefer tree-header-1. */
1345 if( memcmp(&pHdr->hdr1, &pHdr->hdr2, sizeof(TreeHeader)) ){
1346 if( treeHeaderChecksumOk(&pHdr->hdr1) ){
1347 memcpy(&pHdr->hdr2, &pHdr->hdr1, sizeof(TreeHeader));
1348 }else{
1349 memcpy(&pHdr->hdr1, &pHdr->hdr2, sizeof(TreeHeader));
1353 /* Save the connections current copy of the tree-header. It will be
1354 ** restored before returning. */
1355 memcpy(&hdr, &db->treehdr, sizeof(TreeHeader));
1357 /* Walk the tree. Zero any v2 pointers with a transaction-id greater than
1358 ** the transaction-id currently in the tree-headers. */
1359 rc = treeRepairPtrs(db);
1361 /* Repair the linked list of shared-memory chunks. */
1362 if( rc==LSM_OK ){
1363 rc = treeRepairList(db);
1366 memcpy(&db->treehdr, &hdr, sizeof(TreeHeader));
1367 return rc;
1370 static void treeOverwriteKey(lsm_db *db, TreeCursor *pCsr, u32 iKey, int *pRc){
1371 if( *pRc==LSM_OK ){
1372 TreeRoot *p = &db->treehdr.root;
1373 TreeNode *pNew;
1374 u32 iNew;
1375 TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
1376 int iCell = pCsr->aiCell[pCsr->iNode];
1378 /* Create a copy of this node */
1379 if( (pCsr->iNode>0 && (u32)pCsr->iNode==(p->nHeight-1)) ){
1380 pNew = copyTreeLeaf(db, (TreeLeaf *)pNode, &iNew, pRc);
1381 }else{
1382 pNew = copyTreeNode(db, pNode, &iNew, pRc);
1385 if( pNew ){
1386 /* Modify the value in the new version */
1387 pNew->aiKeyPtr[iCell] = iKey;
1389 /* Change the pointer in the parent (if any) to point at the new
1390 ** TreeNode */
1391 pCsr->iNode--;
1392 treeUpdatePtr(db, pCsr, iNew);
1397 static int treeNextIsEndDelete(lsm_db *db, TreeCursor *pCsr){
1398 int iNode = pCsr->iNode;
1399 int iCell = pCsr->aiCell[iNode]+1;
1401 /* Cursor currently points to a leaf node. */
1402 assert( (u32)pCsr->iNode==(db->treehdr.root.nHeight-1) );
1404 while( iNode>=0 ){
1405 TreeNode *pNode = pCsr->apTreeNode[iNode];
1406 if( iCell<3 && pNode->aiKeyPtr[iCell] ){
1407 int rc = LSM_OK;
1408 TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
1409 assert( rc==LSM_OK );
1410 return ((pKey->flags & LSM_END_DELETE) ? 1 : 0);
1412 iNode--;
1413 iCell = pCsr->aiCell[iNode];
1416 return 0;
1419 static int treePrevIsStartDelete(lsm_db *db, TreeCursor *pCsr){
1420 int iNode = pCsr->iNode;
1422 /* Cursor currently points to a leaf node. */
1423 assert( (u32)pCsr->iNode==(db->treehdr.root.nHeight-1) );
1425 while( iNode>=0 ){
1426 TreeNode *pNode = pCsr->apTreeNode[iNode];
1427 int iCell = pCsr->aiCell[iNode]-1;
1428 if( iCell>=0 && pNode->aiKeyPtr[iCell] ){
1429 int rc = LSM_OK;
1430 TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
1431 assert( rc==LSM_OK );
1432 return ((pKey->flags & LSM_START_DELETE) ? 1 : 0);
1434 iNode--;
1437 return 0;
1441 static int treeInsertEntry(
1442 lsm_db *pDb, /* Database handle */
1443 int flags, /* Flags associated with entry */
1444 void *pKey, /* Pointer to key data */
1445 int nKey, /* Size of key data in bytes */
1446 void *pVal, /* Pointer to value data (or NULL) */
1447 int nVal /* Bytes in value data (or -ve for delete) */
1449 int rc = LSM_OK; /* Return Code */
1450 TreeKey *pTreeKey; /* New key-value being inserted */
1451 u32 iTreeKey;
1452 TreeRoot *p = &pDb->treehdr.root;
1453 TreeCursor csr; /* Cursor to seek to pKey/nKey */
1454 int res = 0; /* Result of seek operation on csr */
1456 assert( nVal>=0 || pVal==0 );
1457 assert_tree_looks_ok(LSM_OK, pTree);
1458 assert( flags==LSM_INSERT || flags==LSM_POINT_DELETE
1459 || flags==LSM_START_DELETE || flags==LSM_END_DELETE
1461 assert( (flags & LSM_CONTIGUOUS)==0 );
1462 #if 0
1463 dump_tree_contents(pDb, "before");
1464 #endif
1466 if( p->iRoot ){
1467 TreeKey *pRes; /* Key at end of seek operation */
1468 treeCursorInit(pDb, 0, &csr);
1470 /* Seek to the leaf (or internal node) that the new key belongs on */
1471 rc = lsmTreeCursorSeek(&csr, pKey, nKey, &res);
1472 pRes = csrGetKey(&csr, &csr.blob, &rc);
1473 if( rc!=LSM_OK ) return rc;
1474 assert( pRes );
1476 if( flags==LSM_START_DELETE ){
1477 /* When inserting a start-delete-range entry, if the key that
1478 ** occurs immediately before the new entry is already a START_DELETE,
1479 ** then the new entry is not required. */
1480 if( (res<=0 && (pRes->flags & LSM_START_DELETE))
1481 || (res>0 && treePrevIsStartDelete(pDb, &csr))
1483 goto insert_entry_out;
1485 }else if( flags==LSM_END_DELETE ){
1486 /* When inserting an start-delete-range entry, if the key that
1487 ** occurs immediately after the new entry is already an END_DELETE,
1488 ** then the new entry is not required. */
1489 if( (res<0 && treeNextIsEndDelete(pDb, &csr))
1490 || (res>=0 && (pRes->flags & LSM_END_DELETE))
1492 goto insert_entry_out;
1496 if( res==0 && (flags & (LSM_END_DELETE|LSM_START_DELETE)) ){
1497 if( pRes->flags & LSM_INSERT ){
1498 nVal = pRes->nValue;
1499 pVal = TKV_VAL(pRes);
1501 flags = flags | pRes->flags;
1504 if( flags & (LSM_INSERT|LSM_POINT_DELETE) ){
1505 if( (res<0 && (pRes->flags & LSM_START_DELETE))
1506 || (res>0 && (pRes->flags & LSM_END_DELETE))
1508 flags = flags | (LSM_END_DELETE|LSM_START_DELETE);
1509 }else if( res==0 ){
1510 flags = flags | (pRes->flags & (LSM_END_DELETE|LSM_START_DELETE));
1513 }else{
1514 memset(&csr, 0, sizeof(TreeCursor));
1517 /* Allocate and populate a new key-value pair structure */
1518 pTreeKey = newTreeKey(pDb, &iTreeKey, pKey, nKey, pVal, nVal, &rc);
1519 if( rc!=LSM_OK ) return rc;
1520 assert( pTreeKey->flags==0 || pTreeKey->flags==LSM_CONTIGUOUS );
1521 pTreeKey->flags |= flags;
1523 if( p->iRoot==0 ){
1524 /* The tree is completely empty. Add a new root node and install
1525 ** (pKey/nKey) as the middle entry. Even though it is a leaf at the
1526 ** moment, use newTreeNode() to allocate the node (i.e. allocate enough
1527 ** space for the fields used by interior nodes). This is because the
1528 ** treeInsert() routine may convert this node to an interior node. */
1529 TreeNode *pRoot = newTreeNode(pDb, &p->iRoot, &rc);
1530 if( rc==LSM_OK ){
1531 assert( p->nHeight==0 );
1532 pRoot->aiKeyPtr[1] = iTreeKey;
1533 p->nHeight = 1;
1535 }else{
1536 if( res==0 ){
1537 /* The search found a match within the tree. */
1538 treeOverwriteKey(pDb, &csr, iTreeKey, &rc);
1539 }else{
1540 /* The cursor now points to the leaf node into which the new entry should
1541 ** be inserted. There may or may not be a free slot within the leaf for
1542 ** the new key-value pair.
1544 ** iSlot is set to the index of the key within pLeaf that the new key
1545 ** should be inserted to the left of (or to a value 1 greater than the
1546 ** index of the rightmost key if the new key is larger than all keys
1547 ** currently stored in the node).
1549 int iSlot = csr.aiCell[csr.iNode] + (res<0);
1550 if( csr.iNode==0 ){
1551 rc = treeInsert(pDb, &csr, 0, iTreeKey, 0, iSlot);
1552 }else{
1553 rc = treeInsertLeaf(pDb, &csr, iTreeKey, iSlot);
1558 #if 0
1559 dump_tree_contents(pDb, "after");
1560 #endif
1561 insert_entry_out:
1562 tblobFree(pDb, &csr.blob);
1563 assert_tree_looks_ok(rc, pTree);
1564 return rc;
1568 ** Insert a new entry into the in-memory tree.
1570 ** If the value of the 5th parameter, nVal, is negative, then a delete-marker
1571 ** is inserted into the tree. In this case the value pointer, pVal, must be
1572 ** NULL.
1574 int lsmTreeInsert(
1575 lsm_db *pDb, /* Database handle */
1576 void *pKey, /* Pointer to key data */
1577 int nKey, /* Size of key data in bytes */
1578 void *pVal, /* Pointer to value data (or NULL) */
1579 int nVal /* Bytes in value data (or -ve for delete) */
1581 int flags;
1582 if( nVal<0 ){
1583 flags = LSM_POINT_DELETE;
1584 }else{
1585 flags = LSM_INSERT;
1588 return treeInsertEntry(pDb, flags, pKey, nKey, pVal, nVal);
1591 static int treeDeleteEntry(lsm_db *db, TreeCursor *pCsr, u32 iNewptr){
1592 TreeRoot *p = &db->treehdr.root;
1593 TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
1594 int iSlot = pCsr->aiCell[pCsr->iNode];
1595 int bLeaf;
1596 int rc = LSM_OK;
1598 assert( pNode->aiKeyPtr[1] );
1599 assert( pNode->aiKeyPtr[iSlot] );
1600 assert( iSlot==0 || iSlot==1 || iSlot==2 );
1601 assert( ((u32)pCsr->iNode==(db->treehdr.root.nHeight-1))==(iNewptr==0) );
1603 bLeaf = ((u32)pCsr->iNode==(p->nHeight-1) && p->nHeight>1);
1605 if( pNode->aiKeyPtr[0] || pNode->aiKeyPtr[2] ){
1606 /* There are currently at least 2 keys on this node. So just create
1607 ** a new copy of the node with one of the keys removed. If the node
1608 ** happens to be the root node of the tree, allocate an entire
1609 ** TreeNode structure instead of just a TreeLeaf. */
1610 TreeNode *pNew;
1611 u32 iNew;
1613 if( bLeaf ){
1614 pNew = (TreeNode *)newTreeLeaf(db, &iNew, &rc);
1615 }else{
1616 pNew = newTreeNode(db, &iNew, &rc);
1618 if( pNew ){
1619 int i;
1620 int iOut = 1;
1621 for(i=0; i<4; i++){
1622 if( i==iSlot ){
1623 i++;
1624 if( bLeaf==0 ) pNew->aiChildPtr[iOut] = iNewptr;
1625 if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
1626 iOut++;
1627 }else if( bLeaf || p->nHeight==1 ){
1628 if( i<3 && pNode->aiKeyPtr[i] ){
1629 pNew->aiKeyPtr[iOut++] = pNode->aiKeyPtr[i];
1631 }else{
1632 if( getChildPtr(pNode, WORKING_VERSION, i) ){
1633 pNew->aiChildPtr[iOut] = getChildPtr(pNode, WORKING_VERSION, i);
1634 if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
1635 iOut++;
1639 assert( iOut<=4 );
1640 assert( bLeaf || pNew->aiChildPtr[0]==0 );
1641 pCsr->iNode--;
1642 rc = treeUpdatePtr(db, pCsr, iNew);
1645 }else if( pCsr->iNode==0 ){
1646 /* Removing the only key in the root node. iNewptr is the new root. */
1647 assert( iSlot==1 );
1648 db->treehdr.root.iRoot = iNewptr;
1649 db->treehdr.root.nHeight--;
1651 }else{
1652 /* There is only one key on this node and the node is not the root
1653 ** node. Find a peer for this node. Then redistribute the contents of
1654 ** the peer and the parent cell between the parent and either one or
1655 ** two new nodes. */
1656 TreeNode *pParent; /* Parent tree node */
1657 int iPSlot;
1658 u32 iPeer; /* Pointer to peer leaf node */
1659 int iDir;
1660 TreeNode *pPeer; /* The peer leaf node */
1661 TreeNode *pNew1; u32 iNew1; /* First new leaf node */
1663 assert( iSlot==1 );
1665 pParent = pCsr->apTreeNode[pCsr->iNode-1];
1666 iPSlot = pCsr->aiCell[pCsr->iNode-1];
1668 if( iPSlot>0 && getChildPtr(pParent, WORKING_VERSION, iPSlot-1) ){
1669 iDir = -1;
1670 }else{
1671 iDir = +1;
1673 iPeer = getChildPtr(pParent, WORKING_VERSION, iPSlot+iDir);
1674 pPeer = (TreeNode *)treeShmptr(db, iPeer);
1675 assertIsWorkingChild(db, pNode, pParent, iPSlot);
1677 /* Allocate the first new leaf node. This is always required. */
1678 if( bLeaf ){
1679 pNew1 = (TreeNode *)newTreeLeaf(db, &iNew1, &rc);
1680 }else{
1681 pNew1 = (TreeNode *)newTreeNode(db, &iNew1, &rc);
1684 if( pPeer->aiKeyPtr[0] && pPeer->aiKeyPtr[2] ){
1685 /* Peer node is completely full. This means that two new leaf nodes
1686 ** and a new parent node are required. */
1688 TreeNode *pNew2; u32 iNew2; /* Second new leaf node */
1689 TreeNode *pNewP; u32 iNewP; /* New parent node */
1691 if( bLeaf ){
1692 pNew2 = (TreeNode *)newTreeLeaf(db, &iNew2, &rc);
1693 }else{
1694 pNew2 = (TreeNode *)newTreeNode(db, &iNew2, &rc);
1696 pNewP = copyTreeNode(db, pParent, &iNewP, &rc);
1698 if( iDir==-1 ){
1699 pNew1->aiKeyPtr[1] = pPeer->aiKeyPtr[0];
1700 if( bLeaf==0 ){
1701 pNew1->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 0);
1702 pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 1);
1705 pNewP->aiChildPtr[iPSlot-1] = iNew1;
1706 pNewP->aiKeyPtr[iPSlot-1] = pPeer->aiKeyPtr[1];
1707 pNewP->aiChildPtr[iPSlot] = iNew2;
1709 pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[2];
1710 pNew2->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot-1];
1711 if( bLeaf==0 ){
1712 pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 2);
1713 pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 3);
1714 pNew2->aiChildPtr[2] = iNewptr;
1716 }else{
1717 pNew1->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot];
1718 if( bLeaf==0 ){
1719 pNew1->aiChildPtr[1] = iNewptr;
1720 pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 0);
1723 pNewP->aiChildPtr[iPSlot] = iNew1;
1724 pNewP->aiKeyPtr[iPSlot] = pPeer->aiKeyPtr[0];
1725 pNewP->aiChildPtr[iPSlot+1] = iNew2;
1727 pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[1];
1728 pNew2->aiKeyPtr[1] = pPeer->aiKeyPtr[2];
1729 if( bLeaf==0 ){
1730 pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 1);
1731 pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 2);
1732 pNew2->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 3);
1735 assert( pCsr->iNode>=1 );
1736 pCsr->iNode -= 2;
1737 if( rc==LSM_OK ){
1738 assert( pNew1->aiKeyPtr[1] && pNew2->aiKeyPtr[1] );
1739 rc = treeUpdatePtr(db, pCsr, iNewP);
1741 }else{
1742 int iKOut = 0;
1743 int iPOut = 0;
1744 int i;
1746 pCsr->iNode--;
1748 if( iDir==1 ){
1749 pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
1750 if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
1752 for(i=0; i<3; i++){
1753 if( pPeer->aiKeyPtr[i] ){
1754 pNew1->aiKeyPtr[iKOut++] = pPeer->aiKeyPtr[i];
1757 if( bLeaf==0 ){
1758 for(i=0; i<4; i++){
1759 if( getChildPtr(pPeer, WORKING_VERSION, i) ){
1760 pNew1->aiChildPtr[iPOut++] = getChildPtr(pPeer, WORKING_VERSION, i);
1764 if( iDir==-1 ){
1765 iPSlot--;
1766 pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
1767 if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
1768 pCsr->aiCell[pCsr->iNode] = (u8)iPSlot;
1771 rc = treeDeleteEntry(db, pCsr, iNew1);
1775 return rc;
1779 ** Delete a range of keys from the tree structure (i.e. the lsm_delete_range()
1780 ** function, not lsm_delete()).
1782 ** This is a two step process:
1784 ** 1) Remove all entries currently stored in the tree that have keys
1785 ** that fall into the deleted range.
1787 ** TODO: There are surely good ways to optimize this step - removing
1788 ** a range of keys from a b-tree. But for now, this function removes
1789 ** them one at a time using the usual approach.
1791 ** 2) Unless the largest key smaller than or equal to (pKey1/nKey1) is
1792 ** already marked as START_DELETE, insert a START_DELETE key.
1793 ** Similarly, unless the smallest key greater than or equal to
1794 ** (pKey2/nKey2) is already START_END, insert a START_END key.
1796 int lsmTreeDelete(
1797 lsm_db *db,
1798 void *pKey1, int nKey1, /* Start of range */
1799 void *pKey2, int nKey2 /* End of range */
1801 int rc = LSM_OK;
1802 int bDone = 0;
1803 TreeRoot *p = &db->treehdr.root;
1804 TreeBlob blob = {0, 0};
1806 /* The range must be sensible - that (key1 < key2). */
1807 assert( treeKeycmp(pKey1, nKey1, pKey2, nKey2)<0 );
1808 assert( assert_delete_ranges_match(db) );
1810 #if 0
1811 static int nCall = 0;
1812 printf("\n");
1813 nCall++;
1814 printf("%d delete %s .. %s\n", nCall, (char *)pKey1, (char *)pKey2);
1815 dump_tree_contents(db, "before delete");
1816 #endif
1818 /* Step 1. This loop runs until the tree contains no keys within the
1819 ** range being deleted. Or until an error occurs. */
1820 while( bDone==0 && rc==LSM_OK ){
1821 int res;
1822 TreeCursor csr; /* Cursor to seek to first key in range */
1823 void *pDel; int nDel; /* Key to (possibly) delete this iteration */
1824 #ifndef NDEBUG
1825 int nEntry = treeCountEntries(db);
1826 #endif
1828 /* Seek the cursor to the first entry in the tree greater than pKey1. */
1829 treeCursorInit(db, 0, &csr);
1830 lsmTreeCursorSeek(&csr, pKey1, nKey1, &res);
1831 if( res<=0 && lsmTreeCursorValid(&csr) ) lsmTreeCursorNext(&csr);
1833 /* If there is no such entry, or if it is greater than pKey2, then the
1834 ** tree now contains no keys in the range being deleted. In this case
1835 ** break out of the loop. */
1836 bDone = 1;
1837 if( lsmTreeCursorValid(&csr) ){
1838 lsmTreeCursorKey(&csr, 0, &pDel, &nDel);
1839 if( treeKeycmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0;
1842 if( bDone==0 ){
1843 if( (u32)csr.iNode==(p->nHeight-1) ){
1844 /* The element to delete already lies on a leaf node */
1845 rc = treeDeleteEntry(db, &csr, 0);
1846 }else{
1847 /* 1. Overwrite the current key with a copy of the next key in the
1848 ** tree (key N).
1850 ** 2. Seek to key N (cursor will stop at the internal node copy of
1851 ** N). Move to the next key (original copy of N). Delete
1852 ** this entry.
1854 u32 iKey;
1855 TreeKey *pKey;
1856 int iNode = csr.iNode;
1857 lsmTreeCursorNext(&csr);
1858 assert( (u32)csr.iNode==(p->nHeight-1) );
1860 iKey = csr.apTreeNode[csr.iNode]->aiKeyPtr[csr.aiCell[csr.iNode]];
1861 lsmTreeCursorPrev(&csr);
1863 treeOverwriteKey(db, &csr, iKey, &rc);
1864 pKey = treeShmkey(db, iKey, TKV_LOADKEY, &blob, &rc);
1865 if( pKey ){
1866 rc = lsmTreeCursorSeek(&csr, TKV_KEY(pKey), pKey->nKey, &res);
1868 if( rc==LSM_OK ){
1869 assert( res==0 && csr.iNode==iNode );
1870 rc = lsmTreeCursorNext(&csr);
1871 if( rc==LSM_OK ){
1872 rc = treeDeleteEntry(db, &csr, 0);
1878 /* Clean up any memory allocated by the cursor. */
1879 tblobFree(db, &csr.blob);
1880 #if 0
1881 dump_tree_contents(db, "ddd delete");
1882 #endif
1883 assert( bDone || treeCountEntries(db)==(nEntry-1) );
1886 #if 0
1887 dump_tree_contents(db, "during delete");
1888 #endif
1890 /* Now insert the START_DELETE and END_DELETE keys. */
1891 if( rc==LSM_OK ){
1892 rc = treeInsertEntry(db, LSM_START_DELETE, pKey1, nKey1, 0, -1);
1894 #if 0
1895 dump_tree_contents(db, "during delete 2");
1896 #endif
1897 if( rc==LSM_OK ){
1898 rc = treeInsertEntry(db, LSM_END_DELETE, pKey2, nKey2, 0, -1);
1901 #if 0
1902 dump_tree_contents(db, "after delete");
1903 #endif
1905 tblobFree(db, &blob);
1906 assert( assert_delete_ranges_match(db) );
1907 return rc;
1911 ** Return, in bytes, the amount of memory currently used by the tree
1912 ** structure.
1914 int lsmTreeSize(lsm_db *pDb){
1915 return pDb->treehdr.root.nByte;
1919 ** Open a cursor on the in-memory tree pTree.
1921 int lsmTreeCursorNew(lsm_db *pDb, int bOld, TreeCursor **ppCsr){
1922 TreeCursor *pCsr;
1923 *ppCsr = pCsr = lsmMalloc(pDb->pEnv, sizeof(TreeCursor));
1924 if( pCsr ){
1925 treeCursorInit(pDb, bOld, pCsr);
1926 return LSM_OK;
1928 return LSM_NOMEM_BKPT;
1932 ** Close an in-memory tree cursor.
1934 void lsmTreeCursorDestroy(TreeCursor *pCsr){
1935 if( pCsr ){
1936 tblobFree(pCsr->pDb, &pCsr->blob);
1937 lsmFree(pCsr->pDb->pEnv, pCsr);
1941 void lsmTreeCursorReset(TreeCursor *pCsr){
1942 if( pCsr ){
1943 pCsr->iNode = -1;
1944 pCsr->pSave = 0;
1948 #ifndef NDEBUG
1949 static int treeCsrCompare(TreeCursor *pCsr, void *pKey, int nKey, int *pRc){
1950 TreeKey *p;
1951 int cmp = 0;
1952 assert( pCsr->iNode>=0 );
1953 p = csrGetKey(pCsr, &pCsr->blob, pRc);
1954 if( p ){
1955 cmp = treeKeycmp(TKV_KEY(p), p->nKey, pKey, nKey);
1957 return cmp;
1959 #endif
1963 ** Attempt to seek the cursor passed as the first argument to key (pKey/nKey)
1964 ** in the tree structure. If an exact match for the key is found, leave the
1965 ** cursor pointing to it and set *pRes to zero before returning. If an
1966 ** exact match cannot be found, do one of the following:
1968 ** * Leave the cursor pointing to the smallest element in the tree that
1969 ** is larger than the key and set *pRes to +1, or
1971 ** * Leave the cursor pointing to the largest element in the tree that
1972 ** is smaller than the key and set *pRes to -1, or
1974 ** * If the tree is empty, leave the cursor at EOF and set *pRes to -1.
1976 int lsmTreeCursorSeek(TreeCursor *pCsr, void *pKey, int nKey, int *pRes){
1977 int rc = LSM_OK; /* Return code */
1978 lsm_db *pDb = pCsr->pDb;
1979 TreeRoot *pRoot = pCsr->pRoot;
1980 u32 iNodePtr; /* Location of current node in search */
1982 /* Discard any saved position data */
1983 treeCursorRestore(pCsr, 0);
1985 iNodePtr = pRoot->iRoot;
1986 if( iNodePtr==0 ){
1987 /* Either an error occurred or the tree is completely empty. */
1988 assert( rc!=LSM_OK || pRoot->iRoot==0 );
1989 *pRes = -1;
1990 pCsr->iNode = -1;
1991 }else{
1992 TreeBlob b = {0, 0};
1993 int res = 0; /* Result of comparison function */
1994 int iNode = -1;
1995 while( iNodePtr ){
1996 TreeNode *pNode; /* Node at location iNodePtr */
1997 int iTest; /* Index of second key to test (0 or 2) */
1998 u32 iTreeKey;
1999 TreeKey *pTreeKey; /* Key to compare against */
2001 pNode = (TreeNode *)treeShmptrUnsafe(pDb, iNodePtr);
2002 iNode++;
2003 pCsr->apTreeNode[iNode] = pNode;
2005 /* Compare (pKey/nKey) with the key in the middle slot of B-tree node
2006 ** pNode. The middle slot is never empty. If the comparison is a match,
2007 ** then the search is finished. Break out of the loop. */
2008 pTreeKey = (TreeKey*)treeShmptrUnsafe(pDb, pNode->aiKeyPtr[1]);
2009 if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){
2010 pTreeKey = treeShmkey(pDb, pNode->aiKeyPtr[1], TKV_LOADKEY, &b, &rc);
2011 if( rc!=LSM_OK ) break;
2013 res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey);
2014 if( res==0 ){
2015 pCsr->aiCell[iNode] = 1;
2016 break;
2019 /* Based on the results of the previous comparison, compare (pKey/nKey)
2020 ** to either the left or right key of the B-tree node, if such a key
2021 ** exists. */
2022 iTest = (res>0 ? 0 : 2);
2023 iTreeKey = pNode->aiKeyPtr[iTest];
2024 if( iTreeKey ){
2025 pTreeKey = (TreeKey*)treeShmptrUnsafe(pDb, iTreeKey);
2026 if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){
2027 pTreeKey = treeShmkey(pDb, iTreeKey, TKV_LOADKEY, &b, &rc);
2028 if( rc ) break;
2030 res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey);
2031 if( res==0 ){
2032 pCsr->aiCell[iNode] = (u8)iTest;
2033 break;
2035 }else{
2036 iTest = 1;
2039 if( (u32)iNode<(pRoot->nHeight-1) ){
2040 iNodePtr = getChildPtr(pNode, pRoot->iTransId, iTest + (res<0));
2041 }else{
2042 iNodePtr = 0;
2044 pCsr->aiCell[iNode] = (u8)(iTest + (iNodePtr && (res<0)));
2047 *pRes = res;
2048 pCsr->iNode = iNode;
2049 tblobFree(pDb, &b);
2052 /* assert() that *pRes has been set properly */
2053 #ifndef NDEBUG
2054 if( rc==LSM_OK && lsmTreeCursorValid(pCsr) ){
2055 int cmp = treeCsrCompare(pCsr, pKey, nKey, &rc);
2056 assert( rc!=LSM_OK || *pRes==cmp || (*pRes ^ cmp)>0 );
2058 #endif
2060 return rc;
2063 int lsmTreeCursorNext(TreeCursor *pCsr){
2064 #ifndef NDEBUG
2065 TreeKey *pK1;
2066 TreeBlob key1 = {0, 0};
2067 #endif
2068 lsm_db *pDb = pCsr->pDb;
2069 TreeRoot *pRoot = pCsr->pRoot;
2070 const int iLeaf = pRoot->nHeight-1;
2071 int iCell;
2072 int rc = LSM_OK;
2073 TreeNode *pNode;
2075 /* Restore the cursor position, if required */
2076 int iRestore = 0;
2077 treeCursorRestore(pCsr, &iRestore);
2078 if( iRestore>0 ) return LSM_OK;
2080 /* Save a pointer to the current key. This is used in an assert() at the
2081 ** end of this function - to check that the 'next' key really is larger
2082 ** than the current key. */
2083 #ifndef NDEBUG
2084 pK1 = csrGetKey(pCsr, &key1, &rc);
2085 if( rc!=LSM_OK ) return rc;
2086 #endif
2088 assert( lsmTreeCursorValid(pCsr) );
2089 assert( pCsr->aiCell[pCsr->iNode]<3 );
2091 pNode = pCsr->apTreeNode[pCsr->iNode];
2092 iCell = ++pCsr->aiCell[pCsr->iNode];
2094 /* If the current node is not a leaf, and the current cell has sub-tree
2095 ** associated with it, descend to the left-most key on the left-most
2096 ** leaf of the sub-tree. */
2097 if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
2098 do {
2099 u32 iNodePtr;
2100 pCsr->iNode++;
2101 iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
2102 pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
2103 pCsr->apTreeNode[pCsr->iNode] = pNode;
2104 iCell = pCsr->aiCell[pCsr->iNode] = (pNode->aiKeyPtr[0]==0);
2105 }while( pCsr->iNode < iLeaf );
2108 /* Otherwise, the next key is found by following pointer up the tree
2109 ** until there is a key immediately to the right of the pointer followed
2110 ** to reach the sub-tree containing the current key. */
2111 else if( iCell>=3 || pNode->aiKeyPtr[iCell]==0 ){
2112 while( (--pCsr->iNode)>=0 ){
2113 iCell = pCsr->aiCell[pCsr->iNode];
2114 if( iCell<3 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
2118 #ifndef NDEBUG
2119 if( pCsr->iNode>=0 ){
2120 TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
2121 assert( rc||treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)>=0 );
2123 tblobFree(pDb, &key1);
2124 #endif
2126 return rc;
2129 int lsmTreeCursorPrev(TreeCursor *pCsr){
2130 #ifndef NDEBUG
2131 TreeKey *pK1;
2132 TreeBlob key1 = {0, 0};
2133 #endif
2134 lsm_db *pDb = pCsr->pDb;
2135 TreeRoot *pRoot = pCsr->pRoot;
2136 const int iLeaf = pRoot->nHeight-1;
2137 int iCell;
2138 int rc = LSM_OK;
2139 TreeNode *pNode;
2141 /* Restore the cursor position, if required */
2142 int iRestore = 0;
2143 treeCursorRestore(pCsr, &iRestore);
2144 if( iRestore<0 ) return LSM_OK;
2146 /* Save a pointer to the current key. This is used in an assert() at the
2147 ** end of this function - to check that the 'next' key really is smaller
2148 ** than the current key. */
2149 #ifndef NDEBUG
2150 pK1 = csrGetKey(pCsr, &key1, &rc);
2151 if( rc!=LSM_OK ) return rc;
2152 #endif
2154 assert( lsmTreeCursorValid(pCsr) );
2155 pNode = pCsr->apTreeNode[pCsr->iNode];
2156 iCell = pCsr->aiCell[pCsr->iNode];
2157 assert( iCell>=0 && iCell<3 );
2159 /* If the current node is not a leaf, and the current cell has sub-tree
2160 ** associated with it, descend to the right-most key on the right-most
2161 ** leaf of the sub-tree. */
2162 if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
2163 do {
2164 u32 iNodePtr;
2165 pCsr->iNode++;
2166 iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
2167 pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
2168 if( rc!=LSM_OK ) break;
2169 pCsr->apTreeNode[pCsr->iNode] = pNode;
2170 iCell = 1 + (pNode->aiKeyPtr[2]!=0) + (pCsr->iNode < iLeaf);
2171 pCsr->aiCell[pCsr->iNode] = (u8)iCell;
2172 }while( pCsr->iNode < iLeaf );
2175 /* Otherwise, the next key is found by following pointer up the tree until
2176 ** there is a key immediately to the left of the pointer followed to reach
2177 ** the sub-tree containing the current key. */
2178 else{
2179 do {
2180 iCell = pCsr->aiCell[pCsr->iNode]-1;
2181 if( iCell>=0 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
2182 }while( (--pCsr->iNode)>=0 );
2183 pCsr->aiCell[pCsr->iNode] = (u8)iCell;
2186 #ifndef NDEBUG
2187 if( pCsr->iNode>=0 ){
2188 TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
2189 assert( rc || treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)<0 );
2191 tblobFree(pDb, &key1);
2192 #endif
2194 return rc;
2198 ** Move the cursor to the first (bLast==0) or last (bLast!=0) entry in the
2199 ** in-memory tree.
2201 int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast){
2202 lsm_db *pDb = pCsr->pDb;
2203 TreeRoot *pRoot = pCsr->pRoot;
2204 int rc = LSM_OK;
2206 u32 iNodePtr;
2207 pCsr->iNode = -1;
2209 /* Discard any saved position data */
2210 treeCursorRestore(pCsr, 0);
2212 iNodePtr = pRoot->iRoot;
2213 while( iNodePtr ){
2214 int iCell;
2215 TreeNode *pNode;
2217 pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
2218 if( rc ) break;
2220 if( bLast ){
2221 iCell = ((pNode->aiKeyPtr[2]==0) ? 2 : 3);
2222 }else{
2223 iCell = ((pNode->aiKeyPtr[0]==0) ? 1 : 0);
2225 pCsr->iNode++;
2226 pCsr->apTreeNode[pCsr->iNode] = pNode;
2228 if( (u32)pCsr->iNode<pRoot->nHeight-1 ){
2229 iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
2230 }else{
2231 iNodePtr = 0;
2233 pCsr->aiCell[pCsr->iNode] = (u8)(iCell - (iNodePtr==0 && bLast));
2236 return rc;
2239 int lsmTreeCursorFlags(TreeCursor *pCsr){
2240 int flags = 0;
2241 if( pCsr && pCsr->iNode>=0 ){
2242 int rc = LSM_OK;
2243 TreeKey *pKey = (TreeKey *)treeShmptrUnsafe(pCsr->pDb,
2244 pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]]
2246 assert( rc==LSM_OK );
2247 flags = (pKey->flags & ~LSM_CONTIGUOUS);
2249 return flags;
2252 int lsmTreeCursorKey(TreeCursor *pCsr, int *pFlags, void **ppKey, int *pnKey){
2253 TreeKey *pTreeKey;
2254 int rc = LSM_OK;
2256 assert( lsmTreeCursorValid(pCsr) );
2258 pTreeKey = pCsr->pSave;
2259 if( !pTreeKey ){
2260 pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
2262 if( rc==LSM_OK ){
2263 *pnKey = pTreeKey->nKey;
2264 if( pFlags ) *pFlags = pTreeKey->flags;
2265 *ppKey = (void *)&pTreeKey[1];
2268 return rc;
2271 int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal){
2272 int res = 0;
2273 int rc;
2275 rc = treeCursorRestore(pCsr, &res);
2276 if( res==0 ){
2277 TreeKey *pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
2278 if( rc==LSM_OK ){
2279 if( pTreeKey->flags & LSM_INSERT ){
2280 *pnVal = pTreeKey->nValue;
2281 *ppVal = TKV_VAL(pTreeKey);
2282 }else{
2283 *ppVal = 0;
2284 *pnVal = -1;
2287 }else{
2288 *ppVal = 0;
2289 *pnVal = 0;
2292 return rc;
2296 ** Return true if the cursor currently points to a valid entry.
2298 int lsmTreeCursorValid(TreeCursor *pCsr){
2299 return (pCsr && (pCsr->pSave || pCsr->iNode>=0));
2303 ** Store a mark in *pMark. Later on, a call to lsmTreeRollback() with a
2304 ** pointer to the same TreeMark structure may be used to roll the tree
2305 ** contents back to their current state.
2307 void lsmTreeMark(lsm_db *pDb, TreeMark *pMark){
2308 pMark->iRoot = pDb->treehdr.root.iRoot;
2309 pMark->nHeight = pDb->treehdr.root.nHeight;
2310 pMark->iWrite = pDb->treehdr.iWrite;
2311 pMark->nChunk = pDb->treehdr.nChunk;
2312 pMark->iNextShmid = pDb->treehdr.iNextShmid;
2313 pMark->iRollback = intArraySize(&pDb->rollback);
2317 ** Roll back to mark pMark. Structure *pMark should have been previously
2318 ** populated by a call to lsmTreeMark().
2320 void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark){
2321 int iIdx;
2322 int nIdx;
2323 u32 iNext;
2324 ShmChunk *pChunk;
2325 u32 iChunk;
2326 u32 iShmid;
2328 /* Revert all required v2 pointers. */
2329 nIdx = intArraySize(&pDb->rollback);
2330 for(iIdx = pMark->iRollback; iIdx<nIdx; iIdx++){
2331 TreeNode *pNode;
2332 pNode = treeShmptr(pDb, intArrayEntry(&pDb->rollback, iIdx));
2333 assert( pNode );
2334 pNode->iV2 = 0;
2335 pNode->iV2Child = 0;
2336 pNode->iV2Ptr = 0;
2338 intArrayTruncate(&pDb->rollback, pMark->iRollback);
2340 /* Restore the free-chunk list. */
2341 assert( pMark->iWrite!=0 );
2342 iChunk = treeOffsetToChunk(pMark->iWrite-1);
2343 pChunk = treeShmChunk(pDb, iChunk);
2344 iNext = pChunk->iNext;
2345 pChunk->iNext = 0;
2347 pChunk = treeShmChunk(pDb, pDb->treehdr.iFirst);
2348 iShmid = pChunk->iShmid-1;
2350 while( iNext ){
2351 u32 iFree = iNext; /* Current chunk being rollback-freed */
2352 ShmChunk *pFree; /* Pointer to chunk iFree */
2354 pFree = treeShmChunk(pDb, iFree);
2355 iNext = pFree->iNext;
2357 if( iFree<pMark->nChunk ){
2358 pFree->iNext = pDb->treehdr.iFirst;
2359 pFree->iShmid = iShmid--;
2360 pDb->treehdr.iFirst = iFree;
2364 /* Restore the tree-header fields */
2365 pDb->treehdr.root.iRoot = pMark->iRoot;
2366 pDb->treehdr.root.nHeight = pMark->nHeight;
2367 pDb->treehdr.iWrite = pMark->iWrite;
2368 pDb->treehdr.nChunk = pMark->nChunk;
2369 pDb->treehdr.iNextShmid = pMark->iNextShmid;
2373 ** Load the in-memory tree header from shared-memory into pDb->treehdr.
2374 ** If the header cannot be loaded, return LSM_PROTOCOL.
2376 ** If the header is successfully loaded and parameter piRead is not NULL,
2377 ** is is set to 1 if the header was loaded from ShmHeader.hdr1, or 2 if
2378 ** the header was loaded from ShmHeader.hdr2.
2380 int lsmTreeLoadHeader(lsm_db *pDb, int *piRead){
2381 int nRem = LSM_ATTEMPTS_BEFORE_PROTOCOL;
2382 while( (nRem--)>0 ){
2383 ShmHeader *pShm = pDb->pShmhdr;
2385 memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader));
2386 if( treeHeaderChecksumOk(&pDb->treehdr) ){
2387 if( piRead ) *piRead = 1;
2388 return LSM_OK;
2390 memcpy(&pDb->treehdr, &pShm->hdr2, sizeof(TreeHeader));
2391 if( treeHeaderChecksumOk(&pDb->treehdr) ){
2392 if( piRead ) *piRead = 2;
2393 return LSM_OK;
2396 lsmShmBarrier(pDb);
2398 return LSM_PROTOCOL_BKPT;
2401 int lsmTreeLoadHeaderOk(lsm_db *pDb, int iRead){
2402 TreeHeader *p = (iRead==1) ? &pDb->pShmhdr->hdr1 : &pDb->pShmhdr->hdr2;
2403 assert( iRead==1 || iRead==2 );
2404 return (0==memcmp(pDb->treehdr.aCksum, p->aCksum, sizeof(u32)*2));
2408 ** This function is called to conclude a transaction. If argument bCommit
2409 ** is true, the transaction is committed. Otherwise it is rolled back.
2411 int lsmTreeEndTransaction(lsm_db *pDb, int bCommit){
2412 ShmHeader *pShm = pDb->pShmhdr;
2414 treeHeaderChecksum(&pDb->treehdr, pDb->treehdr.aCksum);
2415 memcpy(&pShm->hdr2, &pDb->treehdr, sizeof(TreeHeader));
2416 lsmShmBarrier(pDb);
2417 memcpy(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader));
2418 pShm->bWriter = 0;
2419 intArrayFree(pDb->pEnv, &pDb->rollback);
2421 return LSM_OK;
2424 #ifndef NDEBUG
2425 static int assert_delete_ranges_match(lsm_db *db){
2426 int prev = 0;
2427 TreeBlob blob = {0, 0};
2428 TreeCursor csr; /* Cursor used to iterate through tree */
2429 int rc;
2431 treeCursorInit(db, 0, &csr);
2432 for( rc = lsmTreeCursorEnd(&csr, 0);
2433 rc==LSM_OK && lsmTreeCursorValid(&csr);
2434 rc = lsmTreeCursorNext(&csr)
2436 TreeKey *pKey = csrGetKey(&csr, &blob, &rc);
2437 if( rc!=LSM_OK ) break;
2438 assert( ((prev&LSM_START_DELETE)==0)==((pKey->flags&LSM_END_DELETE)==0) );
2439 prev = pKey->flags;
2442 tblobFree(csr.pDb, &csr.blob);
2443 tblobFree(csr.pDb, &blob);
2445 return 1;
2448 static int treeCountEntries(lsm_db *db){
2449 TreeCursor csr; /* Cursor used to iterate through tree */
2450 int rc;
2451 int nEntry = 0;
2453 treeCursorInit(db, 0, &csr);
2454 for( rc = lsmTreeCursorEnd(&csr, 0);
2455 rc==LSM_OK && lsmTreeCursorValid(&csr);
2456 rc = lsmTreeCursorNext(&csr)
2458 nEntry++;
2461 tblobFree(csr.pDb, &csr.blob);
2463 return nEntry;
2465 #endif