From a5ed721380d8e1cad19f6353d0a4358d63b9303f Mon Sep 17 00:00:00 2001 From: prabatuty Date: Fri, 2 Apr 2010 15:58:34 +0000 Subject: [PATCH] allocator fixes --- include/Allocator.h | 2 +- include/Lock.h | 1 + include/os.h | 1 + src/storage/Chunk.cxx | 198 ++++++++++++++++++++++++++------------------ src/storage/LockManager.cxx | 46 +++++++--- 5 files changed, 156 insertions(+), 92 deletions(-) diff --git a/include/Allocator.h b/include/Allocator.h index 41a74786..c97a99b9 100644 --- a/include/Allocator.h +++ b/include/Allocator.h @@ -198,7 +198,7 @@ class Chunk int splitDataBucket(VarSizeInfo *varInfo, size_t needSize, int pslot, DbRetVal *status); void* varSizeFirstFitAllocate(size_t size, int pslot, DbRetVal *status); void freeForLargeAllocator(void *ptr, int pslot); - void freeForVarSizeAllocator(void *ptr, int pslot); + void freeForVarSizeAllocator(Database *db, void *ptr, int pslot); void* allocateForLargeDataSize(Database *db); void* allocateFromFirstPage(Database *db, int noOfDataNodes, DbRetVal *status); diff --git a/include/Lock.h b/include/Lock.h index 0dfe8e86..9c150795 100644 --- a/include/Lock.h +++ b/include/Lock.h @@ -91,6 +91,7 @@ class LockManager DbRetVal getExclusiveLock(void * tuple, Transaction **trans); DbRetVal releaseLock(void *tuple); DbRetVal isExclusiveLocked(void *tuple, Transaction **trans, bool &status); + DbRetVal getBucketMutex(Bucket *bucket, int procslot); void printUsageStatistics(); void printDebugInfo(); }; diff --git a/include/os.h b/include/os.h index d5f40e44..f8281dbf 100644 --- a/include/os.h +++ b/include/os.h @@ -115,6 +115,7 @@ enum MapMode #define CHUNK_NAME_LEN 64 #define LOG_ROLLOVER_SIZE 20*1024*1024 #define SIGCSQL1 SIGUSR1 +#define MIN_VARCHAR_ALLOC_SIZE 30 #define BIT(x) (1 << (x)) #define SETBITS(x,y) ((x) |= (y)) diff --git a/src/storage/Chunk.cxx b/src/storage/Chunk.cxx index 3eff1131..a6e75880 100644 --- a/src/storage/Chunk.cxx +++ b/src/storage/Chunk.cxx @@ -153,6 +153,7 @@ void* Chunk::allocateFromFirstPage(Database *db, int noOfDataNodes, DbRetVal *st //printError(ErrLockTimeOut, "Unable to allocate from first page. Retry..."); return NULL; } + setPageDirty(pageIter); return data + sizeof(InUse); } @@ -195,6 +196,7 @@ void* Chunk::allocateFromNewPage(Database *db, DbRetVal *status) pInfo->setPageAsFree(); printDebug(DM_Warning, "Unable to get lock to set chunk list."); *status = ErrLockTimeOut; + db->releaseAllocDatabaseMutex(); return NULL; } @@ -208,6 +210,7 @@ void* Chunk::allocateFromNewPage(Database *db, DbRetVal *status) if (retVal !=0) printError(ErrSysFatal, "Fatal: Unable to reset the nextPage"); printDebug(DM_Warning, "Unable to get lock to set curPage"); *status = ErrLockTimeOut; + db->releaseAllocDatabaseMutex(); return NULL; } @@ -244,6 +247,8 @@ void* Chunk::allocate(Database *db, DbRetVal *status) return data; } + //Chunk Mutex is not required as atomic instructions are instead used. + //This improves the concurrency /*int ret = getChunkMutex(db->procSlot); if (ret != 0) { @@ -277,10 +282,12 @@ void* Chunk::allocate(Database *db, DbRetVal *status) if(ret !=0) { *status = ErrLockTimeOut; printDebug(DM_Warning, "Unable to set hasFreespace"); + //releaseChunkMutex(db->procSlot); return NULL; } *status = OK; data = (char*) allocateFromFirstPage(db, noOfDataNodes, status); + //releaseChunkMutex(db->procSlot); if (NULL == data && *status != ErrLockTimeOut) { *status = OK; @@ -291,21 +298,23 @@ void* Chunk::allocate(Database *db, DbRetVal *status) *status = ErrNoMemory; } } - setPageDirty(db, data); + if (*status == OK) setPageDirty(db, data); return data; } //*((InUse*)data) = 1; #if defined(__sparcv9) - int ret = Mutex::CASL((InUse*)data , 0, 1); + int retVal = Mutex::CASL((InUse*)data , 0, 1); #else - int ret = Mutex::CAS((InUse*)data , 0, 1); + int retVal = Mutex::CAS((InUse*)data , 0, 1); #endif - if(ret !=0) { + if(retVal !=0) { *status = ErrLockTimeOut; + //releaseChunkMutex(db->procSlot); printDebug(DM_Warning, "Unable to set isUsed : retry..."); return NULL; } setPageDirty(db, data); + //releaseChunkMutex(db->procSlot); return data + sizeof(InUse); } @@ -315,7 +324,7 @@ void Chunk::setPageDirty(Database *db, void *ptr) PageInfo *pageInfo = getPageInfo(db, ptr); if (NULL == pageInfo) { - printError(ErrSysFatal,"Fatal: pageInfo is NULL", pageInfo ); + printError(ErrSysFatal,"Fatal: pageInfo is NULL %x", ptr ); return; } SETBIT(pageInfo->flags, IS_DIRTY); @@ -365,7 +374,7 @@ void* Chunk::allocateForLargeDataSize(Database *db, size_t size) int oldVal = pageInfo->flags; int newVal = oldVal; CLEARBIT(newVal, HAS_SPACE); - SETBIT(newVal, IS_DIRTY); + setPageDirty(pageInfo); ret = Mutex::CAS(&pageInfo->flags, oldVal, newVal); if (ret !=0) printError(ErrSysFatal, "Unable to set flags"); return data + sizeof(InUse); @@ -382,7 +391,7 @@ void* Chunk::allocateForLargeDataSize(Database *db, size_t size) return NULL; } pageInfo->isUsed_=1; - SETBIT(pageInfo->flags, IS_DIRTY); + setPageDirty(pageInfo); return (char *) varInfo + sizeof(VarSizeInfo); } //REDESIGN MAY BE REQUIRED:Lets us live with this for now. @@ -424,6 +433,7 @@ void* Chunk::allocFromNewPageForVarSize(Database *db, size_t size, int pslot, Db PageInfo *pInfo = (PageInfo*) newPage; pInfo->setPageAsUsed(0); setPageDirty(pInfo); + SETBIT(pInfo->flags, HAS_SPACE); if (1 == createDataBucket(newPage, PAGE_SIZE, size, pslot)) { printError(ErrSysFatal, "Split failed in new page...Should never happen"); @@ -470,10 +480,13 @@ void* Chunk::allocateFromCurPageForVarSize(size_t size, int pslot, DbRetVal *rv) VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo)); if ( 0 != getChunkMutex(pslot)) { *rv = ErrLockTimeOut; return NULL; } - while ((char*) varInfo < ((char*)page + PAGE_SIZE)) + int pageSize = PAGE_SIZE; + bool hasSpace= false; + while ((char*) varInfo < ((char*)page + pageSize)) { if (0 == varInfo->isUsed_) { + hasSpace= true; if( size + sizeof(VarSizeInfo) < varInfo->size_) { if (1 == splitDataBucket(varInfo, size, pslot, rv)) @@ -504,6 +517,9 @@ void* Chunk::allocateFromCurPageForVarSize(size_t size, int pslot, DbRetVal *rv) varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo) +varInfo->size_); } + if (!hasSpace) CLEARBIT(((PageInfo*)curPage_)->flags, HAS_SPACE); + if (hasSpace && size < MIN_VARCHAR_ALLOC_SIZE) + CLEARBIT(((PageInfo*)curPage_)->flags, HAS_SPACE); releaseChunkMutex(pslot); return NULL; } @@ -564,13 +580,18 @@ void* Chunk::varSizeFirstFitAllocate(size_t size, int pslot, DbRetVal *rv) Page *page = ((PageInfo*)firstPage_); size_t alignedSize = os::alignLong(size); if ( 0 != getChunkMutex(pslot)) { *rv = ErrLockTimeOut; return NULL; } + int pageSize = PAGE_SIZE; + bool hasSpace=false; while(NULL != page) { - VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo)); - while ((char*) varInfo < ((char*)page + PAGE_SIZE)) + VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo)); + hasSpace=false; + if (BITSET(((PageInfo*)page)->flags, HAS_SPACE)){ + while ((char*) varInfo < ((char*)page + pageSize)) { if (0 == varInfo->isUsed_) { + hasSpace=true; if( alignedSize +sizeof(VarSizeInfo) < varInfo->size_) { if( 1 == splitDataBucket(varInfo, alignedSize, pslot, rv)) @@ -599,21 +620,25 @@ void* Chunk::varSizeFirstFitAllocate(size_t size, int pslot, DbRetVal *rv) varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo) +varInfo->size_); } - printDebug(DM_VarAlloc, "Chunk:This page does not have free data nodes page:%x", page); - page = ((PageInfo*) page)->nextPage_; + if (!hasSpace) CLEARBIT(((PageInfo*)page)->flags, HAS_SPACE); + if (hasSpace && size < MIN_VARCHAR_ALLOC_SIZE) + CLEARBIT(((PageInfo*)page)->flags, HAS_SPACE); + } + printDebug(DM_VarAlloc, "Chunk:This page does not have free data nodes page:%x", page); + page = ((PageInfo*) page)->nextPage_; } releaseChunkMutex(pslot); return NULL; } -void Chunk::freeForVarSizeAllocator(void *ptr, int pslot) +void Chunk::freeForVarSizeAllocator(Database *db, void *ptr, int pslot) { - /*int ret = getChunkMutex(pslot); + int ret = getChunkMutex(pslot); if (ret != 0) { printError(ErrLockTimeOut,"Unable to acquire chunk Mutex"); return; - }*/ + } VarSizeInfo *varInfo = (VarSizeInfo*)((char*)ptr- sizeof(VarSizeInfo)); //varInfo->isUsed_ = 0; if(varInfo->size_ > (PAGE_SIZE - (sizeof(VarSizeInfo)+sizeof(PageInfo)))) { @@ -630,23 +655,31 @@ void Chunk::freeForVarSizeAllocator(void *ptr, int pslot) if (!found) { printError(ErrSysFatal,"Page %x not found in page list:Logical error", pageInfo ); + releaseChunkMutex(pslot); return ; } if(curPage_== pageInfo) {curPage_ = prev ; } pageInfo->isUsed_ = 0; pageInfo->nextPageAfterMerge_ = NULL; - CLEARBIT(pageInfo->flags, IS_DIRTY); + setPageDirty(pageInfo); SETBIT(pageInfo->flags, HAS_SPACE); prev->nextPage_ = pageInfo->nextPage_; } - int ret = Mutex::CAS((int*)&varInfo->isUsed_, 1, 0); - if(ret !=0) { + int retVal = Mutex::CAS((int*)&varInfo->isUsed_, 1, 0); + if(retVal !=0) { printError(ErrAlready, "Fatal: Varsize double free for %x", ptr); } - //TODO - //setPageDirty(ptr); + PageInfo *pageInfo = getPageInfo(db, ptr); + if (NULL == pageInfo) + { + printError(ErrSysFatal,"Fatal: pageInfo is NULL", pageInfo ); + releaseChunkMutex(db->procSlot); + return; + } + SETBIT(pageInfo->flags, HAS_SPACE); + SETBIT(pageInfo->flags, IS_DIRTY); printDebug(DM_VarAlloc,"chunkID:%d Unset isUsed for %x", chunkID_, varInfo); - //releaseChunkMutex(pslot); + releaseChunkMutex(pslot); return; } @@ -694,17 +727,17 @@ void Chunk::freeForLargeAllocator(void *ptr, int pslot) ret = Mutex::CASL((long*)&firstPage_, (long) firstPage_, (long)pageInfo->nextPage_); if (ret != 0) printError(ErrSysFatal, "Unable to set firstPage"); - SETBIT(((PageInfo*)firstPage_)->flags , IS_DIRTY); + setPageDirty(((PageInfo*)firstPage_)); } else { //prev->nextPage_ = pageInfo->nextPage_; ret = Mutex::CASL((long*)&prev->nextPage_, (long) prev->nextPage_, (long)pageInfo->nextPage_); if (ret != 0) printError(ErrSysFatal, "Unable to set nextPage"); - SETBIT(prev->flags, IS_DIRTY); + setPageDirty(prev); } } - SETBIT(pageInfo->flags, IS_DIRTY); + setPageDirty(pageInfo); releaseChunkMutex(pslot); return; } @@ -714,7 +747,7 @@ void Chunk::free(Database *db, void *ptr) { if (0 == allocSize_) { - freeForVarSizeAllocator(ptr, db->procSlot); + freeForVarSizeAllocator(db, ptr, db->procSlot); return; } int noOfDataNodes =(int) os::floor((PAGE_SIZE - sizeof(PageInfo)) / allocSize_); @@ -730,21 +763,23 @@ void Chunk::free(Database *db, void *ptr) printError(ErrLockTimeOut,"Unable to acquire chunk Mutex"); return; }*/ - //below is the code for freeing in fixed size allocator - - //unset the used flag + //unset the used flag //*((int*)ptr -1 ) = 0; + int oldValue=1; if (*((InUse*)ptr -1 ) == 0) { - printError(ErrSysFatal, "Fatal:Data node already freed %x Chunk:%d", ptr, chunkID_); + printError(ErrSysFatal, "Fatal:Data node already freed %x Chunk:%d value:%d", ptr, chunkID_, + *((InUse*)ptr -1 )); + oldValue=0; //return; } #if defined(__sparcv9) - int ret = Mutex::CASL(((InUse*)ptr -1), 1, 0); + int retVal = Mutex::CASL(((InUse*)ptr -1), oldValue, 0); #else - int ret = Mutex::CAS(((InUse*)ptr -1), 1, 0); + int retVal = Mutex::CAS(((InUse*)ptr -1), oldValue, 0); #endif - if(ret !=0) { + if(retVal !=0) { printError(ErrSysFatal, "Unable to get lock to free for %x", ptr); + //releaseChunkMutex(db->procSlot); return; } PageInfo *pageInfo; @@ -759,11 +794,11 @@ void Chunk::free(Database *db, void *ptr) int oldVal = pageInfo->flags; int newVal = oldVal; SETBIT(newVal, HAS_SPACE); - SETBIT(newVal, IS_DIRTY); - ret = Mutex::CAS((int*)&pageInfo->flags, oldVal, newVal); - if(ret !=0) { + retVal = Mutex::CAS((int*)&pageInfo->flags, oldVal, newVal); + if(retVal !=0) { printError(ErrSysFatal, "Unable to get lock to set flags"); } + setPageDirty(pageInfo); //releaseChunkMutex(db->procSlot); return; } @@ -851,57 +886,62 @@ int Chunk::compact(int procSlot) { PageInfo* pageInfo = ((PageInfo*)firstPage_); PageInfo* prevPage = pageInfo; - if (NULL == pageInfo) return 0; + if (NULL == pageInfo) + { + return 0; + } int ret = getChunkMutex(procSlot); - if (ret != 0) { + if (ret != 0) + { printError(ErrLockTimeOut,"Unable to acquire chunk Mutex"); return ret; } pageInfo = (PageInfo*)pageInfo->nextPage_; - if (0 == allocSize_) { - while( pageInfo != NULL ) { - bool flag = false; - VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)pageInfo) + - sizeof(PageInfo)); - while ((char*) varInfo < ((char*)pageInfo + PAGE_SIZE)) { - if (1 == varInfo->isUsed_) {flag=true; break;} - varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo) - +varInfo->size_); - } - if (!flag) { - printDebug(DM_VarAlloc, - "Freeing unused page in varsize allocator %x\n", pageInfo); - prevPage->nextPage_ = pageInfo->nextPage_; - pageInfo->isUsed_ = 0; - pageInfo = (PageInfo*)(((PageInfo*)prevPage)->nextPage_); - } else { - prevPage = pageInfo; - pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_); - } - printDebug(DM_VarAlloc,"compact iter %x\n", pageInfo); + if (0 == allocSize_) + { + while( pageInfo != NULL ) + { + bool flag = false; + VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)pageInfo) + + sizeof(PageInfo)); + while ((char*) varInfo < ((char*)pageInfo + PAGE_SIZE)) + { + if (1 == varInfo->isUsed_) {flag=true; break;} + varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo) + +varInfo->size_); } - } else if (allocSize_ < PAGE_SIZE) { - while( pageInfo != NULL ) { - bool flag = false; - int noOfDataNodes=(int) - os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_); - char *data = ((char*)pageInfo) + sizeof(PageInfo); - for (int i = 0; i< noOfDataNodes ; i++) { - if (1 == *((InUse*)data)) { flag = true; break; } - data = data +allocSize_; - } - if (!flag) { - printDebug(DM_Alloc, - "Freeing unused page in fixed allocator %x\n", pageInfo); - prevPage->nextPage_ = pageInfo->nextPage_; - pageInfo->isUsed_ = 0; - pageInfo = (PageInfo*)(((PageInfo*)prevPage)->nextPage_); - } else { - prevPage = pageInfo; - pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ; - } - printDebug(DM_Alloc,"compact iter %x\n", pageInfo); + if (!flag) { + printDebug(DM_VarAlloc,"Freeing unused page in varsize allocator %x\n", pageInfo); + prevPage->nextPage_ = pageInfo->nextPage_; + pageInfo->isUsed_ = 0; } + prevPage = pageInfo; + pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ; + printDebug(DM_VarAlloc,"compact iter %x\n", pageInfo); + } + }else if (allocSize_ < PAGE_SIZE) + { + while( pageInfo != NULL ) + { + bool flag = false; + int noOfDataNodes=(int) os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_); + char *data = ((char*)pageInfo) + sizeof(PageInfo); + for (int i = 0; i< noOfDataNodes ; i++) + { + if (1 == *((InUse*)data)) { flag = true; break; } + data = data +allocSize_; + } + if (!flag) { + printDebug(DM_Alloc,"Freeing unused page in fixed allocator %x\n", pageInfo); + prevPage->nextPage_ = pageInfo->nextPage_; + pageInfo->isUsed_ = 0; + pageInfo = (PageInfo*)(((PageInfo*)prevPage)->nextPage_) ; + }else{ + prevPage = pageInfo; + pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ; + } + printDebug(DM_Alloc,"compact iter %x\n", pageInfo); + } } releaseChunkMutex(procSlot); return 0; diff --git a/src/storage/LockManager.cxx b/src/storage/LockManager.cxx index 2393488e..7b2aea3e 100644 --- a/src/storage/LockManager.cxx +++ b/src/storage/LockManager.cxx @@ -275,8 +275,8 @@ DbRetVal LockManager::getSharedLock(void *tuple, Transaction **trans) printError(ErrLockTimeOut, "Unable to add to hasList : Timeout. Retry.."); return ErrLockTimeOut; } - int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot); - if (lockRet != 0) + DbRetVal lockRet = getBucketMutex(bucket, systemDatabase_->procSlot); + if (lockRet != OK) { printError(ErrLockTimeOut, "Unable to acquire bucket mutex"); if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple); @@ -325,7 +325,7 @@ DbRetVal LockManager::getSharedLock(void *tuple, Transaction **trans) //printDebug(DM_Lock, "Trying to get mutex: for bucket %x\n", bucket); while (tries < Conf::config.getLockRetries()) { - /*lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot); + /*lockRet = getBucketMutex(bucket, systemDatabase_->procSlot); if (lockRet != 0) { printDebug(DM_Lock, "Mutex is waiting for long time:May be deadlock"); @@ -434,7 +434,7 @@ DbRetVal LockManager::getExclusiveLock(void *tuple, Transaction **trans) //to acquire lock so for sure we will get it. Bucket *bucket = getLockBucket(tuple); - /*int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot); + /*int lockRet = getBucketMutex(bucket, systemDatabase_->procSlot); if (lockRet != 0) { printDebug(DM_Lock, "Unable to acquire bucket mutex:May be deadlock"); @@ -536,7 +536,7 @@ DbRetVal LockManager::getExclusiveLock(void *tuple, Transaction **trans) printError(ErrLockTimeOut, "Unable to add to hasList. Timeout : Retry.."); return ErrLockTimeOut; } - int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot); + int lockRet = getBucketMutex(bucket, systemDatabase_->procSlot); if (lockRet != 0) { printError(ErrLockTimeOut, "Unable to acquire bucket mutex"); @@ -584,7 +584,7 @@ DbRetVal LockManager::getExclusiveLock(void *tuple, Transaction **trans) while (tries < Conf::config.getLockRetries()) { - /*lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot); + /*lockRet = getBucketMutex(bucket, systemDatabase_->procSlot); if (lockRet != 0) { printError(ErrLockTimeOut, "Unable to get bucket mutex"); @@ -736,7 +736,7 @@ DbRetVal LockManager::releaseLock(void *tuple) printDebug(DM_Lock, "LockManager:releaseLock Start"); Bucket *bucket = getLockBucket(tuple); printDebug(DM_Lock,"Bucket is %x", bucket); - /*int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot); + /*int lockRet = getBucketMutex(bucket, systemDatabase_->procSlot); if (lockRet != 0) { printDebug(DM_Lock, "Mutex is waiting for long time:May be deadlock"); @@ -776,8 +776,8 @@ DbRetVal LockManager::releaseLock(void *tuple) { //TODO::above condition is not atomic //put waitReaders_, WaitReaders in one integer - int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot); - if (lockRet == 0) { + DbRetVal lockRet = getBucketMutex(bucket,systemDatabase_->procSlot); + if (lockRet == OK) { int tries = Conf::config.getMutexRetries(); do { rv = deallocLockNode(iter, bucket); @@ -812,8 +812,8 @@ DbRetVal LockManager::releaseLock(void *tuple) } if (iter->lInfo_.waitWriters_ == 0 || iter->lInfo_.waitReaders_ ==0) { - int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot); - if (lockRet == 0) { + DbRetVal lockRet = getBucketMutex(bucket, systemDatabase_->procSlot); + if (lockRet == OK) { int tries = Conf::config.getMutexRetries(); do { rv = deallocLockNode(iter, bucket); @@ -872,7 +872,7 @@ DbRetVal LockManager::isExclusiveLocked(void *tuple, Transaction **trans, bool & status = false; return OK; } - /*int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot); + /*int lockRet = getBucketMutex(bucket, systemDatabase_->procSlot); if (lockRet != 0) { printDebug(DM_Lock, "Mutex is waiting for long time:May be deadlock"); @@ -994,3 +994,25 @@ DbRetVal LockManager::deallocLockNode(LockHashNode *node, Bucket *bucket) chunk->free(systemDatabase_, node); return OK; } +DbRetVal LockManager::getBucketMutex(Bucket *bucket, int procSlot) +{ + struct timeval timeout, timeval; + timeout.tv_sec = Conf::config.getMutexSecs(); + timeout.tv_usec = Conf::config.getMutexUSecs(); + int tries=0; + int totalTries = Conf::config.getMutexRetries() *2; + int ret =0; + while (tries < totalTries) + { + ret = bucket->mutex_.getLock(procSlot, true); + if (ret == 0) break; + timeval.tv_sec = timeout.tv_sec; + timeval.tv_usec = timeout.tv_usec; + os::select(0, 0, 0, 0, &timeval); + tries++; + } + if (tries >= totalTries) return ErrLockTimeOut; + return OK; + +} + -- 2.11.4.GIT