1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 ***************************************************************************/
16 #include <AbsSqlConnection.h>
17 #include <AbsSqlStatement.h>
18 #include <SqlLogConnection.h>
19 #include <SqlOdbcStatement.h>
20 #include <SqlFactory.h>
21 #include <SqlConnection.h>
22 #include <SqlStatement.h>
24 #include <CacheTableLoader.h>
26 // Linked-list node that keeps the connection information of one data source (DSN).
29 char dsn
[IDENTIFIER_LENGTH
];
30 char user
[IDENTIFIER_LENGTH
];
31 char pwd
[IDENTIFIER_LENGTH
];
32 char tdb
[IDENTIFIER_LENGTH
];
33 struct MultiThreadDSN
*next
;
36 int insert(char *table
, long long pkid
, AbsSqlConnection
*targetconn
, SqlStatement
*sqlstmt
, AbsSqlStatement
*csqlstmt
, AbsSqlConnection
*csqlcon
);
37 int remove(char *table
, long long pkid
, AbsSqlConnection
*targetconn
, AbsSqlStatement
*csqlstmt
,AbsSqlConnection
*csqlcon
);
38 int getRecordsFromTargetDb(AbsSqlConnection
*targetconn
, AbsSqlConnection
*csqlcon
,AbsSqlStatement
*csqlstmt
, SqlConnection
*con
, SqlStatement
*sqlstmt
);
39 void createCacheTableList();
40 DbRetVal
getCacheField(char *tblName
,char *fldName
);
41 DbRetVal
getCacheProjField(char *tblName
,char *fielflist
);
42 DbRetVal
getCacheCondition(char *tblName
,char *condition
);
43 void setParamValues(AbsSqlStatement
*stmt
, int parampos
, DataType type
, int length
, char *value
);
44 void *fillBindBuffer(TDBInfo tName
, DataType type
, void *valBuf
, int length
=0);
47 static void sigTermHandler(int sig
)
49 printf("Received signal %d\nStopping the server\n", sig
);
55 printf("Usage: csqlcacheserver \n");
56 printf("Description: Start the csql caching server.\n");
60 AbsSqlConnection
*csqlcon
= NULL
;
67 char ds
[IDENTIFIER_LENGTH
];
68 char targetDb
[IDENTIFIER_LENGTH
];
69 char userName
[IDENTIFIER_LENGTH
];
70 char pwdName
[IDENTIFIER_LENGTH
];
71 MultiDsnThread() { ds
[0]='\0'; targetDb
[0]='\0'; userName
[0]='\0'; pwdName
[0]='\0';}
74 void *startThread(void *p
);// Function is used for Thread
75 MultiDsnThread
**multiDsnInput
;
78 int main(int argc
, char **argv
)
81 while ((c
= getopt(argc
, argv
, "?")) != EOF
)
85 case '?' : { opt
= 10; break; } //print help
96 os::signal(SIGINT
, sigTermHandler
);
97 os::signal(SIGTERM
, sigTermHandler
);
99 csqlcon
= SqlFactory::createConnection(CSqlLog
);
100 SqlLogConnection
*logConn
= (SqlLogConnection
*) csqlcon
;
101 logConn
->setNoMsgLog(true);
102 rv
= csqlcon
->connect(I_USER
, I_PASS
);
104 printError(ErrSysInternal
, "Unable to connect to CSQL");
108 // Reading "csqlds.conf file"
110 fp
= fopen(Conf::config
.getDsConfigFile(),"r");
112 printError(ErrSysInit
,"csqlds.conf file does not exist");
113 csqlcon
->disconnect();
116 struct MultiThreadDSN
*head
=NULL
, *pnode
=NULL
;
118 char dsnname
[IDENTIFIER_LENGTH
];dsnname
[0]='\0';
119 char tdbname
[IDENTIFIER_LENGTH
];tdbname
[0] = '\0';
120 char username
[IDENTIFIER_LENGTH
];username
[0]='\0';
121 char password
[IDENTIFIER_LENGTH
];password
[0]='\0';
126 struct MultiThreadDSN
*multiDsn
= new struct MultiThreadDSN
;
127 fscanf(fp
,"%s %s %s %s\n",dsnname
,username
,password
,tdbname
);
129 strcpy(multiDsn
->dsn
,dsnname
);
130 strcpy(multiDsn
->user
,username
);
131 strcpy(multiDsn
->pwd
,password
);
132 strcpy(multiDsn
->tdb
,tdbname
);
135 if(pnode
==NULL
) {head
=multiDsn
; pnode
=multiDsn
;}
136 else { pnode
->next
=multiDsn
; pnode
=pnode
->next
; }
142 MultiDsnThread
*info
= new MultiDsnThread();
143 strcpy(info
->ds
,pnode
->dsn
);
144 strcpy(info
->targetDb
,pnode
->tdb
);
145 strcpy(info
->userName
,pnode
->user
);
146 strcpy(info
->pwdName
,pnode
->pwd
);
148 printf("Cache Server Exiting\n");
149 cacheTableList
.reset();
150 csqlcon
->disconnect();
156 // Declare number of thread
157 pthread_t
*thrId
=new pthread_t
[totalDsn
];
158 multiDsnInput
= (MultiDsnThread
**) malloc (sizeof(MultiDsnThread
*) * totalDsn
);
161 //Traversing the list
163 while(pnode
!= NULL
){
164 multiDsnInput
[i
] = new MultiDsnThread();
165 strcpy(multiDsnInput
[i
]->ds
,pnode
->dsn
);
166 strcpy(multiDsnInput
[i
]->targetDb
,pnode
->tdb
);
167 strcpy(multiDsnInput
[i
]->userName
,pnode
->user
);
168 strcpy(multiDsnInput
[i
]->pwdName
,pnode
->pwd
);
171 pthread_create(&thrId
[i
], NULL
, &startThread
, multiDsnInput
[i
]);
177 for(int j
=0; j
<totalDsn
; j
++){
178 pthread_join(thrId
[j
], NULL
);
181 printf("Cache Server Exiting\n");
182 cacheTableList
.reset();
183 csqlcon
->disconnect();
184 // targetconn->disconnect();
185 // printf("Out of main\n");
191 // Thread entry point: main() starts one instance per configured DSN
192 void *startThread(void *thrInfo
)
195 AbsSqlConnection
*targetconn
;
196 //AbsSqlConnection *csqlcon = NULL;
197 AbsSqlStatement
*csqlstmt
= NULL
;
198 SqlConnection
*con
= NULL
;
199 SqlStatement
*sqlstmt
= NULL
;
202 MultiDsnThread
*multiDsnInput
= (MultiDsnThread
*)thrInfo
;
203 /* csqlcon = SqlFactory::createConnection(CSqlLog);
204 SqlLogConnection *logConn = (SqlLogConnection *) csqlcon;
205 logConn->setNoMsgLog(true);
206 rv = csqlcon->connect(I_USER, I_PASS);
207 if (rv != OK) return NULL;
209 targetconn
= SqlFactory::createConnection(CSqlAdapter
);
210 SqlOdbcConnection
*dsnAda
= (SqlOdbcConnection
*)targetconn
;
211 dsnAda
->setDsn(multiDsnInput
->ds
);//line added
213 struct timeval timeout
, tval
;
214 timeout
.tv_sec
= Conf::config
.getCacheWaitSecs();
218 rv
= targetconn
->connect(I_USER
, I_PASS
);
220 printError(ErrSysInternal
, "Unable to connect to target database:%s", multiDsnInput
->ds
);
221 tval
.tv_sec
= timeout
.tv_sec
;
222 tval
.tv_usec
= timeout
.tv_usec
;
223 os::select(0, 0, 0, 0, &tval
);
225 if (srvStop
) return NULL
;
227 if (srvStop
) return NULL
;
229 if (!Conf::config
.useCache())
231 printf("Cache is set to OFF in csql.conf file\n");
234 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
235 stmt
->setConnection(targetconn
);
236 csqlstmt
= SqlFactory::createStatement(CSqlLog
);
237 csqlstmt
->setConnection(csqlcon
);
240 struct stat ofstatus
,nfstatus
;
241 ret
=stat(Conf::config
.getTableConfigFile(),&ofstatus
);
242 timeout
.tv_sec
= Conf::config
.getCacheWaitSecs();
244 createCacheTableList();
247 tval
.tv_sec
= timeout
.tv_sec
;
248 tval
.tv_usec
= timeout
.tv_usec
;
249 ret
= os::select(0, 0, 0, 0, &tval
);
250 printf("Checking for cache updates\n");
251 ret
=stat(Conf::config
.getTableConfigFile(),&nfstatus
);
252 if(ofstatus
.st_mtime
!= nfstatus
.st_mtime
)
254 cacheTableList
.reset();
255 createCacheTableList();
256 ofstatus
.st_mtime
= nfstatus
.st_mtime
;
258 if((ret
= getRecordsFromTargetDb( targetconn
, csqlcon
, csqlstmt
, con
, sqlstmt
)) == 1) {
260 targetconn
->disconnect();
265 //printf("Cache Server Exiting\n");
266 //cacheTableList.reset();
267 //csqlcon->disconnect();
268 targetconn
->disconnect();
272 int getRecordsFromTargetDb(AbsSqlConnection
*targetconn
, AbsSqlConnection
*csqlcon
,AbsSqlStatement
*csqlstmt
, SqlConnection
*con
, SqlStatement
*sqlstmt
)
276 long long op
=0, id
=0,cId
=0;
282 caId
=Conf::config
.getSiteID();
283 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
284 stmt
->setConnection(targetconn
);
285 AbsSqlStatement
*delstmt
= SqlFactory::createStatement(CSqlAdapter
);
286 delstmt
->setConnection(targetconn
);
287 //rv = delstmt->prepare("DELETE from csql_log_int where id=?;");
288 sprintf(StmtStr
, "SELECT * FROM csql_log_int where cacheid = %d;", caId
);
289 rv
= stmt
->prepare(StmtStr
);
290 if (rv
!= OK
) {printf("Stmt prepare failed\n"); return 1; }
291 stmt
->bindField(1, tablename
);
292 stmt
->bindField(2, &pkid
);
293 stmt
->bindField(3, &op
);
294 stmt
->bindField(4, &cId
);
295 stmt
->bindField(5, &id
);
297 con
= (SqlConnection
*) csqlcon
->getInnerConnection();
298 sqlstmt
= (SqlStatement
*)csqlstmt
->getInnerStatement();
299 sqlstmt
->setSqlConnection(con
);
301 sprintf(StmtStr
, "DELETE from csql_log_int where id=?;");
302 rv
= delstmt
->prepare(StmtStr
);
308 printError(ErrSysInternal
, "Statement prepare failed. TDB may be down");
313 rv
= targetconn
->beginTrans();
314 rv
= stmt
->execute(rows
);
316 printError(ErrSysInit
, "Unable to execute stmt in target db");
317 targetconn
->rollback();
325 while ( stmt
->fetch() != NULL
)
327 Util::trimEnd(tablename
);
328 logFiner(Conf::logger
, "Row value is Table:%s PK:%lld OP:%lld CID:%lld\n", tablename
, pkid
, op
,cId
);
330 if (op
== 2) { //DELETE
331 retVal
= remove(tablename
,pkid
, targetconn
, csqlstmt
, csqlcon
);
334 retVal
= insert(tablename
, pkid
, targetconn
, sqlstmt
, csqlstmt
, csqlcon
);
336 //targetconn->commit();
337 //rv = targetconn->beginTrans();
339 delstmt
->setIntParam(1, id
);
340 rv
= delstmt
->execute(rows
);
342 printf("log record not deleted from the target db %d\n", rv
);
343 targetconn
->rollback();
346 rv
= targetconn
->commit();
347 rv
= targetconn
->beginTrans();
354 targetconn
->rollback();
362 int insert(char *tablename
, long long pkid
, AbsSqlConnection
*targetconn
, SqlStatement
*sqlstmt
, AbsSqlStatement
*csqlstmt
, AbsSqlConnection
*csqlcon
)
364 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
365 stmt
->setConnection(targetconn
);
366 TDBInfo tdbname
= ((SqlOdbcConnection
*)targetconn
)->getTrDbName();
367 SqlOdbcStatement
*ostmt
= (SqlOdbcStatement
*) stmt
;
369 char pkfieldname
[128];
370 DbRetVal rv
=getCacheField(tablename
, pkfieldname
);
372 ostmt
->getPrimaryKeyFieldName(tablename
, pkfieldname
);
374 //Util::str_tolower(pkfieldname);
375 char fieldlist
[IDENTIFIER_LENGTH
];
376 char condition
[IDENTIFIER_LENGTH
];
378 rv
=getCacheProjField(tablename
,fieldlist
);
380 rv
=getCacheCondition(tablename
,condition
);
382 sprintf(sbuf
, "SELECT * FROM %s where %s = %lld;", tablename
, pkfieldname
, pkid
);
384 sprintf(sbuf
, "SELECT * FROM %s where %s = %lld and %s ;", tablename
, pkfieldname
, pkid
,condition
);
387 rv
=getCacheCondition(tablename
,condition
);
389 sprintf(sbuf
, "SELECT %s FROM %s where %s = %lld;",fieldlist
,tablename
, pkfieldname
, pkid
);
391 sprintf(sbuf
, "SELECT %s FROM %s where %s = %lld and %s;",fieldlist
,tablename
, pkfieldname
, pkid
,condition
);
394 //TODO::get the primary key field name from the table interface. need to implement it
395 //printf("InsertString: %s\n", sbuf);
396 rv
= stmt
->prepare(sbuf
);
397 if (rv
!= OK
) return 2;
400 sprintf(ptr
,"INSERT INTO %s VALUES(", tablename
);
402 bool firstFld
= true;
403 List fNameList
= sqlstmt
->getFieldNameList(tablename
);
404 int noOfFields
= fNameList
.size();
405 while (noOfFields
--) {
408 sprintf(ptr
,"?", tablename
);
417 //printf("insert stmt: '%s'\n", insStmt);
419 rv
= csqlstmt
->prepare(insStmt
);
420 if (rv
!= OK
) { return 2; }
422 ListIterator fNameIter
= fNameList
.getIterator();
423 FieldInfo
*info
= new FieldInfo();
424 int fcount
=0; void *valBuf
; int fieldsize
=0;
425 void *buf
[128];//TODO:resticts to support only 128 fields in table
426 for (int i
=0; i
< 128; i
++) buf
[i
]= NULL
;
428 Identifier
*elem
= NULL
;
429 BindBuffer
*bBuf
= NULL
;
430 while (fNameIter
.hasElement()) {
431 elem
= (Identifier
*) fNameIter
.nextElement();
432 sqlstmt
->getFieldInfo(tablename
, (const char*)elem
->name
, info
);
433 valBuf
= AllDataType::alloc(info
->type
, info
->length
+1);
434 os::memset(valBuf
,0,info
->length
);
435 bBuf
= (BindBuffer
*) fillBindBuffer(tdbname
, info
->type
, valBuf
, info
->length
);
436 if (info
->type
== typeString
) {
438 valBufList
.append(bBuf
);
439 dType
[fcount
] = info
->type
;
440 buf
[fcount
] = valBuf
;
441 stmt
->bindField(fcount
+1, buf
[fcount
]);
446 int retValue
= stmt
->execute(rows
);
447 if (retValue
&& rows
!= 1) {
448 printError(ErrSysInit
, "Unable to execute statement at target db\n");
451 ListIterator bindIter
= valBufList
.getIterator();
452 if (stmt
->fetch() != NULL
) {
453 ostmt
->setNullInfo(csqlstmt
);
454 if(tdbname
== postgres
){
455 for (int i
=0; i
< fcount
; i
++) {
456 if(dType
[i
] == typeString
) Util::trimRight((char *)buf
[i
]);
459 //setXXXParams to be called here
461 while (bindIter
.hasElement()) {
462 bBuf
= (BindBuffer
*) bindIter
.nextElement();
463 setParamValues(csqlstmt
, pos
++, bBuf
->type
, bBuf
->length
,
466 csqlcon
->beginTrans();
468 rv
= csqlstmt
->execute(rows
);
470 printf ("execute failed \n");
471 printf(" STMT: %s\n",insStmt
);
475 //printf("successfully inserted value with pkid = %d\n", pkid);
476 //Note:insert may fail if the record is inserted from this cache
478 //for (int i=0; i < fcount; i++) free(buf[i]);
479 ListIterator iter
= valBufList
.getIterator();
480 while (iter
.hasElement()){
481 bBuf
= (BindBuffer
*) iter
.nextElement();
482 if (bBuf
->type
== typeString
)
483 //printf("Values %x %x \n", bBuf->csql, bBuf->targetdb);
491 int remove(char *tablename
, long long pkid
, AbsSqlConnection
*targetconn
, AbsSqlStatement
*csqlstmt
,AbsSqlConnection
*csqlcon
)
495 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
496 stmt
->setConnection(targetconn
);
497 SqlOdbcStatement
*ostmt
= (SqlOdbcStatement
*) stmt
;
498 char pkfieldname
[128];
499 rv
=getCacheField(tablename
, pkfieldname
);
501 ostmt
->getPrimaryKeyFieldName(tablename
, pkfieldname
);
503 Util::str_tolower(pkfieldname
);
507 sprintf(delStmt
, "DELETE FROM %s where %s = %d", tablename
, pkfieldname
, pkid
);
508 //printf("delStmt is %s\n", delStmt);
509 rv
= csqlstmt
->prepare(delStmt
);
510 if (rv
!= OK
) { return 2; }
511 rv
= csqlcon
->beginTrans();
512 if (rv
!= OK
) return 2;
514 rv
= csqlstmt
->execute(rows
);
515 if (rv
!= OK
|| rows
!=1)
518 printf("Delete failed for stmt %s\n", delStmt
);
521 rv
= csqlcon
->commit();
526 void createCacheTableList()
529 fp
= fopen(Conf::config
.getTableConfigFile(),"r");
531 printError(ErrSysInit
, "csqltable.conf file does not exist");
534 char tablename
[IDENTIFIER_LENGTH
];
535 char fieldname
[IDENTIFIER_LENGTH
];
536 char condition
[IDENTIFIER_LENGTH
];
537 char field
[IDENTIFIER_LENGTH
];
538 char dsnName
[IDENTIFIER_LENGTH
];
543 fscanf(fp
,"%d %s %s %s %s %s \n",&mode
,tablename
,fieldname
,condition
,field
,dsnName
);
544 CacheTableInfo
*cacheTable
=new CacheTableInfo();
545 cacheTable
->setTableName(tablename
);
546 cacheTable
->setFieldName(fieldname
);
547 cacheTable
->setProjFieldList(field
);
548 cacheTable
->setCondition(condition
);
549 cacheTableList
.append(cacheTable
);
551 // printf("Table %s is not cached\n",tabname);
556 DbRetVal
getCacheCondition(char *tblName
,char *condition
)
558 ListIterator iter
=cacheTableList
.getIterator();
559 CacheTableInfo
*cacheTable
;
560 while(iter
.hasElement())
562 cacheTable
=(CacheTableInfo
*)iter
.nextElement();
563 if(strcmp(cacheTable
->getTableName(),tblName
)==0){
564 if(strcmp(cacheTable
->getCondition(),"NULL")!=0)
566 strcpy(condition
,cacheTable
->getCondition());
574 DbRetVal
getCacheProjField(char *tblName
,char *fieldlist
)
576 ListIterator iter
=cacheTableList
.getIterator();
577 CacheTableInfo
*cacheTable
;
578 while(iter
.hasElement())
580 cacheTable
=(CacheTableInfo
*)iter
.nextElement();
581 if(strcmp(cacheTable
->getTableName(),tblName
)==0){
582 if(strcmp(cacheTable
->getProjFieldList(),"NULL")!=0)
584 strcpy(fieldlist
,cacheTable
->getProjFieldList());
591 DbRetVal
getCacheField(char *tblName
,char *fldName
)
593 ListIterator iter
=cacheTableList
.getIterator();
594 CacheTableInfo
*cacheTable
;
595 while(iter
.hasElement())
597 cacheTable
=(CacheTableInfo
*)iter
.nextElement();
598 if(strcmp(cacheTable
->getTableName(),tblName
)==0){
599 if(strcmp(cacheTable
->getFieldName(),"NULL")!=0)
601 strcpy(fldName
,cacheTable
->getFieldName());
610 void setParamValues(AbsSqlStatement
*stmt
, int parampos
, DataType type
, int length
, char *value
)
615 stmt
->setIntParam(parampos
, *(int*)value
);
618 stmt
->setLongParam(parampos
, *(long*)value
);
621 stmt
->setLongLongParam(parampos
, *(long long*)value
);
624 stmt
->setShortParam(parampos
, *(short*)value
);
627 stmt
->setByteIntParam(parampos
, *(char*)value
);
630 stmt
->setDoubleParam(parampos
, *(double*)value
);
633 stmt
->setFloatParam(parampos
, *(float*)value
);
636 stmt
->setDateParam(parampos
, *(Date
*)value
);
639 stmt
->setTimeParam(parampos
, *(Time
*)value
);
642 stmt
->setTimeStampParam(parampos
, *(TimeStamp
*)value
);
646 char *d
=(char*)value
;
648 stmt
->setStringParam(parampos
, (char*)value
);
652 stmt
->setBinaryParam(parampos
, (char *) value
, length
);
658 void *fillBindBuffer(TDBInfo tdbName
, DataType type
, void *valBuf
, int length
)
660 BindBuffer
*bBuf
= NULL
;
664 bBuf
= new BindBuffer();
666 bBuf
->type
= typeDate
;
667 bBuf
->length
= sizeof(DATE_STRUCT
);
668 bBuf
->targetdb
= malloc(bBuf
->length
);
669 memset(bBuf
->targetdb
, 0, bBuf
->length
);
670 valBuf
= bBuf
->targetdb
;
673 bBuf
= new BindBuffer();
675 bBuf
->type
= typeTime
;
676 bBuf
->length
= sizeof(TIME_STRUCT
);
677 bBuf
->targetdb
= malloc(bBuf
->length
);
678 memset(bBuf
->targetdb
, 0, bBuf
->length
);
679 valBuf
= bBuf
->targetdb
;
682 bBuf
= new BindBuffer();
684 bBuf
->type
= typeTimeStamp
;
685 bBuf
->length
= sizeof(TIMESTAMP_STRUCT
);
686 bBuf
->targetdb
= malloc(bBuf
->length
);
687 memset(bBuf
->targetdb
, 0, bBuf
->length
);
688 valBuf
= bBuf
->targetdb
;
692 if( tdbName
== postgres
)
694 bBuf
= new BindBuffer();
695 bBuf
->type
= typeLongLong
;
698 bBuf
->targetdb
= AllDataType::alloc(typeString
,bBuf
->length
);
699 memset(bBuf
->targetdb
, 0, bBuf
->length
);
700 valBuf
= bBuf
->targetdb
;
704 bBuf
= new BindBuffer();
707 bBuf
->length
= length
;
712 if( tdbName
== postgres
)
714 bBuf
= new BindBuffer();
715 bBuf
->type
= typeString
;
717 bBuf
->length
= length
+1;
721 bBuf
= new BindBuffer();
724 bBuf
->length
= length
;