From 001a33b7c1bd1a57a4fdc5e894dd6e21d035f82d Mon Sep 17 00:00:00 2001 From: kishoramballi Date: Mon, 11 May 2009 11:51:46 +0000 Subject: [PATCH] Enterprise to opensource. 40 files including Makefil.am and .in from storage, sqllog and tools for Util.cxx, FileSend.cxx and redo.cxx. --- csql.conf | 15 +- include/AbsSqlConnection.h | 5 + include/AbsSqlStatement.h | 14 ++ include/Config.h | 16 +- include/Mutex.h | 49 ++++++ include/Session.h | 2 + include/SessionImpl.h | 4 +- include/SqlConnection.h | 1 + include/SqlFactory.h | 2 +- include/SqlLogConnection.h | 83 ++++++++-- include/SqlLogStatement.h | 19 ++- include/SqlStatement.h | 1 + include/Statement.h | 1 + include/Table.h | 1 + include/TableImpl.h | 9 +- include/Util.h | 25 ++- include/os.h | 1 + src/gateway/SqlGwStatement.cxx | 61 ++++--- src/network/NetworkTable.cxx | 4 +- src/sql/SqlFactory.cxx | 51 ++++-- src/sql/SqlStatement.cxx | 11 ++ src/sql/Statement.h | 1 + src/sql/dmlyacc.yxx | 2 +- src/sqllog/Makefile.am | 2 +- src/sqllog/Makefile.in | 6 +- src/sqllog/SqlLogConnection.cxx | 120 +++++++------- src/sqllog/SqlLogStatement.cxx | 357 ++++++++++++++++++++++------------------ src/storage/Config.cxx | 52 +----- src/storage/Connection.cxx | 6 + src/storage/HashIndex.cxx | 14 +- src/storage/Makefile.am | 2 +- src/storage/Makefile.in | 21 ++- src/storage/SessionImpl.cxx | 12 ++ src/storage/TableImpl.cxx | 219 ++++++++++++------------ src/tools/Makefile.am | 5 +- src/tools/Makefile.in | 22 ++- src/tools/csqldump.cxx | 92 ++++++++--- src/tools/csqlreplserver.cxx | 4 +- src/tools/csqlserver.cxx | 167 +++++++++++++------ src/tools/isql.cxx | 9 +- 40 files changed, 930 insertions(+), 558 deletions(-) diff --git a/csql.conf b/csql.conf index 489a25a6..385df520 100644 --- a/csql.conf +++ b/csql.conf @@ -26,15 +26,18 @@ SYS_DB_KEY=2222 USER_DB_KEY=3333 #Give full path for the log file where important system actions are stored. -LOG_FILE=/tmp/log/csql/log.out +LOG_FILE=/tmp/csql/log/log.out #The virtual memory start address at which the shared memory segment # will be created and attached. MAP_ADDRESS=400000000 +# Whether to enable durability +DURABILITY=false + #Give full path for the database file where table and record information will #be stored for durability -DATABASE_FILE=/tmp/csql/csql.db +DATABASE_FILE=/tmp/csql/db #Important: For Server section parameters, make sure that the value is same for the # server process and all the csql client process which connects to it. otherwise, @@ -67,15 +70,17 @@ ENABLE_BIDIRECTIONAL_CACHE=false CACHE_RECEIVER_WAIT_SECS=10 -#Give full path for the file where all the cached table information is stored +#Give full path for the file where all the table information is stored TABLE_CONFIG_FILE=/tmp/csql/csqltable.conf ##########################SqlNetworkServer Section######################## # Whether to enable SqlNetwork server -CSQL_SQL_SERVER=false +CSQL_SQL_SERVER=true #Set port for Network access PORT=5678 -#####################################End Section########################## +ID_SHM_KEY=1947 + +#####################################End Section######################## diff --git a/include/AbsSqlConnection.h b/include/AbsSqlConnection.h index a0fc3ac4..f767126b 100644 --- a/include/AbsSqlConnection.h +++ b/include/AbsSqlConnection.h @@ -21,6 +21,11 @@ #define ABSSQLCONNECTION_H #include +class CachedTable{ + public: + char tableName[IDENTIFIER_LENGTH]; +}; + enum TransSyncMode { OSYNC=1, ASYNC=2, diff --git a/include/AbsSqlStatement.h b/include/AbsSqlStatement.h index 6ff83676..c1c5a32b 100644 --- a/include/AbsSqlStatement.h +++ b/include/AbsSqlStatement.h @@ -273,4 +273,18 @@ class BindSqlProjectField BindSqlProjectField(){ value = NULL; targetvalue = NULL; } }; +class StmtBucket +{ + public: + List bucketList; +}; + +class StmtNode +{ + public: + int stmtId; + AbsSqlStatement *stmt; + char stmtstr[1024]; +}; + #endif diff --git a/include/Config.h b/include/Config.h index 339b534c..85d22595 100644 --- a/include/Config.h +++ b/include/Config.h @@ -43,13 +43,12 @@ class ConfigValues bool isTwoWay; int cacheWaitSecs; - bool isReplication; + bool isDurable; bool isCsqlSqlServer; int port; - char replConfigFile[MAX_FILE_PATH_LEN]; int networkID; int cacheNetworkID; - + int shmKeyForId; long logStoreSize; int nwResponseTimeout; int nwConnectTimeout; @@ -71,20 +70,19 @@ class ConfigValues lockSecs =0; lockUSecs = 10; lockRetries = 10; - cacheId=1; + cacheId=1; isCache = false; cacheNetworkID =-1; strcpy(dsn, "myodbc3"); strcpy(tableConfigFile, "/tmp/csql/csqltable.conf"); - isReplication = false; isCsqlSqlServer = false; port = 5678; - strcpy(replConfigFile, "/tmp/csql/csqlnw.conf"); logStoreSize = 10485760; networkID=-1; + shmKeyForId = -1; nwResponseTimeout=3; nwConnectTimeout=5; - isTwoWay=true; + isTwoWay=false; cacheWaitSecs =10; } }; @@ -118,11 +116,11 @@ class Config inline bool useCache() { return cVal.isCache; } inline char* getDSN() { return cVal.dsn; } inline char* getTableConfigFile() { return cVal.tableConfigFile; } - inline bool useReplication() { return cVal.isReplication; } + inline bool useDurability() { return cVal.isDurable; } inline bool useCsqlSqlServer() { return cVal.isCsqlSqlServer; } inline int getPort() { return cVal.port; } - inline char* getReplConfigFile() { return cVal.replConfigFile; } inline long getMaxLogStoreSize() { return cVal.logStoreSize; } + inline int getShmIDKey() { return cVal.shmKeyForId; } inline int getNetworkID() { return cVal.networkID; } inline int getCacheNetworkID() { return cVal.cacheNetworkID; } inline int getNetworkResponseTimeout() { return cVal.nwResponseTimeout; } diff --git a/include/Mutex.h b/include/Mutex.h index c6a886cd..8f4edb79 100644 --- a/include/Mutex.h +++ b/include/Mutex.h @@ -39,6 +39,55 @@ class Mutex int releaseLock(int procSlot, bool procAccount=true); int destroy(); int recoverMutex(); + static int CAS(int *ptr, int oldVal, int newVal) + { + unsigned char ret; + __asm__ __volatile__ ( + " lock\n" + " cmpxchgl %2,%1\n" + " sete %0\n" + : "=q" (ret), "=m" (*ptr) + : "r" (newVal), "m" (*ptr), "a" (oldVal) + : "memory"); + + //above assembly returns 0 in case of failure + if (ret) return 0; + + struct timeval timeout; + timeout.tv_sec=0; + timeout.tv_usec=1000; + os::select(0,0,0,0, &timeout); + __asm__ __volatile__ ( + " lock\n" + " cmpxchgl %2,%1\n" + " sete %0\n" + : "=q" (ret), "=m" (*ptr) + : "r" (newVal), "m" (*ptr), "a" (oldVal) + : "memory"); + //if (ret) return 0; else {printf("DEBUG::CAS Fails %d-\n", ret); return 1; } + if (ret) return 0; else return 1; + + } + /*static int CASB(char *ptr, char oldVal, char newVal) + { + unsigned char ret; + __asm__ __volatile__ ("lock; cmpxchgb %b1, %2" + : "=a" (ret) + : "m" (*(ptr), "q" (newVal), "0" (oldVal) + : "memory"); + printf("Value of ret is %d\n", ret); + //above assembly returns 0 in case of failure + if (ret) return 0; + struct timeval timeout; + timeout.tv_sec=0; + timeout.tv_usec=500; + os::select(0,0,0,0, &timeout); + __asm__ __volatile__ ("lock; cmpxchgb %b1, %2" + : "=a" (ret) + : "m" (*(ptr), "q" (newVal), "0" (oldVal) + : "memory"); + if(!ret) { printf("DEBUG::CAS Fails\n"); return 1; } else return 0; + }*/ }; #endif diff --git a/include/Session.h b/include/Session.h index 9d2f8800..ad6509cb 100644 --- a/include/Session.h +++ b/include/Session.h @@ -110,6 +110,7 @@ class Connection * @return DbRetVal */ DbRetVal rollback(); + DbRetVal getExclusiveLock(); }; @@ -125,6 +126,7 @@ class Session virtual DbRetVal startTransaction(IsolationLevel level)=0; virtual DbRetVal commit()=0; virtual DbRetVal rollback()=0; + virtual DbRetVal getExclusiveLock()=0; //TODO:: virtual int setAutoCommit(bool flag)=0; //TODO::support for save points virtual ~Session() { } diff --git a/include/SessionImpl.h b/include/SessionImpl.h index 053087f8..419da58e 100644 --- a/include/SessionImpl.h +++ b/include/SessionImpl.h @@ -30,11 +30,12 @@ class SessionImpl : public Session char userName[IDENTIFIER_LENGTH]; bool isAuthenticated; bool isDba; + bool isXTaken; public: SessionImpl() { - dbMgr = NULL; uMgr = NULL; + dbMgr = NULL; uMgr = NULL; isXTaken = false; } ~SessionImpl() { @@ -58,6 +59,7 @@ class SessionImpl : public Session DbRetVal readConfigFile(); Database* getSystemDatabase(); + DbRetVal getExclusiveLock(); private: DbRetVal authenticate(const char *username, const char *password); }; diff --git a/include/SqlConnection.h b/include/SqlConnection.h index 01ce3d50..308456b6 100644 --- a/include/SqlConnection.h +++ b/include/SqlConnection.h @@ -82,6 +82,7 @@ class SqlConnection : public AbsSqlConnection Connection& getConnObject(){ return conn; } bool isConnectionOpen() { if (isConnOpen) return true; return false; }; + DbRetVal getExclusiveLock(){ return conn.getExclusiveLock(); } friend class SqlFactory; }; diff --git a/include/SqlFactory.h b/include/SqlFactory.h index a13d1b75..cf0dcfcf 100644 --- a/include/SqlFactory.h +++ b/include/SqlFactory.h @@ -19,7 +19,6 @@ ***************************************************************************/ #ifndef SQLFACTORY_H #define SQLFACTORY_H -#include #include #include @@ -33,6 +32,7 @@ enum SqlApiImplType CSqlNetworkAdapter, CSqlNetworkGateway, CSqlLog, + CSqlDirect, CSqlUnknown }; /** diff --git a/include/SqlLogConnection.h b/include/SqlLogConnection.h index 55bcbe10..9fee482e 100644 --- a/include/SqlLogConnection.h +++ b/include/SqlLogConnection.h @@ -20,26 +20,60 @@ #ifndef SQLLOGCONNECTION_H #define SQLLOGCONNECTION_H #include -#include #include #include #include -class CachedTable{ - public: - char tableName[IDENTIFIER_LENGTH]; -}; - /** * @class SqlLogConnection * */ + +class AbsSqlLogSend +{ + public: + virtual DbRetVal prepare(int txnId, int stmtId, int len, char *stmt)=0; + virtual DbRetVal commit(int len, void *data)=0; + virtual DbRetVal free(int txnId, int stmtId)=0; +}; + +class FileSend : public AbsSqlLogSend +{ + int fdRedoLog; + public: + FileSend(); + DbRetVal prepare(int txnId, int stmtId, int len, char *stmt); + DbRetVal commit(int len, void *data); + DbRetVal free(int txnId, int stmtId); +}; + +enum ExecType +{ + EXECONLY = 0, + SETPARAM +}; + +class ExecLogInfo +{ + public: + ExecLogInfo() : pos(0), len(0) {} + int stmtId; + ExecType type; + int pos; + DataType dataType; + int len; + char value[1]; +}; + class SqlLogConnection : public AbsSqlConnection { Connection dummyConn; //stores all the sql log packets to be shipped to peers List logStore; + + List execLogStore; + int execLogStoreSize; //stores all the prepare log packets to be shipped to peers //as soon as connection is reestablished to cache server @@ -55,15 +89,18 @@ class SqlLogConnection : public AbsSqlConnection //stores client objects in it for peer NetworkTable nwTable; + AbsSqlLogSend *fileSend; - static UniqueID txnUID; - + static GlobalUniqueID txnUID; static List cacheList; + int txnID; DbRetVal populateCachedTableList(); - public: - SqlLogConnection(){innerConn = NULL; syncMode = ASYNC;} - + SqlLogConnection() { + innerConn = NULL; syncMode = ASYNC; + if (Conf::config.useDurability()) { fileSend = new FileSend(); } + execLogStoreSize =0; + } bool isTableCached(char *name); //Note::forced to implement this as it is pure virtual in base class @@ -79,15 +116,35 @@ class SqlLogConnection : public AbsSqlConnection DbRetVal beginTrans (IsolationLevel isoLevel, TransSyncMode mode); + DbRetVal fileLogPrepare(int txnId, int stmtId, int len, char *stmt) + { + return fileSend->prepare(txnId, stmtId, len, stmt); + } + DbRetVal commitLogs(int logSize, void *data) + { + int txnId = getTxnID(); + if (Conf::config.useDurability()) fileSend->commit(logSize, data); + return OK; + } + DbRetVal freeLogs(int stmtId) + { + int txnId = getTxnID(); + if (Conf::config.useDurability()) fileSend->free(txnId, stmtId); + return OK; + } + void addExecLog(ExecLogInfo *info) { execLogStore.append(info); } + void addToExecLogSize(int size){ execLogStoreSize += size; } + int getExecLogStoreSize() { return execLogStoreSize; } + List getExecLogList() { return execLogStore; } DbRetVal addPacket(BasePacket *pkt); - DbRetVal addPreparePacket(PacketPrepare *pkt); DbRetVal removePreparePacket(int stmtid); DbRetVal setSyncMode(TransSyncMode mode); TransSyncMode getSyncMode() { return syncMode; } + int getTxnID() { return txnID; } DbRetVal connectIfNotConnected() { return nwTable.connectIfNotConnected(); } - DbRetVal sendAndReceive(NetworkPacketType type, char *packet, int length); + DbRetVal sendAndReceive(NetworkPacketType type, char *packet, int length); friend class SqlFactory; }; diff --git a/include/SqlLogStatement.h b/include/SqlLogStatement.h index 94f79921..b0fcfcac 100644 --- a/include/SqlLogStatement.h +++ b/include/SqlLogStatement.h @@ -23,10 +23,13 @@ #include #include #include + class SqlLogStatement: public AbsSqlStatement { public: - SqlLogStatement(){innerStmt = NULL; con = NULL; needLog= false; sid=0; } + SqlLogStatement() { + innerStmt = NULL; con = NULL; needLog= false; + sid=0; isNonSelDML = false; } void setConnection(AbsSqlConnection *conn) { @@ -34,6 +37,8 @@ class SqlLogStatement: public AbsSqlStatement con = conn; } AbsSqlStatement *getInnerStatement(){ return innerStmt;} + List getTableNameList() { return innerStmt->getTableNameList(); } + char *getTableName() { return innerStmt->getTableName(); } bool isNonSelectDML(char *stmtstr); DbRetVal prepare(char *stmt); @@ -50,6 +55,7 @@ class SqlLogStatement: public AbsSqlStatement int noOfProjFields(); void* getFieldValuePtr( int pos ); + void* getFieldValuePtr( char *name ); DbRetVal free(); @@ -73,19 +79,20 @@ class SqlLogStatement: public AbsSqlStatement void setBinaryParam(int paramPos, void *value, int length); bool isSelect(); bool isFldNull(int pos); - bool isFldNull(char *name){}; + bool isFldNull(char *name){} void setNull(int pos); int getFldPos(char *name){} List getAllTableNames(DbRetVal &ret); + int getNoOfPagesForTable(char *tbl){} + DbRetVal loadRecords(char *tbl, void *buf){} bool isCached; TableSyncMode mode; + bool isNonSelDML; + static GlobalUniqueID stmtUID; private: - bool needLog; int sid; //statement id - List paramList; - - static UniqueID stmtUID; + //static UniqueID stmtUID; friend class SqlFactory; }; diff --git a/include/SqlStatement.h b/include/SqlStatement.h index bc46f792..874b7a28 100644 --- a/include/SqlStatement.h +++ b/include/SqlStatement.h @@ -241,6 +241,7 @@ class SqlStatement: public AbsSqlStatement void setNull(int pos); int getFldPos(char *name); List getAllTableNames(DbRetVal &ret); + void setLoading(bool flag); private: SqlConnection *sqlCon; Statement *stmt; diff --git a/include/Statement.h b/include/Statement.h index 594137ef..b2f8cbd1 100644 --- a/include/Statement.h +++ b/include/Statement.h @@ -93,6 +93,7 @@ class DmlStatement : public Statement virtual void* getParamValuePtr( int pos )=0; virtual int getFldPos(char *name)=0; virtual ~DmlStatement(){} + void setLoading(bool flag) { table->setLoading(flag); } }; class InsStatement : public DmlStatement diff --git a/include/Table.h b/include/Table.h index 2bb84951..c20793d3 100644 --- a/include/Table.h +++ b/include/Table.h @@ -222,6 +222,7 @@ class Table virtual int getFldPos(char *name)=0; virtual ~Table() { } + virtual void setLoading(bool flag){}; //non virtual functions static void getFieldNameAlone(char *fname, char *name); diff --git a/include/TableImpl.h b/include/TableImpl.h index e4fae828..4270931e 100644 --- a/include/TableImpl.h +++ b/include/TableImpl.h @@ -107,7 +107,7 @@ class TableImpl:public Table TupleIterator *iter; - bool undoFlag; + bool loadFlag; public: FieldList fldList_; @@ -139,7 +139,7 @@ class TableImpl:public Table DbRetVal copyValuesToBindBuffer(void *tuple); void setNullBit(int fldpos); void clearNullBit(int fldpos); - DbRetVal insertIndexNode(Transaction *trans, void *indexPtr, IndexInfo *info, void *tuple); + DbRetVal insertIndexNode(Transaction *trans, void *indexPtr, IndexInfo *info, void *tuple, bool loadFlag=0); DbRetVal updateIndexNode(Transaction *trans, void *indexPtr, IndexInfo *info, void *tuple); DbRetVal deleteIndexNode(Transaction *trans, void *indexPtr, IndexInfo *info, void *tuple); @@ -155,7 +155,7 @@ class TableImpl:public Table pred_ = NULL; useIndex_ = -1; numFlds_ = 0; bindListArray_ = NULL; iNullInfo = 0; cNullInfo = NULL; isIntUsedForNULL = true; iNotNullInfo = 0; cNotNullInfo = NULL; curTuple_ = NULL; - isPlanCreated = false; undoFlag = true;ptrToAuto=NULL;} + isPlanCreated = false; loadFlag = false; ptrToAuto=NULL;} ~TableImpl(); void setDB(Database *db) { db_ = db; } @@ -229,7 +229,7 @@ class TableImpl:public Table DbRetVal lock(bool shared); DbRetVal unlock(); - DbRetVal setUndoLogging(bool flag) { undoFlag = flag; } + DbRetVal setUndoLogging(bool flag) { loadFlag = flag; } void printSQLIndexString(); @@ -245,6 +245,7 @@ class TableImpl:public Table char* getName() { return tblName_; } void setTableInfo(char *name, int tblid, size_t length, int numFld, int numIdx, void *chunk); + void setLoading(bool flag) { loadFlag = flag; } friend class DatabaseManagerImpl; }; diff --git a/include/Util.h b/include/Util.h index ceacc4a3..47119e92 100644 --- a/include/Util.h +++ b/include/Util.h @@ -12,11 +12,20 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * - ***************************************************************************/ + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * + ***************************************************************************/ #ifndef UTIL_H #define UTIL_H #include #include +enum UniqueIDType { + STMT_ID=0, + TXN_ID +}; + class ListNode { public: @@ -51,7 +60,6 @@ class ListIterator iter = iter ->next; return node->element; } - //index start with one, such that 1->first element in list void* getElement(int index) { @@ -175,6 +183,19 @@ class List } }; +#define MAX_UNIQUE_ID 10 +class GlobalUniqueID +{ + void *ptr; + public: + GlobalUniqueID() { ptr = NULL; } + DbRetVal create(); + DbRetVal open(); + DbRetVal close() { os::shm_detach(ptr); return OK; } + DbRetVal destroy(); + int getID(UniqueIDType type); +}; + class UniqueID { int startID; diff --git a/include/os.h b/include/os.h index 63292da7..0c5cb9d6 100644 --- a/include/os.h +++ b/include/os.h @@ -92,6 +92,7 @@ enum MapMode #define DBAUSER "root" #define DBAPASS "manager" #define LOCK_BUCKET_SIZE 2048 +#define STMT_BUCKET_SIZE 1023 #define MAX_CHUNKS 20 #define PAGE_SIZE Conf::config.getPageSize() #define MAX_MUTEX_PER_THREAD 3 diff --git a/src/gateway/SqlGwStatement.cxx b/src/gateway/SqlGwStatement.cxx index a746e6a1..a076b0b1 100644 --- a/src/gateway/SqlGwStatement.cxx +++ b/src/gateway/SqlGwStatement.cxx @@ -20,6 +20,8 @@ #include #include #include +#include + DbRetVal SqlGwStatement::prepare(char *stmtstr) { DbRetVal rv = OK,ret=OK; @@ -28,31 +30,28 @@ DbRetVal SqlGwStatement::prepare(char *stmtstr) //conn->connectAdapterIfNotConnected(); stmtHdlr = NoHandler; if (innerStmt) rv = innerStmt->prepare(stmtstr); - SqlLogStatement *stmt = (SqlLogStatement*) innerStmt; - bool isAllcachedTable = true; +// SqlLogStatement *stmt = (SqlLogStatement*) innerStmt; + bool isAllcachedTableForJoin = true; int noOfTable = 0; - ListIterator titer =(stmt->getInnerStatement())->getTableNameList().getIterator(); + ListIterator titer =innerStmt->getTableNameList().getIterator(); while (titer.hasElement()) { TableName *t = (TableName*)titer.nextElement(); - ret = CacheTableLoader::isTableCached(t->tblName); - if(ret!=OK) isAllcachedTable=false; + ret = TableConf::config.isTableCached(t->tblName); + if(ret!=OK) isAllcachedTableForJoin=false; noOfTable++; } - if(noOfTable == 1){ isAllcachedTable = true;} - mode =CacheTableLoader::getTableMode((stmt->getInnerStatement())->getTableName()); - if ((rv == OK)&& ((mode!=5 && mode!=6)||innerStmt->isSelect()) && isAllcachedTable) { - if (!stmt->isCached) { - stmtHdlr = CSqlHandler; - return rv; - }else { - if (stmt->mode != TABLE_OSYNC) { - stmtHdlr = CSqlHandler; - return rv; - }else { - stmtHdlr = CSqlAndAdapterHandler; - } - } + if(noOfTable == 1){ isAllcachedTableForJoin = true;} + mode = TableConf::config.getTableMode(innerStmt->getTableName()); +/* if((mode==2||mode==6) && !innerStmt->isSelect()) + { + printError(ErrReadOnlyCache, "Partial Cache Condition Violation for Non select Dml statement\n"); + return ErrReadOnlyCache; + }*/ + if ((rv == OK)&& ((mode!=5 && mode!=6)||innerStmt->isSelect()) && isAllcachedTableForJoin) + { + stmtHdlr = CSqlHandler; + if(mode !=1) return rv; } //TODO::add procedures also in the below checking @@ -61,30 +60,28 @@ DbRetVal SqlGwStatement::prepare(char *stmtstr) !strncasecmp(stmtstr, "SELECT", 6) ==0 && !strncasecmp(stmtstr, "DELETE", 6) ==0) return rv; + printDebug(DM_Gateway, "Handled by csql %d\n", shouldCSqlHandle()); + + if (!shouldCSqlHandle()) stmtHdlr = AdapterHandler; + if ( shouldCSqlHandle() && !innerStmt->isSelect() ) + stmtHdlr = CSqlAndAdapterHandler; + printDebug(DM_Gateway, "Handled %d\n", stmtHdlr); //prepare failed. means table not there in csql->uncached table //or sql statement is complex and csql parser failed - if (adapter) rv = adapter->prepare(stmtstr); - if (rv == OK) { - printDebug(DM_Gateway, "Handled by csql %d\n", shouldCSqlHandle()); - if (!shouldCSqlHandle()) stmtHdlr = AdapterHandler; - else stmtHdlr = CSqlAndAdapterHandler; - printDebug(DM_Gateway, "Handled %d\n", stmtHdlr); - } - else - printError(ErrSysInit, "Both csql and adapter could not prepare\n"); + if (adapter && shouldAdapterHandle()) rv = adapter->prepare(stmtstr); + if (rv != OK) + printError(ErrBadCall, "Both adapter and csql could not prepare"); return rv; } bool SqlGwStatement::shouldAdapterHandle() { - if (stmtHdlr == AdapterHandler || - stmtHdlr == CSqlAndAdapterHandler) return true; + if (stmtHdlr == AdapterHandler || stmtHdlr == CSqlAndAdapterHandler) return true; return false; } bool SqlGwStatement::shouldCSqlHandle() { SqlGwConnection *conn = (SqlGwConnection*) con; - if (stmtHdlr == CSqlHandler || - stmtHdlr == CSqlAndAdapterHandler) return true; + if (stmtHdlr == CSqlHandler || stmtHdlr == CSqlAndAdapterHandler) return true; return false; } bool SqlGwStatement::isSelect() diff --git a/src/network/NetworkTable.cxx b/src/network/NetworkTable.cxx index 802749c2..fa86f2b0 100644 --- a/src/network/NetworkTable.cxx +++ b/src/network/NetworkTable.cxx @@ -23,7 +23,7 @@ DbRetVal NetworkTable::initialize() { DbRetVal rv = OK; - if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK; + //if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK; rv = readNetworkConfig(); return rv; @@ -38,7 +38,7 @@ DbRetVal NetworkTable::readNetworkConfig() int nwid; char hostname[IDENTIFIER_LENGTH]; int port; - fp = fopen(Conf::config.getReplConfigFile(),"r"); + //fp = fopen(Conf::config.getReplConfigFile(),"r"); if( fp == NULL ) { printError(ErrSysInit, "Invalid path/filename for NETWORK_CONFIG_FILE.\n"); return ErrSysInit; diff --git a/src/sql/SqlFactory.cxx b/src/sql/SqlFactory.cxx index 55cc1982..cba86d95 100644 --- a/src/sql/SqlFactory.cxx +++ b/src/sql/SqlFactory.cxx @@ -24,13 +24,24 @@ #include #include +Config Conf::config; + AbsSqlConnection* SqlFactory::createConnection(SqlApiImplType implFlag) { AbsSqlConnection *conn = NULL ; + Conf::config.readAllValues(os::getenv("CSQL_CONFIG_FILE")); + bool isSqlLogNeeded = Conf::config.useDurability(); + switch(implFlag) { case CSql: - conn = new SqlConnection(); + if (!Conf::config.useDurability()) { + conn = new SqlConnection(); + }else { + AbsSqlConnection *sqlCon = new SqlConnection(); + conn = new SqlLogConnection(); + conn->setInnerConnection(sqlCon); + } break; case CSqlLog: { @@ -50,11 +61,13 @@ AbsSqlConnection* SqlFactory::createConnection(SqlApiImplType implFlag) case CSqlGateway: { AbsSqlConnection *sqlCon = new SqlConnection(); - AbsSqlConnection *sqllogconn = new SqlLogConnection(); - sqllogconn->setInnerConnection(sqlCon); - AbsSqlConnection *adapterCon = new SqlOdbcConnection(); SqlGwConnection *gwconn = new SqlGwConnection(); - gwconn->setInnerConnection(sqllogconn); + if (isSqlLogNeeded) { + AbsSqlConnection *sqllogconn = new SqlLogConnection(); + sqllogconn->setInnerConnection(sqlCon); + gwconn->setInnerConnection(sqllogconn); + } else gwconn->setInnerConnection(sqlCon); + AbsSqlConnection *adapterCon = new SqlOdbcConnection(); gwconn->setAdapter(adapterCon); conn = gwconn; break; @@ -81,20 +94,30 @@ AbsSqlConnection* SqlFactory::createConnection(SqlApiImplType implFlag) conn = sqlNwCon; break; } - + case CSqlDirect: + conn = new SqlConnection(); + break; default: printf("Todo"); break; } return conn; } + AbsSqlStatement* SqlFactory::createStatement(SqlApiImplType implFlag) { AbsSqlStatement *stmt = NULL; + bool isSqlLogNeeded = Conf::config.useDurability(); switch(implFlag) { case CSql: - stmt = new SqlStatement(); + if (!Conf::config.useDurability()) { + stmt = new SqlStatement(); + }else { + AbsSqlStatement *sqlStmt = new SqlStatement(); + stmt = new SqlLogStatement(); + stmt->setInnerStatement(sqlStmt); + } break; case CSqlLog: { @@ -113,12 +136,14 @@ AbsSqlStatement* SqlFactory::createStatement(SqlApiImplType implFlag) } case CSqlGateway: { + SqlGwStatement *gwstmt = new SqlGwStatement(); AbsSqlStatement *sqlstmt = new SqlStatement(); - AbsSqlStatement *sqllogstmt = new SqlLogStatement(); - sqllogstmt->setInnerStatement(sqlstmt); + if (isSqlLogNeeded) { + AbsSqlStatement *sqllogstmt = new SqlLogStatement(); + sqllogstmt->setInnerStatement(sqlstmt); + gwstmt->setInnerStatement(sqllogstmt); + } else gwstmt->setInnerStatement(sqlstmt); AbsSqlStatement *adapterstmt = new SqlOdbcStatement(); - SqlGwStatement *gwstmt = new SqlGwStatement(); - gwstmt->setInnerStatement(sqllogstmt); gwstmt->setAdapter(adapterstmt); stmt = gwstmt; break; @@ -145,7 +170,9 @@ AbsSqlStatement* SqlFactory::createStatement(SqlApiImplType implFlag) stmt = sqlNwStmt; break; } - + case CSqlDirect: + stmt = new SqlStatement(); + break; default: printf("Todo"); break; diff --git a/src/sql/SqlStatement.cxx b/src/sql/SqlStatement.cxx index a7a5830a..f008c0a5 100644 --- a/src/sql/SqlStatement.cxx +++ b/src/sql/SqlStatement.cxx @@ -338,3 +338,14 @@ List SqlStatement::getAllTableNames(DbRetVal &ret) return dbMgr->getAllTableNames(); } +void SqlStatement::setLoading(bool flag) +{ + if (pData.getStmtType() == InsertStatement|| + pData.getStmtType() == UpdateStatement|| + pData.getStmtType() == DeleteStatement) + { + DmlStatement *dmlStmt = (DmlStatement*) stmt; + dmlStmt->setLoading(flag); + } + return; +} diff --git a/src/sql/Statement.h b/src/sql/Statement.h index 594137ef..b2f8cbd1 100644 --- a/src/sql/Statement.h +++ b/src/sql/Statement.h @@ -93,6 +93,7 @@ class DmlStatement : public Statement virtual void* getParamValuePtr( int pos )=0; virtual int getFldPos(char *name)=0; virtual ~DmlStatement(){} + void setLoading(bool flag) { table->setLoading(flag); } }; class InsStatement : public DmlStatement diff --git a/src/sql/dmlyacc.yxx b/src/sql/dmlyacc.yxx index 7584385d..748b3d48 100644 --- a/src/sql/dmlyacc.yxx +++ b/src/sql/dmlyacc.yxx @@ -518,7 +518,7 @@ field_name: ident size_opt: | '(' NUMBER_STRING ')' { - DbRetVal rv = parsedData->setFldLength(atoi($2)+1); + DbRetVal rv = parsedData->setFldLength(atoi($2)); if (rv != OK) { yyerror("Binary field length < 256"); free($2); diff --git a/src/sqllog/Makefile.am b/src/sqllog/Makefile.am index 98935ba8..fe161f69 100644 --- a/src/sqllog/Makefile.am +++ b/src/sqllog/Makefile.am @@ -2,6 +2,6 @@ INCLUDES = -I$(top_srcdir)/include $(all_includes) METASOURCES = AUTO lib_LTLIBRARIES = libcsqlsqllog.la libcsqlsqllog_la_LDFLAGS = -avoid-version -module -libcsqlsqllog_la_SOURCES = SqlLogConnection.cxx SqlLogStatement.cxx +libcsqlsqllog_la_SOURCES = SqlLogConnection.cxx SqlLogStatement.cxx FileSend.cxx libcsqlsqllog_a_LIBADD = $(top_builddir)/src/sql/libcsqlsql.la $(top_builddir)/src/network/libcsqlnw.la diff --git a/src/sqllog/Makefile.in b/src/sqllog/Makefile.in index 8e3d764f..9789ba92 100644 --- a/src/sqllog/Makefile.in +++ b/src/sqllog/Makefile.in @@ -51,7 +51,8 @@ am__installdirs = "$(DESTDIR)$(libdir)" libLTLIBRARIES_INSTALL = $(INSTALL) LTLIBRARIES = $(lib_LTLIBRARIES) libcsqlsqllog_la_LIBADD = -am_libcsqlsqllog_la_OBJECTS = SqlLogConnection.lo SqlLogStatement.lo +am_libcsqlsqllog_la_OBJECTS = SqlLogConnection.lo SqlLogStatement.lo \ + FileSend.lo libcsqlsqllog_la_OBJECTS = $(am_libcsqlsqllog_la_OBJECTS) libcsqlsqllog_la_LINK = $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) \ $(LIBTOOLFLAGS) --mode=link $(CXXLD) $(AM_CXXFLAGS) \ @@ -188,7 +189,7 @@ INCLUDES = -I$(top_srcdir)/include $(all_includes) METASOURCES = AUTO lib_LTLIBRARIES = libcsqlsqllog.la libcsqlsqllog_la_LDFLAGS = -avoid-version -module -libcsqlsqllog_la_SOURCES = SqlLogConnection.cxx SqlLogStatement.cxx +libcsqlsqllog_la_SOURCES = SqlLogConnection.cxx SqlLogStatement.cxx FileSend.cxx libcsqlsqllog_a_LIBADD = $(top_builddir)/src/sql/libcsqlsql.la $(top_builddir)/src/network/libcsqlnw.la all: all-am @@ -259,6 +260,7 @@ mostlyclean-compile: distclean-compile: -rm -f *.tab.c +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/FileSend.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SqlLogConnection.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SqlLogStatement.Plo@am__quote@ diff --git a/src/sqllog/SqlLogConnection.cxx b/src/sqllog/SqlLogConnection.cxx index 892f326c..ff7d27b0 100644 --- a/src/sqllog/SqlLogConnection.cxx +++ b/src/sqllog/SqlLogConnection.cxx @@ -17,11 +17,13 @@ * Free Software Foundation, Inc., * * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * ***************************************************************************/ +#include #include +#include #include #include -UniqueID SqlLogConnection::txnUID; +GlobalUniqueID SqlLogConnection::txnUID; List SqlLogConnection::cacheList; DbRetVal SqlLogConnection::addPacket(BasePacket* pkt) @@ -62,12 +64,13 @@ DbRetVal SqlLogConnection::connect (char *user, char *pass) //printf("LOG: connect\n"); if (innerConn) rv = innerConn->connect(user,pass); if (rv != OK) return rv; - if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK; - if (rv !=OK) { innerConn->disconnect(); return rv; } + if (!Conf::config.useDurability()) return OK; //populate cacheList if not populated by another thread in same process //TODO::cacheList requires mutex guard if (0 == cacheList.size()) rv = populateCachedTableList(); + rv = SqlLogStatement::stmtUID.open(); + rv = SqlLogConnection::txnUID.open(); return rv; } @@ -77,7 +80,8 @@ DbRetVal SqlLogConnection::disconnect() //printf("LOG: disconnect\n"); if (innerConn) rv =innerConn->disconnect(); if (rv != OK) return rv; - if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK; + if (!Conf::config.useDurability()) return OK; + SqlLogStatement::stmtUID.close(); return rv; } DbRetVal SqlLogConnection::beginTrans(IsolationLevel isoLevel, TransSyncMode mode) @@ -85,8 +89,8 @@ DbRetVal SqlLogConnection::beginTrans(IsolationLevel isoLevel, TransSyncMode mod DbRetVal rv = OK; if (innerConn) rv = innerConn->beginTrans(isoLevel); if (rv != OK) return rv; - syncMode = mode; + txnID = SqlLogConnection::txnUID.getID(TXN_ID); return OK; } DbRetVal SqlLogConnection::commit() @@ -94,12 +98,10 @@ DbRetVal SqlLogConnection::commit() DbRetVal rv = OK; //printf("LOG: commit %d\n", syncMode); //if (innerConn) rv = innerConn->commit(); - if (syncMode == OSYNC) { - if (innerConn) rv = innerConn->commit(); - return rv; - } - if (logStore.size() == 0) - { + if (innerConn) rv = innerConn->commit(); + if (!Conf::config.useDurability()) return OK; + + if (execLogStore.size() == 0) { //This means no execution for any non select statements in //this transaction //rollback so that subsequent beginTrans will not report that @@ -111,7 +113,7 @@ DbRetVal SqlLogConnection::commit() } return rv; } - if (syncMode == ASYNC) { + //TODO::put the packet in global log store /* PacketCommit *pkt = new PacketCommit(); @@ -136,17 +138,51 @@ DbRetVal SqlLogConnection::commit() } //TODO::remove all sql logs nodes and the list which contains ptr to it */ + int txnId = getTxnID(); + // len to be sent should also contain txnId + int len = 2 * sizeof(int) + os::align(getExecLogStoreSize()); + printDebug(DM_SqlLog, "commit: size of logstore: %d", len); + int bufferSize = sizeof(long) + len; + printDebug(DM_SqlLog, "commit: size of buffer: %d", bufferSize); + void *buffer = malloc(bufferSize); + printDebug(DM_SqlLog, "commit: buffer address: %x", buffer); + char *ptr = (char *)buffer + sizeof(long); // long type is for msgtype + char *data = ptr; + printDebug(DM_SqlLog, "commit: data address: %x", data); + *(int *) ptr = len; + ptr += sizeof(int); + *(int *) ptr = txnId; + ptr += sizeof(int); + ListIterator logStoreIter = execLogStore.getIterator(); + ExecLogInfo *elInfo = NULL; + while (logStoreIter.hasElement()) { + elInfo = (ExecLogInfo *)logStoreIter.nextElement(); + printDebug(DM_SqlLog, "commit: elem from logstore:: %x", elInfo); + *(int *) ptr = elInfo->stmtId; + printDebug(DM_SqlLog, "commit: stmtId to marshall: %d", elInfo->stmtId); + ptr += sizeof(int); + *(int *) ptr = (int) elInfo->type; + printDebug(DM_SqlLog, "commit: ExType to marshall: %d", elInfo->type); + //printf("PRABA::type is %d\n" , *(int *) ptr); + ptr += sizeof(int); + if (elInfo->type == SETPARAM) { + *(int *) ptr = elInfo->pos; + printDebug(DM_SqlLog, "commit: PrmPos to marshall: %d", elInfo->pos); + ptr += sizeof(int); + *(int *) ptr = (int) elInfo->dataType; + printDebug(DM_SqlLog, "commit: DtType to marshall: %d", elInfo->dataType); + ptr += sizeof(int); + *(int *) ptr = elInfo->len; + printDebug(DM_SqlLog, "commit: length to marshall: %d", elInfo->len); + ptr += sizeof(int); + memcpy(ptr, &elInfo->value, elInfo->len); + ptr += elInfo->len; + } } - - ListIterator logStoreIter = logStore.getIterator(); - PacketExecute *execPkt = NULL; - while (logStoreIter.hasElement()) - { - execPkt = (PacketExecute*)logStoreIter.nextElement(); - delete execPkt; - } - logStore.reset(); - if (innerConn) rv = innerConn->commit(); + commitLogs(len, data); + execLogStore.reset(); + execLogStoreSize =0; + //if (innerConn) rv = innerConn->commit(); return rv; } DbRetVal SqlLogConnection::rollback() @@ -163,8 +199,12 @@ DbRetVal SqlLogConnection::rollback() delete execPkt; } logStore.reset(); + + execLogStore.reset(); + execLogStoreSize =0; return rv; } + DbRetVal SqlLogConnection::populateCachedTableList() { FILE *fp = NULL; @@ -189,6 +229,7 @@ DbRetVal SqlLogConnection::populateCachedTableList() fclose(fp); return OK; } + bool SqlLogConnection::isTableCached(char *tblName) { if (NULL == tblName) @@ -207,38 +248,3 @@ bool SqlLogConnection::isTableCached(char *tblName) } return false; } - - -DbRetVal SqlLogConnection::sendAndReceive(NetworkPacketType type, char *packet, int length) -{ - return OK; - NetworkClient* nwClient = nwTable.getNetworkClient(); - DbRetVal rv = OK, retRV=OK; - printf("isCacheClient %d\n", nwClient->isCacheClient()); - printf("isConnected %d\n", nwClient->isConnected()); -/* - if (!nwClient->isConnected()) { - if (nwClient->isCacheClient()) return ErrOS; - //TODO::put this packet in send buffer. - return OK; - } -*/ - rv = nwClient->send(type, packet, length); - if (rv != OK) - { - printf("Unable to send pkt to peer with nwid %d\n", nwClient->getNetworkID()); - //TODO:: put this packet in resend buffer, so that it will sent later by another thread for repl mode - nwClient->setConnectFlag(false); - if (nwClient->isCacheClient()) return ErrOS; else return OK; - } - rv = nwClient->receive(); - if (rv != OK) - { - printf("Unable to receive ack pkt from peer with nwid %d\n", nwClient->getNetworkID()); - nwClient->setConnectFlag(false); - if (nwClient->isCacheClient()) return ErrOS; - //TODO:: put this packet to resend buffer so that it can be sent later - //and call continue; - } - return OK; -} diff --git a/src/sqllog/SqlLogStatement.cxx b/src/sqllog/SqlLogStatement.cxx index 0ca72778..a883a877 100644 --- a/src/sqllog/SqlLogStatement.cxx +++ b/src/sqllog/SqlLogStatement.cxx @@ -19,7 +19,7 @@ ***************************************************************************/ #include -UniqueID SqlLogStatement::stmtUID; +GlobalUniqueID SqlLogStatement::stmtUID; bool SqlLogStatement::isNonSelectDML(char *stmtstr) { @@ -33,56 +33,27 @@ bool SqlLogStatement::isNonSelectDML(char *stmtstr) DbRetVal SqlLogStatement::prepare(char *stmtstr) { DbRetVal rv = OK; + SqlLogConnection *conn = (SqlLogConnection *)con; + int txnId=conn->getTxnID(); if (innerStmt) rv = innerStmt->prepare(stmtstr); - if (rv != OK) return rv; - - isCached = false; - //check if it is INSERT UPDATE DELETE statement - //if not, then no need to generate logs - if (!isNonSelectDML(stmtstr)) { return rv;} - if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK; - SqlLogConnection* logConn = (SqlLogConnection*)con; - if (!logConn->isTableCached(innerStmt->getTableName())) return OK; - isCached = true; - mode = TABLE_OSYNC;//TEMP::support only OSYNC - - sid = SqlLogStatement::stmtUID.getID(); - //TODO::if connected to peer then only send this packet - PacketPrepare *pkt = new PacketPrepare(); - pkt->stmtID= sid; - pkt->syncMode = ASYNC; - pkt->stmtString = stmtstr; - pkt->noParams = innerStmt->noOfParamFields(); - FieldInfo *info = new FieldInfo(); - if (pkt->noParams > 0) { - pkt->type = new int [pkt->noParams]; - pkt->length = new int [pkt->noParams]; - BindSqlField *bindField = NULL; - for (int i = 0; i < innerStmt->noOfParamFields(); i++) - { - innerStmt->getParamFldInfo(i+1, info); - bindField = new BindSqlField(); - bindField->type = info->type; - bindField->length = info->length; - pkt->type[i] = info->type; - pkt->length[i] = info->length; - bindField->value = AllDataType::alloc(info->type, info->length); - paramList.append(bindField); - } - } - pkt->marshall(); - /*logConn->connectIfNotConnected(); - //printf("Sending PREPARE packet of size %d\n", pkt->getBufferSize()); - rv = logConn->sendAndReceive(NW_PKT_PREPARE, pkt->getMarshalledBuffer(), pkt->getBufferSize()); - printf("RV from PREPARE SQLLOG %d\n", rv); - if (rv != OK) { - logConn->addPreparePacket(pkt); - delete info; - return OK; - }*/ - logConn->addPreparePacket(pkt); - delete info; - return rv; + if (rv != OK) { isNonSelDML = false; return rv; } + if (Conf::config.useDurability()) { + if (strlen(stmtstr) > 6 && ((strncasecmp(stmtstr,"CREATE", 6) == 0) || + (strncasecmp(stmtstr,"DROP", 4) == 0))) { + sid = SqlLogStatement::stmtUID.getID(STMT_ID); + printDebug(DM_SqlLog, "CREATE|DROP: stmt id = %d\n", sid); + conn->fileLogPrepare(0, sid, strlen(stmtstr)+1, stmtstr); + isNonSelDML = false; + return OK; + } + } + if (!isNonSelectDML(stmtstr)) { isNonSelDML = false; return rv;} + isNonSelDML = true; + sid = SqlLogStatement::stmtUID.getID(STMT_ID); + printDebug(DM_SqlLog, "stmt id = %d\n", sid); + if (Conf::config.useDurability()) + conn->fileLogPrepare(txnId, sid, strlen(stmtstr) + 1, stmtstr); + return OK; } bool SqlLogStatement::isSelect() @@ -93,30 +64,22 @@ bool SqlLogStatement::isSelect() DbRetVal SqlLogStatement::execute(int &rowsAffected) { - SqlLogConnection* logConn = (SqlLogConnection*)con; - DbRetVal rv = OK; if (innerStmt) rv = innerStmt->execute(rowsAffected); - if (rv != OK) return rv; - - //no need to generate log if it does not actually modify the table - if (rowsAffected == 0 ) return OK; - if (!isCached) return OK; - if (logConn->getSyncMode() == OSYNC) return OK; - - //printf("LOG:execute\n"); - PacketExecute *pkt = new PacketExecute(); - pkt->stmtID= sid; - pkt->noParams = innerStmt->noOfParamFields(); - pkt->setParams(paramList); - pkt->marshall(); - int *p = (int*)pkt->getMarshalledBuffer(); - //printf("After EXEC packet marshall %d %d size %d\n", *p, *(p+1), - // pkt->getBufferSize()); - // printf("EXEC pkt ptr is %x\n", pkt); - logConn->addPacket(pkt); - return rv; + if (!isNonSelDML) return rv; + if ( rv != OK) return rv; + ExecLogInfo *elInfo = (ExecLogInfo *) malloc (sizeof(ExecLogInfo)); + elInfo->stmtId = sid; + printDebug(DM_SqlLog, "Execute: stmtID: %d", elInfo->stmtId); + elInfo->type = EXECONLY; + printDebug(DM_SqlLog, "Execute: ExType: %d", elInfo->type); + logConn->addExecLog(elInfo); + printDebug(DM_SqlLog, "Execute: elem address: %x", elInfo); + int size = 2 * sizeof(int); + logConn->addToExecLogSize(size); + printDebug(DM_SqlLog, "Execute: log length: %d", size); + return OK; } DbRetVal SqlLogStatement::bindParam(int pos, void* value) @@ -200,163 +163,236 @@ DbRetVal SqlLogStatement::free() DbRetVal rv = OK; if (innerStmt) rv = innerStmt->free(); //TODO::DEBUG::always innsrStmt->free() returns error - //if (rv != OK) return rv; - SqlLogConnection* logConn = (SqlLogConnection*)con; - if (sid != 0 ) logConn->removePreparePacket(sid); + if (rv != OK) return rv; + if (isNonSelDML) { + SqlLogConnection* logConn = (SqlLogConnection*)con; + if (sid != 0 ) logConn->freeLogs(sid); + } if (!isCached) return rv; - - //TODO - //If statement is freed before the txn commits, it will lead to issue - //incase of async mode. when the other site goes down and comes back, - //it will not have the cached SqlStatement objects, so in that case - //we need to send all the prepare packets again, so we should not free - //the statement straight away in client side as well as in server side - - - - /*PacketFree *pkt = new PacketFree(); - pkt->stmtID= sid; - pkt->marshall(); - SqlLogConnection* logConn = (SqlLogConnection*)con; - logConn->sendAndReceiveAllPeers(NW_PKT_FREE, pkt->getMarshalledBuffer(), pkt->getBufferSize()); - delete pkt;*/ isCached= false; sid = 0; - paramList.reset(); + isNonSelDML = false; return OK; } void SqlLogStatement::setShortParam(int paramPos, short value) { if (innerStmt) innerStmt->setShortParam(paramPos,value); + if (!isNonSelDML) return; SqlLogConnection* logConn = (SqlLogConnection*)con; - if (logConn->getSyncMode() == OSYNC) return ; - BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos); - if (bindField->type != typeShort) return; - *(short*)(bindField->value) = value; + int buffer = sizeof(ExecLogInfo) - sizeof(void *) + sizeof(short); + ExecLogInfo *elInfo = (ExecLogInfo *) malloc (buffer); + elInfo->stmtId = sid; + printDebug(DM_SqlLog, "setShort: stmtID: %d", elInfo->stmtId); + elInfo->type = SETPARAM; + printDebug(DM_SqlLog, "setShort: ExecTp: %d", elInfo->type); + elInfo->pos = paramPos; + printDebug(DM_SqlLog, "setShort: PrmPos: %d", elInfo->pos); + elInfo->dataType = typeShort; + printDebug(DM_SqlLog, "setShort: DtType: %d", elInfo->dataType); + elInfo->len = sizeof (short); + printDebug(DM_SqlLog, "setShort: Length: %d", elInfo->len); + *(short *)&elInfo->value = value; + printDebug(DM_SqlLog, "setShort: Value : %d", *(short *)&elInfo->value); + logConn->addExecLog(elInfo); + printDebug(DM_SqlLog, "appended elem addr: %x", elInfo); + logConn->addToExecLogSize(buffer); + printDebug(DM_SqlLog, "appended bufsize: %d", buffer); return; } void SqlLogStatement::setIntParam(int paramPos, int value) { if (innerStmt) innerStmt->setIntParam(paramPos,value); + if (!isNonSelDML) return; SqlLogConnection* logConn = (SqlLogConnection*)con; - if (logConn->getSyncMode() == OSYNC) return ; - if (!isCached) return; - BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos); - if (bindField->type != typeInt) return; - *(int*)(bindField->value) = value; + int buffer = sizeof(ExecLogInfo) - sizeof(void *) + sizeof(int); + ExecLogInfo *elInfo = (ExecLogInfo *) malloc (buffer); + elInfo->stmtId = sid; + printDebug(DM_SqlLog, "setInt: stmtID: %d", elInfo->stmtId); + elInfo->type = SETPARAM; + printDebug(DM_SqlLog, "setInt: ExecTp: %d", elInfo->type); + elInfo->pos = paramPos; + printDebug(DM_SqlLog, "setInt: PrmPos: %d", elInfo->pos); + elInfo->dataType = typeInt; + printDebug(DM_SqlLog, "setInt: DtType: %d", elInfo->dataType); + elInfo->len = sizeof(int); + printDebug(DM_SqlLog, "setInt: Length: %d", elInfo->len); + *(int *)&elInfo->value = value; + printDebug(DM_SqlLog, "setInt: Value : %d", *(int *)&elInfo->value); + logConn->addExecLog(elInfo); + printDebug(DM_SqlLog, "appended elem addr: %x", elInfo); + logConn->addToExecLogSize(buffer); + printDebug(DM_SqlLog, "appended bufsize: %d", buffer); return; - } void SqlLogStatement::setLongParam(int paramPos, long value) { if (innerStmt) innerStmt->setLongParam(paramPos,value); + if (!isNonSelDML) return; SqlLogConnection* logConn = (SqlLogConnection*)con; - if (logConn->getSyncMode() == OSYNC) return ; - if (!isCached) return; - BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos); - if (bindField->type != typeLong) return; - *(long*)(bindField->value) = value; + int buffer = sizeof(ExecLogInfo) - sizeof(void *) + sizeof(long); + ExecLogInfo *elInfo = (ExecLogInfo *) malloc (buffer); + elInfo->stmtId = sid; + elInfo->type = SETPARAM; + elInfo->pos = paramPos; + elInfo->dataType = typeLong; + elInfo->len = sizeof(long); + *(long *)&elInfo->value = value; + logConn->addExecLog(elInfo); + logConn->addToExecLogSize(buffer); return; - } void SqlLogStatement::setLongLongParam(int paramPos, long long value) { if (innerStmt) innerStmt->setLongLongParam(paramPos,value); + if (!isNonSelDML) return; SqlLogConnection* logConn = (SqlLogConnection*)con; - if (logConn->getSyncMode() == OSYNC) return ; - if (!isCached) return; - BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos); - if (bindField->type != typeLongLong) return; - *(long long*)(bindField->value) = value; + int buffer = sizeof(ExecLogInfo) - sizeof(void *) + sizeof(long long); + ExecLogInfo *elInfo = (ExecLogInfo *) malloc (buffer); + elInfo->stmtId = sid; + elInfo->type = SETPARAM; + elInfo->pos = paramPos; + elInfo->dataType = typeLongLong; + elInfo->len = sizeof (long long); + *(long long *)&elInfo->value = value; + logConn->addExecLog(elInfo); + logConn->addToExecLogSize(buffer); return; } void SqlLogStatement::setByteIntParam(int paramPos, ByteInt value) { if (innerStmt) innerStmt->setByteIntParam(paramPos,value); + if (!isNonSelDML) return; SqlLogConnection* logConn = (SqlLogConnection*)con; - if (logConn->getSyncMode() == OSYNC) return ; - if (!isCached) return; - BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos); - if (bindField->type != typeByteInt) return; - *(char*)(bindField->value) = value; - + int buffer = sizeof(ExecLogInfo) - sizeof(void *) + sizeof(ByteInt); + ExecLogInfo *elInfo = (ExecLogInfo *) malloc (buffer); + elInfo->stmtId = sid; + elInfo->type = SETPARAM; + elInfo->pos = paramPos; + elInfo->dataType = typeByteInt; + elInfo->len = sizeof(ByteInt); + *(ByteInt *)&elInfo->value = value; + logConn->addExecLog(elInfo); + logConn->addToExecLogSize(buffer); + return; } void SqlLogStatement::setFloatParam(int paramPos, float value) { if (innerStmt) innerStmt->setFloatParam(paramPos,value); + if (!isNonSelDML) return; SqlLogConnection* logConn = (SqlLogConnection*)con; - if (logConn->getSyncMode() == OSYNC) return ; - if (!isCached) return; - BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos); - if (bindField->type != typeFloat) return; - *(float*)(bindField->value) = value; - + int buffer = sizeof(ExecLogInfo) - sizeof(void *) + sizeof(float); + ExecLogInfo *elInfo = (ExecLogInfo *) malloc (buffer); + elInfo->stmtId = sid; + elInfo->type = SETPARAM; + elInfo->pos = paramPos; + elInfo->dataType = typeFloat; + elInfo->len = sizeof(float); + *(float *)&elInfo->value = value; + logConn->addExecLog(elInfo); + logConn->addToExecLogSize(buffer); + return; } void SqlLogStatement::setDoubleParam(int paramPos, double value) { if (innerStmt) innerStmt->setDoubleParam(paramPos,value); + if (!isNonSelDML) return; SqlLogConnection* logConn = (SqlLogConnection*)con; - if (logConn->getSyncMode() == OSYNC) return ; - if (!isCached) return; - BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos); - if (bindField->type != typeDouble) return; - *(double*)(bindField->value) = value; - + int buffer = sizeof(ExecLogInfo) - sizeof(void *) + sizeof(double); + ExecLogInfo *elInfo = (ExecLogInfo *) malloc (buffer); + elInfo->stmtId = sid; + elInfo->type = SETPARAM; + elInfo->pos = paramPos; + elInfo->dataType = typeDouble; + elInfo->len = sizeof(double); + *(double *)&elInfo->value = value; + logConn->addExecLog(elInfo); + logConn->addToExecLogSize(buffer); + return; } void SqlLogStatement::setStringParam(int paramPos, char *value) { if (innerStmt) innerStmt->setStringParam(paramPos,value); + if (!isNonSelDML) return; SqlLogConnection* logConn = (SqlLogConnection*)con; - if (logConn->getSyncMode() == OSYNC) return ; - if (!isCached) return; - BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos); - if (bindField->type != typeString) return; - char *dest = (char*)bindField->value; - strncpy(dest, value, bindField->length); - dest[ bindField->length - 1] ='\0'; + int buffer = sizeof(ExecLogInfo) - sizeof(void *) + os::align(strlen(value) + 1); + ExecLogInfo *elInfo = (ExecLogInfo *) malloc (buffer); + elInfo->stmtId = sid; + elInfo->type = SETPARAM; + elInfo->pos = paramPos; + elInfo->dataType = typeString; + elInfo->len = os::align(strlen(value) + 1); + strcpy((char *) &elInfo->value, value); + logConn->addExecLog(elInfo); + logConn->addToExecLogSize(buffer); return; } void SqlLogStatement::setDateParam(int paramPos, Date value) { if (innerStmt) innerStmt->setDateParam(paramPos,value); + if (!isNonSelDML) return; SqlLogConnection* logConn = (SqlLogConnection*)con; - if (logConn->getSyncMode() == OSYNC) return ; - if (!isCached) return; - BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos); - if (bindField->type != typeDate) return; - *(Date*)(bindField->value) = value; - + int buffer = sizeof(ExecLogInfo) - sizeof(void *) + sizeof(Date); + ExecLogInfo *elInfo = (ExecLogInfo *) malloc (buffer); + elInfo->stmtId = sid; + elInfo->type = SETPARAM; + elInfo->pos = paramPos; + elInfo->dataType = typeDate; + elInfo->len = sizeof(Date); + *(Date *)&elInfo->value = value; + logConn->addExecLog(elInfo); + logConn->addToExecLogSize(buffer); + return; } void SqlLogStatement::setTimeParam(int paramPos, Time value) { if (innerStmt) innerStmt->setTimeParam(paramPos,value); + if (!isNonSelDML) return; SqlLogConnection* logConn = (SqlLogConnection*)con; - if (logConn->getSyncMode() == OSYNC) return ; - if (!isCached) return; - BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos); - if (bindField->type != typeTime) return; - *(Time*)(bindField->value) = value; - + int buffer = sizeof(ExecLogInfo) - sizeof(void *) + sizeof(Time); + ExecLogInfo *elInfo = (ExecLogInfo *) malloc (buffer); + elInfo->stmtId = sid; + elInfo->type = SETPARAM; + elInfo->pos = paramPos; + elInfo->dataType = typeTime; + elInfo->len = sizeof (Time); + *(Time *)&elInfo->value = value; + logConn->addExecLog(elInfo); + logConn->addToExecLogSize(buffer); + return; } void SqlLogStatement::setTimeStampParam(int paramPos, TimeStamp value) { if (innerStmt) innerStmt->setTimeStampParam(paramPos,value); + if (!isNonSelDML) return; SqlLogConnection* logConn = (SqlLogConnection*)con; - if (logConn->getSyncMode() == OSYNC) return ; - if (!isCached) return; - BindSqlField *bindField = (BindSqlField*) paramList.get(paramPos); - if (bindField->type != typeTimeStamp) return; - *(TimeStamp*)(bindField->value) = value; + int buffer = sizeof(ExecLogInfo) - sizeof(void *) + sizeof(TimeStamp); + ExecLogInfo *elInfo = (ExecLogInfo *) malloc (buffer); + elInfo->stmtId = sid; + elInfo->type = SETPARAM; + elInfo->pos = paramPos; + elInfo->dataType = typeTimeStamp; + elInfo->len = sizeof(TimeStamp); + *(TimeStamp *)&elInfo->value = value; + logConn->addExecLog(elInfo); + logConn->addToExecLogSize(buffer); + return; } void SqlLogStatement::setBinaryParam(int paramPos, void *value, int length) { if (innerStmt) innerStmt->setBinaryParam(paramPos,value, length); - SqlLogConnection* logConn = (SqlLogConnection*)con; - if (logConn->getSyncMode() == OSYNC) return; - if (!isCached) return; - BindSqlField *bindField = (BindSqlField*) paramList.get(paramPos); - if (bindField->type != typeBinary) return; - memcpy(bindField->value, value, 2 * bindField->length); + if (!isNonSelDML) return; + SqlLogConnection* logConn = (SqlLogConnection*)con; + int buffer = sizeof(ExecLogInfo) - sizeof(void *) + os::align(length); + ExecLogInfo *elInfo = (ExecLogInfo *) malloc (buffer); + elInfo->stmtId = sid; + elInfo->type = SETPARAM; + elInfo->pos = paramPos; + elInfo->dataType = typeBinary; + elInfo->len = os::align(length); + memcpy(&elInfo->value, value, length); + logConn->addExecLog(elInfo); + logConn->addToExecLogSize(buffer); + return; } bool SqlLogStatement::isFldNull(int pos) { @@ -371,3 +407,4 @@ List SqlLogStatement::getAllTableNames(DbRetVal &ret) { if(innerStmt) return innerStmt->getAllTableNames(ret); } + diff --git a/src/storage/Config.cxx b/src/storage/Config.cxx index 06a2f4a1..7ca4d32b 100644 --- a/src/storage/Config.cxx +++ b/src/storage/Config.cxx @@ -72,19 +72,18 @@ int Config::storeKeyVal(char *key, char *value) { cVal.isCache = os::atobool(value); } else if(strcasestr(key,"CACHE_ID")!=NULL) { cVal.cacheId = atoi(value);} - - else if (strcasestr(key, "REPLICATION") != NULL) - { cVal.isReplication = os::atobool(value); } + else if (strcasestr(key, "DURABILITY") != NULL) + { cVal.isDurable = os::atobool(value); } else if (strcasestr(key, "CSQL_SQL_SERVER") != NULL) { cVal.isCsqlSqlServer = os::atobool(value); } else if (strcasestr(key, "PORT") != NULL) { cVal.port = atoi(value); } - else if (strcasestr(key, "NETWORK_CONFIG_FILE") != NULL) - { strcpy(cVal.replConfigFile , value); } else if (strcasestr(key, "MAX_LOG_STORE_SIZE") != NULL) { cVal.logStoreSize = atol(value); } else if (strcasestr(key, "MY_NETWORK_ID") != NULL) { cVal.networkID = atoi(value); } + else if (strcasestr(key, "ID_SHM_KEY") != NULL) + { cVal.shmKeyForId = atoi(value); } else if (strcasestr(key, "CACHE_NETWORK_ID") != NULL) { cVal.cacheNetworkID = atoi(value); } else if (strcasestr(key, "NETWORK_RESPONSE_TIMEOUT") != NULL) @@ -198,11 +197,6 @@ int Config::validateValues() printError(ErrBadArg, "LOCK_TIMEOUT_RETRY should be >= 0 and <= 100"); return 1; } - if (cVal.isCache && cVal.isReplication) { - printError(ErrBadArg, "Either caching or replication option should be set." - " Both options are not supported together"); - return 1; - } if (cVal.isCache) { if (0 == strcmp(cVal.dsn,"")) { @@ -210,39 +204,6 @@ int Config::validateValues() return 1; } } - if (cVal.isReplication || cVal.isCache) { - if (0 == strcmp(cVal.replConfigFile,"")) - { - //TODO::check whether file exists - printError(ErrBadArg, "NETWORK_CONFIG_FILE is set to NULL"); - return 1; - } - if (0 == strcmp(cVal.tableConfigFile,"")) - { - //TODO::check whether file exists - printError(ErrBadArg, "TABLE_CONFIG_FILE is set to NULL"); - return 1; - } - /*FILE *fp = fopen(cVal.replConfigFile,"r"); - if( fp == NULL ) { - printError(ErrSysInit, "Invalid path/filename for NETWORK_CONFIG_FILE.\n"); - return 1; - } - int count =0; - int nwid, port; - char hostname[IDENTIFIER_LENGTH]; - char nwmode; - - while(!feof(fp)) { - fscanf(fp, "%d:%d:%s\n", &nwid, &port, hostname); - count++; - } - if (count >2) { - printError(ErrSysInit, "NETWORK_CONFIG_FILE has more than 2 entries\n"); - return 1; - }*/ - - } /*if (cVal.isCache) { @@ -365,15 +326,16 @@ void Config::print() printf(" getLockSecs %d\n", getLockSecs()); printf(" getLockUSecs %d\n", getLockUSecs()); printf(" getLockRetries %d\n", getLockRetries()); + printf(" isDurable %d\n", useDurability()); printf(" useCache %d\n", useCache()); + printf(" getCacheID %d\n", getCacheID()); printf(" getDSN %s\n", getDSN()); printf(" getTableConfigFile %s\n", getTableConfigFile()); printf(" isTwoWayCache %d\n", useTwoWayCache()); printf(" getCacheWaitSecs %d\n", getCacheWaitSecs()); printf(" useCsqlSqlServer %d\n", useCsqlSqlServer()); printf(" getPort %d\n", getPort()); - //printf(" useReplication %d\n", useReplication()); - //printf(" getReplConfigFile %s\n", getReplConfigFile()); + printf(" getShmIDKey %d\n", getShmIDKey()); //printf(" getMaxLogStoreSize %ld\n", getMaxLogStoreSize()); //printf(" getNetworkID %d\n", getNetworkID()); //printf(" getCacheNetworkID %d\n", getCacheNetworkID()); diff --git a/src/storage/Connection.cxx b/src/storage/Connection.cxx index b5ef9954..ce3eaf2e 100644 --- a/src/storage/Connection.cxx +++ b/src/storage/Connection.cxx @@ -95,3 +95,9 @@ DbRetVal Connection::rollback() if (session == NULL) return ErrNoConnection; return session->rollback(); } + +DbRetVal Connection::getExclusiveLock() +{ + if (session == NULL) return ErrNoConnection; + return session->getExclusiveLock(); +} diff --git a/src/storage/HashIndex.cxx b/src/storage/HashIndex.cxx index 09dd7274..4ec441e8 100644 --- a/src/storage/HashIndex.cxx +++ b/src/storage/HashIndex.cxx @@ -107,7 +107,7 @@ unsigned int HashIndex::computeHashBucket(DataType type, void *key, int noOfBuck return -1; } -DbRetVal HashIndex::insert(TableImpl *tbl, Transaction *tr, void *indexPtr, IndexInfo *indInfo, void *tuple, bool undoFlag) +DbRetVal HashIndex::insert(TableImpl *tbl, Transaction *tr, void *indexPtr, IndexInfo *indInfo, void *tuple, bool loadFlag) { HashIndexInfo *info = (HashIndexInfo*) indInfo; CINDEX *iptr = (CINDEX*)indexPtr; @@ -217,7 +217,7 @@ DbRetVal HashIndex::insert(TableImpl *tbl, Transaction *tr, void *indexPtr, Inde } } - if (undoFlag) { + if (!loadFlag) { rc = tr->appendLogicalHashUndoLog(tbl->sysDB_, InsertHashIndexOperation, hInfo, sizeof(HashUndoLogInfo)); if (rc !=OK) { @@ -234,7 +234,7 @@ DbRetVal HashIndex::insert(TableImpl *tbl, Transaction *tr, void *indexPtr, Inde } -DbRetVal HashIndex::remove(TableImpl *tbl, Transaction *tr, void *indexPtr, IndexInfo *indInfo, void *tuple, bool undoFlag) +DbRetVal HashIndex::remove(TableImpl *tbl, Transaction *tr, void *indexPtr, IndexInfo *indInfo, void *tuple, bool loadFlag) { CINDEX *iptr = (CINDEX*)indexPtr; @@ -297,7 +297,7 @@ DbRetVal HashIndex::remove(TableImpl *tbl, Transaction *tr, void *indexPtr, Inde bucket1->bucketList_ = list.getBucketListHead(); rc = OK; } - if (undoFlag) { + if (!loadFlag) { rc =tr->appendLogicalHashUndoLog(tbl->sysDB_, DeleteHashIndexOperation, hInfo, sizeof(HashUndoLogInfo)); if (rc !=OK) { @@ -312,7 +312,7 @@ DbRetVal HashIndex::remove(TableImpl *tbl, Transaction *tr, void *indexPtr, Inde return rc; } -DbRetVal HashIndex::update(TableImpl *tbl, Transaction *tr, void *indexPtr, IndexInfo *indInfo, void *tuple, bool undoFlag) +DbRetVal HashIndex::update(TableImpl *tbl, Transaction *tr, void *indexPtr, IndexInfo *indInfo, void *tuple, bool loadFlag) { CINDEX *iptr = (CINDEX*)indexPtr; @@ -473,7 +473,7 @@ DbRetVal HashIndex::update(TableImpl *tbl, Transaction *tr, void *indexPtr, Inde return ErrSysInternal; } DbRetVal rc = OK; - if (undoFlag) { + if (!loadFlag) { rc = tr->appendLogicalHashUndoLog(tbl->sysDB_, DeleteHashIndexOperation, hInfo1, sizeof(HashUndoLogInfo)); if (rc !=OK) { @@ -523,7 +523,7 @@ DbRetVal HashIndex::update(TableImpl *tbl, Transaction *tr, void *indexPtr, Inde list2.insert((Chunk*)iptr->hashNodeChunk_, tbl->db_, keyPtr, tuple); bucket1->bucketList_ = list2.getBucketListHead(); } - if (undoFlag) { + if (!loadFlag) { rc = tr->appendLogicalHashUndoLog(tbl->sysDB_, InsertHashIndexOperation, hInfo2, sizeof(HashUndoLogInfo)); if (rc !=OK) diff --git a/src/storage/Makefile.am b/src/storage/Makefile.am index 27980736..67a547c9 100644 --- a/src/storage/Makefile.am +++ b/src/storage/Makefile.am @@ -2,7 +2,7 @@ INCLUDES = -I$(top_srcdir)/include $(all_includes) METASOURCES = AUTO lib_LTLIBRARIES = libcsql.la libcsql_la_LDFLAGS = -avoid-version -module -libcsql_la_SOURCES = BucketIter.cxx TreeIter.cxx BucketList.cxx CatalogTables.cxx Chunk.cxx \ +libcsql_la_SOURCES = BucketIter.cxx TreeIter.cxx BucketList.cxx CatalogTables.cxx Chunk.cxx Util.cxx TableConfig.cxx\ ChunkIterator.cxx Condition.cxx Connection.cxx Database.cxx DatabaseManagerImpl.cxx DataType.cxx \ Debug.cxx FieldList.cxx Index.cxx LockListIter.cxx LockManager.cxx Logger.cxx Mutex.cxx os.cxx \ PageInfo.cxx PredicateImpl.cxx SessionImpl.cxx TableDef.cxx TableImpl.cxx Transaction.cxx \ diff --git a/src/storage/Makefile.in b/src/storage/Makefile.in index cfedce27..88621724 100644 --- a/src/storage/Makefile.in +++ b/src/storage/Makefile.in @@ -52,14 +52,15 @@ libLTLIBRARIES_INSTALL = $(INSTALL) LTLIBRARIES = $(lib_LTLIBRARIES) libcsql_la_LIBADD = am_libcsql_la_OBJECTS = BucketIter.lo TreeIter.lo BucketList.lo \ - CatalogTables.lo Chunk.lo ChunkIterator.lo Condition.lo \ - Connection.lo Database.lo DatabaseManagerImpl.lo DataType.lo \ - Debug.lo FieldList.lo Index.lo LockListIter.lo LockManager.lo \ - Logger.lo Mutex.lo os.lo PageInfo.lo PredicateImpl.lo \ - SessionImpl.lo TableDef.lo TableImpl.lo Transaction.lo \ - TransactionManager.lo TupleIterator.lo UserManagerImpl.lo \ - HashIndex.lo TreeIndex.lo Config.lo Process.lo AggTableImpl.lo \ - JoinTableImpl.lo Expression.lo + CatalogTables.lo Chunk.lo Util.lo TableConfig.lo \ + ChunkIterator.lo Condition.lo Connection.lo Database.lo \ + DatabaseManagerImpl.lo DataType.lo Debug.lo FieldList.lo \ + Index.lo LockListIter.lo LockManager.lo Logger.lo Mutex.lo \ + os.lo PageInfo.lo PredicateImpl.lo SessionImpl.lo TableDef.lo \ + TableImpl.lo Transaction.lo TransactionManager.lo \ + TupleIterator.lo UserManagerImpl.lo HashIndex.lo TreeIndex.lo \ + Config.lo Process.lo AggTableImpl.lo JoinTableImpl.lo \ + Expression.lo libcsql_la_OBJECTS = $(am_libcsql_la_OBJECTS) libcsql_la_LINK = $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) \ $(LIBTOOLFLAGS) --mode=link $(CXXLD) $(AM_CXXFLAGS) \ @@ -196,7 +197,7 @@ INCLUDES = -I$(top_srcdir)/include $(all_includes) METASOURCES = AUTO lib_LTLIBRARIES = libcsql.la libcsql_la_LDFLAGS = -avoid-version -module -libcsql_la_SOURCES = BucketIter.cxx TreeIter.cxx BucketList.cxx CatalogTables.cxx Chunk.cxx \ +libcsql_la_SOURCES = BucketIter.cxx TreeIter.cxx BucketList.cxx CatalogTables.cxx Chunk.cxx Util.cxx TableConfig.cxx\ ChunkIterator.cxx Condition.cxx Connection.cxx Database.cxx DatabaseManagerImpl.cxx DataType.cxx \ Debug.cxx FieldList.cxx Index.cxx LockListIter.cxx LockManager.cxx Logger.cxx Mutex.cxx os.cxx \ PageInfo.cxx PredicateImpl.cxx SessionImpl.cxx TableDef.cxx TableImpl.cxx Transaction.cxx \ @@ -297,6 +298,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PredicateImpl.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Process.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SessionImpl.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TableConfig.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TableDef.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TableImpl.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Transaction.Plo@am__quote@ @@ -305,6 +307,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TreeIter.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TupleIterator.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/UserManagerImpl.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Util.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/os.Plo@am__quote@ .cxx.o: diff --git a/src/storage/SessionImpl.cxx b/src/storage/SessionImpl.cxx index fe3d8d0d..9c95867b 100644 --- a/src/storage/SessionImpl.cxx +++ b/src/storage/SessionImpl.cxx @@ -142,6 +142,7 @@ DbRetVal SessionImpl::open(const char *username, const char *password) } ((DatabaseManagerImpl*)dbMgr)->setProcSlot(); //ProcessManager::systemDatabase = dbMgr->sysDb(); + isXTaken = false; return OK; } DbRetVal SessionImpl::authenticate(const char *username, const char *password) @@ -163,9 +164,20 @@ DbRetVal SessionImpl::authenticate(const char *username, const char *password) } return OK; } +DbRetVal SessionImpl::getExclusiveLock() +{ + if (dbMgr->isAnyOneRegistered()) { + printError(ErrLockTimeOut, "Unable to acquire exclusive lock. somebody is connected"); + return ErrLockTimeOut; + } + DbRetVal rv = dbMgr->sysDb()->getProcessTableMutex(true); + if (OK == rv) isXTaken = true; + return rv; +} DbRetVal SessionImpl::close() { DbRetVal rv = OK; + if (isXTaken && dbMgr ) dbMgr->sysDb()->releaseProcessTableMutex(true); if (dbMgr) { rv = dbMgr->closeDatabase(); diff --git a/src/storage/TableImpl.cxx b/src/storage/TableImpl.cxx index de1f09fa..e365dc00 100644 --- a/src/storage/TableImpl.cxx +++ b/src/storage/TableImpl.cxx @@ -389,47 +389,46 @@ void* TableImpl::fetchNoBind() return NULL; } DbRetVal lockRet = OK; - if ((*trans)->isoLevel_ == READ_COMMITTED) - { - //if iso level is read committed, operation duration lock is sufficent - //so release it here itself. - int tries = 5; - struct timeval timeout; - timeout.tv_sec = Conf::config.getMutexSecs(); - timeout.tv_usec = Conf::config.getMutexUSecs(); + if(!loadFlag) { + if ((*trans)->isoLevel_ == READ_COMMITTED) + { + //if iso level is read committed, operation duration lock is sufficent so release it here itself. + int tries = 5; + struct timeval timeout; + timeout.tv_sec = Conf::config.getMutexSecs(); + timeout.tv_usec = Conf::config.getMutexUSecs(); - bool status = false; - while(true) { - lockRet = lMgr_->isExclusiveLocked( curTuple_, trans, status); - if (OK != lockRet) + bool status = false; + while(true) { + lockRet = lMgr_->isExclusiveLocked( curTuple_, trans, status); + if (OK != lockRet) + { + printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_); + curTuple_ = prevTuple; + return NULL; + } + if (!status) break; + tries--; + if (tries == 0) break; + os::select(0, 0, 0, 0, &timeout); + } + if (tries == 0) { printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_); curTuple_ = prevTuple; return NULL; } - if (!status) break; - tries--; - if (tries == 0) break; - os::select(0, 0, 0, 0, &timeout); - - } - if (tries == 0) - { - printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_); - curTuple_ = prevTuple; - return NULL; } - } - else if ((*trans)->isoLevel_ == READ_REPEATABLE) { - lockRet = lMgr_->getSharedLock(curTuple_, trans); - if (OK != lockRet) - { - printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_); - curTuple_ = prevTuple; - return NULL; + else if ((*trans)->isoLevel_ == READ_REPEATABLE) { + lockRet = lMgr_->getSharedLock(curTuple_, trans); + if (OK != lockRet) + { + printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_); + curTuple_ = prevTuple; + return NULL; + } } - - } + } return curTuple_; } @@ -449,48 +448,47 @@ void* TableImpl::fetchNoBind(DbRetVal &rv) return NULL; } DbRetVal lockRet = OK; - if ((*trans)->isoLevel_ == READ_REPEATABLE) { - lockRet = lMgr_->getSharedLock(curTuple_, trans); - if (OK != lockRet) - { - printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_); - rv = ErrLockTimeOut; - curTuple_ = prevTuple; - return NULL; - } - - } - else if ((*trans)->isoLevel_ == READ_COMMITTED) - { - //if iso level is read committed, operation duration lock is sufficent - //so release it here itself. - int tries = 5; - struct timeval timeout; - timeout.tv_sec = Conf::config.getMutexSecs(); - timeout.tv_usec = Conf::config.getMutexUSecs(); - - bool status = false; - while(true) { - lockRet = lMgr_->isExclusiveLocked( curTuple_, trans, status); + if(!loadFlag) { + if ((*trans)->isoLevel_ == READ_REPEATABLE) { + lockRet = lMgr_->getSharedLock(curTuple_, trans); if (OK != lockRet) { printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_); - curTuple_ = prevTuple; rv = ErrLockTimeOut; + curTuple_ = prevTuple; return NULL; } - if (!status) break; - tries--; - if (tries == 0) break; - os::select(0, 0, 0, 0, &timeout); - } - if (tries == 0) + else if ((*trans)->isoLevel_ == READ_COMMITTED) { - printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_); - curTuple_ = prevTuple; - rv = ErrLockTimeOut; - return NULL; + //if iso level is read committed, operation duration lock is sufficent so release it here itself. + int tries = 5; + struct timeval timeout; + timeout.tv_sec = Conf::config.getMutexSecs(); + timeout.tv_usec = Conf::config.getMutexUSecs(); + + bool status = false; + while(true) { + lockRet = lMgr_->isExclusiveLocked( curTuple_, trans, status); + if (OK != lockRet) + { + printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_); + curTuple_ = prevTuple; + rv = ErrLockTimeOut; + return NULL; + } + if (!status) break; + tries--; + if (tries == 0) break; + os::select(0, 0, 0, 0, &timeout); + } + if (tries == 0) + { + printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_); + curTuple_ = prevTuple; + rv = ErrLockTimeOut; + return NULL; + } } } return curTuple_; @@ -655,22 +653,25 @@ DbRetVal TableImpl::insertTuple() printError(ret, "Unable to allocate record from chunk"); return ret; } - ret = lMgr_->getExclusiveLock(tptr, trans); - if (OK != ret) - { - ((Chunk*)chunkPtr_)->free(db_, tptr); - printError(ret, "Could not get lock for the insert tuple %x", tptr); - return ErrLockTimeOut; + if (!loadFlag) { + ret = lMgr_->getExclusiveLock(tptr, trans); + if (OK != ret) + { + ((Chunk*)chunkPtr_)->free(db_, tptr); + printError(ret, "Could not get lock for the insert tuple %x", tptr); + return ErrLockTimeOut; + } } - curTuple_ = tptr; ret = copyValuesFromBindBuffer(tptr); if (ret != OK) { printError(ret, "Unable to copy values from bind buffer"); - (*trans)->removeFromHasList(db_, tptr); - lMgr_->releaseLock(tptr); + if (!loadFlag) { + (*trans)->removeFromHasList(db_, tptr); + lMgr_->releaseLock(tptr); + } ((Chunk*)chunkPtr_)->free(db_, tptr); return ret; } @@ -702,14 +703,16 @@ DbRetVal TableImpl::insertTuple() printError(ErrWarning, "Deleting index node"); deleteIndexNode(*trans, indexPtr_[j], idxInfo[j], tptr); } - lMgr_->releaseLock(tptr); - (*trans)->removeFromHasList(db_, tptr); + if (!loadFlag) { + (*trans)->removeFromHasList(db_, tptr); + lMgr_->releaseLock(tptr); + } ((Chunk*)chunkPtr_)->free(db_, tptr); printError(ret, "Unable to insert index node for tuple %x ", tptr); return ret; } } - if (undoFlag) + if (!loadFlag) ret = (*trans)->appendUndoLog(sysDB_, InsertOperation, tptr, length_); if (ret != OK) { printError(ret, "Unable to create undo log for %x %d", tptr, *(int*)tptr); @@ -717,8 +720,10 @@ DbRetVal TableImpl::insertTuple() printError(ErrWarning, "Deleting index node"); deleteIndexNode(*trans, indexPtr_[j], idxInfo[j], tptr); } - lMgr_->releaseLock(tptr); - (*trans)->removeFromHasList(db_, tptr); + if (!loadFlag) { + (*trans)->removeFromHasList(db_, tptr); + lMgr_->releaseLock(tptr); + } ((Chunk*)chunkPtr_)->free(db_, tptr); } return ret; @@ -731,11 +736,14 @@ DbRetVal TableImpl::deleteTuple() printError(ErrNotOpen, "Scan not open: No Current tuple"); return ErrNotOpen; } - DbRetVal ret = lMgr_->getExclusiveLock(curTuple_, trans); - if (OK != ret) - { - printError(ret, "Could not get lock for the delete tuple %x", curTuple_); - return ErrLockTimeOut; + DbRetVal ret = OK; + if (!loadFlag) { + ret = lMgr_->getExclusiveLock(curTuple_, trans); + if (OK != ret) + { + printError(ret, "Could not get lock for the delete tuple %x", curTuple_); + return ErrLockTimeOut; + } } if (NULL != indexPtr_) @@ -751,14 +759,16 @@ DbRetVal TableImpl::deleteTuple() { for (int j = 0; j < i ; j++) insertIndexNode(*trans, indexPtr_[j], idxInfo[j], curTuple_); - lMgr_->releaseLock(curTuple_); - (*trans)->removeFromHasList(db_, curTuple_); + if (!loadFlag) { + lMgr_->releaseLock(curTuple_); + (*trans)->removeFromHasList(db_, curTuple_); + } printError(ret, "Unable to insert index node for tuple %x", curTuple_); return ret; } } ((Chunk*)chunkPtr_)->free(db_, curTuple_); - if (undoFlag) + if (!loadFlag) ret = (*trans)->appendUndoLog(sysDB_, DeleteOperation, curTuple_, length_); iter->prev(); return ret; @@ -813,11 +823,14 @@ DbRetVal TableImpl::updateTuple() printError(ErrNotOpen, "Scan not open: No Current tuple"); return ErrNotOpen; } - DbRetVal ret = lMgr_->getExclusiveLock(curTuple_, trans); - if (OK != ret) - { - printError(ret, "Could not get lock for the update tuple %x", curTuple_); - return ErrLockTimeOut; + DbRetVal ret = OK; + if (!loadFlag) { + ret = lMgr_->getExclusiveLock(curTuple_, trans); + if (OK != ret) + { + printError(ret, "Could not get lock for the update tuple %x", curTuple_); + return ErrLockTimeOut; + } } if (NULL != indexPtr_) { @@ -830,14 +843,16 @@ DbRetVal TableImpl::updateTuple() ret = updateIndexNode(*trans, indexPtr_[i], idxInfo[i], curTuple_); if (ret != OK) { - lMgr_->releaseLock(curTuple_); - (*trans)->removeFromHasList(db_, curTuple_); + if (!loadFlag) { + lMgr_->releaseLock(curTuple_); + (*trans)->removeFromHasList(db_, curTuple_); + } printError(ret, "Unable to update index node for tuple %x", curTuple_); return ret; } } } - if (undoFlag) + if (!loadFlag) ret = (*trans)->appendUndoLog(sysDB_, UpdateOperation, curTuple_, length_); if (ret != OK) return ret; int addSize = 0; @@ -853,7 +868,7 @@ DbRetVal TableImpl::updateTuple() } } DbRetVal rv = copyValuesFromBindBuffer(curTuple_, false); - if (rv != OK) { + if (rv != OK && !loadFlag) { lMgr_->releaseLock(curTuple_); (*trans)->removeFromHasList(db_, curTuple_); return rv; @@ -998,13 +1013,13 @@ DbRetVal TableImpl::copyValuesToBindBuffer(void *tuplePtr) } //-1 index not supported -DbRetVal TableImpl::insertIndexNode(Transaction *tr, void *indexPtr, IndexInfo *info, void *tuple) +DbRetVal TableImpl::insertIndexNode(Transaction *tr, void *indexPtr, IndexInfo *info, void *tuple, bool loadFlag) { CINDEX *iptr = (CINDEX*)indexPtr; DbRetVal ret = OK; printDebug(DM_Table, "Inside insertIndexNode type %d", iptr->indexType_); Index* idx = Index::getIndex(iptr->indexType_); - ret = idx->insert(this, tr, indexPtr, info, tuple,undoFlag); + ret = idx->insert(this, tr, indexPtr, info, tuple, loadFlag); return ret; } @@ -1013,7 +1028,7 @@ DbRetVal TableImpl::deleteIndexNode(Transaction *tr, void *indexPtr, IndexInfo * CINDEX *iptr = (CINDEX*)indexPtr; DbRetVal ret = OK; Index* idx = Index::getIndex(iptr->indexType_); - ret = idx->remove(this, tr, indexPtr, info, tuple, undoFlag); + ret = idx->remove(this, tr, indexPtr, info, tuple, loadFlag); return ret; } void TableImpl::printSQLIndexString() @@ -1057,7 +1072,7 @@ DbRetVal TableImpl::updateIndexNode(Transaction *tr, void *indexPtr, IndexInfo * //TODO::currently it updates irrespective of whether the key changed or not //because of this commenting the whole index update code. relook at it and uncomment - ret = idx->update(this, tr, indexPtr, info, tuple, undoFlag); + ret = idx->update(this, tr, indexPtr, info, tuple, loadFlag); return ret; } diff --git a/src/tools/Makefile.am b/src/tools/Makefile.am index 6552c340..3179040b 100644 --- a/src/tools/Makefile.am +++ b/src/tools/Makefile.am @@ -1,9 +1,12 @@ INCLUDES = -I$(top_srcdir)/include $(all_includes) -I$(top_srcdir)/src/sql METASOURCES = AUTO -bin_PROGRAMS = csql catalog cachetable csqlserver csqlreplserver csqlsqlserver repltable csqlcacheserver csqldump cacheverify +bin_PROGRAMS = csql catalog cachetable csqlserver csqlreplserver csqlsqlserver repltable csqlcacheserver csqldump cacheverify redo csql_SOURCES = isql.cxx csql_LDADD = $(top_builddir)/src/storage/.libs/libcsql $(top_builddir)/src/sql/.libs/libcsqlsql $(top_builddir)/src/sqllog/.libs/libcsqlsqllog $(top_builddir)/src/network/.libs/libcsqlnw $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter $(top_builddir)/src/gateway/.libs/libcsqlgw $(top_builddir)/src/cache/.libs/libcacheload $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw -lrt -lpthread -lcrypt -lodbc -lreadline +redo_SOURCES = redo.cxx +redo_LDADD = $(top_builddir)/src/storage/.libs/libcsql $(top_builddir)/src/sql/.libs/libcsqlsql $(top_builddir)/src/sqllog/.libs/libcsqlsqllog $(top_builddir)/src/network/.libs/libcsqlnw $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter $(top_builddir)/src/gateway/.libs/libcsqlgw $(top_builddir)/src/cache/.libs/libcacheload $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw -lrt -lpthread -lcrypt -lodbc + catalog_SOURCES = catalog.cxx catalog_LDADD = $(top_builddir)/src/storage/libcsql.la \ $(top_builddir)/src/cache/libcacheload.la diff --git a/src/tools/Makefile.in b/src/tools/Makefile.in index 962b6226..c6de4179 100644 --- a/src/tools/Makefile.in +++ b/src/tools/Makefile.in @@ -36,7 +36,7 @@ bin_PROGRAMS = csql$(EXEEXT) catalog$(EXEEXT) cachetable$(EXEEXT) \ csqlserver$(EXEEXT) csqlreplserver$(EXEEXT) \ csqlsqlserver$(EXEEXT) repltable$(EXEEXT) \ csqlcacheserver$(EXEEXT) csqldump$(EXEEXT) \ - cacheverify$(EXEEXT) + cacheverify$(EXEEXT) redo$(EXEEXT) subdir = src/tools DIST_COMMON = $(srcdir)/Makefile.am $(srcdir)/Makefile.in ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 @@ -143,6 +143,16 @@ csqlsqlserver_DEPENDENCIES = $(top_builddir)/src/sql/libcsqlsql.la \ csqlsqlserver_LINK = $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) \ $(LIBTOOLFLAGS) --mode=link $(CXXLD) $(AM_CXXFLAGS) \ $(CXXFLAGS) $(csqlsqlserver_LDFLAGS) $(LDFLAGS) -o $@ +am_redo_OBJECTS = redo.$(OBJEXT) +redo_OBJECTS = $(am_redo_OBJECTS) +redo_DEPENDENCIES = $(top_builddir)/src/storage/.libs/libcsql \ + $(top_builddir)/src/sql/.libs/libcsqlsql \ + $(top_builddir)/src/sqllog/.libs/libcsqlsqllog \ + $(top_builddir)/src/network/.libs/libcsqlnw \ + $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter \ + $(top_builddir)/src/gateway/.libs/libcsqlgw \ + $(top_builddir)/src/cache/.libs/libcacheload \ + $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw am_repltable_OBJECTS = repltable.$(OBJEXT) repltable_OBJECTS = $(am_repltable_OBJECTS) repltable_DEPENDENCIES = $(top_builddir)/src/cache/libcacheload.la \ @@ -168,12 +178,12 @@ CXXLINK = $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) \ SOURCES = $(cachetable_SOURCES) $(cacheverify_SOURCES) \ $(catalog_SOURCES) $(csql_SOURCES) $(csqlcacheserver_SOURCES) \ $(csqldump_SOURCES) $(csqlreplserver_SOURCES) \ - $(csqlserver_SOURCES) $(csqlsqlserver_SOURCES) \ + $(csqlserver_SOURCES) $(csqlsqlserver_SOURCES) $(redo_SOURCES) \ $(repltable_SOURCES) DIST_SOURCES = $(cachetable_SOURCES) $(cacheverify_SOURCES) \ $(catalog_SOURCES) $(csql_SOURCES) $(csqlcacheserver_SOURCES) \ $(csqldump_SOURCES) $(csqlreplserver_SOURCES) \ - $(csqlserver_SOURCES) $(csqlsqlserver_SOURCES) \ + $(csqlserver_SOURCES) $(csqlsqlserver_SOURCES) $(redo_SOURCES) \ $(repltable_SOURCES) ETAGS = etags CTAGS = ctags @@ -293,6 +303,8 @@ INCLUDES = -I$(top_srcdir)/include $(all_includes) -I$(top_srcdir)/src/sql METASOURCES = AUTO csql_SOURCES = isql.cxx csql_LDADD = $(top_builddir)/src/storage/.libs/libcsql $(top_builddir)/src/sql/.libs/libcsqlsql $(top_builddir)/src/sqllog/.libs/libcsqlsqllog $(top_builddir)/src/network/.libs/libcsqlnw $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter $(top_builddir)/src/gateway/.libs/libcsqlgw $(top_builddir)/src/cache/.libs/libcacheload $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw -lrt -lpthread -lcrypt -lodbc -lreadline +redo_SOURCES = redo.cxx +redo_LDADD = $(top_builddir)/src/storage/.libs/libcsql $(top_builddir)/src/sql/.libs/libcsqlsql $(top_builddir)/src/sqllog/.libs/libcsqlsqllog $(top_builddir)/src/network/.libs/libcsqlnw $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter $(top_builddir)/src/gateway/.libs/libcsqlgw $(top_builddir)/src/cache/.libs/libcacheload $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw -lrt -lpthread -lcrypt -lodbc catalog_SOURCES = catalog.cxx catalog_LDADD = $(top_builddir)/src/storage/libcsql.la \ $(top_builddir)/src/cache/libcacheload.la @@ -456,6 +468,9 @@ csqlserver$(EXEEXT): $(csqlserver_OBJECTS) $(csqlserver_DEPENDENCIES) csqlsqlserver$(EXEEXT): $(csqlsqlserver_OBJECTS) $(csqlsqlserver_DEPENDENCIES) @rm -f csqlsqlserver$(EXEEXT) $(csqlsqlserver_LINK) $(csqlsqlserver_OBJECTS) $(csqlsqlserver_LDADD) $(LIBS) +redo$(EXEEXT): $(redo_OBJECTS) $(redo_DEPENDENCIES) + @rm -f redo$(EXEEXT) + $(CXXLINK) $(redo_OBJECTS) $(redo_LDADD) $(LIBS) repltable$(EXEEXT): $(repltable_OBJECTS) $(repltable_DEPENDENCIES) @rm -f repltable$(EXEEXT) $(repltable_LINK) $(repltable_OBJECTS) $(repltable_LDADD) $(LIBS) @@ -475,6 +490,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/csqlserver.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/csqlsqlserver.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/isql.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/redo.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/repltable.Po@am__quote@ .cxx.o: diff --git a/src/tools/csqldump.cxx b/src/tools/csqldump.cxx index 3edc10f6..d501c75d 100644 --- a/src/tools/csqldump.cxx +++ b/src/tools/csqldump.cxx @@ -13,6 +13,7 @@ * GNU General Public License for more details. * * * ***************************************************************************/ +#include #include #include #include @@ -59,8 +60,9 @@ int main(int argc, char **argv) char tblName[IDENTIFIER_LENGTH]; int c = 0, opt = 0; int noOfStmts =100; + bool exclusive = false; char name[IDENTIFIER_LENGTH]; - while ((c = getopt(argc, argv, "u:p:n:T:?")) != EOF) + while ((c = getopt(argc, argv, "u:p:n:T:X?")) != EOF) { switch (c) { @@ -68,6 +70,7 @@ int main(int argc, char **argv) case 'p' : { strcpy(password, argv[optind - 1]); opt=1; break; } case 'n' : { noOfStmts = atoi(argv[optind - 1]); opt = 5; break; } case 'T' : { strcpy(tblName, argv[optind - 1]); opt = 15; break; } + case 'X' : { exclusive = true; break; } case '?' : { opt = 10; break; } //print help default: opt=1; //list all the tables @@ -84,19 +87,32 @@ int main(int argc, char **argv) strcpy(username, "root"); strcpy(password, "manager"); } - SqlConnection *sqlconn = (SqlConnection*) SqlFactory::createConnection(CSql); - sqlconn->connect("root", "manager"); - SqlStatement *stmt = (SqlStatement*) SqlFactory::createStatement(CSql); - stmt->setConnection(sqlconn); + SqlConnection *sqlconn = (SqlConnection*) SqlFactory::createConnection(CSqlDirect); + + SqlStatement *stmt = (SqlStatement*) SqlFactory::createStatement(CSqlDirect); + if (exclusive) { + stmt->setLoading(true); + } - Connection conn; - DbRetVal rv = conn.open(username, password); - if (rv != OK) return 1; - DatabaseManagerImpl *dbMgr = (DatabaseManagerImpl*) conn.getDatabaseManager(); - if (dbMgr == NULL) { printf("Auth failed\n"); return 2;} if (opt == 0 || opt == 1) opt = 5; if (opt == 5) { + Connection conn; + DbRetVal rv = conn.open(username, password); + if (rv != OK) { + printf("Unable to get connection to csql\n"); + delete sqlconn; + delete stmt; + return 1; + } + DatabaseManagerImpl *dbMgr = (DatabaseManagerImpl*) conn.getDatabaseManager(); + if (dbMgr == NULL) { + printf("Unable to retrive db manager\n"); + conn.close(); + delete sqlconn; + delete stmt; + return 2; + } List tableList = dbMgr->getAllTableNames(); ListIterator iter = tableList.getIterator(); Identifier *elem = NULL; @@ -104,7 +120,7 @@ int main(int argc, char **argv) while (iter.hasElement()) { elem = (Identifier*) iter.nextElement(); - if (isCached(elem->name)) continue; + if (!exclusive && isCached(elem->name)) continue; printf("CREATE TABLE %s (", elem->name); Table *table = dbMgr->openTable(elem->name); FieldInfo *info = new FieldInfo(); @@ -117,7 +133,12 @@ int main(int argc, char **argv) elem = (Identifier*) fNameIter.nextElement(); Table::getFieldNameAlone(elem->name, fieldName); rv = table->getFieldInfo(elem->name, info); - if (rv !=OK) return rv; + if (rv !=OK) { + printf("unable to retrive info for table %s\n", elem->name); + conn.close(); + delete sqlconn; + delete stmt; + } if (firstField) { printf("%s %s ", fieldName, AllDataType::getSQLString(info->type)); firstField = false; @@ -135,12 +156,31 @@ int main(int argc, char **argv) delete info; dbMgr->closeTable(table); } + conn.close(); + rv = sqlconn->connect("root", "manager"); + if (OK !=rv) { + printf("unable to connect to csql\n"); + delete sqlconn; + delete stmt; + return 10; + } + stmt->setConnection(sqlconn); + if (exclusive) { + rv = sqlconn->getExclusiveLock(); + if (rv != OK) { + printf("Unable to get exclusive lock\n"); + sqlconn->disconnect(); + delete sqlconn; + delete stmt; + return 1; + } + } iter.reset(); char sqlstring[1024]; bool flag=false; while (iter.hasElement()) { elem = (Identifier*) iter.nextElement(); - if (isCached(elem->name)) continue; + if (!exclusive && isCached(elem->name)) continue; if (!flag) { printf("SET AUTOCOMMIT OFF;\n"); flag=true; } sprintf(sqlstring, "SELECT * FROM %s;", elem->name); sqlconn->beginTrans(); @@ -163,18 +203,22 @@ int main(int argc, char **argv) stmt->close(); stmt->free(); } - conn.close(); - sqlconn->disconnect(); - delete sqlconn; - delete stmt; - return 0; } if (opt == 15) { + Connection conn; + DbRetVal rv = conn.open(username, password); + if (rv != OK) { + printf("Unable to get connection to csql\n"); + delete sqlconn; + delete stmt; + return 1; + } + DatabaseManagerImpl *dbMgr = (DatabaseManagerImpl*) conn.getDatabaseManager(); + if (dbMgr == NULL) { printf("Auth failed\n"); return 2;} Table *table = dbMgr->openTable(tblName); if (table == NULL) { printf("csqldump: Table \'%s\' does not exist\n", tblName); conn.close(); - sqlconn->disconnect(); delete sqlconn; delete stmt; return 3; @@ -204,12 +248,19 @@ int main(int argc, char **argv) printf(");\n"); table->printSQLIndexString(); delete info; + conn.close(); char sqlstring[1024]; bool flag=false; if (!flag) { printf("SET AUTOCOMMIT OFF;\n"); flag=true; } + rv = sqlconn->connect("root", "manager"); + if (OK !=rv) { + printf("unable to connect\n"); + return 10; + } + stmt->setConnection(sqlconn); sprintf(sqlstring, "SELECT * FROM %s;", tblName); sqlconn->beginTrans(); - DbRetVal rv = stmt->prepare(sqlstring); + rv = stmt->prepare(sqlstring); int rows = 0; rv = stmt->execute(rows); void *tuple = NULL; @@ -228,7 +279,6 @@ int main(int argc, char **argv) stmt->close(); stmt->free(); } - conn.close(); sqlconn->disconnect(); delete sqlconn; delete stmt; diff --git a/src/tools/csqlreplserver.cxx b/src/tools/csqlreplserver.cxx index 41c6cf5c..223dbb6a 100644 --- a/src/tools/csqlreplserver.cxx +++ b/src/tools/csqlreplserver.cxx @@ -62,7 +62,7 @@ int main(int argc, char **argv) SqlNetworkHandler::conn = SqlFactory::createConnection(CSql); DbRetVal rv = SqlNetworkHandler::conn->connect("root", "manager"); if (rv != OK) return 1; - if (!Conf::config.useReplication()) + //if (!Conf::config.useReplication()) { printf("Replication is set to OFF in csql.conf file\n"); SqlNetworkHandler::conn->disconnect(); @@ -74,7 +74,7 @@ int main(int argc, char **argv) int nwid =0; char hostname[IDENTIFIER_LENGTH]; int port=0; - fp = fopen(Conf::config.getReplConfigFile(),"r"); + //fp = fopen(Conf::config.getReplConfigFile(),"r"); if( fp == NULL ) { printError(ErrSysInit, "Invalid path/filename for REPL_CONFIG_FILE.\n"); SqlNetworkHandler::conn->disconnect(); diff --git a/src/tools/csqlserver.cxx b/src/tools/csqlserver.cxx index c4635c93..a3f3d614 100644 --- a/src/tools/csqlserver.cxx +++ b/src/tools/csqlserver.cxx @@ -13,6 +13,7 @@ * GNU General Public License for more details. * * * ***************************************************************************/ +#include #include #include #include @@ -23,9 +24,11 @@ #include char* version = "csql-linux-i686-2.0GA"; int srvStop =0; -pid_t replpid=0; +pid_t sqlserverpid=0; pid_t cachepid=0; +bool recoverFlag=false; void dumpData(); +SessionImpl *session = NULL; static void sigTermHandler(int sig) { printf("Received signal %d\nStopping the server\n", sig); @@ -62,6 +65,7 @@ DbRetVal releaseAllResources(Database *sysdb, ThreadInfo *info ) { printf("Rollback Transaction %x\n", info->thrTrans_[i].trans_); tm->rollback(lm, info->thrTrans_[i].trans_); + info->thrTrans_[i].trans_->status_ = TransNotUsed; } } info->init(); @@ -106,15 +110,19 @@ DbRetVal logActiveProcs(Database *sysdb) return rv; } ThreadInfo* pInfo = sysdb->getThreadInfo(0); - int i=0; + int i=0, count =0; ThreadInfo* freeSlot = NULL; for (; i < Conf::config.getMaxProcs(); i++) { - if (pInfo->pid_ !=0 ) logFine(logger, "Registered Procs: %d %lu\n", pInfo->pid_, pInfo->thrid_); + if (pInfo->pid_ !=0 ) { + logFine(logger, "Registered Procs: %d %lu\n", pInfo->pid_, pInfo->thrid_); + printf("Client process with pid %d is still registered\n", pInfo->pid_); + count++; + } pInfo++; } sysdb->releaseProcessTableMutex(false); - return OK; + if (count) return ErrSysInternal; else return OK; } void startCacheServer() { @@ -122,7 +130,7 @@ void startCacheServer() char execName[1024]; sprintf(execName, "%s/bin/csqlcacheserver", os::getenv("CSQL_INSTALL_ROOT")); printf("filename is %s\n", execName); - cachepid = os::createProcess(execName, "-s"); + cachepid = os::createProcess(execName, "csqlcacheserver"); if (cachepid != -1) printf("Cache Recv Server Started pid=%d\n", cachepid); return; @@ -134,9 +142,9 @@ void startServiceClient() char execName[1024]; sprintf(execName, "%s/bin/csqlsqlserver", os::getenv("CSQL_INSTALL_ROOT")); printf("filename is %s\n", execName); - cachepid = os::createProcess(execName, "-s"); - if (cachepid != -1) - printf("Csql Network Daemon Started pid=%d\n", cachepid); + sqlserverpid = os::createProcess(execName, "csqlsqlserver"); + if (sqlserverpid != -1) + printf("Csql Network Daemon Started pid=%d\n", sqlserverpid); return; } @@ -170,9 +178,8 @@ int main(int argc, char **argv) printf("%s\n",version); return 0; } - - SessionImpl session; - DbRetVal rv = session.readConfigFile(); + session = new SessionImpl(); + DbRetVal rv = session->readConfigFile(); if (rv != OK) { printf("Unable to read the configuration file \n"); @@ -186,68 +193,114 @@ int main(int argc, char **argv) printf("Unable to start the logger\n"); return 2; } - + bool isInit = true; logFine(logger, "Server Started"); - int ret = session.initSystemDatabase(); + int ret = session->initSystemDatabase(); if (0 != ret) { - printf(" System Database Initialization failed\n"); - return 3; + //printf(" System Database Initialization failed\n"); + printf("Attaching to exising database\n"); + isInit = false; + delete session; + session = new SessionImpl(); + ret = session->open(DBAUSER, DBAPASS); + if (ret !=0) { + printf("Unable to attach to existing database\n"); + return 3; + } + }else { + printf("System Database initialized\n"); } - printf("System Database initialized\n"); - bool end = false; - struct timeval timeout, tval; timeout.tv_sec = 5; timeout.tv_usec = 0; - Database* sysdb = session.getSystemDatabase(); - if (FILE *file = fopen(Conf::config.getDbFile(), "r")) + Database* sysdb = session->getSystemDatabase(); + recoverFlag = false; + if(isInit && Conf::config.useDurability()) { - fclose(file); + char dbChkptFileName[1024]; + char dbRedoFileName[1024]; char cmd[1024]; - sprintf(cmd, "csql -S -s %s",Conf::config.getDbFile()); - int ret = system(cmd); - if (ret != 0) { - printf("Tables cannot be recovered. DB file corrupted\n"); + sprintf(dbChkptFileName, "%s/csql.db.chkpt", Conf::config.getDbFile()); + if (FILE *file = fopen(dbChkptFileName, "r")) + { + fclose(file); + sprintf(cmd, "csql -X -s %s",dbChkptFileName); + int ret = system(cmd); + if (ret != 0) { + printf("Tables cannot be recovered. chkpt file corrupted\n"); + } + } + sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile()); + if (FILE *file = fopen(dbRedoFileName, "r")) + { + fclose(file); + int ret = system("redo -a"); + if (ret != 0) { + printf("Recovery failed. Redo log file corrupted\n"); + logger.stopLogger(); + session->destroySystemDatabase(); + delete session; + return 10; + } + //TODO::generate checkpoint file + sprintf(cmd, "csqldump -X > %s",dbChkptFileName); + ret = system(cmd); + if (ret != 0) { + printf("Unable to create checkpoint file\n"); + logger.stopLogger(); + session->destroySystemDatabase(); + delete session; + return 11; + } + ret = unlink(dbRedoFileName); + if (ret != 0) { + printf("Unable to delete redo log file. Delete and restart the server\n"); + logger.stopLogger(); + session->destroySystemDatabase(); + delete session; + return 11; + } } } - if (opt == 1) { + recoverFlag = true; + if (opt == 1 && isInit && ! Conf::config.useDurability()) { if (Conf::config.useCache()) { printf("Database server recovering cached tables...\n"); int ret = system("cachetable -U root -P manager -R"); if (ret != 0) { printf("Cached Tables recovery failed %d\n", ret); logger.stopLogger(); - session.destroySystemDatabase(); + session->destroySystemDatabase(); + delete session; return 2; } printf("Cached Tables recovered\n"); } else { printf("Cache mode is not set in csql.conf. Cannot recover\n"); logger.stopLogger(); - session.destroySystemDatabase(); + session->destroySystemDatabase(); + delete session; return 1; } } - - if (Conf::config.useReplication()) - { - printf("Starting Replication Server\n"); - char execName[1024]; - sprintf(execName, "%s/bin/csqlreplserver", os::getenv("CSQL_INSTALL_ROOT")); - printf("filename is %s\n", execName); - replpid = os::createProcess(execName, "-s"); - if (replpid != -1) - printf("Repl Server Started pid=%d\n", replpid); - + GlobalUniqueID UID; + if (isInit) UID.create(); + //TODO:: kill all the child servers and restart if !isInit + + bool isCacheReq = false, isSQLReq= false; + if(Conf::config.useCsqlSqlServer()) { + isSQLReq = true; + startServiceClient(); + } + if (Conf::config.useCache() && Conf::config.useTwoWayCache()) { + isCacheReq = true; + startCacheServer(); } - if (Conf::config.useCache() && Conf::config.useTwoWayCache()) startCacheServer(); printf("Database server started\n"); - - if(Conf::config.useCsqlSqlServer()) startServiceClient(); - +reloop: while(!srvStop) { tval.tv_sec = timeout.tv_sec; @@ -257,29 +310,35 @@ int main(int argc, char **argv) //send signal to all the registered process..check they are alive cleanupDeadProcs(sysdb); - //TODO::check repl server is alive, if not restart it - //TODO::if it fails to start 5 times, exit - if (cachepid !=0 && checkDead(cachepid)) startCacheServer(); - + if (isCacheReq && cachepid !=0 && checkDead(cachepid)) + startCacheServer(); + if (isSQLReq && sqlserverpid !=0 && checkDead(sqlserverpid)) + startServiceClient(); } + if (logActiveProcs(sysdb) != OK) {srvStop = 0; goto reloop; } os::kill(cachepid, SIGTERM); - dumpData(); + os::kill(sqlserverpid, SIGTERM); + //if (recoverFlag) dumpData(); logFine(logger, "Server Exiting"); - logActiveProcs(sysdb); printf("Server Exiting\n"); logFine(logger, "Server Ended"); + UID.destroy(); logger.stopLogger(); - session.destroySystemDatabase(); + session->destroySystemDatabase(); + delete session; return 0; } void dumpData() { char cmd[1024]; - sprintf(cmd, "csqldump >%s",Conf::config.getDbFile()); + //TODO::TAKE exclusive lock + sprintf(cmd, "csqldump >%s/csql.db.chkpt.1",Conf::config.getDbFile()); int ret = system(cmd); - if (ret != 0) { - printf("Table cannot be written. Recovery will fail\n"); - } + if (ret != 0) return; + sprintf(cmd, "rm -rf %s/csql.db.cur", Conf::config.getDbFile()); + if (ret != 0) return; + sprintf(cmd, "mv %s/csql.db.chkpt.1 %s/csql.db.chkpt", Conf::config.getDbFile()); + if (ret != 0) return; return; } diff --git a/src/tools/isql.cxx b/src/tools/isql.cxx index f4531230..93834c4b 100644 --- a/src/tools/isql.cxx +++ b/src/tools/isql.cxx @@ -142,9 +142,8 @@ int main(int argc, char **argv) else { if (gateway) type = CSqlGateway; else { - // if (exclusive) type=CSqlDirect; - // else type = CSql; - type = CSql; + if (exclusive) type=CSqlDirect; + else type = CSql; } conn = SqlFactory::createConnection(type); } @@ -153,7 +152,7 @@ int main(int argc, char **argv) if (rv != OK) return 1; if (exclusive) { SqlConnection *sCon = (SqlConnection*) conn; - //rv = sCon->getExclusiveLock(); + rv = sCon->getExclusiveLock(); if (rv != OK) { conn->disconnect(); delete conn; @@ -163,7 +162,7 @@ int main(int argc, char **argv) aconStmt = SqlFactory::createStatement(type); if (exclusive) { SqlStatement *sqlStmt = (SqlStatement*)aconStmt; - //sqlStmt->setLoading(true); + sqlStmt->setLoading(true); } aconStmt->setConnection(conn); //rv = conn->beginTrans(READ_COMMITTED, TSYNC); -- 2.11.4.GIT