1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 ***************************************************************************/
16 #include <AbsSqlConnection.h>
17 #include <AbsSqlStatement.h>
18 #include <SqlOdbcStatement.h>
19 #include <SqlFactory.h>
// Forward declarations of the helpers defined further down in this file.
// NOTE(review): the leading numbers on these lines (22, 23, 24) look like
// line numbers carried over from a source listing, not C++ - confirm and strip.
//
// insert(): reads the row whose primary key equals pkid from the target
// database and inserts it into the given cache table.
22 int insert(Table
*table
, int pkid
);
// remove(): deletes the row whose primary key equals pkid from the given
// cache table.
23 int remove(Table
*table
, int pkid
);
// getRecordsFromTargetDb(): reads a change-log table in the target database
// and applies what it finds through remove()/insert(). main() calls it with
// mode 1 and treats any non-zero return as a request to stop the server.
24 int getRecordsFromTargetDb(int mode
);
// Signal handler; main() registers it for both SIGINT and SIGTERM through
// os::signal(). It prints the received signal number followed by a
// "Stopping the server" notice.
// NOTE(review): the function braces and any statement that actually requests
// shutdown are not visible in this listing - confirm against the full source.
27 static void sigTermHandler(int sig
)
29 printf("Received signal %d\nStopping the server\n", sig
);
// Usage/help text for the csqlcacheserver tool: one line with the command
// name and one line describing what it does.
// NOTE(review): the header of the function that contains these two calls is
// not visible in this listing; presumably this is the help routine reached
// through the '?' option handled in main() - confirm.
35 printf("Usage: csqlcacheserver \n");
36 printf("Description: Start the csql caching server.\n");
// File-scope connection to the target database. main() assigns it from
// SqlFactory::createConnection(CSqlAdapter) and connects/disconnects it;
// getRecordsFromTargetDb(), insert() and remove() attach their statements to it.
39 AbsSqlConnection
*targetconn
;
// Entry point of the cache server.
// Visible flow: parse command-line options, install the signal handlers,
// open the cache connection `conn` and the target connection `targetconn`,
// check that caching is enabled in the configuration, then wait and poll the
// target database for logged changes, and finally disconnect.
// Returns 1 when either connection cannot be opened.
// NOTE(review): this listing is missing lines (braces, the declarations of
// c, opt, ret, conn and srvStop, the switch and loop headers, the final
// return), so the exact control flow must be confirmed against the full source.
41 int main(int argc
, char **argv
)
// Option parsing: the getopt() option string is "?", so '?' is the only
// case label visible here; it sets opt to 10 (the existing comment marks
// that as "print help").
44 while ((c
= getopt(argc
, argv
, "?")) != EOF
)
48 case '?' : { opt
= 10; break; } //print help
// Route SIGINT and SIGTERM to sigTermHandler.
59 os::signal(SIGINT
, sigTermHandler
);
60 os::signal(SIGTERM
, sigTermHandler
);
// Open the cache-side connection with fixed credentials; give up with exit
// status 1 on failure.
62 DbRetVal rv
= conn
.open("root", "manager");
63 if (rv
!= OK
) return 1;
// Create the target-database connection through the adapter factory and
// connect with the same fixed credentials; exit status 1 on failure.
// NOTE(review): this early return leaves `conn` open in the visible code - confirm.
64 targetconn
= SqlFactory::createConnection(CSqlAdapter
);
65 rv
= targetconn
->connect("root", "manager");
66 if (rv
!= OK
) return 1;
// Report when the configuration has caching switched off. What happens
// after the message (presumably an early return) is not visible here.
67 if (!Conf::config
.useCache())
69 printf("Cache is set to OFF in csql.conf file\n");
// Statement bound to the target connection. In the visible text it is only
// referenced by the commented-out table-creation code that follows.
72 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
73 stmt
->setConnection(targetconn
);
// NOTE(review): the next line opens a block comment (disabled creation of
// the csql_log_int table) whose closing delimiter is not visible in this
// listing; in the full source it presumably ends before the
// "Cache server started" message - confirm.
74 /*rv = stmt->prepare("create table csql_log_int(tablename char(64), pkid int, operation int, id int not null unique auto_increment)engine='innodb';");
75 targetconn->beginTrans();
82 printf("Cache server started\n");
// Wait interval, in seconds, taken from the configuration.
// NOTE(review): the assignment of timeout.tv_usec is not visible here even
// though it is copied into tval below - confirm it is initialized.
84 struct timeval timeout
, tval
;
85 timeout
.tv_sec
= Conf::config
.getCacheWaitSecs();
// Copy the interval into tval before each wait and call os::select() with
// no descriptor sets, so the call serves only as a timed wait.
// NOTE(review): presumably tval is re-seeded because select() may modify its
// timeout argument; the enclosing loop header is not visible - confirm.
90 tval
.tv_sec
= timeout
.tv_sec
;
91 tval
.tv_usec
= timeout
.tv_usec
;
92 ret
= os::select(0, NULL
, 0, 0, &tval
);
93 printf("Checking for cache updates\n");
// Process the log in mode 1; a non-zero result sets srvStop.
94 ret
= getRecordsFromTargetDb(1);
95 if (ret
!=0) srvStop
= 1;
// The mode 2 call is commented out, so the check after it re-tests the
// same ret value as the check above.
96 //ret = getRecordsFromTargetDb(2);
97 if (ret
!=0) srvStop
= 1;
// Shutdown: announce exit and disconnect from the target database.
99 printf("Cache Server Exiting\n");
101 targetconn
->disconnect();
// Reads a change-log table in the target database (csql_log_int or
// csql_log_char), opens the cache table named in the fetched row, applies
// the change through remove() and/or insert(), and then deletes the
// processed log row from the target database.
// Parameter mode: main() passes 1. NOTE(review): presumably it selects
// between the csql_log_int and csql_log_char queries below, but the
// branching lines are not visible in this listing - confirm.
// Returns 1 on the failure paths visible here (statement prepare failures).
// NOTE(review): the declarations of rv, ret, rows, tablename, pkid, op and
// id, plus several branch conditions and braces, are missing from this
// listing; comments below describe only what the visible lines show.
104 int getRecordsFromTargetDb(int mode
)
// Two statements on the target connection: stmt reads the log table,
// delstmt removes processed log rows.
111 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
112 stmt
->setConnection(targetconn
);
113 AbsSqlStatement
*delstmt
= SqlFactory::createStatement(CSqlAdapter
);
114 delstmt
->setConnection(targetconn
);
// The parameterized DELETE prepare is commented out, so the rv check right
// after it does not follow any visible assignment to rv in this function.
// NOTE(review): confirm rv is initialized before this point.
116 //rv = delstmt->prepare("DELETE from csql_log_int where id=?;");
117 if (rv
!= OK
) {printf("Stmt prepare failed\n"); return 1; }
// Prepare the query over the integer-key log table ...
118 rv
= stmt
->prepare("SELECT * FROM csql_log_int;");
// ... or over the char-key log table (the selecting condition is not visible).
121 rv
= stmt
->prepare("SELECT * FROM csql_log_char;");
122 if (rv
!= OK
) {printf("Stmt prepare failed\n"); return 1; }
// Same pattern as above: the DELETE prepare is commented out, so this
// second check re-tests the rv of the SELECT prepare.
123 //rv = delstmt->prepare("DELETE from csql_log_char where id=?;");
125 if (rv
!= OK
) {printf("Stmt prepare failed\n"); return 1; }
// Bind the four result columns, in order, to tablename, pkid, op and id.
// The disabled table definition in main() lists the columns as
// tablename, pkid, operation, id.
126 stmt
->bindField(1, tablename
);
127 stmt
->bindField(2, &pkid
);
128 stmt
->bindField(3, &op
);
129 stmt
->bindField(4, &id
);
// Database manager of the cache-side connection; used to open/close the
// cache table named in each log row.
130 DatabaseManager
*dbMgr
= conn
.getDatabaseManager();
// Buffer for the DELETE statement text built with sprintf below.
131 char delStmtStr
[1024];
// Read the log inside a transaction on the target connection.
// NOTE(review): the result of beginTrans() is overwritten by execute()
// before any visible check.
133 rv
= targetconn
->beginTrans();
134 rv
= stmt
->execute(rows
);
// Failure path for the execute above (its condition is not visible):
// report the error and roll back the target transaction.
137 printError(ErrSysInit
, "Unable to execute stmt in target db");
138 targetconn
->rollback();
// Fetch a log row; when one is present, print it and open the cache table
// it names.
145 if (stmt
->fetch() != NULL
) {
146 printf("Row value is %s %d %d\n", tablename
, pkid
, op
);
147 Table
*table
= dbMgr
->openTable(tablename
);
// Failure path when the table cannot be opened in the cache (condition not
// visible): report and roll back.
151 printError(ErrSysInit
, "Table %s not exist in csql", tablename
);
152 targetconn
->rollback();
// Apply the change to the cache table.
// NOTE(review): presumably which of remove()/insert() runs depends on op;
// the conditions are not visible - confirm.
161 ret
= remove(table
,pkid
);
165 ret
= insert(table
, pkid
);
// Done with the cache table: close it, commit the read transaction and
// start a new one for the log cleanup.
167 dbMgr
->closeTable(table
);
168 rv
= targetconn
->commit();
169 rv
= targetconn
->beginTrans();
//Remove record from csql_log_XXX table
// NOTE(review): the statement text always names csql_log_int, even though
// a csql_log_char query is also prepared above - confirm whether the char
// case needs its own table name here. sprintf is unbounded; with a single
// %d the text fits the 1024-byte buffer.
171 sprintf(delStmtStr
, "DELETE from csql_log_int where id=%d ;", id
);
172 rv
= delstmt
->prepare(delStmtStr
);
// NOTE(review): this return has no visible rollback of the transaction
// begun just above - confirm.
173 if (rv
!= OK
) {printf("FAILED\n"); return 1; }
// delstmt->setIntParam(1, id);
175 rv
= delstmt
->execute(rows
);
// Failure path for the delete (condition not visible): report the return
// code and roll back.
178 printf("log record not deleted from the target db %d\n", rv
);
179 targetconn
->rollback();
// Commit the log cleanup.
187 rv
= targetconn
->commit();
// Copies one row from the target database into the cache table: selects
// the row whose primary key equals pkid from the target table of the same
// name, binds every field of `table` to a freshly allocated buffer, and
// calls insertTuple() when a row is fetched.
// Parameters: table - the opened cache table; pkid - primary key value.
// Returns 1 when the SELECT cannot be prepared and ErrSysInit on the
// execute-failure path; the remaining return statements are not visible in
// this listing.
// NOTE(review): the declarations of sbuf and rows, the loop body at the end
// and the closing braces are missing from this listing.
201 int insert(Table
*table
, int pkid
)
// Statement on the target connection, also viewed as SqlOdbcStatement to
// reach getPrimaryKeyFieldName().
// NOTE(review): the cast is unchecked; it assumes the factory returns a
// SqlOdbcStatement for CSqlAdapter - confirm.
203 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
204 stmt
->setConnection(targetconn
);
205 SqlOdbcStatement
*ostmt
= (SqlOdbcStatement
*) stmt
;
// Receives the name of the table's primary key field.
206 char pkfieldname
[128];
207 ostmt
->getPrimaryKeyFieldName(table
->getName(), pkfieldname
);
// Build the single-row SELECT against the target table.
// NOTE(review): sprintf is unbounded and the size of sbuf is not visible -
// confirm it can hold the table and field names.
209 sprintf(sbuf
, "SELECT * FROM %s where %s = %d;", table
->getName(), pkfieldname
, pkid
);
//TODO::get the primary key field name from the table interface. need to implement it
211 DbRetVal rv
= stmt
->prepare(sbuf
);
212 if (rv
!= OK
) return 1;
// Walk the cache table's field names.
214 List fNameList
= table
->getFieldNameList();
215 ListIterator fNameIter
= fNameList
.getIterator();
// Scratch FieldInfo, reused for every field in the loop below.
// NOTE(review): no matching release of `info` is visible in this listing - confirm.
216 FieldInfo
*info
= new FieldInfo();
// fcount is the 1-based position of the next field to bind.
// fieldsize is not used in the visible lines.
217 int fcount
=1; void *valBuf
; int fieldsize
=0;
// Remembers each allocated value buffer, indexed by field position.
// NOTE(review): indexing starts at 1 and there is no visible bounds check,
// so a 128th field would write buf[128], one past the end - confirm.
218 void *buf
[128];//TODO: restricts support to 128 fields per table
219 Identifier
*elem
= NULL
;
// For every field: look up its type and length, allocate a value buffer,
// remember it, and bind that same buffer both to the cache table field and
// to the matching result column of the target statement, so a fetched row
// lands directly in the buffers used by insertTuple().
220 while (fNameIter
.hasElement()) {
221 elem
= (Identifier
*) fNameIter
.nextElement();
222 table
->getFieldInfo((const char*)elem
->name
, info
);
223 valBuf
= AllDataType::alloc(info
->type
, info
->length
);
224 buf
[fcount
] = valBuf
;
225 table
->bindFld(elem
->name
, valBuf
);
226 stmt
->bindField(fcount
++, valBuf
);
// Run the SELECT on the target database.
// NOTE(review): the failure branch is taken only when execute() returns
// non-zero AND rows != 1; confirm that "&&" (rather than "||") is intended.
231 int retValue
= stmt
->execute(rows
);
232 if (retValue
&& rows
!= 1) {printError(ErrSysInit
, "Unable to execute statement at target db\n"); return ErrSysInit
; }
// Insert the fetched row into the cache inside a cache-side transaction.
// The results of startTransaction() and insertTuple() are not checked.
233 conn
.startTransaction();
234 if (stmt
->fetch() != NULL
) {
235 table
->insertTuple();
//Note:insert may fail if the record is inserted from this cache
// Loop over the bound field positions 1..fcount-1.
// NOTE(review): the loop body is not visible; presumably it releases the
// buffers stored in buf - confirm.
238 for (int i
=1; i
< fcount
; i
++) {
// Deletes from the cache table the row whose primary key equals pkid:
// looks up the primary key field name, sets an equality condition on the
// table, executes the scan inside a cache-side transaction and deletes the
// fetched tuple.
// Parameters: table - the opened cache table; pkid - primary key value.
// Returns 1 when the transaction cannot be started or when rv is not OK at
// the end; the success return is not visible in this listing.
// NOTE(review): the declarations of rv and p1, the failure branch after
// execute() and the closing braces are missing from this listing.
246 int remove(Table
*table
, int pkid
)
// Statement on the target connection, used in the visible lines only to
// obtain the primary key field name through the SqlOdbcStatement view.
// NOTE(review): the cast is unchecked, and no release of stmt is visible - confirm.
249 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlAdapter
);
250 stmt
->setConnection(targetconn
);
251 SqlOdbcStatement
*ostmt
= (SqlOdbcStatement
*) stmt
;
252 char pkfieldname
[128];
253 ostmt
->getPrimaryKeyFieldName(table
->getName(), pkfieldname
);
// Restrict the table scan to rows where <primary key field> == pkid.
256 p1
.setTerm(pkfieldname
, OpEquals
, &pkid
);
257 table
->setCondition(&p1
);
// NOTE(review): this early return leaves the condition set on the table;
// if p1 is a local (its declaration is not visible) that pointer would
// outlive it - confirm.
258 rv
= conn
.startTransaction();
259 if (rv
!= OK
) return 1;
260 rv
= table
->execute();
// Clears the condition; presumably part of the failure path for the
// execute() above (the surrounding branch is not visible).
263 table
->setCondition(NULL
);
// Delete the matching tuple when one is fetched.
267 if (table
->fetch() != NULL
)
268 rv
= table
->deleteTuple();
//Note:Delete may fail if the record is deleted from this cache
// Always clear the condition before leaving.
270 table
->setCondition(NULL
);
272 if (rv
!= OK
) return 1;