additional changes for the bidirectional cache
[csql.git] / src / tools / csqlcacheserver.cxx
blob2509ef0deec12694cbc4c91ce6154be26a69b910
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include <AbsSqlConnection.h>
17 #include <AbsSqlStatement.h>
18 #include <SqlFactory.h>
19 #include <CSql.h>
21 int insert(Table *table, int pkid);
22 int remove(Table *table, int pkid);
23 int getRecordsFromTargetDb(int mode);
25 int srvStop =0;
26 static void sigTermHandler(int sig)
28 printf("Received signal %d\nStopping the server\n", sig);
29 srvStop = 1;
32 void printUsage()
34 printf("Usage: csqlcacheserver \n");
35 printf("Description: Start the csql caching server.\n");
36 return;
38 AbsSqlConnection *targetconn;
39 Connection conn;
40 int main(int argc, char **argv)
42 int c = 0, opt = 0;
43 while ((c = getopt(argc, argv, "?")) != EOF)
45 switch (c)
47 case '?' : { opt = 10; break; } //print help
48 default: opt=10;
51 }//while options
53 if (opt == 10) {
54 printUsage();
55 return 0;
58 os::signal(SIGINT, sigTermHandler);
59 os::signal(SIGTERM, sigTermHandler);
61 targetconn = SqlFactory::createConnection(CSqlAdapter);
62 DbRetVal rv = targetconn->connect("root", "manager");
63 if (rv != OK) return 1;
64 rv = conn.open("root", "manager");
65 if (rv != OK) return 1;
66 if (!Conf::config.useCache())
68 printf("Cache is set to OFF in csql.conf file\n");
69 return 1;
72 printf("Cache server started\n");
73 int ret = 0;
74 struct timeval timeout, tval;
75 timeout.tv_sec = 5; //TODO::should be an csql.conf parameter
76 timeout.tv_usec = 0;
78 while(!srvStop)
80 tval.tv_sec = timeout.tv_sec;
81 tval.tv_usec = timeout.tv_usec;
82 ret = os::select(0, NULL, 0, 0, &tval);
83 printf("Getting the updates\n");
84 ret = getRecordsFromTargetDb(1);
85 printf("Return value is %d\n", ret);
86 if (ret !=0) srvStop = 1;
87 //ret = getRecordsFromTargetDb(2);
88 if (ret !=0) srvStop = 1;
90 printf("Cache Server Exiting\n");
91 conn.close();
92 targetconn->disconnect();
93 return 0;
95 int getRecordsFromTargetDb(int mode)
97 int pkid;
98 char tablename[64];
99 int op, id;
100 int rows =0;
101 DbRetVal rv = OK;
102 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
103 stmt->setConnection(targetconn);
104 AbsSqlStatement *delstmt = SqlFactory::createStatement(CSqlAdapter);
105 delstmt->setConnection(targetconn);
106 if (mode == 1 ) {
107 rv = stmt->prepare("SELECT * FROM csql_log_int;");
108 rv = delstmt->prepare("DELETE FROM csql_log_int where id =?;");
110 else {
111 rv = stmt->prepare("SELECT * FROM csql_log_char;");
112 rv = delstmt->prepare("DELETE FROM csql_log_char where id =?;");
114 if (rv != OK) return 1;
115 stmt->bindField(1, tablename);
116 stmt->bindField(2, &pkid);
117 stmt->bindField(3, &op);
118 stmt->bindField(4, &id);
119 DatabaseManager *dbMgr = conn.getDatabaseManager();
120 while(true) {
121 rv = targetconn->beginTrans();
122 rv = stmt->execute(rows);
123 if (rv != OK)
125 printError(ErrSysInit, "Unable to execute stmt in target db");
126 return 1;
128 if (stmt->fetch() != NULL) {
129 printf("Row value is %s %d %d\n", tablename, pkid, op);
130 Table *table = dbMgr->openTable(tablename);
131 int ret = 0;
132 if (table == NULL)
134 printError(ErrSysInit, "Table %s not exist in csql", tablename);
135 break;
137 if (op == 3) //DELETE
139 ret = remove(table, pkid);
140 }else if (op == 2)// UPDATE
142 ret = remove(table,pkid);
143 ret = insert(table, pkid);
145 else //INSERT
147 ret = insert(table, pkid);
149 dbMgr->closeTable(table);
150 rv = targetconn->commit();
151 rv = targetconn->beginTrans();
152 //Remove record from csql_log_XXX table
153 delstmt->setIntParam(1, id);
154 rv = delstmt->execute(rows);
155 if (rv != OK) printf("log record not deleted from the target db %d\n", rv);
157 rv = targetconn->commit();
158 //create table csql_log_int(tablename char(64), pkid int, op int, id int not null unique auto_increment);
159 //insert ino csql_log_int(tablename, pkid, op) values ('t1', 100, 1);
161 else {
162 stmt->close();
163 break;
165 stmt->close();
167 return 0;
169 int insert(Table *table, int pkid)
171 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
172 stmt->setConnection(targetconn);
173 char sbuf[1024];
174 sprintf(sbuf, "SELECT * FROM %s where f1 = %d;", table->getName(), pkid);
175 //TODO::get the primary key field name from the table interface. need to implement it
176 DbRetVal rv = stmt->prepare(sbuf);
177 if (rv != OK) return 1;
179 List fNameList = table->getFieldNameList();
180 ListIterator fNameIter = fNameList.getIterator();
181 FieldInfo *info = new FieldInfo();
182 int fcount =1; void *valBuf; int fieldsize=0;
183 Identifier *elem = NULL;
184 while (fNameIter.hasElement()) {
185 elem = (Identifier*) fNameIter.nextElement();
186 table->getFieldInfo((const char*)elem->name, info);
187 valBuf = AllDataType::alloc(info->type, info->length);
188 table->bindFld(elem->name, valBuf);
189 stmt->bindField(fcount++, valBuf);
191 delete info;
192 int rows=0;
193 int retValue = stmt->execute(rows);
194 if (retValue && rows != 1) {printError(ErrSysInit, "Unable to execute statement at target db\n"); return ErrSysInit; }
195 conn.startTransaction();
196 if (stmt->fetch() != NULL) {
197 table->insertTuple();
198 //Note:insert may fail if the record is inserted from this cache
200 conn.commit();
201 return 0;
203 int remove(Table *table, int pkid)
205 DbRetVal rv = OK;
206 Condition p1;
207 //TODO::get the primary key field name from the table interface. need to implement it
208 p1.setTerm("f1", OpEquals, &pkid);
209 table->setCondition(&p1);
210 rv = conn.startTransaction();
211 if (rv != OK) return 1;
212 rv = table->execute();
213 if (rv != OK) { conn.rollback(); return 1;}
214 if (table->fetch() != NULL)
215 rv = table->deleteTuple();
216 //Note:Delete may fail if the record is deleted from this cache
217 table->setCondition(NULL);
218 rv = conn.commit();
219 if (rv != OK) return 1;
220 return 0;