From 384c140b5b21aa92e9ee1e31eb5b886a8c8630c0 Mon Sep 17 00:00:00 2001 From: prabatuty Date: Thu, 29 May 2008 08:37:29 +0000 Subject: [PATCH] two way caching changes related to config, deamon also added getprimarykey for sql odbc adapter --- csql.conf | 14 +++++--- csqlinstall.ksh | 2 +- include/Config.h | 6 ++++ include/SqlOdbcStatement.h | 1 + src/adapter/SqlOdbcStatement.cxx | 16 +++++++++ src/cache/CacheTableLoader.cxx | 8 ++++- src/odbc/Makefile | 28 +++++++-------- src/server/Config.cxx | 11 ++++++ src/tools/csqlcacheserver.cxx | 77 ++++++++++++++++++++++++++++++++-------- src/tools/csqlserver.cxx | 22 +++++++++--- 10 files changed, 145 insertions(+), 40 deletions(-) diff --git a/csql.conf b/csql.conf index e98158c4..01b03494 100644 --- a/csql.conf +++ b/csql.conf @@ -20,13 +20,13 @@ MAX_SYS_DB_SIZE=1048576 MAX_DB_SIZE=10485760 #Shared memory key to be used by the system to create and locate system database. -SYS_DB_KEY=1222 +SYS_DB_KEY=1422 #Shared memory key to be used by the system to create and locate user database. -USER_DB_KEY=4555 +USER_DB_KEY=4455 #Give full path for the log file where important system actions are stored. -LOG_FILE=/tmp/log/csql/log.out +LOG_FILE=/tmp/log1/csql/log.out #The virtual memory start address at which the shared memory segment # will be created and attached. @@ -50,13 +50,17 @@ LOCK_TIMEOUT_RETRIES=10 #####################################Cache Section######################## #Whether to enable caching of tables from target database -CACHE_TABLE=false +CACHE_TABLE=true #DSN Name to connect to the target database. #This should be present in ~/odbc.ini file DSN=myodbc3 +ENABLE_BIDIRECTIONAL_CACHE=true + +CACHE_RECEIVER_WAIT_SECS=10 + #Give full path for the file where all the cached table information is stored -TABLE_CONFIG_FILE=/tmp/csql/csqltable.conf +TABLE_CONFIG_FILE=/tmp/csql1/csqltable.conf #####################################End Section######################## diff --git a/csqlinstall.ksh b/csqlinstall.ksh index 2bb7e8e9..96417876 100755 --- a/csqlinstall.ksh +++ b/csqlinstall.ksh @@ -42,7 +42,7 @@ cd ${install_dir}/include #rm Globals.h Index.h Lock.h PredicateImpl.h #rm Process.h SessionImpl.h TableImpl.h Transaction.h UserManagerImpl.h cd ${install_dir}/bin -rm csqlcacheserver csqlreplserver repltable +rm csqlreplserver repltable cp ${root_dir}/README.INSTALL ${install_dir}/README cp ${root_dir}/Doxyfile ${install_dir} diff --git a/include/Config.h b/include/Config.h index 56575c82..f9cf64e9 100644 --- a/include/Config.h +++ b/include/Config.h @@ -38,6 +38,8 @@ class ConfigValues bool isCache; char dsn[IDENTIFIER_LENGTH]; char tableConfigFile[MAX_FILE_PATH_LEN]; + bool isTwoWay; + int cacheWaitSecs; bool isReplication; char replConfigFile[MAX_FILE_PATH_LEN]; @@ -74,6 +76,8 @@ class ConfigValues networkID=-1; nwResponseTimeout=3; nwConnectTimeout=5; + isTwoWay=true; + cacheWaitSecs =10; } }; @@ -112,6 +116,8 @@ class Config inline int getCacheNetworkID() { return cVal.cacheNetworkID; } inline int getNetworkResponseTimeout() { return cVal.nwResponseTimeout; } inline int getNetworkConnectTimeout() { return cVal.nwConnectTimeout; } + inline bool useTwoWayCache() { return cVal.isTwoWay; } + inline int getCacheWaitSecs() { return cVal.cacheWaitSecs; } }; class Conf diff --git a/include/SqlOdbcStatement.h b/include/SqlOdbcStatement.h index 027741f9..8a7eaec7 100644 --- a/include/SqlOdbcStatement.h +++ b/include/SqlOdbcStatement.h @@ -69,6 +69,7 @@ class SqlOdbcStatement: public AbsSqlStatement void setTimeParam(int paramPos, Time value); void setTimeStampParam(int paramPos, TimeStamp value); bool isSelect(); + void getPrimaryKeyFieldName(char *tablename, char *pkfieldname); private: bool isPrepared; diff --git a/src/adapter/SqlOdbcStatement.cxx b/src/adapter/SqlOdbcStatement.cxx index f0c71d82..cc60cdc1 100644 --- a/src/adapter/SqlOdbcStatement.cxx +++ b/src/adapter/SqlOdbcStatement.cxx @@ -519,3 +519,19 @@ void SqlOdbcStatement::setTimeStampParam(int paramPos, TimeStamp value) //*(TimeStamp*)(bindField->value) = value; AllDataType::convertToString(bindField->value, &value, typeTimeStamp); } + +void SqlOdbcStatement::getPrimaryKeyFieldName(char *tablename, char *pkfieldname) +{ + if (pkfieldname == NULL) return; + SqlOdbcConnection *conn = (SqlOdbcConnection*)con; + int retValue=SQLAllocHandle (SQL_HANDLE_STMT, conn->dbHdl, &hstmt); + if (retValue) return ; + char columnName[128]; + SQLINTEGER cbData; // Output length of data + SQLPrimaryKeys(hstmt, NULL, 0, NULL, 0, (SQLCHAR*) tablename, SQL_NTS); + SQLFetch(hstmt); + SQLGetData(hstmt, 4, SQL_C_CHAR, (SQLCHAR*) columnName, sizeof(columnName),&cbData); + strcpy(pkfieldname, columnName); + SQLFreeHandle (SQL_HANDLE_STMT, hstmt); + return; +} diff --git a/src/cache/CacheTableLoader.cxx b/src/cache/CacheTableLoader.cxx index a63bbc4f..04c46cbb 100644 --- a/src/cache/CacheTableLoader.cxx +++ b/src/cache/CacheTableLoader.cxx @@ -352,15 +352,20 @@ DbRetVal CacheTableLoader::refresh() DbRetVal CacheTableLoader::recoverAllCachedTables() { FILE *fp; + Connection conn; + DbRetVal rv = conn.open(userName, password); + //Note: if connection is not open, configuration veriables may be incorrect + fp = fopen(Conf::config.getTableConfigFile(),"r"); if( fp == NULL ) { printError(ErrSysInit, "cachetable.conf file does not exist"); return OK; } + conn.close(); //TODO::take exclusive lock on database char tablename[IDENTIFIER_LENGTH]; int mode; - DbRetVal rv = OK; + rv = OK; while(!feof(fp)) { fscanf(fp, "%d:%s\n", &mode, tablename); @@ -368,6 +373,7 @@ DbRetVal CacheTableLoader::recoverAllCachedTables() continue; printDebug(DM_Gateway, "Recovering Table from target db: %s\n", tablename); setTable(tablename); + printf("Recovering table %s\n", tablename); rv = load(); if (rv != OK) return rv; } diff --git a/src/odbc/Makefile b/src/odbc/Makefile index 9860f4b9..8efed0bc 100644 --- a/src/odbc/Makefile +++ b/src/odbc/Makefile @@ -23,7 +23,7 @@ pkglibdir = $(libdir)/csql pkgincludedir = $(includedir)/csql top_builddir = ../.. am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd -INSTALL = /usr/bin/ginstall -c +INSTALL = /usr/bin/install -c install_sh_DATA = $(install_sh) -c -m 644 install_sh_PROGRAM = $(install_sh) -c install_sh_SCRIPT = $(install_sh) -c @@ -77,14 +77,14 @@ HEADERS = $(noinst_HEADERS) ETAGS = etags CTAGS = ctags DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST) -ACLOCAL = ${SHELL} /root/csql/csqlcache/csql/missing --run aclocal-1.9 +ACLOCAL = ${SHELL} /home/csql/csql/missing --run aclocal-1.9 AMDEP_FALSE = # AMDEP_TRUE = -AMTAR = ${SHELL} /root/csql/csqlcache/csql/missing --run tar +AMTAR = ${SHELL} /home/csql/csql/missing --run tar AR = ar -AUTOCONF = ${SHELL} /root/csql/csqlcache/csql/missing --run autoconf -AUTOHEADER = ${SHELL} /root/csql/csqlcache/csql/missing --run autoheader -AUTOMAKE = ${SHELL} /root/csql/csqlcache/csql/missing --run automake-1.9 +AUTOCONF = ${SHELL} /home/csql/csql/missing --run autoconf +AUTOHEADER = ${SHELL} /home/csql/csql/missing --run autoheader +AUTOMAKE = ${SHELL} /home/csql/csql/missing --run automake-1.9 AWK = gawk CC = gcc CCDEPMODE = depmode=gcc3 @@ -94,7 +94,7 @@ CPPFLAGS = CXX = g++ CXXCPP = g++ -E CXXDEPMODE = depmode=gcc3 -CXXFLAGS = -g -I/usr/lib/jdk1.5.0_09/include -I/usr/lib/jdk1.5.0_09/include/linux +CXXFLAGS = -g -I/usr/local/src/jdk1.5.0_14/include -I/usr/local/src/jdk1.5.0_14/include/linux CYGPATH_W = echo DEFS = -DHAVE_CONFIG_H DEPDIR = .deps @@ -102,11 +102,11 @@ ECHO = echo ECHO_C = ECHO_N = -n ECHO_T = -EGREP = /usr/bin/grep -E +EGREP = /bin/grep -E EXEEXT = -F77 = g77 +F77 = gfortran FFLAGS = -g -O2 -GREP = /usr/bin/grep +GREP = /bin/grep INSTALL_DATA = ${INSTALL} -m 644 INSTALL_PROGRAM = ${INSTALL} INSTALL_SCRIPT = ${INSTALL} @@ -120,7 +120,7 @@ LIBS = LIBTOOL = $(SHELL) $(top_builddir)/libtool LN_S = ln -s LTLIBOBJS = -MAKEINFO = ${SHELL} /root/csql/csqlcache/csql/missing --run makeinfo +MAKEINFO = ${SHELL} /home/csql/csql/missing --run makeinfo OBJEXT = o PACKAGE = csql PACKAGE_BUGREPORT = @@ -138,7 +138,7 @@ YACC = bison -y YFLAGS = ac_ct_CC = gcc ac_ct_CXX = g++ -ac_ct_F77 = g77 +ac_ct_F77 = gfortran am__fastdepCC_FALSE = # am__fastdepCC_TRUE = am__fastdepCXX_FALSE = # @@ -167,7 +167,7 @@ host_vendor = pc htmldir = ${docdir} includedir = ${prefix}/include infodir = ${datarootdir}/info -install_sh = /root/csql/csqlcache/csql/install-sh +install_sh = /home/csql/csql/install-sh libdir = ${exec_prefix}/lib libexecdir = ${exec_prefix}/libexec localedir = ${datarootdir}/locale @@ -176,7 +176,7 @@ mandir = ${datarootdir}/man mkdir_p = mkdir -p -- oldincludedir = /usr/include pdfdir = ${docdir} -prefix = /root/csql/csqlcache/csql/install +prefix = /home/csql/csql/install program_transform_name = s,x,x, psdir = ${docdir} sbindir = ${exec_prefix}/sbin diff --git a/src/server/Config.cxx b/src/server/Config.cxx index 4d9a193a..ff7c97b7 100644 --- a/src/server/Config.cxx +++ b/src/server/Config.cxx @@ -82,6 +82,10 @@ int Config::storeKeyVal(char *key, char *value) { cVal.nwResponseTimeout = atoi(value); } else if (strcasestr(key, "NETWORK_CONNECT_TIMEOUT") != NULL) { cVal.nwConnectTimeout = atoi(value); } + else if (strcasestr(key, "ENABLE_BIDIRECTIONAL_CACHE") != NULL) + { cVal.isTwoWay = os::atobool(value); } + else if (strcasestr(key, "CACHE_RECEIVER_WAIT_SECS") != NULL) + { cVal.cacheWaitSecs = atoi(value); } else return 1; return 0; } @@ -272,6 +276,11 @@ int Config::validateValues() printError(ErrBadArg, "NETWORK_CONNECT_TIMEOUT should be 0 to 60"); return 1; } + if (cVal.cacheWaitSecs <1) + { + printError(ErrBadArg, "CACHE_RECEIVER_WAIT_SECS should be >1"); + return 1; + } return 0; } @@ -340,6 +349,8 @@ void Config::print() printf(" useCache %d\n", useCache()); printf(" getDSN %s\n", getDSN()); printf(" getTableConfigFile %s\n", getTableConfigFile()); + printf(" isTwoWayCache %d\n", useTwoWayCache()); + printf(" getCacheWaitSecs %d\n", getCacheWaitSecs()); //printf(" useReplication %d\n", useReplication()); //printf(" getReplConfigFile %s\n", getReplConfigFile()); //printf(" getMaxLogStoreSize %ld\n", getMaxLogStoreSize()); diff --git a/src/tools/csqlcacheserver.cxx b/src/tools/csqlcacheserver.cxx index 2509ef0d..978d54e0 100644 --- a/src/tools/csqlcacheserver.cxx +++ b/src/tools/csqlcacheserver.cxx @@ -15,6 +15,7 @@ ***************************************************************************/ #include #include +#include #include #include @@ -68,11 +69,20 @@ int main(int argc, char **argv) printf("Cache is set to OFF in csql.conf file\n"); return 1; } + AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter); + stmt->setConnection(targetconn); + rv = stmt->prepare("create table csql_log_int(tablename char(64), pkid int, operation int, id int not null unique auto_increment)engine='innodb';"); + targetconn->beginTrans(); + int rows=0; + stmt->execute(rows); + targetconn->commit(); + stmt->free(); + delete stmt; printf("Cache server started\n"); int ret = 0; struct timeval timeout, tval; - timeout.tv_sec = 5; //TODO::should be an csql.conf parameter + timeout.tv_sec = Conf::config.getCacheWaitSecs(); timeout.tv_usec = 0; while(!srvStop) @@ -82,7 +92,6 @@ int main(int argc, char **argv) ret = os::select(0, NULL, 0, 0, &tval); printf("Getting the updates\n"); ret = getRecordsFromTargetDb(1); - printf("Return value is %d\n", ret); if (ret !=0) srvStop = 1; //ret = getRecordsFromTargetDb(2); if (ret !=0) srvStop = 1; @@ -123,6 +132,11 @@ int getRecordsFromTargetDb(int mode) if (rv != OK) { printError(ErrSysInit, "Unable to execute stmt in target db"); + targetconn->rollback(); + stmt->free(); + delstmt->free(); + delete stmt; + delete delstmt; return 1; } if (stmt->fetch() != NULL) { @@ -132,15 +146,16 @@ int getRecordsFromTargetDb(int mode) if (table == NULL) { printError(ErrSysInit, "Table %s not exist in csql", tablename); + targetconn->rollback(); + stmt->free(); + delstmt->free(); + delete stmt; + delete delstmt; break; } - if (op == 3) //DELETE - { - ret = remove(table, pkid); - }else if (op == 2)// UPDATE + if (op == 2)//DELETE { ret = remove(table,pkid); - ret = insert(table, pkid); } else //INSERT { @@ -152,11 +167,18 @@ int getRecordsFromTargetDb(int mode) //Remove record from csql_log_XXX table delstmt->setIntParam(1, id); rv = delstmt->execute(rows); - if (rv != OK) printf("log record not deleted from the target db %d\n", rv); + if (rv != OK) + { + printf("log record not deleted from the target db %d\n", rv); + targetconn->rollback(); + stmt->free(); + delstmt->free(); + delete stmt; + delete delstmt; + } rv = targetconn->commit(); //create table csql_log_int(tablename char(64), pkid int, op int, id int not null unique auto_increment); - //insert ino csql_log_int(tablename, pkid, op) values ('t1', 100, 1); } else { stmt->close(); @@ -164,14 +186,21 @@ int getRecordsFromTargetDb(int mode) } stmt->close(); } + stmt->free(); + delstmt->free(); + delete stmt; + delete delstmt; return 0; } int insert(Table *table, int pkid) { AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter); stmt->setConnection(targetconn); + SqlOdbcStatement *ostmt = (SqlOdbcStatement*) stmt; + char pkfieldname[128]; + ostmt->getPrimaryKeyFieldName(table->getName(), pkfieldname); char sbuf[1024]; - sprintf(sbuf, "SELECT * FROM %s where f1 = %d;", table->getName(), pkid); + sprintf(sbuf, "SELECT * FROM %s where %s = %d;", table->getName(), pkfieldname, pkid); //TODO::get the primary key field name from the table interface. need to implement it DbRetVal rv = stmt->prepare(sbuf); if (rv != OK) return 1; @@ -180,13 +209,16 @@ int insert(Table *table, int pkid) ListIterator fNameIter = fNameList.getIterator(); FieldInfo *info = new FieldInfo(); int fcount =1; void *valBuf; int fieldsize=0; + void *buf[128];//TODO:resticts to support only 128 fields in table Identifier *elem = NULL; while (fNameIter.hasElement()) { elem = (Identifier*) fNameIter.nextElement(); table->getFieldInfo((const char*)elem->name, info); valBuf = AllDataType::alloc(info->type, info->length); + buf[fcount] = valBuf; table->bindFld(elem->name, valBuf); stmt->bindField(fcount++, valBuf); + } delete info; int rows=0; @@ -197,23 +229,38 @@ int insert(Table *table, int pkid) table->insertTuple(); //Note:insert may fail if the record is inserted from this cache } + for (int i=1; i < fcount; i++) { + free(buf[i]); + } + stmt->free(); + delete stmt; conn.commit(); return 0; } int remove(Table *table, int pkid) { DbRetVal rv = OK; + AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter); + stmt->setConnection(targetconn); + SqlOdbcStatement *ostmt = (SqlOdbcStatement*) stmt; + char pkfieldname[128]; + ostmt->getPrimaryKeyFieldName(table->getName(), pkfieldname); + delete stmt; Condition p1; - //TODO::get the primary key field name from the table interface. need to implement it - p1.setTerm("f1", OpEquals, &pkid); + p1.setTerm(pkfieldname, OpEquals, &pkid); table->setCondition(&p1); rv = conn.startTransaction(); if (rv != OK) return 1; rv = table->execute(); - if (rv != OK) { conn.rollback(); return 1;} + if (rv != OK) + { + table->setCondition(NULL); + conn.rollback(); + return 1; + } if (table->fetch() != NULL) - rv = table->deleteTuple(); - //Note:Delete may fail if the record is deleted from this cache + rv = table->deleteTuple(); + //Note:Delete may fail if the record is deleted from this cache table->setCondition(NULL); rv = conn.commit(); if (rv != OK) return 1; diff --git a/src/tools/csqlserver.cxx b/src/tools/csqlserver.cxx index 771a2b84..4894d003 100644 --- a/src/tools/csqlserver.cxx +++ b/src/tools/csqlserver.cxx @@ -23,7 +23,8 @@ #include char* version = "csql-linux-i686-1.2Beta"; int srvStop =0; -pid_t replpid; +pid_t replpid=0; +pid_t cachepid=0; static void sigTermHandler(int sig) { printf("Received signal %d\nStopping the server\n", sig); @@ -114,7 +115,17 @@ DbRetVal logActiveProcs(Database *sysdb) sysdb->releaseProcessTableMutex(false); return OK; } - +void startCacheServer() +{ + printf("Starting Cache Recv Server\n"); + char execName[1024]; + sprintf(execName, "%s/bin/csqlcacheserver", os::getenv("CSQL_INSTALL_ROOT")); + printf("filename is %s\n", execName); + cachepid = os::createProcess(execName, "-s"); + if (cachepid != -1) + printf("Cache Recv Server Started pid=%d\n", replpid); + return; +} void printUsage() { @@ -208,6 +219,7 @@ int main(int argc, char **argv) printf("Repl Server Started pid=%d\n", replpid); } + if (Conf::config.useCache() && Conf::config.useTwoWayCache()) startCacheServer(); printf("Database server started\n"); @@ -222,9 +234,11 @@ int main(int argc, char **argv) //TODO::check repl server is alive, if not restart it + //TODO::if it fails to start 5 times, exit + if (cachepid !=0 && checkDead(cachepid)) startCacheServer(); + } - //TODO::kill replication server process - + os::kill(cachepid, SIGTERM); logFine(logger, "Server Exiting"); logActiveProcs(sysdb); printf("Server Exiting\n"); -- 2.11.4.GIT