From 35ebe486757ad758715cbfa14dfd802462d81f5b Mon Sep 17 00:00:00 2001 From: prabatuty Date: Mon, 26 Oct 2009 19:07:59 +0000 Subject: [PATCH] code reorganisation phase-I --- include/Process.h | 2 +- src/storage/DatabaseManagerImpl.cxx | 8 +- src/storage/Process.cxx | 180 ++++++++++-------------------------- src/storage/SessionImpl.cxx | 10 +- src/storage/Transaction.cxx | 5 + src/tools/csqlserver.cxx | 31 +++---- 6 files changed, 83 insertions(+), 153 deletions(-) diff --git a/include/Process.h b/include/Process.h index 0dfb5571..bab82e14 100644 --- a/include/Process.h +++ b/include/Process.h @@ -50,7 +50,7 @@ class ThreadInfo pthread_t thrid_; - ThreadTrans thrTrans_[MAX_THREADS_PER_PROCESS]; //list of thread specific transactions + ThreadTrans thrTrans_; //list of thread specific transactions Mutex *want_; //single mutex which we are waiting for. diff --git a/src/storage/DatabaseManagerImpl.cxx b/src/storage/DatabaseManagerImpl.cxx index d0051f51..6cfc85ef 100644 --- a/src/storage/DatabaseManagerImpl.cxx +++ b/src/storage/DatabaseManagerImpl.cxx @@ -1766,13 +1766,13 @@ DbRetVal DatabaseManagerImpl::recover() rv = sysDb()->recoverSystemDB(); return rv; } + void DatabaseManagerImpl::sendSignal(int signal) { - ThreadInfo* pInfo = sysDb()->getThreadInfo(0); + ThreadInfo* tInfo = sysDb()->getThreadInfo(0); for (int i=0; i < Conf::config.getMaxProcs(); i++) { - if (pInfo->pid_ !=0) os::kill(pInfo->pid_, signal); - pInfo++; + if (tInfo->pid_ !=0) os::kill(tInfo->pid_, signal); + tInfo++; } } - diff --git a/src/storage/Process.cxx b/src/storage/Process.cxx index b70f9001..dff0ec4b 100644 --- a/src/storage/Process.cxx +++ b/src/storage/Process.cxx @@ -34,7 +34,7 @@ void ThreadInfo::init() thrid_ =0; want_ = NULL; for (int i =0; i %x \n", has_[i]); printf(" \n"); printf(" \n"); - for (int i =0; i \n"); printf("\n"); @@ -81,15 +81,15 @@ DbRetVal ProcessManager::registerThread() pid_t pid; pid = os::getpid(); pthread_t thrid = os::getthrid(); - ThreadInfo* pInfo = systemDatabase->getThreadInfo(0); + ThreadInfo* tInfo = systemDatabase->getThreadInfo(0); int i=0; ThreadInfo* freeSlot = NULL; int freeSlotPos =0; bool freeSlotSelected = false; for (; i < Conf::config.getMaxProcs(); i++) { - if (pInfo->pid_ == 0 ) break; - pInfo++; + if (tInfo->pid_ == 0 ) break; + tInfo++; } if ( i == Conf::config.getMaxProcs()) { @@ -97,12 +97,10 @@ DbRetVal ProcessManager::registerThread() printError(ErrNoResource, "No free thread slot. Limit reached"); return ErrNoResource; } - //printf("Process slot used %d %x\n", i, pInfo); - //TODO::make the above debug message - //TODO:print it to the trace file - pInfo->init(); - pInfo->pid_ = pid; - pInfo->thrid_ = thrid; + logFiner(Conf::logger, "Process slot taken: %d", i); + tInfo->init(); + tInfo->pid_ = pid; + tInfo->thrid_ = thrid; procSlot = i; printDebug(DM_Process, "Process %d %lu registered with slot %d\n", pid, thrid, procSlot); systemDatabase->releaseProcessTableMutex(false); @@ -118,25 +116,7 @@ DbRetVal ProcessManager::deregisterThread(int procSlot) { printError(rv,"Unable to get process table mutex"); return rv; - }/* - pid_t pid = os::getpid(); - pthread_t thrid = os::getthrid(); - - ThreadInfo* pInfo = systemDatabase->getThreadInfo(0); - int i=0; - for (; i < Conf::config.getMaxProcs(); i++) - { - if (pInfo->pid_ == pid && pInfo->thrid_ == thrid) break; - pInfo++; } - - systemDatabase->releaseProcessTableMutex(false); - if (i == Conf::config.getMaxProcs()) - { - printError(ErrSysFatal, "Degistering process %d is not registered with csql", pid); - return ErrNoResource; - }*/ - ThreadInfo* pInfo = systemDatabase->getThreadInfo(procSlot); Transaction *trans = ProcessManager::getThreadTransaction(procSlot); if (NULL != trans) { @@ -146,97 +126,65 @@ DbRetVal ProcessManager::deregisterThread(int procSlot) } trans->status_ = TransNotUsed; } - if (pInfo->want_ != NULL) + ThreadInfo* tInfo = systemDatabase->getThreadInfo(procSlot); + if (tInfo->want_ != NULL) { - printError(ErrSysFatal, "Probable data corruption.wants_ is not null\n"); + printError(ErrSysFatal, "Fatal:wants_ is not null\n"); systemDatabase->releaseProcessTableMutex(false); return ErrSysFatal; } for (int muti = 0 ;muti < MAX_MUTEX_PER_THREAD; muti++) { - if (pInfo->has_[muti] != NULL) + if (tInfo->has_[muti] != NULL) { - printError(ErrSysFatal, "Probable data corruption.some mutexes are not freed %x \n", pInfo->has_[muti] ); - pInfo->has_[muti]->print(); - pInfo->has_[muti]->releaseLock(procSlot); + printError(ErrSysFatal, "Fatal:Some mutexes are not freed %x\n",tInfo->has_[muti] ); + tInfo->has_[muti]->print(); + tInfo->has_[muti]->releaseLock(procSlot); systemDatabase->releaseProcessTableMutex(false); return ErrSysFatal; } } - printDebug(DM_Process, "Process %d %lu deregistered slot %d\n", pInfo->pid_, pInfo->thrid_, procSlot); - - //printf("Slot freed %d %x %d %lu\n", i, pInfo, pid, thrid); - pInfo->init(); + printDebug(DM_Process, "Process %d %lu deregistered slot %d\n", tInfo->pid_, tInfo->thrid_, procSlot); + logFiner(Conf::logger, "ProcSlot Freed %d", procSlot); + tInfo->init(); systemDatabase->releaseProcessTableMutex(false); return OK; } DbRetVal ProcessManager::addMutex(Mutex *mut, int pslot) { - //pid_t pid = os::getpid(); - //pthread_t thrid = os::getthrid(); - if (systemDatabase == NULL) - { - return OK; - } - ThreadInfo* pInfo = systemDatabase->getThreadInfo(pslot); - int i=0; - /*for (; i < Conf::config.getMaxProcs(); i++) - { - if (pInfo->pid_ == pid && pInfo->thrid_ == thrid) break; - pInfo++; - } - if (i == Conf::config.getMaxProcs()) - { - printError(ErrSysFatal, "Logical Error pid %d thrid %lu not found in procTable while adding mutex %s", pid, thrid, mut->name); - return ErrSysFatal; - }*/ + if (systemDatabase == NULL) return OK; + ThreadInfo* tInfo = systemDatabase->getThreadInfo(pslot); for (int i = 0 ;i < MAX_MUTEX_PER_THREAD; i++) { - if (pInfo->has_[i] == NULL) + if (tInfo->has_[i] == NULL) { - pInfo->has_[i] = mut; + tInfo->has_[i] = mut; printDebug(DM_Process, "procSlot %d acquiring %d mutex %x %s\n", pslot, i, mut, mut->name); + logFinest(Conf::logger, "acquiring mutex %x %s", mut, mut->name); return OK; } } printError(ErrSysInternal, "All slots are full. Reached per thread mutex limit."); - //printStackTrace(); for (int i = 0 ;i < MAX_MUTEX_PER_THREAD; i++) { - printError(ErrWarning, "mutex %d %x", i, pInfo->has_[i]); - pInfo->has_[i]->print(); + printError(ErrWarning, "mutex %d %x", i, tInfo->has_[i]); + tInfo->has_[i]->print(); } return ErrSysInternal; } DbRetVal ProcessManager::removeMutex(Mutex *mut, int pslot) { - //pid_t pid = os::getpid(); - //pthread_t thrid = os::getthrid(); - if (systemDatabase == NULL) - { - return OK; - } - - ThreadInfo* pInfo = systemDatabase->getThreadInfo(pslot); - int i=0; - /*for (; i < Conf::config.getMaxProcs(); i++) - { - if (pInfo->pid_ == pid && pInfo->thrid_ == thrid) break; - pInfo++; - } - if (i == Conf::config.getMaxProcs()) - { - printError(ErrSysFatal, "Logical Error pid %d thrid %lu not found in procTable", pid, thrid); - return ErrSysFatal; - }*/ + if (systemDatabase == NULL) return OK; + ThreadInfo* tInfo = systemDatabase->getThreadInfo(pslot); for (int i = 0 ;i < MAX_MUTEX_PER_THREAD; i++) { - if (pInfo->has_[i] == mut) + if (tInfo->has_[i] == mut) { - pInfo->has_[i] = NULL; + tInfo->has_[i] = NULL; printDebug(DM_Process, "procSlot %d releasing %d mutex %x %s\n", pslot, i, mut, mut->name); + logFinest(Conf::logger, "releasing mutex %x %d", mut, mut->name); return OK; } } @@ -246,39 +194,24 @@ DbRetVal ProcessManager::removeMutex(Mutex *mut, int pslot) DbRetVal ProcessManager::setThreadTransaction(Transaction *trans, int pslot) { + if (systemDatabase == NULL) return OK; pid_t pid = os::getpid(); pthread_t thrid = os::getthrid(); - if (systemDatabase == NULL) - { - return OK; - } - ThreadInfo* pInfo = systemDatabase->getThreadInfo(pslot); - int i=0; - - for (int i = 0 ;i < MAX_THREADS_PER_PROCESS; i++) - { - if (pInfo->thrTrans_[i].pid_ != 0) continue; - } - if (i == MAX_THREADS_PER_PROCESS) - { - printError(ErrSysInternal, "Max thread limit reached."); - return ErrSysInternal; - } - pInfo->thrTrans_[i].pid_ = pid; - pInfo->thrTrans_[i].thrid_ = thrid; - pInfo->thrTrans_[i].trans_ = trans; + ThreadInfo* tInfo = systemDatabase->getThreadInfo(pslot); + tInfo->thrTrans_.pid_ = pid; + tInfo->thrTrans_.thrid_ = thrid; + tInfo->thrTrans_.trans_ = trans; printDebug(DM_Process, "procSlot %d: pid: %d thrid: %lu is set to use trans %x\n", pslot, pid, thrid, trans); - //pInfo->trans_ = trans; return OK; } Transaction* ProcessManager::getThreadTransaction(int pslot) { - ThreadInfo* pInfo = systemDatabase->getThreadInfo(pslot); - return pInfo->thrTrans_[0].trans_; + ThreadInfo* tInfo = systemDatabase->getThreadInfo(pslot); + return tInfo->thrTrans_.trans_; } Transaction** ProcessManager::getThreadTransAddr(int pslot) @@ -290,22 +223,11 @@ Transaction** ProcessManager::getThreadTransAddr(int pslot) return NULL; } - ThreadInfo* pInfo = systemDatabase->getThreadInfo(pslot); - int i=0; - - for (int i = 0 ;i < MAX_THREADS_PER_PROCESS; i++) - { - if (pInfo->thrTrans_[i].pid_ == pid && pInfo->thrTrans_[i].thrid_ == thrid) break; - } - if (i == MAX_THREADS_PER_PROCESS) - { - printDebug(DM_Process, "Thread specific trans could not be found in list"); - return NULL; - } + ThreadInfo* tInfo = systemDatabase->getThreadInfo(pslot); printDebug(DM_Process, "procSlot %d: pid: %d thrid: %lu is returning trans %x\n", pslot, - pid, thrid, pInfo->thrTrans_[i].trans_); - return &pInfo->thrTrans_[i].trans_; + pid, thrid, tInfo->thrTrans_.trans_); + return &tInfo->thrTrans_.trans_; } @@ -313,12 +235,12 @@ Transaction** ProcessManager::getThreadTransAddr(int pslot) void ProcessManager::printUsageStatistics() { - ThreadInfo* pInfo = systemDatabase->getThreadInfo(0); + ThreadInfo* tInfo = systemDatabase->getThreadInfo(0); int i=0, usedCount =0 , freeCount =0; for (; i < Conf::config.getMaxProcs(); i++) { - if (pInfo->pid_ != 0 ) usedCount++; else freeCount++; - pInfo++; + if (tInfo->pid_ != 0 ) usedCount++; else freeCount++; + tInfo++; } printf("\n"); printf(" %d \n", usedCount); @@ -330,29 +252,29 @@ void ProcessManager::printUsageStatistics() void ProcessManager::printDebugInfo() { printf("\n"); - ThreadInfo* pInfo = systemDatabase->getThreadInfo(0); + ThreadInfo* tInfo = systemDatabase->getThreadInfo(0); int i=0, usedCount =0 , freeCount =0; for (; i < Conf::config.getMaxProcs(); i++) { - if (pInfo->pid_ != 0 ) {pInfo->print(); usedCount++;} else freeCount++; - pInfo++; + if (tInfo->pid_ != 0 ) {tInfo->print(); usedCount++;} else freeCount++; + tInfo++; } printf(" %d \n", usedCount); printf(" %d \n", freeCount); printf("\n"); } - +//caller is expected to take proc mutex bool ProcessManager::isAnyOneRegistered() { //the process which calls this will have an entry in proc table. //so checking for 1 - ThreadInfo* pInfo = systemDatabase->getThreadInfo(0); + ThreadInfo* tInfo = systemDatabase->getThreadInfo(0); int i=0, usedCount =0; for (; i < Conf::config.getMaxProcs(); i++) { - if (pInfo->pid_ != 0 ) usedCount++; - pInfo++; + if (tInfo->pid_ != 0 ) usedCount++; + tInfo++; } if (usedCount == 1) return false; else return true; } diff --git a/src/storage/SessionImpl.cxx b/src/storage/SessionImpl.cxx index 25d7549b..6de27404 100644 --- a/src/storage/SessionImpl.cxx +++ b/src/storage/SessionImpl.cxx @@ -177,12 +177,17 @@ DbRetVal SessionImpl::authenticate(const char *username, const char *password) } DbRetVal SessionImpl::getExclusiveLock() { + DbRetVal rv = dbMgr->sysDb()->getProcessTableMutex(true); + if (OK != rv) { + printError(ErrLockTimeOut, "Unable to acquire proc table mutex"); + return rv; + } if (dbMgr->isAnyOneRegistered()) { printError(ErrLockTimeOut, "Unable to acquire exclusive lock. somebody is connected"); + dbMgr->sysDb()->releaseProcessTableMutex(true); return ErrLockTimeOut; } - DbRetVal rv = dbMgr->sysDb()->getProcessTableMutex(true); - if (OK == rv) isXTaken = true; + isXTaken = true; return rv; } DbRetVal SessionImpl::close() @@ -205,6 +210,7 @@ DbRetVal SessionImpl::close() delete uMgr; uMgr = NULL; } + isXTaken = false; return OK; } diff --git a/src/storage/Transaction.cxx b/src/storage/Transaction.cxx index 8c8d3cac..33293a77 100644 --- a/src/storage/Transaction.cxx +++ b/src/storage/Transaction.cxx @@ -61,6 +61,7 @@ DbRetVal Transaction::insertIntoHasList(Database *sysdb, LockHashNode *node) while (NULL != it->next_) { it = it->next_; } it->next_ = hasNode; printDebug(DM_Transaction, "Added to hasLockList at end:%x",it); + logFinest(Conf::logger, "Added locknode:%x to hasLockList", hasNode->node_); return OK; } @@ -80,6 +81,8 @@ DbRetVal Transaction::removeFromHasList(Database *sysdb, void *tuple) prev->next_ = iter->next_; chunk->free(sysdb, iter); if (iter == hasLockList_) hasLockList_ = NULL; + logFinest(Conf::logger, "Removed locknode:%x from hasLockList", + iter->node_); return OK; } prev = iter; @@ -111,6 +114,7 @@ DbRetVal Transaction::releaseAllLocks(LockManager *lockManager_) prev = iter; iter = iter->next_; printDebug(DM_Transaction, "Releasing lock %x",prev->node_->ptrToTuple_); + logFinest(Conf::logger, "Releasing lock for tuple:%x",prev->node_->ptrToTuple_); rv = lockManager_->releaseLock(prev->node_->ptrToTuple_); chunk->free(sysdb, prev); } @@ -301,6 +305,7 @@ DbRetVal Transaction::applyUndoLogs(Database *sysdb) UndoLogInfo *logInfo = NULL; while(NULL != (logInfo = popUndoLog())) { + logFinest(Conf::logger, "Apply undo log type:%d", logInfo->opType_); switch(logInfo->opType_) { case InsertOperation: diff --git a/src/tools/csqlserver.cxx b/src/tools/csqlserver.cxx index b3792e88..e1e09ec7 100644 --- a/src/tools/csqlserver.cxx +++ b/src/tools/csqlserver.cxx @@ -74,14 +74,11 @@ DbRetVal releaseAllResources(Database *sysdb, ThreadInfo *info ) } TransactionManager *tm = new TransactionManager(); LockManager *lm = new LockManager(sysdb); - for (int i = 0 ;i < MAX_THREADS_PER_PROCESS; i++) + if (info->thrTrans_.trans_ != NULL && info->thrTrans_.trans_->status_ == TransRunning) { - if (info->thrTrans_[i].trans_ != NULL && info->thrTrans_[i].trans_->status_ == TransRunning) - { - printf("Rollback Transaction %x\n", info->thrTrans_[i].trans_); - tm->rollback(lm, info->thrTrans_[i].trans_); - info->thrTrans_[i].trans_->status_ = TransNotUsed; - } + printf("Rollback Transaction %x\n", info->thrTrans_.trans_); + tm->rollback(lm, info->thrTrans_.trans_); + info->thrTrans_.trans_->status_ = TransNotUsed; } info->init(); delete tm; @@ -102,14 +99,14 @@ DbRetVal cleanupDeadProcs(Database *sysdb) pthread_t thrid = os::getthrid(); - ThreadInfo* pInfo = sysdb->getThreadInfo(0); + ThreadInfo* tInfo = sysdb->getThreadInfo(0); int i=0; ThreadInfo* freeSlot = NULL; for (; i < Conf::config.getMaxProcs(); i++) { //check whether it is alive - if (pInfo->pid_ !=0 && checkDead(pInfo->pid_)) releaseAllResources(sysdb, pInfo); - pInfo++; + if (tInfo->pid_ !=0 && checkDead(tInfo->pid_)) releaseAllResources(sysdb, tInfo); + tInfo++; } sysdb->releaseProcessTableMutex(false); return OK; @@ -124,19 +121,19 @@ DbRetVal logActiveProcs(Database *sysdb) printError(rv,"Unable to get process table mutex"); return rv; } - ThreadInfo* pInfo = sysdb->getThreadInfo(0); + ThreadInfo* tInfo = sysdb->getThreadInfo(0); int i=0, count =0; ThreadInfo* freeSlot = NULL; for (; i < Conf::config.getMaxProcs(); i++) { - if (pInfo->pid_ !=0 ) { - logFine(Conf::logger, "Registered Procs: %d %lu\n", pInfo->pid_, pInfo->thrid_); - printf("Client process with pid %d is still registered\n", pInfo->pid_); - if( pInfo->pid_ != asyncpid && pInfo->pid_ != cachepid && - pInfo->pid_ != sqlserverpid) + if (tInfo->pid_ !=0 ) { + logFine(Conf::logger, "Registered Procs: %d %lu\n", tInfo->pid_, tInfo->thrid_); + printf("Client process with pid %d is still registered\n", tInfo->pid_); + if( tInfo->pid_ != asyncpid && tInfo->pid_ != cachepid && + tInfo->pid_ != sqlserverpid) count++; } - pInfo++; + tInfo++; } sysdb->releaseProcessTableMutex(false); if (count) return ErrSysInternal; else return OK; -- 2.11.4.GIT