From 140ac571a8b7a14012329a33a201405921b6e290 Mon Sep 17 00:00:00 2001 From: prabatuty Date: Sun, 4 Apr 2010 06:44:59 +0000 Subject: [PATCH] redo log changes. prepare with param will go to stmt logs and other stmts go to redo logs --- include/Database.h | 4 + include/SqlLogConnection.h | 63 ++++--- src/sqllog/FileSend.cxx | 135 ++++++++++----- src/sqllog/SqlLogConnection.cxx | 4 +- src/sqllog/SqlLogStatement.cxx | 18 +- src/storage/Database.cxx | 101 +++++++++++ src/tools/redo.cxx | 359 +++++++++++++++++++++++++++++++++------- 7 files changed, 555 insertions(+), 129 deletions(-) diff --git a/include/Database.h b/include/Database.h index 64309ef4..de75dadc 100644 --- a/include/Database.h +++ b/include/Database.h @@ -180,17 +180,21 @@ class Database void setProcSlot(int slot) { procSlot =slot;} //checks whether the ptr falls in the range of the database file size bool isValidAddress(void *ptr); + DbRetVal writeDirtyPages(char *chkptFile); DbRetVal checkPoint(); + DbRetVal filterAndRemoveStmtLogs(); DbRetVal recoverUserDB(); DbRetVal recoverSystemDB(); static int getCheckpointID(); static void setCheckpointID(int id); + friend class DatabaseManagerImpl; friend class Table; friend class TreeIndex; friend class HashIndex; friend class Transaction; + }; #endif diff --git a/include/SqlLogConnection.h b/include/SqlLogConnection.h index c0a587e7..1fe4b630 100644 --- a/include/SqlLogConnection.h +++ b/include/SqlLogConnection.h @@ -24,7 +24,8 @@ #include #include -/** +/* +* * @class SqlLogConnection * */ @@ -37,9 +38,10 @@ typedef struct my_msgbuffer { class AbsSqlLogSend { public: - virtual DbRetVal prepare(int tId, int sId, int len, char *st, char *tn)=0; + virtual DbRetVal prepare(int tId, int sId, int len, char *st, char *tn, + bool hasParam)=0; virtual DbRetVal commit(int len, void *data)=0; - virtual DbRetVal free(int txnId, int stmtId)=0; + virtual DbRetVal free(int txnId, int stmtId, bool hasParam)=0; }; class MsgQueueSend : public AbsSqlLogSend @@ -47,23 +49,27 @@ class MsgQueueSend : public AbsSqlLogSend int msgQId; public: MsgQueueSend() { msgQId = os::msgget(Conf::config.getMsgKey(), 0666); } - DbRetVal prepare(int tId, int sId, int len, char *stmt, char *tn); + DbRetVal prepare(int tId, int sId, int len, char *stmt, char *tn, + bool hasParam); DbRetVal commit(int len, void *data); - DbRetVal free(int txnId, int stmtId); + DbRetVal free(int txnId, int stmtId, bool hasParam); }; class FileSend : public AbsSqlLogSend { int fdRedoLog; + int fdStmtLog; public: FileSend(); ~FileSend(); DbRetVal openRedoFile(); - DbRetVal prepare(int txnId, int stmtId, int len, char *stmt, char*tn); + DbRetVal prepare(int txnId, int stmtId, int len, char *stmt, char*tn, + bool hasParam); DbRetVal commit(int len, void *data); - DbRetVal free(int txnId, int stmtId); + DbRetVal free(int txnId, int stmtId, bool hasParam); }; + class OfflineLog : public AbsSqlLogSend { int fdOfflineLog; @@ -75,13 +81,13 @@ class OfflineLog : public AbsSqlLogSend OfflineLog(); ~OfflineLog(); DbRetVal openOfflineLogFile(); - DbRetVal prepare(int txnId, int stmtId, int len, char *stmt, char*tn); + DbRetVal prepare(int txnId, int stmtId, int len, char *stmt, char*tn, + bool hasParam); DbRetVal commit(int len, void *data); - DbRetVal free(int txnId, int stmtId); + DbRetVal free(int txnId, int stmtId, bool hasParam); }; - enum ExecType { EXECONLY = 0, @@ -135,19 +141,21 @@ class SqlLogConnection : public AbsSqlConnection public: SqlLogConnection() { innerConn = NULL; syncMode = ASYNC; - if (Conf::config.useCache() && Conf::config.getCacheMode()==ASYNC_MODE) - msgQSend = new MsgQueueSend(); + if ( Conf::config.useCache() && + Conf::config.getCacheMode()==ASYNC_MODE) + msgQSend = new MsgQueueSend(); else msgQSend = NULL; if (Conf::config.useDurability()) { fileSend = new FileSend(); } else fileSend = NULL; if (Conf::config.useCache() && - Conf::config.getCacheMode() == OFFLINE_MODE) + Conf::config.getCacheMode() == OFFLINE_MODE) offlineLog = new OfflineLog; else offlineLog = NULL; txnUID.open(); execLogStoreSize =0; noMsgLog = false; noOfflineLog = false; + txnID = 0; } ~SqlLogConnection(); bool isTableCached(char *name); @@ -166,17 +174,20 @@ class SqlLogConnection : public AbsSqlConnection DbRetVal beginTrans (IsolationLevel isoLevel, TransSyncMode mode); - DbRetVal msgPrepare(int tId, int sId, int len, char *stmt, char *tname) + DbRetVal msgPrepare(int tId, int sId, int len, char *stmt, char *tname, + bool hasParam) { - return msgQSend->prepare(tId, sId, len, stmt, tname); + return msgQSend->prepare(tId, sId, len, stmt, tname, hasParam); } - DbRetVal fileLogPrepare(int tId, int sId, int len, char *stmt, char *tname) + DbRetVal fileLogPrepare(int tId, int sId, int len, char *stmt, char *tname, + bool hasParam) { - return fileSend->prepare(tId, sId, len, stmt, tname); + return fileSend->prepare(tId, sId, len, stmt, tname, hasParam); } - DbRetVal offlineLogPrepare(int tId, int sId, int len, char *st, char *tnm) + DbRetVal offlineLogPrepare(int tId, int sId, int len, char *stmt, + char *tname, bool hasParam) { - return offlineLog->prepare(tId, sId, len, st, tnm); + return offlineLog->prepare(tId, sId, len, stmt, tname, hasParam); } DbRetVal commitLogs(int logSize, void *data) { @@ -186,22 +197,22 @@ class SqlLogConnection : public AbsSqlConnection msgQSend->commit(logSize, data); if (Conf::config.useDurability()) fileSend->commit(logSize, data); if (Conf::config.useCache() && - Conf::config.getCacheMode()==OFFLINE_MODE && - !noOfflineLog) + Conf::config.getCacheMode()==OFFLINE_MODE && + !noOfflineLog) offlineLog->commit(logSize, data); return OK; } - DbRetVal freeLogs(int stmtId) + DbRetVal freeLogs(int stmtId, bool hasParam) { int txnId = getTxnID(); if ( ((Conf::config.useCache() && Conf::config.getCacheMode() == ASYNC_MODE)) && !noMsgLog) - msgQSend->free(txnId, stmtId); - if (Conf::config.useDurability()) fileSend->free(txnId, stmtId); + msgQSend->free(txnId, stmtId, hasParam); + if (Conf::config.useDurability()) fileSend->free(txnId, stmtId, hasParam); if (Conf::config.useCache() && - Conf::config.getCacheMode()==OFFLINE_MODE && + Conf::config.getCacheMode()==OFFLINE_MODE && !noOfflineLog) - offlineLog->free(txnId, stmtId); + offlineLog->free(txnId, stmtId, hasParam); return OK; } void addExecLog(ExecLogInfo *info) { execLogStore.append(info); } diff --git a/src/sqllog/FileSend.cxx b/src/sqllog/FileSend.cxx index 5a37a783..f419e414 100644 --- a/src/sqllog/FileSend.cxx +++ b/src/sqllog/FileSend.cxx @@ -23,48 +23,70 @@ FileSend::FileSend() { + fdRedoLog = -1; + fdStmtLog = -1; openRedoFile(); } DbRetVal FileSend::openRedoFile() { + if (fdRedoLog > 0) os::closeFile(fdRedoLog); + if (fdStmtLog > 0) os::closeFile(fdStmtLog); char fileName[MAX_FILE_LEN]; + char stmtFileName[MAX_FILE_LEN]; sprintf(fileName, "%s/csql.db.cur", Conf::config.getDbFile()); + sprintf(stmtFileName, "%s/csql.db.stmt", Conf::config.getDbFile()); int durableMode = Conf::config.getDurableMode(); switch(durableMode) { case 1: case 2: fdRedoLog = os::openFileForAppend(fileName, O_CREAT); + fdStmtLog = os::openFileForAppend(stmtFileName, O_CREAT); break; case 3: fdRedoLog = os::openFileForAppend(fileName, O_CREAT|O_SYNC); + fdStmtLog = os::openFileForAppend(stmtFileName, O_CREAT|O_SYNC); break; case 4: #ifdef SOLARIS fdRedoLog = os::openFileForAppend(fileName, O_CREAT|O_DSYNC); + fdStmtLog = os::openFileForAppend(stmtFileName, O_CREAT|O_DSYNC); #else fdRedoLog = os::openFileForAppend(fileName, O_CREAT|O_DIRECT); + fdStmtLog = os::openFileForAppend(stmtFileName, O_CREAT|O_DIRECT); #endif break; default: fdRedoLog = os::openFileForAppend(fileName, O_CREAT); + fdStmtLog = os::openFileForAppend(stmtFileName, O_CREAT); break; } - if (-1 == fdRedoLog) { + if (-1 == fdRedoLog || -1 == fdStmtLog) { printError(ErrSysInternal, "Unable to open redo log file"); return ErrSysInternal; } return OK; } -FileSend::~FileSend() -{ +FileSend::~FileSend() { if (fdRedoLog > 0) os::closeFile(fdRedoLog); + if (fdStmtLog > 0) os::closeFile(fdStmtLog); fdRedoLog = -1; + fdStmtLog = -1; } -DbRetVal FileSend::prepare(int txnId, int stmtId, int len, char *stmt, char *tblName) +DbRetVal FileSend::prepare(int txnId, int stmtId, int len, char *stmt, + char *tblName, bool hasParam) { - if (fdRedoLog < 0) return ErrBadArg; + if (fdRedoLog < 0) { + printError(ErrBadArg, "Redo Log file not opened"); + return ErrBadArg; + } + if (fdStmtLog < 0) { + printError(ErrBadArg, "Redo Stmt Log file not opened"); + return ErrBadArg; + } + int fd =fdRedoLog; + if (hasParam) fd=fdStmtLog; //The following structure needs strlen after stmt id for traversal in //redolog file unlike msg queue structure where string is the last element //and is not a continuous piece of memory. @@ -91,19 +113,20 @@ DbRetVal FileSend::prepare(int txnId, int stmtId, int len, char *stmt, char *tbl bool firstTime=true; retry: if (Conf::config.getDurableMode() != 1) { - ret = os::lockFile(fdRedoLog); + ret = os::lockFile(fd); if (-1 == ret) { ::free(buf); printError(ErrLockTimeOut,"Unable to get exclusive lock on redo log file"); return ErrLockTimeOut; } } - ret = os::write(fdRedoLog, buf, datalen); + ret = os::write(fd, buf, datalen); if (Conf::config.getDurableMode() != 1) { - os::unlockFile(fdRedoLog); + os::unlockFile(fd); } if (-1 == ret) { DbRetVal rv = openRedoFile(); + if (hasParam) fd=fdStmtLog; else fd=fdRedoLog; if (OK == rv) { logFine(Conf::logger, "Reopening redo log file"); if(firstTime) { firstTime = false; goto retry; } @@ -118,7 +141,10 @@ retry: DbRetVal FileSend::commit(int len, void *data) { - if (fdRedoLog < 0) return ErrBadArg; + if (fdRedoLog < 0) { + printError(ErrBadArg, "Redo Log file not opened"); + return ErrBadArg; + } char *dat=(char*)data - sizeof(int); *(int*)dat = -2; //type 2->commit bool firstTime = true; @@ -145,8 +171,17 @@ retry: } return OK; } -DbRetVal FileSend::free(int txnId, int stmtId) +DbRetVal FileSend::free(int txnId, int stmtId, bool hasParam) { + if (fdRedoLog < 0) { + printError(ErrBadArg, "Redo Log file not opened"); + return ErrBadArg; + } + if (fdStmtLog < 0) { + printError(ErrBadArg, "Redo Stmt Log file not opened"); + return ErrBadArg; + } + int fd =fdRedoLog; int buflen = 4 *sizeof(int); char *msg = (char *) malloc(buflen); char *ptr = msg; @@ -158,22 +193,23 @@ DbRetVal FileSend::free(int txnId, int stmtId) ptr += sizeof(int); *(int *)ptr = stmtId; printDebug(DM_SqlLog, "stmtID sent = %d\n", *(int *)ptr); - bool firstTime = false; + bool firstTime = true; retry: if (Conf::config.getDurableMode() != 1) { - int ret = os::lockFile(fdRedoLog); + int ret = os::lockFile(fd); if (-1 == ret) { ::free(msg); printError(ErrLockTimeOut,"Unable to get exclusive lock on redo log file"); return ErrLockTimeOut; } } - int ret = os::write(fdRedoLog, msg, buflen); + int ret = os::write(fd, msg, buflen); if (Conf::config.getDurableMode() != 1) { - os::unlockFile(fdRedoLog); + os::unlockFile(fd); } if (-1 == ret) { DbRetVal rv = openRedoFile(); + if (hasParam) fd=fdStmtLog; else fd=fdRedoLog; if (OK == rv) { logFine(Conf::logger, "Reopening redo log file"); if(firstTime) { firstTime = false; goto retry; } @@ -182,15 +218,45 @@ retry: ::free(msg); return ErrOS; } + if (!hasParam) { + //For non parameterized stmts , no need to write in stmt log + ::free(msg); + return OK; + } + fd=fdStmtLog; +retry1: + if (Conf::config.getDurableMode() != 1) { + int ret = os::lockFile(fd); + if (-1 == ret) { + ::free(msg); + printError(ErrLockTimeOut,"Unable to get exclusive lock on redo log file"); + return ErrLockTimeOut; + } + } + + ret = os::write(fd, msg, buflen); + if (Conf::config.getDurableMode() != 1) { + os::unlockFile(fd); + } + if (-1 == ret) { + DbRetVal rv = openRedoFile(); + if (OK == rv) { + logFine(Conf::logger, "Reopening redo log file"); + if(firstTime) { firstTime = false; goto retry1; } + } + printError(ErrOS, "Unable to write undo log"); + ::free(msg); + return ErrOS; + } ::free(msg); return OK; } OfflineLog::OfflineLog() { - fdOfflineLog = -1; + fdOfflineLog = -1; metadata = NULL; - createMetadataFile(); + createMetadataFile(); openOfflineLogFile(); } @@ -210,13 +276,13 @@ DbRetVal OfflineLog::openOfflineLogFile() sprintf(fileName, "%s/offlineLogFile.%d", Conf::config.getDbFile(), *(int *)ptr); int ret = 0; - if ( ((ret = ::access(fileName, F_OK)) == 0) && - ((fileSize = os::getFileSize(fileName)) >= offlineLogFileSize) ) + if ( ((ret = ::access(fileName, F_OK)) == 0) && + ((fileSize = os::getFileSize(fileName)) >= offlineLogFileSize) ) sprintf(fileName, "%s/offlineLogFile.%d", - Conf::config.getDbFile(), ++(*(int *)ptr)); - else if (ret == 0) - sprintf(fileName, "%s/offlineLogFile.%d", Conf::config.getDbFile(), - *(int *)ptr); + Conf::config.getDbFile(), ++(*(int *)ptr)); + else if (ret == 0) + sprintf(fileName, "%s/offlineLogFile.%d", Conf::config.getDbFile(), + *(int *)ptr); else { sprintf(fileName, "%s/offlineLogFile.0", Conf::config.getDbFile()); *(int *) ptr = 0; @@ -259,7 +325,8 @@ OfflineLog::~OfflineLog() fdOfflineLog = -1; } -DbRetVal OfflineLog::prepare(int txnId, int stId, int len, char *stmt, char*tn) +DbRetVal OfflineLog::prepare(int txnId, int stId, int len, char *stmt, + char*tn, bool hasParam) { if (fdOfflineLog < 0) return ErrBadArg; DbRetVal rv = OK; @@ -271,16 +338,13 @@ DbRetVal OfflineLog::prepare(int txnId, int stId, int len, char *stmt, char*tn) //The following structure needs strlen after stmt id for traversal in //redolog file unlike msg queue structure where string is the last element //and is not a continuous piece of memory. - // for len + txnId + msg type + stmtId + tableName + stmtstrlen + - // stmtstring - int datalen = os::align(5 * sizeof(int) + len); + int datalen = os::align(5 * sizeof(int) + len); // for len + txnId + msg type + stmtId + tableName + stmtstrlen + stmtstring char *buf = (char*) malloc(datalen); char *msg = buf; //Note:: msg type is taken as -ve as we need to differentiate between //statement id and logtype during recovery. *(int*) msg = -1; - if (strlen(stmt) > 6 && ( strncasecmp(stmt,"CREATE", 6) == 0 || - strncasecmp(stmt,"DROP", 4) == 0 )) + if (strlen(stmt) > 6 && ( strncasecmp(stmt,"CREATE", 6) == 0 || strncasecmp(stmt,"DROP", 4) == 0 )) *(int*)msg = -4; //means prepare and execute the stmt msg = msg+sizeof(int); *(int *)msg = datalen; @@ -300,8 +364,7 @@ retry: ret = os::lockFile(fdOfflineLog); if (-1 == ret) { ::free(buf); - printError(ErrLockTimeOut, - "Unable to get exclusive lock on redo log file"); + printError(ErrLockTimeOut,"Unable to get exclusive lock on redo log file"); return ErrLockTimeOut; } } @@ -323,7 +386,6 @@ retry: fileSize += datalen; return OK; } - DbRetVal OfflineLog::commit(int len, void *data) { if (fdOfflineLog < 0) return ErrBadArg; @@ -340,8 +402,7 @@ retry: if (Conf::config.getDurableMode() != 1) { int ret = os::lockFile(fdOfflineLog); if (-1 == ret) { - printError(ErrLockTimeOut, - "Unable to get exclusive lock on redo log file"); + printError(ErrLockTimeOut,"Unable to get exclusive lock on redo log file"); return ErrLockTimeOut; } } @@ -361,8 +422,7 @@ retry: fileSize += len+sizeof(int); return OK; } - -DbRetVal OfflineLog::free(int txnId, int stId) +DbRetVal OfflineLog::free(int txnId, int stId, bool hasParam) { if (fdOfflineLog < 0) return ErrBadArg; DbRetVal rv = OK; @@ -388,8 +448,7 @@ retry: int ret = os::lockFile(fdOfflineLog); if (-1 == ret) { ::free(msg); - printError(ErrLockTimeOut, - "Unable to get exclusive lock on redo log file"); + printError(ErrLockTimeOut,"Unable to get exclusive lock on redo log file"); return ErrLockTimeOut; } } @@ -438,7 +497,7 @@ DbRetVal OfflineLog::createMetadataFile() return OK; } -void *OfflineLog::openMetadataFile() +void *OfflineLog::openMetadataFile() { char mmapFile[128]; int size = sizeof(int) + sizeof(long); diff --git a/src/sqllog/SqlLogConnection.cxx b/src/sqllog/SqlLogConnection.cxx index a6018c1c..193a3261 100644 --- a/src/sqllog/SqlLogConnection.cxx +++ b/src/sqllog/SqlLogConnection.cxx @@ -273,7 +273,7 @@ bool SqlLogConnection::isTableCached(char *tblName) } DbRetVal MsgQueueSend::prepare(int txnId, int stmtId, int len, char *stmt, - char *tblName) + char *tblName, bool hasParam) { //strlen is not included string is the last element in the following //structure @@ -316,7 +316,7 @@ DbRetVal MsgQueueSend::commit(int len, void *data) return OK; } -DbRetVal MsgQueueSend::free(int txnId, int stmtId) +DbRetVal MsgQueueSend::free(int txnId, int stmtId, bool hasParam) { // data to be sent is len + txn id + stmtId int dataLen = 3 * sizeof(int); diff --git a/src/sqllog/SqlLogStatement.cxx b/src/sqllog/SqlLogStatement.cxx index 885ab1d3..99a5ab5e 100644 --- a/src/sqllog/SqlLogStatement.cxx +++ b/src/sqllog/SqlLogStatement.cxx @@ -51,12 +51,14 @@ DbRetVal SqlLogStatement::prepare(char *stmtstr) isNonSelDML = false; return rv; } } + bool hasParam = false; + if (innerStmt->noOfParamFields() >0) hasParam = true; if (Conf::config.useDurability()) { if (strlen(stmtstr) > 6 && ((strncasecmp(stmtstr,"CREATE", 6) == 0) || (strncasecmp(stmtstr,"DROP", 4) == 0))) { sid = SqlLogStatement::stmtUID.getID(STMT_ID); printDebug(DM_SqlLog, "CREATE|DROP: stmt id = %d\n", sid); - conn->fileLogPrepare(0, sid, strlen(stmtstr)+1, stmtstr, NULL); + conn->fileLogPrepare(0, sid, strlen(stmtstr)+1, stmtstr, NULL, hasParam); isNonSelDML = false; return OK; } @@ -89,13 +91,13 @@ DbRetVal SqlLogStatement::prepare(char *stmtstr) printDebug(DM_SqlLog, "stmt id = %d\n", sid); if ((Conf::config.useCache() && Conf::config.getCacheMode() == ASYNC_MODE) && !conn->noMsgLog && isCached) - conn->msgPrepare(txnId, sid, strlen(stmtstr) + 1, stmtstr, tblName); + conn->msgPrepare(txnId, sid, strlen(stmtstr) + 1, stmtstr, tblName, hasParam); if (Conf::config.useDurability()) - conn->fileLogPrepare(txnId, sid, strlen(stmtstr) + 1, stmtstr, NULL); + conn->fileLogPrepare(txnId, sid, strlen(stmtstr) + 1, stmtstr, NULL, hasParam); if (Conf::config.useCache() && Conf::config.getCacheMode() == OFFLINE_MODE && !conn->noOfflineLog) - conn->offlineLogPrepare(txnId, sid, strlen(stmtstr) + 1, stmtstr, NULL); + conn->offlineLogPrepare(txnId, sid, strlen(stmtstr) + 1, stmtstr, NULL, hasParam); return OK; } @@ -217,18 +219,20 @@ DbRetVal SqlLogStatement::getParamFldInfo (int parampos, FieldInfo *&fInfo) DbRetVal SqlLogStatement::free() { DbRetVal rv = OK; - if (!isPrepared) return OK; + bool hasParam = false; + if (innerStmt->noOfParamFields() >0) hasParam = true; if (innerStmt) rv = innerStmt->free(); + if (!isPrepared) return OK; if (rv != OK) return rv; if (!needLog) { isPrepared = false; return rv; } if (isNonSelDML && isCached) { SqlLogConnection* logConn = (SqlLogConnection*)con; - if (sid != 0 ) logConn->freeLogs(sid); + if (sid != 0 ) logConn->freeLogs(sid, hasParam); } sid = 0; isNonSelDML = false; isPrepared = false; - return OK; + return rv; } void SqlLogStatement::setShortParam(int paramPos, short value) { diff --git a/src/storage/Database.cxx b/src/storage/Database.cxx index 4dded4ed..1ef3d2b2 100644 --- a/src/storage/Database.cxx +++ b/src/storage/Database.cxx @@ -21,6 +21,7 @@ #include #include #include +#include const char* Database::getName() { @@ -619,6 +620,7 @@ DbRetVal Database::checkPoint() if (!os::fdatasync(fd)) { logFine(Conf::logger, "fsync succedded"); } + filterAndRemoveStmtLogs(); int ret = truncate(dbRedoFileName,0); if (ret != 0) { close(fd); @@ -657,6 +659,7 @@ DbRetVal Database::checkPoint() close(fd); return OK; } + filterAndRemoveStmtLogs(); int ret = truncate(dbRedoFileName,0); if (ret != 0) { printError(ErrSysInternal, "Unable to truncate redo log file. Delete and restart the server\n"); @@ -664,6 +667,104 @@ DbRetVal Database::checkPoint() } return OK; } +DbRetVal Database::filterAndRemoveStmtLogs() +{ + struct stat st; + char fName[MAX_FILE_LEN]; + sprintf(fName, "%s/csql.db.stmt", Conf::config.getDbFile()); + int fdRead = open(fName, O_RDONLY); + if (-1 == fdRead) { return OK; } + if (fstat(fdRead, &st) == -1) { + printError(ErrSysInternal, "Unable to retrieve stmt log file size"); + close(fdRead); + return ErrSysInternal; + } + if (st.st_size ==0) { + close(fdRead); + return OK; + } + void *startAddr = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fdRead, 0); + if (MAP_FAILED == startAddr) { + printError(ErrSysInternal, "Unable to mmap stmt log file\n"); + return ErrSysInternal; + } + sprintf(fName, "%s/csql.db.stmt1", Conf::config.getDbFile()); + int fd = os::openFileForAppend(fName, O_CREAT|O_TRUNC); + char *iter = (char*)startAddr; + char *logStart = NULL, *logEnd = NULL; + int logType; + int stmtID; + int len =0, ret =0; + int txnID, loglen; + DbRetVal rv = OK; + HashMap stmtMap; + stmtMap.setKeySize(sizeof(int)); + //PASS-I load all prepare stmts and free them + while(true) { + if (iter - (char*)startAddr >= st.st_size) break; + logType = *(int*)iter; + logStart = iter; + if (logType == -1) { //prepare + iter = iter + sizeof(int); + len = *(int*) iter; + iter = iter + 2 * sizeof(int); + stmtID = *(int*)iter; + stmtMap.insert(iter); + iter = logStart+ len; + ret =0; + } + else if(logType == -3) { //free + iter = iter + sizeof(int); + txnID = *(int*) iter; iter += sizeof(int); + loglen = *(int*) iter; iter += sizeof(int); + stmtID = *(int*)iter; + stmtMap.remove(iter); + iter = iter + sizeof(int); + }else{ + printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType); + rv = ErrSysInternal; + break; + } + } + //PASS-II take the prepared statements which are not freed into another backup file + while(true) { + if (iter - (char*)startAddr >= st.st_size) break; + logType = *(int*)iter; + logStart = iter; + if (logType == -1) { //prepare + iter = iter + sizeof(int); + len = *(int*) iter; + iter = iter + 2 * sizeof(int); + stmtID = *(int*)iter; + iter = logStart+ len; + ret =0; + if (stmtMap.find(&stmtID)) + ret = os::write(fd, logStart, len); + if (-1 == ret) { + printError(ErrSysInternal, "Unable to write statement logs"); + } + } + else if(logType == -3) { //free + iter = logStart + 4 *sizeof(int); + }else{ + printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType); + rv = ErrSysInternal; + break; + } + } + + os::closeFile(fd); + munmap((char*)startAddr, st.st_size); + close(fdRead); + stmtMap.removeAll(); + char cmd[MAX_FILE_LEN *2]; + sprintf(cmd, "mv %s/csql.db.stmt1 %s/csql.db.stmt", + Conf::config.getDbFile(), Conf::config.getDbFile()); + ret = system(cmd); + return rv; +} + + int Database::getCheckpointID() { int id=0; diff --git a/src/tools/redo.cxx b/src/tools/redo.cxx index 8d8717b2..675c17fe 100644 --- a/src/tools/redo.cxx +++ b/src/tools/redo.cxx @@ -17,11 +17,13 @@ #include #include #include -#define SQL_STMT_LEN 1024 -#define DEBUG 1 +DbRetVal iterateStmtLogs(void *startAddr, int size); AbsSqlConnection *conn; -void *stmtBuckets; +void *stmtBuckets = NULL; +bool list = false; +bool interactive=false; +char fileName[MAX_FILE_LEN]; void addToHashTable(int stmtID, AbsSqlStatement* sHdl) { int bucketNo = stmtID % STMT_BUCKET_SIZE; @@ -38,14 +40,17 @@ void removeFromHashTable(int stmtID) int bucketNo = stmtID % STMT_BUCKET_SIZE; StmtBucket *buck = (StmtBucket *) stmtBuckets; StmtBucket *stmtBucket = &buck[bucketNo]; - StmtNode *node = NULL; + StmtNode *node = NULL, *delNode = NULL; ListIterator it = stmtBucket->bucketList.getIterator(); while(it.hasElement()) { node = (StmtNode *) it.nextElement(); - if(stmtID == node->stmtId) break; + if(stmtID == node->stmtId) { delNode = node; break; } } it.reset(); - stmtBucket->bucketList.remove(node); + if (delNode != NULL) { + stmtBucket->bucketList.remove(delNode); + delete delNode; + } return; } AbsSqlStatement *getStmtFromHashTable(int stmtId) @@ -53,19 +58,57 @@ AbsSqlStatement *getStmtFromHashTable(int stmtId) int bucketNo = stmtId % STMT_BUCKET_SIZE; StmtBucket *buck = (StmtBucket *) stmtBuckets; StmtBucket *stmtBucket = &buck[bucketNo]; + if (stmtBucket == NULL) return NULL; StmtNode *node = NULL; ListIterator it = stmtBucket->bucketList.getIterator(); while(it.hasElement()) { node = (StmtNode *) it.nextElement(); if(stmtId == node->stmtId) { + SqlStatement *sqlStmt = (SqlStatement*)node->stmt; + if (!sqlStmt->isPrepared()) sqlStmt->prepare(); return node->stmt; } } return NULL; } + +bool isStmtInHashTable(int stmtId) +{ + int bucketNo = stmtId % STMT_BUCKET_SIZE; + StmtBucket *buck = (StmtBucket *) stmtBuckets; + StmtBucket *stmtBucket = &buck[bucketNo]; + if (stmtBucket == NULL) return false; + StmtNode *node = NULL; + ListIterator it = stmtBucket->bucketList.getIterator(); + while(it.hasElement()) { + node = (StmtNode *) it.nextElement(); + if(stmtId == node->stmtId) { + SqlStatement *sqlStmt = (SqlStatement*)node->stmt; + if (sqlStmt->isPrepared()) return true; + else break; + } + } + return false; +} + + void freeAllStmtHandles() { - //TODO + if (NULL == stmtBuckets) return; + StmtBucket *buck = (StmtBucket *) stmtBuckets; + StmtNode *node = NULL; + for (int i=0; i bucketList.getIterator(); + while(it.hasElement()) { + node = (StmtNode *) it.nextElement(); + node->stmt->free(); + delete node->stmt; + } + } + ::free(stmtBuckets); } void setParam(AbsSqlStatement *stmt, int pos, DataType type , int length, void *value) { @@ -114,44 +157,208 @@ void setParam(AbsSqlStatement *stmt, int pos, DataType type , int length, void * return; } +DbRetVal readAndPopulateStmts() +{ + struct stat st; + char fName[MAX_FILE_LEN]; + sprintf(fName, "%s/csql.db.stmt", Conf::config.getDbFile()); + printf("Statement Redo log filename is :%s\n", fName); + int fd = open(fName, O_RDONLY); + if (-1 == fd) { return OK; } + if (fstat(fd, &st) == -1) { + printError(ErrSysInternal, "Unable to retrieve stmt log file size"); + close(fd); + return ErrSysInternal; + } + if (NULL != stmtBuckets) + { + printError(ErrSysInternal, "stmtBuckets already populated"); + return ErrSysInternal; + } + stmtBuckets = malloc (STMT_BUCKET_SIZE * sizeof(StmtBucket)); + memset(stmtBuckets, 0, STMT_BUCKET_SIZE * sizeof(StmtBucket)); + if (st.st_size ==0) { + printError(ErrNote, "No Statement logs found during recovery"); + close(fd); + return OK; + } + void *startAddr = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0); + if (MAP_FAILED == startAddr) { + printError(ErrSysInternal, "Unable to mmap stmt log file\n"); + return ErrSysInternal; + } + DbRetVal rv = iterateStmtLogs(startAddr, st.st_size); + munmap((char*)startAddr, st.st_size); + close(fd); + return rv; +} +DbRetVal filterAndWriteStmtLogs() +{ + struct stat st; + char fName[MAX_FILE_LEN]; + sprintf(fName, "%s/csql.db.stmt", Conf::config.getDbFile()); + int fdRead = open(fName, O_RDONLY); + if (-1 == fdRead) { return OK; } + if (fstat(fdRead, &st) == -1) { + printError(ErrSysInternal, "Unable to retrieve stmt log file size"); + close(fdRead); + return ErrSysInternal; + } + if (st.st_size ==0) { + close(fdRead); + return OK; + } + void *startAddr = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fdRead, 0); + if (MAP_FAILED == startAddr) { + printError(ErrSysInternal, "Unable to mmap stmt log file\n"); + return ErrSysInternal; + } + sprintf(fName, "%s/csql.db.stmt1", Conf::config.getDbFile()); + int fd = os::openFileForAppend(fName, O_CREAT|O_TRUNC); + char *iter = (char*)startAddr; + char *logStart = NULL, *logEnd = NULL; + int logType; + int stmtID; + int len =0, ret =0; + DbRetVal rv = OK; + while(true) { + if (iter - (char*)startAddr >= st.st_size) break; + logType = *(int*)iter; + logStart = iter; + if (logType == -1) { //prepare + iter = iter + sizeof(int); + len = *(int*) iter; + iter = iter + 2 * sizeof(int); + stmtID = *(int*)iter; + iter = logStart+ len; + ret =0; + if (isStmtInHashTable(stmtID)) + ret = os::write(fd, logStart, len); + if (-1 == ret) { + printError(ErrSysInternal, "Unable to write statement logs"); + } + } + else if(logType == -3) { //free + iter = logStart + 4 *sizeof(int); + }else{ + printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType); + rv = ErrSysInternal; + break; + } + } + os::closeFile(fd); + munmap((char*)startAddr, st.st_size); + close(fdRead); + char cmd[MAX_FILE_LEN *2]; + sprintf(cmd, "mv %s/csql.db.stmt1 %s/csql.db.stmt", + Conf::config.getDbFile(), Conf::config.getDbFile()); + ret = system(cmd); + return rv; +} +DbRetVal iterateStmtLogs(void *startAddr, int size) +{ + char *iter = (char*)startAddr; + void *value = NULL; + int logType, eType; + int stmtID; + int txnID; + int len, ret, retVal =0; + int loglen; + char stmtString[SQL_STMT_LEN]; + DbRetVal rv = OK; + while(true) { + if (iter - (char*)startAddr >= size) break; + logType = *(int*)iter; + if (logType == -1) { //prepare + iter = iter + sizeof(int); + txnID = *(int*) iter; iter += sizeof(int); + loglen = *(int*) iter; iter += sizeof(int); + stmtID = *(int*)iter; + iter = iter + sizeof(int); + len = *(int*)iter; + iter = iter + sizeof(int); + strncpy(stmtString, iter, len); + iter = iter + len; + if (list) { + printf("PREPARE: SID:%d %s\n", stmtID, stmtString); + continue; + } + if (interactive) printf("STMTLOG PREPARE SID:%d %s\n", stmtID, stmtString); + AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlDirect); + stmt->setConnection(conn); + SqlStatement *sqlStmt = (SqlStatement*)stmt; + sqlStmt->setStmtString(stmtString); + addToHashTable(stmtID, stmt); + } + else if(logType == -3) { //free + iter = iter + sizeof(int); + txnID = *(int*) iter; iter += sizeof(int); + loglen = *(int*) iter; iter += sizeof(int); + stmtID = *(int*)iter; + iter = iter + sizeof(int); + if (list) { + printf("FREE: SID:%d TID:%d \n", stmtID, txnID); + continue; + } + }else{ + printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType); + rv = ErrSysInternal; + break; + } + } + return rv; +} + int main(int argc, char **argv) { struct stat st; - char fileName[1024]; + strcpy(fileName, ""); int c = 0, opt=0; - bool interactive=0; - while ((c = getopt(argc, argv, "ai?")) != EOF) { + while ((c = getopt(argc, argv, "f:ail?")) != EOF) { switch (c) { case '?' : { opt = 1; break; } //print help case 'a' : { opt = 2; break; } - case 'i' : { interactive = 1; break; } + case 'i' : { interactive = true; break; } + case 'l' : { list = true; break; } + case 'f' : {strcpy(fileName , argv[optind - 1]); break;} default: printf("Wrong args\n"); exit(1); } }//while options if (2 !=opt) { - printf("This is an internal csql command."); + printf("This is an internal csql command with i and f options."); exit(1); } + char *verbose = os::getenv("CSQL_INTERACTIVE"); + if (verbose !=NULL && strcmp(verbose, "true") == 0) + { + printf("VERBOSE ON %s\n", verbose); + interactive=true; + } + + Conf::config.readAllValues(os::getenv("CSQL_CONFIG_FILE")); - sprintf(fileName, "%s/csql.db.cur", Conf::config.getDbFile()); - printf("Redo log filename is :%s\n", fileName); + if (strcmp(fileName, "") ==0) { + sprintf(fileName, "%s/csql.db.cur", Conf::config.getDbFile()); + } int fd = open(fileName, O_RDONLY); if (-1 == fd) { return OK; } if (fstat(fd, &st) == -1) { - printf("Unable to retrieve undo log file size\n"); + printError(ErrSysInternal, "Unable to retrieve undo log file size"); close(fd); - return ErrUnknown; + return 1; } if (st.st_size ==0) { - printf("Nothing in redo log file\n"); + printError(ErrNote, "No Redo logs found during recovery"); + readAndPopulateStmts(); close(fd); return 0; } conn = SqlFactory::createConnection(CSqlDirect); DbRetVal rv = conn->connect(I_USER, I_PASS); SqlConnection *sCon = (SqlConnection*) conn; - rv = sCon->getExclusiveLock(); + if(!list) rv = sCon->getExclusiveLock(); + //during connection close, this exclusive lock will be automatically released if (rv != OK) { close(fd); conn->disconnect(); @@ -165,17 +372,24 @@ int main(int argc, char **argv) delete conn; return 2; } - stmtBuckets = malloc (STMT_BUCKET_SIZE * sizeof(StmtBucket)); - memset(stmtBuckets, 0, STMT_BUCKET_SIZE * sizeof(StmtBucket)); + rv = readAndPopulateStmts(); + if (OK != rv) + { + printf("Unable to read stmt log file\n"); + conn->disconnect(); + delete conn; + return 2; + } + printf("Redo log filename is :%s\n", fileName); char *iter = (char*)startAddr; - void *value; + void *value = NULL; int logType, eType; int stmtID; int txnID; - int len, ret; + int len, ret, retVal =0; int loglen; - char stmtString[1024]; + char stmtString[SQL_STMT_LEN]; //printf("size of file %d\n", st.st_size); while(true) { //printf("OFFSET HERE %d\n", iter - (char*)startAddr); @@ -191,17 +405,19 @@ int main(int argc, char **argv) iter = iter + sizeof(int); strncpy(stmtString, iter, len); iter = iter + len; - //printf("PREPARE:%d %d %s\n", stmtID, len, stmtString); + if (list) { + printf("PREPARE: SID:%d %s\n", stmtID, stmtString); + continue; + } AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlDirect); stmt->setConnection(conn); if (interactive) printf("PREPARE %d : %s\n", stmtID, stmtString); rv = stmt->prepare(stmtString); if (rv != OK) { - printf("unable to prepare\n"); - conn->disconnect(); - return ErrSysFatal; + printError(ErrSysInternal, "unable to prepare stmt:%s", stmtString); + retVal=1; + break; } - stmt->prepare(stmtString); SqlStatement *sqlStmt = (SqlStatement*)stmt; sqlStmt->setLoading(true); addToHashTable(stmtID, stmt); @@ -217,13 +433,8 @@ int main(int argc, char **argv) if (iter - (char*)startAddr >= st.st_size) { //file end reached //printf("Redo log file end\n"); - conn->commit(); - freeAllStmtHandles(); - conn->disconnect(); - munmap((char*)startAddr, st.st_size); - close(fd); - delete conn; - return 0; + retVal=0; + break; } stmtID = *(int*)iter; //printf("stmtid %d\n", stmtID); @@ -231,19 +442,31 @@ int main(int argc, char **argv) iter = iter + sizeof(int); eType = *(int*)iter; //printf("eType is %d\n", eType); - AbsSqlStatement *stmt = getStmtFromHashTable(stmtID); + AbsSqlStatement *stmt = NULL; + if (!list) { + stmt = getStmtFromHashTable(stmtID); + if (NULL == stmt) { + printError(ErrSysInternal, "Unable to find in stmt hashtable"); + retVal=2; + break; + } + } if (0 == eType) { //execute type iter = iter + sizeof(int); - //printf("EXEC: %d\n", stmtID); + if (list) { + printf("EXEC SID:%d TID:%d\n", stmtID, txnID); + if (*(int*)iter <0) break; + continue; + } if (stmt) { rv = stmt->execute(ret); if (rv != OK) { - //printf("execute failed\n"); - conn->disconnect(); - return ErrSysFatal; + printError(ErrSysInternal, "unable to execute"); + retVal=2; + break; } } else { - printf("statement not found for %d\n",stmtID); + printError(ErrSysInternal, "statement not found for %d\n",stmtID); } if (*(int*)iter <0) break; } else if ( 1 == eType) { //set type @@ -255,9 +478,14 @@ int main(int argc, char **argv) int len = *(int*) iter; iter=iter+sizeof(int); value = iter; - //AllDataType::printVal(value, type, len); iter=iter+len; - //printf("SET: %d %d %d %d\n", stmtID, pos, type, len); + if (list) { + printf("SET SID:%d POS:%d TYPE:%d LEN:%d Value:", stmtID, pos, type, len); + AllDataType::printVal(value, type, len); + printf("\n"); + if (*(int*)iter <0) break; + continue; + } setParam(stmt, pos, type, len, value); if (*(int*)iter <0) break; } @@ -270,12 +498,17 @@ int main(int argc, char **argv) loglen = *(int*) iter; iter += sizeof(int); stmtID = *(int*)iter; iter = iter + sizeof(int); + if (list) { + printf("FREE SID:%d \n", stmtID); + continue; + } if (interactive) printf("FREE %d:\n", stmtID); AbsSqlStatement *stmt = getStmtFromHashTable(stmtID); if (stmt) { stmt->free(); + delete stmt; removeFromHashTable(stmtID); - } else { printf("statement not found for %d\n",stmtID);} + } else { printError(ErrSysInternal, "statement not found for %d\n",stmtID);} } else if(logType == -4) { //prepare and execute iter = iter + sizeof(int); @@ -288,20 +521,25 @@ int main(int argc, char **argv) strncpy(stmtString, iter, len); stmtString[len+1] ='\0'; iter = iter + len; - //printf("CREATE:%d %d %s\n", stmtID, len, stmtString); + if (list) { + printf("EXECDIRECT SID:%d TID:%d STMT:%s\n", stmtID, txnID, stmtString); + continue; + } AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlDirect); if ( NULL == stmt) { - printf("unable to prepare\n"); - conn->disconnect(); - return ErrSysFatal; + printError(ErrSysInternal, "unable to prepare:%s", stmtString); + retVal=3; + break; } stmt->setConnection(conn); if (interactive) printf("EXECDIRECT %d : %s\n", stmtID, stmtString); rv = stmt->prepare(stmtString); if (rv != OK) { - printf("unable to prepare\n"); - conn->disconnect(); - return ErrSysFatal; + printError(ErrSysInternal, "unable to prepare:%s", stmtString); + stmt->free(); + delete stmt; + retVal=4; + break; } rv = stmt->execute(ret); if (rv != OK) { @@ -312,17 +550,26 @@ int main(int argc, char **argv) // return OK; continue; } - printf("unable to execute\n"); - conn->disconnect(); - return ErrSysFatal; + printError(ErrSysInternal, "unable to execute %s", stmtString); + stmt->free(); + retVal=5; + break; } stmt->free(); + delete stmt; + }else{ + printError(ErrSysInternal, "Redo log file corrupted: logType:%d", logType); + retVal=6; + break; } } munmap((char*)startAddr, st.st_size); close(fd); - freeAllStmtHandles(); + if (!list) { + filterAndWriteStmtLogs(); + freeAllStmtHandles(); + } conn->disconnect(); delete conn; - return 0; + return retVal; } -- 2.11.4.GIT