1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 ***************************************************************************/
16 #include <AbsSqlConnection.h>
17 #include <AbsSqlStatement.h>
18 #include <SqlLogConnection.h>
19 #include <SqlOdbcStatement.h>
20 #include <SqlFactory.h>
21 #include <SqlConnection.h>
22 #include <SqlStatement.h>
24 #include <CacheTableLoader.h>
26 // List which keeps all DS Information.
29 char dsn
[IDENTIFIER_LENGTH
];
30 char user
[IDENTIFIER_LENGTH
];
31 char pwd
[IDENTIFIER_LENGTH
];
32 char tdb
[IDENTIFIER_LENGTH
];
33 struct MultiThreadDSN
*next
;
36 int insert(char *table
, long long pkid
, AbsSqlConnection
*targetconn
, SqlStatement
*sqlstmt
, AbsSqlStatement
*csqlstmt
, AbsSqlConnection
*csqlcon
);
37 int remove(char *table
, long long pkid
, AbsSqlConnection
*targetconn
, AbsSqlStatement
*csqlstmt
,AbsSqlConnection
*csqlcon
);
38 int getRecordsFromTargetDb(AbsSqlConnection
*targetconn
, AbsSqlConnection
*csqlcon
,AbsSqlStatement
*csqlstmt
, SqlConnection
*con
, SqlStatement
*sqlstmt
);
39 void createCacheTableList();
40 DbRetVal
getCacheField(char *tblName
,char *fldName
);
41 DbRetVal
getCacheProjField(char *tblName
,char *fielflist
);
42 DbRetVal
getCacheCondition(char *tblName
,char *condition
);
43 void setParamValues(AbsSqlStatement
*stmt
, int parampos
, DataType type
, int length
, char *value
);
44 void *fillBindBuffer(TDBInfo tName
, DataType type
, void *valBuf
, int length
=0);
47 static void sigTermHandler(int sig
)
49 printf("Received signal %d\nStopping the server\n", sig
);
55 printf("Usage: csqlcacheserver \n");
56 printf("Description: Start the csql caching server.\n");
60 AbsSqlConnection
*csqlcon
= NULL
;
67 char ds
[IDENTIFIER_LENGTH
];
68 char targetDb
[IDENTIFIER_LENGTH
];
69 char userName
[IDENTIFIER_LENGTH
];
70 char pwdName
[IDENTIFIER_LENGTH
];
71 MultiDsnThread() { ds
[0]='\0'; targetDb
[0]='\0'; userName
[0]='\0'; pwdName
[0]='\0';}
74 void *startThread(void *p
);// Function is used for Thread
75 MultiDsnThread
**multiDsnInput
;
78 int main(int argc
, char **argv
)
81 while ((c
= getopt(argc
, argv
, "?")) != EOF
)
85 case '?' : { opt
= 10; break; } //print help
96 os::signal(SIGINT
, sigTermHandler
);
97 os::signal(SIGTERM
, sigTermHandler
);
99 csqlcon
= SqlFactory::createConnection(CSqlLog
);
100 SqlLogConnection
*logConn
= (SqlLogConnection
*) csqlcon
;
101 logConn
->setNoMsgLog(true);
102 rv
= csqlcon
->connect(I_USER
, I_PASS
);
103 if (rv
!= OK
) return NULL
;
105 // Reading "csqlds.conf file"
107 fp
= fopen(Conf::config
.getDsConfigFile(),"r");
109 printError(ErrSysInit
,"csqlds.conf file does not exist");
112 struct MultiThreadDSN
*head
=NULL
, *pnode
=NULL
;
114 char dsnname
[IDENTIFIER_LENGTH
];dsnname
[0]='\0';
115 char tdbname
[IDENTIFIER_LENGTH
];tdbname
[0] = '\0';
116 char username
[IDENTIFIER_LENGTH
];username
[0]='\0';
117 char password
[IDENTIFIER_LENGTH
];password
[0]='\0';
122 struct MultiThreadDSN
*multiDsn
= new struct MultiThreadDSN
;
123 fscanf(fp
,"%s %s %s %s\n",dsnname
,username
,password
,tdbname
);
125 strcpy(multiDsn
->dsn
,dsnname
);
126 strcpy(multiDsn
->user
,username
);
127 strcpy(multiDsn
->pwd
,password
);
128 strcpy(multiDsn
->tdb
,tdbname
);
131 if(pnode
==NULL
) {head
=multiDsn
; pnode
=multiDsn
;}
132 else { pnode
->next
=multiDsn
; pnode
=pnode
->next
; }
136 // Declare number of thread
137 pthread_t
*thrId
=new pthread_t
[totalDsn
];
138 multiDsnInput
= (MultiDsnThread
**) malloc (sizeof(MultiDsnThread
*) * totalDsn
);
141 //Traversing the list
143 while(pnode
!= NULL
){
144 multiDsnInput
[i
] = new MultiDsnThread();
145 strcpy(multiDsnInput
[i
]->ds
,pnode
->dsn
);
146 strcpy(multiDsnInput
[i
]->targetDb
,pnode
->tdb
);
147 strcpy(multiDsnInput
[i
]->userName
,pnode
->user
);
148 strcpy(multiDsnInput
[i
]->pwdName
,pnode
->pwd
);
151 pthread_create(&thrId
[i
], NULL
, &startThread
, multiDsnInput
[i
]);
157 for(int j
=0; j
<totalDsn
; j
++){
158 pthread_join(thrId
[j
], NULL
);
161 printf("Cache Server Exiting\n");
162 cacheTableList
.reset();
163 csqlcon
->disconnect();
164 // targetconn->disconnect();
165 // printf("Out of main\n");
171 // Function for THreads
172 void *startThread(void *thrInfo
)
175 AbsSqlConnection
*targetconn
;
176 //AbsSqlConnection *csqlcon = NULL;
177 AbsSqlStatement
*csqlstmt
= NULL
;
178 SqlConnection
*con
= NULL
;
179 SqlStatement
*sqlstmt
= NULL
;
182 MultiDsnThread
*multiDsnInput
= (MultiDsnThread
*)thrInfo
;
183 /* csqlcon = SqlFactory::createConnection(CSqlLog);
184 SqlLogConnection *logConn = (SqlLogConnection *) csqlcon;
185 logConn->setNoMsgLog(true);
186 rv = csqlcon->connect(I_USER, I_PASS);
187 if (rv != OK) return NULL;
189 targetconn
= SqlFactory::createConnection(CSqlAdapter
);
190 SqlOdbcConnection
*dsnAda
= (SqlOdbcConnection
*)targetconn
;
191 dsnAda
->setDsn(multiDsnInput
->ds
);//line added
192 rv
= targetconn
->connect(I_USER
, I_PASS
);
193 if (rv
!= OK
) return NULL
;
194 if (!Conf::config
.useCache())
196 printf("Cache is set to OFF in csql.conf file\n");
199 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
200 stmt
->setConnection(targetconn
);
201 csqlstmt
= SqlFactory::createStatement(CSqlLog
);
202 csqlstmt
->setConnection(csqlcon
);
205 struct stat ofstatus
,nfstatus
;
206 ret
=stat(Conf::config
.getTableConfigFile(),&ofstatus
);
207 struct timeval timeout
, tval
;
208 timeout
.tv_sec
= Conf::config
.getCacheWaitSecs();
210 createCacheTableList();
213 tval
.tv_sec
= timeout
.tv_sec
;
214 tval
.tv_usec
= timeout
.tv_usec
;
215 ret
= os::select(0, 0, 0, 0, &tval
);
216 printf("Checking for cache updates\n");
217 ret
=stat(Conf::config
.getTableConfigFile(),&nfstatus
);
218 if(ofstatus
.st_mtime
!= nfstatus
.st_mtime
)
220 cacheTableList
.reset();
221 createCacheTableList();
222 ofstatus
.st_mtime
= nfstatus
.st_mtime
;
224 if((ret
= getRecordsFromTargetDb( targetconn
, csqlcon
, csqlstmt
, con
, sqlstmt
)) == 1) {
229 //printf("Cache Server Exiting\n");
230 //cacheTableList.reset();
231 //csqlcon->disconnect();
232 targetconn
->disconnect();
236 int getRecordsFromTargetDb(AbsSqlConnection
*targetconn
, AbsSqlConnection
*csqlcon
,AbsSqlStatement
*csqlstmt
, SqlConnection
*con
, SqlStatement
*sqlstmt
)
240 long long op
=0, id
=0,cId
=0;
246 caId
=Conf::config
.getSiteID();
247 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
248 stmt
->setConnection(targetconn
);
249 AbsSqlStatement
*delstmt
= SqlFactory::createStatement(CSqlAdapter
);
250 delstmt
->setConnection(targetconn
);
251 //rv = delstmt->prepare("DELETE from csql_log_int where id=?;");
252 sprintf(StmtStr
, "SELECT * FROM csql_log_int where cacheid = %d;", caId
);
253 rv
= stmt
->prepare(StmtStr
);
254 if (rv
!= OK
) {printf("Stmt prepare failed\n"); return 1; }
255 stmt
->bindField(1, tablename
);
256 stmt
->bindField(2, &pkid
);
257 stmt
->bindField(3, &op
);
258 stmt
->bindField(4, &cId
);
259 stmt
->bindField(5, &id
);
261 con
= (SqlConnection
*) csqlcon
->getInnerConnection();
262 sqlstmt
= (SqlStatement
*)csqlstmt
->getInnerStatement();
263 sqlstmt
->setSqlConnection(con
);
265 sprintf(StmtStr
, "DELETE from csql_log_int where id=?;");
266 rv
= delstmt
->prepare(StmtStr
);
277 rv
= targetconn
->beginTrans();
278 rv
= stmt
->execute(rows
);
280 printError(ErrSysInit
, "Unable to execute stmt in target db");
281 targetconn
->rollback();
289 while ( stmt
->fetch() != NULL
)
291 Util::trimEnd(tablename
);
292 printf("Row value is %s %lld %lld %lld\n", tablename
, pkid
, op
,cId
);
294 if (op
== 2) { //DELETE
295 retVal
= remove(tablename
,pkid
, targetconn
, csqlstmt
, csqlcon
);
298 retVal
= insert(tablename
, pkid
, targetconn
, sqlstmt
, csqlstmt
, csqlcon
);
300 //targetconn->commit();
301 //rv = targetconn->beginTrans();
303 delstmt
->setIntParam(1, id
);
304 rv
= delstmt
->execute(rows
);
306 printf("log record not deleted from the target db %d\n", rv
);
307 targetconn
->rollback();
310 rv
= targetconn
->commit();
311 rv
= targetconn
->beginTrans();
318 targetconn
->rollback();
326 int insert(char *tablename
, long long pkid
, AbsSqlConnection
*targetconn
, SqlStatement
*sqlstmt
, AbsSqlStatement
*csqlstmt
, AbsSqlConnection
*csqlcon
)
328 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
329 stmt
->setConnection(targetconn
);
330 TDBInfo tdbname
= ((SqlOdbcConnection
*)targetconn
)->getTrDbName();
331 SqlOdbcStatement
*ostmt
= (SqlOdbcStatement
*) stmt
;
333 char pkfieldname
[128];
334 DbRetVal rv
=getCacheField(tablename
, pkfieldname
);
336 ostmt
->getPrimaryKeyFieldName(tablename
, pkfieldname
);
338 //Util::str_tolower(pkfieldname);
339 char fieldlist
[IDENTIFIER_LENGTH
];
340 char condition
[IDENTIFIER_LENGTH
];
342 rv
=getCacheProjField(tablename
,fieldlist
);
344 rv
=getCacheCondition(tablename
,condition
);
346 sprintf(sbuf
, "SELECT * FROM %s where %s = %lld;", tablename
, pkfieldname
, pkid
);
348 sprintf(sbuf
, "SELECT * FROM %s where %s = %lld and %s ;", tablename
, pkfieldname
, pkid
,condition
);
351 rv
=getCacheCondition(tablename
,condition
);
353 sprintf(sbuf
, "SELECT %s FROM %s where %s = %lld;",fieldlist
,tablename
, pkfieldname
, pkid
);
355 sprintf(sbuf
, "SELECT %s FROM %s where %s = %lld and %s;",fieldlist
,tablename
, pkfieldname
, pkid
,condition
);
358 //TODO::get the primary key field name from the table interface. need to implement it
359 //printf("InsertString: %s\n", sbuf);
360 rv
= stmt
->prepare(sbuf
);
361 if (rv
!= OK
) return 2;
364 sprintf(ptr
,"INSERT INTO %s VALUES(", tablename
);
366 bool firstFld
= true;
367 List fNameList
= sqlstmt
->getFieldNameList(tablename
);
368 int noOfFields
= fNameList
.size();
369 while (noOfFields
--) {
372 sprintf(ptr
,"?", tablename
);
381 //printf("insert stmt: '%s'\n", insStmt);
383 rv
= csqlstmt
->prepare(insStmt
);
384 if (rv
!= OK
) { return 2; }
386 ListIterator fNameIter
= fNameList
.getIterator();
387 FieldInfo
*info
= new FieldInfo();
388 int fcount
=0; void *valBuf
; int fieldsize
=0;
389 void *buf
[128];//TODO:resticts to support only 128 fields in table
390 for (int i
=0; i
< 128; i
++) buf
[i
]= NULL
;
392 Identifier
*elem
= NULL
;
393 BindBuffer
*bBuf
= NULL
;
394 while (fNameIter
.hasElement()) {
395 elem
= (Identifier
*) fNameIter
.nextElement();
396 sqlstmt
->getFieldInfo(tablename
, (const char*)elem
->name
, info
);
397 valBuf
= AllDataType::alloc(info
->type
, info
->length
+1);
398 os::memset(valBuf
,0,info
->length
);
399 bBuf
= (BindBuffer
*) fillBindBuffer(tdbname
, info
->type
, valBuf
, info
->length
);
400 if (info
->type
== typeString
) {
402 valBufList
.append(bBuf
);
403 dType
[fcount
] = info
->type
;
404 buf
[fcount
] = valBuf
;
405 stmt
->bindField(fcount
+1, buf
[fcount
]);
410 int retValue
= stmt
->execute(rows
);
411 if (retValue
&& rows
!= 1) {
412 printError(ErrSysInit
, "Unable to execute statement at target db\n");
415 ListIterator bindIter
= valBufList
.getIterator();
416 if (stmt
->fetch() != NULL
) {
417 ostmt
->setNullInfo(csqlstmt
);
418 if(tdbname
== postgres
){
419 for (int i
=0; i
< fcount
; i
++) {
420 if(dType
[i
] == typeString
) Util::trimRight((char *)buf
[i
]);
423 //setXXXParams to be called here
425 while (bindIter
.hasElement()) {
426 bBuf
= (BindBuffer
*) bindIter
.nextElement();
427 setParamValues(csqlstmt
, pos
++, bBuf
->type
, bBuf
->length
,
430 csqlcon
->beginTrans();
432 rv
= csqlstmt
->execute(rows
);
434 printf ("execute failed \n");
435 printf(" STMT: %s\n",insStmt
);
439 //printf("successfully inserted value with pkid = %d\n", pkid);
440 //Note:insert may fail if the record is inserted from this cache
442 //for (int i=0; i < fcount; i++) free(buf[i]);
443 ListIterator iter
= valBufList
.getIterator();
444 while (iter
.hasElement()){
445 bBuf
= (BindBuffer
*) iter
.nextElement();
446 if (bBuf
->type
== typeString
)
447 //printf("Values %x %x \n", bBuf->csql, bBuf->targetdb);
455 int remove(char *tablename
, long long pkid
, AbsSqlConnection
*targetconn
, AbsSqlStatement
*csqlstmt
,AbsSqlConnection
*csqlcon
)
459 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
460 stmt
->setConnection(targetconn
);
461 SqlOdbcStatement
*ostmt
= (SqlOdbcStatement
*) stmt
;
462 char pkfieldname
[128];
463 rv
=getCacheField(tablename
, pkfieldname
);
465 ostmt
->getPrimaryKeyFieldName(tablename
, pkfieldname
);
467 Util::str_tolower(pkfieldname
);
471 sprintf(delStmt
, "DELETE FROM %s where %s = %d", tablename
, pkfieldname
, pkid
);
472 //printf("delStmt is %s\n", delStmt);
473 rv
= csqlstmt
->prepare(delStmt
);
474 if (rv
!= OK
) { return 2; }
475 rv
= csqlcon
->beginTrans();
476 if (rv
!= OK
) return 2;
478 rv
= csqlstmt
->execute(rows
);
479 if (rv
!= OK
|| rows
!=1)
482 printf("Delete failed for stmt %s\n", delStmt
);
485 rv
= csqlcon
->commit();
490 void createCacheTableList()
493 fp
= fopen(Conf::config
.getTableConfigFile(),"r");
495 printError(ErrSysInit
, "cachetable.conf file does not exist");
498 char tablename
[IDENTIFIER_LENGTH
];
499 char fieldname
[IDENTIFIER_LENGTH
];
500 char condition
[IDENTIFIER_LENGTH
];
501 char field
[IDENTIFIER_LENGTH
];
502 char dsnName
[IDENTIFIER_LENGTH
];
507 fscanf(fp
,"%d %s %s %s %s %s \n",&mode
,tablename
,fieldname
,condition
,field
,dsnName
);
508 CacheTableInfo
*cacheTable
=new CacheTableInfo();
509 cacheTable
->setTableName(tablename
);
510 cacheTable
->setFieldName(fieldname
);
511 cacheTable
->setProjFieldList(field
);
512 cacheTable
->setCondition(condition
);
513 cacheTableList
.append(cacheTable
);
515 // printf("Table %s is not cached\n",tabname);
520 DbRetVal
getCacheCondition(char *tblName
,char *condition
)
522 ListIterator iter
=cacheTableList
.getIterator();
523 CacheTableInfo
*cacheTable
;
524 while(iter
.hasElement())
526 cacheTable
=(CacheTableInfo
*)iter
.nextElement();
527 if(strcmp(cacheTable
->getTableName(),tblName
)==0){
528 if(strcmp(cacheTable
->getCondition(),"NULL")!=0)
530 strcpy(condition
,cacheTable
->getCondition());
538 DbRetVal
getCacheProjField(char *tblName
,char *fieldlist
)
540 ListIterator iter
=cacheTableList
.getIterator();
541 CacheTableInfo
*cacheTable
;
542 while(iter
.hasElement())
544 cacheTable
=(CacheTableInfo
*)iter
.nextElement();
545 if(strcmp(cacheTable
->getTableName(),tblName
)==0){
546 if(strcmp(cacheTable
->getProjFieldList(),"NULL")!=0)
548 strcpy(fieldlist
,cacheTable
->getProjFieldList());
555 DbRetVal
getCacheField(char *tblName
,char *fldName
)
557 ListIterator iter
=cacheTableList
.getIterator();
558 CacheTableInfo
*cacheTable
;
559 while(iter
.hasElement())
561 cacheTable
=(CacheTableInfo
*)iter
.nextElement();
562 if(strcmp(cacheTable
->getTableName(),tblName
)==0){
563 if(strcmp(cacheTable
->getFieldName(),"NULL")!=0)
565 strcpy(fldName
,cacheTable
->getFieldName());
574 void setParamValues(AbsSqlStatement
*stmt
, int parampos
, DataType type
, int length
, char *value
)
579 stmt
->setIntParam(parampos
, *(int*)value
);
582 stmt
->setLongParam(parampos
, *(long*)value
);
585 stmt
->setLongLongParam(parampos
, *(long long*)value
);
588 stmt
->setShortParam(parampos
, *(short*)value
);
591 stmt
->setByteIntParam(parampos
, *(char*)value
);
594 stmt
->setDoubleParam(parampos
, *(double*)value
);
597 stmt
->setFloatParam(parampos
, *(float*)value
);
600 stmt
->setDateParam(parampos
, *(Date
*)value
);
603 stmt
->setTimeParam(parampos
, *(Time
*)value
);
606 stmt
->setTimeStampParam(parampos
, *(TimeStamp
*)value
);
610 char *d
=(char*)value
;
612 stmt
->setStringParam(parampos
, (char*)value
);
616 stmt
->setBinaryParam(parampos
, (char *) value
, length
);
622 void *fillBindBuffer(TDBInfo tdbName
, DataType type
, void *valBuf
, int length
)
624 BindBuffer
*bBuf
= NULL
;
628 bBuf
= new BindBuffer();
630 bBuf
->type
= typeDate
;
631 bBuf
->length
= sizeof(DATE_STRUCT
);
632 bBuf
->targetdb
= malloc(bBuf
->length
);
633 memset(bBuf
->targetdb
, 0, bBuf
->length
);
634 valBuf
= bBuf
->targetdb
;
637 bBuf
= new BindBuffer();
639 bBuf
->type
= typeTime
;
640 bBuf
->length
= sizeof(TIME_STRUCT
);
641 bBuf
->targetdb
= malloc(bBuf
->length
);
642 memset(bBuf
->targetdb
, 0, bBuf
->length
);
643 valBuf
= bBuf
->targetdb
;
646 bBuf
= new BindBuffer();
648 bBuf
->type
= typeTimeStamp
;
649 bBuf
->length
= sizeof(TIMESTAMP_STRUCT
);
650 bBuf
->targetdb
= malloc(bBuf
->length
);
651 memset(bBuf
->targetdb
, 0, bBuf
->length
);
652 valBuf
= bBuf
->targetdb
;
656 if( tdbName
== postgres
)
658 bBuf
= new BindBuffer();
659 bBuf
->type
= typeLongLong
;
662 bBuf
->targetdb
= AllDataType::alloc(typeString
,bBuf
->length
);
663 memset(bBuf
->targetdb
, 0, bBuf
->length
);
664 valBuf
= bBuf
->targetdb
;
668 bBuf
= new BindBuffer();
671 bBuf
->length
= length
;
676 if( tdbName
== postgres
)
678 bBuf
= new BindBuffer();
679 bBuf
->type
= typeString
;
681 bBuf
->length
= length
+1;
685 bBuf
= new BindBuffer();
688 bBuf
->length
= length
;