From 914249895b2b9764f0a3cc9ef3018aa85046880c Mon Sep 17 00:00:00 2001 From: kishoramballi Date: Sat, 23 Jan 2010 08:48:06 +0000 Subject: [PATCH] Embedded support for CSQL. --- buildmmdb.ksh | 1 + include/SessionImpl.h | 8 +- include/SqlConnection.h | 26 ++- include/build.h | 2 +- include/os.h | 1 + src/sql/SqlFactory.cxx | 3 + src/sql/SqlStatement.cxx | 436 ++++++++++++++++++++++++++++++++++++ src/storage/Database.cxx | 2 +- src/storage/DatabaseManagerImpl.cxx | 16 +- src/storage/SessionImpl.cxx | 126 ++++++++++- src/storage/Util.cxx | 3 +- src/storage/os.cxx | 4 + 12 files changed, 612 insertions(+), 16 deletions(-) diff --git a/buildmmdb.ksh b/buildmmdb.ksh index 709a1c95..9ec55ed7 100755 --- a/buildmmdb.ksh +++ b/buildmmdb.ksh @@ -11,6 +11,7 @@ cp src/Makefile.am.mmdb src/Makefile.am make -f Makefile.cvs +#./configure --prefix=`pwd`/install CXXFLAGS="-g -DMMDB -I$JDK_HOME/include -I$JDK_HOME/include/linux" ./configure --prefix=`pwd`/install CXXFLAGS="-g -DMMDB -I$JDK_HOME/include -I$JDK_HOME/include/linux" libtoolavailable=`which libtool` if [ -z "$libtoolavailable" ] diff --git a/include/SessionImpl.h b/include/SessionImpl.h index b5fcdc3f..d635333d 100644 --- a/include/SessionImpl.h +++ b/include/SessionImpl.h @@ -31,7 +31,9 @@ class SessionImpl : public Session bool isAuthenticated; bool isDba; bool isXTaken; - +#if (defined MMDB && defined EMBED) + static int noOfThreads; +#endif public: SessionImpl() { @@ -62,6 +64,10 @@ class SessionImpl : public Session Database* getSystemDatabase(); DbRetVal getExclusiveLock(); private: +#if (defined MMDB && defined EMBED) + DbRetVal openEmbeddedConnection(const char *uname, const char *password); + DbRetVal closeEmbeddedConnection(); +#endif DbRetVal authenticate(const char *username, const char *password); }; diff --git a/include/SqlConnection.h b/include/SqlConnection.h index 5e11189a..65f248b7 100644 --- a/include/SqlConnection.h +++ b/include/SqlConnection.h @@ -36,9 +36,24 @@ class SqlConnection : public AbsSqlConnection { Connection conn; bool isConnOpen; +#if (defined MMDB && defined EMBED) + DbRetVal recoverCsqlDB(); + DbRetVal recoverSystemAndUserDB(); + DbRetVal applySchemaFile(FILE *fp); + char getQueryFromSchemaFile(FILE *fp, char *buf); + int applyRedoLogs(char *redoFile); + AbsSqlStatement *getStmtFromHashTable(int stmtId); + void removeFromHashTable(int stmtID); + void addToHashTable(int stmtID, AbsSqlStatement* sHdl); +#endif public: static List connList; static bool isInit; +#if (defined MMDB && EMBED) + static bool firstThread; + static GlobalUniqueID UID; + void *stmtBuckets; +#endif void initialize(); List cachedStmts; SqlConnection(){ innerConn = NULL; isConnOpen = false; } @@ -49,13 +64,7 @@ class SqlConnection : public AbsSqlConnection * @param pass password for authentication * @return DbRetVal */ - DbRetVal connect (char *user, char * pass) { - DbRetVal ret = conn.open(user, pass); - if (ret == OK) isConnOpen = true; - if (!isInit) initialize(); - connList.append(this); - return ret; - } + DbRetVal connect (char *user, char * pass); /** closes connection to the database and releases all the resources * @return DbRetVal @@ -63,6 +72,9 @@ class SqlConnection : public AbsSqlConnection DbRetVal disconnect () { DbRetVal ret = conn.close(); if (ret == OK) isConnOpen = false; +# if (defined MMDB && defined EMBED) + if (Conf::config.useDurability() && connList.size()==1) UID.destroy(); +# endif connList.remove(this); return ret; } diff --git a/include/build.h b/include/build.h index 1a5102b2..56b4b486 100644 --- a/include/build.h +++ b/include/build.h @@ -3,4 +3,4 @@ #define CSQL #define LINUX #define i686 -#endif +#endif diff --git a/include/os.h b/include/os.h index fb8f2790..d5f40e44 100644 --- a/include/os.h +++ b/include/os.h @@ -198,6 +198,7 @@ class os static int getNoOfProcessors(); static mode_t umask(mode_t mask); static int fdatasync(int fd); + static int atexit(void (*exitHndlr)(void)); }; #endif diff --git a/src/sql/SqlFactory.cxx b/src/sql/SqlFactory.cxx index 70fd107a..27d8486d 100644 --- a/src/sql/SqlFactory.cxx +++ b/src/sql/SqlFactory.cxx @@ -41,6 +41,9 @@ AbsSqlConnection* SqlFactory::createConnection(SqlApiImplType implFlag) conn = new SqlConnection(); }else { AbsSqlConnection *sqlCon = new SqlConnection(); +#if (defined MMDB && defined EMBED) + ((SqlConnection *)sqlCon)->UID.create(); +#endif conn = new SqlLogConnection(); SqlLogConnection *logCon = (SqlLogConnection *)conn; logCon->setNoMsgLog(true); diff --git a/src/sql/SqlStatement.cxx b/src/sql/SqlStatement.cxx index 351e1905..4773f54d 100644 --- a/src/sql/SqlStatement.cxx +++ b/src/sql/SqlStatement.cxx @@ -27,6 +27,10 @@ extern ParsedData *parsedData; int yyparse (); bool SqlConnection::isInit = false; +#if (defined MMDB && defined EMBED) +bool SqlConnection::firstThread = false; +GlobalUniqueID SqlConnection::UID; +#endif List SqlConnection::connList; @@ -604,6 +608,44 @@ void SqlStatement::flushCacheStmt() return sqlCon->flushCacheStmt(); } //------------------------------------------------------------------- + +static void sigTermHandler(int sig) +{ + ListIterator iter= SqlConnection::connList.getIterator(); + SqlConnection *conn = NULL; + while (iter.hasElement()) + { + conn = (SqlConnection*) iter.nextElement(); + conn->flushCacheStmt(); + if (conn->isConnectionOpen()) conn->disconnect(); + } + exit(0); +} + +DbRetVal SqlConnection::connect (char *user, char * pass) +{ + DbRetVal ret = conn.open(user, pass); + if (ret != OK) return ret; + if (ret == OK) isConnOpen = true; + if (!isInit) initialize(); + connList.append(this); + DbRetVal rv = OK; +#if (defined MMDB && EMBED) + os::signal(SIGINT, sigTermHandler); + os::signal(SIGTERM, sigTermHandler); + if (Conf::config.useDurability() && !firstThread) { + rv = recoverCsqlDB(); + if (rv != OK) { + printError(ErrSysInternal, "Recovery Failed"); + return rv; + } + firstThread = true; + } + rollback(); //for drop table execute in redo log +#endif + return ret; +} + void SqlConnection::flushCacheStmt() { ListIterator iter = cachedStmts.getIterator(); @@ -696,8 +738,402 @@ static void sigUsr1Handler(int sig) os::signal(SIGCSQL1, sigUsr1Handler); return; } + +static void exithandler(void) +{ + ListIterator iter= SqlConnection::connList.getIterator(); + SqlConnection *conn = NULL; + while (iter.hasElement()) + { + conn = (SqlConnection*) iter.nextElement(); + conn->flushCacheStmt(); + conn->disconnect(); + } +} + void SqlConnection::initialize() { os::signal(SIGCSQL1, sigUsr1Handler); +#if (defined MMDB && defined EMBED) + os::atexit(exithandler); +#endif isInit = true; } + +#if (defined MMDB && defined EMBED) +DbRetVal SqlConnection::recoverCsqlDB() +{ + DbRetVal rv = OK; + char dbRedoFileName[MAX_FILE_LEN]; + char dbChkptSchema[MAX_FILE_LEN]; + char dbChkptMap[MAX_FILE_LEN]; + char dbChkptData[MAX_FILE_LEN]; + char dbBackupFile[MAX_FILE_LEN]; + char cmd[IDENTIFIER_LENGTH]; + //check for check point file if present recover + sprintf(dbChkptSchema, "%s/db.chkpt.schema1", Conf::config.getDbFile()); + if (FILE *file = fopen(dbChkptSchema, "r")) { + fclose(file); + sprintf(cmd, "cp -f %s %s/db.chkpt.schema", dbChkptSchema, + Conf::config.getDbFile()); + int ret = system(cmd); + if (ret != 0) return ErrOS; + } + sprintf(dbChkptMap, "%s/db.chkpt.map1", Conf::config.getDbFile()); + if (FILE *file = fopen(dbChkptMap, "r")) { + fclose(file); + sprintf(cmd, "cp -f %s %s/db.chkpt.map", dbChkptMap, + Conf::config.getDbFile()); + int ret = system(cmd); + if (ret != 0) return ErrOS; + } + int chkptID= Database::getCheckpointID(); + sprintf(dbChkptData, "%s/db.chkpt.data%d", Conf::config.getDbFile(), + chkptID); + sprintf(dbBackupFile, "%s/db.chkpt.data1", Conf::config.getDbFile()); + FILE *fl = NULL; + if (!Conf::config.useMmap() && (fl = fopen(dbBackupFile, "r"))) { + fclose(fl); + sprintf(cmd, "cp %s/db.chkpt.data1 %s", Conf::config.getDbFile(), + dbChkptData); + int ret = system(cmd); + if (ret != 0) return ErrOS; + } + if (FILE *file = fopen(dbChkptData, "r")) { + fclose(file); + rv = recoverSystemAndUserDB(); + } + + //check for redo log file if present apply redo logs + sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile()); + if (FILE *file = fopen(dbRedoFileName, "r")) + { + fclose(file); + applyRedoLogs(dbRedoFileName); + DatabaseManager *dbMgr = getConnObject().getDatabaseManager(); + rv = dbMgr->checkPoint(); + if (rv != OK) + { + printError(ErrSysInternal, "checkpoint failed after redo log apply"); + return ErrOS; + } + } + return OK; +} + +DbRetVal SqlConnection::recoverSystemAndUserDB() +{ + DbRetVal rv = OK; + char schFile[1024]; + sprintf(schFile, "%s/db.chkpt.schema", Conf::config.getDbFile()); + if (FILE *file = fopen(schFile, "r")) { + applySchemaFile(file); + } + DatabaseManager *dbMgr = getConnObject().getDatabaseManager(); + dbMgr->recover(); + return OK; +} + +DbRetVal SqlConnection::applySchemaFile(FILE *fp) +{ + char buf[8192]; + char eof; + SqlStatement *stmt = new SqlStatement(); + while ((eof = getQueryFromSchemaFile(fp,buf)) != EOF) { + stmt->setConnection(this); + stmt->prepare(buf); + int rows = 0; + stmt->execute(rows); + } + delete stmt; + return OK; +} + +char SqlConnection::getQueryFromSchemaFile(FILE *fp, char *buf) +{ + char c, *bufBegin=buf; + int charCnt=0; + while( (c=(char ) fgetc(fp)) != EOF && c != ';') + { + *buf++ = c; charCnt++; + if( charCnt == SQL_STMT_LEN ) { + printf("SQL Statement length is greater than %d. " + "Ignoring the statement.\n", SQL_STMT_LEN ); + *bufBegin++ =';'; + *bufBegin ='\0'; + return 0; + } + } + *buf++ = ';'; + *buf = '\0'; + return c; +} + +void SqlConnection::addToHashTable(int stmtID, AbsSqlStatement* sHdl) +{ + int bucketNo = stmtID % STMT_BUCKET_SIZE; + StmtBucket *buck = (StmtBucket *) stmtBuckets; + StmtBucket *stmtBucket = &buck[bucketNo]; + StmtNode *node = new StmtNode(); + node->stmtId = stmtID; + node->stmt = sHdl; + stmtBucket->bucketList.append(node); + return; +} + +void SqlConnection::removeFromHashTable(int stmtID) +{ + int bucketNo = stmtID % STMT_BUCKET_SIZE; + StmtBucket *buck = (StmtBucket *) stmtBuckets; + StmtBucket *stmtBucket = &buck[bucketNo]; + StmtNode *node = NULL; + ListIterator it = stmtBucket->bucketList.getIterator(); + while(it.hasElement()) { + node = (StmtNode *) it.nextElement(); + if(stmtID == node->stmtId) break; + } + it.reset(); + stmtBucket->bucketList.remove(node); + return; +} + +AbsSqlStatement *SqlConnection::getStmtFromHashTable(int stmtId) +{ + int bucketNo = stmtId % STMT_BUCKET_SIZE; + StmtBucket *buck = (StmtBucket *) stmtBuckets; + StmtBucket *stmtBucket = &buck[bucketNo]; + StmtNode *node = NULL; + ListIterator it = stmtBucket->bucketList.getIterator(); + while(it.hasElement()) { + node = (StmtNode *) it.nextElement(); + if(stmtId == node->stmtId) { + return node->stmt; + } + } + return NULL; +} + +void setParam(AbsSqlStatement *stmt, int pos, DataType type , int length, void *value) +{ + switch(type) + { + case typeInt: + stmt->setIntParam(pos, *(int*)value); + break; + case typeLong: + stmt->setLongParam(pos, *(long*) value); + break; + case typeLongLong: + stmt->setLongLongParam(pos, *(long long*)value); + break; + case typeShort: + stmt->setShortParam(pos, *(short*)value); + break; + case typeByteInt: + stmt->setByteIntParam(pos, *(ByteInt*)value); + break; + case typeDouble: + stmt->setDoubleParam(pos, *(double*)value); + break; + case typeFloat: + stmt->setFloatParam(pos, *(float*)value); + break; + case typeDate: + stmt->setDateParam(pos, *(Date*)value); + break; + case typeTime: + stmt->setTimeParam(pos, *(Time*)value); + break; + case typeTimeStamp: + stmt->setTimeStampParam(pos, *(TimeStamp*)value); + break; + case typeString: + case typeVarchar: + stmt->setStringParam(pos, (char*)value); + break; + case typeBinary: + stmt->setBinaryParam(pos, value, length); + break; + default: + printf("unknown type\n"); + break; + } + return; +} + +int SqlConnection::applyRedoLogs(char *redoFile) +{ + struct stat st; + DbRetVal rv = OK; + int fd = open(redoFile, O_RDONLY); + if (-1 == fd) { return OK; } + if (fstat(fd, &st) == -1) { + printError(ErrSysInternal, "Unable to retrieve undo log file size"); + close(fd); + return 1; + } + if (st.st_size ==0) { + close(fd); + return OK; + } + void *startAddr = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0); + if (MAP_FAILED == startAddr) { + printf("Unable to read undo log file:mmap failed.\n"); + return 2; + } + stmtBuckets = malloc (STMT_BUCKET_SIZE * sizeof(StmtBucket)); + memset(stmtBuckets, 0, STMT_BUCKET_SIZE * sizeof(StmtBucket)); + + char *iter = (char*)startAddr; + void *value = NULL; + int logType, eType; + int stmtID; + int txnID; + int len, ret, retVal =0; + int loglen; + char stmtString[SQL_STMT_LEN]; + //printf("size of file %d\n", st.st_size); + while(true) { + //printf("OFFSET HERE %d\n", iter - (char*)startAddr); + if (iter - (char*)startAddr >= st.st_size) break; + logType = *(int*)iter; + if (logType == -1) { //prepare + iter = iter + sizeof(int); + txnID = *(int*) iter; iter += sizeof(int); + loglen = *(int*) iter; iter += sizeof(int); + stmtID = *(int*)iter; + iter = iter + sizeof(int); + len = *(int*)iter; + iter = iter + sizeof(int); + strncpy(stmtString, iter, len); + iter = iter + len; + //printf("PREPARE:%d %d %s\n", stmtID, len, stmtString); + AbsSqlStatement *st = SqlFactory::createStatement(CSqlDirect); + SqlStatement *stmt = (SqlStatement *)st; + stmt->setConnection(this); + rv = stmt->prepare(stmtString); + if (rv != OK) { + printError(ErrSysInternal, "unable to prepare stmt:%s", stmtString); + retVal=1; + break; + } + stmt->prepare(stmtString); + SqlStatement *sqlStmt = (SqlStatement*)stmt; + sqlStmt->setLoading(true); + addToHashTable(stmtID, stmt); + } + else if(logType == -2) { //commit + beginTrans(); + iter = iter + sizeof(int); + txnID = *(int*) iter; iter += sizeof(int); + loglen = *(int*) iter; iter += sizeof(int); + char *curPtr = iter; + while(true) { + //printf("Iter length %d\n", iter - curPtr); + if (iter - (char*)startAddr >= st.st_size) { + //file end reached + //printf("Redo log file end\n"); + retVal=0; + break; + } + stmtID = *(int*)iter; + //printf("stmtid %d\n", stmtID); + iter = iter + sizeof(int); + eType = *(int*)iter; + //printf("eType is %d\n", eType); + AbsSqlStatement *stmt = getStmtFromHashTable(stmtID); + if (0 == eType) { //execute type + iter = iter + sizeof(int); + //printf("EXEC: %d\n", stmtID); + if (stmt) { + rv = stmt->execute(ret); + if (rv != OK) { + printError(ErrSysInternal, "unable to execute"); + retVal=2; + break; + } + } else { + printError(ErrSysInternal, "statement not found for %d\n",stmtID); + } + if (*(int*)iter <0) break; + } else if ( 1 == eType) { //set type + iter=iter+sizeof(int); + int pos = *(int*) iter; + iter=iter+sizeof(int); + DataType type = (DataType)(*(int*)iter); + iter=iter+sizeof(int); + int len = *(int*) iter; + iter=iter+sizeof(int); + value = iter; + //AllDataType::printVal(value, type, len); + iter=iter+len; + //printf("SET: %d %d %d %d\n", stmtID, pos, type, len); + setParam(stmt, pos, type, len, value); + if (*(int*)iter <0) break; + } + } + commit(); + } + else if(logType == -3) { //free + iter = iter + sizeof(int); + txnID = *(int*) iter; iter += sizeof(int); + loglen = *(int*) iter; iter += sizeof(int); + stmtID = *(int*)iter; + iter = iter + sizeof(int); + AbsSqlStatement *stmt = getStmtFromHashTable(stmtID); + if (stmt) { + stmt->free(); + removeFromHashTable(stmtID); + } else { printError(ErrSysInternal, "statement not found for %d\n",stmtID);} + } + else if(logType == -4) { //prepare and execute + iter = iter + sizeof(int); + txnID = *(int*) iter; iter += sizeof(int); + loglen = *(int*) iter; iter += sizeof(int); + stmtID = *(int*)iter; + iter = iter + sizeof(int); + len = *(int*)iter; + iter = iter + sizeof(int); + strncpy(stmtString, iter, len); + stmtString[len+1] ='\0'; + iter = iter + len; + //printf("CREATE:%d %d %s\n", stmtID, len, stmtString); + AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlDirect); + if ( NULL == stmt) { + printError(ErrSysInternal, "unable to prepare:%s", stmtString); + retVal=3; + break; + } + stmt->setConnection(this); + rv = stmt->prepare(stmtString); + if (rv != OK) { + printError(ErrSysInternal, "unable to prepare:%s", stmtString); + retVal=4; + break; + } + rv = stmt->execute(ret); + if (rv != OK) { + if (strlen(stmtString) > 6 && + ( (strncasecmp(stmtString,"CREATE", 6) == 0) || + (strncasecmp(stmtString,"DROP", 4) == 0)) ) { + // conn->disconnect(); + // return OK; + continue; + } + printError(ErrSysInternal, "unable to execute %s", stmtString); + retVal=5; + break; + } + stmt->free(); + }else{ + printError(ErrSysInternal, "Redo log file corrupted: logType:%d", logType); + retVal=6; + break; + } + } + munmap((char*)startAddr, st.st_size); + close(fd); + //freeAllStmtHandles(); + return retVal; +} +#endif diff --git a/src/storage/Database.cxx b/src/storage/Database.cxx index 0acf1a6b..4dded4ed 100644 --- a/src/storage/Database.cxx +++ b/src/storage/Database.cxx @@ -579,7 +579,7 @@ DbRetVal Database::writeDirtyPages(char *dataFile) pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_; } } - printf("Total Dirty pages written %d %lld\n", pagesWritten, totalBytesWritten); + //printf("Total Dirty pages written %d %lld\n", pagesWritten, totalBytesWritten); logFine(Conf::logger, "Total Dirty pages written %d\n", pagesWritten); close(fd); return OK; diff --git a/src/storage/DatabaseManagerImpl.cxx b/src/storage/DatabaseManagerImpl.cxx index 41b244b8..6cd6088a 100644 --- a/src/storage/DatabaseManagerImpl.cxx +++ b/src/storage/DatabaseManagerImpl.cxx @@ -116,7 +116,12 @@ DbRetVal DatabaseManagerImpl::createDatabase(const char *name, size_t size) shm_id = os::shm_create(key, size, 0660); if (-1 == shm_id) { if (errno == EEXIST) +#if (defined MMDB && defined EMBED) + printError(ErrOS, "One application is already running."); + return ErrOS; +#else printError(ErrOS, "Shared Memory already exists"); +#endif printError(ErrOS, "Shared memory create failed"); shm_id = os::shm_open(Conf::config.getSysDbKey(), 100, 0660); os::shmctl(shm_id, IPC_RMID); @@ -177,6 +182,10 @@ DbRetVal DatabaseManagerImpl::createDatabase(const char *name, size_t size) printError(ErrOS, "Shared memory attach returned -ve value %d", rtnAddr); return ErrOS; } +# if (defined MMDB && defined EMBED) + if (0 == strcmp(name, SYSTEMDB)) ProcessManager::sysAddr = rtnAddr; + else ProcessManager::usrAddr = rtnAddr; +# endif } db_ = new Database(); printDebug(DM_Database, "Creating database:%s",name); @@ -330,8 +339,7 @@ DbRetVal DatabaseManagerImpl::openDatabase(const char *name) ProcessManager::usrAddr = (char*) shm_ptr; } } else { - if (0 == strcmp(name, SYSTEMDB)) - shm_ptr = ProcessManager::sysAddr; + if (0 == strcmp(name, SYSTEMDB)) shm_ptr = ProcessManager::sysAddr; else shm_ptr = ProcessManager::usrAddr; } @@ -1648,7 +1656,9 @@ DbRetVal DatabaseManagerImpl::registerThread() } pMgr_ = new ProcessManager(); rv = pMgr_->registerThread(); - if (rv ==OK) { procSlot = pMgr_->getProcSlot(); + if (rv ==OK) { + procSlot = pMgr_->getProcSlot(); + systemDatabase_->setProcSlot(procSlot); printDebug(DM_Process, "Process registed with slot %d\n", procSlot); } return rv; diff --git a/src/storage/SessionImpl.cxx b/src/storage/SessionImpl.cxx index 87b512f1..b3a79723 100644 --- a/src/storage/SessionImpl.cxx +++ b/src/storage/SessionImpl.cxx @@ -24,6 +24,11 @@ #include #include #include + +#if (defined MMDB && defined EMBED) +int SessionImpl::noOfThreads = 0; +#endif + //Before calling this method, application is required to call readConfigValues DbRetVal SessionImpl::initSystemDatabase() @@ -40,7 +45,7 @@ DbRetVal SessionImpl::initSystemDatabase() // Conf::config.print(); - dbMgr = new DatabaseManagerImpl(); + if (dbMgr == NULL) dbMgr = new DatabaseManagerImpl(); rv = dbMgr->createDatabase(SYSTEMDB, Conf::config.getMaxSysDbSize()); if (OK != rv) return rv; @@ -84,8 +89,9 @@ DbRetVal SessionImpl::initSystemDatabase() return ErrSysInit; } db->releaseCheckpointMutex(); - +#if !(defined MMDB && defined EMBED) printf("Sys_DB [Size=%4.4ldMB] \nUser_DB [Size=%4.4ldMB]\n", Conf::config.getMaxSysDbSize()/1048576, Conf::config.getMaxDbSize()/1048576); +#endif //create user database rv = dbMgr->createDatabase("userdb", Conf::config.getMaxDbSize()); if (OK != rv) return rv; @@ -106,6 +112,9 @@ DbRetVal SessionImpl::destroySystemDatabase() DbRetVal SessionImpl::open(const char *username, const char *password) { +#if (defined MMDB) && (defined EMBED) + openEmbeddedConnection(username, password); +# else DbRetVal rv = OK; rv = readConfigFile(); if (rv != OK) @@ -165,6 +174,7 @@ DbRetVal SessionImpl::open(const char *username, const char *password) //ProcessManager::systemDatabase = dbMgr->sysDb(); isXTaken = false; return OK; +#endif } DbRetVal SessionImpl::authenticate(const char *username, const char *password) { @@ -195,6 +205,9 @@ DbRetVal SessionImpl::getExclusiveLock() } DbRetVal SessionImpl::close() { +# if (defined MMDB) && (defined EMBED) + closeEmbeddedConnection(); +# else DbRetVal rv = OK; if (isXTaken && dbMgr ) dbMgr->sysDb()->releaseProcessTableMutex(true); if (dbMgr) @@ -233,6 +246,7 @@ DbRetVal SessionImpl::close() } isXTaken = false; return OK; +#endif } DatabaseManager* SessionImpl::getDatabaseManager() @@ -335,3 +349,111 @@ Database* SessionImpl::getSystemDatabase() return dbMgr->sysDb(); } +#if (defined MMDB && defined EMBED) +DbRetVal SessionImpl::openEmbeddedConnection(const char *username, const char *password) +{ + DbRetVal rv = OK; + rv = readConfigFile(); + if (rv != OK) + { + printError(ErrSysFatal, "Configuration file read failed\n"); + return ErrSysFatal; + } + + if ( NULL == dbMgr) + { + dbMgr = new DatabaseManagerImpl(); + } + int ret = ProcessManager::mutex.tryLock(10, 100); + //If you are not getting lock ret !=0, it means somebody else is there. + if (ret != 0) + { + printError(ErrSysInternal, "Another thread calling open:Wait and then Retry\n"); + return ErrSysInternal; + } + + if (!noOfThreads) { + ret = initSystemDatabase(); + if (0 != ret) { + printError(ErrSysInternal, "Unable to initialize the Database"); + ProcessManager::mutex.releaseLock(-1, false); + return ErrSysInternal; + } + ProcessManager::systemDatabase = dbMgr->sysDb(); + } else { + rv = dbMgr->openSystemDatabase(); + if (OK != rv) + { + printError(rv,"Unable to open the system database"); + ProcessManager::mutex.releaseLock(-1, false); + return rv; + } + } + + rv = authenticate(username, password); + if (OK != rv) + { + delete dbMgr; dbMgr = NULL; + ProcessManager::mutex.releaseLock(-1, false); + return rv; + } + + dbMgr->createTransactionManager(); + dbMgr->createLockManager(); + if (noOfThreads) { + rv = dbMgr->openDatabase("userdb"); + if (OK != rv) { + dbMgr->closeSystemDatabase(); + ProcessManager::mutex.releaseLock(-1, false); + delete dbMgr; dbMgr = NULL; + return rv; + } + } + + rv = dbMgr->registerThread(); + if (OK != rv) + { + printError(rv,"Unable to register to csql server"); + ProcessManager::mutex.releaseLock(-1, false); + delete dbMgr; dbMgr = NULL; + return rv; + } + + ProcessManager::mutex.releaseLock(-1, false); + noOfThreads++; + return OK; +} + +DbRetVal SessionImpl::closeEmbeddedConnection() +{ + DbRetVal rv = OK; + if (dbMgr) + { + int ret = ProcessManager::mutex.tryLock(10,100); + //If you are not getting lock ret !=0, it means somebody else is there. + if (ret != 0) + { + printError(ErrSysInternal, "Another thread calling open:Wait and then Retry\n"); + return ErrSysInternal; + } + + rv = dbMgr->deregisterThread(); + if (rv != OK) { + ProcessManager::mutex.releaseLock(-1, false); + return ErrBadCall; + } + ProcessManager::mutex.releaseLock(-1, false); + } + if (uMgr) + { + delete uMgr; + uMgr = NULL; + } + if(noOfThreads == 1) { + destroySystemDatabase(); + Conf::logger.stopLogger(); + } + noOfThreads--; + return OK; +} +#endif diff --git a/src/storage/Util.cxx b/src/storage/Util.cxx index d3541d98..720b0de4 100644 --- a/src/storage/Util.cxx +++ b/src/storage/Util.cxx @@ -27,7 +27,8 @@ DbRetVal GlobalUniqueID::create() int key = Conf::config.getShmIDKey(); int id = os::shm_create(key, MAX_UNIQUE_ID *sizeof(int), 0666); if (-1 == id) { - printError(ErrOS, "Unable to create shared memory"); + if (errno != EEXIST) + printError(ErrOS, "Unable to create shared memory"); return ErrOS; } ptr = os::shm_attach(id, NULL, 0); diff --git a/src/storage/os.cxx b/src/storage/os.cxx index ae3c7126..0f22d7d4 100644 --- a/src/storage/os.cxx +++ b/src/storage/os.cxx @@ -30,6 +30,10 @@ int os::munmap(caddr_t addr, size_t len) return ::munmap(addr, len); } +int os::atexit(void (*exitHndlr)(void)) +{ + return ::atexit(exitHndlr); +} shared_memory_id os::shm_create(shared_memory_key key, size_t size, int flag) { -- 2.11.4.GIT