2010603 insert not propagated to postgres
[csql.git] / src / tools / csqlcacheserver.cxx
blob cf9217fe7aedf855ddea09d7622f5ff90e026a14
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include <AbsSqlConnection.h>
17 #include <AbsSqlStatement.h>
18 #include <SqlOdbcStatement.h>
19 #include <SqlFactory.h>
20 #include <CSql.h>
22 int insert(Table *table, int pkid);
23 int remove(Table *table, int pkid);
24 int getRecordsFromTargetDb(int mode);
// Shutdown flag: stays 0 while the server runs; the polling loop in main()
// terminates once it becomes non-zero.
int srvStop = 0;

// Handler installed for SIGINT and SIGTERM: reports the signal number and
// raises the shutdown flag.
// NOTE(review): printf is not on the POSIX async-signal-safe list - consider
// moving the message out of the handler.
static void sigTermHandler(int sig)
{
    printf("Received signal %d\nStopping the server\n", sig);
    srvStop = 1;
}
// Writes the command line help text of the cache server to stdout.
void printUsage()
{
    printf("%s",
           "Usage: csqlcacheserver \n"
           "Description: Start the csql caching server.\n");
}
39 AbsSqlConnection *targetconn;
40 Connection conn;
41 int main(int argc, char **argv)
43 int c = 0, opt = 0;
44 while ((c = getopt(argc, argv, "?")) != EOF)
46 switch (c)
48 case '?' : { opt = 10; break; } //print help
49 default: opt=10;
52 }//while options
54 if (opt == 10) {
55 printUsage();
56 return 0;
59 os::signal(SIGINT, sigTermHandler);
60 os::signal(SIGTERM, sigTermHandler);
62 DbRetVal rv = conn.open("root", "manager");
63 if (rv != OK) return 1;
64 targetconn = SqlFactory::createConnection(CSqlAdapter);
65 rv = targetconn->connect("root", "manager");
66 if (rv != OK) return 1;
67 if (!Conf::config.useCache())
69 printf("Cache is set to OFF in csql.conf file\n");
70 return 1;
72 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
73 stmt->setConnection(targetconn);
74 /*rv = stmt->prepare("create table csql_log_int(tablename char(64), pkid int, operation int, id int not null unique auto_increment)engine='innodb';");
75 targetconn->beginTrans();
76 int rows=0;
77 stmt->execute(rows);
78 targetconn->commit();
79 stmt->free();
80 delete stmt;*/
82 printf("Cache server started\n");
83 int ret = 0;
84 struct timeval timeout, tval;
85 timeout.tv_sec = Conf::config.getCacheWaitSecs();
86 timeout.tv_usec = 0;
88 while(!srvStop)
90 tval.tv_sec = timeout.tv_sec;
91 tval.tv_usec = timeout.tv_usec;
92 ret = os::select(0, NULL, 0, 0, &tval);
93 printf("Checking for cache updates\n");
94 ret = getRecordsFromTargetDb(1);
95 if (ret !=0) srvStop = 1;
96 //ret = getRecordsFromTargetDb(2);
97 if (ret !=0) srvStop = 1;
99 printf("Cache Server Exiting\n");
100 conn.close();
101 targetconn->disconnect();
102 return 0;
104 int getRecordsFromTargetDb(int mode)
106 int pkid;
107 char tablename[64];
108 int op, id;
109 int rows =0;
110 DbRetVal rv = OK;
111 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
112 stmt->setConnection(targetconn);
113 AbsSqlStatement *delstmt = SqlFactory::createStatement(CSqlAdapter);
114 delstmt->setConnection(targetconn);
115 if (mode == 1 ) {
116 //rv = delstmt->prepare("DELETE from csql_log_int where id=?;");
117 if (rv != OK) {printf("Stmt prepare failed\n"); return 1; }
118 rv = stmt->prepare("SELECT * FROM csql_log_int;");
120 else {
121 rv = stmt->prepare("SELECT * FROM csql_log_char;");
122 if (rv != OK) {printf("Stmt prepare failed\n"); return 1; }
123 //rv = delstmt->prepare("DELETE from csql_log_char where id=?;");
125 if (rv != OK) {printf("Stmt prepare failed\n"); return 1; }
126 stmt->bindField(1, tablename);
127 stmt->bindField(2, &pkid);
128 stmt->bindField(3, &op);
129 stmt->bindField(4, &id);
130 DatabaseManager *dbMgr = conn.getDatabaseManager();
131 char delStmtStr[1024];
132 while(true) {
133 rv = targetconn->beginTrans();
134 rv = stmt->execute(rows);
135 if (rv != OK)
137 printError(ErrSysInit, "Unable to execute stmt in target db");
138 targetconn->rollback();
139 stmt->free();
140 delstmt->free();
141 delete stmt;
142 delete delstmt;
143 return 1;
145 if (stmt->fetch() != NULL) {
146 printf("Row value is %s %d %d\n", tablename, pkid, op);
147 Table *table = dbMgr->openTable(tablename);
148 int ret = 0;
149 if (table == NULL)
151 printError(ErrSysInit, "Table %s not exist in csql", tablename);
152 targetconn->rollback();
153 stmt->free();
154 delstmt->free();
155 delete stmt;
156 delete delstmt;
157 break;
159 if (op == 2)//DELETE
161 ret = remove(table,pkid);
163 else //INSERT
165 ret = insert(table, pkid);
167 dbMgr->closeTable(table);
168 rv = targetconn->commit();
169 rv = targetconn->beginTrans();
170 //Remove record from csql_log_XXX table
171 sprintf(delStmtStr, "DELETE from csql_log_int where id=%d ;", id);
172 rv = delstmt->prepare(delStmtStr);
173 if (rv != OK) {printf("FAILED\n"); return 1; }
174 // delstmt->setIntParam(1, id);
175 rv = delstmt->execute(rows);
176 if (rv != OK)
178 printf("log record not deleted from the target db %d\n", rv);
179 targetconn->rollback();
180 stmt->free();
181 delstmt->free();
182 delete stmt;
183 delete delstmt;
185 delstmt->free();
187 rv = targetconn->commit();
189 else {
190 stmt->close();
191 break;
193 stmt->close();
195 stmt->free();
196 delstmt->free();
197 delete stmt;
198 delete delstmt;
199 return 0;
201 int insert(Table *table, int pkid)
203 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
204 stmt->setConnection(targetconn);
205 SqlOdbcStatement *ostmt = (SqlOdbcStatement*) stmt;
206 char pkfieldname[128];
207 ostmt->getPrimaryKeyFieldName(table->getName(), pkfieldname);
208 char sbuf[1024];
209 sprintf(sbuf, "SELECT * FROM %s where %s = %d;", table->getName(), pkfieldname, pkid);
210 //TODO::get the primary key field name from the table interface. need to implement it
211 DbRetVal rv = stmt->prepare(sbuf);
212 if (rv != OK) return 1;
214 List fNameList = table->getFieldNameList();
215 ListIterator fNameIter = fNameList.getIterator();
216 FieldInfo *info = new FieldInfo();
217 int fcount =1; void *valBuf; int fieldsize=0;
218 void *buf[128];//TODO:resticts to support only 128 fields in table
219 Identifier *elem = NULL;
220 while (fNameIter.hasElement()) {
221 elem = (Identifier*) fNameIter.nextElement();
222 table->getFieldInfo((const char*)elem->name, info);
223 valBuf = AllDataType::alloc(info->type, info->length);
224 buf[fcount] = valBuf;
225 table->bindFld(elem->name, valBuf);
226 stmt->bindField(fcount++, valBuf);
229 delete info;
230 int rows=0;
231 int retValue = stmt->execute(rows);
232 if (retValue && rows != 1) {printError(ErrSysInit, "Unable to execute statement at target db\n"); return ErrSysInit; }
233 conn.startTransaction();
234 if (stmt->fetch() != NULL) {
235 table->insertTuple();
236 //Note:insert may fail if the record is inserted from this cache
238 for (int i=1; i < fcount; i++) {
239 free(buf[i]);
241 stmt->free();
242 delete stmt;
243 conn.commit();
244 return 0;
246 int remove(Table *table, int pkid)
248 DbRetVal rv = OK;
249 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
250 stmt->setConnection(targetconn);
251 SqlOdbcStatement *ostmt = (SqlOdbcStatement*) stmt;
252 char pkfieldname[128];
253 ostmt->getPrimaryKeyFieldName(table->getName(), pkfieldname);
254 delete stmt;
255 Condition p1;
256 p1.setTerm(pkfieldname, OpEquals, &pkid);
257 table->setCondition(&p1);
258 rv = conn.startTransaction();
259 if (rv != OK) return 1;
260 rv = table->execute();
261 if (rv != OK)
263 table->setCondition(NULL);
264 conn.rollback();
265 return 1;
267 if (table->fetch() != NULL)
268 rv = table->deleteTuple();
269 //Note:Delete may fail if the record is deleted from this cache
270 table->setCondition(NULL);
271 rv = conn.commit();
272 if (rv != OK) return 1;
273 return 0;