two way caching changes related to config, deamon
[csql.git] / src / tools / csqlcacheserver.cxx
blob978d54e00420ca4cdb37f2b3a86b527f1c90693e
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include <AbsSqlConnection.h>
17 #include <AbsSqlStatement.h>
18 #include <SqlOdbcStatement.h>
19 #include <SqlFactory.h>
20 #include <CSql.h>
22 int insert(Table *table, int pkid);
23 int remove(Table *table, int pkid);
24 int getRecordsFromTargetDb(int mode);
26 int srvStop =0;
27 static void sigTermHandler(int sig)
29 printf("Received signal %d\nStopping the server\n", sig);
30 srvStop = 1;
33 void printUsage()
35 printf("Usage: csqlcacheserver \n");
36 printf("Description: Start the csql caching server.\n");
37 return;
39 AbsSqlConnection *targetconn;
40 Connection conn;
41 int main(int argc, char **argv)
43 int c = 0, opt = 0;
44 while ((c = getopt(argc, argv, "?")) != EOF)
46 switch (c)
48 case '?' : { opt = 10; break; } //print help
49 default: opt=10;
52 }//while options
54 if (opt == 10) {
55 printUsage();
56 return 0;
59 os::signal(SIGINT, sigTermHandler);
60 os::signal(SIGTERM, sigTermHandler);
62 targetconn = SqlFactory::createConnection(CSqlAdapter);
63 DbRetVal rv = targetconn->connect("root", "manager");
64 if (rv != OK) return 1;
65 rv = conn.open("root", "manager");
66 if (rv != OK) return 1;
67 if (!Conf::config.useCache())
69 printf("Cache is set to OFF in csql.conf file\n");
70 return 1;
72 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
73 stmt->setConnection(targetconn);
74 rv = stmt->prepare("create table csql_log_int(tablename char(64), pkid int, operation int, id int not null unique auto_increment)engine='innodb';");
75 targetconn->beginTrans();
76 int rows=0;
77 stmt->execute(rows);
78 targetconn->commit();
79 stmt->free();
80 delete stmt;
82 printf("Cache server started\n");
83 int ret = 0;
84 struct timeval timeout, tval;
85 timeout.tv_sec = Conf::config.getCacheWaitSecs();
86 timeout.tv_usec = 0;
88 while(!srvStop)
90 tval.tv_sec = timeout.tv_sec;
91 tval.tv_usec = timeout.tv_usec;
92 ret = os::select(0, NULL, 0, 0, &tval);
93 printf("Getting the updates\n");
94 ret = getRecordsFromTargetDb(1);
95 if (ret !=0) srvStop = 1;
96 //ret = getRecordsFromTargetDb(2);
97 if (ret !=0) srvStop = 1;
99 printf("Cache Server Exiting\n");
100 conn.close();
101 targetconn->disconnect();
102 return 0;
104 int getRecordsFromTargetDb(int mode)
106 int pkid;
107 char tablename[64];
108 int op, id;
109 int rows =0;
110 DbRetVal rv = OK;
111 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
112 stmt->setConnection(targetconn);
113 AbsSqlStatement *delstmt = SqlFactory::createStatement(CSqlAdapter);
114 delstmt->setConnection(targetconn);
115 if (mode == 1 ) {
116 rv = stmt->prepare("SELECT * FROM csql_log_int;");
117 rv = delstmt->prepare("DELETE FROM csql_log_int where id =?;");
119 else {
120 rv = stmt->prepare("SELECT * FROM csql_log_char;");
121 rv = delstmt->prepare("DELETE FROM csql_log_char where id =?;");
123 if (rv != OK) return 1;
124 stmt->bindField(1, tablename);
125 stmt->bindField(2, &pkid);
126 stmt->bindField(3, &op);
127 stmt->bindField(4, &id);
128 DatabaseManager *dbMgr = conn.getDatabaseManager();
129 while(true) {
130 rv = targetconn->beginTrans();
131 rv = stmt->execute(rows);
132 if (rv != OK)
134 printError(ErrSysInit, "Unable to execute stmt in target db");
135 targetconn->rollback();
136 stmt->free();
137 delstmt->free();
138 delete stmt;
139 delete delstmt;
140 return 1;
142 if (stmt->fetch() != NULL) {
143 printf("Row value is %s %d %d\n", tablename, pkid, op);
144 Table *table = dbMgr->openTable(tablename);
145 int ret = 0;
146 if (table == NULL)
148 printError(ErrSysInit, "Table %s not exist in csql", tablename);
149 targetconn->rollback();
150 stmt->free();
151 delstmt->free();
152 delete stmt;
153 delete delstmt;
154 break;
156 if (op == 2)//DELETE
158 ret = remove(table,pkid);
160 else //INSERT
162 ret = insert(table, pkid);
164 dbMgr->closeTable(table);
165 rv = targetconn->commit();
166 rv = targetconn->beginTrans();
167 //Remove record from csql_log_XXX table
168 delstmt->setIntParam(1, id);
169 rv = delstmt->execute(rows);
170 if (rv != OK)
172 printf("log record not deleted from the target db %d\n", rv);
173 targetconn->rollback();
174 stmt->free();
175 delstmt->free();
176 delete stmt;
177 delete delstmt;
180 rv = targetconn->commit();
181 //create table csql_log_int(tablename char(64), pkid int, op int, id int not null unique auto_increment);
183 else {
184 stmt->close();
185 break;
187 stmt->close();
189 stmt->free();
190 delstmt->free();
191 delete stmt;
192 delete delstmt;
193 return 0;
195 int insert(Table *table, int pkid)
197 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
198 stmt->setConnection(targetconn);
199 SqlOdbcStatement *ostmt = (SqlOdbcStatement*) stmt;
200 char pkfieldname[128];
201 ostmt->getPrimaryKeyFieldName(table->getName(), pkfieldname);
202 char sbuf[1024];
203 sprintf(sbuf, "SELECT * FROM %s where %s = %d;", table->getName(), pkfieldname, pkid);
204 //TODO::get the primary key field name from the table interface. need to implement it
205 DbRetVal rv = stmt->prepare(sbuf);
206 if (rv != OK) return 1;
208 List fNameList = table->getFieldNameList();
209 ListIterator fNameIter = fNameList.getIterator();
210 FieldInfo *info = new FieldInfo();
211 int fcount =1; void *valBuf; int fieldsize=0;
212 void *buf[128];//TODO:resticts to support only 128 fields in table
213 Identifier *elem = NULL;
214 while (fNameIter.hasElement()) {
215 elem = (Identifier*) fNameIter.nextElement();
216 table->getFieldInfo((const char*)elem->name, info);
217 valBuf = AllDataType::alloc(info->type, info->length);
218 buf[fcount] = valBuf;
219 table->bindFld(elem->name, valBuf);
220 stmt->bindField(fcount++, valBuf);
223 delete info;
224 int rows=0;
225 int retValue = stmt->execute(rows);
226 if (retValue && rows != 1) {printError(ErrSysInit, "Unable to execute statement at target db\n"); return ErrSysInit; }
227 conn.startTransaction();
228 if (stmt->fetch() != NULL) {
229 table->insertTuple();
230 //Note:insert may fail if the record is inserted from this cache
232 for (int i=1; i < fcount; i++) {
233 free(buf[i]);
235 stmt->free();
236 delete stmt;
237 conn.commit();
238 return 0;
240 int remove(Table *table, int pkid)
242 DbRetVal rv = OK;
243 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
244 stmt->setConnection(targetconn);
245 SqlOdbcStatement *ostmt = (SqlOdbcStatement*) stmt;
246 char pkfieldname[128];
247 ostmt->getPrimaryKeyFieldName(table->getName(), pkfieldname);
248 delete stmt;
249 Condition p1;
250 p1.setTerm(pkfieldname, OpEquals, &pkid);
251 table->setCondition(&p1);
252 rv = conn.startTransaction();
253 if (rv != OK) return 1;
254 rv = table->execute();
255 if (rv != OK)
257 table->setCondition(NULL);
258 conn.rollback();
259 return 1;
261 if (table->fetch() != NULL)
262 rv = table->deleteTuple();
263 //Note:Delete may fail if the record is deleted from this cache
264 table->setCondition(NULL);
265 rv = conn.commit();
266 if (rv != OK) return 1;
267 return 0;