/*
** The author disclaims copyright to this source code.  In place of
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**
** The main interface to the LSM module.
*/
#include "lsmInt.h"
#ifdef LSM_DEBUG
/*
** This function returns a copy of its only argument.
**
** When the library is built with LSM_DEBUG defined, this function is called
** whenever an error code is generated (not propagated - generated). So
** if the library is mysteriously returning (say) LSM_IOERR, a breakpoint
** may be set in this function to determine why.
*/
int lsmErrorBkpt(int rc){
  /* Set breakpoint here! */
  return rc;
}

/*
** This function contains various assert() statements that test that the
** lsm_db structure passed as an argument is internally consistent.
*/
static void assert_db_state(lsm_db *pDb){

  /* If there is at least one cursor or a write transaction open, the database
  ** handle must be holding a pointer to a client snapshot. And the reverse
  ** - if there are no open cursors and no write transactions then there must
  ** not be a client snapshot.  */
  assert( (pDb->pCsr!=0||pDb->nTransOpen>0)==(pDb->iReader>=0||pDb->bRoTrans) );

  /* A reader slot or read-only transaction implies a client snapshot. */
  assert( (pDb->iReader<0 && pDb->bRoTrans==0) || pDb->pClient!=0 );

  /* The transaction nesting depth can never be negative. */
  assert( pDb->nTransOpen>=0 );
}
#else
/* Non-debug builds: the consistency check compiles away to nothing. */
# define assert_db_state(x)
#endif
/*
** The default key-compare function.
**
** The first min(n1,n2) bytes of the two keys are compared using memcmp().
** If that common prefix is identical, the result is (n1-n2), so that the
** shorter of the two keys is considered the smaller.
**
** Returns a negative value, zero, or a positive value if key (p1/n1) is
** respectively less than, equal to, or greater than key (p2/n2).
*/
static int xCmp(void *p1, int n1, void *p2, int n2){
  int nCmp = (n1<n2 ? n1 : n2);   /* Number of bytes both keys have */
  int res;                        /* Result of comparison */

  res = memcmp(p1, p2, nCmp);
  if( res==0 ) res = (n1-n2);
  return res;
}
/*
** The default log callback. Writes message z, followed by a newline, to
** stderr. The context pointer and the error code are ignored.
*/
static void xLog(void *pCtx, int rc, const char *z){
  (void)(rc);
  (void)(pCtx);
  fprintf(stderr, "%s\n", z);
  fflush(stderr);
}
71 ** Allocate a new db handle.
73 int lsm_new(lsm_env
*pEnv
, lsm_db
**ppDb
){
76 /* If the user did not provide an environment, use the default. */
77 if( pEnv
==0 ) pEnv
= lsm_default_env();
80 /* Allocate the new database handle */
81 *ppDb
= pDb
= (lsm_db
*)lsmMallocZero(pEnv
, sizeof(lsm_db
));
82 if( pDb
==0 ) return LSM_NOMEM_BKPT
;
84 /* Initialize the new object */
86 pDb
->nTreeLimit
= LSM_DFLT_AUTOFLUSH
;
87 pDb
->nAutockpt
= LSM_DFLT_AUTOCHECKPOINT
;
88 pDb
->bAutowork
= LSM_DFLT_AUTOWORK
;
89 pDb
->eSafety
= LSM_DFLT_SAFETY
;
91 pDb
->nDfltPgsz
= LSM_DFLT_PAGE_SIZE
;
92 pDb
->nDfltBlksz
= LSM_DFLT_BLOCK_SIZE
;
93 pDb
->nMerge
= LSM_DFLT_AUTOMERGE
;
94 pDb
->nMaxFreelist
= LSM_MAX_FREELIST_ENTRIES
;
95 pDb
->bUseLog
= LSM_DFLT_USE_LOG
;
98 pDb
->bMultiProc
= LSM_DFLT_MULTIPLE_PROCESSES
;
99 pDb
->iMmap
= LSM_DFLT_MMAP
;
101 pDb
->compress
.iId
= LSM_COMPRESSION_NONE
;
105 lsm_env
*lsm_get_env(lsm_db
*pDb
){
111 ** If database handle pDb is currently holding a client snapshot, but does
112 ** not have any open cursors or write transactions, release it.
114 static void dbReleaseClientSnapshot(lsm_db
*pDb
){
115 if( pDb
->nTransOpen
==0 && pDb
->pCsr
==0 ){
116 lsmFinishReadTrans(pDb
);
120 static int getFullpathname(
132 rc
= pEnv
->xFullpath(pEnv
, zRel
, zAlloc
, &nReq
);
134 zAlloc
= lsmReallocOrFreeRc(pEnv
, zAlloc
, nReq
, &rc
);
136 }while( nReq
>nAlloc
&& rc
==LSM_OK
);
139 lsmFree(pEnv
, zAlloc
);
147 ** Check that the bits in the db->mLock mask are consistent with the
148 ** value stored in db->iRwclient. An assert shall fail otherwise.
150 static void assertRwclientLockValue(lsm_db
*db
){
152 u64 msk
; /* Mask of mLock bits for RWCLIENT locks */
153 u64 rwclient
= 0; /* Bit corresponding to db->iRwclient */
155 if( db
->iRwclient
>=0 ){
156 rwclient
= ((u64
)1 << (LSM_LOCK_RWCLIENT(db
->iRwclient
)-1));
158 msk
= ((u64
)1 << (LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT
)-1)) - 1;
159 msk
-= (((u64
)1 << (LSM_LOCK_RWCLIENT(0)-1)) - 1);
161 assert( (db
->mLock
& msk
)==rwclient
);
166 ** Open a new connection to database zFilename.
168 int lsm_open(lsm_db
*pDb
, const char *zFilename
){
171 if( pDb
->pDatabase
){
176 /* Translate the possibly relative pathname supplied by the user into
177 ** an absolute pathname. This is required because the supplied path
178 ** is used (either directly or with "-log" appended to it) for more
179 ** than one purpose - to open both the database and log files, and
180 ** perhaps to unlink the log file during disconnection. An absolute
181 ** path is required to ensure that the correct files are operated
182 ** on even if the application changes the cwd. */
183 rc
= getFullpathname(pDb
->pEnv
, zFilename
, &zFull
);
184 assert( rc
==LSM_OK
|| zFull
==0 );
186 /* Connect to the database. */
188 rc
= lsmDbDatabaseConnect(pDb
, zFull
);
191 if( pDb
->bReadonly
==0 ){
192 /* Configure the file-system connection with the page-size and block-size
193 ** of this database. Even if the database file is zero bytes in size
194 ** on disk, these values have been set in shared-memory by now, and so
195 ** are guaranteed not to change during the lifetime of this connection.
197 if( rc
==LSM_OK
&& LSM_OK
==(rc
= lsmCheckpointLoad(pDb
, 0)) ){
198 lsmFsSetPageSize(pDb
->pFS
, lsmCheckpointPgsz(pDb
->aSnapshot
));
199 lsmFsSetBlockSize(pDb
->pFS
, lsmCheckpointBlksz(pDb
->aSnapshot
));
203 lsmFree(pDb
->pEnv
, zFull
);
204 assertRwclientLockValue(pDb
);
207 assert( pDb
->bReadonly
==0 || pDb
->bReadonly
==1 );
208 assert( rc
!=LSM_OK
|| (pDb
->pShmhdr
==0)==(pDb
->bReadonly
==1) );
213 int lsm_close(lsm_db
*pDb
){
216 assert_db_state(pDb
);
217 if( pDb
->pCsr
|| pDb
->nTransOpen
){
218 rc
= LSM_MISUSE_BKPT
;
220 lsmMCursorFreeCache(pDb
);
221 lsmFreeSnapshot(pDb
->pEnv
, pDb
->pClient
);
224 assertRwclientLockValue(pDb
);
226 lsmDbDatabaseRelease(pDb
);
228 lsmFsClose(pDb
->pFS
);
229 /* assert( pDb->mLock==0 ); */
231 /* Invoke any destructors registered for the compression or
232 ** compression factory callbacks. */
233 if( pDb
->factory
.xFree
) pDb
->factory
.xFree(pDb
->factory
.pCtx
);
234 if( pDb
->compress
.xFree
) pDb
->compress
.xFree(pDb
->compress
.pCtx
);
236 lsmFree(pDb
->pEnv
, pDb
->rollback
.aArray
);
237 lsmFree(pDb
->pEnv
, pDb
->aTrans
);
238 lsmFree(pDb
->pEnv
, pDb
->apShm
);
239 lsmFree(pDb
->pEnv
, pDb
);
245 int lsm_config(lsm_db
*pDb
, int eParam
, ...){
248 va_start(ap
, eParam
);
251 case LSM_CONFIG_AUTOFLUSH
: {
252 /* This parameter is read and written in KB. But all internal
253 ** processing is done in bytes. */
254 int *piVal
= va_arg(ap
, int *);
256 if( iVal
>=0 && iVal
<=(1024*1024) ){
257 pDb
->nTreeLimit
= iVal
*1024;
259 *piVal
= (pDb
->nTreeLimit
/ 1024);
263 case LSM_CONFIG_AUTOWORK
: {
264 int *piVal
= va_arg(ap
, int *);
266 pDb
->bAutowork
= *piVal
;
268 *piVal
= pDb
->bAutowork
;
272 case LSM_CONFIG_AUTOCHECKPOINT
: {
273 /* This parameter is read and written in KB. But all internal processing
274 ** (including the lsm_db.nAutockpt variable) is done in bytes. */
275 int *piVal
= va_arg(ap
, int *);
278 pDb
->nAutockpt
= (i64
)iVal
* 1024;
280 *piVal
= (int)(pDb
->nAutockpt
/ 1024);
284 case LSM_CONFIG_PAGE_SIZE
: {
285 int *piVal
= va_arg(ap
, int *);
286 if( pDb
->pDatabase
){
287 /* If lsm_open() has been called, this is a read-only parameter.
288 ** Set the output variable to the page-size according to the
289 ** FileSystem object. */
290 *piVal
= lsmFsPageSize(pDb
->pFS
);
292 if( *piVal
>=256 && *piVal
<=65536 && ((*piVal
-1) & *piVal
)==0 ){
293 pDb
->nDfltPgsz
= *piVal
;
295 *piVal
= pDb
->nDfltPgsz
;
301 case LSM_CONFIG_BLOCK_SIZE
: {
302 /* This parameter is read and written in KB. But all internal
303 ** processing is done in bytes. */
304 int *piVal
= va_arg(ap
, int *);
305 if( pDb
->pDatabase
){
306 /* If lsm_open() has been called, this is a read-only parameter.
307 ** Set the output variable to the block-size in KB according to the
308 ** FileSystem object. */
309 *piVal
= lsmFsBlockSize(pDb
->pFS
) / 1024;
312 if( iVal
>=64 && iVal
<=65536 && ((iVal
-1) & iVal
)==0 ){
313 pDb
->nDfltBlksz
= iVal
* 1024;
315 *piVal
= pDb
->nDfltBlksz
/ 1024;
321 case LSM_CONFIG_SAFETY
: {
322 int *piVal
= va_arg(ap
, int *);
323 if( *piVal
>=0 && *piVal
<=2 ){
324 pDb
->eSafety
= *piVal
;
326 *piVal
= pDb
->eSafety
;
330 case LSM_CONFIG_MMAP
: {
331 int *piVal
= va_arg(ap
, int *);
332 if( pDb
->iReader
<0 && *piVal
>=0 ){
334 rc
= lsmFsConfigure(pDb
);
340 case LSM_CONFIG_USE_LOG
: {
341 int *piVal
= va_arg(ap
, int *);
342 if( pDb
->nTransOpen
==0 && (*piVal
==0 || *piVal
==1) ){
343 pDb
->bUseLog
= *piVal
;
345 *piVal
= pDb
->bUseLog
;
349 case LSM_CONFIG_AUTOMERGE
: {
350 int *piVal
= va_arg(ap
, int *);
351 if( *piVal
>1 ) pDb
->nMerge
= *piVal
;
352 *piVal
= pDb
->nMerge
;
356 case LSM_CONFIG_MAX_FREELIST
: {
357 int *piVal
= va_arg(ap
, int *);
358 if( *piVal
>=2 && *piVal
<=LSM_MAX_FREELIST_ENTRIES
){
359 pDb
->nMaxFreelist
= *piVal
;
361 *piVal
= pDb
->nMaxFreelist
;
365 case LSM_CONFIG_MULTIPLE_PROCESSES
: {
366 int *piVal
= va_arg(ap
, int *);
367 if( pDb
->pDatabase
){
368 /* If lsm_open() has been called, this is a read-only parameter.
369 ** Set the output variable to true if this connection is currently
370 ** in multi-process mode. */
371 *piVal
= lsmDbMultiProc(pDb
);
373 pDb
->bMultiProc
= *piVal
= (*piVal
!=0);
378 case LSM_CONFIG_READONLY
: {
379 int *piVal
= va_arg(ap
, int *);
380 /* If lsm_open() has been called, this is a read-only parameter. */
381 if( pDb
->pDatabase
==0 && *piVal
>=0 ){
382 pDb
->bReadonly
= *piVal
= (*piVal
!=0);
384 *piVal
= pDb
->bReadonly
;
388 case LSM_CONFIG_SET_COMPRESSION
: {
389 lsm_compress
*p
= va_arg(ap
, lsm_compress
*);
390 if( pDb
->iReader
>=0 && pDb
->bInFactory
==0 ){
391 /* May not change compression schemes with an open transaction */
392 rc
= LSM_MISUSE_BKPT
;
394 if( pDb
->compress
.xFree
){
395 /* Invoke any destructor belonging to the current compression. */
396 pDb
->compress
.xFree(pDb
->compress
.pCtx
);
399 memset(&pDb
->compress
, 0, sizeof(lsm_compress
));
400 pDb
->compress
.iId
= LSM_COMPRESSION_NONE
;
402 memcpy(&pDb
->compress
, p
, sizeof(lsm_compress
));
404 rc
= lsmFsConfigure(pDb
);
409 case LSM_CONFIG_SET_COMPRESSION_FACTORY
: {
410 lsm_compress_factory
*p
= va_arg(ap
, lsm_compress_factory
*);
411 if( pDb
->factory
.xFree
){
412 /* Invoke any destructor belonging to the current factory. */
413 pDb
->factory
.xFree(pDb
->factory
.pCtx
);
415 memcpy(&pDb
->factory
, p
, sizeof(lsm_compress_factory
));
419 case LSM_CONFIG_GET_COMPRESSION
: {
420 lsm_compress
*p
= va_arg(ap
, lsm_compress
*);
421 memcpy(p
, &pDb
->compress
, sizeof(lsm_compress
));
434 void lsmAppendSegmentList(LsmString
*pStr
, char *zPre
, Segment
*pSeg
){
435 lsmStringAppendf(pStr
, "%s{%lld %lld %lld %lld}", zPre
,
436 pSeg
->iFirst
, pSeg
->iLastPg
, pSeg
->iRoot
, pSeg
->nSize
440 static int infoGetWorker(lsm_db
*pDb
, Snapshot
**pp
, int *pbUnlock
){
443 assert( *pbUnlock
==0 );
445 rc
= lsmBeginWork(pDb
);
446 if( rc
!=LSM_OK
) return rc
;
449 if( pp
) *pp
= pDb
->pWorker
;
453 static void infoFreeWorker(lsm_db
*pDb
, int bUnlock
){
455 int rcdummy
= LSM_BUSY
;
456 lsmFinishWork(pDb
, 0, &rcdummy
);
461 lsm_db
*pDb
, /* Database handle */
462 char **pzOut
/* OUT: Nul-terminated string (tcl list) */
464 Level
*pTopLevel
= 0; /* Top level of snapshot to report on */
468 Snapshot
*pWorker
; /* Worker snapshot */
471 /* Obtain the worker snapshot */
472 rc
= infoGetWorker(pDb
, &pWorker
, &bUnlock
);
473 if( rc
!=LSM_OK
) return rc
;
475 /* Format the contents of the snapshot as text */
476 pTopLevel
= lsmDbSnapshotLevel(pWorker
);
477 lsmStringInit(&s
, pDb
->pEnv
);
478 for(p
=pTopLevel
; rc
==LSM_OK
&& p
; p
=p
->pNext
){
480 lsmStringAppendf(&s
, "%s{%d", (s
.n
? " " : ""), (int)p
->iAge
);
481 lsmAppendSegmentList(&s
, " ", &p
->lhs
);
482 for(i
=0; rc
==LSM_OK
&& i
<p
->nRight
; i
++){
483 lsmAppendSegmentList(&s
, " ", &p
->aRhs
[i
]);
485 lsmStringAppend(&s
, "}", 1);
487 rc
= s
.n
>=0 ? LSM_OK
: LSM_NOMEM
;
489 /* Release the snapshot and return */
490 infoFreeWorker(pDb
, bUnlock
);
495 static int infoFreelistCb(void *pCtx
, int iBlk
, i64 iSnapshot
){
496 LsmString
*pStr
= (LsmString
*)pCtx
;
497 lsmStringAppendf(pStr
, "%s{%d %lld}", (pStr
->n
?" ":""), iBlk
, iSnapshot
);
501 int lsmInfoFreelist(lsm_db
*pDb
, char **pzOut
){
502 Snapshot
*pWorker
; /* Worker snapshot */
507 /* Obtain the worker snapshot */
508 rc
= infoGetWorker(pDb
, &pWorker
, &bUnlock
);
509 if( rc
!=LSM_OK
) return rc
;
511 lsmStringInit(&s
, pDb
->pEnv
);
512 rc
= lsmWalkFreelist(pDb
, 0, infoFreelistCb
, &s
);
514 lsmFree(pDb
->pEnv
, s
.z
);
519 /* Release the snapshot and return */
520 infoFreeWorker(pDb
, bUnlock
);
524 static int infoTreeSize(lsm_db
*db
, int *pnOldKB
, int *pnNewKB
){
525 ShmHeader
*pShm
= db
->pShmhdr
;
526 TreeHeader
*p
= &pShm
->hdr1
;
528 /* The following code suffers from two race conditions, as it accesses and
529 ** trusts the contents of shared memory without verifying checksums:
531 ** * The two values read - TreeHeader.root.nByte and oldroot.nByte - are
532 ** 32-bit fields. It is assumed that reading from one of these
533 ** is atomic - that it is not possible to read a partially written
534 ** garbage value. However the two values may be mutually inconsistent.
536 ** * TreeHeader.iLogOff is a 64-bit value. And lsmCheckpointLogOffset()
537 ** reads a 64-bit value from a snapshot stored in shared memory. It
538 ** is assumed that in each case it is possible to read a partially
539 ** written garbage value. If this occurs, then the value returned
540 ** for the size of the "old" tree may reflect the size of an "old"
541 ** tree that was recently flushed to disk.
543 ** Given the context in which this function is called (as a result of an
544 ** lsm_info(LSM_INFO_TREE_SIZE) request), neither of these are considered to
547 *pnNewKB
= ((int)p
->root
.nByte
+ 1023) / 1024;
549 if( p
->iOldLog
==lsmCheckpointLogOffset(pShm
->aSnap1
) ){
552 *pnOldKB
= ((int)p
->oldroot
.nByte
+ 1023) / 1024;
561 int lsm_info(lsm_db
*pDb
, int eParam
, ...){
564 va_start(ap
, eParam
);
567 case LSM_INFO_NWRITE
: {
568 int *piVal
= va_arg(ap
, int *);
569 *piVal
= lsmFsNWrite(pDb
->pFS
);
573 case LSM_INFO_NREAD
: {
574 int *piVal
= va_arg(ap
, int *);
575 *piVal
= lsmFsNRead(pDb
->pFS
);
579 case LSM_INFO_DB_STRUCTURE
: {
580 char **pzVal
= va_arg(ap
, char **);
581 rc
= lsmStructList(pDb
, pzVal
);
585 case LSM_INFO_ARRAY_STRUCTURE
: {
586 LsmPgno pgno
= va_arg(ap
, LsmPgno
);
587 char **pzVal
= va_arg(ap
, char **);
588 rc
= lsmInfoArrayStructure(pDb
, 0, pgno
, pzVal
);
592 case LSM_INFO_ARRAY_PAGES
: {
593 LsmPgno pgno
= va_arg(ap
, LsmPgno
);
594 char **pzVal
= va_arg(ap
, char **);
595 rc
= lsmInfoArrayPages(pDb
, pgno
, pzVal
);
599 case LSM_INFO_PAGE_HEX_DUMP
:
600 case LSM_INFO_PAGE_ASCII_DUMP
: {
601 LsmPgno pgno
= va_arg(ap
, LsmPgno
);
602 char **pzVal
= va_arg(ap
, char **);
604 rc
= infoGetWorker(pDb
, 0, &bUnlock
);
606 int bHex
= (eParam
==LSM_INFO_PAGE_HEX_DUMP
);
607 rc
= lsmInfoPageDump(pDb
, pgno
, bHex
, pzVal
);
609 infoFreeWorker(pDb
, bUnlock
);
613 case LSM_INFO_LOG_STRUCTURE
: {
614 char **pzVal
= va_arg(ap
, char **);
615 rc
= lsmInfoLogStructure(pDb
, pzVal
);
619 case LSM_INFO_FREELIST
: {
620 char **pzVal
= va_arg(ap
, char **);
621 rc
= lsmInfoFreelist(pDb
, pzVal
);
625 case LSM_INFO_CHECKPOINT_SIZE
: {
626 int *pnKB
= va_arg(ap
, int *);
627 rc
= lsmCheckpointSize(pDb
, pnKB
);
631 case LSM_INFO_TREE_SIZE
: {
632 int *pnOld
= va_arg(ap
, int *);
633 int *pnNew
= va_arg(ap
, int *);
634 rc
= infoTreeSize(pDb
, pnOld
, pnNew
);
638 case LSM_INFO_COMPRESSION_ID
: {
639 unsigned int *piOut
= va_arg(ap
, unsigned int *);
641 *piOut
= pDb
->pClient
->iCmpId
;
643 rc
= lsmInfoCompressionId(pDb
, piOut
);
657 static int doWriteOp(
660 const void *pKey
, int nKey
, /* Key to write or delete */
661 const void *pVal
, int nVal
/* Value to write. Or nVal==-1 for a delete */
663 int rc
= LSM_OK
; /* Return code */
664 int bCommit
= 0; /* True to commit before returning */
666 if( pDb
->nTransOpen
==0 ){
668 rc
= lsm_begin(pDb
, 1);
672 int eType
= (bDeleteRange
? LSM_DRANGE
: (nVal
>=0?LSM_WRITE
:LSM_DELETE
));
673 rc
= lsmLogWrite(pDb
, eType
, (void *)pKey
, nKey
, (void *)pVal
, nVal
);
676 lsmSortedSaveTreeCursors(pDb
);
679 int pgsz
= lsmFsPageSize(pDb
->pFS
);
680 int nQuant
= LSM_AUTOWORK_QUANT
* pgsz
;
685 if( nQuant
>pDb
->nTreeLimit
){
686 nQuant
= LSM_MAX(pDb
->nTreeLimit
, pgsz
);
689 nBefore
= lsmTreeSize(pDb
);
691 rc
= lsmTreeDelete(pDb
, (void *)pKey
, nKey
, (void *)pVal
, nVal
);
693 rc
= lsmTreeInsert(pDb
, (void *)pKey
, nKey
, (void *)pVal
, nVal
);
696 nAfter
= lsmTreeSize(pDb
);
697 nDiff
= (nAfter
/nQuant
) - (nBefore
/nQuant
);
698 if( rc
==LSM_OK
&& pDb
->bAutowork
&& nDiff
!=0 ){
699 rc
= lsmSortedAutoWork(pDb
, nDiff
* LSM_AUTOWORK_QUANT
);
703 /* If a transaction was opened at the start of this function, commit it.
704 ** Or, if an error has occurred, roll it back. */
707 rc
= lsm_commit(pDb
, 0);
709 lsm_rollback(pDb
, 0);
717 ** Write a new value into the database.
720 lsm_db
*db
, /* Database connection */
721 const void *pKey
, int nKey
, /* Key to write or delete */
722 const void *pVal
, int nVal
/* Value to write. Or nVal==-1 for a delete */
724 return doWriteOp(db
, 0, pKey
, nKey
, pVal
, nVal
);
728 ** Delete a value from the database.
730 int lsm_delete(lsm_db
*db
, const void *pKey
, int nKey
){
731 return doWriteOp(db
, 0, pKey
, nKey
, 0, -1);
735 ** Delete a range of database keys.
737 int lsm_delete_range(
738 lsm_db
*db
, /* Database handle */
739 const void *pKey1
, int nKey1
, /* Lower bound of range to delete */
740 const void *pKey2
, int nKey2
/* Upper bound of range to delete */
743 if( db
->xCmp((void *)pKey1
, nKey1
, (void *)pKey2
, nKey2
)<0 ){
744 rc
= doWriteOp(db
, 1, pKey1
, nKey1
, pKey2
, nKey2
);
750 ** Open a new cursor handle.
752 ** If there are currently no other open cursor handles, and no open write
753 ** transaction, open a read transaction here.
755 int lsm_csr_open(lsm_db
*pDb
, lsm_cursor
**ppCsr
){
756 int rc
= LSM_OK
; /* Return code */
757 MultiCursor
*pCsr
= 0; /* New cursor object */
759 /* Open a read transaction if one is not already open. */
760 assert_db_state(pDb
);
762 if( pDb
->pShmhdr
==0 ){
763 assert( pDb
->bReadonly
);
764 rc
= lsmBeginRoTrans(pDb
);
765 }else if( pDb
->iReader
<0 ){
766 rc
= lsmBeginReadTrans(pDb
);
769 /* Allocate the multi-cursor. */
771 rc
= lsmMCursorNew(pDb
, &pCsr
);
774 /* If an error has occured, set the output to NULL and delete any partially
775 ** allocated cursor. If this means there are no open cursors, release the
776 ** client snapshot. */
778 lsmMCursorClose(pCsr
, 0);
779 dbReleaseClientSnapshot(pDb
);
782 assert_db_state(pDb
);
783 *ppCsr
= (lsm_cursor
*)pCsr
;
788 ** Close a cursor opened using lsm_csr_open().
790 int lsm_csr_close(lsm_cursor
*p
){
792 lsm_db
*pDb
= lsmMCursorDb((MultiCursor
*)p
);
793 assert_db_state(pDb
);
794 lsmMCursorClose((MultiCursor
*)p
, 1);
795 dbReleaseClientSnapshot(pDb
);
796 assert_db_state(pDb
);
802 ** Attempt to seek the cursor to the database entry specified by pKey/nKey.
803 ** If an error occurs (e.g. an OOM or IO error), return an LSM error code.
804 ** Otherwise, return LSM_OK.
806 int lsm_csr_seek(lsm_cursor
*pCsr
, const void *pKey
, int nKey
, int eSeek
){
807 return lsmMCursorSeek((MultiCursor
*)pCsr
, 0, (void *)pKey
, nKey
, eSeek
);
810 int lsm_csr_next(lsm_cursor
*pCsr
){
811 return lsmMCursorNext((MultiCursor
*)pCsr
);
814 int lsm_csr_prev(lsm_cursor
*pCsr
){
815 return lsmMCursorPrev((MultiCursor
*)pCsr
);
818 int lsm_csr_first(lsm_cursor
*pCsr
){
819 return lsmMCursorFirst((MultiCursor
*)pCsr
);
822 int lsm_csr_last(lsm_cursor
*pCsr
){
823 return lsmMCursorLast((MultiCursor
*)pCsr
);
826 int lsm_csr_valid(lsm_cursor
*pCsr
){
827 return lsmMCursorValid((MultiCursor
*)pCsr
);
830 int lsm_csr_key(lsm_cursor
*pCsr
, const void **ppKey
, int *pnKey
){
831 return lsmMCursorKey((MultiCursor
*)pCsr
, (void **)ppKey
, pnKey
);
834 int lsm_csr_value(lsm_cursor
*pCsr
, const void **ppVal
, int *pnVal
){
835 return lsmMCursorValue((MultiCursor
*)pCsr
, (void **)ppVal
, pnVal
);
840 void (*xLog
)(void *, int, const char *),
847 void lsm_config_work_hook(
849 void (*xWork
)(lsm_db
*, void *),
853 pDb
->pWorkCtx
= pCtx
;
856 void lsmLogMessage(lsm_db
*pDb
, int rc
, const char *zFormat
, ...){
860 lsmStringInit(&s
, pDb
->pEnv
);
861 va_start(ap
, zFormat
);
862 va_start(ap2
, zFormat
);
863 lsmStringVAppendf(&s
, zFormat
, ap
, ap2
);
866 pDb
->xLog(pDb
->pLogCtx
, rc
, s
.z
);
871 int lsm_begin(lsm_db
*pDb
, int iLevel
){
874 assert_db_state( pDb
);
875 rc
= (pDb
->bReadonly
? LSM_READONLY
: LSM_OK
);
877 /* A value less than zero means open one more transaction. */
878 if( iLevel
<0 ) iLevel
= pDb
->nTransOpen
+ 1;
879 if( iLevel
>pDb
->nTransOpen
){
882 /* Extend the pDb->aTrans[] array if required. */
883 if( rc
==LSM_OK
&& pDb
->nTransAlloc
<iLevel
){
884 TransMark
*aNew
; /* New allocation */
885 int nByte
= sizeof(TransMark
) * (iLevel
+1);
886 aNew
= (TransMark
*)lsmRealloc(pDb
->pEnv
, pDb
->aTrans
, nByte
);
890 nByte
= sizeof(TransMark
) * (iLevel
+1 - pDb
->nTransAlloc
);
891 memset(&aNew
[pDb
->nTransAlloc
], 0, nByte
);
892 pDb
->nTransAlloc
= iLevel
+1;
897 if( rc
==LSM_OK
&& pDb
->nTransOpen
==0 ){
898 rc
= lsmBeginWriteTrans(pDb
);
902 for(i
=pDb
->nTransOpen
; i
<iLevel
; i
++){
903 lsmTreeMark(pDb
, &pDb
->aTrans
[i
].tree
);
904 lsmLogTell(pDb
, &pDb
->aTrans
[i
].log
);
906 pDb
->nTransOpen
= iLevel
;
913 int lsm_commit(lsm_db
*pDb
, int iLevel
){
916 assert_db_state( pDb
);
918 /* A value less than zero means close the innermost nested transaction. */
919 if( iLevel
<0 ) iLevel
= LSM_MAX(0, pDb
->nTransOpen
- 1);
921 if( iLevel
<pDb
->nTransOpen
){
924 /* Commit the transaction to disk. */
925 if( rc
==LSM_OK
) rc
= lsmLogCommit(pDb
);
926 if( rc
==LSM_OK
&& pDb
->eSafety
==LSM_SAFETY_FULL
){
927 rc
= lsmFsSyncLog(pDb
->pFS
);
929 rc2
= lsmFinishWriteTrans(pDb
, (rc
==LSM_OK
));
930 if( rc
==LSM_OK
) rc
= rc2
;
932 pDb
->nTransOpen
= iLevel
;
934 dbReleaseClientSnapshot(pDb
);
938 int lsm_rollback(lsm_db
*pDb
, int iLevel
){
940 assert_db_state( pDb
);
942 if( pDb
->nTransOpen
){
943 /* A value less than zero means close the innermost nested transaction. */
944 if( iLevel
<0 ) iLevel
= LSM_MAX(0, pDb
->nTransOpen
- 1);
946 if( iLevel
<=pDb
->nTransOpen
){
947 TransMark
*pMark
= &pDb
->aTrans
[(iLevel
==0 ? 0 : iLevel
-1)];
948 lsmTreeRollback(pDb
, &pMark
->tree
);
949 if( iLevel
) lsmLogSeek(pDb
, &pMark
->log
);
950 pDb
->nTransOpen
= iLevel
;
953 if( pDb
->nTransOpen
==0 ){
954 lsmFinishWriteTrans(pDb
, 0);
956 dbReleaseClientSnapshot(pDb
);
962 int lsm_get_user_version(lsm_db
*pDb
, unsigned int *piUsr
){
963 int rc
= LSM_OK
; /* Return code */
965 /* Open a read transaction if one is not already open. */
966 assert_db_state(pDb
);
967 if( pDb
->pShmhdr
==0 ){
968 assert( pDb
->bReadonly
);
969 rc
= lsmBeginRoTrans(pDb
);
970 }else if( pDb
->iReader
<0 ){
971 rc
= lsmBeginReadTrans(pDb
);
974 /* Allocate the multi-cursor. */
976 *piUsr
= pDb
->treehdr
.iUsrVersion
;
979 dbReleaseClientSnapshot(pDb
);
980 assert_db_state(pDb
);
984 int lsm_set_user_version(lsm_db
*pDb
, unsigned int iUsr
){
985 int rc
= LSM_OK
; /* Return code */
986 int bCommit
= 0; /* True to commit before returning */
988 if( pDb
->nTransOpen
==0 ){
990 rc
= lsm_begin(pDb
, 1);
994 pDb
->treehdr
.iUsrVersion
= iUsr
;
997 /* If a transaction was opened at the start of this function, commit it.
998 ** Or, if an error has occurred, roll it back. */
1001 rc
= lsm_commit(pDb
, 0);
1003 lsm_rollback(pDb
, 0);