1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 ***************************************************************************/
16 #include <AbsSqlConnection.h>
17 #include <AbsSqlStatement.h>
18 #include <SqlOdbcStatement.h>
19 #include <SqlFactory.h>
21 #include <CacheTableLoader.h>
// Forward declarations for the helpers defined later in this file.
// insert() and remove() receive the table handle and the primary-key value
// of the row they act on.
23 int insert(Table
*table
, int pkid
);
24 int remove(Table
*table
, int pkid
);
// Processes the log rows found in the target database; a non-zero result
// makes main() set srvStop.
25 int getRecordsFromTargetDb(int mode
);
// Fills cacheTableList from the table configuration file.
26 void createCacheTableList();
// The three getCache* lookups search cacheTableList for tblName and strcpy
// the matching setting into the buffer passed as the second argument.
27 DbRetVal
getCacheField(char *tblName
,char *fldName
);
28 DbRetVal
getCacheProjField(char *tblName
// NOTE(review): "fielflist" is spelled "fieldlist" in the definition; the
// mismatch is harmless in a declaration but looks like a typo.
,char *fielflist
);
29 DbRetVal
getCacheCondition(char *tblName
,char *condition
);
// Signal handler that main() registers for SIGINT and SIGTERM.
// Prints the number of the received signal and a "stopping" message.
// NOTE(review): the remainder of the body is not visible in this listing;
// presumably it sets srvStop so the polling loop in main() ends - confirm.
// NOTE(review): printf is not on the POSIX async-signal-safe list; confirm
// that calling it from a handler is acceptable here.
32 static void sigTermHandler(int sig
)
34 printf("Received signal %d\nStopping the server\n", sig
);
// Usage text for the csqlcacheserver program: one line with the command
// name and one with a short description.
// NOTE(review): the enclosing function header is not visible in this
// listing; presumably this is the help routine selected by the '?' option
// parsed in main() - confirm against the full file.
40 printf("Usage: csqlcacheserver \n");
41 printf("Description: Start the csql caching server.\n");
// File-level connection to the target database. main() creates it through
// SqlFactory::createConnection(CSqlAdapter) and disconnects it on exit;
// getRecordsFromTargetDb(), insert() and remove() attach their statements
// to it via setConnection().
44 AbsSqlConnection
*targetconn
;
// Entry point of the cache server.
//
// Visible flow: parse options, install the signal handlers, open the local
// connection (conn) and the target connection (targetconn), check that
// caching is enabled, build the cached-table list, then repeatedly wait,
// re-read the table configuration if its modification time changed, and
// apply pending log records through getRecordsFromTargetDb(1). On exit the
// table list is reset and targetconn is disconnected.
//
// Returns 1 when either connection cannot be opened.
// NOTE(review): several original lines (braces, the loop header, local
// declarations such as c, opt, ret and conn) are absent from this listing,
// so the exact nesting below is inferred - confirm against the full file.
46 int main(int argc
, char **argv
)
// Only the '?' option is accepted; it sets opt to 10 (print help).
49 while ((c
= getopt(argc
, argv
, "?")) != EOF
)
53 case '?' : { opt
= 10; break; } //print help
// Both termination signals share one handler.
64 os::signal(SIGINT
, sigTermHandler
);
65 os::signal(SIGTERM
, sigTermHandler
);
// NOTE(review): the user name and password are hard-coded for both the
// local and the target connection.
66 DbRetVal rv
= conn
.open("root", "manager");
67 if (rv
!= OK
) return 1;
68 targetconn
= SqlFactory::createConnection(CSqlAdapter
);
69 rv
= targetconn
->connect("root", "manager");
// NOTE(review): this early return does not visibly close conn, which was
// opened successfully above.
70 if (rv
!= OK
) return 1;
71 if (!Conf::config
.useCache())
73 printf("Cache is set to OFF in csql.conf file\n");
// NOTE(review): apart from setConnection, the only visible use of this
// statement is inside the commented-out code below; its release is not
// visible in this listing.
76 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
77 stmt
->setConnection(targetconn
);
78 /*rv = stmt->prepare("create table csql_log_int(tablename char(64), pkid int, operation int, id int not null unique auto_increment)engine='innodb';");
79 targetconn->beginTrans();
86 printf("Cache server started\n");
// ofstatus holds the last seen state of the table configuration file,
// nfstatus the state sampled on each pass.
88 struct stat ofstatus
,nfstatus
;
// NOTE(review): the result of stat() is stored in ret but no check of it is
// visible in this listing.
89 ret
=stat(Conf::config
.getTableConfigFile(),&ofstatus
);
90 struct timeval timeout
, tval
;
// Wait interval between passes, taken from the configuration.
91 timeout
.tv_sec
= Conf::config
.getCacheWaitSecs();
93 createCacheTableList();
// timeout is copied into tval before every select call.
// NOTE(review): the assignment of timeout.tv_usec is not visible in this
// listing; presumably it is set on an omitted line - confirm.
96 tval
.tv_sec
= timeout
.tv_sec
;
97 tval
.tv_usec
= timeout
.tv_usec
;
// select with no descriptor sets: used here purely as a timed wait.
98 ret
= os::select(0, NULL
, 0, 0, &tval
);
99 printf("Checking for cache updates\n");
100 ret
=stat(Conf::config
.getTableConfigFile(),&nfstatus
);
// A changed modification time triggers a rebuild of the cached-table list.
101 if(ofstatus
.st_mtime
!= nfstatus
.st_mtime
)
103 cacheTableList
.reset();
104 createCacheTableList();
105 ofstatus
.st_mtime
= nfstatus
.st_mtime
;
// Any non-zero result from the log pass stops the server.
107 ret
= getRecordsFromTargetDb(1);
108 if (ret
!=0) srvStop
= 1;
109 //ret = getRecordsFromTargetDb(2);
// This repeats the check above because the mode-2 call is commented out.
110 if (ret
!=0) srvStop
= 1;
112 printf("Cache Server Exiting\n");
113 cacheTableList
.reset();
115 targetconn
->disconnect();
// Reads pending change-log rows from the target database and applies each
// one to the local cache.
//
// Visible flow: select rows from a csql_log_* table on the target, bind the
// five columns (table name, primary-key value, operation, cache id, log id),
// and for a fetched row open the named local table, call remove() and/or
// insert() for the primary key, commit on the target, then delete the log
// row by id in a new target transaction.
//
// Returns 1 when a prepare fails; other return paths are not visible here.
// NOTE(review): how "mode" selects between the csql_log_int and the
// csql_log_char query, and how "op" selects between remove() and insert(),
// is on lines absent from this listing - confirm against the full file.
118 int getRecordsFromTargetDb(int mode
)
126 caId
=Conf::config
.getCacheID();
// stmt reads the log rows; delstmt removes a row once it has been applied.
// NOTE(review): release of these two statements is not visible in this
// listing, including on the early "return 1" paths below.
127 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
128 stmt
->setConnection(targetconn
);
129 AbsSqlStatement
*delstmt
= SqlFactory::createStatement(CSqlAdapter
);
130 delstmt
->setConnection(targetconn
);
132 //rv = delstmt->prepare("DELETE from csql_log_int where id=?;");
// Integer-key log: only rows tagged with this cache's id are selected.
133 sprintf(StmtStr
, "SELECT * FROM csql_log_int where cacheid = %d;", caId
);
134 rv
= stmt
->prepare(StmtStr
);
135 if (rv
!= OK
) {printf("Stmt prepare failed\n"); return 1; }
// Character-key log: this query has no cacheid filter, unlike the one above.
138 rv
= stmt
->prepare("SELECT * FROM csql_log_char;");
139 if (rv
!= OK
) {printf("Stmt prepare failed\n"); return 1; }
140 //rv = delstmt->prepare("DELETE from csql_log_char where id=?;");
142 if (rv
!= OK
) {printf("Stmt prepare failed\n"); return 1; }
// Output bindings, one per selected column, in column order.
143 stmt
->bindField(1, tablename
);
144 stmt
->bindField(2, &pkid
);
145 stmt
->bindField(3, &op
);
// caId is also the value used in the WHERE clause above; each fetch
// overwrites it with the row's fourth column.
146 stmt
->bindField(4, &caId
);
147 stmt
->bindField(5, &id
);
149 DatabaseManager
*dbMgr
= conn
.getDatabaseManager();
151 rv
= targetconn
->beginTrans();
152 rv
= stmt
->execute(rows
);
// Failure path of the execute: report and roll back the target transaction.
155 printError(ErrSysInit
, "Unable to execute stmt in target db");
156 targetconn
->rollback();
// NOTE(review): an enclosing loop, if any, is not visible in this listing.
163 if (stmt
->fetch() != NULL
) {
164 printf("Row value is %s %d %d %d\n", tablename
, pkid
, op
,caId
);
166 Table
*table
= dbMgr
->openTable(tablename
);
// Presumably guarded by a NULL check on table that is not visible here.
170 printError(ErrSysInit
, "Table %s not exist in csql", tablename
);
171 targetconn
->rollback();
180 ret
= remove(table
,pkid
);
184 ret
= insert(table
, pkid
);
186 dbMgr
->closeTable(table
);
// Close the read transaction, then open a new one for the log cleanup.
187 rv
= targetconn
->commit();
188 rv
= targetconn
->beginTrans();
189 //Remove record from csql_log_XXX table
// NOTE(review): the DELETE is re-prepared for every row and, as far as this
// listing shows, always targets csql_log_int, even if the row came from
// csql_log_char - confirm.
190 sprintf(StmtStr
, "DELETE from csql_log_int where id=%d ;", id
);
191 rv
= delstmt
->prepare(StmtStr
);
192 if (rv
!= OK
) {printf("FAILED\n"); return 1; }
193 // delstmt->setIntParam(1, id);
194 rv
= delstmt
->execute(rows
);
197 printf("log record not deleted from the target db %d\n", rv
);
198 targetconn
->rollback();
206 rv
= targetconn
->commit();
// Copies one row, identified by pkid, from the target database into the
// local cached table.
//
// Visible flow: determine the primary-key field name, build a SELECT for
// that key (optionally with the configured projection list and extra
// condition), prepare it on the target, allocate and bind one value buffer
// per local field, execute, start a local transaction, fetch the row and
// insert it into the table.
//
// Returns 1 when the prepare fails and ErrSysInit from the execute check;
// other return paths are not visible in this listing.
// NOTE(review): the conditions choosing between the four SELECT forms, and
// the declarations of sbuf and rows, are on lines absent from this listing.
221 int insert(Table
*table
, int pkid
)
// NOTE(review): release of this statement is not visible in this listing.
223 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
224 stmt
->setConnection(targetconn
);
// The same statement viewed through its ODBC type, needed for
// getPrimaryKeyFieldName() and setNullInfo() below.
// NOTE(review): unchecked C-style downcast; assumes CSqlAdapter statements
// are SqlOdbcStatement objects - confirm.
225 SqlOdbcStatement
*ostmt
= (SqlOdbcStatement
*) stmt
;
226 char pkfieldname
[128];
// First choice: the key field configured for this table in cacheTableList.
227 DbRetVal rv
=getCacheField(table
->getName(), pkfieldname
);
// Presumably the fallback when no field is configured; the guarding
// condition is not visible here.
229 ostmt
->getPrimaryKeyFieldName(table
->getName(), pkfieldname
);
231 char fieldlist
[IDENTIFIER_LENGTH
];
232 char condition
[IDENTIFIER_LENGTH
];
234 rv
=getCacheProjField(table
->getName(),fieldlist
);
236 rv
=getCacheCondition(table
->getName(),condition
);
// Four query shapes: all columns or the projection list, each with or
// without the extra cache condition.
// NOTE(review): sprintf writes into sbuf without a length limit; the size
// of sbuf is not visible in this listing.
238 sprintf(sbuf
, "SELECT * FROM %s where %s = %d;", table
->getName(), pkfieldname
, pkid
);
240 sprintf(sbuf
, "SELECT * FROM %s where %s = %d and %s ;", table
->getName(), pkfieldname
, pkid
,condition
);
243 rv
=getCacheCondition(table
->getName(),condition
);
245 sprintf(sbuf
, "SELECT %s FROM %s where %s = %d;",fieldlist
,table
->getName(), pkfieldname
, pkid
);
247 sprintf(sbuf
, "SELECT %s FROM %s where %s = %d and %s;",fieldlist
,table
->getName(), pkfieldname
, pkid
,condition
);
250 //TODO::get the primary key field name from the table interface. need to implement it
251 rv
= stmt
->prepare(sbuf
);
252 if (rv
!= OK
) return 1;
254 List fNameList
= table
->getFieldNameList();
255 ListIterator fNameIter
= fNameList
.getIterator();
// NOTE(review): the matching delete for info is not visible in this listing.
256 FieldInfo
*info
= new FieldInfo();
// fcount is the 1-based position of the next field to bind.
257 int fcount
=1; void *valBuf
; int fieldsize
=0;
// Slots are used from index 1, so at most 127 fields fit.
// NOTE(review): no bounds check on fcount is visible in the loop below.
258 void *buf
[128];//TODO:restricts to support only 128 fields in table
259 Identifier
*elem
= NULL
;
// For every local field: allocate a value buffer of the field's type and
// length, remember it in buf, and bind it both to the local table and to
// the same position of the target statement.
// NOTE(review): every local field is bound even when the SELECT uses the
// projection list - confirm the positions still line up in that case.
260 while (fNameIter
.hasElement()) {
261 elem
= (Identifier
*) fNameIter
.nextElement();
262 table
->getFieldInfo((const char*)elem
->name
, info
);
263 valBuf
= AllDataType::alloc(info
->type
, info
->length
);
264 buf
[fcount
] = valBuf
;
265 table
->bindFld(elem
->name
, valBuf
);
266 stmt
->bindField(fcount
++, valBuf
);
271 int retValue
= stmt
->execute(rows
);
// NOTE(review): the error is reported only when retValue is non-zero AND
// rows is not 1; a non-zero retValue with rows == 1 passes silently.
// Confirm whether "or" was intended.
272 if (retValue
&& rows
!= 1) {printError(ErrSysInit
, "Unable to execute statement at target db\n"); return ErrSysInit
; }
// The result of starting the local transaction is not checked here.
273 conn
.startTransaction();
274 if (stmt
->fetch() != NULL
) {
275 ostmt
->setNullInfo(table
);
276 table
->insertTuple();
277 //Note:insert may fail if the record is inserted from this cache
// Walks the buffers stored in slots 1..fcount-1; presumably to free them -
// the loop body is not visible in this listing.
279 for (int i
=1; i
< fcount
; i
++) {
// Deletes the row whose primary key equals pkid from the local cached table.
//
// Visible flow: determine the primary-key field name, set an equality
// condition on it, start a local transaction, execute the scan, fetch the
// matching row and delete it, then clear the condition.
//
// Returns 1 when the transaction cannot be started or when rv is not OK
// after the delete step; other return paths are not visible in this listing.
// NOTE(review): the declarations of rv and p1 are on lines absent from this
// listing.
287 int remove(Table
*table
, int pkid
)
// The statement is created only to reach getPrimaryKeyFieldName() through
// its ODBC type; no query is visibly run on it in this function.
// NOTE(review): its release is not visible in this listing.
290 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
291 stmt
->setConnection(targetconn
);
// NOTE(review): unchecked C-style downcast; assumes CSqlAdapter statements
// are SqlOdbcStatement objects - confirm.
292 SqlOdbcStatement
*ostmt
= (SqlOdbcStatement
*) stmt
;
293 char pkfieldname
[128];
// First choice: the key field configured for this table in cacheTableList.
294 rv
=getCacheField(table
->getName(), pkfieldname
);
// Presumably the fallback when no field is configured; the guarding
// condition is not visible here.
296 ostmt
->getPrimaryKeyFieldName(table
->getName(), pkfieldname
);
// Predicate: <primary key field> == pkid.
300 p1
.setTerm(pkfieldname
, OpEquals
, &pkid
);
301 table
->setCondition(&p1
);
302 rv
= conn
.startTransaction();
// NOTE(review): this early return leaves the condition set on the table.
303 if (rv
!= OK
) return 1;
304 rv
= table
->execute();
// Presumably the failure path of execute(): the condition is cleared before
// leaving. The guarding check is not visible here.
307 table
->setCondition(NULL
);
311 if (table
->fetch() != NULL
)
312 rv
= table
->deleteTuple();
313 //Note:Delete may fail if the record is deleted from this cache
// Always detach the predicate so that it does not affect later scans of
// the same table.
314 table
->setCondition(NULL
);
316 if (rv
!= OK
) return 1;
// Builds cacheTableList from the table configuration file.
//
// Each entry is read with the format "%d:%s %s %s %s": a mode number, then
// table name, key field name, condition and projection field list. For an
// entry a CacheTableInfo is allocated, filled and appended to the list.
// NOTE(review): the loop header, the NULL check on fp, the use of "mode"
// and the fclose call are on lines absent from this listing.
319 void createCacheTableList()
322 fp
= fopen(Conf::config
.getTableConfigFile(),"r");
// NOTE(review): the message names cachetable.conf although the path comes
// from Conf::config.getTableConfigFile().
324 printError(ErrSysInit
, "cachetable.conf file does not exist");
327 char tablename
[IDENTIFIER_LENGTH
];
328 char fieldname
[IDENTIFIER_LENGTH
];
329 char condition
[IDENTIFIER_LENGTH
];
330 char field
[IDENTIFIER_LENGTH
];
// NOTE(review): the %s conversions carry no field width, so a token of
// IDENTIFIER_LENGTH characters or more overflows its buffer. %s also stops
// at whitespace, so a condition cannot contain spaces. No check of the
// fscanf result is visible in this listing.
334 fscanf(fp
,"%d:%s %s %s %s\n",&mode
,tablename
,fieldname
,condition
,field
);
// NOTE(review): ownership of the new object appears to pass to
// cacheTableList - confirm that reset() releases it.
335 CacheTableInfo
*cacheTable
=new CacheTableInfo();
336 cacheTable
->setTableName(tablename
);
337 cacheTable
->setFieldName(fieldname
);
338 cacheTable
->setProjFieldList(field
);
339 cacheTable
->setCondition(condition
);
340 cacheTableList
.append(cacheTable
);
342 // printf("Table %s is not cached\n",tabname);
// Looks up the cache condition configured for tblName.
//
// Walks cacheTableList; for the entry whose table name equals tblName, if
// its condition is not the literal text "NULL", the condition is copied
// into the caller's buffer.
//
// tblName   - table name to search for
// condition - output buffer that receives the condition text
// NOTE(review): the return statements are on lines absent from this
// listing, so the returned DbRetVal values cannot be documented here.
// NOTE(review): strcpy is unbounded; the caller must supply a buffer large
// enough for the stored condition (insert() passes IDENTIFIER_LENGTH bytes).
347 DbRetVal
getCacheCondition(char *tblName
,char *condition
)
349 ListIterator iter
=cacheTableList
.getIterator();
350 CacheTableInfo
*cacheTable
;
351 while(iter
.hasElement())
353 cacheTable
=(CacheTableInfo
*)iter
.nextElement();
354 if(strcmp(cacheTable
->getTableName(),tblName
)==0){
// The text "NULL" in the configuration marks "no condition".
355 if(strcmp(cacheTable
->getCondition(),"NULL")!=0)
357 strcpy(condition
,cacheTable
->getCondition());
// Looks up the projection field list configured for tblName.
//
// Walks cacheTableList; for the entry whose table name equals tblName, if
// its projection list is not the literal text "NULL", the list is copied
// into the caller's buffer.
//
// tblName   - table name to search for
// fieldlist - output buffer that receives the projection list text
// NOTE(review): the return statements are on lines absent from this
// listing, so the returned DbRetVal values cannot be documented here.
// NOTE(review): strcpy is unbounded; the caller must supply a buffer large
// enough for the stored list (insert() passes IDENTIFIER_LENGTH bytes).
365 DbRetVal
getCacheProjField(char *tblName
,char *fieldlist
)
367 ListIterator iter
=cacheTableList
.getIterator();
368 CacheTableInfo
*cacheTable
;
369 while(iter
.hasElement())
371 cacheTable
=(CacheTableInfo
*)iter
.nextElement();
372 if(strcmp(cacheTable
->getTableName(),tblName
)==0){
// The text "NULL" in the configuration marks "no projection list".
373 if(strcmp(cacheTable
->getProjFieldList(),"NULL")!=0)
375 strcpy(fieldlist
,cacheTable
->getProjFieldList());
// Looks up the key field name configured for tblName.
//
// Walks cacheTableList; for the entry whose table name equals tblName, if
// its field name is not the literal text "NULL", the name is copied into
// the caller's buffer.
//
// tblName - table name to search for
// fldName - output buffer that receives the field name
// NOTE(review): the return statements are on lines absent from this
// listing, so the returned DbRetVal values cannot be documented here.
// NOTE(review): strcpy is unbounded; insert() and remove() pass 128-byte
// buffers, while the configuration reader stores names of up to
// IDENTIFIER_LENGTH bytes - confirm that IDENTIFIER_LENGTH does not
// exceed 128.
382 DbRetVal
getCacheField(char *tblName
,char *fldName
)
384 ListIterator iter
=cacheTableList
.getIterator();
385 CacheTableInfo
*cacheTable
;
386 while(iter
.hasElement())
388 cacheTable
=(CacheTableInfo
*)iter
.nextElement();
389 if(strcmp(cacheTable
->getTableName(),tblName
)==0){
// The text "NULL" in the configuration marks "no field configured".
390 if(strcmp(cacheTable
->getFieldName(),"NULL")!=0)
392 strcpy(fldName
,cacheTable
->getFieldName());