Snapshot of upstream SQLite 3.25.0
File: ext/lsm1/lsm_shared.c
1 /*
2 ** 2012-01-23
3 **
4 ** The author disclaims copyright to this source code. In place of
5 ** a legal notice, here is a blessing:
6 **
7 ** May you do good and not evil.
8 ** May you find forgiveness for yourself and forgive others.
9 ** May you share freely, never taking more than you give.
11 *************************************************************************
13 ** Utilities used to help multiple LSM clients coexist within the
14 ** same process space.
16 #include "lsmInt.h"
19 ** Global data. All global variables used by code in this file are grouped
20 ** into the following structure instance.
22 ** pDatabase:
23 ** Linked list of all Database objects allocated within this process.
24 ** This list may not be traversed without holding the global mutex (see
25 ** functions enterGlobalMutex() and leaveGlobalMutex()).
27 static struct SharedData {
28 Database *pDatabase; /* Linked list of all Database objects */
29 } gShared;
32 ** Database structure. There is one such structure for each distinct
33 ** database accessed by this process. They are stored in the singly linked
34 ** list starting at global variable gShared.pDatabase. Database objects are
35 ** reference counted. Once the number of connections to the associated
36 ** database drops to zero, they are removed from the linked list and deleted.
38 ** pFile:
39 ** In multi-process mode, this file descriptor is used to obtain locks
40 ** and to access shared-memory. In single process mode, its only job is
41 ** to hold the exclusive lock on the file.
42 **
44 struct Database {
45 /* Protected by the global mutex (enterGlobalMutex/leaveGlobalMutex): */
46 char *zName; /* Canonical path to database file */
47 int nName; /* strlen(zName) */
48 int nDbRef; /* Number of associated lsm_db handles */
49 Database *pDbNext; /* Next Database structure in global list */
51 /* Protected by the local mutex (pClientMutex) */
52 int bReadonly; /* True if Database.pFile is read-only */
53 int bMultiProc; /* True if running in multi-process mode */
54 lsm_file *pFile; /* Used for locks/shm in multi-proc mode */
55 LsmFile *pLsmFile; /* List of deferred closes */
56 lsm_mutex *pClientMutex; /* Protects the apShmChunk[] and pConn */
57 int nShmChunk; /* Number of entries in apShmChunk[] array */
58 void **apShmChunk; /* Array of "shared" memory regions */
59 lsm_db *pConn; /* List of connections to this db. */
63 ** Functions to enter and leave the global mutex. This mutex is used
64 ** to protect the global linked-list headed at gShared.pDatabase.
66 static int enterGlobalMutex(lsm_env *pEnv){
67 lsm_mutex *p;
68 int rc = lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &p);
69 if( rc==LSM_OK ) lsmMutexEnter(pEnv, p);
70 return rc;
72 static void leaveGlobalMutex(lsm_env *pEnv){
73 lsm_mutex *p;
74 lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &p);
75 lsmMutexLeave(pEnv, p);
78 #ifdef LSM_DEBUG
79 static int holdingGlobalMutex(lsm_env *pEnv){
80 lsm_mutex *p;
81 lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &p);
82 return lsmMutexHeld(pEnv, p);
84 #endif
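/*
** A minimal illustrative sketch (not part of the upstream file) of the
** locking discipline described above: any traversal of the list headed
** at gShared.pDatabase must be bracketed by enterGlobalMutex() and
** leaveGlobalMutex().
*/
#if 0
static int countDatabases(lsm_env *pEnv, int *pnDb){
  int rc = enterGlobalMutex(pEnv);        /* Take the global mutex */
  if( rc==LSM_OK ){
    Database *p;
    int n = 0;
    for(p=gShared.pDatabase; p; p=p->pDbNext) n++;
    *pnDb = n;
    leaveGlobalMutex(pEnv);               /* Release the global mutex */
  }
  return rc;
}
#endif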
86 #if 0
87 static void assertNotInFreelist(Freelist *p, int iBlk){
88 int i;
89 for(i=0; i<p->nEntry; i++){
90 assert( p->aEntry[i].iBlk!=iBlk );
93 #else
94 # define assertNotInFreelist(x,y)
95 #endif
98 ** Append an entry to the free-list. If (iId==-1), this is a delete.
100 int freelistAppend(lsm_db *db, u32 iBlk, i64 iId){
101 lsm_env *pEnv = db->pEnv;
102 Freelist *p;
103 int i;
105 assert( iId==-1 || iId>=0 );
106 p = db->bUseFreelist ? db->pFreelist : &db->pWorker->freelist;
108 /* Extend the space allocated for the freelist, if required */
109 assert( p->nAlloc>=p->nEntry );
110 if( p->nAlloc==p->nEntry ){
111 int nNew;
112 int nByte;
113 FreelistEntry *aNew;
115 nNew = (p->nAlloc==0 ? 4 : p->nAlloc*2);
116 nByte = sizeof(FreelistEntry) * nNew;
117 aNew = (FreelistEntry *)lsmRealloc(pEnv, p->aEntry, nByte);
118 if( !aNew ) return LSM_NOMEM_BKPT;
119 p->nAlloc = nNew;
120 p->aEntry = aNew;
123 for(i=0; i<p->nEntry; i++){
124 assert( i==0 || p->aEntry[i].iBlk > p->aEntry[i-1].iBlk );
125 if( p->aEntry[i].iBlk>=iBlk ) break;
128 if( i<p->nEntry && p->aEntry[i].iBlk==iBlk ){
129 /* Clobber an existing entry */
130 p->aEntry[i].iId = iId;
131 }else{
132 /* Insert a new entry into the list */
133 int nByte = sizeof(FreelistEntry)*(p->nEntry-i);
134 memmove(&p->aEntry[i+1], &p->aEntry[i], nByte);
135 p->aEntry[i].iBlk = iBlk;
136 p->aEntry[i].iId = iId;
137 p->nEntry++;
140 return LSM_OK;
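/*
** A usage sketch for freelistAppend() (illustration only, assuming the
** caller holds the worker snapshot): first record block 7 as free as of
** snapshot 100, then delete the same free-list entry by passing iId==-1,
** as described in the comment above.
*/
#if 0
static int freelistAppendExample(lsm_db *db){
  int rc = freelistAppend(db, 7, 100);    /* Block 7 is free as of snapshot 100 */
  if( rc==LSM_OK ){
    rc = freelistAppend(db, 7, -1);       /* Clobber the entry with a delete */
  }
  return rc;
}
#endif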
144 ** This function frees all resources held by the Database structure passed
145 ** as the only argument.
147 static void freeDatabase(lsm_env *pEnv, Database *p){
148 assert( holdingGlobalMutex(pEnv) );
149 if( p ){
150 /* Free the mutexes */
151 lsmMutexDel(pEnv, p->pClientMutex);
153 if( p->pFile ){
154 lsmEnvClose(pEnv, p->pFile);
157 /* Free the array of shm pointers */
158 lsmFree(pEnv, p->apShmChunk);
160 /* Free the memory allocated for the Database struct itself */
161 lsmFree(pEnv, p);
165 typedef struct DbTruncateCtx DbTruncateCtx;
166 struct DbTruncateCtx {
167 int nBlock;
168 i64 iInUse;
171 static int dbTruncateCb(void *pCtx, int iBlk, i64 iSnapshot){
172 DbTruncateCtx *p = (DbTruncateCtx *)pCtx;
173 if( iBlk!=p->nBlock || (p->iInUse>=0 && iSnapshot>=p->iInUse) ) return 1;
174 p->nBlock--;
175 return 0;
178 static int dbTruncate(lsm_db *pDb, i64 iInUse){
179 int rc = LSM_OK;
180 #if 0
181 int i;
182 DbTruncateCtx ctx;
184 assert( pDb->pWorker );
185 ctx.nBlock = pDb->pWorker->nBlock;
186 ctx.iInUse = iInUse;
188 rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx);
189 for(i=ctx.nBlock+1; rc==LSM_OK && i<=pDb->pWorker->nBlock; i++){
190 rc = freelistAppend(pDb, i, -1);
193 if( rc==LSM_OK ){
194 #ifdef LSM_LOG_FREELIST
195 if( ctx.nBlock!=pDb->pWorker->nBlock ){
196 lsmLogMessage(pDb, 0,
197 "dbTruncate(): truncated db to %d blocks",ctx.nBlock
200 #endif
201 pDb->pWorker->nBlock = ctx.nBlock;
203 #endif
204 return rc;
209 ** This function is called during database shutdown (when the number of
210 ** connections drops from one to zero). It truncates the database file
211 ** to as small a size as possible without truncating away any blocks that
212 ** contain data.
214 static int dbTruncateFile(lsm_db *pDb){
215 int rc;
217 assert( pDb->pWorker==0 );
218 assert( lsmShmAssertLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL) );
219 rc = lsmCheckpointLoadWorker(pDb);
221 if( rc==LSM_OK ){
222 DbTruncateCtx ctx;
224 /* Walk the database free-block-list in reverse order. Set ctx.nBlock
225 ** to the block number of the last block in the database that actually
226 ** contains data. */
227 ctx.nBlock = pDb->pWorker->nBlock;
228 ctx.iInUse = -1;
229 rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx);
231 /* If the last block that contains data is not already the last block in
232 ** the database file, truncate the database file so that it is. */
233 if( rc==LSM_OK ){
234 rc = lsmFsTruncateDb(
235 pDb->pFS, (i64)ctx.nBlock*lsmFsBlockSize(pDb->pFS)
240 lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
241 pDb->pWorker = 0;
242 return rc;
245 static void doDbDisconnect(lsm_db *pDb){
246 int rc;
248 if( pDb->bReadonly ){
249 lsmShmLock(pDb, LSM_LOCK_DMS3, LSM_LOCK_UNLOCK, 0);
250 }else{
251 /* Block for an exclusive lock on DMS1. This lock serializes all calls
252 ** to doDbConnect() and doDbDisconnect() across all processes. */
253 rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
254 if( rc==LSM_OK ){
256 lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0);
258 /* Try an exclusive lock on DMS2. If successful, this is the last
259 ** connection to the database. In this case flush the contents of the
260 ** in-memory tree to disk and write a checkpoint. */
261 rc = lsmShmTestLock(pDb, LSM_LOCK_DMS2, 1, LSM_LOCK_EXCL);
262 if( rc==LSM_OK ){
263 rc = lsmShmTestLock(pDb, LSM_LOCK_CHECKPOINTER, 1, LSM_LOCK_EXCL);
265 if( rc==LSM_OK ){
266 int bReadonly = 0; /* True if there exist read-only conns. */
268 /* Flush the in-memory tree, if required. If there is data to flush,
269 ** this will create a new client snapshot in Database.pClient. The
270 ** checkpoint (serialization) of this snapshot may be written to disk
271 ** by the following block.
273 ** There is no need to take a WRITER lock here. The fact that there are no
274 ** other locks on DMS2 guarantees that there are no other read-write
275 ** connections at this time (and the lock on DMS1 guarantees that
276 ** no new ones may appear).
278 rc = lsmTreeLoadHeader(pDb, 0);
279 if( rc==LSM_OK && (lsmTreeHasOld(pDb) || lsmTreeSize(pDb)>0) ){
280 rc = lsmFlushTreeToDisk(pDb);
283 /* Now check if there are any read-only connections. If there are,
284 ** then do not truncate the db file or unlink the shared-memory
285 ** region. */
286 if( rc==LSM_OK ){
287 rc = lsmShmTestLock(pDb, LSM_LOCK_DMS3, 1, LSM_LOCK_EXCL);
288 if( rc==LSM_BUSY ){
289 bReadonly = 1;
290 rc = LSM_OK;
294 /* Write a checkpoint to disk. */
295 if( rc==LSM_OK ){
296 rc = lsmCheckpointWrite(pDb, 0);
299 /* If the checkpoint was written successfully, delete the log file
300 ** and, if possible, truncate the database file. */
301 if( rc==LSM_OK ){
302 int bRotrans = 0;
303 Database *p = pDb->pDatabase;
305 /* The log file may only be deleted if there are no
306 ** read-only clients running rotrans transactions. */
307 rc = lsmDetectRoTrans(pDb, &bRotrans);
308 if( rc==LSM_OK && bRotrans==0 ){
309 lsmFsCloseAndDeleteLog(pDb->pFS);
312 /* The database may only be truncated if there exist no read-only
313 ** clients - either connected or running rotrans transactions. */
314 if( bReadonly==0 && bRotrans==0 ){
315 lsmFsUnmap(pDb->pFS);
316 dbTruncateFile(pDb);
317 if( p->pFile && p->bMultiProc ){
318 lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1);
325 if( pDb->iRwclient>=0 ){
326 lsmShmLock(pDb, LSM_LOCK_RWCLIENT(pDb->iRwclient), LSM_LOCK_UNLOCK, 0);
327 pDb->iRwclient = -1;
330 lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
332 pDb->pShmhdr = 0;
335 static int doDbConnect(lsm_db *pDb){
336 const int nUsMax = 100000; /* Max value for nUs */
337 int nUs = 1000; /* us to wait between DMS1 attempts */
338 int rc;
340 /* Obtain a pointer to the shared-memory header */
341 assert( pDb->pShmhdr==0 );
342 assert( pDb->bReadonly==0 );
344 /* Block for an exclusive lock on DMS1. This lock serializes all calls
345 ** to doDbConnect() and doDbDisconnect() across all processes. */
346 while( 1 ){
347 rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
348 if( rc!=LSM_BUSY ) break;
349 lsmEnvSleep(pDb->pEnv, nUs);
350 nUs = nUs * 2;
351 if( nUs>nUsMax ) nUs = nUsMax;
353 if( rc==LSM_OK ){
354 rc = lsmShmCacheChunks(pDb, 1);
356 if( rc!=LSM_OK ) return rc;
357 pDb->pShmhdr = (ShmHeader *)pDb->apShm[0];
359 /* Try an exclusive lock on DMS2/DMS3. If successful, this is the first
360 ** and only connection to the database. In this case initialize the
361 ** shared-memory and run log file recovery. */
362 assert( LSM_LOCK_DMS3==1+LSM_LOCK_DMS2 );
363 rc = lsmShmTestLock(pDb, LSM_LOCK_DMS2, 2, LSM_LOCK_EXCL);
364 if( rc==LSM_OK ){
365 memset(pDb->pShmhdr, 0, sizeof(ShmHeader));
366 rc = lsmCheckpointRecover(pDb);
367 if( rc==LSM_OK ){
368 rc = lsmLogRecover(pDb);
370 if( rc==LSM_OK ){
371 ShmHeader *pShm = pDb->pShmhdr;
372 pShm->aReader[0].iLsmId = lsmCheckpointId(pShm->aSnap1, 0);
373 pShm->aReader[0].iTreeId = pDb->treehdr.iUsedShmid;
375 }else if( rc==LSM_BUSY ){
376 rc = LSM_OK;
379 /* Take a shared lock on DMS2. In multi-process mode this lock "cannot"
380 ** fail, as connections may only hold an exclusive lock on DMS2 if they
381 ** first hold an exclusive lock on DMS1. And this connection is currently
382 ** holding the exclusive lock on DMS1.
384 ** However, if some other connection has the database open in single-process
385 ** mode, this operation will fail. In this case, return the error to the
386 ** caller - the attempt to connect to the db has failed.
388 if( rc==LSM_OK ){
389 rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_SHARED, 0);
392 /* If anything went wrong, unlock DMS2. Otherwise, try to take an exclusive
393 ** lock on one of the LSM_LOCK_RWCLIENT() locks. Unlock DMS1 in any case. */
394 if( rc!=LSM_OK ){
395 pDb->pShmhdr = 0;
396 }else{
397 int i;
398 for(i=0; i<LSM_LOCK_NRWCLIENT; i++){
399 int rc2 = lsmShmLock(pDb, LSM_LOCK_RWCLIENT(i), LSM_LOCK_EXCL, 0);
400 if( rc2==LSM_OK ) pDb->iRwclient = i;
401 if( rc2!=LSM_BUSY ){
402 rc = rc2;
403 break;
407 lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
409 return rc;
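/*
** A sketch of the capped exponential backoff used by doDbConnect() above
** when blocking on DMS1: the delay starts at 1000us and doubles after
** each LSM_BUSY, up to a ceiling of 100000us. Illustration only; the
** helper below is hypothetical.
*/
#if 0
static int lockWithBackoff(lsm_db *pDb, int iLock){
  const int nUsMax = 100000;              /* Ceiling for the delay */
  int nUs = 1000;                         /* Initial delay in microseconds */
  int rc;
  while( (rc = lsmShmLock(pDb, iLock, LSM_LOCK_EXCL, 1))==LSM_BUSY ){
    lsmEnvSleep(pDb->pEnv, nUs);          /* Sleep, then retry */
    nUs = nUs * 2;
    if( nUs>nUsMax ) nUs = nUsMax;
  }
  return rc;
}
#endif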
412 static int dbOpenSharedFd(lsm_env *pEnv, Database *p, int bRoOk){
413 int rc;
415 rc = lsmEnvOpen(pEnv, p->zName, 0, &p->pFile);
416 if( rc==LSM_IOERR && bRoOk ){
417 rc = lsmEnvOpen(pEnv, p->zName, LSM_OPEN_READONLY, &p->pFile);
418 p->bReadonly = 1;
421 return rc;
425 ** Return a reference to the shared Database handle for the database
426 ** identified by canonical path zName. If this is the first connection to
427 ** the named database, a new Database object is allocated. Otherwise, a
428 ** pointer to an existing object is returned.
430 ** If successful, *ppDatabase is set to point to the shared Database
431 ** structure and LSM_OK returned. Otherwise, *ppDatabase is set to NULL
432 ** and an LSM error code is returned.
434 ** Each successful call to this function should be (eventually) matched
435 ** by a call to lsmDbDatabaseRelease().
437 int lsmDbDatabaseConnect(
438 lsm_db *pDb, /* Database handle */
439 const char *zName /* Full-path to db file */
441 lsm_env *pEnv = pDb->pEnv;
442 int rc; /* Return code */
443 Database *p = 0; /* Pointer returned via *ppDatabase */
444 int nName = lsmStrlen(zName);
446 assert( pDb->pDatabase==0 );
447 rc = enterGlobalMutex(pEnv);
448 if( rc==LSM_OK ){
450 /* Search the global list for an existing object. TODO: Need something
451 ** better than the memcmp() below to figure out if a given Database
452 ** object represents the requested file. */
453 for(p=gShared.pDatabase; p; p=p->pDbNext){
454 if( nName==p->nName && 0==memcmp(zName, p->zName, nName) ) break;
457 /* If no suitable Database object was found, allocate a new one. */
458 if( p==0 ){
459 p = (Database *)lsmMallocZeroRc(pEnv, sizeof(Database)+nName+1, &rc);
461 /* If the allocation was successful, fill in other fields and
462 ** allocate the client mutex. */
463 if( rc==LSM_OK ){
464 p->bMultiProc = pDb->bMultiProc;
465 p->zName = (char *)&p[1];
466 p->nName = nName;
467 memcpy((void *)p->zName, zName, nName+1);
468 rc = lsmMutexNew(pEnv, &p->pClientMutex);
471 /* If nothing has gone wrong so far, open the shared fd. And if that
472 ** succeeds and this connection requested single-process mode,
473 ** attempt to take the exclusive lock on DMS2. */
474 if( rc==LSM_OK ){
475 int bReadonly = (pDb->bReadonly && pDb->bMultiProc);
476 rc = dbOpenSharedFd(pDb->pEnv, p, bReadonly);
479 if( rc==LSM_OK && p->bMultiProc==0 ){
480 /* Hold an exclusive lock on DMS1 while grabbing DMS2. This ensures
481 ** that any ongoing call to doDbDisconnect() (even one in another
482 ** process) is finished before proceeding. */
483 assert( p->bReadonly==0 );
484 rc = lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS1, LSM_LOCK_EXCL);
485 if( rc==LSM_OK ){
486 rc = lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS2, LSM_LOCK_EXCL);
487 lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK);
491 if( rc==LSM_OK ){
492 p->pDbNext = gShared.pDatabase;
493 gShared.pDatabase = p;
494 }else{
495 freeDatabase(pEnv, p);
496 p = 0;
500 if( p ){
501 p->nDbRef++;
503 leaveGlobalMutex(pEnv);
505 if( p ){
506 lsmMutexEnter(pDb->pEnv, p->pClientMutex);
507 pDb->pNext = p->pConn;
508 p->pConn = pDb;
509 lsmMutexLeave(pDb->pEnv, p->pClientMutex);
513 pDb->pDatabase = p;
514 if( rc==LSM_OK ){
515 assert( p );
516 rc = lsmFsOpen(pDb, zName, p->bReadonly);
519 /* If the db handle is read-write, then connect to the system now. Run
520 ** recovery as necessary. Or, if this is a read-only database handle,
521 ** defer attempting to connect to the system until a read-transaction
522 ** is opened. */
523 if( rc==LSM_OK ){
524 rc = lsmFsConfigure(pDb);
526 if( rc==LSM_OK && pDb->bReadonly==0 ){
527 rc = doDbConnect(pDb);
530 return rc;
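/*
** A usage sketch showing how lsmDbDatabaseConnect() is reached from the
** public API (illustration only; assumes just the documented lsm.h entry
** points lsm_new(), lsm_open() and lsm_close(), with error handling
** abbreviated).
*/
#if 0
static int openDatabaseExample(void){
  lsm_db *db = 0;
  int rc = lsm_new(0, &db);               /* Allocate a handle (default env) */
  if( rc==LSM_OK ){
    rc = lsm_open(db, "test.db");         /* Connects to the shared Database */
  }
  if( db ) lsm_close(db);                 /* Matched by lsmDbDatabaseRelease() */
  return rc;
}
#endif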
533 static void dbDeferClose(lsm_db *pDb){
534 if( pDb->pFS ){
535 LsmFile *pLsmFile;
536 Database *p = pDb->pDatabase;
537 pLsmFile = lsmFsDeferClose(pDb->pFS);
538 pLsmFile->pNext = p->pLsmFile;
539 p->pLsmFile = pLsmFile;
543 LsmFile *lsmDbRecycleFd(lsm_db *db){
544 LsmFile *pRet;
545 Database *p = db->pDatabase;
546 lsmMutexEnter(db->pEnv, p->pClientMutex);
547 if( (pRet = p->pLsmFile)!=0 ){
548 p->pLsmFile = pRet->pNext;
550 lsmMutexLeave(db->pEnv, p->pClientMutex);
551 return pRet;
555 ** Release a reference to a Database object obtained from
556 ** lsmDbDatabaseConnect(). There should be exactly one call to this function
557 ** for each successful call to lsmDbDatabaseConnect().
559 void lsmDbDatabaseRelease(lsm_db *pDb){
560 Database *p = pDb->pDatabase;
561 if( p ){
562 lsm_db **ppDb;
564 if( pDb->pShmhdr ){
565 doDbDisconnect(pDb);
568 lsmFsUnmap(pDb->pFS);
569 lsmMutexEnter(pDb->pEnv, p->pClientMutex);
570 for(ppDb=&p->pConn; *ppDb!=pDb; ppDb=&((*ppDb)->pNext));
571 *ppDb = pDb->pNext;
572 dbDeferClose(pDb);
573 lsmMutexLeave(pDb->pEnv, p->pClientMutex);
575 enterGlobalMutex(pDb->pEnv);
576 p->nDbRef--;
577 if( p->nDbRef==0 ){
578 LsmFile *pIter;
579 LsmFile *pNext;
580 Database **pp;
582 /* Remove the Database structure from the linked list. */
583 for(pp=&gShared.pDatabase; *pp!=p; pp=&((*pp)->pDbNext));
584 *pp = p->pDbNext;
586 /* If they were allocated from the heap, free the shared memory chunks */
587 if( p->bMultiProc==0 ){
588 int i;
589 for(i=0; i<p->nShmChunk; i++){
590 lsmFree(pDb->pEnv, p->apShmChunk[i]);
594 /* Close any outstanding file descriptors */
595 for(pIter=p->pLsmFile; pIter; pIter=pNext){
596 pNext = pIter->pNext;
597 lsmEnvClose(pDb->pEnv, pIter->pFile);
598 lsmFree(pDb->pEnv, pIter);
600 freeDatabase(pDb->pEnv, p);
602 leaveGlobalMutex(pDb->pEnv);
606 Level *lsmDbSnapshotLevel(Snapshot *pSnapshot){
607 return pSnapshot->pLevel;
610 void lsmDbSnapshotSetLevel(Snapshot *pSnap, Level *pLevel){
611 pSnap->pLevel = pLevel;
614 /* TODO: Shuffle things around to get rid of this */
615 static int firstSnapshotInUse(lsm_db *, i64 *);
618 ** Context object used by the lsmWalkFreelist() utility.
620 typedef struct WalkFreelistCtx WalkFreelistCtx;
621 struct WalkFreelistCtx {
622 lsm_db *pDb;
623 int bReverse;
624 Freelist *pFreelist;
625 int iFree;
626 int (*xUsr)(void *, int, i64); /* User callback function */
627 void *pUsrctx; /* User callback context */
628 int bDone; /* Set to true after xUsr() returns true */
632 ** Callback used by lsmWalkFreelist().
634 static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
635 WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
636 const int iDir = (p->bReverse ? -1 : 1);
637 Freelist *pFree = p->pFreelist;
639 assert( p->bDone==0 );
640 assert( iBlk>=0 );
641 if( pFree ){
642 while( (p->iFree < pFree->nEntry) && p->iFree>=0 ){
643 FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
644 if( (p->bReverse==0 && pEntry->iBlk>(u32)iBlk)
645 || (p->bReverse!=0 && pEntry->iBlk<(u32)iBlk)
647 break;
648 }else{
649 p->iFree += iDir;
650 if( pEntry->iId>=0
651 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId)
653 p->bDone = 1;
654 return 1;
656 if( pEntry->iBlk==(u32)iBlk ) return 0;
661 if( p->xUsr(p->pUsrctx, iBlk, iSnapshot) ){
662 p->bDone = 1;
663 return 1;
665 return 0;
669 ** The database handle passed as the first argument must be the worker
670 ** connection. This function iterates through the contents of the current
671 ** free block list, invoking the supplied callback once for each list
672 ** element.
674 ** The difference between this function and lsmSortedWalkFreelist() is
675 ** that lsmSortedWalkFreelist() only considers those free-list elements
676 ** stored within the LSM. This function also merges in any in-memory
677 ** elements.
679 int lsmWalkFreelist(
680 lsm_db *pDb, /* Database handle (must be worker) */
681 int bReverse, /* True to iterate from largest to smallest */
682 int (*x)(void *, int, i64), /* Callback function */
683 void *pCtx /* First argument to pass to callback */
685 const int iDir = (bReverse ? -1 : 1);
686 int rc;
687 int iCtx;
689 WalkFreelistCtx ctx[2];
691 ctx[0].pDb = pDb;
692 ctx[0].bReverse = bReverse;
693 ctx[0].pFreelist = &pDb->pWorker->freelist;
694 if( ctx[0].pFreelist && bReverse ){
695 ctx[0].iFree = ctx[0].pFreelist->nEntry-1;
696 }else{
697 ctx[0].iFree = 0;
699 ctx[0].xUsr = walkFreelistCb;
700 ctx[0].pUsrctx = (void *)&ctx[1];
701 ctx[0].bDone = 0;
703 ctx[1].pDb = pDb;
704 ctx[1].bReverse = bReverse;
705 ctx[1].pFreelist = pDb->pFreelist;
706 if( ctx[1].pFreelist && bReverse ){
707 ctx[1].iFree = ctx[1].pFreelist->nEntry-1;
708 }else{
709 ctx[1].iFree = 0;
711 ctx[1].xUsr = x;
712 ctx[1].pUsrctx = pCtx;
713 ctx[1].bDone = 0;
715 rc = lsmSortedWalkFreelist(pDb, bReverse, walkFreelistCb, (void *)&ctx[0]);
717 if( ctx[0].bDone==0 ){
718 for(iCtx=0; iCtx<2; iCtx++){
719 int i;
720 WalkFreelistCtx *p = &ctx[iCtx];
721 for(i=p->iFree;
722 p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry && i>=0;
723 i += iDir
725 FreelistEntry *pEntry = &p->pFreelist->aEntry[i];
726 if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){
727 return LSM_OK;
733 return rc;
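/*
** A sketch of the callback contract described above (illustration only):
** the callback receives one free-list entry per call - the block number
** and the snapshot id after which the block may be reused - and returns
** non-zero to halt the walk early.
*/
#if 0
static int countFreeCb(void *pCtx, int iBlk, i64 iSnapshot){
  (*(int *)pCtx)++;                       /* Count this free-list entry */
  return 0;                               /* Zero means keep iterating */
}
static int countFreeBlocks(lsm_db *pDb, int *pnFree){
  *pnFree = 0;
  return lsmWalkFreelist(pDb, 0, countFreeCb, (void *)pnFree);
}
#endif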
737 typedef struct FindFreeblockCtx FindFreeblockCtx;
738 struct FindFreeblockCtx {
739 i64 iInUse;
740 int iRet;
741 int bNotOne;
744 static int findFreeblockCb(void *pCtx, int iBlk, i64 iSnapshot){
745 FindFreeblockCtx *p = (FindFreeblockCtx *)pCtx;
746 if( iSnapshot<p->iInUse && (iBlk!=1 || p->bNotOne==0) ){
747 p->iRet = iBlk;
748 return 1;
750 return 0;
753 static int findFreeblock(lsm_db *pDb, i64 iInUse, int bNotOne, int *piRet){
754 int rc; /* Return code */
755 FindFreeblockCtx ctx; /* Context object */
757 ctx.iInUse = iInUse;
758 ctx.iRet = 0;
759 ctx.bNotOne = bNotOne;
760 rc = lsmWalkFreelist(pDb, 0, findFreeblockCb, (void *)&ctx);
761 *piRet = ctx.iRet;
763 return rc;
767 ** Allocate a new database file block to write data to, either by extending
768 ** the database file or by recycling a free-list entry. The worker snapshot
769 ** must be held in order to call this function.
771 ** If successful, *piBlk is set to the block number allocated and LSM_OK is
772 ** returned. Otherwise, *piBlk is zeroed and an lsm error code returned.
774 int lsmBlockAllocate(lsm_db *pDb, int iBefore, int *piBlk){
775 Snapshot *p = pDb->pWorker;
776 int iRet = 0; /* Block number of allocated block */
777 int rc = LSM_OK;
778 i64 iInUse = 0; /* Snapshot id still in use */
779 i64 iSynced = 0; /* Snapshot id synced to disk */
781 assert( p );
783 #ifdef LSM_LOG_FREELIST
785 static int nCall = 0;
786 char *zFree = 0;
787 nCall++;
788 rc = lsmInfoFreelist(pDb, &zFree);
789 if( rc!=LSM_OK ) return rc;
790 lsmLogMessage(pDb, 0, "lsmBlockAllocate(): %d freelist: %s", nCall, zFree);
791 lsmFree(pDb->pEnv, zFree);
793 #endif
795 /* Set iInUse to the smallest snapshot id that is either:
797 ** * Currently in use by a database client,
798 ** * May be used by a database client in the future, or
799 ** * Is the most recently checkpointed snapshot (i.e. the one that will
800 ** be used following recovery if a failure occurs at this point).
802 rc = lsmCheckpointSynced(pDb, &iSynced, 0, 0);
803 if( rc==LSM_OK && iSynced==0 ) iSynced = p->iId;
804 iInUse = iSynced;
805 if( rc==LSM_OK && pDb->iReader>=0 ){
806 assert( pDb->pClient );
807 iInUse = LSM_MIN(iInUse, pDb->pClient->iId);
809 if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);
811 #ifdef LSM_LOG_FREELIST
813 lsmLogMessage(pDb, 0, "lsmBlockAllocate(): "
814 "snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)",
815 iInUse, iSynced, (pDb->iReader>=0 ? pDb->pClient->iId : 0)
818 #endif
821 /* Unless there exists a read-only transaction (which prevents us from
822 ** recycling any blocks regardless), query the free block list for a
823 ** suitable block to reuse.
825 ** It might seem more natural to check for a read-only transaction at
826 ** the start of this function. However, it is better to wait until after
827 ** the call to lsmCheckpointSynced() to do so.
829 if( rc==LSM_OK ){
830 int bRotrans;
831 rc = lsmDetectRoTrans(pDb, &bRotrans);
833 if( rc==LSM_OK && bRotrans==0 ){
834 rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet);
838 if( iBefore>0 && (iRet<=0 || iRet>=iBefore) ){
839 iRet = 0;
841 }else if( rc==LSM_OK ){
842 /* If a block was found in the free block list, use it and remove it from
843 ** the list. Otherwise, if no suitable block was found, allocate one from
844 ** the end of the file. */
845 if( iRet>0 ){
846 #ifdef LSM_LOG_FREELIST
847 lsmLogMessage(pDb, 0,
848 "reusing block %d (snapshot-in-use=%lld)", iRet, iInUse);
849 #endif
850 rc = freelistAppend(pDb, iRet, -1);
851 if( rc==LSM_OK ){
852 rc = dbTruncate(pDb, iInUse);
854 }else{
855 iRet = ++(p->nBlock);
856 #ifdef LSM_LOG_FREELIST
857 lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
858 #endif
862 assert( iBefore>0 || iRet>0 || rc!=LSM_OK );
863 *piBlk = iRet;
864 return rc;
868 ** Free a database block. The worker snapshot must be held in order to call
869 ** this function.
871 ** If successful, LSM_OK is returned. Otherwise, an LSM error code (e.g.
872 ** LSM_NOMEM) is returned.
874 int lsmBlockFree(lsm_db *pDb, int iBlk){
875 Snapshot *p = pDb->pWorker;
876 assert( lsmShmAssertWorker(pDb) );
878 #ifdef LSM_LOG_FREELIST
879 lsmLogMessage(pDb, LSM_OK, "lsmBlockFree(): Free block %d", iBlk);
880 #endif
882 return freelistAppend(pDb, iBlk, p->iId);
886 ** Refree a database block. The worker snapshot must be held in order to call
887 ** this function.
889 ** Refreeing is required when a block is allocated using lsmBlockAllocate()
890 ** but then not used. This function is used to push the block back onto
891 ** the freelist. Refreeing a block is different from freeing it, as a refreed
892 ** block may be reused immediately, whereas a freed block cannot be reused
893 ** until (at least) after the next checkpoint.
895 int lsmBlockRefree(lsm_db *pDb, int iBlk){
896 int rc = LSM_OK; /* Return code */
898 #ifdef LSM_LOG_FREELIST
899 lsmLogMessage(pDb, LSM_OK, "lsmBlockRefree(): Refree block %d", iBlk);
900 #endif
902 rc = freelistAppend(pDb, iBlk, 0);
903 return rc;
907 ** If required, copy a database checkpoint from shared memory into the
908 ** database itself.
910 ** The WORKER lock must not be held when this is called. This is because
911 ** this function may indirectly call fsync(). And the WORKER lock should
912 ** not be held that long (in case it is required by a client flushing an
913 ** in-memory tree to disk).
915 int lsmCheckpointWrite(lsm_db *pDb, u32 *pnWrite){
916 int rc; /* Return Code */
917 u32 nWrite = 0;
919 assert( pDb->pWorker==0 );
920 assert( 1 || pDb->pClient==0 );
921 assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );
923 rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0);
924 if( rc!=LSM_OK ) return rc;
926 rc = lsmCheckpointLoad(pDb, 0);
927 if( rc==LSM_OK ){
928 int nBlock = lsmCheckpointNBlock(pDb->aSnapshot);
929 ShmHeader *pShm = pDb->pShmhdr;
930 int bDone = 0; /* True if checkpoint is already stored */
932 /* Check if this checkpoint has already been written to the database
933 ** file. If so, set variable bDone to true. */
934 if( pShm->iMetaPage ){
935 MetaPage *pPg; /* Meta page */
936 u8 *aData; /* Meta-page data buffer */
937 int nData; /* Size of aData[] in bytes */
938 i64 iCkpt; /* Id of checkpoint just loaded */
939 i64 iDisk = 0; /* Id of checkpoint already stored in db */
940 iCkpt = lsmCheckpointId(pDb->aSnapshot, 0);
941 rc = lsmFsMetaPageGet(pDb->pFS, 0, pShm->iMetaPage, &pPg);
942 if( rc==LSM_OK ){
943 aData = lsmFsMetaPageData(pPg, &nData);
944 iDisk = lsmCheckpointId((u32 *)aData, 1);
945 nWrite = lsmCheckpointNWrite((u32 *)aData, 1);
946 lsmFsMetaPageRelease(pPg);
948 bDone = (iDisk>=iCkpt);
951 if( rc==LSM_OK && bDone==0 ){
952 int iMeta = (pShm->iMetaPage % 2) + 1;
953 if( pDb->eSafety!=LSM_SAFETY_OFF ){
954 rc = lsmFsSyncDb(pDb->pFS, nBlock);
956 if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
957 if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
958 rc = lsmFsSyncDb(pDb->pFS, 0);
960 if( rc==LSM_OK ){
961 pShm->iMetaPage = iMeta;
962 nWrite = lsmCheckpointNWrite(pDb->aSnapshot, 0) - nWrite;
964 #ifdef LSM_LOG_WORK
965 lsmLogMessage(pDb, 0, "finish checkpoint %d",
966 (int)lsmCheckpointId(pDb->aSnapshot, 0)
968 #endif
972 lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
973 if( pnWrite && rc==LSM_OK ) *pnWrite = nWrite;
974 return rc;
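/*
** A worked illustration (not library code) of the meta-page ping-pong
** used above. The expression (pShm->iMetaPage % 2) + 1 always selects
** the meta page NOT holding the current checkpoint, so a crash during
** the write leaves the previous checkpoint intact.
*/
#if 0
static int nextMetaPage(int iMetaPage){
  /* 0 -> 1 (no checkpoint stored yet), 1 -> 2, 2 -> 1 */
  return (iMetaPage % 2) + 1;
}
#endif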
977 int lsmBeginWork(lsm_db *pDb){
978 int rc;
980 /* Attempt to take the WORKER lock */
981 rc = lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL, 0);
983 /* Deserialize the current worker snapshot */
984 if( rc==LSM_OK ){
985 rc = lsmCheckpointLoadWorker(pDb);
987 return rc;
990 void lsmFreeSnapshot(lsm_env *pEnv, Snapshot *p){
991 if( p ){
992 lsmSortedFreeLevel(pEnv, p->pLevel);
993 lsmFree(pEnv, p->freelist.aEntry);
994 lsmFree(pEnv, p->redirect.a);
995 lsmFree(pEnv, p);
1000 ** Attempt to populate one of the read-lock slots to contain lock values
1001 ** iLsm/iShm. Or, if such a slot exists already, this function is a no-op.
1003 ** It is not an error if no slot can be populated because the write-lock
1004 ** cannot be obtained. If any other error occurs, return an LSM error code.
1005 ** Otherwise, LSM_OK.
1007 ** This function is called at various points to try to ensure that there
1008 ** always exists at least one read-lock slot that can be used by a read-only
1009 ** client, and so that, in the usual case, there is an "exact match" available
1010 ** whenever a read transaction is opened by any client. At present this
1011 ** function is called when:
1013 ** * A write transaction that called lsmTreeDiscardOld() is committed, and
1014 ** * Whenever the working snapshot is updated (i.e. lsmFinishWork()).
1016 static int dbSetReadLock(lsm_db *db, i64 iLsm, u32 iShm){
1017 int rc = LSM_OK;
1018 ShmHeader *pShm = db->pShmhdr;
1019 int i;
1021 /* Check if there is already a slot containing the required values. */
1022 for(i=0; i<LSM_LOCK_NREADER; i++){
1023 ShmReader *p = &pShm->aReader[i];
1024 if( p->iLsmId==iLsm && p->iTreeId==iShm ) return LSM_OK;
1027 /* Iterate through all read-lock slots, attempting to take a write-lock
1028 ** on each of them. If a write-lock succeeds, populate the locked slot
1029 ** with the required values and break out of the loop. */
1030 for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
1031 rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
1032 if( rc==LSM_BUSY ){
1033 rc = LSM_OK;
1034 }else{
1035 ShmReader *p = &pShm->aReader[i];
1036 p->iLsmId = iLsm;
1037 p->iTreeId = iShm;
1038 lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
1039 break;
1043 return rc;
1047 ** Release the read-lock currently held by connection db.
1049 int dbReleaseReadlock(lsm_db *db){
1050 int rc = LSM_OK;
1051 if( db->iReader>=0 ){
1052 rc = lsmShmLock(db, LSM_LOCK_READER(db->iReader), LSM_LOCK_UNLOCK, 0);
1053 db->iReader = -1;
1055 db->bRoTrans = 0;
1056 return rc;
1061 ** Argument bFlush is true if the contents of the in-memory tree have just
1062 ** been flushed to disk. The significance of this is that once the snapshot
1063 ** created to hold the updated state of the database is synced to disk, log
1064 ** file space can be recycled.
1066 void lsmFinishWork(lsm_db *pDb, int bFlush, int *pRc){
1067 int rc = *pRc;
1068 assert( rc!=0 || pDb->pWorker );
1069 if( pDb->pWorker ){
1070 /* If no error has occurred, serialize the worker snapshot and write
1071 ** it to shared memory. */
1072 if( rc==LSM_OK ){
1073 rc = lsmSaveWorker(pDb, bFlush);
1076 /* Assuming no error has occurred, update a read lock slot with the
1077 ** new snapshot id (see comments above function dbSetReadLock()). */
1078 if( rc==LSM_OK ){
1079 if( pDb->iReader<0 ){
1080 rc = lsmTreeLoadHeader(pDb, 0);
1082 if( rc==LSM_OK ){
1083 rc = dbSetReadLock(pDb, pDb->pWorker->iId, pDb->treehdr.iUsedShmid);
1087 /* Free the snapshot object. */
1088 lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
1089 pDb->pWorker = 0;
1092 lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
1093 *pRc = rc;
1097 ** Called when recovery is finished.
1099 int lsmFinishRecovery(lsm_db *pDb){
1100 lsmTreeEndTransaction(pDb, 1);
1101 return LSM_OK;
1105 ** Check if the currently configured compression functions
1106 ** (LSM_CONFIG_SET_COMPRESSION) are compatible with a database that has its
1107 ** compression id set to iReq. Compression routines are compatible if iReq
1108 ** is zero (indicating the database is empty), or if it is equal to the
1109 ** compression id of the configured compression routines.
1111 ** If the check shows that the current compression routines are incompatible
1112 ** and there is a compression factory registered, give it a chance to install
1113 ** new compression routines.
1115 ** If, after any registered factory is invoked, the compression functions
1116 ** are still incompatible, return LSM_MISMATCH. Otherwise, LSM_OK.
1118 int lsmCheckCompressionId(lsm_db *pDb, u32 iReq){
1119 if( iReq!=LSM_COMPRESSION_EMPTY && pDb->compress.iId!=iReq ){
1120 if( pDb->factory.xFactory ){
1121 pDb->bInFactory = 1;
1122 pDb->factory.xFactory(pDb->factory.pCtx, pDb, iReq);
1123 pDb->bInFactory = 0;
1125 if( pDb->compress.iId!=iReq ){
1126 /* Incompatible */
1127 return LSM_MISMATCH;
1130 /* Compatible */
1131 return LSM_OK;
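/*
** A sketch of the factory hook described above, assuming the
** lsm_compress interface from lsm.h. When the configured hooks do not
** match the database's compression id, the factory is invoked with the
** required id (iReq) and may install matching hooks via
** LSM_CONFIG_SET_COMPRESSION. MY_COMPRESSION_ID and
** myGetCompressionHooks() are hypothetical.
*/
#if 0
static int xFactoryExample(void *pCtx, lsm_db *db, u32 iReq){
  if( iReq==MY_COMPRESSION_ID ){
    lsm_compress cmp = myGetCompressionHooks();   /* Hypothetical helper */
    return lsm_config(db, LSM_CONFIG_SET_COMPRESSION, &cmp);
  }
  return LSM_OK;   /* Unknown id: hooks unchanged, caller returns LSM_MISMATCH */
}
#endif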
1135 ** Begin a read transaction. This function is a no-op if the connection
1136 ** passed as the only argument already has an open read transaction.
1138 int lsmBeginReadTrans(lsm_db *pDb){
1139 const int MAX_READLOCK_ATTEMPTS = 10;
1140 const int nMaxAttempt = (pDb->bRoTrans ? 1 : MAX_READLOCK_ATTEMPTS);
1142 int rc = LSM_OK; /* Return code */
1143 int iAttempt = 0;
1145 assert( pDb->pWorker==0 );
1147 while( rc==LSM_OK && pDb->iReader<0 && (iAttempt++)<nMaxAttempt ){
1148 int iTreehdr = 0;
1149 int iSnap = 0;
1150 assert( pDb->pCsr==0 && pDb->nTransOpen==0 );
1152 /* Load the in-memory tree header. */
1153 rc = lsmTreeLoadHeader(pDb, &iTreehdr);
1155 /* Load the database snapshot */
1156 if( rc==LSM_OK ){
1157 if( lsmCheckpointClientCacheOk(pDb)==0 ){
1158 lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
1159 pDb->pClient = 0;
1160 lsmMCursorFreeCache(pDb);
1161 lsmFsPurgeCache(pDb->pFS);
1162 rc = lsmCheckpointLoad(pDb, &iSnap);
1163 }else{
1164 iSnap = 1;
1168 /* Take a read-lock on the tree and snapshot just loaded. Then check
1169 ** that the shared-memory still contains the same values. If so, proceed.
1170 ** Otherwise, relinquish the read-lock and retry the whole procedure
1171 ** (starting with loading the in-memory tree header). */
1172 if( rc==LSM_OK ){
1173 u32 iShmMax = pDb->treehdr.iUsedShmid;
1174 u32 iShmMin = pDb->treehdr.iNextShmid+1-LSM_MAX_SHMCHUNKS;
1175 rc = lsmReadlock(
1176 pDb, lsmCheckpointId(pDb->aSnapshot, 0), iShmMin, iShmMax
1178 if( rc==LSM_OK ){
1179 if( lsmTreeLoadHeaderOk(pDb, iTreehdr)
1180 && lsmCheckpointLoadOk(pDb, iSnap)
1182 /* Read lock has been successfully obtained. Deserialize the
1183 ** checkpoint just loaded. TODO: This will be removed after
1184 ** lsm_sorted.c is changed to work directly from the serialized
1185 ** version of the snapshot. */
1186 if( pDb->pClient==0 ){
1187 rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot,&pDb->pClient);
1189 assert( (rc==LSM_OK)==(pDb->pClient!=0) );
1190 assert( pDb->iReader>=0 );
1192 /* Check that the client has the right compression hooks loaded.
1193 ** If not, set rc to LSM_MISMATCH. */
1194 if( rc==LSM_OK ){
1195 rc = lsmCheckCompressionId(pDb, pDb->pClient->iCmpId);
1197 }else{
1198 rc = dbReleaseReadlock(pDb);
1202 if( rc==LSM_BUSY ){
1203 rc = LSM_OK;
1206 #if 0
1207 if( rc==LSM_OK && pDb->pClient ){
1208 fprintf(stderr,
1209 "reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n",
1210 (void *)pDb,
1211 (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid,
1212 (int)pDb->treehdr.root.iTransId,
1213 (int)pDb->treehdr.iOldShmid
1216 #endif
1219 if( rc==LSM_OK ){
1220 rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
1222 if( rc!=LSM_OK ){
1223 dbReleaseReadlock(pDb);
1225 if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;
1226 return rc;
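/*
** A usage sketch (illustration only) of how a read transaction is opened
** implicitly through the public cursor API: lsm_csr_open() ends up in
** lsmBeginReadTrans(), and closing the last cursor allows the read-lock
** to be released again.
*/
#if 0
static int readExample(lsm_db *db){
  lsm_cursor *pCsr = 0;
  int rc = lsm_csr_open(db, &pCsr);       /* Opens a read transaction */
  if( rc==LSM_OK ){
    rc = lsm_csr_first(pCsr);             /* Position at the smallest key */
    lsm_csr_close(pCsr);                  /* Read transaction may end here */
  }
  return rc;
}
#endif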
1230 ** This function is used by a read-write connection to determine if there
1231 ** are currently one or more read-only transactions open on the database
1232 ** (in this context a read-only transaction is one opened by a read-only
1233 ** connection on a non-live database).
1235 ** If no error occurs, LSM_OK is returned and *pbExist is set to true if
1236 ** some other connection has a read-only transaction open, or false
1237 ** otherwise. If an error occurs, an LSM error code is returned and the final
1238 ** value of *pbExist is undefined.
1240 int lsmDetectRoTrans(lsm_db *db, int *pbExist){
1241 int rc;
1243 /* Only a read-write connection may use this function. */
1244 assert( db->bReadonly==0 );
1246 rc = lsmShmTestLock(db, LSM_LOCK_ROTRANS, 1, LSM_LOCK_EXCL);
1247 if( rc==LSM_BUSY ){
1248 *pbExist = 1;
1249 rc = LSM_OK;
1250 }else{
1251 *pbExist = 0;
1254 return rc;
1258 ** db is a read-only database handle in the disconnected state. This function
1259 ** attempts to open a read-transaction on the database. This may involve
1260 ** connecting to the database system (opening shared memory etc.).
1262 int lsmBeginRoTrans(lsm_db *db){
1263 int rc = LSM_OK;
1265 assert( db->bReadonly && db->pShmhdr==0 );
1266 assert( db->iReader<0 );
1268 if( db->bRoTrans==0 ){
1270 /* Attempt a shared-lock on DMS1. */
1271 rc = lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_SHARED, 0);
1272 if( rc!=LSM_OK ) return rc;
1274 rc = lsmShmTestLock(
1275 db, LSM_LOCK_RWCLIENT(0), LSM_LOCK_NREADER, LSM_LOCK_SHARED
1277 if( rc==LSM_OK ){
1278 /* System is not live. Take a SHARED lock on the ROTRANS byte and
1279 ** release DMS1. Locking ROTRANS tells all read-write clients that they
1280 ** may not recycle any disk space from within the database or log files,
1281 ** as a read-only client may be using it. */
1282 rc = lsmShmLock(db, LSM_LOCK_ROTRANS, LSM_LOCK_SHARED, 0);
1283 lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
1285 if( rc==LSM_OK ){
1286 db->bRoTrans = 1;
1287 rc = lsmShmCacheChunks(db, 1);
1288 if( rc==LSM_OK ){
1289 db->pShmhdr = (ShmHeader *)db->apShm[0];
1290 memset(db->pShmhdr, 0, sizeof(ShmHeader));
1291 rc = lsmCheckpointRecover(db);
1292 if( rc==LSM_OK ){
1293 rc = lsmLogRecover(db);
1297 }else if( rc==LSM_BUSY ){
1298 /* System is live! */
1299 rc = lsmShmLock(db, LSM_LOCK_DMS3, LSM_LOCK_SHARED, 0);
1300 lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
1301 if( rc==LSM_OK ){
1302 rc = lsmShmCacheChunks(db, 1);
1303 if( rc==LSM_OK ){
1304 db->pShmhdr = (ShmHeader *)db->apShm[0];
1309 if( rc==LSM_OK ){
1310 rc = lsmBeginReadTrans(db);
1314 return rc;
1318 ** Close the currently open read transaction.
1320 void lsmFinishReadTrans(lsm_db *pDb){
1322 /* Worker connections should not be closing read transactions. And
1323 ** read transactions should only be closed after all cursors and write
1324 ** transactions have been closed. Finally, pClient should be non-NULL
1325 ** iff pDb->iReader>=0. */
1326 assert( pDb->pWorker==0 );
1327 assert( pDb->pCsr==0 && pDb->nTransOpen==0 );
1329 if( pDb->bRoTrans ){
1330 int i;
1331 for(i=0; i<pDb->nShm; i++){
1332 lsmFree(pDb->pEnv, pDb->apShm[i]);
1334 lsmFree(pDb->pEnv, pDb->apShm);
1335 pDb->apShm = 0;
1336 pDb->nShm = 0;
1337 pDb->pShmhdr = 0;
1339 lsmShmLock(pDb, LSM_LOCK_ROTRANS, LSM_LOCK_UNLOCK, 0);
1341 dbReleaseReadlock(pDb);
1345 ** Open a write transaction.
1347 int lsmBeginWriteTrans(lsm_db *pDb){
1348 int rc = LSM_OK; /* Return code */
1349 ShmHeader *pShm = pDb->pShmhdr; /* Shared memory header */
1351 assert( pDb->nTransOpen==0 );
1352 assert( pDb->bDiscardOld==0 );
1353 assert( pDb->bReadonly==0 );
1355 /* If there is no read-transaction open, open one now. */
1356 if( pDb->iReader<0 ){
1357 rc = lsmBeginReadTrans(pDb);
1360 /* Attempt to take the WRITER lock */
1361 if( rc==LSM_OK ){
1362 rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
1365 /* If the previous writer failed mid-transaction, run emergency rollback. */
1366 if( rc==LSM_OK && pShm->bWriter ){
1367 rc = lsmTreeRepair(pDb);
1368 if( rc==LSM_OK ) pShm->bWriter = 0;
1371 /* Check that this connection is currently reading from the most recent
1372 ** version of the database. If not, return LSM_BUSY. */
1373 if( rc==LSM_OK && memcmp(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader)) ){
1374 rc = LSM_BUSY;
1377 if( rc==LSM_OK ){
1378 rc = lsmLogBegin(pDb);
1381 /* If everything was successful, set the "transaction-in-progress" flag
1382 ** and return LSM_OK. Otherwise, if some error occurred, relinquish the
1383 ** WRITER lock and return an error code. */
1384 if( rc==LSM_OK ){
1385 TreeHeader *p = &pDb->treehdr;
1386 pShm->bWriter = 1;
1387 p->root.iTransId++;
1388 if( lsmTreeHasOld(pDb) && p->iOldLog==pDb->pClient->iLogOff ){
1389 lsmTreeDiscardOld(pDb);
1390 pDb->bDiscardOld = 1;
1392 }else{
1393 lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
1394 if( pDb->pCsr==0 ) lsmFinishReadTrans(pDb);
1396 return rc;
1400 ** End the current write transaction. The connection is left with an open
1401 ** read transaction. It is an error to call this if there is no open write
1402 ** transaction.
1404 ** If the transaction was committed, then a commit record has already been
1405 ** written into the log file when this function is called. Or, if the
1406 ** transaction was rolled back, both the log file and in-memory tree
1407 ** structure have already been restored. In either case, this function
1408 ** merely releases locks and other resources held by the write-transaction.
1410 ** LSM_OK is returned if successful, or an LSM error code otherwise.
1412 int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){
1413 int rc = LSM_OK;
1414 int bFlush = 0;
1416 lsmLogEnd(pDb, bCommit);
1417 if( rc==LSM_OK && bCommit && lsmTreeSize(pDb)>pDb->nTreeLimit ){
1418 bFlush = 1;
1419 lsmTreeMakeOld(pDb);
1421 lsmTreeEndTransaction(pDb, bCommit);
1423 if( rc==LSM_OK ){
1424 if( bFlush && pDb->bAutowork ){
1425 rc = lsmSortedAutoWork(pDb, 1);
1426 }else if( bCommit && pDb->bDiscardOld ){
1427 rc = dbSetReadLock(pDb, pDb->pClient->iId, pDb->treehdr.iUsedShmid);
1430 pDb->bDiscardOld = 0;
1431 lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
1433 if( bFlush && pDb->bAutowork==0 && pDb->xWork ){
1434 pDb->xWork(pDb, pDb->pWorkCtx);
1436 return rc;
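/*
** A usage sketch of the write-transaction life cycle implemented by
** lsmBeginWriteTrans()/lsmFinishWriteTrans() above, via the public
** lsm.h API (illustration only). lsm_begin() takes a nesting level;
** level 1 opens the outermost write transaction, and committing or
** rolling back to level 0 closes it.
*/
#if 0
static int writeExample(lsm_db *db){
  int rc = lsm_begin(db, 1);                     /* Open a write transaction */
  if( rc==LSM_OK ){
    rc = lsm_insert(db, "key", 3, "value", 5);   /* Write via the in-memory tree */
    if( rc==LSM_OK ){
      rc = lsm_commit(db, 0);                    /* Commit everything */
    }else{
      lsm_rollback(db, 0);                       /* Roll back everything */
    }
  }
  return rc;
}
#endif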
1441 ** Return non-zero if the caller is holding the client mutex.
1443 #ifdef LSM_DEBUG
1444 int lsmHoldingClientMutex(lsm_db *pDb){
1445 return lsmMutexHeld(pDb->pEnv, pDb->pDatabase->pClientMutex);
1447 #endif
1449 static int slotIsUsable(ShmReader *p, i64 iLsm, u32 iShmMin, u32 iShmMax){
1450 return(
1451 p->iLsmId && p->iLsmId<=iLsm
1452 && shm_sequence_ge(iShmMax, p->iTreeId)
1453 && shm_sequence_ge(p->iTreeId, iShmMin)
1458 ** Obtain a read-lock on database version identified by the combination
1459 ** of snapshot iLsm and tree sequence ids iShmMin/iShmMax. Return LSM_OK
1460 ** if successful, or an LSM error code otherwise.
1462 int lsmReadlock(lsm_db *db, i64 iLsm, u32 iShmMin, u32 iShmMax){
1463 int rc = LSM_OK;
1464 ShmHeader *pShm = db->pShmhdr;
1465 int i;
1467 assert( db->iReader<0 );
1468 assert( shm_sequence_ge(iShmMax, iShmMin) );
1470 /* This is a no-op if the read-only transaction flag is set. */
1471 if( db->bRoTrans ){
1472 db->iReader = 0;
1473 return LSM_OK;
1476 /* Search for an exact match. */
1477 for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
1478 ShmReader *p = &pShm->aReader[i];
1479 if( p->iLsmId==iLsm && p->iTreeId==iShmMax ){
1480 rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
1481 if( rc==LSM_OK && p->iLsmId==iLsm && p->iTreeId==iShmMax ){
1482 db->iReader = i;
1483 }else if( rc==LSM_BUSY ){
1484 rc = LSM_OK;
1489 /* Try to obtain a write-lock on each slot, in order. If successful, set
1490 ** the slot values to iLsm/iShmMax. */
1491 for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
1492 rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
1493 if( rc==LSM_BUSY ){
1494 rc = LSM_OK;
1495 }else{
1496 ShmReader *p = &pShm->aReader[i];
1497 p->iLsmId = iLsm;
1498 p->iTreeId = iShmMax;
1499 rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
1500 assert( rc!=LSM_BUSY );
1501 if( rc==LSM_OK ) db->iReader = i;
1505 /* Search for any usable slot */
1506 for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
1507 ShmReader *p = &pShm->aReader[i];
1508 if( slotIsUsable(p, iLsm, iShmMin, iShmMax) ){
1509 rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
1510 if( rc==LSM_OK && slotIsUsable(p, iLsm, iShmMin, iShmMax) ){
1511 db->iReader = i;
1512 }else if( rc==LSM_BUSY ){
1513 rc = LSM_OK;
1518 if( rc==LSM_OK && db->iReader<0 ){
1519 rc = LSM_BUSY;
1521 return rc;
1525 ** This is used to check if there exists a read-lock locking a particular
1526 ** version of either the in-memory tree or database file.
1528 ** If iLsmId is non-zero, then it is a snapshot id. If there exists a
1529 ** read-lock using this snapshot or newer, set *pbInUse to true. Or,
1530 ** if there is no such read-lock, set it to false.
1532 ** Or, if iLsmId is zero, then iShmid is a shared-memory sequence id. In
1533 ** that case, search for a read-lock using this sequence id or newer.
1535 static int isInUse(lsm_db *db, i64 iLsmId, u32 iShmid, int *pbInUse){
1536 ShmHeader *pShm = db->pShmhdr;
1537 int i;
1538 int rc = LSM_OK;
1540 for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
1541 ShmReader *p = &pShm->aReader[i];
1542 if( p->iLsmId ){
1543 if( (iLsmId!=0 && p->iLsmId!=0 && iLsmId>=p->iLsmId)
1544 || (iLsmId==0 && shm_sequence_ge(p->iTreeId, iShmid))
1546 rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
1547 if( rc==LSM_OK ){
1548 p->iLsmId = 0;
1549 lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
1555 if( rc==LSM_BUSY ){
1556 *pbInUse = 1;
1557 return LSM_OK;
1559 *pbInUse = 0;
1560 return rc;
1564 ** This function is called by worker connections to determine the smallest
1565 ** snapshot id that is currently in use by a database client. The worker
1566 ** connection uses this result to determine whether or not it is safe to
1567 ** recycle a database block.
1569 static int firstSnapshotInUse(
1570 lsm_db *db, /* Database handle */
1571 i64 *piInUse /* IN/OUT: Smallest snapshot id in use */
1573 ShmHeader *pShm = db->pShmhdr;
1574 i64 iInUse = *piInUse;
1575 int i;
1577 assert( iInUse>0 );
1578 for(i=0; i<LSM_LOCK_NREADER; i++){
1579 ShmReader *p = &pShm->aReader[i];
1580 if( p->iLsmId ){
1581 i64 iThis = p->iLsmId;
1582 if( iThis!=0 && iInUse>iThis ){
1583 int rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
1584 if( rc==LSM_OK ){
1585 p->iLsmId = 0;
1586 lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
1587 }else if( rc==LSM_BUSY ){
1588 iInUse = iThis;
1589 }else{
1590 /* Some error other than LSM_BUSY. Return the error code to
1591 ** the caller in this case. */
1592 return rc;
1598 *piInUse = iInUse;
1599 return LSM_OK;
1602 int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){
1603 if( db->treehdr.iUsedShmid==iShmid ){
1604 *pbInUse = 1;
1605 return LSM_OK;
1607 return isInUse(db, 0, iShmid, pbInUse);
1610 int lsmLsmInUse(lsm_db *db, i64 iLsmId, int *pbInUse){
1611 if( db->pClient && db->pClient->iId<=iLsmId ){
1612 *pbInUse = 1;
1613 return LSM_OK;
1615 return isInUse(db, iLsmId, 0, pbInUse);
1619 ** This function may only be called after a successful call to
1620 ** lsmDbDatabaseConnect(). It returns true if the connection is in
1621 ** multi-process mode, or false otherwise.
1623 int lsmDbMultiProc(lsm_db *pDb){
1624 return pDb->pDatabase && pDb->pDatabase->bMultiProc;
1628 /*************************************************************************
1629 **************************************************************************
1630 **************************************************************************
1631 **************************************************************************
1632 **************************************************************************
1633 *************************************************************************/
1636 ** Ensure that database connection db has cached pointers to at least the
1637 ** first nChunk chunks of shared memory.
1639 int lsmShmCacheChunks(lsm_db *db, int nChunk){
1640 int rc = LSM_OK;
1641 if( nChunk>db->nShm ){
1642 static const int NINCR = 16;
1643 Database *p = db->pDatabase;
1644 lsm_env *pEnv = db->pEnv;
1645 int nAlloc;
1646 int i;
1648 /* Ensure that the db->apShm[] array is large enough. If an attempt to
1649 ** allocate memory fails, return LSM_NOMEM immediately. The apShm[] array
1650 ** is always extended in multiples of 16 entries - so the actual allocated
1651 ** size can be inferred from nShm. */
1652 nAlloc = ((db->nShm + NINCR - 1) / NINCR) * NINCR;
1653 while( nChunk>=nAlloc ){
1654 void **apShm;
1655 nAlloc += NINCR;
1656 apShm = lsmRealloc(pEnv, db->apShm, sizeof(void*)*nAlloc);
1657 if( !apShm ) return LSM_NOMEM_BKPT;
1658 db->apShm = apShm;
1661 if( db->bRoTrans ){
1662 for(i=db->nShm; rc==LSM_OK && i<nChunk; i++){
1663 db->apShm[i] = lsmMallocZeroRc(pEnv, LSM_SHM_CHUNK_SIZE, &rc);
1664 db->nShm++;
1667 }else{
1669 /* Enter the client mutex */
1670 lsmMutexEnter(pEnv, p->pClientMutex);
1672 /* Extend the Database object's apShmChunk[] array if necessary, using
1673 ** the same pattern as for the lsm_db.apShm[] array above. */
1674 nAlloc = ((p->nShmChunk + NINCR - 1) / NINCR) * NINCR;
1675 while( nChunk>=nAlloc ){
1676 void **apShm;
1677 nAlloc += NINCR;
1678 apShm = lsmRealloc(pEnv, p->apShmChunk, sizeof(void*)*nAlloc);
1679 if( !apShm ){
1680 rc = LSM_NOMEM_BKPT;
1681 break;
1683 p->apShmChunk = apShm;
1686 for(i=db->nShm; rc==LSM_OK && i<nChunk; i++){
1687 if( i>=p->nShmChunk ){
1688 void *pChunk = 0;
1689 if( p->bMultiProc==0 ){
1690 /* Single process mode */
1691 pChunk = lsmMallocZeroRc(pEnv, LSM_SHM_CHUNK_SIZE, &rc);
1692 }else{
1693 /* Multi-process mode */
1694 rc = lsmEnvShmMap(pEnv, p->pFile, i, LSM_SHM_CHUNK_SIZE, &pChunk);
1696 if( rc==LSM_OK ){
1697 p->apShmChunk[i] = pChunk;
1698 p->nShmChunk++;
1701 if( rc==LSM_OK ){
1702 db->apShm[i] = p->apShmChunk[i];
1703 db->nShm++;
1707 /* Release the client mutex */
1708 lsmMutexLeave(pEnv, p->pClientMutex);
1712 return rc;
1715 static int lockSharedFile(lsm_env *pEnv, Database *p, int iLock, int eOp){
1716 int rc = LSM_OK;
1717 if( p->bMultiProc ){
1718 rc = lsmEnvLock(pEnv, p->pFile, iLock, eOp);
1720 return rc;
1724 ** Test if it would be possible for connection db to obtain a lock of type
1725 ** eOp on the nLock locks starting at iLock. If so, return LSM_OK. If it
1726 ** would not be possible to obtain the lock due to a lock held by another
1727 ** connection, return LSM_BUSY. If an IO or other error occurs (i.e. in the
1728 ** lsm_env.xTestLock function), return some other LSM error code.
1730 ** Note that this function never actually locks the database - it merely
1731 ** queries the system to see if there exists a lock that would prevent
1732 ** it from doing so.
1734 int lsmShmTestLock(
1735 lsm_db *db,
1736 int iLock,
1737 int nLock,
1738 int eOp
1740 int rc = LSM_OK;
1741 lsm_db *pIter;
1742 Database *p = db->pDatabase;
1743 int i;
1744 u64 mask = 0;
1746 for(i=iLock; i<(iLock+nLock); i++){
1747 mask |= ((u64)1 << (i-1));
1748 if( eOp==LSM_LOCK_EXCL ) mask |= ((u64)1 << (i+32-1));
1751 lsmMutexEnter(db->pEnv, p->pClientMutex);
1752 for(pIter=p->pConn; pIter; pIter=pIter->pNext){
1753 if( pIter!=db && (pIter->mLock & mask) ){
1754 assert( pIter!=db );
1755 break;
1759 if( pIter ){
1760 rc = LSM_BUSY;
1761 }else if( p->bMultiProc ){
1762 rc = lsmEnvTestLock(db->pEnv, p->pFile, iLock, nLock, eOp);
1765 lsmMutexLeave(db->pEnv, p->pClientMutex);
1766 return rc;
1770 ** Attempt to obtain the lock identified by the iLock and bExcl parameters.
1771 ** If successful, return LSM_OK. If the lock cannot be obtained because
1772 ** there exists some other conflicting lock, return LSM_BUSY. If some other
1773 ** error occurs, return an LSM error code.
1775 ** Parameter iLock must be one of LSM_LOCK_WRITER, WORKER or CHECKPOINTER,
1776 ** or else a value returned by the LSM_LOCK_READER macro.
1778 int lsmShmLock(
1779 lsm_db *db,
1780 int iLock,
1781 int eOp, /* One of LSM_LOCK_UNLOCK, SHARED or EXCL */
1782 int bBlock /* True for a blocking lock */
1784 lsm_db *pIter;
1785 const u64 me = ((u64)1 << (iLock-1));
1786 const u64 ms = ((u64)1 << (iLock+32-1));
1787 int rc = LSM_OK;
1788 Database *p = db->pDatabase;
1790 assert( eOp!=LSM_LOCK_EXCL || p->bReadonly==0 );
1791 assert( iLock>=1 && iLock<=LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT-1) );
1792 assert( LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT-1)<=32 );
1793 assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );
1795 /* Check for a no-op. Proceed only if this is not one of those. */
1796 if( (eOp==LSM_LOCK_UNLOCK && (db->mLock & (me|ms))!=0)
1797 || (eOp==LSM_LOCK_SHARED && (db->mLock & (me|ms))!=ms)
1798 || (eOp==LSM_LOCK_EXCL && (db->mLock & me)==0)
1800 int nExcl = 0; /* Number of connections holding EXCLUSIVE */
1801 int nShared = 0; /* Number of connections holding SHARED */
1802 lsmMutexEnter(db->pEnv, p->pClientMutex);
1804 /* Figure out the locks currently held by this process on iLock, not
1805 ** including any held by connection db. */
1806 for(pIter=p->pConn; pIter; pIter=pIter->pNext){
1807 assert( (pIter->mLock & me)==0 || (pIter->mLock & ms)!=0 );
1808 if( pIter!=db ){
1809 if( pIter->mLock & me ){
1810 nExcl++;
1811 }else if( pIter->mLock & ms ){
1812 nShared++;
1816 assert( nExcl==0 || nExcl==1 );
1817 assert( nExcl==0 || nShared==0 );
1818 assert( nExcl==0 || (db->mLock & (me|ms))==0 );
1820 switch( eOp ){
1821 case LSM_LOCK_UNLOCK:
1822 if( nShared==0 ){
1823 lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_UNLOCK);
1825 db->mLock &= ~(me|ms);
1826 break;
1828 case LSM_LOCK_SHARED:
1829 if( nExcl ){
1830 rc = LSM_BUSY;
1831 }else{
1832 if( nShared==0 ){
1833 rc = lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_SHARED);
1835 if( rc==LSM_OK ){
1836 db->mLock |= ms;
1837 db->mLock &= ~me;
1840 break;
1842 default:
1843 assert( eOp==LSM_LOCK_EXCL );
1844 if( nExcl || nShared ){
1845 rc = LSM_BUSY;
1846 }else{
1847 rc = lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_EXCL);
1848 if( rc==LSM_OK ){
1849 db->mLock |= (me|ms);
1852 break;
1855 lsmMutexLeave(db->pEnv, p->pClientMutex);
1858 return rc;
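/*
** A sketch (illustration only) of the lsm_db.mLock encoding used above:
** for lock slot iLock, bit (iLock-1) is set while an EXCLUSIVE lock is
** held and bit (iLock+32-1) while at least a SHARED lock is held. An
** exclusive lock implies shared access, so both bits are set for EXCL.
*/
#if 0
static u64 exclMaskOf(int iLock){ return ((u64)1 << (iLock-1)); }
static u64 sharedMaskOf(int iLock){ return ((u64)1 << (iLock+32-1)); }
/* e.g. for iLock==3 held exclusively:
**   db->mLock == exclMaskOf(3)|sharedMaskOf(3) == 0x0000000400000004 */
#endif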
1861 #ifdef LSM_DEBUG
1863 int shmLockType(lsm_db *db, int iLock){
1864 const u64 me = ((u64)1 << (iLock-1));
1865 const u64 ms = ((u64)1 << (iLock+32-1));
1867 if( db->mLock & me ) return LSM_LOCK_EXCL;
1868 if( db->mLock & ms ) return LSM_LOCK_SHARED;
1869 return LSM_LOCK_UNLOCK;
1873 ** The arguments passed to this function are similar to those passed to
1874 ** the lsmShmLock() function. However, instead of obtaining a new lock
1875 ** this function returns true if the specified connection already holds
1876 ** (or does not hold) such a lock, depending on the value of eOp. As
1877 ** follows:
1879 ** (eOp==LSM_LOCK_UNLOCK) -> true if db has no lock on iLock
1880 ** (eOp==LSM_LOCK_SHARED) -> true if db has at least a SHARED lock on iLock.
1881 ** (eOp==LSM_LOCK_EXCL) -> true if db has an EXCLUSIVE lock on iLock.
1883 int lsmShmAssertLock(lsm_db *db, int iLock, int eOp){
1884 int ret = 0;
1885 int eHave;
1887 assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
1888 assert( iLock<=16 );
1889 assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );
1891 eHave = shmLockType(db, iLock);
1893 switch( eOp ){
1894 case LSM_LOCK_UNLOCK:
1895 ret = (eHave==LSM_LOCK_UNLOCK);
1896 break;
1897 case LSM_LOCK_SHARED:
1898 ret = (eHave!=LSM_LOCK_UNLOCK);
1899 break;
1900 case LSM_LOCK_EXCL:
1901 ret = (eHave==LSM_LOCK_EXCL);
1902 break;
1903 default:
1904 assert( !"bad eOp value passed to lsmShmAssertLock()" );
1905 break;
1908 return ret;
1911 int lsmShmAssertWorker(lsm_db *db){
1912 return lsmShmAssertLock(db, LSM_LOCK_WORKER, LSM_LOCK_EXCL) && db->pWorker;
1916 ** This function does not contribute to library functionality, and is not
1917 ** included in release builds. It is intended to be called from within
1918 ** an interactive debugger.
1920 ** When called, this function prints a single line of human readable output
1921 ** to stdout describing the locks currently held by the connection. For
1922 ** example:
1924 ** (gdb) call print_db_locks(pDb)
1925 ** (shared on dms2) (exclusive on writer)
1927 void print_db_locks(lsm_db *db){
1928 int iLock;
1929 int bOne = 0;
1930 for(iLock=1; iLock<12; iLock++){
1931 const char *azLock[] = {0, "shared", "exclusive"};
1932 const char *azName[] = {
1933 0, "dms1", "dms2", "writer", "worker", "checkpointer",
1934 "reader0", "reader1", "reader2", "reader3", "reader4", "reader5"
1936 int eHave = shmLockType(db, iLock);
1937 if( azLock[eHave] ){
1938 printf("%s(%s on %s)", (bOne?" ":""), azLock[eHave], azName[iLock]);
1939 bOne = 1;
1942 printf("\n");
1944 void print_all_db_locks(lsm_db *db){
1945 lsm_db *p;
1946 for(p=db->pDatabase->pConn; p; p=p->pNext){
1947 printf("%s connection %p ", ((p==db)?"*":""), p);
1948 print_db_locks(p);
1951 #endif
1953 void lsmShmBarrier(lsm_db *db){
1954 lsmEnvShmBarrier(db->pEnv);
1957 int lsm_checkpoint(lsm_db *pDb, int *pnKB){
1958 int rc; /* Return code */
1959 u32 nWrite = 0; /* Number of pages checkpointed */
1961 /* Attempt the checkpoint. If successful, nWrite is set to the number of
1962 ** pages written between this and the previous checkpoint. */
1963 rc = lsmCheckpointWrite(pDb, &nWrite);
1965 /* If required, calculate the output variable (KB of data checkpointed).
1966 ** Set it to zero if an error occurred.
1967 if( pnKB ){
1968 int nKB = 0;
1969 if( rc==LSM_OK && nWrite ){
1970 nKB = (((i64)nWrite * lsmFsPageSize(pDb->pFS)) + 1023) / 1024;
1972 *pnKB = nKB;
1975 return rc;
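/*
** A usage sketch for the public entry point above (illustration only):
** request a checkpoint and report how much data was written.
*/
#if 0
static int checkpointExample(lsm_db *db){
  int nKB = 0;
  int rc = lsm_checkpoint(db, &nKB);      /* nKB is set on success */
  if( rc==LSM_OK ){
    printf("checkpointed %d KB\n", nKB);
  }
  return rc;
}
#endif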