From b0bd350f6638cca40e78e9873955650cab8e745f Mon Sep 17 00:00:00 2001 From: kishoramballi Date: Thu, 8 Oct 2009 07:19:09 +0000 Subject: [PATCH] Update in sync with enterprise version. --- src/tools/Makefile.am | 115 ++++-- src/tools/cachetable.cxx | 195 +++++---- src/tools/cacheverify.cxx | 925 ++++++++++++++++++++++++------------------ src/tools/catalog.cxx | 22 +- src/tools/checkpoint.cxx | 78 ++++ src/tools/csqlasyncserver.cxx | 633 +++++++++++++++++++++++++++++ src/tools/csqlcacheserver.cxx | 611 ++++++++++++++++++++-------- src/tools/csqlds.cxx | 167 ++++++++ src/tools/csqldump.cxx | 244 ++++++----- src/tools/csqlserver.cxx | 164 ++++++-- src/tools/csqlsqlserver.cxx | 5 +- src/tools/isql.cxx | 177 +++++++- src/tools/recover.cxx | 92 +++++ src/tools/redo.cxx | 26 +- 14 files changed, 2611 insertions(+), 843 deletions(-) create mode 100644 src/tools/checkpoint.cxx create mode 100644 src/tools/csqlasyncserver.cxx create mode 100644 src/tools/csqlds.cxx create mode 100644 src/tools/recover.cxx diff --git a/src/tools/Makefile.am b/src/tools/Makefile.am index 3179040b..c82a7471 100644 --- a/src/tools/Makefile.am +++ b/src/tools/Makefile.am @@ -1,25 +1,69 @@ INCLUDES = -I$(top_srcdir)/include $(all_includes) -I$(top_srcdir)/src/sql METASOURCES = AUTO -bin_PROGRAMS = csql catalog cachetable csqlserver csqlreplserver csqlsqlserver repltable csqlcacheserver csqldump cacheverify redo +bin_PROGRAMS = csql catalog cachetable csqlserver csqlsqlserver csqlcacheserver csqldump cacheverify redo recover checkpoint csqlds csqlasyncserver csql_SOURCES = isql.cxx -csql_LDADD = $(top_builddir)/src/storage/.libs/libcsql $(top_builddir)/src/sql/.libs/libcsqlsql $(top_builddir)/src/sqllog/.libs/libcsqlsqllog $(top_builddir)/src/network/.libs/libcsqlnw $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter $(top_builddir)/src/gateway/.libs/libcsqlgw $(top_builddir)/src/cache/.libs/libcacheload $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw -lrt -lpthread -lcrypt -lodbc -lreadline +csql_LDADD = $(top_builddir)/src/storage/.libs/libcsql \ + $(top_builddir)/src/sql/.libs/libcsqlsql \ + $(top_builddir)/src/sqllog/.libs/libcsqlsqllog \ + $(top_builddir)/src/network/.libs/libcsqlnw \ + $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter \ + $(top_builddir)/src/gateway/.libs/libcsqlgw \ + $(top_builddir)/src/cache/.libs/libcacheload \ + $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw \ + -lrt -lpthread -lcrypt -lodbc -lreadline redo_SOURCES = redo.cxx -redo_LDADD = $(top_builddir)/src/storage/.libs/libcsql $(top_builddir)/src/sql/.libs/libcsqlsql $(top_builddir)/src/sqllog/.libs/libcsqlsqllog $(top_builddir)/src/network/.libs/libcsqlnw $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter $(top_builddir)/src/gateway/.libs/libcsqlgw $(top_builddir)/src/cache/.libs/libcacheload $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw -lrt -lpthread -lcrypt -lodbc +redo_LDADD = $(top_builddir)/src/storage/.libs/libcsql $(top_builddir)/src/sql/.libs/libcsqlsql $(top_builddir)/src/sqllog/.libs/libcsqlsqllog $(top_builddir)/src/network/.libs/libcsqlnw $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter $(top_builddir)/src/gateway/.libs/libcsqlgw $(top_builddir)/src/cache/.libs/libcacheload $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw -lrt -lpthread -lcrypt -lodbc + +recover_SOURCES = recover.cxx +recover_LDADD = $(top_builddir)/src/storage/.libs/libcsql \ + $(top_builddir)/src/sql/.libs/libcsqlsql \ + $(top_builddir)/src/sqllog/.libs/libcsqlsqllog \ + $(top_builddir)/src/network/.libs/libcsqlnw \ + $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter \ + $(top_builddir)/src/gateway/.libs/libcsqlgw \ + $(top_builddir)/src/cache/.libs/libcacheload \ + $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw +recover_LDFLAGS = -lcrypt -lrt -lpthread -lcrypt -lodbc + +checkpoint_SOURCES = checkpoint.cxx +checkpoint_LDADD = $(top_builddir)/src/storage/.libs/libcsql \ + $(top_builddir)/src/sql/.libs/libcsqlsql \ + $(top_builddir)/src/sqllog/.libs/libcsqlsqllog \ + $(top_builddir)/src/network/.libs/libcsqlnw \ + $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter \ + $(top_builddir)/src/gateway/.libs/libcsqlgw \ + $(top_builddir)/src/cache/.libs/libcacheload \ + $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw +checkpoint_LDFLAGS = -lcrypt -lrt -lpthread -lcrypt -lodbc catalog_SOURCES = catalog.cxx -catalog_LDADD = $(top_builddir)/src/storage/libcsql.la \ - $(top_builddir)/src/cache/libcacheload.la -catalog_LDFLAGS = -lcrypt +catalog_LDADD = $(top_builddir)/src/storage/.libs/libcsql \ + $(top_builddir)/src/sql/.libs/libcsqlsql \ + $(top_builddir)/src/sqllog/.libs/libcsqlsqllog \ + $(top_builddir)/src/network/.libs/libcsqlnw \ + $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter \ + $(top_builddir)/src/gateway/.libs/libcsqlgw \ + $(top_builddir)/src/cache/.libs/libcacheload \ + $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw +catalog_LDFLAGS = -lcrypt -lrt -cachetable_LDADD = $(top_builddir)/src/cache/libcacheload.la \ - $(top_builddir)/src/storage/libcsql.la $(top_builddir)/src/adapter/libcsqlodbcadapter.la -cachetable_LDFLAGS = -lcrypt -lodbc +cachetable_LDADD = $(top_builddir)/src/storage/.libs/libcsql \ + $(top_builddir)/src/sql/.libs/libcsqlsql \ + $(top_builddir)/src/sqllog/.libs/libcsqlsqllog \ + $(top_builddir)/src/network/.libs/libcsqlnw \ + $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter \ + $(top_builddir)/src/gateway/.libs/libcsqlgw \ + $(top_builddir)/src/cache/.libs/libcacheload \ + $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw +cachetable_LDFLAGS = -lcrypt -lodbc -lrt cachetable_SOURCES = cachetable.cxx cacheverify_LDADD = $(top_builddir)/src/sql/libcsqlsql.la \ $(top_builddir)/src/sqllog/libcsqlsqllog.la \ $(top_builddir)/src/network/libcsqlnw.la \ + $(top_builddir)/src/sqlnetwork/libcsqlsqlnw.la \ + $(top_builddir)/src/network/libcsqlnw.la \ $(top_builddir)/src/gateway/libcsqlgw.la \ $(top_builddir)/src/adapter/libcsqlodbcadapter.la \ $(top_builddir)/src/cache/libcacheload.la \ @@ -28,36 +72,36 @@ cacheverify_LDADD = $(top_builddir)/src/sql/libcsqlsql.la \ cacheverify_LDFLAGS = -lcrypt -lodbc cacheverify_SOURCES = cacheverify.cxx -repltable_LDADD = $(top_builddir)/src/cache/libcacheload.la \ - $(top_builddir)/src/storage/libcsql.la \ - $(top_builddir)/src/adapter/libcsqlodbcadapter.la \ - $(top_builddir)/src/cache/libcacheload.la \ - $(top_builddir)/src/gateway/libcsqlgw.la -repltable_LDFLAGS = -lcrypt -lodbc -repltable_SOURCES = repltable.cxx - -csqlserver_LDADD = $(top_builddir)/src/cache/libcacheload.la \ - $(top_builddir)/src/storage/libcsql.la \ - $(top_builddir)/src/cache/libcacheload.la \ - $(top_builddir)/src/gateway/libcsqlgw.la -lrt -lpthread -lcrypt +csqlserver_LDADD = $(top_builddir)/src/storage/.libs/libcsql \ + $(top_builddir)/src/sql/.libs/libcsqlsql \ + $(top_builddir)/src/sqllog/.libs/libcsqlsqllog \ + $(top_builddir)/src/network/.libs/libcsqlnw \ + $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter \ + $(top_builddir)/src/gateway/.libs/libcsqlgw \ + $(top_builddir)/src/cache/.libs/libcacheload \ + $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw -lrt -lpthread -lcrypt csqlserver_LDFLAGS = -lcrypt -lodbc csqlserver_SOURCES = csqlserver.cxx -csqlreplserver_LDADD = $(top_builddir)/src/sql/libcsqlsql.la \ +csqlasyncserver_LDADD = $(top_builddir)/src/sql/libcsqlsql.la \ $(top_builddir)/src/sqllog/libcsqlsqllog.la \ $(top_builddir)/src/network/libcsqlnw.la \ $(top_builddir)/src/gateway/libcsqlgw.la \ $(top_builddir)/src/cache/libcacheload.la \ + $(top_builddir)/src/sqlnetwork/libcsqlsqlnw.la \ + $(top_builddir)/src/network/libcsqlnw.la \ $(top_builddir)/src/adapter/libcsqlodbcadapter.la \ $(top_builddir)/src/storage/libcsql.la -lrt -lpthread -lcrypt -csqlreplserver_LDFLAGS = -lcrypt -lodbc -csqlreplserver_SOURCES = csqlreplserver.cxx +csqlasyncserver_LDFLAGS = -lcrypt -lodbc +csqlasyncserver_SOURCES = csqlasyncserver.cxx csqlsqlserver_LDADD = $(top_builddir)/src/sql/libcsqlsql.la \ $(top_builddir)/src/sqllog/libcsqlsqllog.la \ $(top_builddir)/src/network/libcsqlnw.la \ $(top_builddir)/src/gateway/libcsqlgw.la \ $(top_builddir)/src/cache/libcacheload.la \ + $(top_builddir)/src/sqlnetwork/libcsqlsqlnw.la \ + $(top_builddir)/src/network/libcsqlnw.la \ $(top_builddir)/src/adapter/libcsqlodbcadapter.la \ $(top_builddir)/src/storage/libcsql.la -lrt -lpthread -lcrypt csqlsqlserver_LDFLAGS = -lcrypt -lodbc @@ -67,18 +111,25 @@ csqldump_LDADD = $(top_builddir)/src/sql/libcsqlsql.la \ $(top_builddir)/src/sqllog/libcsqlsqllog.la \ $(top_builddir)/src/network/libcsqlnw.la \ $(top_builddir)/src/gateway/libcsqlgw.la \ + $(top_builddir)/src/sqlnetwork/libcsqlsqlnw.la \ + $(top_builddir)/src/network/libcsqlnw.la \ $(top_builddir)/src/cache/libcacheload.la \ $(top_builddir)/src/adapter/libcsqlodbcadapter.la \ $(top_builddir)/src/storage/libcsql.la -lrt -lpthread -lcrypt csqldump_LDFLAGS = -lcrypt -lodbc csqldump_SOURCES = csqldump.cxx -csqlcacheserver_LDADD = $(top_builddir)/src/sql/libcsqlsql.la \ - $(top_builddir)/src/sqllog/libcsqlsqllog.la \ - $(top_builddir)/src/network/libcsqlnw.la \ - $(top_builddir)/src/gateway/libcsqlgw.la \ - $(top_builddir)/src/cache/libcacheload.la \ - $(top_builddir)/src/adapter/libcsqlodbcadapter.la \ - $(top_builddir)/src/storage/libcsql.la -lrt -lpthread -lcrypt -csqlcacheserver_LDFLAGS = -lcrypt -lodbc +csqlcacheserver_LDADD = $(top_builddir)/src/storage/.libs/libcsql \ + $(top_builddir)/src/sql/.libs/libcsqlsql \ + $(top_builddir)/src/sqllog/.libs/libcsqlsqllog \ + $(top_builddir)/src/network/.libs/libcsqlnw \ + $(top_builddir)/src/adapter/.libs/libcsqlodbcadapter \ + $(top_builddir)/src/gateway/.libs/libcsqlgw \ + $(top_builddir)/src/cache/.libs/libcacheload \ + $(top_builddir)/src/sqlnetwork/.libs/libcsqlsqlnw -lrt -lpthread -lcrypt +csqlcacheserver_LDFLAGS = -lcrypt -lodbc -lrt -lpthread csqlcacheserver_SOURCES = csqlcacheserver.cxx + +csqlds_LDADD = $(top_builddir)/src/storage/.libs/libcsql +csqlds_LDFLAGS = -lcrypt +csqlds_SOURCES = csqlds.cxx diff --git a/src/tools/cachetable.cxx b/src/tools/cachetable.cxx index da99f084..56b38af0 100644 --- a/src/tools/cachetable.cxx +++ b/src/tools/cachetable.cxx @@ -1,4 +1,5 @@ /*************************************************************************** + * * Copyright (C) 2007 by www.databasecache.com * * Contact: praba_tuty@databasecache.com * * * @@ -13,42 +14,54 @@ * GNU General Public License for more details. * * * ***************************************************************************/ +#include #include #include +#include void printUsage() { - printf("Usage: cachetable [-U username] [-P passwd] -t tablename[-D] -c \"condition\" -f \"selected field names\" -p fieldname -S\n" - " [-R] [-s] [-r]\n"); - printf(" username -> username to connect with csql.\n"); - printf(" passwd -> password for the above username to connect with csql.\n"); - printf(" tablename -> table name to be cached in csql from target db.\n"); - printf(" fieldname -> field name to be specified for the bidirectional caching on which trigger to be run .\n"); - printf(" R -> recover all cached tables from the target database.\n"); - printf(" s -> load only the records from target db. Assumes table is already created in csql\n"); - printf(" r -> reload the table. get the latest image of table from target db\n"); - printf(" u -> unload the table. if used with -s option, removes only records and preserves the schema\n"); - printf(" no option -> get table definition and records from target db and create in csql.\n"); - printf(" D -> Enable direct access option to target database\n"); - printf(" S -> Cache Description\n"); + printf("Usage: cachetable [ -U ] [ -P ]\n"); + printf(" [ -t [-D] [ -c \"condition\" ]\n"); + printf(" [-f \"fieldListToCache\"]\n"); + printf(" [ -p [-F] ]\n"); + printf(" [ -u [-s] ]\n"); + printf(" [ -s | -r ] ]\n"); + printf(" [ -S | -R ]\n\n"); + printf(" U -> Username to connect with csql.\n"); + printf(" P -> Password for above username.\n"); + printf(" t -> Table name to be cached in csql from target db.\n"); + printf(" D -> Enable direct access option to target database.\n"); + printf(" c -> Conditional expression used in std SQL WHERE clause.\n"); + printf(" f -> List of field names to be cached. Comma separated.\n"); + printf(" p -> Not Null unique index field name for Bidirectional\n"); + printf(" caching on which trigger needs to be run.\n"); + printf(" F -> Forceful caching.\n"); + printf(" s -> Load only the records from target db.\n"); + printf(" Assumes table is already created in csql.\n"); + printf(" u -> Unload the table. If used with -s option,\n"); + printf(" removes only records and preserves the schema.\n"); + printf(" r -> Reload the table. Get the latest image of table from target db.\n"); + printf(" S -> Cache Description for cached tables.\n"); + printf(" R -> Recover all cached tables from the target database.\n"); return; } int main(int argc, char **argv) { - DbRetVal rv = OK; - Connection conn; - rv = conn.open("root","manager"); - if(rv != OK) return 1; - - if(!Conf::config.useCache()) - { - printf("CACHE_TABLE is set to FALSE in csql.conf file.\n"); - conn.close(); - return 1; - } - else{ conn.close(); } - + DbRetVal rv = OK; + Connection conn; + rv = conn.open(I_USER, I_PASS); + if (rv != OK) return 1; + + if (!Conf::config.useCache()) + { + printf("CACHE_TABLE is set to FALSE in csql.conf file.\n"); + conn.close(); + return 2; + } + else{ conn.close(); } + char username[IDENTIFIER_LENGTH]; username [0] = '\0'; char password[IDENTIFIER_LENGTH]; @@ -60,12 +73,17 @@ int main(int argc, char **argv) char condition[IDENTIFIER_LENGTH]; char fieldlist[IDENTIFIER_LENGTH]; char syncModeStr[IDENTIFIER_LENGTH]; + char dsnName[IDENTIFIER_LENGTH]; + bool Isuid=false; + bool Ispid=false; bool conditionval = false; bool fieldlistval = false; bool tableDefinition = true; bool tableNameSpecified = false; bool fieldNameSpecified = false; - while ((c = getopt(argc, argv, "U:P:t:f:c:p:RDSsru?")) != EOF) + bool forceEnable=false; + bool isDsn=false; + while ((c = getopt(argc, argv, "U:P:t:d:f:c:p:FRDSsru?")) != EOF) { switch (c) { @@ -76,6 +94,7 @@ int main(int argc, char **argv) tableNameSpecified = true; break; } + case 'd' : { strcpy(dsnName,argv[optind - 1]);isDsn=true;break;} case 'p' : { strcpy(fieldname, argv[optind - 1]); if(opt==2){fieldNameSpecified = true;break;} } @@ -83,15 +102,16 @@ int main(int argc, char **argv) case 'D' : { if(opt==2) {isDirect=true;break;} } - case 'c' : {strcpy(condition,argv[optind - 1]); conditionval = true; break; }// condition for selelcted records by :Jitendra + case 'c' : {strcpy(condition,argv[optind - 1]); conditionval = true; break; } case 'f' : {strcpy(fieldlist,argv[optind - 1]);fieldlistval = true ;break; } + case 'F' : { if(opt==2 && fieldNameSpecified ) forceEnable=true; break;} case '?' : { opt = 10; break; } //print help case 'R' : { opt = 3; break; } //recover all the tables case 's' : { tableDefinition=false; break; } //do not get the schema information from target db case 'r' : { opt = 4; break; } //reload the table case 'u' : { opt = 5; break; } //unload the table - case 'S' : { opt = 6; break; } - default: opt=10; + case 'S' : {opt=6;break;} + default: opt=10; } }//while options @@ -99,73 +119,92 @@ int main(int argc, char **argv) printUsage(); return 0; } - - //printf("%s %s \n", username, password); + if (username[0] == '\0' ) { - strcpy(username, "root"); - strcpy(password, "manager"); + strcpy(username, I_USER); + strcpy(password, I_PASS); } CacheTableLoader cacheLoader; cacheLoader.setConnParam(username, password); - + TableConf::config.setConnParam(username, password); if(conditionval){ - cacheLoader.setCondition(condition);}// new one - if(fieldlistval){ - cacheLoader.setFieldListVal(fieldlist);} + cacheLoader.setCondition(condition);// new one + TableConf::config.setCondition(condition); + } + + if(isDsn){ + cacheLoader.setDsnName(dsnName); + TableConf::config.setDsnName(dsnName); + } + + if(fieldlistval) { + cacheLoader.setFieldListVal(fieldlist); + TableConf::config.setFieldListVal(fieldlist); + } + if(forceEnable) { + cacheLoader.setForceFlag(forceEnable); + TableConf::config.setForceFlag(forceEnable); + } + bool isCached = false; + unsigned int mode = TableConf::config.getTableMode(tablename); + if (opt==2) { cacheLoader.setTable(tablename); - if(fieldNameSpecified){ cacheLoader.setFieldName(fieldname); } - rv = CacheTableLoader::isTableCached(tablename); - if(rv!=OK){ - rv = cacheLoader.load(tableDefinition); - if(rv == OK){ - cacheLoader.addToCacheTableFile(isDirect); - }else exit(2); - } else - { - printf("Table is already cached, unload table by \" cachetable -t -u\" and then try \n"); - exit(3); + TableConf::config.setTable(tablename); + if(fieldNameSpecified){ + cacheLoader.setFieldName(fieldname); + TableConf::config.setFieldName(fieldname); } - }else if (opt==3) //recover - { + + isCached = TableConf::config.isTableCached(mode); + if (isCached) { + printf("Table is already cached, unload table by\n"); + printf("\"cachetable -t -u\" and then try \n"); + return 3; + } + rv = cacheLoader.load(tableDefinition); + if(rv != OK) return 4; + TableConf::config.addToCacheTableFile(isDirect); + } else if (opt==3) {//recover rv = cacheLoader.recoverAllCachedTables(); - if (rv != OK) exit (1); - }else if (opt==4) //reload - { + if (rv != OK) return 5; + } else if (opt==4) {//reload if (!tableNameSpecified) { printf("Table name is not specified. Check usage with ? \n"); - return 1; + return 6; } cacheLoader.setTable(tablename); + TableConf::config.setTable(tablename); rv = cacheLoader.reload(); - if (rv != OK) exit (1); - }else if (opt==5) //unload - { + if (rv != OK) return 7; + + } else if (opt==5) {//unload if (!tableNameSpecified) { printf("Table name is not specified. Check usage with ? option\n"); - return 1; + return 8; } cacheLoader.setTable(tablename); - rv = cacheLoader.unload(tableDefinition); - if (rv != OK) exit (1); - rv = cacheLoader.removeFromCacheTableFile(); - if (rv != OK) exit (2); - - }else if(opt==6) - { - if(tableNameSpecified) - { - cacheLoader.setTable(tablename); - } - rv = cacheLoader.CacheInfo(tableNameSpecified); - if(rv !=OK) - { - printf("\nError (%d): None of the table found in Cache,You need to cache the table from Target DB.\n\n",rv); - exit(2); - } - } - return 0; + TableConf::config.setTable(tablename); + isCached = TableConf::config.isTableCached(mode); + if (!mode) { + printError(ErrNotCached, "Table is not Cached"); + return 9; + } + TableConf::config.removeFromCacheTableFile(); + } else if(opt==6) { + if(tableNameSpecified) { + cacheLoader.setTable(tablename); + TableConf::config.setTable(tablename); + } + rv = TableConf::config.CacheInfo(tableNameSpecified); + if (rv !=OK) { + printf("\nError (%d): None of the table found in Cache,You need to cache the table from Target DB.\n\n",rv); + exit(2); + } + } + return 0; } + diff --git a/src/tools/cacheverify.cxx b/src/tools/cacheverify.cxx index f4f8d8aa..6634421a 100644 --- a/src/tools/cacheverify.cxx +++ b/src/tools/cacheverify.cxx @@ -13,12 +13,17 @@ * GNU General Public License for more details. * * * ***************************************************************************/ +#include #include -#include #include +#include #include #include +AbsSqlConnection *conn=NULL; +AbsSqlStatement *stmt=NULL; +DataType pkFldType = typeUnknown; +int pkFldLen = 0; void printUsage() { printf("Usage: cacheverify [-U username] [-P passwd] -t tablename [-p] [-f]\n"); @@ -32,523 +37,577 @@ void printUsage() } typedef struct PrimaryKeyField { - int val; bool inCsql; bool inTrgtDb; + char val[1]; } PrimKeyFldVal; - typedef struct FldVal { void *value; DataType type; int length; + int pos; }FldVal; typedef struct Rec { - int val; - List fldValList; + List csqlFldValList; + List tdbFldValList; + char val[1]; }Record; -using namespace std; +int cmpIntPkFldVal (const void *pkfv1, const void *pkfv2) +{ + PrimKeyFldVal *p1 = (PrimKeyFldVal *)pkfv1; + PrimKeyFldVal *p2 = (PrimKeyFldVal *)pkfv2; + bool result = false; + result=AllDataType::compareVal(&p1->val,&p2->val,OpLessThan,pkFldType); + if (result) return -1; + else return 1; +} -// functions to sort the list for STL list -bool cmp(const PrimKeyFldVal *a, const PrimKeyFldVal *b) +int cmpStringPkFldVal (const void *pkfv1, const void *pkfv2) { - return a->val < b->val; + PrimKeyFldVal *p1 = (PrimKeyFldVal *)pkfv1; + PrimKeyFldVal *p2 = (PrimKeyFldVal *)pkfv2; + char *val1 = (char *) &p1->val; + char *val2 = (char *) &p2->val; + if (strcmp(val1, val2) < 0) return -1; + else return 1; } -bool cmpRec(const Record *a, const Record *b) +int cmpIntRecord (const void *pkfv1, const void *pkfv2) { - return a->val < b->val; + Record *p1 = (Record *)pkfv1; + Record *p2 = (Record *)pkfv2; + if (AllDataType::compareVal(&p1->val, &p2->val, OpLessThan, pkFldType)) + return -1; + else return 1; } -DbRetVal verifyCount(const char *tblName, long numTuples) +int cmpStringRecord (const void *pkfv1, const void *pkfv2) +{ + Record *p1 = (Record *)pkfv1; + Record *p2 = (Record *)pkfv2; + char *val1 = (char *) &p1->val; + char *val2 = (char *) &p2->val; + if (strcmp(val1, val2) < 0) return -1; + else return 1; +} + +void setParamValues(AbsSqlStatement *stmt, int parampos, DataType type, int length, void *value); + +DbRetVal verifyCount(const char *tblName, long numTuples) { char statement[200]; - AbsSqlConnection *con = SqlFactory::createConnection(CSqlAdapter); - DbRetVal rv = con->connect("root", "manager"); - if (rv != OK) { delete con; return ErrSysInit; } - AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter); - stmt->setConnection(con); - int count = 0; + AbsSqlConnection *adConn = SqlFactory::createConnection(CSqlAdapter); + DbRetVal rv = adConn->connect(I_USER,I_PASS); + if (rv != OK) { delete adConn; return ErrSysInit; } + AbsSqlStatement *adStmt = SqlFactory::createStatement(CSqlAdapter); + adStmt->setConnection(adConn); + float count = 0; + int count1=0; int rows = 0; sprintf(statement, "select count(*) from %s;", tblName); - rv = con->beginTrans(); - rv = stmt->prepare(statement); - if(rv != OK) { - delete stmt; delete con; - printf("Prepare failed\n"); - return rv; - } - stmt->bindField(1, &count); - rv = stmt->execute(rows); - if(rv != OK) { - delete stmt; delete con; - printf("Execute failed\n"); - return rv; - } - if (stmt->fetch()== NULL) { - delete stmt; delete con; - printf("Fetch failed\n"); - return ErrSysInternal; - } - con->commit(); - stmt->free(); + rv = adConn->beginTrans(); + rv = adStmt->prepare(statement); + if(rv != OK) { + delete adStmt; delete adConn; + printf("Prepare failed\n"); + return rv; + } + SqlOdbcConnection *adcon= (SqlOdbcConnection *)adConn; + adStmt->bindField(1, &count1); + rv = adStmt->execute(rows); + if(rv != OK) { + delete adStmt; delete adConn; + printf("Execute failed\n"); + return rv; + } + if (adStmt->fetch()== NULL) { + delete adStmt; delete adConn; + printf("Fetch failed\n"); + return ErrSysInternal; + } + adConn->commit(); + adStmt->free(); printf("\nNumber of Records:\n"); printf("-------------------+-------------------+-------------------+\n"); printf(" Data | In CSQL | In TargetDB |\n"); printf("-------------------+-------------------+-------------------+\n"); - printf(" No. Of Records | %-6ld | %-6d |\n", numTuples, count); + printf(" No. Of Records | %-6ld | %-6d |\n", numTuples, count1); printf("-------------------+-------------------+-------------------+\n"); - delete stmt; delete con; + delete adStmt; delete adConn; return OK; } DbRetVal verifyMismatchingRecords(const char *tblName, int option) { - Connection conn; - DbRetVal rv = conn.open("root", "manager"); - if (rv != OK) return ErrSysInit; - DatabaseManager *dbMgr = (DatabaseManager *) conn.getDatabaseManager(); - if (dbMgr == NULL) { - conn.close(); - printf("Auth failed\n"); - return ErrSysInit; - } - Table *table = dbMgr->openTable(tblName); - if(table == NULL) { - conn.close(); - printf("Table \'%s\' does not exist", tblName); - return ErrNotExists; - } - - char statement[200]; + char csqlstatement[256]; + char tdbstatement[256]; AbsSqlConnection *trgtDbCon = SqlFactory::createConnection(CSqlAdapter); - rv = trgtDbCon->connect("root", "manager"); - if (rv != OK) { - dbMgr->closeTable(table); conn.close(); - delete trgtDbCon; return ErrSysInit; + DbRetVal rv = trgtDbCon->connect(I_USER,I_PASS); + if (rv != OK) { + delete trgtDbCon; return ErrSysInit; } AbsSqlStatement *trgtDbStmt = SqlFactory::createStatement(CSqlAdapter); trgtDbStmt->setConnection(trgtDbCon); - + char fieldName[IDENTIFIER_LENGTH]; fieldName[0] = '\0'; SqlOdbcStatement *ostmt = (SqlOdbcStatement*) trgtDbStmt; ostmt->getPrimaryKeyFieldName((char*)tblName, fieldName); if (fieldName[0] == '\0') { - dbMgr->closeTable(table); conn.close(); - delete trgtDbStmt; delete trgtDbCon; + delete trgtDbStmt; delete trgtDbCon; printf("Primary key does not exist on table %s\n", tblName); return ErrNotExists; } printf("\nPrimary key field name is \'%s\'\n", fieldName); - //FieldInfo *fldInfo = new FieldInfo(); - //table->getFieldInfo(fieldName, fldInfo); - //if(! fldInfo->isPrimary) { printError(ErrBadArg, "\'%s\' is not a primary key field", fldName); return ErrBadArg; } - int csqlVal = 0; + int rows = 0; + //will work for single field primary keys + //composite need to be worked out + FieldInfo *fInfo = new FieldInfo(); + ((SqlStatement *)stmt)->getFieldInfo(tblName, fieldName, fInfo); + pkFldType = fInfo->type; + if (pkFldType == typeString) pkFldLen = os::align(fInfo->length + 1); + else pkFldLen = fInfo->length; + void *pkval = AllDataType::alloc(pkFldType, pkFldLen); + memset(pkval, 0, pkFldLen); //List for primary key field values present in csql server List valListInCsql; - table->bindFld(fieldName, &csqlVal); - conn.startTransaction(); - table->setCondition(NULL); - rv = table->execute(); - if(rv != OK) { - dbMgr->closeTable(table); conn.close(); + List valListInTrgtDb; + List sameInBothDb; + List missingList; + + sprintf(csqlstatement, "select %s from %s;", fieldName, tblName); + sprintf(tdbstatement, "select %s from %s where %s=?;", fieldName, tblName, fieldName); + rv = stmt->prepare(csqlstatement); + if (rv != OK) { + delete trgtDbStmt; delete trgtDbCon; + printError(rv, "Prepare failed"); + return rv; + } + rv = trgtDbStmt->prepare(tdbstatement); + if (rv != OK) { delete trgtDbStmt; delete trgtDbCon; - printf("Execute failed\n"); - return rv; - } - while(true) { - void* tuple = table->fetch(rv); - if (tuple == NULL) break; - PrimKeyFldVal *pkFldVal = new PrimKeyFldVal(); - pkFldVal->val = csqlVal; - pkFldVal->inCsql = true; + printError(rv, "Prepare failed"); + return rv; + } + stmt->bindField(1, pkval); + trgtDbStmt->bindField(1, pkval); + rv = conn->beginTrans(); + if (rv != OK) { + stmt->free(); + delete trgtDbStmt; delete trgtDbCon; + printError(rv, "BeginTrans failed"); + return rv; + } + rv = stmt->execute(rows); + if(rv != OK) { + stmt->free(); + delete trgtDbStmt; delete trgtDbCon; + printf("Execute failed\n"); + return rv; + } + rv = trgtDbCon->beginTrans(); + int pkFldValSize = sizeof(PrimKeyFldVal) - 1 + pkFldLen; + while(stmt->fetch(rv) != NULL && rv == OK) { + PrimKeyFldVal *pkFldVal = (PrimKeyFldVal *) malloc (pkFldValSize); + memset(pkFldVal, 0, pkFldValSize); + AllDataType::copyVal(&pkFldVal->val, pkval, pkFldType, pkFldLen); + pkFldVal->inCsql = true; pkFldVal->inTrgtDb = true; - valListInCsql.append(pkFldVal); - } - table->closeScan(); - conn.commit(); - int trgtDbVal = 0; - int rows = 0; - + setParamValues(trgtDbStmt, 1, pkFldType, pkFldLen, pkval); + trgtDbStmt->execute(rows); + if (trgtDbStmt->fetch(rv) != NULL) { + sameInBothDb.append(pkFldVal); + } else { + pkFldVal->inTrgtDb = false; + missingList.append(pkFldVal); + } + trgtDbStmt->close(); + } + trgtDbCon->commit(); + stmt->close(); + conn->commit(); + stmt->free(); + trgtDbStmt->free(); + // List for primary key field values present in target DB - List valListInTrgtDb; - sprintf(statement, "select %s from %s;", fieldName, tblName); + sprintf(tdbstatement, "select %s from %s;", fieldName, tblName); + sprintf(csqlstatement, "select %s from %s where %s=?;", fieldName, tblName, fieldName); rv = trgtDbCon->beginTrans(); - rv = trgtDbStmt->prepare(statement); - if(rv != OK) { - dbMgr->closeTable(table); conn.close(); + rv = trgtDbStmt->prepare(tdbstatement); + if(rv != OK) { delete trgtDbStmt; delete trgtDbCon; - printf("Prepare failed\n"); - return rv; + printf("Prepare failed\n"); + return rv; } - trgtDbStmt->bindField(1, &trgtDbVal); + stmt->prepare(csqlstatement); + stmt->bindField(1, pkval); + trgtDbStmt->bindField(1, pkval); rv = trgtDbStmt->execute(rows); - if(rv != OK) { - dbMgr->closeTable(table); conn.close(); + if(rv != OK) { delete trgtDbStmt; delete trgtDbCon; - printf("Execute failed\n"); - return rv; + printf("Execute failed\n"); + return rv; } + conn->beginTrans(); while (trgtDbStmt->fetch() != NULL) { - PrimKeyFldVal *pkFldVal = new PrimKeyFldVal(); - pkFldVal->val = trgtDbVal; - pkFldVal->inCsql = true; + PrimKeyFldVal *pkFldVal = (PrimKeyFldVal *) malloc (pkFldValSize); + memset(pkFldVal, 0, pkFldValSize); + AllDataType::copyVal(&pkFldVal->val, pkval, pkFldType, pkFldLen); + pkFldVal->inCsql = true; pkFldVal->inTrgtDb = true; - valListInTrgtDb.append(pkFldVal); + setParamValues(stmt, 1, pkFldType, pkFldLen, pkval); + stmt->execute(rows); + if (stmt->fetch(rv) == NULL && rv ==OK) { + pkFldVal->inCsql = false; + missingList.append(pkFldVal); + } } + stmt->close(); + trgtDbStmt->close(); + conn->commit(); trgtDbCon->commit(); + stmt->free(); trgtDbStmt->free(); - - // List for primary key field values present in either of the databases - List diffInValList; - - // List for primary key field values present in both the databases - List sameInBothDbList; - ListIterator csqlValIter = valListInCsql.getIterator(); - ListIterator trgtDbValIter = valListInTrgtDb.getIterator(); - PrimKeyFldVal *csqlelem, *trgtdbelem; - while( (csqlelem = (PrimKeyFldVal *) csqlValIter.nextElement()) != NULL) { - while ( (trgtdbelem = (PrimKeyFldVal *) trgtDbValIter.nextElement()) != NULL) { - if (csqlelem->val == trgtdbelem->val) { - PrimKeyFldVal *elm = new PrimKeyFldVal(); - *elm = *csqlelem; - sameInBothDbList.append(elm); - trgtDbValIter.reset(); - break; - } - } - if (trgtdbelem == NULL) { - csqlelem->inTrgtDb = false; - PrimKeyFldVal *elm = new PrimKeyFldVal(); - *elm = * csqlelem; - diffInValList.append(elm); - trgtDbValIter.reset(); +/* + PrimKeyFldVal *pkArr = NULL; + ListIterator missIter = missingList.getIterator(); + int nEitherDb = missingList.size(); + if (nEitherDb) { + pkArr = (PrimKeyFldVal *) malloc(nEitherDb * pkFldValSize); + int i = 0; + char *ptr = (char *)pkArr; + while (missIter.hasElement()) { + PrimKeyFldVal *elm = (PrimKeyFldVal *)missIter.nextElement(); + memcpy(ptr, elm, pkFldValSize); + ptr += pkFldValSize; } + if (pkFldType == typeByteInt || pkFldType == typeShort || + pkFldType == typeInt || pkFldType == typeLong || + pkFldType == typeLongLong) + qsort (pkArr, nEitherDb, pkFldValSize, cmpIntPkFldVal); + else if (pkFldType == typeString) + qsort (pkArr, nEitherDb, pkFldValSize, cmpStringPkFldVal); } - csqlValIter.reset(); - while((trgtdbelem = (PrimKeyFldVal *)trgtDbValIter.nextElement()) != NULL) { - while((csqlelem = (PrimKeyFldVal *)csqlValIter.nextElement()) != NULL) { - if (trgtdbelem->val == csqlelem->val) { - csqlValIter.reset(); - break; - } - } - if (csqlelem == NULL) { - trgtdbelem->inCsql = false; - PrimKeyFldVal *elm = new PrimKeyFldVal(); - *elm = *trgtdbelem; - diffInValList.append(elm); - csqlValIter.reset(); - } - } - +*/ // Sorting the primary key field values present in either of the databases - list li; - ListIterator diffValIter = diffInValList.getIterator(); bool missingRecords = false; printf("\nMissing Records: Marked by \'X\'\n"); printf("-------------------+-------------------+-------------------+\n"); printf(" Primary Key | In CSQL | In Target DB |\n"); - printf("-------------------+-------------------+-------------------+\n"); - if (diffInValList.size()) { + printf("-------------------+-------------------+-------------------+\n"); +/* if (missingList.size()) { + char *ptr = (char *) pkArr; missingRecords = true; - PrimKeyFldVal *elem = NULL; - while ((elem = (PrimKeyFldVal *) diffValIter.nextElement()) != NULL ) - li.push_back(elem); - - list::iterator it; - li.sort(cmp); - for (it = li.begin(); it != li.end(); it++) { - if ((*it)->inCsql == false) - printf(" %-6d | X | |\n", (*it)->val); - else if ((*it)->inTrgtDb == false) - printf(" %-6d | | X |\n", (*it)->val); + for (int i = 0; i < nEitherDb; i++) { + PrimKeyFldVal *pkFldVal = (PrimKeyFldVal *) ptr; + printf(" "); + int nChrs = AllDataType::printVal(&pkFldVal->val, pkFldType, + pkFldLen); + nChrs = 17 - nChrs; + while (nChrs-- != 0) printf(" "); + if (pkFldVal->inCsql == false) { + printf("| X | |\n"); + } + else if (pkFldVal->inTrgtDb == false) { + printf("| | X |\n"); + } + ptr += pkFldValSize; } - printf("-------------------+-------------------+-------------------+\n"); + printf("-------------------+-------------------+-------------------+\n"); } else { printf(" No missing Records in either of the databases |\n"); - printf("-------------------+-------------------+-------------------+\n"); + printf("-------------------+-------------------+-------------------+\n"); } +*/ + - // Need to clean up the mess that is no more required - PrimKeyFldVal *pkFldVal = NULL; - while ((pkFldVal = (PrimKeyFldVal *) csqlValIter.nextElement()) != NULL) - delete pkFldVal; - valListInCsql.reset(); - while ((pkFldVal = (PrimKeyFldVal *) trgtDbValIter.nextElement()) != NULL) - delete pkFldVal; - valListInTrgtDb.reset(); - while ((pkFldVal = (PrimKeyFldVal *) diffValIter.nextElement()) != NULL) - delete pkFldVal; - diffInValList.reset(); - - if (option == 4) { - AbsSqlConnection *csqlCon = SqlFactory::createConnection(CSql); - rv = csqlCon->connect("root", "manager"); - if (rv != OK) { - dbMgr->closeTable(table); conn.close(); - delete trgtDbStmt; delete trgtDbCon; - delete csqlCon; return ErrSysInit; + + + + + ListIterator missIter = missingList.getIterator(); + if (missingList.size()) { + missingRecords = true; + while (missIter.hasElement()) { + PrimKeyFldVal *pkFldVal = (PrimKeyFldVal *) missIter.nextElement(); + printf(" "); + int nChrs = AllDataType::printVal(&pkFldVal->val, pkFldType, + pkFldLen); + nChrs = 17 - nChrs; + while (nChrs-- != 0) printf(" "); + if (pkFldVal->inCsql == false) { + printf("| X | |\n"); + } + else if (pkFldVal->inTrgtDb == false) { + printf("| | X |\n"); + } } - AbsSqlStatement *csqlStmt = SqlFactory::createStatement(CSql); - csqlStmt->setConnection(csqlCon); + printf("-------------------+-------------------+-------------------+\n"); + } + else { + printf(" No missing Records in either of the databases |\n"); + printf("-------------------+-------------------+-------------------+\n"); + } + // Need to clean up the mess that is no more required + //free (pkArr); + missIter.reset(); + PrimKeyFldVal *pkFldVal = NULL; + while ((pkFldVal = (PrimKeyFldVal *) missIter.nextElement()) != NULL) + free (pkFldVal); + missingList.reset(); + + if (option == 4) { //statement to fetch the values from the database - sprintf(statement, "select * from %s where %s = ?;", tblName, fieldName); - rv = csqlStmt->prepare(statement); + sprintf(csqlstatement, "select * from %s where %s = ?;", tblName, fieldName); + rv = stmt->prepare(csqlstatement); + rv = trgtDbStmt->prepare(csqlstatement); if(rv != OK) { - dbMgr->closeTable(table); conn.close(); delete trgtDbStmt; delete trgtDbCon; - delete csqlStmt; delete csqlCon; printf("Prepare failed\n"); return rv; } - + // need to bind each field with buffer which is list of field values - List fldNameList = table->getFieldNameList(); + SqlStatement *sqlStmt = (SqlStatement *) stmt; + List fldNameList = sqlStmt->getFieldNameList(tblName); ListIterator iter = fldNameList.getIterator(); Identifier *fname = NULL; FieldInfo *fldInfo = new FieldInfo(); - List fieldValueList; + List cfieldValueList; + List tfieldValueList; // List to hold all the records that are present in both the databases - List csqlRecordList; + List recordList; int paramPos = 1; while (iter.hasElement()) { fname = (Identifier *) iter.nextElement(); if (NULL == fname) { - dbMgr->closeTable(table); conn.close(); delete trgtDbStmt; delete trgtDbCon; - delete csqlStmt; delete csqlCon; - table = NULL; delete fldInfo; printf("Should never happen. Field Name list has NULL\n"); return ErrSysFatal; } - rv = table->getFieldInfo(fname->name, fldInfo); + rv = sqlStmt->getFieldInfo(tblName, fname->name, fldInfo); if (ErrNotFound == rv) { - dbMgr->closeTable(table); conn.close(); delete trgtDbStmt; delete trgtDbCon; - delete csqlStmt; delete csqlCon; - table = NULL; delete fldInfo; - printf("Field %s does not exist in table\n", - fname->name); + printf("Field %s does not exist in table\n", fname->name); return ErrSyntaxError; } - FldVal *fldVal = new FldVal(); - fldVal->type = fldInfo->type; - fldVal->length = fldInfo->length; - fldVal->value = AllDataType::alloc(fldInfo->type, fldInfo->length); - fieldValueList.append(fldVal); - csqlStmt->bindField(paramPos++, fldVal->value); + FldVal *cfldVal = new FldVal(); + FldVal *tfldVal = new FldVal(); + cfldVal->type = fldInfo->type; + tfldVal->type = fldInfo->type; + if(cfldVal->type == typeString) + cfldVal->length = os::align(fldInfo->length + 1); + else cfldVal->length = fldInfo->length; + cfldVal->value = AllDataType::alloc(fldInfo->type, cfldVal->length); + tfldVal->value = AllDataType::alloc(fldInfo->type, cfldVal->length); + memset(cfldVal->value, 0, cfldVal->length); + memset(tfldVal->value, 0, cfldVal->length); + cfieldValueList.append(cfldVal); + tfieldValueList.append(tfldVal); + stmt->bindField(paramPos, cfldVal->value); + trgtDbStmt->bindField(paramPos, tfldVal->value); + paramPos++; } delete fldInfo; - + iter.reset(); + while ((fname=(Identifier *)iter.nextElement())!= NULL) delete fname; + fldNameList.reset(); + // WHERE parameter should be binded with the primary key field value of the list that is present in both the databases - ListIterator sameValIter = sameInBothDbList.getIterator(); + int recSize = 2 * sizeof(List) + pkFldLen; + ListIterator sameValIter = sameInBothDb.getIterator(); PrimKeyFldVal *sameElem = NULL; while((sameElem = (PrimKeyFldVal *)sameValIter.nextElement()) != NULL) { - csqlCon->beginTrans(); - csqlStmt->setIntParam(1, sameElem->val); - rv = csqlStmt->execute(rows); - if(rv != OK) { - dbMgr->closeTable(table); conn.close(); - delete trgtDbStmt; delete trgtDbCon; - delete csqlStmt; delete csqlCon; - printf("Execute failed\n"); - return rv; - } - while (csqlStmt->fetch() != NULL) { - Record *rec = new Record(); - rec->val = sameElem->val; - ListIterator fldValIter = fieldValueList.getIterator(); - while (fldValIter.hasElement()) { - FldVal *fldVal = (FldVal *) fldValIter.nextElement(); - FldVal *fldValue = new FldVal(); - fldValue->type = fldVal->type; - fldValue->length = fldVal->length; - fldValue->value = AllDataType::alloc(fldValue->type, fldValue->length); - AllDataType::copyVal(fldValue->value, fldVal->value, fldVal->type, fldVal->length); - rec->fldValList.append(fldValue); - } - csqlRecordList.append(rec); - } - csqlStmt->close(); - csqlCon->commit(); - } - csqlStmt->free(); - - //statement to fetch the values from the database - sprintf(statement, "select * from %s where %s = ?;", tblName, fieldName); - rv = trgtDbStmt->prepare(statement); - if(rv != OK) { - dbMgr->closeTable(table); conn.close(); - delete csqlStmt; delete csqlCon; - delete trgtDbStmt; delete trgtDbCon; - printf("Prepare failed\n"); - return rv; - } - - // need to bind each field with buffer which is list of field values - ListIterator fldValIter = fieldValueList.getIterator(); - fldValIter.reset(); - List trgtDbRecordList; - paramPos = 1; - while (fldValIter.hasElement()) { - FldVal *fldVal = (FldVal *) fldValIter.nextElement(); - trgtDbStmt->bindField(paramPos++, fldVal->value); - } - - // WHERE parameter should be binded - sameValIter.reset(); - while((sameElem = (PrimKeyFldVal *)sameValIter.nextElement()) != NULL) { + conn->beginTrans(); trgtDbCon->beginTrans(); - trgtDbStmt->setIntParam(1, sameElem->val); + setParamValues(stmt, 1, pkFldType, pkFldLen, sameElem->val); + setParamValues(trgtDbStmt, 1, pkFldType, pkFldLen, sameElem->val); + rv = stmt->execute(rows); rv = trgtDbStmt->execute(rows); if(rv != OK) { - dbMgr->closeTable(table); conn.close(); - delete csqlStmt; delete csqlCon; delete trgtDbStmt; delete trgtDbCon; printf("Execute failed\n"); return rv; } - while (trgtDbStmt->fetch() != NULL) { - Record *rec = new Record(); - rec->val = sameElem->val; - fldValIter.reset(); - while (fldValIter.hasElement()) { - FldVal *fldVal = (FldVal *) fldValIter.nextElement(); - FldVal *fldValue = new FldVal(); - fldValue->type = fldVal->type; - fldValue->length = fldVal->length; - fldValue->value = AllDataType::alloc(fldValue->type, fldValue->length); - AllDataType::copyVal(fldValue->value, fldVal->value, fldVal->type, fldVal->length); - rec->fldValList.append(fldValue); + if (stmt->fetch() != NULL && trgtDbStmt->fetch() != NULL) { + Record *rec = (Record *) malloc(recSize); + memset(rec, 0, recSize); + AllDataType::copyVal(&rec->val, &sameElem->val, + pkFldType, pkFldLen); + ListIterator cfldValIter = cfieldValueList.getIterator(); + ListIterator tfldValIter = tfieldValueList.getIterator(); + int pos = 1; + while (cfldValIter.hasElement() && tfldValIter.hasElement()) { + FldVal *cfldVal = (FldVal *) cfldValIter.nextElement(); + FldVal *tfldVal = (FldVal *) tfldValIter.nextElement(); + if (AllDataType::compareVal(cfldVal->value, tfldVal->value, OpEquals, cfldVal->type, cfldVal->length) == false) { + FldVal *cfldValue = new FldVal(); + FldVal *tfldValue = new FldVal(); + cfldValue->type = cfldVal->type; + tfldValue->type = cfldVal->type; + cfldValue->length = cfldVal->length; + tfldValue->length = cfldVal->length; + cfldValue->value = AllDataType::alloc(cfldValue->type, cfldValue->length); + tfldValue->value = AllDataType::alloc(cfldValue->type, cfldValue->length); + cfldValue->pos = pos; tfldValue->pos = pos; + memset(cfldValue->value, 0, cfldValue->length); + memset(tfldValue->value, 0, cfldValue->length); + AllDataType::cachecopyVal(cfldValue->value, cfldVal->value, cfldVal->type, cfldVal->length); + AllDataType::cachecopyVal(tfldValue->value, tfldVal->value, cfldVal->type, cfldVal->length); + rec->csqlFldValList.append(cfldValue); + rec->tdbFldValList.append(tfldValue); + } + pos++; } - trgtDbRecordList.append(rec); - } + if (rec->csqlFldValList.size()) recordList.append(rec); + } + stmt->close(); trgtDbStmt->close(); + conn->commit(); trgtDbCon->commit(); } + // stmt->free(); // dont free it just yet needed further up trgtDbStmt->free(); - - // freeing the fieldValue buffer list which is not required any more - FldVal *fldVal = NULL; - while ((fldVal =(FldVal *) fldValIter.nextElement()) != NULL) { - free(fldVal->value); - delete fldVal; - } - fieldValueList.reset(); - + // freeing the field value list that is present in both the databases PrimKeyFldVal *pkFldVal = NULL; while ((pkFldVal = (PrimKeyFldVal *) sameValIter.nextElement()) != NULL) delete pkFldVal; - sameInBothDbList.reset(); - + sameInBothDb.reset(); +/* // sort the records based on Primary key that is present in both the databases - list csqlRecList; - ListIterator csqlRecListIter = csqlRecordList.getIterator(); - Record *elem; - while ((elem = (Record *) csqlRecListIter.nextElement()) != NULL ) - csqlRecList.push_back(elem); - csqlRecList.sort(cmpRec); - - list trgtDbRecList; - ListIterator trgtDbRecListIter = trgtDbRecordList.getIterator(); - while ((elem = (Record *) trgtDbRecListIter.nextElement()) != NULL ) - trgtDbRecList.push_back(elem); - trgtDbRecList.sort(cmpRec); - + int size = recordList.size(); + char *arr = (char *) malloc(size * recSize); + memset(arr, 0, size * recSize); + char *ptr = arr; + int i = 0; + ListIterator recIter = recordList.getIterator(); + while (recIter.hasElement()) { + Record *rec = (Record *) recIter.nextElement(); + memcpy(ptr, rec, recSize); + ptr += recSize; + } + if (pkFldType == typeByteInt || pkFldType == typeShort || + pkFldType == typeInt || pkFldType == typeLong || + pkFldType == typeLongLong) + qsort(arr, size, recSize, cmpIntRecord); + else if (pkFldType == typeString) + qsort(arr, size, recSize, cmpStringRecord); +*/ + int flag = 0; bool isConsistent = true; - list::iterator itCsql; - list::iterator itTrgtDb; - iter.reset(); printf("\nInconsistent Records for the same key:\n"); printf("-------------------+-------------------+-------------------+-------------------+\n"); - printf(" %-16s | %-16s | %-16s | %-16s |\n", "Primary Key", "Field Name", "In CSQL", "In Trgt DB"); + printf(" %-16s | %-16s | %-16s | %-16s |\n", "Primary Key", "Field Name", "In CSQL", "In Trgt DB"); printf("-------------------+-------------------+-------------------+-------------------+\n"); - for (itCsql = csqlRecList.begin(), itTrgtDb = trgtDbRecList.begin(); - itCsql != csqlRecList.end() && itTrgtDb != trgtDbRecList.end(); - itCsql++, itTrgtDb++) { - ListIterator csqlIt = (ListIterator)((*itCsql)->fldValList).getIterator(); - ListIterator trgtDbIt = (ListIterator)((*itTrgtDb)->fldValList).getIterator(); - iter.reset(); +/* ptr = arr; + char *fldname = NULL; + for (int i = 0; i < size; i++) { + Record *recd = (Record *) ptr; + ListIterator csqlIt = recd->csqlFldValList.getIterator(); + ListIterator trgtDbIt = recd->tdbFldValList.getIterator(); flag = 0; while (csqlIt.hasElement() && trgtDbIt.hasElement()) { FldVal *csqlElem = (FldVal *) csqlIt.nextElement(); FldVal *trgtDbElem = (FldVal *) trgtDbIt.nextElement(); - fname = (Identifier *) iter.nextElement(); + fldname = ((SqlStatement *) stmt)->getFieldName(csqlElem->pos); if (AllDataType::compareVal(csqlElem->value, trgtDbElem->value, OpEquals, csqlElem->type, csqlElem->length) == false) { isConsistent = false; - if (! flag) { - printf(" %-16d | %-16s | ", (*itCsql)->val, fname); + if (! flag) { + printf(" "); + int cnt = AllDataType::printVal(&recd->val, + pkFldType, pkFldLen); + cnt = 17 - cnt; + while (cnt-- != 0) printf(" "); + printf("| %-16s | ", fldname); flag = 1; } - else printf(" | %-16s | ", fname); + else printf(" | %-16s | ", fldname); int cnt = AllDataType::printVal(csqlElem->value, csqlElem->type, csqlElem->length); cnt = 17 - cnt; - while (cnt-- != 0) + while (cnt-- != 0) printf(" "); printf("| "); cnt = AllDataType::printVal(trgtDbElem->value, trgtDbElem->type, trgtDbElem->length); cnt = 17 - cnt; - while (cnt-- != 0) + while (cnt-- != 0) printf(" "); printf("|\n"); - } + } + } + ptr += recSize; + } +*/ + ListIterator recIter = recordList.getIterator(); + + char *fldname = NULL; + if (recordList.size()) { + isConsistent = false; + while(recIter.hasElement()) { + Record *recd = (Record *) recIter.nextElement(); + ListIterator csqlIt = recd->csqlFldValList.getIterator(); + ListIterator trgtDbIt = recd->tdbFldValList.getIterator(); + flag = 0; + while (csqlIt.hasElement()) { + FldVal *csqlElem = (FldVal *) csqlIt.nextElement(); + FldVal *trgtDbElem = (FldVal *) trgtDbIt.nextElement(); + fldname = ((SqlStatement *) stmt)->getFieldName(csqlElem->pos); + if (! flag) { + printf(" "); + int cnt = AllDataType::printVal(&recd->val, + pkFldType, pkFldLen); + cnt = 17 - cnt; + while (cnt-- != 0) printf(" "); + printf("| %-16s | ", fldname); + flag = 1; + } + else printf(" | %-16s | ", fldname); + int cnt = AllDataType::printVal(csqlElem->value, csqlElem->type, csqlElem->length); + cnt = 17 - cnt; + while (cnt-- != 0) printf(" "); + printf("| "); + cnt = AllDataType::printVal(trgtDbElem->value, trgtDbElem->type, trgtDbElem->length); + cnt = 17 - cnt; + while (cnt-- != 0) printf(" "); + printf("|\n"); + } } } - if (isConsistent == true && missingRecords == false) + + if (isConsistent == true && missingRecords == false) printf(" The data is consistent in both the databases |\n"); - else if (isConsistent == true && missingRecords == true) + else if (isConsistent == true && missingRecords == true) printf(" The data is consistent for the records with the same key |\n"); printf("-------------------+-------------------+-------------------+-------------------+\n"); // clean up all the mess before leaving - csqlRecList.clear(); - trgtDbRecList.clear(); - - iter.reset(); - while ((fname = (Identifier *) iter.nextElement()) != NULL) - delete fname; - fldNameList.reset(); - - Record *item = NULL; - while((item = (Record *) csqlRecListIter.nextElement()) != NULL) { - ListIterator it = (ListIterator) item->fldValList.getIterator(); - FldVal *fldVal = NULL; - while((fldVal = (FldVal *) it.nextElement()) != NULL) { - free (fldVal->value); - delete fldVal; + stmt->free(); + //free(arr); + Record *itm = NULL; + while((itm = (Record *) recIter.nextElement()) != NULL) { + ListIterator cit = (ListIterator) itm->csqlFldValList.getIterator(); + ListIterator tit = (ListIterator) itm->tdbFldValList.getIterator(); + FldVal *cfldVal = NULL; FldVal *tfldVal = NULL; + while( (cfldVal = (FldVal *) cit.nextElement()) != NULL && + (tfldVal = (FldVal *) tit.nextElement()) != NULL ) { + free (cfldVal->value); free (tfldVal->value); + delete cfldVal; delete tfldVal; } - it.reset(); + cit.reset(); tit.reset(); } - csqlRecordList.reset(); - - while((item = (Record *) trgtDbRecListIter.nextElement()) != NULL) { - ListIterator it = (ListIterator) item->fldValList.getIterator(); - FldVal *fldVal = NULL; - while((fldVal = (FldVal *) it.nextElement()) != NULL) { - free (fldVal->value); - delete fldVal; - } - it.reset(); - } - trgtDbRecordList.reset(); - delete csqlStmt; delete csqlCon; - } - - dbMgr->closeTable(table); - conn.close(); + recordList.reset(); + } delete trgtDbStmt; delete trgtDbCon; return OK; } @@ -595,74 +654,154 @@ int main(int argc, char **argv) //printf("%s %s \n", username, password); if (username[0] == '\0' ) { - strcpy(username, "root"); - strcpy(password, "manager"); + strcpy(username, I_USER); + strcpy(password, I_PASS); } - Connection conn; - DbRetVal rv = conn.open(username, password); + conn = SqlFactory::createConnection(CSqlDirect); + DbRetVal rv = conn->connect(username, password); if (rv != OK) { printf("Authentication failed\n"); + delete conn; return 1; } - - DatabaseManager *dbMgr = (DatabaseManager *) conn.getDatabaseManager(); - if (dbMgr == NULL) { - conn.close(); - printf("could not connect to the database\n"); - return 2; - } - - - Table *table = dbMgr->openTable(tableName); - if(table == NULL) { - conn.close(); - printf("Table \'%s\' does not exist\n", tableName); - return 3; + stmt = SqlFactory::createStatement(CSqlDirect); + stmt->setConnection(conn); + bool found = false; + List tableNameList = stmt->getAllTableNames(rv); + ListIterator it = tableNameList.getIterator(); + while (it.hasElement()) { + Identifier *elem = (Identifier *) it.nextElement(); + if (strcmp(elem->name, tableName) == 0) { + found = true; + break; + } } - FILE *fp; fp = fopen(Conf::config.getTableConfigFile(),"r"); if( fp == NULL ) { - dbMgr->closeTable(table); - conn.close(); + conn->disconnect(); + delete stmt; delete conn; printf("cachetable.conf file does not exist\n"); - return 4; + return 2; } char tablename[IDENTIFIER_LENGTH]; char fieldname[IDENTIFIER_LENGTH]; char condition[IDENTIFIER_LENGTH]; char field[IDENTIFIER_LENGTH]; + char dsnName[IDENTIFIER_LENGTH]; + int mode; - bool filePresent = false; + bool isCached = false; while(!feof(fp)) { - fscanf(fp, "%d:%s %s %s %s\n", &mode, tablename,fieldname,condition,field); - if (mode ==2 ) //just replicated table and not cached - continue; + fscanf(fp, "%d %s %s %s %s %s\n", &mode, tablename,fieldname,condition,field,dsnName); if (strcmp(tableName, tablename) == 0) { - filePresent = true; + isCached = true; break; } } fclose(fp); - long numTuples = table->numTuples(); - dbMgr->closeTable(table); - conn.close(); + long numTuples = 0; + int rows; - if (filePresent == false) { printf("The table \'%s\' is not cached\n", tableName); + char statement[200]; + sprintf(statement, "select count(*) from %s;", tableName); + rv = stmt->prepare(statement); + if (rv != OK) { + conn->disconnect(); + delete stmt; delete conn; + return 3; + } + rv = conn->beginTrans(); + if (rv != OK) { + conn->disconnect(); + delete stmt; delete conn; + return 4; + } + stmt->bindField(1, &numTuples); + stmt->execute(rows); + if (rv != OK) { + conn->disconnect(); + delete stmt; delete conn; return 5; } + void *tuple = stmt->fetch(rv); + stmt->close(); + conn->commit(); + stmt->free(); + + if (isCached == false) { + conn->disconnect(); + printf("The table \'%s\' is not cached\n", tableName); + delete stmt; delete conn; return 5; + } if (opt == 2) { rv = verifyCount(tableName, numTuples); - if (rv != OK) return 6; + if (rv != OK) { + conn->disconnect(); delete stmt; delete conn; return 7; + } } if (opt == 3 || opt == 4) { rv = verifyCount(tableName, numTuples); - if (rv != OK) return 7; + if (rv != OK) { + conn->disconnect(); delete stmt; delete conn; return 8; + } rv = verifyMismatchingRecords(tableName, opt); - if (rv != OK) return 8; + if (rv != OK) { + conn->disconnect(); delete stmt; delete conn; return 9; + } } + conn->disconnect(); delete stmt; delete conn; return 0; } + +void setParamValues(AbsSqlStatement *stmt, int parampos, DataType type, int length, void *value) +{ + switch(type) + { + case typeInt: + stmt->setIntParam(parampos, *(int*)value); + break; + case typeLong: + stmt->setLongParam(parampos, *(long*)value); + break; + case typeLongLong: + stmt->setLongLongParam(parampos, *(long long*)value); + break; + case typeShort: + stmt->setShortParam(parampos, *(short*)value); + break; + case typeByteInt: + stmt->setByteIntParam(parampos, *(char*)value); + break; + case typeDouble: + stmt->setDoubleParam(parampos, *(double*)value); + break; + case typeFloat: + stmt->setFloatParam(parampos, *(float*)value); + break; + case typeDate: + stmt->setDateParam(parampos, *(Date*)value); + break; + case typeTime: + stmt->setTimeParam(parampos, *(Time*)value); + break; + case typeTimeStamp: + stmt->setTimeStampParam(parampos, *(TimeStamp*)value); + break; + case typeString: + { + char *d =(char*)value; + d[length-1] = '\0'; + stmt->setStringParam(parampos, (char*)value); + break; + } + case typeBinary: + stmt->setBinaryParam(parampos, (char *) value, length); + break; + } + return; +} + diff --git a/src/tools/catalog.cxx b/src/tools/catalog.cxx index a679915a..5a9bad04 100644 --- a/src/tools/catalog.cxx +++ b/src/tools/catalog.cxx @@ -13,11 +13,14 @@ * GNU General Public License for more details. * * * ***************************************************************************/ +#include #include #include #include #include #include +#include + void printUsage() { printf("Usage: catalog [-u username] [-p passwd] [-l] [-i] [-d] [-T table] [-I index] [-D ]\n"); @@ -41,7 +44,7 @@ int main(int argc, char **argv) password [0] = '\0'; int c = 0, opt = 0; char name[IDENTIFIER_LENGTH]; - while ((c = getopt(argc, argv, "u:p:T:I:D:licd?")) != EOF) + while ((c = getopt(argc, argv, "u:p:T:I:D:licdsS?")) != EOF) { switch (c) { @@ -53,6 +56,12 @@ int main(int argc, char **argv) case 'l' : { opt = 2; break; } //list all the table with field info case 'i' : { opt = 3; break; }//reinitialize the catalog table case 'd' : { opt = 4; break; }//print db usage statistics + case 's' : { if(opt == 6) opt=11; + else printf("Use -I IndexName -s\n"); + break; }//print db usage statistics + case 'S' : { if(opt == 6) opt=12; + else printf("Use -I IndexName -S\n"); + break; }//print db usage statistics case '?' : { opt = 10; break; } //print help default: opt=1; //list all the tables @@ -66,8 +75,8 @@ int main(int argc, char **argv) //printf("%s %s \n", username, password); if (username[0] == '\0' ) { - strcpy(username, "root"); - strcpy(password, "manager"); + strcpy(username, I_USER); + strcpy(password, I_PASS); opt=1;//if username is not specified, just list all table names } @@ -143,7 +152,7 @@ int main(int argc, char **argv) { elem = (Identifier*) iter.nextElement(); #ifndef MMDB - rv=CacheTableLoader::isTableCached(elem->name); + rv=TableConf::config.isTableCached(elem->name); if(rv!=OK){ printf(" %s \n", elem->name); dbMgr->dropTable(elem->name); @@ -154,6 +163,7 @@ int main(int argc, char **argv) #endif count++; } + //TODO::If durability is on, remove chkpt and redo log file if (count ==0) printf(" \n"); printf("\n"); @@ -258,6 +268,10 @@ int main(int argc, char **argv) printUsage(); ret =1; } + } else if (opt == 11){ + dbMgr->printTreeIndexNodeInfo(name, true); + } else if (opt == 12){ + dbMgr->printTreeIndexNodeInfo(name,false); } iter.reset(); while (iter.hasElement()) delete iter.nextElement(); diff --git a/src/tools/checkpoint.cxx b/src/tools/checkpoint.cxx new file mode 100644 index 00000000..3f4d41e9 --- /dev/null +++ b/src/tools/checkpoint.cxx @@ -0,0 +1,78 @@ +/*************************************************************************** + * Copyright (C) 2007 by www.databasecache.com * + * Contact: praba_tuty@databasecache.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + ***************************************************************************/ +#include +#include +//#include +//#include +#include +#include +//#include +//#include +//#include +//#include +//#define SQL_STMT_LEN 1024 + +//FILE *fp; + +AbsSqlConnection *conn = NULL; +AbsSqlStatement *stmt = NULL; + +int main(int argc, char **argv) +{ + char username[IDENTIFIER_LENGTH]; + username [0] = '\0'; + char password[IDENTIFIER_LENGTH]; + password [0] = '\0'; + char filename[512]; + filename [0] ='\0'; + int c = 0, opt=0; + int connOpt = 0; +/* while ((c = getopt(argc, argv, "u:p:")) != EOF) + { + switch (c) + { + case 'u' : strcpy(username , argv[optind - 1]); break; + case 'p' : strcpy(password , argv[optind - 1]); break; + default: printf("Wrong args\n"); exit(1); + } + + }*/ + + + conn = SqlFactory::createConnection(CSqlDirect); + DbRetVal rv = conn->connect(I_USER, I_PASS); + if (rv != OK) { + delete conn; + return 1; + } + SqlConnection *sqlcon = (SqlConnection *) conn; + rv = sqlcon->getExclusiveLock(); + if (rv != OK) { + conn->disconnect(); + delete conn; + return 2; + } + DatabaseManager *dbMgr = conn->getConnObject().getDatabaseManager(); + rv = dbMgr->checkPoint(); + if (rv != OK) { + printError(rv, "checkpoint: failed"); + conn->disconnect(); + return 3; + } + conn->disconnect(); + delete conn; + return 0; +} diff --git a/src/tools/csqlasyncserver.cxx b/src/tools/csqlasyncserver.cxx new file mode 100644 index 00000000..9d4e909e --- /dev/null +++ b/src/tools/csqlasyncserver.cxx @@ -0,0 +1,633 @@ +/*************************************************************************** + * Copyright (C) 2007 by www.databasecache.com * + * Contact: praba_tuty@databasecache.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + ***************************************************************************/ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include //for BindSqlField +#include + +class AbsCSqlQIterator +{ + public: + virtual void *next() = 0; +}; + +class AbsCSqlQueue +{ + public: + virtual void push(void *log, int len) = 0; + virtual void pop() = 0; + virtual int size() = 0; +}; + +class ListIter : public AbsCSqlQIterator +{ + public: + ListIterator iter; + ListIter(List &list) { iter = list.getIterator(); } + void *next() { return iter.nextElementInQueue(); } +}; + +typedef struct FailedStmtObject { + int stmtId; + DbRetVal eType; +} FailStmt; + + +long long qIndex = 0; + +class ListAsQueue : public AbsCSqlQueue +{ + public: + List list; + ListAsQueue() {} + void push(void *log, int len) + { + int logSize = sizeof(long long) + len + sizeof(int) + sizeof(long); + char *logElem = (char *) malloc(os::align(logSize)); + *(long long *) logElem = ++qIndex; + *(int*)(logElem + sizeof(long long))= len; + char *ptr = logElem + sizeof(int) + sizeof(long long); + memcpy(ptr, log, len+sizeof(long)); //long for msg type + list.append(logElem); + printDebug(DM_ReplServer, "Pushed Element: %x", logElem); + } + int size() { return list.size(); } + void pop(){}; +}; + + +class ThreadInputData +{ + public: + AbsCSqlQIterator *qIter; + long long *indexPtr; + ThreadInputData() { qIter = NULL; indexPtr = NULL; } +}; + +void *startThread(void *p); + + +void printUsage() +{ + printf("Usage: csqlasyncserver \n"); + printf("Description: Start the csql Async server.\n"); + return; +} + +DbRetVal processMessage(void *str, int len, void *conn, void *hashBucketPtr, + SqlApiImplType flag, List *prepareFailList); +void *freeMsgFromQueue(void *p); +DbRetVal handlePrepare(void *str, void *conn, void *stmtBuckets, + SqlApiImplType flag, List *prepareFailList); +DbRetVal handleCommit(void *str, int len, void *conn, void *stmtBuckets, + List *prepareFailList); +DbRetVal handleFree(void *str, void *stmtBuckets, List *prepareFailList); +AbsSqlStatement *getStmtFromHashTable(int stmtId, void *stmtBuckets); +DbRetVal writeToConfResFile(void *data, int len, void *stmtBuckets, + char *dsn); + +int getHashBucket(int stmtid) +{ + return (stmtid % STMT_BUCKET_SIZE); +} + +AbsCSqlQueue *csqlQ = NULL; + +int srvStop =0; +int msgKey = 0; + +ThreadInputData **thrInput; + + +int asyncSites = 0; +int syncSites = 0; +long long * indexCountPtr = NULL; +pthread_t freeThrId = 0; + +static void sigTermHandler(int sig) +{ + printf("Received signal %d\nStopping the server\n", sig); + os::msgctl(msgKey, IPC_RMID, NULL); + srvStop = 1; +} + +int main(int argc, char **argv) +{ + int c = 0, opt = 0; + while ((c = getopt(argc, argv, "?")) != EOF) { + switch (c) { + case '?' : { opt = 10; break; } //print help + default: opt=10; + } + }//while options + if (opt == 10) { printUsage(); + return 0; + } + + os::signal(SIGINT, sigTermHandler); + os::signal(SIGTERM, sigTermHandler); + + Conf::config.readAllValues(os::getenv("CSQL_CONFIG_FILE")); + if (( !Conf::config.useCache() && + Conf::config.getCacheMode() == SYNC_MODE) ) { + printf("Replication server not started\n"); + return 1; + } + + bool found =false; + //printf("config id = %d\n", Conf::config.getSiteID()); + + if ((!Conf::config.useCache() && + Conf::config.getCacheMode()==SYNC_MODE)) { + printf("There are no async sites\n"); + return 4; + } + + msgKey = os::msgget(Conf::config.getMsgKey(), 0666); + if (msgKey == -1) { + printf("Message Queue creation failed\n"); + return 4; + } + + csqlQ = new ListAsQueue(); + ListIterator itr = ((ListAsQueue *)csqlQ)->list.getIterator(); + + if (Conf::config.useCache() && Conf::config.getCacheMode()==ASYNC_MODE) { + asyncSites++; + } + pthread_t *thrId =new pthread_t [asyncSites]; + thrInput = (ThreadInputData **) malloc(sizeof(ThreadInputData *) * asyncSites); + indexCountPtr = (long long *) malloc(sizeof(long long) * asyncSites); + memset(indexCountPtr, 0, sizeof(long long) * asyncSites); + int i=0; + if(Conf::config.useCache() && Conf::config.getCacheMode()==ASYNC_MODE) { + thrInput[i] = new ThreadInputData(); + thrInput[i]->qIter = NULL; + thrInput[i]->indexPtr = &indexCountPtr[i]; + pthread_create(&thrId[i], NULL, &startThread, thrInput[i]); + i++; + } + pthread_create(&freeThrId, NULL, freeMsgFromQueue, NULL); + struct timeval timeout, tval; + timeout.tv_sec = 5; + timeout.tv_usec = 0; + int msgSize = Conf::config.getAsyncMsgMax(); + char str[8192]; + // printf("Replication Server Started"); + while (!srvStop) { + tval.tv_sec = timeout.tv_sec; + tval.tv_usec = timeout.tv_usec; + os::select(0, 0, 0, 0, &tval); + printDebug(DM_ReplServer, "waiting for message"); + while(true) { + long size = os::msgrcv(msgKey, str, msgSize, 0, 0666|IPC_NOWAIT);// process logs + printDebug(DM_ReplServer, "Received msg size = %d", size); + if (size == -1 || srvStop) break; + csqlQ->push(str, size); // long for mtype of msg + } + } + delete[] thrId; + printf("Replication Server Exiting\n"); + return 0; +} + +void *startThread(void *thrInfo) +{ + DbRetVal rv = OK; + ThreadInputData *thrInput = (ThreadInputData *)thrInfo; + List prepareFailList; + SqlApiImplType flag; + flag = CSqlAdapter; + printDebug(DM_ReplServer, "SqlAdapter Thread created"); + AbsCSqlQIterator *iter = thrInput->qIter; + while (1) { if (csqlQ->size()) break; } + iter = new ListIter(((ListAsQueue *)csqlQ)->list); + AbsSqlConnection *conn = SqlFactory::createConnection(flag); + void *stmtBuckets = malloc (STMT_BUCKET_SIZE * sizeof(StmtBucket)); + memset(stmtBuckets, 0, STMT_BUCKET_SIZE * sizeof(StmtBucket)); + printDebug(DM_ReplServer, "stmtbuckets: %x", stmtBuckets); + struct timeval timeout, tval; + timeout.tv_sec = 0; + while (1) { + while (1) { + rv = conn->connect(I_USER, I_PASS); + if (rv == OK) break; + printError(rv, "Unable to connect to peer site"); + timeout.tv_sec = 10; + timeout.tv_usec = 0; + os::select(0, 0, 0, 0, &timeout); + } + while (1) { + void *msg = NULL; + while (1) { + msg = iter->next(); //receive msg from csqlQ + if (msg != NULL) break; + tval.tv_sec = 5; + tval.tv_usec = 1000; + os::select(0, 0, 0, 0, &tval); + } + long long index = *(long long *) msg; + int length = *(int *)((char *)msg+sizeof(long long)); + char *msgptr = (char *)msg + sizeof(long long) + sizeof(int); + printDebug(DM_ReplServer, "entering process message"); + rv = processMessage(msgptr, length, conn, stmtBuckets, flag, + &prepareFailList); + if (rv == ErrNoConnection) break; + printDebug(DM_ReplServer, "processed message"); + *(long long *) thrInput->indexPtr = index; + printDebug(DM_ReplServer, "index %d is stored in Main index log array\n", index); + } + } + return NULL; +} + +DbRetVal processMessage(void *str, int len, void *conn, void *stmtBuckets, + SqlApiImplType flag, List *prepareFailList) +{ + long type = *(long *) str; + printDebug(DM_ReplServer, "type = %d\n", type); + char *data = (char *) str + sizeof(long); + if (type == 1) return handlePrepare(data, conn, stmtBuckets, flag, + prepareFailList); + else if (type == 2) return handleCommit(data, len, conn, stmtBuckets, + prepareFailList); + else if (type == 3) return handleFree(data, stmtBuckets, prepareFailList); +} + +void *freeMsgFromQueue(void *indCntPtr) +{ + long long minIndex = 0; + struct timeval timeout, tval; + timeout.tv_sec = 0; + printDebug(DM_ReplServer, "waiting for free the q elements"); + while(1) { + if (csqlQ->size()) break; +/// printError(ErrWarning, "List is empty"); + tval.tv_sec = 1; + tval.tv_usec = 1000; + os::select(0, 0, 0, 0, &tval); + } + AbsCSqlQIterator *iter = new ListIter(((ListAsQueue *)csqlQ)->list); + while (1) { + minIndex = indexCountPtr[0]; + for (int i=0; i < asyncSites; i++) { + if (minIndex > indexCountPtr[i]) minIndex = indexCountPtr[i]; + } + void *msg = NULL; + while (1) { + msg = iter->next(); //receive msg from csqlQ + if (msg == NULL) break; + if( *(long long *) msg <= minIndex) { + long long num = *(long long *) msg; + free (msg); msg = NULL; + } else break; + } + tval.tv_sec = 0; + tval.tv_usec = 100000; + os::select(0, 0, 0, 0, &tval); + } + return NULL; +} + +DbRetVal handlePrepare(void *data, void *conn, void *stmtBuckets, + SqlApiImplType flag, List *prepareFailList) +{ + DbRetVal rv = OK; + AbsSqlConnection *con = (AbsSqlConnection *)conn; + AbsSqlStatement *stmt = SqlFactory::createStatement(flag); + stmt->setConnection(con); + char *ptr = (char *) data; + int length = *(int *) ptr; ptr += sizeof(int); + int txnId = *(int *) ptr; ptr += sizeof(int); + int stmtId = *(int *) ptr; ptr += sizeof(int); + char *tblName = ptr; ptr += IDENTIFIER_LENGTH; + char *stmtstr = (char *)data + 3 * sizeof(int) + IDENTIFIER_LENGTH; + int i = 1; + + unsigned int mode = TableConf::config.getTableMode(tblName); + bool isCached = TableConf::config.isTableCached(mode); + + if ((flag == CSqlAdapter) && !isCached) { + FailStmt *fst = new FailStmt(); + fst->stmtId = stmtId; + fst->eType = ErrNotCached; + prepareFailList->append(fst); + return OK; + } + + printDebug(DM_ReplServer, "stmt str: %s", stmtstr); + rv = stmt->prepare(stmtstr); + if (rv != OK) { + FailStmt *fst = new FailStmt(); + fst->stmtId = stmtId; + fst->eType = rv; + prepareFailList->append(fst); + return rv; + } + int bucketNo = getHashBucket(stmtId); + printDebug(DM_ReplServer, "PrepHdl: stmtBuckets: %x", stmtBuckets); + printDebug(DM_ReplServer, "PrepHdl: bucketno: %d", bucketNo); + StmtBucket *buck = (StmtBucket *) stmtBuckets; + StmtBucket *stmtBucket = &buck[bucketNo]; + printDebug(DM_ReplServer, "PrepHdl: bucket addr: %x", stmtBucket); + StmtNode *node = new StmtNode(); + printDebug(DM_ReplServer, "PredHdl: stmtNode addr: %x", node); + node->stmtId = stmtId; + node->stmt = stmt; + strcpy(node->stmtstr, stmtstr); + printDebug(DM_ReplServer, "PrepHdl: stmt id: %d stmt %x", node->stmtId, node->stmt); + stmtBucket->bucketList.append(node); + + printDebug(DM_ReplServer, "returning from prepare"); + return rv; +}; + +DbRetVal handleCommit(void *data, int len, void *conn, void *stmtBuckets, + List *prepareFailList) +{ + DbRetVal rv = OK; + AbsSqlConnection *con = (AbsSqlConnection *)conn; + // get dsn if adapter to write into conflict resolution file + char *dsn = NULL; + SqlOdbcConnection *adCon = (SqlOdbcConnection *) con; + dsn = adCon->dsn; + char *ptr = (char *) data; + int datalen = *(int *) ptr; ptr += sizeof(int); + int txnId = *(int *) ptr; ptr += sizeof(int); + FailStmt *elem = NULL; + rv = con->beginTrans(); + if (rv != OK) { + printError(rv, "Begin trans failed"); + return rv; + } + while ((ptr - (char *)data) < len) { + int stmtId = *(int *)ptr; + ptr += sizeof(int); + AbsSqlStatement *stmt = getStmtFromHashTable(stmtId, stmtBuckets); + printDebug(DM_ReplServer, "commit: stmtId: %d", stmtId); + printDebug(DM_ReplServer, "commit: stmtbuckets: %x", stmtBuckets); + printDebug(DM_ReplServer, "commit: stmt: %x", stmt); + ExecType type = (ExecType) (*(int *) ptr); + ptr += sizeof(int); + if (type == SETPARAM) { + int parampos = *(int *) ptr; + ptr += sizeof(int); + DataType dataType = (DataType) ( *(int *) ptr); + ptr += sizeof(int); + int length = *(int *) ptr; + ptr += sizeof(int); + void *value = ptr; + ptr += length; + if (stmt != NULL) + SqlNetworkHandler::setParamValues(stmt, parampos, dataType, + length, (char *)value); + } else { + // start executing and committing for all active connections + int rows; + if (stmt != NULL) rv = stmt->execute(rows); + if (rv != OK) { + printError(rv, "Execute failed with return value %d", rv); + if (rv == ErrNoConnection) return rv; + else { + // write to conflict resolution file + writeToConfResFile(data, len, stmtBuckets, dsn); + con->rollback(); + return OK; + } + } + if (stmt == NULL) { + ListIterator it = prepareFailList->getIterator(); + bool found = false; + while (it.hasElement()) { + elem = (FailStmt *) it.nextElement(); + if (elem->stmtId == stmtId) { found = true; break; } + } + if (! found) continue; // for local table + if ((elem->eType == ErrNotCached) || + elem->eType == ErrNotExists) + continue; + else { + // write to conflict resolution file + writeToConfResFile(data, len, stmtBuckets, dsn); + con->rollback(); + return OK; + } + } + } + } + rv = con->commit(); + if (rv != OK) { printDebug(DM_ReplServer, "commit failed"); } + else { printDebug(DM_ReplServer, "commit passed"); } + return OK; +} + +DbRetVal handleFree(void *data, void *stmtBuckets, List *prepareFailList) +{ + DbRetVal rv = OK; + char *ptr = (char *) data; + int len = *(int *) ptr; ptr += sizeof(int); + int txnId = *(int *) ptr; ptr += sizeof(int); + int stmtId = *(int *)ptr; + AbsSqlStatement *stmt = getStmtFromHashTable(stmtId, stmtBuckets); + FailStmt *elem = NULL; + if (stmt == NULL) { + ListIterator failListIter = prepareFailList->getIterator(); + while (failListIter.hasElement()) { + elem = (FailStmt *) failListIter.nextElement(); + if (elem->stmtId == stmtId) break; + } + failListIter.reset(); + prepareFailList->remove(elem); + return OK; + } + rv = stmt->free(); + if (rv != OK) { + printError(rv, "HandleFree failed with return vlaue %d", rv); + return rv; + } + int bucketNo = getHashBucket(stmtId); + StmtBucket *buck = (StmtBucket *) stmtBuckets; + StmtBucket *stmtBucket = &buck[bucketNo]; + StmtNode *node = NULL; + ListIterator it = stmtBucket->bucketList.getIterator(); + while(it.hasElement()) { + node = (StmtNode *) it.nextElement(); + if(stmtId == node->stmtId) break; + } + it.reset(); + printDebug(DM_ReplServer, "GSFHT: node: %x", node); + printDebug(DM_ReplServer, "GSFHT: stmtId: %d", node->stmtId); + printDebug(DM_ReplServer, "GSFHT stmt: %x", node->stmt); + stmtBucket->bucketList.remove(node); + if (node->stmt) delete stmt; + delete node; node = NULL; + return OK; +} + +AbsSqlStatement *getStmtFromHashTable(int stmtId, void *stmtBuckets) +{ + int bucketNo = getHashBucket(stmtId); + printDebug(DM_ReplServer, "GSFHT: bucketNo: %d", bucketNo); + StmtBucket *buck = (StmtBucket *) stmtBuckets; + StmtBucket *stmtBucket = &buck[bucketNo]; + printDebug(DM_ReplServer, "GSFHT: bucket: %x", stmtBucket); + StmtNode *node = NULL; + ListIterator it = stmtBucket->bucketList.getIterator(); + while(it.hasElement()) { + node = (StmtNode *) it.nextElement(); + printDebug(DM_ReplServer, "GSFHT: node: %x", node); + if(stmtId == node->stmtId) { + printDebug(DM_ReplServer, "GSFHT: stmtId: %d", node->stmtId); + printDebug(DM_ReplServer, "GSFHT stmt: %x", node->stmt); + return node->stmt; + } + } + return NULL; +} + +DbRetVal writeToConfResFile(void *data, int len, void *stmtBuckets, + char *dsn) +{ + DbRetVal rv = OK; + bool isPrmStmt=false; + char confResFile[1024]; + sprintf(confResFile, "%s", Conf::config.getConflResoFile()); + int fd = open(confResFile, O_WRONLY|O_CREAT| O_APPEND, 0644); + if (fd < 0) { + printError(ErrOS, "Could not create conflict Resolution file"); + return ErrOS; + } + char buffer[1024]; + char paramStmtString[1024]; + + char *ptr = (char *) data; + int datalen = *(int *) ptr; ptr += sizeof(int); + int txnId = *(int *) ptr; ptr += sizeof(int); + strcpy(buffer, "SET AUTOCOMMIT OFF;\n"); + int ret = os::write(fd, buffer, strlen(buffer)); + if (ret != strlen(buffer)) { + printError(ErrOS, "Write error into conf resolution file"); + return ErrOS; + } + bool first = true; + int counter = 0; // if at all the statement is parameterized + int nop = 0; + int pos = 0; + while ((ptr - (char *)data) < len) { + int stmtId = *(int *)ptr; + ptr += sizeof(int); + int bucketNo = getHashBucket(stmtId); + StmtBucket *buck = (StmtBucket *) stmtBuckets; + StmtBucket *stmtBucket = &buck[bucketNo]; + StmtNode *node = NULL; + ListIterator it = stmtBucket->bucketList.getIterator(); + while(it.hasElement()) { + node = (StmtNode *) it.nextElement(); + if(stmtId == node->stmtId) break; + } + printf("DEBUG:node = %x\n", node); + ExecType type = (ExecType) (*(int *) ptr); + ptr += sizeof(int); + if (type == SETPARAM) { + isPrmStmt = true; + if (first) { + first = false; + sprintf(paramStmtString, "%s", node->stmtstr); + char *it = node->stmtstr; + } + int parampos = *(int *)ptr; + ptr += sizeof(int); + DataType dataType = (DataType) ( *(int *) ptr); + ptr += sizeof(int); + int length = *(int *) ptr; + ptr += sizeof(int); + void *value = ptr; + ptr += length; + char * it = paramStmtString; + int prntdChars = 0; + + while (*it != '\0') { + if (*it == '?') { + pos++; + if(pos != parampos) { it++; continue; } + else { + *it++ = ' '; + strcpy(buffer,it); + switch (dataType) { + case typeString: case typeBinary: case typeDate: + case typeTime: case typeTimeStamp: + *it++ = '\''; + AllDataType::convertToString(it, value, dataType, length); + prntdChars = AllDataType::printVal(value, dataType,length); + it += prntdChars; + *it++ = '\''; + break; + default: + AllDataType::convertToString(it, value, dataType, length); + prntdChars = AllDataType::printVal(value, dataType,length); + it += prntdChars; + + } + sprintf(it, " %s", buffer); + //strcpy(buffer, paramStmtString); + break; + } + } else { it++; } + } + } else { + if (!isPrmStmt) { + sprintf(buffer, "%s", node->stmtstr); + buffer[strlen(buffer)] = '\n'; + ret = os::write(fd, buffer, strlen(node->stmtstr)+1); + if(ret != strlen(node->stmtstr)+1) { + printError(ErrOS, "Write error into conf resolution file"); + return ErrOS; + } + } else { + strcpy(buffer, paramStmtString); + isPrmStmt = false; + first = true; + pos = 0; + int strlength = strlen(buffer); + buffer[strlen(buffer)] = '\n'; + ret = os::write(fd, buffer, strlength+1); + if(ret != strlength+1) { + printError(ErrOS, "Write error into conf resolution file"); + return ErrOS; + } + } + } + } + strcpy(buffer, "COMMIT;\n\n"); + ret = os::write(fd, buffer, strlen(buffer)); + if(ret != strlen(buffer)) { + printError(ErrOS, "Write error into conf resolution file"); + return ErrOS; + } + close(fd); +} diff --git a/src/tools/csqlcacheserver.cxx b/src/tools/csqlcacheserver.cxx index 50752aca..b16b0b61 100644 --- a/src/tools/csqlcacheserver.cxx +++ b/src/tools/csqlcacheserver.cxx @@ -15,18 +15,33 @@ ***************************************************************************/ #include #include +#include #include #include +#include +#include #include #include -int insert(Table *table, int pkid); -int remove(Table *table, int pkid); -int getRecordsFromTargetDb(int mode); +// List which keeps all DS Information. +struct MultiThreadDSN +{ + char dsn[IDENTIFIER_LENGTH]; + char user[IDENTIFIER_LENGTH]; + char pwd[IDENTIFIER_LENGTH]; + char tdb[IDENTIFIER_LENGTH]; + struct MultiThreadDSN *next; +}; + +int insert(char *table, long long pkid, AbsSqlConnection *targetconn, SqlStatement *sqlstmt, AbsSqlStatement *csqlstmt, AbsSqlConnection *csqlcon); +int remove(char *table, long long pkid, AbsSqlConnection *targetconn, AbsSqlStatement *csqlstmt,AbsSqlConnection *csqlcon); +int getRecordsFromTargetDb(AbsSqlConnection *targetconn, AbsSqlConnection *csqlcon,AbsSqlStatement *csqlstmt, SqlConnection *con, SqlStatement *sqlstmt); void createCacheTableList(); DbRetVal getCacheField(char *tblName,char *fldName); DbRetVal getCacheProjField(char *tblName,char *fielflist); DbRetVal getCacheCondition(char *tblName,char *condition); +void setParamValues(AbsSqlStatement *stmt, int parampos, DataType type, int length, char *value); +void *fillBindBuffer(TDBInfo tName, DataType type, void *valBuf, int length=0); List cacheTableList; int srvStop =0; static void sigTermHandler(int sig) @@ -41,8 +56,25 @@ void printUsage() printf("Description: Start the csql caching server.\n"); return; } -AbsSqlConnection *targetconn; -Connection conn; + +AbsSqlConnection *csqlcon = NULL; + + +//MultiDSN Section +class MultiDsnThread +{ + public: + char ds[IDENTIFIER_LENGTH]; + char targetDb[IDENTIFIER_LENGTH]; + char userName[IDENTIFIER_LENGTH]; + char pwdName[IDENTIFIER_LENGTH]; + MultiDsnThread() { ds[0]='\0'; targetDb[0]='\0'; userName[0]='\0'; pwdName[0]='\0';} +}; + +void *startThread(void *p);// Function is used for Thread +MultiDsnThread **multiDsnInput; + + int main(int argc, char **argv) { int c = 0, opt = 0; @@ -63,27 +95,112 @@ int main(int argc, char **argv) os::signal(SIGINT, sigTermHandler); os::signal(SIGTERM, sigTermHandler); - DbRetVal rv = conn.open("root", "manager"); - if (rv != OK) return 1; + DbRetVal rv=OK; + csqlcon = SqlFactory::createConnection(CSqlLog); + SqlLogConnection *logConn = (SqlLogConnection *) csqlcon; + logConn->setNoMsgLog(true); + rv = csqlcon->connect(I_USER, I_PASS); + if (rv != OK) return NULL; + + // Reading "csqlds.conf file" + FILE *fp = NULL; + fp = fopen(Conf::config.getDsConfigFile(),"r"); + if(fp==NULL){ + printError(ErrSysInit,"csqlds.conf file does not exist"); + exit(1); + } + struct MultiThreadDSN *head=NULL, *pnode=NULL; + + char dsnname[IDENTIFIER_LENGTH];dsnname[0]='\0'; + char tdbname[IDENTIFIER_LENGTH];tdbname[0] = '\0'; + char username[IDENTIFIER_LENGTH];username[0]='\0'; + char password[IDENTIFIER_LENGTH];password[0]='\0'; + int totalDsn=0; + + // Populate the List + while(!feof(fp)){ + struct MultiThreadDSN *multiDsn = new struct MultiThreadDSN; + fscanf(fp,"%s %s %s %s\n",dsnname,username,password,tdbname); + totalDsn++; + strcpy(multiDsn->dsn,dsnname); + strcpy(multiDsn->user,username); + strcpy(multiDsn->pwd,password); + strcpy(multiDsn->tdb,tdbname); + multiDsn->next=NULL; + + if(pnode==NULL) {head=multiDsn; pnode=multiDsn;} + else { pnode->next=multiDsn; pnode=pnode->next; } + } + fclose(fp); + + // Declare number of thread + pthread_t *thrId =new pthread_t [totalDsn]; + multiDsnInput = (MultiDsnThread **) malloc (sizeof(MultiDsnThread *) * totalDsn); + int i=0, status; + + //Traversing the list + pnode=head; + while(pnode != NULL){ + multiDsnInput[i] = new MultiDsnThread(); + strcpy(multiDsnInput[i]->ds,pnode->dsn); + strcpy(multiDsnInput[i]->targetDb,pnode->tdb); + strcpy(multiDsnInput[i]->userName,pnode->user); + strcpy(multiDsnInput[i]->pwdName,pnode->pwd); + + //pthread_create + pthread_create(&thrId[i], NULL, &startThread, multiDsnInput[i]); + i++; + pnode=pnode->next; + } + + // Pthread_join + for(int j=0; jdisconnect(); + // targetconn->disconnect(); + // printf("Out of main\n"); + delete[]thrId; + return 0; + +} + +// Function for THreads +void *startThread(void *thrInfo) +{ + DbRetVal rv = OK; + AbsSqlConnection *targetconn; + //AbsSqlConnection *csqlcon = NULL; + AbsSqlStatement *csqlstmt = NULL; + SqlConnection *con = NULL; + SqlStatement *sqlstmt = NULL; + + + MultiDsnThread *multiDsnInput = (MultiDsnThread *)thrInfo; +/* csqlcon = SqlFactory::createConnection(CSqlLog); + SqlLogConnection *logConn = (SqlLogConnection *) csqlcon; + logConn->setNoMsgLog(true); + rv = csqlcon->connect(I_USER, I_PASS); + if (rv != OK) return NULL; +*/ targetconn = SqlFactory::createConnection(CSqlAdapter); - rv = targetconn->connect("root", "manager"); - if (rv != OK) return 1; + SqlOdbcConnection *dsnAda = (SqlOdbcConnection*)targetconn; + dsnAda->setDsn(multiDsnInput->ds);//line added + rv = targetconn->connect(I_USER, I_PASS); + if (rv != OK) return NULL; if (!Conf::config.useCache()) { printf("Cache is set to OFF in csql.conf file\n"); - return 1; - } + return NULL; + } AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter); stmt->setConnection(targetconn); - /*rv = stmt->prepare("create table csql_log_int(tablename char(64), pkid int, operation int, id int not null unique auto_increment)engine='innodb';"); - targetconn->beginTrans(); - int rows=0; - stmt->execute(rows); - targetconn->commit(); - stmt->free(); - delete stmt;*/ + csqlstmt = SqlFactory::createStatement(CSqlLog); + csqlstmt->setConnection(csqlcon); - printf("Cache server started\n"); int ret = 0; struct stat ofstatus,nfstatus; ret=stat(Conf::config.getTableConfigFile(),&ofstatus); @@ -95,7 +212,7 @@ int main(int argc, char **argv) { tval.tv_sec = timeout.tv_sec; tval.tv_usec = timeout.tv_usec; - ret = os::select(0, NULL, 0, 0, &tval); + ret = os::select(0, 0, 0, 0, &tval); printf("Checking for cache updates\n"); ret=stat(Conf::config.getTableConfigFile(),&nfstatus); if(ofstatus.st_mtime != nfstatus.st_mtime) @@ -104,218 +221,272 @@ int main(int argc, char **argv) createCacheTableList(); ofstatus.st_mtime = nfstatus.st_mtime; } - ret = getRecordsFromTargetDb(1); - if (ret !=0) srvStop = 1; - //ret = getRecordsFromTargetDb(2); - if (ret !=0) srvStop = 1; + if((ret = getRecordsFromTargetDb( targetconn, csqlcon, csqlstmt, con, sqlstmt )) == 1) { + if (srvStop) break; + } } - printf("Cache Server Exiting\n"); - cacheTableList.reset(); - conn.close(); + + //printf("Cache Server Exiting\n"); + //cacheTableList.reset(); + //csqlcon->disconnect(); targetconn->disconnect(); - return 0; + return NULL; } -int getRecordsFromTargetDb(int mode) + +int getRecordsFromTargetDb(AbsSqlConnection *targetconn, AbsSqlConnection *csqlcon,AbsSqlStatement *csqlstmt, SqlConnection *con, SqlStatement *sqlstmt) { - int pkid; + long long pkid=0; char tablename[64]; - int op, id,caId; + long long op=0, id=0,cId=0; + int caId=0; int rows =0; DbRetVal rv = OK; + int ret =0; char StmtStr[1024]; caId =Conf::config.getSiteID(); AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter); stmt->setConnection(targetconn); AbsSqlStatement *delstmt = SqlFactory::createStatement(CSqlAdapter); delstmt->setConnection(targetconn); - if (mode == 1 ) { - //rv = delstmt->prepare("DELETE from csql_log_int where id=?;"); - sprintf(StmtStr, "SELECT * FROM csql_log_int where cacheid = %d;", caId); - rv = stmt->prepare(StmtStr); - if (rv != OK) {printf("Stmt prepare failed\n"); return 1; } - } - else { - rv = stmt->prepare("SELECT * FROM csql_log_char;"); - if (rv != OK) {printf("Stmt prepare failed\n"); return 1; } - //rv = delstmt->prepare("DELETE from csql_log_char where id=?;"); - } + //rv = delstmt->prepare("DELETE from csql_log_int where id=?;"); + sprintf(StmtStr, "SELECT * FROM csql_log_int where cacheid = %d;", caId); + rv = stmt->prepare(StmtStr); if (rv != OK) {printf("Stmt prepare failed\n"); return 1; } stmt->bindField(1, tablename); stmt->bindField(2, &pkid); stmt->bindField(3, &op); - stmt->bindField(4, &caId); + stmt->bindField(4, &cId); stmt->bindField(5, &id); - DatabaseManager *dbMgr = conn.getDatabaseManager(); - while(true) { - rv = targetconn->beginTrans(); - rv = stmt->execute(rows); - if (rv != OK) - { - printError(ErrSysInit, "Unable to execute stmt in target db"); - targetconn->rollback(); - stmt->free(); - delstmt->free(); - delete stmt; - delete delstmt; - return 1; - } - if (stmt->fetch() != NULL) { - printf("Row value is %s %d %d %d\n", tablename, pkid, op,caId); - - Table *table = dbMgr->openTable(tablename); - int ret = 0; - if (table == NULL) - { - printError(ErrSysInit, "Table %s not exist in csql", tablename); - targetconn->rollback(); - stmt->free(); - delstmt->free(); - delete stmt; - delete delstmt; - break; - } - if (op == 2)//DELETE - { - ret = remove(table,pkid); - } - else //INSERT - { - ret = insert(table, pkid); - } - dbMgr->closeTable(table); - rv = targetconn->commit(); - rv = targetconn->beginTrans(); - //Remove record from csql_log_XXX table - sprintf(StmtStr, "DELETE from csql_log_int where id=%d ;", id); - rv = delstmt->prepare(StmtStr); - if (rv != OK) {printf("FAILED\n"); return 1; } - // delstmt->setIntParam(1, id); - rv = delstmt->execute(rows); - if (rv != OK) - { - printf("log record not deleted from the target db %d\n", rv); - targetconn->rollback(); - stmt->free(); - delstmt->free(); - delete stmt; - delete delstmt; - } - delstmt->free(); - - rv = targetconn->commit(); - - } - else { - stmt->close(); - break; - } - stmt->close(); - } - stmt->free(); - delstmt->free(); - delete stmt; - delete delstmt; - return 0; + con = (SqlConnection *) csqlcon->getInnerConnection(); + sqlstmt = (SqlStatement *)csqlstmt->getInnerStatement(); + sqlstmt->setSqlConnection(con); + + sprintf(StmtStr, "DELETE from csql_log_int where id=?;"); + rv = delstmt->prepare(StmtStr); + if (rv != OK) { + stmt->free(); + delstmt->free(); + delete stmt; + delete delstmt; + printf("FAILED\n"); + return 1; + } + int retVal =0; + while(true){ + rv = targetconn->beginTrans(); + rv = stmt->execute(rows); + if (rv != OK) { + printError(ErrSysInit, "Unable to execute stmt in target db"); + targetconn->rollback(); + stmt->free(); + delstmt->free(); + delete stmt; + delete delstmt; + return 1; + } + bool found = false; + while ( stmt->fetch() != NULL) + { + Util::trimEnd(tablename); + printf("Row value is %s %lld %lld %lld\n", tablename, pkid, op,cId); + + if (op == 2) { //DELETE + retVal = remove(tablename,pkid, targetconn, csqlstmt, csqlcon); + } //DELETE + else { + retVal = insert(tablename, pkid, targetconn, sqlstmt, csqlstmt, csqlcon); + } + //targetconn->commit(); + //rv = targetconn->beginTrans(); + if (retVal) ret =2; + delstmt->setIntParam(1, id); + rv = delstmt->execute(rows); + if (rv != OK) { + printf("log record not deleted from the target db %d\n", rv); + targetconn->rollback(); + break; + } + rv = targetconn->commit(); + rv = targetconn->beginTrans(); + found=true; + } + stmt->close(); + delstmt->close(); + if(!found) break; + } + targetconn->rollback(); + stmt->free(); + delstmt->free(); + delete stmt; + delete delstmt; + return ret; } -int insert(Table *table, int pkid) + +int insert(char *tablename, long long pkid, AbsSqlConnection *targetconn, SqlStatement *sqlstmt, AbsSqlStatement *csqlstmt, AbsSqlConnection *csqlcon) { AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter); stmt->setConnection(targetconn); + TDBInfo tdbname = ((SqlOdbcConnection*)targetconn)->getTrDbName(); SqlOdbcStatement *ostmt = (SqlOdbcStatement*) stmt; + char insStmt[1024]; char pkfieldname[128]; - DbRetVal rv=getCacheField(table->getName(), pkfieldname); + DbRetVal rv=getCacheField(tablename, pkfieldname); if(rv!=OK){ - ostmt->getPrimaryKeyFieldName(table->getName(), pkfieldname); + ostmt->getPrimaryKeyFieldName(tablename, pkfieldname); } + //Util::str_tolower(pkfieldname); char fieldlist[IDENTIFIER_LENGTH]; char condition[IDENTIFIER_LENGTH]; char sbuf[1024]; - rv=getCacheProjField(table->getName(),fieldlist); + rv=getCacheProjField(tablename,fieldlist); if(rv!=OK){ - rv=getCacheCondition(table->getName(),condition); + rv=getCacheCondition(tablename,condition); if(rv!=OK){ - sprintf(sbuf, "SELECT * FROM %s where %s = %d;", table->getName(), pkfieldname, pkid); + sprintf(sbuf, "SELECT * FROM %s where %s = %lld;", tablename, pkfieldname, pkid); } else { - sprintf(sbuf, "SELECT * FROM %s where %s = %d and %s ;", table->getName(), pkfieldname, pkid,condition); + sprintf(sbuf, "SELECT * FROM %s where %s = %lld and %s ;", tablename, pkfieldname, pkid,condition); } } else { - rv=getCacheCondition(table->getName(),condition); + rv=getCacheCondition(tablename,condition); if(rv!=OK){ - sprintf(sbuf, "SELECT %s FROM %s where %s = %d;",fieldlist,table->getName(), pkfieldname, pkid); + sprintf(sbuf, "SELECT %s FROM %s where %s = %lld;",fieldlist,tablename, pkfieldname, pkid); } else { - sprintf(sbuf, "SELECT %s FROM %s where %s = %d and %s;",fieldlist,table->getName(), pkfieldname, pkid,condition); + sprintf(sbuf, "SELECT %s FROM %s where %s = %lld and %s;",fieldlist,tablename, pkfieldname, pkid,condition); } } //TODO::get the primary key field name from the table interface. need to implement it + //printf("InsertString: %s\n", sbuf); rv = stmt->prepare(sbuf); - if (rv != OK) return 1; + if (rv != OK) return 2; - List fNameList = table->getFieldNameList(); + char *ptr = insStmt; + sprintf(ptr,"INSERT INTO %s VALUES(", tablename); + ptr += strlen(ptr); + bool firstFld = true; + List fNameList = sqlstmt->getFieldNameList(tablename); + int noOfFields = fNameList.size(); + while (noOfFields--) { + if (firstFld) { + firstFld = false; + sprintf(ptr,"?", tablename); + ptr += strlen(ptr); + } else { + sprintf(ptr, ",?"); + ptr += strlen(ptr); + } + } + sprintf(ptr, ");"); + ptr += strlen(ptr); + //printf("insert stmt: '%s'\n", insStmt); + + rv = csqlstmt->prepare(insStmt); + if (rv != OK) { return 2; } + List valBufList; ListIterator fNameIter = fNameList.getIterator(); FieldInfo *info = new FieldInfo(); - int fcount =1; void *valBuf; int fieldsize=0; + int fcount =0; void *valBuf; int fieldsize=0; void *buf[128];//TODO:resticts to support only 128 fields in table + for (int i=0; i < 128; i++) buf[i]= NULL; + DataType dType[128]; Identifier *elem = NULL; + BindBuffer *bBuf = NULL; while (fNameIter.hasElement()) { elem = (Identifier*) fNameIter.nextElement(); - table->getFieldInfo((const char*)elem->name, info); - valBuf = AllDataType::alloc(info->type, info->length); + sqlstmt->getFieldInfo(tablename, (const char*)elem->name, info); + valBuf = AllDataType::alloc(info->type, info->length+1); + os::memset(valBuf,0,info->length); + bBuf = (BindBuffer *) fillBindBuffer(tdbname, info->type, valBuf, info->length); +if (info->type == typeString) { +} + valBufList.append(bBuf); + dType[fcount] = info->type; buf[fcount] = valBuf; - table->bindFld(elem->name, valBuf); - stmt->bindField(fcount++, valBuf); - + stmt->bindField(fcount+1, buf[fcount]); + fcount++; } delete info; int rows=0; int retValue = stmt->execute(rows); - if (retValue && rows != 1) {printError(ErrSysInit, "Unable to execute statement at target db\n"); return ErrSysInit; } - conn.startTransaction(); + if (retValue && rows != 1) { + printError(ErrSysInit, "Unable to execute statement at target db\n"); + return ErrSysInit; + } + ListIterator bindIter = valBufList.getIterator(); if (stmt->fetch() != NULL) { - ostmt->setNullInfo(table); - table->insertTuple(); + ostmt->setNullInfo(csqlstmt); + if(tdbname == postgres){ + for (int i=0; i < fcount; i++) { + if(dType[i] == typeString) Util::trimRight((char *)buf[i]); + } + } + //setXXXParams to be called here + int pos = 1; + while (bindIter.hasElement()) { + bBuf = (BindBuffer *) bindIter.nextElement(); + setParamValues(csqlstmt, pos++, bBuf->type, bBuf->length, + (char *)bBuf->csql); + } + csqlcon->beginTrans(); + int rows = 0; + rv = csqlstmt->execute(rows); + if (rv != OK) { + printf ("execute failed \n"); + printf(" STMT: %s\n",insStmt); + return 3; + } + csqlcon->commit(); + //printf("successfully inserted value with pkid = %d\n", pkid); //Note:insert may fail if the record is inserted from this cache } - for (int i=1; i < fcount; i++) { - free(buf[i]); + //for (int i=0; i < fcount; i++) free(buf[i]); + ListIterator iter = valBufList.getIterator(); + while (iter.hasElement()){ + bBuf = (BindBuffer*) iter.nextElement(); +if (bBuf->type == typeString) + //printf("Values %x %x \n", bBuf->csql, bBuf->targetdb); + delete bBuf; } + stmt->free(); delete stmt; - conn.commit(); return 0; } -int remove(Table *table, int pkid) +int remove(char *tablename, long long pkid, AbsSqlConnection *targetconn, AbsSqlStatement *csqlstmt,AbsSqlConnection *csqlcon) { DbRetVal rv = OK; + char delStmt[1024]; AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter); stmt->setConnection(targetconn); SqlOdbcStatement *ostmt = (SqlOdbcStatement*) stmt; char pkfieldname[128]; - rv=getCacheField(table->getName(), pkfieldname); + rv=getCacheField(tablename, pkfieldname); if(rv!=OK){ - ostmt->getPrimaryKeyFieldName(table->getName(), pkfieldname); + ostmt->getPrimaryKeyFieldName(tablename, pkfieldname); } + Util::str_tolower(pkfieldname); + stmt->close(); + stmt->free(); delete stmt; - Condition p1; - p1.setTerm(pkfieldname, OpEquals, &pkid); - table->setCondition(&p1); - rv = conn.startTransaction(); - if (rv != OK) return 1; - rv = table->execute(); - if (rv != OK) + sprintf(delStmt, "DELETE FROM %s where %s = %d", tablename, pkfieldname, pkid); + //printf("delStmt is %s\n", delStmt); + rv = csqlstmt->prepare(delStmt); + if (rv != OK) { return 2; } + rv = csqlcon->beginTrans(); + if (rv != OK) return 2; + int rows = 0; + rv = csqlstmt->execute(rows); + if (rv != OK || rows !=1) { - table->setCondition(NULL); - conn.rollback(); - return 1; + csqlcon->rollback(); + printf("Delete failed for stmt %s\n", delStmt); + return 3; } - if (table->fetch() != NULL) - rv = table->deleteTuple(); - //Note:Delete may fail if the record is deleted from this cache - table->setCondition(NULL); - rv = conn.commit(); - if (rv != OK) return 1; + rv = csqlcon->commit(); + return 0; } + void createCacheTableList() { FILE *fp; @@ -328,10 +499,12 @@ void createCacheTableList() char fieldname[IDENTIFIER_LENGTH]; char condition[IDENTIFIER_LENGTH]; char field[IDENTIFIER_LENGTH]; + char dsnName[IDENTIFIER_LENGTH]; + int mode; while(!feof(fp)) { - fscanf(fp,"%d:%s %s %s %s\n",&mode,tablename,fieldname,condition,field); + fscanf(fp,"%d %s %s %s %s %s \n",&mode,tablename,fieldname,condition,field,dsnName); CacheTableInfo *cacheTable=new CacheTableInfo(); cacheTable->setTableName(tablename); cacheTable->setFieldName(fieldname); @@ -397,3 +570,123 @@ DbRetVal getCacheField(char *tblName,char *fldName) } return ErrNotExists; } + +void setParamValues(AbsSqlStatement *stmt, int parampos, DataType type, int length, char *value) +{ + switch(type) + { + case typeInt: + stmt->setIntParam(parampos, *(int*)value); + break; + case typeLong: + stmt->setLongParam(parampos, *(long*)value); + break; + case typeLongLong: + stmt->setLongLongParam(parampos, *(long long*)value); + break; + case typeShort: + stmt->setShortParam(parampos, *(short*)value); + break; + case typeByteInt: + stmt->setByteIntParam(parampos, *(char*)value); + break; + case typeDouble: + stmt->setDoubleParam(parampos, *(double*)value); + break; + case typeFloat: + stmt->setFloatParam(parampos, *(float*)value); + break; + case typeDate: + stmt->setDateParam(parampos, *(Date*)value); + break; + case typeTime: + stmt->setTimeParam(parampos, *(Time*)value); + break; + case typeTimeStamp: + stmt->setTimeStampParam(parampos, *(TimeStamp*)value); + break; + case typeString: + { + char *d =(char*)value; + d[length-1] = '\0'; + stmt->setStringParam(parampos, (char*)value); + break; + } + case typeBinary: + stmt->setBinaryParam(parampos, (char *) value, length); + break; + } + return; +} + +void *fillBindBuffer(TDBInfo tdbName, DataType type, void *valBuf, int length) +{ + BindBuffer *bBuf = NULL; + switch(type) + { + case typeDate: + bBuf = new BindBuffer(); + bBuf->csql = valBuf; + bBuf->type = typeDate; + bBuf->length = sizeof(DATE_STRUCT); + bBuf->targetdb = malloc(bBuf->length); + memset(bBuf->targetdb, 0, bBuf->length); + valBuf = bBuf->targetdb; + break; + case typeTime: + bBuf = new BindBuffer(); + bBuf->csql = valBuf; + bBuf->type = typeTime; + bBuf->length = sizeof(TIME_STRUCT); + bBuf->targetdb = malloc(bBuf->length); + memset(bBuf->targetdb, 0, bBuf->length); + valBuf = bBuf->targetdb; + break; + case typeTimeStamp: + bBuf = new BindBuffer(); + bBuf->csql = valBuf; + bBuf->type = typeTimeStamp; + bBuf->length = sizeof(TIMESTAMP_STRUCT); + bBuf->targetdb = malloc(bBuf->length); + memset(bBuf->targetdb, 0, bBuf->length); + valBuf = bBuf->targetdb; + break; + case typeLongLong: + { + if( tdbName == postgres ) + { + bBuf = new BindBuffer(); + bBuf->type = typeLongLong; + bBuf->length = 40; + bBuf->csql = valBuf; + bBuf->targetdb = AllDataType::alloc(typeString,bBuf->length); + memset(bBuf->targetdb, 0, bBuf->length); + valBuf = bBuf->targetdb; + break; + }else + { + bBuf = new BindBuffer(); + bBuf->type = type; + bBuf->csql = valBuf; + bBuf->length = length; + break; + } + } + case typeString: + if( tdbName == postgres ) + { + bBuf = new BindBuffer(); + bBuf->type = typeString; + bBuf->csql = valBuf; + bBuf->length = length+1; + break; + } + default: + bBuf = new BindBuffer(); + bBuf->type = type; + bBuf->csql = valBuf; + bBuf->length = length; + break; + } + return bBuf; +} diff --git a/src/tools/csqlds.cxx b/src/tools/csqlds.cxx new file mode 100644 index 00000000..b2c8eb93 --- /dev/null +++ b/src/tools/csqlds.cxx @@ -0,0 +1,167 @@ +/*************************************************************************** + * Copyright (C) 2007 by www.databasecache.com * + * Contact: praba_tuty@databasecache.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + ***************************************************************************/ +#include +#include +#include + +void printUsage() +{ + printf("Usage: csqlds [-U username] [-P passwd] [-D dsnname] [-N tdbname] \n" + " [-a] [-r] \n"); + printf(" username -> Username of TDB .\n"); + printf(" passwd -> Password of TDB.\n"); + printf(" dsnname -> DSN to connect to tdb.\n"); + printf(" tdbname -> Name of Target database\n"); + printf(" -a -> Add entry to csqlds.conf\n"); + printf(" -r -> Remove entry from csqlds.conf\n"); + return; +} +DbRetVal removeEntryFromCsqlds(char *dsn); +DbRetVal addEntryToCsqlds(char *dsn, char *uname, char *pwd, char *tbdname); +Config Conf::config; +bool shouldadd=false; +int main(int argc, char **argv) +{ + Conf::config.readAllValues(os::getenv("CSQL_CONFIG_FILE")); + char username[IDENTIFIER_LENGTH]; + username [0] = '\0'; + char password[IDENTIFIER_LENGTH]; + password [0] = '\0'; + int c = 0, opt = 10; + char dsn[IDENTIFIER_LENGTH]; + char tdbname[IDENTIFIER_LENGTH]; + bool isUserNameSpecified=false; + bool isPasswordSpecified=false; + bool isTDBSpecified=false; + bool isDSNSpecified=false; + bool showmsg=false; + while ((c = getopt(argc, argv, "U:P:D:N:ar?")) != EOF) + { + switch (c) + { + case 'U' : { strcpy(username, argv[optind - 1]); + isUserNameSpecified=true; opt=10; break; } + case 'P' : { strcpy(password, argv[optind - 1]); + isPasswordSpecified=true;opt=10; break; } + case 'D' : { strcpy(dsn, argv[optind - 1]); + opt = 2; + isDSNSpecified = true; + break; + } + case 'N' : { strcpy(tdbname, argv[optind - 1]); + if(opt == 2) opt=3; + else opt=10; + isTDBSpecified = true; + break; + } + case '?' : {showmsg=true; break; } //print help + case 'r' : { shouldadd=false; break;} + case 'a' : { shouldadd=true; break; } + default: opt=10; + } + }//while options + if (opt == 10 || true==showmsg) { + printUsage(); + return 0; + } + + if( !isUserNameSpecified || !isPasswordSpecified) + { + strcpy(username, "NULL"); + strcpy(password, "NULL"); + } + + if(shouldadd) + { + if(opt==3){ + addEntryToCsqlds( dsn,username,password,tdbname); + } + else{ + printUsage(); + return 0; + } + + }else + { + removeEntryFromCsqlds(dsn); + } + return 0; +} + +DbRetVal addEntryToCsqlds(char *dsn, char *uname, char *pwd, char *tbdname) +{ + FILE *fp = fopen(Conf::config.getDsConfigFile(), "a+"); + if (NULL == fp) { + printError(ErrSysInit, "Invalid path/filename in DS_CONFIG_FILE.\n"); + return ErrOS; + } + bool isDsnExist=false; + char usertmp[IDENTIFIER_LENGTH]; + char pwdtmp[IDENTIFIER_LENGTH]; + char dsntmp[IDENTIFIER_LENGTH]; + char tdbname[IDENTIFIER_LENGTH]; + while(!feof(fp)) + { + fscanf(fp, "%s %s %s %s\n",dsntmp,usertmp,pwdtmp,tdbname ); + if (strcmp (dsn, dsntmp) == 0) { + isDsnExist=true; + printf("Applied DSN already Exists\n"); + return ErrAlready; + } + } + fprintf(fp,"%s %s %s %s\n",dsn, uname, pwd, tbdname); + fclose(fp); + return OK; +} + +DbRetVal removeEntryFromCsqlds(char *dsn) +{ + FILE *fp,*tmpfp; + char tmpFileName[MAX_FILE_PATH_LEN]; + sprintf(tmpFileName, "%s.tmp", Conf::config.getDsConfigFile()); + char usertmp[IDENTIFIER_LENGTH]; + char pwdtmp[IDENTIFIER_LENGTH]; + char dsntmp[IDENTIFIER_LENGTH]; + char tdbname[IDENTIFIER_LENGTH]; + tmpfp = fopen(tmpFileName,"w"); + if( tmpfp == NULL ) { + printError(ErrSysInit, "Invalid path/filename in TABLE_CONFIG_FILE.\n"); + return ErrSysInit; + } + fp = fopen(Conf::config.getDsConfigFile(),"r"); + if( fp == NULL ) { + printError(ErrSysInit, "csqltable.conf file does not exist"); + return ErrSysInit; + } + + while(!feof(fp)) + { + fscanf(fp, "%s %s %s %s\n",dsntmp,usertmp,pwdtmp,tdbname ); + if (strcmp (dsn, dsntmp) == 0) continue; + fprintf(tmpfp, "%s %s %s %s\n",dsntmp,usertmp,pwdtmp,tdbname); + } + fclose(fp); + fclose(tmpfp); + char sysCommand[MAX_FILE_PATH_LEN * 2]; + sprintf(sysCommand, "mv %s %s", tmpFileName, Conf::config.getDsConfigFile()); + int ret = system(sysCommand); + if (ret != 0) + { + printError(ErrSysInit, "Check csqltable.conf file permission. unable to remove %s from file", dsn); + return ErrSysInit; + } + return OK; +} diff --git a/src/tools/csqldump.cxx b/src/tools/csqldump.cxx index d501c75d..adab2142 100644 --- a/src/tools/csqldump.cxx +++ b/src/tools/csqldump.cxx @@ -20,6 +20,7 @@ #include #include #include + void printUsage() { printf("Usage: csqldump [-u username] [-p passwd] [-n noOfStmtsPerCommit] [-T tableName]\n"); @@ -34,23 +35,23 @@ bool isCached(char *tblName) { if (!Conf::config.useCache()) return false; FILE *fp = fopen(Conf::config.getTableConfigFile(),"r"); - if( fp == NULL ) { - return OK; - } + if( fp == NULL ) { return OK; } char ctablename[IDENTIFIER_LENGTH]; char fieldname[IDENTIFIER_LENGTH]; char condition[IDENTIFIER_LENGTH]; char field[IDENTIFIER_LENGTH]; + char dsnName[IDENTIFIER_LENGTH]; + int mode; bool isCached=false; - while(!feof(fp)) - { - fscanf(fp, "%d:%s %s %s %s\n", &mode, ctablename,fieldname,condition,field); + while(!feof(fp)) { + fscanf(fp, "%d %s %s %s %s %s %s %s\n", &mode, ctablename,fieldname,condition,field,dsnName); if (strcmp (ctablename, tblName) == 0) { isCached=true; break; } } fclose(fp); return isCached; } + int main(int argc, char **argv) { char username[IDENTIFIER_LENGTH]; @@ -58,19 +59,22 @@ int main(int argc, char **argv) char password[IDENTIFIER_LENGTH]; password [0] = '\0'; char tblName[IDENTIFIER_LENGTH]; + char conditionVal[IDENTIFIER_LENGTH]; int c = 0, opt = 0; int noOfStmts =100; bool exclusive = false; + bool Iscondition=false; + bool schema = false; char name[IDENTIFIER_LENGTH]; - while ((c = getopt(argc, argv, "u:p:n:T:X?")) != EOF) - { - switch (c) - { + while ((c = getopt(argc, argv, "u:p:n:T:c:XS?")) != EOF) { + switch (c) { case 'u' : { strcpy(username, argv[optind - 1]); opt=1; break; } case 'p' : { strcpy(password, argv[optind - 1]); opt=1; break; } case 'n' : { noOfStmts = atoi(argv[optind - 1]); opt = 5; break; } case 'T' : { strcpy(tblName, argv[optind - 1]); opt = 15; break; } - case 'X' : { exclusive = true; break; } + case 'c' : {strcpy(conditionVal,argv[optind -1]); Iscondition=true;break;} + case 'X' : { exclusive = true; break; } + case 'S' : { schema = true; break; } case '?' : { opt = 10; break; } //print help default: opt=1; //list all the tables @@ -82,136 +86,151 @@ int main(int argc, char **argv) } //printf("%s %s \n", username, password); - if (username[0] == '\0' ) - { - strcpy(username, "root"); - strcpy(password, "manager"); + if (username[0] == '\0' ) { + strcpy(username, I_USER); + strcpy(password, I_PASS); } SqlConnection *sqlconn = (SqlConnection*) SqlFactory::createConnection(CSqlDirect); SqlStatement *stmt = (SqlStatement*) SqlFactory::createStatement(CSqlDirect); - if (exclusive) { - stmt->setLoading(true); - } - + if (exclusive) { stmt->setLoading(true); } if (opt == 0 || opt == 1) opt = 5; if (opt == 5) { Connection conn; DbRetVal rv = conn.open(username, password); if (rv != OK) { - printf("Unable to get connection to csql\n"); - delete sqlconn; - delete stmt; - return 1; + printf("Unable to get connection to csql\n"); + delete sqlconn; + delete stmt; + return 1; } DatabaseManagerImpl *dbMgr = (DatabaseManagerImpl*) conn.getDatabaseManager(); if (dbMgr == NULL) { printf("Unable to retrive db manager\n"); conn.close(); - delete sqlconn; delete stmt; + delete sqlconn; return 2; } List tableList = dbMgr->getAllTableNames(); ListIterator iter = tableList.getIterator(); Identifier *elem = NULL; int count =0; - while (iter.hasElement()) - { + while (iter.hasElement()) { elem = (Identifier*) iter.nextElement(); - if (!exclusive && isCached(elem->name)) continue; + //if (!exclusive && isCached(elem->name)) continue; printf("CREATE TABLE %s (", elem->name); Table *table = dbMgr->openTable(elem->name); - FieldInfo *info = new FieldInfo(); + FieldInfo *info = new FieldInfo(); List fNameList = table->getFieldNameList(); ListIterator fNameIter = fNameList.getIterator(); count++; bool firstField=true; + Identifier *elem1 = NULL; char fieldName[IDENTIFIER_LENGTH]; while (fNameIter.hasElement()) { - elem = (Identifier*) fNameIter.nextElement(); - Table::getFieldNameAlone(elem->name, fieldName); - rv = table->getFieldInfo(elem->name, info); - if (rv !=OK) { - printf("unable to retrive info for table %s\n", elem->name); - conn.close(); - delete sqlconn; - delete stmt; - } - if (firstField) { - printf("%s %s ", fieldName, AllDataType::getSQLString(info->type)); - firstField = false; - } - else - printf(", %s %s ", fieldName, AllDataType::getSQLString(info->type)); - if (info->type == typeString) printf("(%d)",info->length ); - if (info->type == typeBinary) printf("(%d)",info->length); - if (info->isNull) printf(" NOT NULL "); - if (info->isDefault) printf(" DEFAULT '%s' ", info->defaultValueBuf); - if (info->isAutoIncrement) printf(" AUTO_INCREMENT "); + elem1 = (Identifier*) fNameIter.nextElement(); + Table::getFieldNameAlone(elem1->name, fieldName); + rv = table->getFieldInfo(elem1->name, info); + if (rv !=OK) { + printf("unable to retrive info for table %s\n", elem1->name); + conn.close(); + delete stmt; + delete sqlconn; + } + if (firstField) { + printf("%s %s ", fieldName, AllDataType::getSQLString(info->type)); + firstField = false; + } else + printf(", %s %s ", fieldName, AllDataType::getSQLString(info->type)); + if (info->type == typeString) printf("(%d)",info->length ); + if (info->type == typeBinary) printf("(%d)",info->length); + if (info->isNull) printf(" NOT NULL "); + if (info->isDefault) printf(" DEFAULT '%s' ", info->defaultValueBuf); + if (info->isAutoIncrement) printf(" AUTO_INCREMENT "); + } + fNameIter.reset(); + while (fNameIter.hasElement()) { + elem1 = (Identifier*) fNameIter.nextElement(); + delete elem1; + } + fNameList.reset(); + if (table->isFKTable()){ + table->printSQLForeignString(); } printf(");\n"); table->printSQLIndexString(); delete info; dbMgr->closeTable(table); - } - conn.close(); - rv = sqlconn->connect("root", "manager"); - if (OK !=rv) { - printf("unable to connect to csql\n"); - delete sqlconn; - delete stmt; - return 10; - } - stmt->setConnection(sqlconn); - if (exclusive) { - rv = sqlconn->getExclusiveLock(); - if (rv != OK) { - printf("Unable to get exclusive lock\n"); - sqlconn->disconnect(); + } + conn.close(); + if (schema) { delete sqlconn; delete stmt; return 0; } + + rv = sqlconn->connect(I_USER, I_PASS); + if (OK !=rv) { + printf("unable to connect to csql\n"); delete sqlconn; delete stmt; - return 1; - } - } - iter.reset(); - char sqlstring[1024]; - bool flag=false; - while (iter.hasElement()) { - elem = (Identifier*) iter.nextElement(); - if (!exclusive && isCached(elem->name)) continue; - if (!flag) { printf("SET AUTOCOMMIT OFF;\n"); flag=true; } - sprintf(sqlstring, "SELECT * FROM %s;", elem->name); - sqlconn->beginTrans(); - DbRetVal rv = stmt->prepare(sqlstring); - int rows = 0; - rv = stmt->execute(rows); - void *tuple = NULL; - rows = 0; - while(true) { - tuple = stmt->fetchAndPrint(true); - if (tuple == NULL) break; - rows++; - if (rows % noOfStmts ==0) { - sqlconn->commit(); - sqlconn->beginTrans(); - printf("COMMIT;\n"); - } - } - if (rows % noOfStmts !=0) { sqlconn->commit(); printf("COMMIT;\n"); } - stmt->close(); - stmt->free(); - } + return 10; + } + stmt->setConnection(sqlconn); + if (exclusive) { + rv = sqlconn->getExclusiveLock(); + if (rv != OK) { + printf("Unable to get exclusive lock\n"); + sqlconn->disconnect(); + delete sqlconn; + delete stmt; + return 1; + } + } + iter.reset(); + char sqlstring[1024]=""; + bool flag=false; + while (iter.hasElement()) { + elem = (Identifier*) iter.nextElement(); + //if (!exclusive && isCached(elem->name)) continue; + if (!flag) { printf("SET AUTOCOMMIT OFF;\n"); flag=true; } + sprintf(sqlstring, "SELECT * FROM %s;", elem->name); + sqlconn->beginTrans(); + DbRetVal rv = stmt->prepare(sqlstring); + int rows = 0; + rv = stmt->execute(rows); + void *tuple = NULL; + rows = 0; + while(true) { + tuple = stmt->fetchAndPrint(true); + if (tuple == NULL) break; + rows++; + if (rows % noOfStmts ==0) { + sqlconn->commit(); + sqlconn->beginTrans(); + printf("COMMIT;\n"); + } + } + if (rows % noOfStmts !=0) { + sqlconn->commit(); + printf("COMMIT;\n"); + } + stmt->close(); + stmt->free(); + } + iter.reset(); + while (iter.hasElement()) { + elem = (Identifier*) iter.nextElement(); + delete elem; + } + tableList.reset(); } if (opt == 15) { Connection conn; DbRetVal rv = conn.open(username, password); if (rv != OK) { - printf("Unable to get connection to csql\n"); - delete sqlconn; - delete stmt; - return 1; + printf("Unable to get connection to csql\n"); + delete sqlconn; + delete stmt; + return 1; } DatabaseManagerImpl *dbMgr = (DatabaseManagerImpl*) conn.getDatabaseManager(); if (dbMgr == NULL) { printf("Auth failed\n"); return 2;} @@ -250,18 +269,25 @@ int main(int argc, char **argv) delete info; conn.close(); char sqlstring[1024]; - bool flag=false; + // char sqlstring1[1024]; + bool flag=false; if (!flag) { printf("SET AUTOCOMMIT OFF;\n"); flag=true; } - rv = sqlconn->connect("root", "manager"); - if (OK !=rv) { - printf("unable to connect\n"); - return 10; - } - stmt->setConnection(sqlconn); - sprintf(sqlstring, "SELECT * FROM %s;", tblName); - sqlconn->beginTrans(); - rv = stmt->prepare(sqlstring); - int rows = 0; + rv = sqlconn->connect(I_USER, I_PASS); + if (OK !=rv) { + printf("unable to connect\n"); + return 10; + } + stmt->setConnection(sqlconn); + + if(Iscondition) + sprintf(sqlstring, "SELECT * FROM %s WHERE %s;", tblName,conditionVal); + else + sprintf(sqlstring, "SELECT * FROM %s;", tblName); + sqlconn->beginTrans(); + rv = stmt->prepare(sqlstring); + + //*********************************************** + int rows = 0; rv = stmt->execute(rows); void *tuple = NULL; rows = 0; diff --git a/src/tools/csqlserver.cxx b/src/tools/csqlserver.cxx index 3484f831..b6aec3a2 100644 --- a/src/tools/csqlserver.cxx +++ b/src/tools/csqlserver.cxx @@ -22,8 +22,10 @@ #include #include #include -char* version = "csql-linux-i686-2.0GA"; +#include //TODO::move this to os.h +char* version = "csql-linux-i686-3.0GA"; int srvStop =0; +pid_t asyncpid=0; pid_t sqlserverpid=0; pid_t cachepid=0; bool recoverFlag=false; @@ -34,6 +36,13 @@ static void sigTermHandler(int sig) printf("Received signal %d\nStopping the server\n", sig); srvStop = 1; } +static void sigChildHandler(int sig) +{ + os::signal(SIGCHLD, sigChildHandler); + int stat; + waitpid(-1, &stat, WNOHANG); + //TODO::move waitpid to os wrapper +} bool checkDead(pid_t pid) { @@ -117,7 +126,9 @@ DbRetVal logActiveProcs(Database *sysdb) if (pInfo->pid_ !=0 ) { logFine(Conf::logger, "Registered Procs: %d %lu\n", pInfo->pid_, pInfo->thrid_); printf("Client process with pid %d is still registered\n", pInfo->pid_); - count++; + if( pInfo->pid_ != asyncpid && pInfo->pid_ != cachepid && + pInfo->pid_ != sqlserverpid) + count++; } pInfo++; } @@ -126,28 +137,42 @@ DbRetVal logActiveProcs(Database *sysdb) } void startCacheServer() { - printf("Starting Cache Recv Server\n"); char execName[1024]; sprintf(execName, "%s/bin/csqlcacheserver", os::getenv("CSQL_INSTALL_ROOT")); - printf("filename is %s\n", execName); + if (srvStop) return; + //printf("filename is %s\n", execName); cachepid = os::createProcess(execName, "csqlcacheserver"); if (cachepid != -1) - printf("Cache Recv Server Started pid=%d\n", cachepid); + printf("Cache Receiver Started\t [PID=%d]\n",cachepid); return; } void startServiceClient() { - printf("Starting Csql Network Daemon\n"); char execName[1024]; sprintf(execName, "%s/bin/csqlsqlserver", os::getenv("CSQL_INSTALL_ROOT")); - printf("filename is %s\n", execName); + //printf("filename is %s\n", execName); + if (srvStop) return; sqlserverpid = os::createProcess(execName, "csqlsqlserver"); if (sqlserverpid != -1) - printf("Csql Network Daemon Started pid=%d\n", sqlserverpid); + printf("Network Server Started\t [PID=%d] [PORT=%d]\n", sqlserverpid,Conf::config.getPort()); + return; } +void startAsyncServer() +{ + char execName[1024]; + sprintf(execName, "%s/bin/csqlasyncserver", os::getenv("CSQL_INSTALL_ROOT")); + //printf("filename is %s\n", execName); + if (srvStop) return; + asyncpid = os::createProcess(execName, "csqlasyncserver"); + if (asyncpid != -1) + printf("Async Cache Server Started [PID=%d]\n", asyncpid); + return; +} + + void printUsage() { printf("Usage: csqlserver [-c] [-v]\n"); @@ -158,7 +183,8 @@ void printUsage() } int main(int argc, char **argv) { - int c = 0, opt = 0; + int c = 0,opt = 0; + char cmd[1024]; while ((c = getopt(argc, argv, "cv?")) != EOF) { switch (c) @@ -187,10 +213,11 @@ int main(int argc, char **argv) } os::signal(SIGINT, sigTermHandler); os::signal(SIGTERM, sigTermHandler); + os::signal(SIGCHLD, sigChildHandler); rv = Conf::logger.startLogger(Conf::config.getLogFile(), true); if (rv != OK) { - printf("Unable to start the logger\n"); + printf("Unable to start the Conf::logger\n"); return 2; } bool isInit = true; @@ -208,31 +235,75 @@ int main(int argc, char **argv) printf("Unable to attach to existing database\n"); return 3; } - }else { - printf("System Database initialized\n"); } - bool end = false; struct timeval timeout, tval; timeout.tv_sec = 5; timeout.tv_usec = 0; Database* sysdb = session->getSystemDatabase(); recoverFlag = false; + + GlobalUniqueID UID; + if (isInit) UID.create(); + if(isInit && Conf::config.useDurability()) { - char dbChkptFileName[1024]; char dbRedoFileName[1024]; - char cmd[1024]; - sprintf(dbChkptFileName, "%s/csql.db.chkpt", Conf::config.getDbFile()); - if (FILE *file = fopen(dbChkptFileName, "r")) - { + char dbChkptSchema[1024]; + char dbChkptMap[1024]; + char dbChkptData[1024]; + char dbBackupFile[1024]; + + //check for check point file if present recover + sprintf(dbChkptSchema, "%s/db.chkpt.schema1", Conf::config.getDbFile()); + if (FILE *file = fopen(dbChkptSchema, "r")) { fclose(file); - sprintf(cmd, "csql -X -s %s",dbChkptFileName); + sprintf(cmd, "cp -f %s %s/db.chkpt.schema", dbChkptSchema, Conf::config.getDbFile()); int ret = system(cmd); - if (ret != 0) { - printf("Tables cannot be recovered. chkpt file corrupted\n"); + if (ret != 0) { + Conf::logger.stopLogger(); + session->destroySystemDatabase(); + delete session; + return 20; + } + } + sprintf(dbChkptMap, "%s/db.chkpt.map1", Conf::config.getDbFile()); + if (FILE *file = fopen(dbChkptMap, "r")) { + fclose(file); + sprintf(cmd, "cp -f %s %s/db.chkpt.map", dbChkptMap, Conf::config.getDbFile()); + int ret = system(cmd); + if (ret != 0) { + Conf::logger.stopLogger(); + session->destroySystemDatabase(); + delete session; + return 30; } } + sprintf(dbChkptData, "%s/db.chkpt.data", Conf::config.getDbFile()); + sprintf(dbBackupFile, "%s/db.chkpt.data1", Conf::config.getDbFile()); + FILE *fl = NULL; + if (!Conf::config.useMmap() && (fl = fopen(dbBackupFile, "r"))) { + fclose(fl); + sprintf(cmd, "cp %s/db.chkpt.data1 %s", Conf::config.getDbFile(), dbChkptData); + int ret = system(cmd); + if (ret != 0) { + printError(ErrOS, "Unable to take backup for chkpt data file"); + return 40; + } + } + if (FILE *file = fopen(dbChkptData, "r")) { + fclose(file); + int ret = system("recover"); + if (ret != 0) { + printf("Recovery failed\n"); + Conf::logger.stopLogger(); + session->destroySystemDatabase(); + delete session; + return 50; + } + } + + //check for redo log file if present apply redo logs sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile()); if (FILE *file = fopen(dbRedoFileName, "r")) { @@ -243,17 +314,20 @@ int main(int argc, char **argv) Conf::logger.stopLogger(); session->destroySystemDatabase(); delete session; - return 10; + return 60; } - //TODO::generate checkpoint file - sprintf(cmd, "csqldump -X > %s",dbChkptFileName); - ret = system(cmd); - if (ret != 0) { - printf("Unable to create checkpoint file\n"); + + // take check point at this moment + sprintf(dbChkptSchema, "%s/db.chkpt.schema", Conf::config.getDbFile()); + sprintf(dbChkptMap, "%s/db.chkpt.map", Conf::config.getDbFile()); + sprintf(dbChkptData, "%s/db.chkpt.data", Conf::config.getDbFile()); + ret = system("checkpoint"); + if (ret != 0) { + printf("Unable to create checkpoint file. Database corrupted.\n"); Conf::logger.stopLogger(); session->destroySystemDatabase(); delete session; - return 11; + return 70; } ret = unlink(dbRedoFileName); if (ret != 0) { @@ -261,15 +335,16 @@ int main(int argc, char **argv) Conf::logger.stopLogger(); session->destroySystemDatabase(); delete session; - return 11; + return 80; } } } + bool isCacheReq = false, isSQLReq= false; recoverFlag = true; if (opt == 1 && isInit && ! Conf::config.useDurability()) { if (Conf::config.useCache()) { printf("Database server recovering cached tables...\n"); - int ret = system("cachetable -U root -P manager -R"); + int ret = system("cachetable -R"); if (ret != 0) { printf("Cached Tables recovery failed %d\n", ret); Conf::logger.stopLogger(); @@ -286,46 +361,53 @@ int main(int argc, char **argv) return 1; } } - GlobalUniqueID UID; - if (isInit) UID.create(); //TODO:: kill all the child servers and restart if !isInit - bool isCacheReq = false, isSQLReq= false; if(Conf::config.useCsqlSqlServer()) { isSQLReq = true; startServiceClient(); } + if ( (Conf::config.useCache() && + Conf::config.getCacheMode()==ASYNC_MODE)) { + startAsyncServer(); + } if (Conf::config.useCache() && Conf::config.useTwoWayCache()) { isCacheReq = true; startCacheServer(); } - printf("Database server started\n"); + printf("Database Server Started...\n"); + reloop: while(!srvStop) { tval.tv_sec = timeout.tv_sec; tval.tv_usec = timeout.tv_usec; os::select(0, 0, 0, 0, &tval); - + //send signal to all the registered process..check they are alive cleanupDeadProcs(sysdb); - + if (srvStop) break; //TODO::if it fails to start 5 times, exit if (isCacheReq && cachepid !=0 && checkDead(cachepid)) startCacheServer(); - if (isSQLReq && sqlserverpid !=0 && checkDead(sqlserverpid)) - startServiceClient(); } if (logActiveProcs(sysdb) != OK) {srvStop = 0; goto reloop; } - os::kill(cachepid, SIGTERM); - os::kill(sqlserverpid, SIGTERM); + if (cachepid) os::kill(cachepid, SIGTERM); + if(asyncpid) os::kill(asyncpid, SIGTERM); + if (sqlserverpid) os::kill(sqlserverpid, SIGTERM); //if (recoverFlag) dumpData(); + if (Conf::config.useDurability() && Conf::config.useMmap()) { + //ummap the memory + char *startAddr = (char *) sysdb->getMetaDataPtr(); + msync(startAddr + Conf::config.getMaxSysDbSize(),Conf::config.getMaxDbSize(), MS_SYNC); + munmap(startAddr + Conf::config.getMaxSysDbSize(), Conf::config.getMaxDbSize()); + } logFine(Conf::logger, "Server Exiting"); printf("Server Exiting\n"); logFine(Conf::logger, "Server Ended"); UID.destroy(); - Conf::logger.stopLogger(); session->destroySystemDatabase(); + Conf::logger.stopLogger(); delete session; return 0; } diff --git a/src/tools/csqlsqlserver.cxx b/src/tools/csqlsqlserver.cxx index 45bb9569..3d07b47f 100644 --- a/src/tools/csqlsqlserver.cxx +++ b/src/tools/csqlsqlserver.cxx @@ -65,6 +65,7 @@ int main(int argc, char **argv) } os::signal(SIGINT, sigTermHandler); os::signal(SIGTERM, sigTermHandler); + os::signal(SIGCHLD, SIG_IGN); bool end = false; SqlNetworkHandler::stmtID = 0; @@ -112,9 +113,10 @@ int main(int argc, char **argv) rv = nwServer->start(); if (rv != OK) { printf("Unable to start the server\n"); + delete nwServer; return 1; } - printf("Csql Network Daemon started\n"); + // printf("Network Server Started"); fd_set fdset; int ret = 0; struct timeval timeout, tval; @@ -135,5 +137,6 @@ int main(int argc, char **argv) } printf("Csql Network Daemon Exiting\n"); nwServer->stop(); + delete nwServer; return 0; } diff --git a/src/tools/isql.cxx b/src/tools/isql.cxx index 93834c4b..b4be4617 100644 --- a/src/tools/isql.cxx +++ b/src/tools/isql.cxx @@ -15,7 +15,7 @@ ***************************************************************************/ #include #include -#include +#include #include #include #include @@ -23,14 +23,18 @@ #include #include #include -#define SQL_STMT_LEN 1024 -enum STMT_TYPE +#include +#define SQL_STMT_LEN 8192 +void printVariables(); + +enum STATEMENT_TYPE { SELECT =0, + EXPLAIN, DDL , OTHER }; -STMT_TYPE stmtType = SELECT; +STATEMENT_TYPE stmtType = SELECT; FILE *fp; AbsSqlConnection *conn; AbsSqlStatement *stmt; @@ -39,6 +43,7 @@ List sqlStmtList; SqlApiImplType type = CSqlUnknown; bool gateway=false, silent=false; bool autocommitmode = true; +bool isTimer = false; bool network = false; bool firstPrepare = false; IsolationLevel isoLevel = READ_COMMITTED; @@ -56,7 +61,8 @@ void printUsage() return; } - +bool noUndolog=false; +bool exclusive=false; int getConnType(int opt); int main(int argc, char **argv) @@ -72,9 +78,8 @@ int main(int argc, char **argv) char filename[512]; filename [0] ='\0'; int c = 0, opt=0; - bool exclusive=false; int connOpt = 0; - while ((c = getopt(argc, argv, "u:p:s:o:H:P:gXS?")) != EOF) + while ((c = getopt(argc, argv, "u:p:s:o:H:P:gXUS?")) != EOF) { switch (c) { @@ -86,6 +91,7 @@ int main(int argc, char **argv) case 'S' : { silent = true; break; } //silent case 'X' : { silent = true; exclusive = true; break; } //silent case 'g' : { gateway = true; break; } //print help + case 'U' : { noUndolog = true; break; } //print help case 'H' : { strcpy (hostname, argv[optind - 1]); network = true; break; } case 'P' : { strcpy (port, argv[optind - 1]); @@ -102,8 +108,8 @@ int main(int argc, char **argv) } if (username[0] == '\0' ) { - strcpy(username, "root"); - strcpy(password, "manager"); + strcpy(username, I_USER); + strcpy(password, I_PASS); } if (network) { if (hostname[0] == '\0') { printUsage(); return 0; } @@ -123,6 +129,7 @@ int main(int argc, char **argv) DbRetVal rv = OK; if (connOpt) { + noUndolog =false; type = (SqlApiImplType) getConnType(connOpt); conn = SqlFactory::createConnection((SqlApiImplType)type); if(connOpt == 4 || connOpt == 5 || connOpt == 6) { @@ -142,8 +149,7 @@ int main(int argc, char **argv) else { if (gateway) type = CSqlGateway; else { - if (exclusive) type=CSqlDirect; - else type = CSql; + if (exclusive) type=CSqlDirect; else type = CSql; } conn = SqlFactory::createConnection(type); } @@ -160,10 +166,10 @@ int main(int argc, char **argv) } } aconStmt = SqlFactory::createStatement(type); - if (exclusive) { +/* if (exclusive) { SqlStatement *sqlStmt = (SqlStatement*)aconStmt; sqlStmt->setLoading(true); - } + }*/ aconStmt->setConnection(conn); //rv = conn->beginTrans(READ_COMMITTED, TSYNC); rv = conn->beginTrans(); @@ -245,6 +251,10 @@ bool handleTransaction(char *st) printf("Isolation Level is set to READ_REPEATABLE\n"); return true; } + else if (strcasecmp(st, "SET TIMER ON;") == 0) + { isTimer=true; return true; } + else if (strcasecmp(st, "SET TIMER OFF;") == 0) + { isTimer=false; return true; } return false; } bool handleEchoAndComment(char *st) @@ -261,6 +271,10 @@ bool handleEchoAndComment(char *st) { printHelp(); return true; + }else if(strcasecmp(st,"show variables;")==0){ + + printVariables(); + return true; }else if (strcasecmp(st, "show tables;") == 0) { DbRetVal rv = OK; List tableList = aconStmt->getAllTableNames(rv); @@ -277,8 +291,23 @@ bool handleEchoAndComment(char *st) } if (count ==0) printf(" No tables exist\n"); printf("==========================================\n"); - - return true; + return true; + }else if( strcasecmp(st, "show users;" )==0){ + DbRetVal rv = OK; + List userList = aconStmt->getAllUserNames(rv); + ListIterator iter = userList.getIterator(); + Identifier *elem = NULL; + int ret =0; + printf("=============UserNames===================\n"); + while (iter.hasElement()) + { + elem = (Identifier*) iter.nextElement(); + if(0==strcmp( elem->name ,I_USER)) + continue; + printf(" %s \n", elem->name); + } + printf("=========================================\n"); + return true; } return false; } @@ -299,9 +328,102 @@ void printHelp() printf("QUIT\n"); printf("======================================================\n"); } +void printVariables() +{ + //SERVER Section Info. + printf("=======================================================\n"); + printf("| SERVER SECTION INFORMATION |\n"); + printf("=======================================================\n"); + printf(" SITE_ID\t\t\t= %d\n",Conf::config.getSiteID()); + printf(" PAGE_SIZE\t\t\t= %d Byte\n",Conf::config.getPageSize()); + printf(" MAX_PROCS\t\t\t= %d\n",Conf::config.getMaxProcs()); + printf(" MAX_SYS_DB_SIZE\t\t= %d MB\n",(Conf::config.getMaxSysDbSize()/1048576)); + printf(" MAX_DB_SIZE\t\t\t= %d MB\n",(Conf::config.getMaxDbSize()/1048576)); + printf(" SYS_DB_KEY\t\t\t= %d\n",Conf::config.getSysDbKey()); + printf(" USER_DB_KEY\t\t\t= %d\n",Conf::config.getUserDbKey()); + printf(" LOG_LEVEL\t\t\t= %d\n",Conf::config.getLogLevel()); + printf(" MAP_ADDRESS\t\t\t= %ld\n",Conf::config.getMapAddress()); + + if(Conf::config.useDurability()) + printf(" DURABILITY\t\t\t= True\n"); + else + printf(" DURABILITY\t\t\t= False\n"); + + if(Conf::config.useMmap()) + printf(" MMAP\t\t\t\t= True\n"); + else + printf(" MMAP\t\t\t\t= False\n"); + + printf(" DURABLE_MODE\t\t\t= %d\n",Conf::config.getDurableMode()); + printf(" DATABASE_FILE\t\t\t= %s\n",Conf::config.getDbFile()); + printf(" LOG_FILE_PATH\t\t\t= %s\n",Conf::config.getLogFile()); + printf(" STDERR_FILE\t\t\t= %s\n",Conf::config.getStderrFile()); + + //Network Section Information + printf("=======================================================\n"); + printf("| NETWORK SECTION INFORMATION |\n"); + printf("=======================================================\n"); + + if(Conf::config.useCsqlSqlServer()) + printf(" CSQL_SQL_SERVER\t\t= True\n"); + else + printf(" CSQL_SQL_SERVER\t\t= False\n"); + + printf(" PORT\t\t\t\t= %d\n",Conf::config. getPort()); + printf(" NETWORK_RESPONSE_TIMEOUT\t= %d\n",Conf::config.getNetworkResponseTimeout()); + printf(" NETWORK_CONNECT_TIMEOUT\t= %d\n",Conf::config.getNetworkConnectTimeout()); + printf(" ID_SHM_KEY\t\t\t= %d\n",Conf::config.getShmIDKey()); + + //Client section variables + printf("=======================================================\n"); + printf("| CLIENT SECTION INFORMATION |\n"); + printf("=======================================================\n"); + printf(" MUTEX_TIMEOUT_SECS\t\t= %d\n",Conf::config.getMutexSecs()); + printf(" MUTEX_TIMEOUT_USECS\t\t= %d\n",Conf::config.getMutexUSecs()); + printf(" MUTEX_TIMEOUT_RETRIES\t\t= %d\n",Conf::config.getMutexRetries()); + printf(" LOCK_TIMEOUT_SECS\t\t= %d\n",Conf::config.getLockSecs()); + printf(" LOCK_TIMEOUT_USECS\t\t= %d\n",Conf::config.getLockUSecs()); + printf(" LOCK_TIMEOUT_RETRIES\t\t= %d\n",Conf::config.getLockRetries()); + + //CACHE Section variables + printf("=======================================================\n"); + printf("| CACHE SECTION INFORMATION |\n"); + printf("=======================================================\n"); + + if(Conf::config.useCache()) + printf(" CACHE_TABLE\t\t\t= True\n"); + else + printf(" CACHE_TABLE\t\t\t= False\n"); + + printf(" DSN\t\t\t\t= %s\n",Conf::config.getDSN()); + + if(Conf::config.useTwoWayCache()) + printf(" ENABLE_BIDIRECTIONAL_CACHE\t= True\n"); + else + printf(" ENABLE_BIDIRECTIONAL_CACHE\t= False\n"); + + printf(" CACHE_RECEIVER_WAIT_SECS\t= %d\n",Conf::config.getCacheWaitSecs()); + + if(Conf::config.getCacheMode()==0) + printf(" CACHE_MODE\t\t\t= SYNC\n"); + else if (Conf::config.getCacheMode()==1) + printf(" CACHE_MODE\t\t\t= ASYNC\n"); + else + printf(" CACHE_MODE\t\t\t= Unknown\n"); + + printf(" DS_CONFIG_FILE\t\t\t= %s\n",Conf::config.getDsConfigFile()); + printf(" TABLE_CONFIG_FILE\t\t= %s\n",Conf::config.getTableConfigFile()); + printf(" MSG_KEY\t\t\t= %d\n",Conf::config.getMsgKey() ); + printf(" ASYNC_MSGMAX\t\t\t= %d\n",Conf::config.getAsyncMsgMax()); + printf(" MAX_QUEUE_LOGS\t\t\t= %d\n",Conf::config.getLogLevel()); + printf(" CONFL_RESOL_FILE\t\t= %s\n",Conf::config.getConflResoFile()); + printf(" \nNOTE: To modify above variables, You will be following 'csql.conf' file.\n\n"); +} + void setStmtType(char *st) { if (strncasecmp (st, "SELECT", 6) == 0) {stmtType=SELECT; return; } + else if (strncasecmp (st, "EXPLAIN", 6) == 0) {stmtType=EXPLAIN; return; } else if (strncasecmp (st, "CREATE", 6) == 0) {stmtType=DDL; return; } else if (strncasecmp (st, "DROP", 4) == 0) { stmtType=DDL; return; } stmtType = OTHER; @@ -334,7 +456,8 @@ char getQueryFromStdIn(char *buf) strcpy(buf, ""); char *line = readline("CSQL>"); if (line) {strcpy(buf, line); add_history(line); } - else return EOF; + else { free(line); return EOF; } + free(line); return 0; } char getQueryFromFile(char *buf) @@ -376,7 +499,7 @@ bool getInput(bool fromFile) if ( *buf == ';' ) return true; // Null statement. setStmtType(buf); - + NanoTimer timer; DbRetVal rv; if (autocommitmode) { if (firstPrepare) aconStmt->free(); @@ -398,7 +521,12 @@ bool getInput(bool fromFile) } sqlStmtList.append(stmt); } + if (noUndolog || exclusive ){ + SqlStatement *sqlStmt = (SqlStatement*)stmt; + sqlStmt->setLoading(true); + } int rows =0; + timer.start(); rv = stmt->execute(rows); if (rv != OK) { @@ -416,10 +544,16 @@ bool getInput(bool fromFile) } return true; } - if (stmtType == OTHER) + timer.stop(); + if (stmtType == OTHER && stmt->isSelect()) stmtType = SELECT; + if (stmtType == OTHER ) { if (!silent) printf("Statement Executed: Rows Affected = %d\n", rows); } + else if (stmtType == EXPLAIN) + { + stmt->close(); + } else if (stmtType == DDL) { if (!silent) printf("Statement Executed\n"); @@ -437,6 +571,7 @@ bool getInput(bool fromFile) printf("\n---------------------------------------------------------\n"); delete info; void *tuple = NULL; + timer.start(); while(true) { printf("\t"); @@ -445,7 +580,9 @@ bool getInput(bool fromFile) if (tuple == NULL) { break; } } stmt->close(); + timer.stop(); } + if (isTimer) printf("Time taken: %lld microsecs\n", timer.sum()/1000); if (autocommitmode) { conn->commit(); @@ -466,6 +603,8 @@ int getConnType(int opt) case 3: { printf("Local Gateway\n"); return (int) CSqlGateway; } case 4: { printf("Network CSql\n"); return (int) CSqlNetwork; } case 5: { printf("Network Adapter\n"); return (int) CSqlNetworkAdapter;} - case 6: { printf("Netwrok Gateway\n"); return (int) CSqlNetworkGateway;} + case 6: { printf("Network Gateway\n"); return (int) CSqlNetworkGateway;} + case 7: { printf("Log\n"); return (int) CSqlLog; } + case 8: { printf("Log\n"); return (int) CSqlLog; } } } diff --git a/src/tools/recover.cxx b/src/tools/recover.cxx new file mode 100644 index 00000000..38d65e3a --- /dev/null +++ b/src/tools/recover.cxx @@ -0,0 +1,92 @@ +/*************************************************************************** + * Copyright (C) 2007 by www.databasecache.com * + * Contact: praba_tuty@databasecache.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + ***************************************************************************/ +#include +#include +//#include +//#include +#include +#include +//#include +//#include +//#include +//#include +//#define SQL_STMT_LEN 1024 + +//FILE *fp; + +AbsSqlConnection *conn = NULL; +AbsSqlStatement *stmt = NULL; + +int main(int argc, char **argv) +{ + char username[IDENTIFIER_LENGTH]; + username [0] = '\0'; + char password[IDENTIFIER_LENGTH]; + password [0] = '\0'; + char filename[512]; + filename [0] ='\0'; + int c = 0, opt=0; + int connOpt = 0; +/* while ((c = getopt(argc, argv, "u:p:")) != EOF) + { + switch (c) + { + case 'u' : strcpy(username , argv[optind - 1]); break; + case 'p' : strcpy(password , argv[optind - 1]); break; + default: printf("Wrong args\n"); exit(1); + } + + }*/ + char schFile[1024]; + char cmd[1024]; + + Connection dbconn; + DbRetVal rv = dbconn.open(I_USER, I_PASS); + if (rv != OK) return 1; + dbconn.close(); + + sprintf(schFile, "%s/db.chkpt.schema", Conf::config.getDbFile()); + if (FILE *file = fopen(schFile, "r")) { + fclose(file); + sprintf(cmd, "csql -X -s %s",schFile); + int ret = system(cmd); + if (ret != 0) { + printf("Tables cannot be recovered. schema file corrupted\n"); + return 1; + } + } + conn = SqlFactory::createConnection(CSqlDirect); + rv = conn->connect(I_USER, I_PASS); + if (rv != OK) { + delete conn; + return 2; + } + SqlConnection *sqlcon = (SqlConnection *) conn; + rv = sqlcon->getExclusiveLock(); + if (rv != OK) { + conn->disconnect(); + delete conn; + return 3; + } + + DatabaseManager *dbMgr = conn->getConnObject().getDatabaseManager(); + dbMgr->recover(); + + conn->disconnect(); + delete conn; + + return 0; +} diff --git a/src/tools/redo.cxx b/src/tools/redo.cxx index ee8ba220..8d8717b2 100644 --- a/src/tools/redo.cxx +++ b/src/tools/redo.cxx @@ -118,11 +118,13 @@ int main(int argc, char **argv) { struct stat st; char fileName[1024]; - int c = 0, opt=0; - while ((c = getopt(argc, argv, "a?")) != EOF) { + int c = 0, opt=0; + bool interactive=0; + while ((c = getopt(argc, argv, "ai?")) != EOF) { switch (c) { case '?' : { opt = 1; break; } //print help case 'a' : { opt = 2; break; } + case 'i' : { interactive = 1; break; } default: printf("Wrong args\n"); exit(1); } @@ -133,7 +135,7 @@ int main(int argc, char **argv) } Conf::config.readAllValues(os::getenv("CSQL_CONFIG_FILE")); sprintf(fileName, "%s/csql.db.cur", Conf::config.getDbFile()); - printf("Filename Redo log is :%s\n", fileName); + printf("Redo log filename is :%s\n", fileName); int fd = open(fileName, O_RDONLY); if (-1 == fd) { return OK; } if (fstat(fd, &st) == -1) { @@ -147,7 +149,7 @@ int main(int argc, char **argv) return 0; } conn = SqlFactory::createConnection(CSqlDirect); - DbRetVal rv = conn->connect("root","manager"); + DbRetVal rv = conn->connect(I_USER, I_PASS); SqlConnection *sCon = (SqlConnection*) conn; rv = sCon->getExclusiveLock(); if (rv != OK) { @@ -192,6 +194,7 @@ int main(int argc, char **argv) //printf("PREPARE:%d %d %s\n", stmtID, len, stmtString); AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlDirect); stmt->setConnection(conn); + if (interactive) printf("PREPARE %d : %s\n", stmtID, stmtString); rv = stmt->prepare(stmtString); if (rv != OK) { printf("unable to prepare\n"); @@ -217,13 +220,14 @@ int main(int argc, char **argv) conn->commit(); freeAllStmtHandles(); conn->disconnect(); - munmap(startAddr, st.st_size); + munmap((char*)startAddr, st.st_size); close(fd); delete conn; return 0; } stmtID = *(int*)iter; //printf("stmtid %d\n", stmtID); + if (interactive) printf("EXEC %d :\n", stmtID); iter = iter + sizeof(int); eType = *(int*)iter; //printf("eType is %d\n", eType); @@ -266,7 +270,7 @@ int main(int argc, char **argv) loglen = *(int*) iter; iter += sizeof(int); stmtID = *(int*)iter; iter = iter + sizeof(int); - //printf("FREE: %d\n", stmtID); + if (interactive) printf("FREE %d:\n", stmtID); AbsSqlStatement *stmt = getStmtFromHashTable(stmtID); if (stmt) { stmt->free(); @@ -292,6 +296,7 @@ int main(int argc, char **argv) return ErrSysFatal; } stmt->setConnection(conn); + if (interactive) printf("EXECDIRECT %d : %s\n", stmtID, stmtString); rv = stmt->prepare(stmtString); if (rv != OK) { printf("unable to prepare\n"); @@ -300,6 +305,13 @@ int main(int argc, char **argv) } rv = stmt->execute(ret); if (rv != OK) { + if (strlen(stmtString) > 6 && + ( (strncasecmp(stmtString,"CREATE", 6) == 0) || + (strncasecmp(stmtString,"DROP", 4) == 0)) ) { + // conn->disconnect(); + // return OK; + continue; + } printf("unable to execute\n"); conn->disconnect(); return ErrSysFatal; @@ -307,7 +319,7 @@ int main(int argc, char **argv) stmt->free(); } } - munmap(startAddr, st.st_size); + munmap((char*)startAddr, st.st_size); close(fd); freeAllStmtHandles(); conn->disconnect(); -- 2.11.4.GIT