Fix the ".lint fkey-indexes" shell command so that it works with WITHOUT ROWID
[sqlite.git] / ext / lsm1 / lsm_main.c
blob8a324a3efecb63914a010b2ed5e28c2b8f8cbb24
1 /*
2 ** 2011-08-18
3 **
4 ** The author disclaims copyright to this source code. In place of
5 ** a legal notice, here is a blessing:
6 **
7 ** May you do good and not evil.
8 ** May you find forgiveness for yourself and forgive others.
9 ** May you share freely, never taking more than you give.
11 *************************************************************************
13 ** The main interface to the LSM module.
15 #include "lsmInt.h"
18 #ifdef LSM_DEBUG
20 ** This function returns a copy of its only argument.
22 ** When the library is built with LSM_DEBUG defined, this function is called
23 ** whenever an error code is generated (not propagated - generated). So
24 ** if the library is mysteriously returning (say) LSM_IOERR, a breakpoint
25 ** may be set in this function to determine why.
27 int lsmErrorBkpt(int rc){
28 /* Set breakpoint here! */
29 return rc;
33 ** This function contains various assert() statements that test that the
34 ** lsm_db structure passed as an argument is internally consistent.
36 static void assert_db_state(lsm_db *pDb){
38 /* If there is at least one cursor or a write transaction open, the database
39 ** handle must be holding a pointer to a client snapshot. And the reverse
40 ** - if there are no open cursors and no write transactions then there must
41 ** not be a client snapshot. */
43 assert( (pDb->pCsr!=0||pDb->nTransOpen>0)==(pDb->iReader>=0||pDb->bRoTrans) );
45 assert( (pDb->iReader<0 && pDb->bRoTrans==0) || pDb->pClient!=0 );
47 assert( pDb->nTransOpen>=0 );
49 #else
50 # define assert_db_state(x)
51 #endif
54 ** The default key-compare function.
56 static int xCmp(void *p1, int n1, void *p2, int n2){
57 int res;
58 res = memcmp(p1, p2, LSM_MIN(n1, n2));
59 if( res==0 ) res = (n1-n2);
60 return res;
63 static void xLog(void *pCtx, int rc, const char *z){
64 (void)(rc);
65 (void)(pCtx);
66 fprintf(stderr, "%s\n", z);
67 fflush(stderr);
71 ** Allocate a new db handle.
73 int lsm_new(lsm_env *pEnv, lsm_db **ppDb){
74 lsm_db *pDb;
76 /* If the user did not provide an environment, use the default. */
77 if( pEnv==0 ) pEnv = lsm_default_env();
78 assert( pEnv );
80 /* Allocate the new database handle */
81 *ppDb = pDb = (lsm_db *)lsmMallocZero(pEnv, sizeof(lsm_db));
82 if( pDb==0 ) return LSM_NOMEM_BKPT;
84 /* Initialize the new object */
85 pDb->pEnv = pEnv;
86 pDb->nTreeLimit = LSM_DFLT_AUTOFLUSH;
87 pDb->nAutockpt = LSM_DFLT_AUTOCHECKPOINT;
88 pDb->bAutowork = LSM_DFLT_AUTOWORK;
89 pDb->eSafety = LSM_DFLT_SAFETY;
90 pDb->xCmp = xCmp;
91 pDb->nDfltPgsz = LSM_DFLT_PAGE_SIZE;
92 pDb->nDfltBlksz = LSM_DFLT_BLOCK_SIZE;
93 pDb->nMerge = LSM_DFLT_AUTOMERGE;
94 pDb->nMaxFreelist = LSM_MAX_FREELIST_ENTRIES;
95 pDb->bUseLog = LSM_DFLT_USE_LOG;
96 pDb->iReader = -1;
97 pDb->iRwclient = -1;
98 pDb->bMultiProc = LSM_DFLT_MULTIPLE_PROCESSES;
99 pDb->iMmap = LSM_DFLT_MMAP;
100 pDb->xLog = xLog;
101 pDb->compress.iId = LSM_COMPRESSION_NONE;
102 return LSM_OK;
105 lsm_env *lsm_get_env(lsm_db *pDb){
106 assert( pDb->pEnv );
107 return pDb->pEnv;
111 ** If database handle pDb is currently holding a client snapshot, but does
112 ** not have any open cursors or write transactions, release it.
114 static void dbReleaseClientSnapshot(lsm_db *pDb){
115 if( pDb->nTransOpen==0 && pDb->pCsr==0 ){
116 lsmFinishReadTrans(pDb);
120 static int getFullpathname(
121 lsm_env *pEnv,
122 const char *zRel,
123 char **pzAbs
125 int nAlloc = 0;
126 char *zAlloc = 0;
127 int nReq = 0;
128 int rc;
131 nAlloc = nReq;
132 rc = pEnv->xFullpath(pEnv, zRel, zAlloc, &nReq);
133 if( nReq>nAlloc ){
134 zAlloc = lsmReallocOrFreeRc(pEnv, zAlloc, nReq, &rc);
136 }while( nReq>nAlloc && rc==LSM_OK );
138 if( rc!=LSM_OK ){
139 lsmFree(pEnv, zAlloc);
140 zAlloc = 0;
142 *pzAbs = zAlloc;
143 return rc;
147 ** Check that the bits in the db->mLock mask are consistent with the
148 ** value stored in db->iRwclient. An assert shall fail otherwise.
150 static void assertRwclientLockValue(lsm_db *db){
151 #ifndef NDEBUG
152 u64 msk; /* Mask of mLock bits for RWCLIENT locks */
153 u64 rwclient = 0; /* Bit corresponding to db->iRwclient */
155 if( db->iRwclient>=0 ){
156 rwclient = ((u64)1 << (LSM_LOCK_RWCLIENT(db->iRwclient)-1));
158 msk = ((u64)1 << (LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT)-1)) - 1;
159 msk -= (((u64)1 << (LSM_LOCK_RWCLIENT(0)-1)) - 1);
161 assert( (db->mLock & msk)==rwclient );
162 #endif
166 ** Open a new connection to database zFilename.
168 int lsm_open(lsm_db *pDb, const char *zFilename){
169 int rc;
171 if( pDb->pDatabase ){
172 rc = LSM_MISUSE;
173 }else{
174 char *zFull;
176 /* Translate the possibly relative pathname supplied by the user into
177 ** an absolute pathname. This is required because the supplied path
178 ** is used (either directly or with "-log" appended to it) for more
179 ** than one purpose - to open both the database and log files, and
180 ** perhaps to unlink the log file during disconnection. An absolute
181 ** path is required to ensure that the correct files are operated
182 ** on even if the application changes the cwd. */
183 rc = getFullpathname(pDb->pEnv, zFilename, &zFull);
184 assert( rc==LSM_OK || zFull==0 );
186 /* Connect to the database. */
187 if( rc==LSM_OK ){
188 rc = lsmDbDatabaseConnect(pDb, zFull);
191 if( pDb->bReadonly==0 ){
192 /* Configure the file-system connection with the page-size and block-size
193 ** of this database. Even if the database file is zero bytes in size
194 ** on disk, these values have been set in shared-memory by now, and so
195 ** are guaranteed not to change during the lifetime of this connection.
197 if( rc==LSM_OK && LSM_OK==(rc = lsmCheckpointLoad(pDb, 0)) ){
198 lsmFsSetPageSize(pDb->pFS, lsmCheckpointPgsz(pDb->aSnapshot));
199 lsmFsSetBlockSize(pDb->pFS, lsmCheckpointBlksz(pDb->aSnapshot));
203 lsmFree(pDb->pEnv, zFull);
204 assertRwclientLockValue(pDb);
207 assert( pDb->bReadonly==0 || pDb->bReadonly==1 );
208 assert( rc!=LSM_OK || (pDb->pShmhdr==0)==(pDb->bReadonly==1) );
210 return rc;
213 int lsm_close(lsm_db *pDb){
214 int rc = LSM_OK;
215 if( pDb ){
216 assert_db_state(pDb);
217 if( pDb->pCsr || pDb->nTransOpen ){
218 rc = LSM_MISUSE_BKPT;
219 }else{
220 lsmMCursorFreeCache(pDb);
221 lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
222 pDb->pClient = 0;
224 assertRwclientLockValue(pDb);
226 lsmDbDatabaseRelease(pDb);
227 lsmLogClose(pDb);
228 lsmFsClose(pDb->pFS);
229 /* assert( pDb->mLock==0 ); */
231 /* Invoke any destructors registered for the compression or
232 ** compression factory callbacks. */
233 if( pDb->factory.xFree ) pDb->factory.xFree(pDb->factory.pCtx);
234 if( pDb->compress.xFree ) pDb->compress.xFree(pDb->compress.pCtx);
236 lsmFree(pDb->pEnv, pDb->rollback.aArray);
237 lsmFree(pDb->pEnv, pDb->aTrans);
238 lsmFree(pDb->pEnv, pDb->apShm);
239 lsmFree(pDb->pEnv, pDb);
242 return rc;
245 int lsm_config(lsm_db *pDb, int eParam, ...){
246 int rc = LSM_OK;
247 va_list ap;
248 va_start(ap, eParam);
250 switch( eParam ){
251 case LSM_CONFIG_AUTOFLUSH: {
252 /* This parameter is read and written in KB. But all internal
253 ** processing is done in bytes. */
254 int *piVal = va_arg(ap, int *);
255 int iVal = *piVal;
256 if( iVal>=0 && iVal<=(1024*1024) ){
257 pDb->nTreeLimit = iVal*1024;
259 *piVal = (pDb->nTreeLimit / 1024);
260 break;
263 case LSM_CONFIG_AUTOWORK: {
264 int *piVal = va_arg(ap, int *);
265 if( *piVal>=0 ){
266 pDb->bAutowork = *piVal;
268 *piVal = pDb->bAutowork;
269 break;
272 case LSM_CONFIG_AUTOCHECKPOINT: {
273 /* This parameter is read and written in KB. But all internal processing
274 ** (including the lsm_db.nAutockpt variable) is done in bytes. */
275 int *piVal = va_arg(ap, int *);
276 if( *piVal>=0 ){
277 int iVal = *piVal;
278 pDb->nAutockpt = (i64)iVal * 1024;
280 *piVal = (int)(pDb->nAutockpt / 1024);
281 break;
284 case LSM_CONFIG_PAGE_SIZE: {
285 int *piVal = va_arg(ap, int *);
286 if( pDb->pDatabase ){
287 /* If lsm_open() has been called, this is a read-only parameter.
288 ** Set the output variable to the page-size according to the
289 ** FileSystem object. */
290 *piVal = lsmFsPageSize(pDb->pFS);
291 }else{
292 if( *piVal>=256 && *piVal<=65536 && ((*piVal-1) & *piVal)==0 ){
293 pDb->nDfltPgsz = *piVal;
294 }else{
295 *piVal = pDb->nDfltPgsz;
298 break;
301 case LSM_CONFIG_BLOCK_SIZE: {
302 /* This parameter is read and written in KB. But all internal
303 ** processing is done in bytes. */
304 int *piVal = va_arg(ap, int *);
305 if( pDb->pDatabase ){
306 /* If lsm_open() has been called, this is a read-only parameter.
307 ** Set the output variable to the block-size in KB according to the
308 ** FileSystem object. */
309 *piVal = lsmFsBlockSize(pDb->pFS) / 1024;
310 }else{
311 int iVal = *piVal;
312 if( iVal>=64 && iVal<=65536 && ((iVal-1) & iVal)==0 ){
313 pDb->nDfltBlksz = iVal * 1024;
314 }else{
315 *piVal = pDb->nDfltBlksz / 1024;
318 break;
321 case LSM_CONFIG_SAFETY: {
322 int *piVal = va_arg(ap, int *);
323 if( *piVal>=0 && *piVal<=2 ){
324 pDb->eSafety = *piVal;
326 *piVal = pDb->eSafety;
327 break;
330 case LSM_CONFIG_MMAP: {
331 int *piVal = va_arg(ap, int *);
332 if( pDb->iReader<0 && *piVal>=0 ){
333 pDb->iMmap = *piVal;
334 rc = lsmFsConfigure(pDb);
336 *piVal = pDb->iMmap;
337 break;
340 case LSM_CONFIG_USE_LOG: {
341 int *piVal = va_arg(ap, int *);
342 if( pDb->nTransOpen==0 && (*piVal==0 || *piVal==1) ){
343 pDb->bUseLog = *piVal;
345 *piVal = pDb->bUseLog;
346 break;
349 case LSM_CONFIG_AUTOMERGE: {
350 int *piVal = va_arg(ap, int *);
351 if( *piVal>1 ) pDb->nMerge = *piVal;
352 *piVal = pDb->nMerge;
353 break;
356 case LSM_CONFIG_MAX_FREELIST: {
357 int *piVal = va_arg(ap, int *);
358 if( *piVal>=2 && *piVal<=LSM_MAX_FREELIST_ENTRIES ){
359 pDb->nMaxFreelist = *piVal;
361 *piVal = pDb->nMaxFreelist;
362 break;
365 case LSM_CONFIG_MULTIPLE_PROCESSES: {
366 int *piVal = va_arg(ap, int *);
367 if( pDb->pDatabase ){
368 /* If lsm_open() has been called, this is a read-only parameter.
369 ** Set the output variable to true if this connection is currently
370 ** in multi-process mode. */
371 *piVal = lsmDbMultiProc(pDb);
372 }else{
373 pDb->bMultiProc = *piVal = (*piVal!=0);
375 break;
378 case LSM_CONFIG_READONLY: {
379 int *piVal = va_arg(ap, int *);
380 /* If lsm_open() has been called, this is a read-only parameter. */
381 if( pDb->pDatabase==0 && *piVal>=0 ){
382 pDb->bReadonly = *piVal = (*piVal!=0);
384 *piVal = pDb->bReadonly;
385 break;
388 case LSM_CONFIG_SET_COMPRESSION: {
389 lsm_compress *p = va_arg(ap, lsm_compress *);
390 if( pDb->iReader>=0 && pDb->bInFactory==0 ){
391 /* May not change compression schemes with an open transaction */
392 rc = LSM_MISUSE_BKPT;
393 }else{
394 if( pDb->compress.xFree ){
395 /* Invoke any destructor belonging to the current compression. */
396 pDb->compress.xFree(pDb->compress.pCtx);
398 if( p->xBound==0 ){
399 memset(&pDb->compress, 0, sizeof(lsm_compress));
400 pDb->compress.iId = LSM_COMPRESSION_NONE;
401 }else{
402 memcpy(&pDb->compress, p, sizeof(lsm_compress));
404 rc = lsmFsConfigure(pDb);
406 break;
409 case LSM_CONFIG_SET_COMPRESSION_FACTORY: {
410 lsm_compress_factory *p = va_arg(ap, lsm_compress_factory *);
411 if( pDb->factory.xFree ){
412 /* Invoke any destructor belonging to the current factory. */
413 pDb->factory.xFree(pDb->factory.pCtx);
415 memcpy(&pDb->factory, p, sizeof(lsm_compress_factory));
416 break;
419 case LSM_CONFIG_GET_COMPRESSION: {
420 lsm_compress *p = va_arg(ap, lsm_compress *);
421 memcpy(p, &pDb->compress, sizeof(lsm_compress));
422 break;
425 default:
426 rc = LSM_MISUSE;
427 break;
430 va_end(ap);
431 return rc;
434 void lsmAppendSegmentList(LsmString *pStr, char *zPre, Segment *pSeg){
435 lsmStringAppendf(pStr, "%s{%d %d %d %d}", zPre,
436 pSeg->iFirst, pSeg->iLastPg, pSeg->iRoot, pSeg->nSize
440 static int infoGetWorker(lsm_db *pDb, Snapshot **pp, int *pbUnlock){
441 int rc = LSM_OK;
443 assert( *pbUnlock==0 );
444 if( !pDb->pWorker ){
445 rc = lsmBeginWork(pDb);
446 if( rc!=LSM_OK ) return rc;
447 *pbUnlock = 1;
449 if( pp ) *pp = pDb->pWorker;
450 return rc;
453 static void infoFreeWorker(lsm_db *pDb, int bUnlock){
454 if( bUnlock ){
455 int rcdummy = LSM_BUSY;
456 lsmFinishWork(pDb, 0, &rcdummy);
460 int lsmStructList(
461 lsm_db *pDb, /* Database handle */
462 char **pzOut /* OUT: Nul-terminated string (tcl list) */
464 Level *pTopLevel = 0; /* Top level of snapshot to report on */
465 int rc = LSM_OK;
466 Level *p;
467 LsmString s;
468 Snapshot *pWorker; /* Worker snapshot */
469 int bUnlock = 0;
471 /* Obtain the worker snapshot */
472 rc = infoGetWorker(pDb, &pWorker, &bUnlock);
473 if( rc!=LSM_OK ) return rc;
475 /* Format the contents of the snapshot as text */
476 pTopLevel = lsmDbSnapshotLevel(pWorker);
477 lsmStringInit(&s, pDb->pEnv);
478 for(p=pTopLevel; rc==LSM_OK && p; p=p->pNext){
479 int i;
480 lsmStringAppendf(&s, "%s{%d", (s.n ? " " : ""), (int)p->iAge);
481 lsmAppendSegmentList(&s, " ", &p->lhs);
482 for(i=0; rc==LSM_OK && i<p->nRight; i++){
483 lsmAppendSegmentList(&s, " ", &p->aRhs[i]);
485 lsmStringAppend(&s, "}", 1);
487 rc = s.n>=0 ? LSM_OK : LSM_NOMEM;
489 /* Release the snapshot and return */
490 infoFreeWorker(pDb, bUnlock);
491 *pzOut = s.z;
492 return rc;
495 static int infoFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
496 LsmString *pStr = (LsmString *)pCtx;
497 lsmStringAppendf(pStr, "%s{%d %lld}", (pStr->n?" ":""), iBlk, iSnapshot);
498 return 0;
501 int lsmInfoFreelist(lsm_db *pDb, char **pzOut){
502 Snapshot *pWorker; /* Worker snapshot */
503 int bUnlock = 0;
504 LsmString s;
505 int rc;
507 /* Obtain the worker snapshot */
508 rc = infoGetWorker(pDb, &pWorker, &bUnlock);
509 if( rc!=LSM_OK ) return rc;
511 lsmStringInit(&s, pDb->pEnv);
512 rc = lsmWalkFreelist(pDb, 0, infoFreelistCb, &s);
513 if( rc!=LSM_OK ){
514 lsmFree(pDb->pEnv, s.z);
515 }else{
516 *pzOut = s.z;
519 /* Release the snapshot and return */
520 infoFreeWorker(pDb, bUnlock);
521 return rc;
524 static int infoTreeSize(lsm_db *db, int *pnOldKB, int *pnNewKB){
525 ShmHeader *pShm = db->pShmhdr;
526 TreeHeader *p = &pShm->hdr1;
528 /* The following code suffers from two race conditions, as it accesses and
529 ** trusts the contents of shared memory without verifying checksums:
531 ** * The two values read - TreeHeader.root.nByte and oldroot.nByte - are
532 ** 32-bit fields. It is assumed that reading from one of these
533 ** is atomic - that it is not possible to read a partially written
534 ** garbage value. However the two values may be mutually inconsistent.
536 ** * TreeHeader.iLogOff is a 64-bit value. And lsmCheckpointLogOffset()
537 ** reads a 64-bit value from a snapshot stored in shared memory. It
538 ** is assumed that in each case it is possible to read a partially
539 ** written garbage value. If this occurs, then the value returned
540 ** for the size of the "old" tree may reflect the size of an "old"
541 ** tree that was recently flushed to disk.
543 ** Given the context in which this function is called (as a result of an
544 ** lsm_info(LSM_INFO_TREE_SIZE) request), neither of these are considered to
545 ** be problems.
547 *pnNewKB = ((int)p->root.nByte + 1023) / 1024;
548 if( p->iOldShmid ){
549 if( p->iOldLog==lsmCheckpointLogOffset(pShm->aSnap1) ){
550 *pnOldKB = 0;
551 }else{
552 *pnOldKB = ((int)p->oldroot.nByte + 1023) / 1024;
554 }else{
555 *pnOldKB = 0;
558 return LSM_OK;
561 int lsm_info(lsm_db *pDb, int eParam, ...){
562 int rc = LSM_OK;
563 va_list ap;
564 va_start(ap, eParam);
566 switch( eParam ){
567 case LSM_INFO_NWRITE: {
568 int *piVal = va_arg(ap, int *);
569 *piVal = lsmFsNWrite(pDb->pFS);
570 break;
573 case LSM_INFO_NREAD: {
574 int *piVal = va_arg(ap, int *);
575 *piVal = lsmFsNRead(pDb->pFS);
576 break;
579 case LSM_INFO_DB_STRUCTURE: {
580 char **pzVal = va_arg(ap, char **);
581 rc = lsmStructList(pDb, pzVal);
582 break;
585 case LSM_INFO_ARRAY_STRUCTURE: {
586 Pgno pgno = va_arg(ap, Pgno);
587 char **pzVal = va_arg(ap, char **);
588 rc = lsmInfoArrayStructure(pDb, 0, pgno, pzVal);
589 break;
592 case LSM_INFO_ARRAY_PAGES: {
593 Pgno pgno = va_arg(ap, Pgno);
594 char **pzVal = va_arg(ap, char **);
595 rc = lsmInfoArrayPages(pDb, pgno, pzVal);
596 break;
599 case LSM_INFO_PAGE_HEX_DUMP:
600 case LSM_INFO_PAGE_ASCII_DUMP: {
601 Pgno pgno = va_arg(ap, Pgno);
602 char **pzVal = va_arg(ap, char **);
603 int bUnlock = 0;
604 rc = infoGetWorker(pDb, 0, &bUnlock);
605 if( rc==LSM_OK ){
606 int bHex = (eParam==LSM_INFO_PAGE_HEX_DUMP);
607 rc = lsmInfoPageDump(pDb, pgno, bHex, pzVal);
609 infoFreeWorker(pDb, bUnlock);
610 break;
613 case LSM_INFO_LOG_STRUCTURE: {
614 char **pzVal = va_arg(ap, char **);
615 rc = lsmInfoLogStructure(pDb, pzVal);
616 break;
619 case LSM_INFO_FREELIST: {
620 char **pzVal = va_arg(ap, char **);
621 rc = lsmInfoFreelist(pDb, pzVal);
622 break;
625 case LSM_INFO_CHECKPOINT_SIZE: {
626 int *pnKB = va_arg(ap, int *);
627 rc = lsmCheckpointSize(pDb, pnKB);
628 break;
631 case LSM_INFO_TREE_SIZE: {
632 int *pnOld = va_arg(ap, int *);
633 int *pnNew = va_arg(ap, int *);
634 rc = infoTreeSize(pDb, pnOld, pnNew);
635 break;
638 case LSM_INFO_COMPRESSION_ID: {
639 unsigned int *piOut = va_arg(ap, unsigned int *);
640 if( pDb->pClient ){
641 *piOut = pDb->pClient->iCmpId;
642 }else{
643 rc = lsmInfoCompressionId(pDb, piOut);
645 break;
648 default:
649 rc = LSM_MISUSE;
650 break;
653 va_end(ap);
654 return rc;
657 static int doWriteOp(
658 lsm_db *pDb,
659 int bDeleteRange,
660 const void *pKey, int nKey, /* Key to write or delete */
661 const void *pVal, int nVal /* Value to write. Or nVal==-1 for a delete */
663 int rc = LSM_OK; /* Return code */
664 int bCommit = 0; /* True to commit before returning */
666 if( pDb->nTransOpen==0 ){
667 bCommit = 1;
668 rc = lsm_begin(pDb, 1);
671 if( rc==LSM_OK ){
672 int eType = (bDeleteRange ? LSM_DRANGE : (nVal>=0?LSM_WRITE:LSM_DELETE));
673 rc = lsmLogWrite(pDb, eType, (void *)pKey, nKey, (void *)pVal, nVal);
676 lsmSortedSaveTreeCursors(pDb);
678 if( rc==LSM_OK ){
679 int pgsz = lsmFsPageSize(pDb->pFS);
680 int nQuant = LSM_AUTOWORK_QUANT * pgsz;
681 int nBefore;
682 int nAfter;
683 int nDiff;
685 if( nQuant>pDb->nTreeLimit ){
686 nQuant = pDb->nTreeLimit;
689 nBefore = lsmTreeSize(pDb);
690 if( bDeleteRange ){
691 rc = lsmTreeDelete(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
692 }else{
693 rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
696 nAfter = lsmTreeSize(pDb);
697 nDiff = (nAfter/nQuant) - (nBefore/nQuant);
698 if( rc==LSM_OK && pDb->bAutowork && nDiff!=0 ){
699 rc = lsmSortedAutoWork(pDb, nDiff * LSM_AUTOWORK_QUANT);
703 /* If a transaction was opened at the start of this function, commit it.
704 ** Or, if an error has occurred, roll it back. */
705 if( bCommit ){
706 if( rc==LSM_OK ){
707 rc = lsm_commit(pDb, 0);
708 }else{
709 lsm_rollback(pDb, 0);
713 return rc;
717 ** Write a new value into the database.
719 int lsm_insert(
720 lsm_db *db, /* Database connection */
721 const void *pKey, int nKey, /* Key to write or delete */
722 const void *pVal, int nVal /* Value to write. Or nVal==-1 for a delete */
724 return doWriteOp(db, 0, pKey, nKey, pVal, nVal);
728 ** Delete a value from the database.
730 int lsm_delete(lsm_db *db, const void *pKey, int nKey){
731 return doWriteOp(db, 0, pKey, nKey, 0, -1);
735 ** Delete a range of database keys.
737 int lsm_delete_range(
738 lsm_db *db, /* Database handle */
739 const void *pKey1, int nKey1, /* Lower bound of range to delete */
740 const void *pKey2, int nKey2 /* Upper bound of range to delete */
742 int rc = LSM_OK;
743 if( db->xCmp((void *)pKey1, nKey1, (void *)pKey2, nKey2)<0 ){
744 rc = doWriteOp(db, 1, pKey1, nKey1, pKey2, nKey2);
746 return rc;
750 ** Open a new cursor handle.
752 ** If there are currently no other open cursor handles, and no open write
753 ** transaction, open a read transaction here.
755 int lsm_csr_open(lsm_db *pDb, lsm_cursor **ppCsr){
756 int rc = LSM_OK; /* Return code */
757 MultiCursor *pCsr = 0; /* New cursor object */
759 /* Open a read transaction if one is not already open. */
760 assert_db_state(pDb);
762 if( pDb->pShmhdr==0 ){
763 assert( pDb->bReadonly );
764 rc = lsmBeginRoTrans(pDb);
765 }else if( pDb->iReader<0 ){
766 rc = lsmBeginReadTrans(pDb);
769 /* Allocate the multi-cursor. */
770 if( rc==LSM_OK ){
771 rc = lsmMCursorNew(pDb, &pCsr);
774 /* If an error has occured, set the output to NULL and delete any partially
775 ** allocated cursor. If this means there are no open cursors, release the
776 ** client snapshot. */
777 if( rc!=LSM_OK ){
778 lsmMCursorClose(pCsr, 0);
779 dbReleaseClientSnapshot(pDb);
782 assert_db_state(pDb);
783 *ppCsr = (lsm_cursor *)pCsr;
784 return rc;
788 ** Close a cursor opened using lsm_csr_open().
790 int lsm_csr_close(lsm_cursor *p){
791 if( p ){
792 lsm_db *pDb = lsmMCursorDb((MultiCursor *)p);
793 assert_db_state(pDb);
794 lsmMCursorClose((MultiCursor *)p, 1);
795 dbReleaseClientSnapshot(pDb);
796 assert_db_state(pDb);
798 return LSM_OK;
802 ** Attempt to seek the cursor to the database entry specified by pKey/nKey.
803 ** If an error occurs (e.g. an OOM or IO error), return an LSM error code.
804 ** Otherwise, return LSM_OK.
806 int lsm_csr_seek(lsm_cursor *pCsr, const void *pKey, int nKey, int eSeek){
807 return lsmMCursorSeek((MultiCursor *)pCsr, 0, (void *)pKey, nKey, eSeek);
810 int lsm_csr_next(lsm_cursor *pCsr){
811 return lsmMCursorNext((MultiCursor *)pCsr);
814 int lsm_csr_prev(lsm_cursor *pCsr){
815 return lsmMCursorPrev((MultiCursor *)pCsr);
818 int lsm_csr_first(lsm_cursor *pCsr){
819 return lsmMCursorFirst((MultiCursor *)pCsr);
822 int lsm_csr_last(lsm_cursor *pCsr){
823 return lsmMCursorLast((MultiCursor *)pCsr);
826 int lsm_csr_valid(lsm_cursor *pCsr){
827 return lsmMCursorValid((MultiCursor *)pCsr);
830 int lsm_csr_key(lsm_cursor *pCsr, const void **ppKey, int *pnKey){
831 return lsmMCursorKey((MultiCursor *)pCsr, (void **)ppKey, pnKey);
834 int lsm_csr_value(lsm_cursor *pCsr, const void **ppVal, int *pnVal){
835 return lsmMCursorValue((MultiCursor *)pCsr, (void **)ppVal, pnVal);
838 void lsm_config_log(
839 lsm_db *pDb,
840 void (*xLog)(void *, int, const char *),
841 void *pCtx
843 pDb->xLog = xLog;
844 pDb->pLogCtx = pCtx;
847 void lsm_config_work_hook(
848 lsm_db *pDb,
849 void (*xWork)(lsm_db *, void *),
850 void *pCtx
852 pDb->xWork = xWork;
853 pDb->pWorkCtx = pCtx;
856 void lsmLogMessage(lsm_db *pDb, int rc, const char *zFormat, ...){
857 if( pDb->xLog ){
858 LsmString s;
859 va_list ap, ap2;
860 lsmStringInit(&s, pDb->pEnv);
861 va_start(ap, zFormat);
862 va_start(ap2, zFormat);
863 lsmStringVAppendf(&s, zFormat, ap, ap2);
864 va_end(ap);
865 va_end(ap2);
866 pDb->xLog(pDb->pLogCtx, rc, s.z);
867 lsmStringClear(&s);
871 int lsm_begin(lsm_db *pDb, int iLevel){
872 int rc;
874 assert_db_state( pDb );
875 rc = (pDb->bReadonly ? LSM_READONLY : LSM_OK);
877 /* A value less than zero means open one more transaction. */
878 if( iLevel<0 ) iLevel = pDb->nTransOpen + 1;
879 if( iLevel>pDb->nTransOpen ){
880 int i;
882 /* Extend the pDb->aTrans[] array if required. */
883 if( rc==LSM_OK && pDb->nTransAlloc<iLevel ){
884 TransMark *aNew; /* New allocation */
885 int nByte = sizeof(TransMark) * (iLevel+1);
886 aNew = (TransMark *)lsmRealloc(pDb->pEnv, pDb->aTrans, nByte);
887 if( !aNew ){
888 rc = LSM_NOMEM;
889 }else{
890 nByte = sizeof(TransMark) * (iLevel+1 - pDb->nTransAlloc);
891 memset(&aNew[pDb->nTransAlloc], 0, nByte);
892 pDb->nTransAlloc = iLevel+1;
893 pDb->aTrans = aNew;
897 if( rc==LSM_OK && pDb->nTransOpen==0 ){
898 rc = lsmBeginWriteTrans(pDb);
901 if( rc==LSM_OK ){
902 for(i=pDb->nTransOpen; i<iLevel; i++){
903 lsmTreeMark(pDb, &pDb->aTrans[i].tree);
904 lsmLogTell(pDb, &pDb->aTrans[i].log);
906 pDb->nTransOpen = iLevel;
910 return rc;
913 int lsm_commit(lsm_db *pDb, int iLevel){
914 int rc = LSM_OK;
916 assert_db_state( pDb );
918 /* A value less than zero means close the innermost nested transaction. */
919 if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);
921 if( iLevel<pDb->nTransOpen ){
922 if( iLevel==0 ){
923 int rc2;
924 /* Commit the transaction to disk. */
925 if( rc==LSM_OK ) rc = lsmLogCommit(pDb);
926 if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
927 rc = lsmFsSyncLog(pDb->pFS);
929 rc2 = lsmFinishWriteTrans(pDb, (rc==LSM_OK));
930 if( rc==LSM_OK ) rc = rc2;
932 pDb->nTransOpen = iLevel;
934 dbReleaseClientSnapshot(pDb);
935 return rc;
938 int lsm_rollback(lsm_db *pDb, int iLevel){
939 int rc = LSM_OK;
940 assert_db_state( pDb );
942 if( pDb->nTransOpen ){
943 /* A value less than zero means close the innermost nested transaction. */
944 if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);
946 if( iLevel<=pDb->nTransOpen ){
947 TransMark *pMark = &pDb->aTrans[(iLevel==0 ? 0 : iLevel-1)];
948 lsmTreeRollback(pDb, &pMark->tree);
949 if( iLevel ) lsmLogSeek(pDb, &pMark->log);
950 pDb->nTransOpen = iLevel;
953 if( pDb->nTransOpen==0 ){
954 lsmFinishWriteTrans(pDb, 0);
956 dbReleaseClientSnapshot(pDb);
959 return rc;
962 int lsm_get_user_version(lsm_db *pDb, unsigned int *piUsr){
963 int rc = LSM_OK; /* Return code */
965 /* Open a read transaction if one is not already open. */
966 assert_db_state(pDb);
967 if( pDb->pShmhdr==0 ){
968 assert( pDb->bReadonly );
969 rc = lsmBeginRoTrans(pDb);
970 }else if( pDb->iReader<0 ){
971 rc = lsmBeginReadTrans(pDb);
974 /* Allocate the multi-cursor. */
975 if( rc==LSM_OK ){
976 *piUsr = pDb->treehdr.iUsrVersion;
979 dbReleaseClientSnapshot(pDb);
980 assert_db_state(pDb);
981 return rc;
984 int lsm_set_user_version(lsm_db *pDb, unsigned int iUsr){
985 int rc = LSM_OK; /* Return code */
986 int bCommit = 0; /* True to commit before returning */
988 if( pDb->nTransOpen==0 ){
989 bCommit = 1;
990 rc = lsm_begin(pDb, 1);
993 if( rc==LSM_OK ){
994 pDb->treehdr.iUsrVersion = iUsr;
997 /* If a transaction was opened at the start of this function, commit it.
998 ** Or, if an error has occurred, roll it back. */
999 if( bCommit ){
1000 if( rc==LSM_OK ){
1001 rc = lsm_commit(pDb, 0);
1002 }else{
1003 lsm_rollback(pDb, 0);
1007 return rc;