Gateway changes for OSYNC mode
[csql.git] / src / sqllog / SqlLogStatement.cxx
blob07ef17608ed158b503480b67f1624272d133fa2a
1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
20 #include <SqlLogStatement.h>
22 UniqueID SqlLogStatement::stmtUID;
24 bool SqlLogStatement::isNonSelectDML(char *stmtstr)
26 if (strlen(stmtstr) <= 6) return false;
27 if (strncasecmp(stmtstr,"INSERT", 6) == 0) return true;
28 else if (strncasecmp(stmtstr, "UPDATE", 6) ==0) return true;
29 else if (strncasecmp(stmtstr, "DELETE", 6) ==0) return true;
30 return false;
33 DbRetVal SqlLogStatement::prepare(char *stmtstr)
35 DbRetVal rv = OK;
36 if (innerStmt) rv = innerStmt->prepare(stmtstr);
37 if (rv != OK) return rv;
39 isCached = false;
40 //check if it is INSERT UPDATE DELETE statement
41 //if not, then no need to generate logs
42 if (!isNonSelectDML(stmtstr)) { return rv;}
43 if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK;
44 SqlLogConnection* logConn = (SqlLogConnection*)con;
45 if (!logConn->isTableCached(innerStmt->getTableName())) return OK;
46 isCached = true;
47 mode = TABLE_OSYNC;//TEMP::support only OSYNC
49 sid = SqlLogStatement::stmtUID.getID();
50 //TODO::if connected to peer then only send this packet
51 PacketPrepare *pkt = new PacketPrepare();
52 pkt->stmtID= sid;
53 pkt->syncMode = ASYNC;
54 pkt->stmtString = stmtstr;
55 pkt->noParams = innerStmt->noOfParamFields();
56 FieldInfo *info = new FieldInfo();
57 if (pkt->noParams > 0) {
58 pkt->type = new int [pkt->noParams];
59 pkt->length = new int [pkt->noParams];
60 BindSqlField *bindField = NULL;
61 for (int i = 0; i < innerStmt->noOfParamFields(); i++)
63 innerStmt->getParamFldInfo(i+1, info);
64 bindField = new BindSqlField();
65 bindField->type = info->type;
66 bindField->length = info->length;
67 pkt->type[i] = info->type;
68 pkt->length[i] = info->length;
69 bindField->value = AllDataType::alloc(info->type, info->length);
70 paramList.append(bindField);
73 pkt->marshall();
74 /*logConn->connectIfNotConnected();
75 //printf("Sending PREPARE packet of size %d\n", pkt->getBufferSize());
76 rv = logConn->sendAndReceive(NW_PKT_PREPARE, pkt->getMarshalledBuffer(), pkt->getBufferSize());
77 printf("RV from PREPARE SQLLOG %d\n", rv);
78 if (rv != OK) {
79 logConn->addPreparePacket(pkt);
80 delete info;
81 return OK;
82 }*/
83 logConn->addPreparePacket(pkt);
84 delete info;
85 return rv;
88 bool SqlLogStatement::isSelect()
90 if (innerStmt) return innerStmt->isSelect();
91 return false;
94 DbRetVal SqlLogStatement::execute(int &rowsAffected)
97 SqlLogConnection* logConn = (SqlLogConnection*)con;
99 DbRetVal rv = OK;
100 if (innerStmt) rv = innerStmt->execute(rowsAffected);
101 if (rv != OK) return rv;
103 //no need to generate log if it does not actually modify the table
104 if (rowsAffected == 0 ) return OK;
105 if (!isCached) return OK;
106 if (logConn->getSyncMode() == OSYNC) return OK;
108 //printf("LOG:execute\n");
109 PacketExecute *pkt = new PacketExecute();
110 pkt->stmtID= sid;
111 pkt->noParams = innerStmt->noOfParamFields();
112 pkt->setParams(paramList);
113 pkt->marshall();
114 int *p = (int*)pkt->getMarshalledBuffer();
115 //printf("After EXEC packet marshall %d %d size %d\n", *p, *(p+1),
116 // pkt->getBufferSize());
117 // printf("EXEC pkt ptr is %x\n", pkt);
118 logConn->addPacket(pkt);
119 return rv;
122 DbRetVal SqlLogStatement::bindParam(int pos, void* value)
124 DbRetVal rv = OK;
125 if (innerStmt) rv = innerStmt->bindParam(pos,value);
126 if (rv != OK) return rv;
127 printError(ErrWarning, "Deprecated and does not replicate or cache");
128 return rv;
131 DbRetVal SqlLogStatement::bindField(int pos, void* value)
133 DbRetVal rv = OK;
134 if (innerStmt) rv = innerStmt->bindField(pos,value);
135 if (rv != OK) return rv;
136 return rv;
138 void* SqlLogStatement::fetch()
140 if (innerStmt) return innerStmt->fetch();
141 return NULL;
144 void* SqlLogStatement::fetchAndPrint(bool SQL)
146 if (innerStmt) return innerStmt->fetchAndPrint(SQL);
147 return NULL;
150 void* SqlLogStatement::next()
152 if (innerStmt) return innerStmt->next();
153 return NULL;
156 DbRetVal SqlLogStatement::close()
158 if (innerStmt) return innerStmt->close();
159 return OK;
162 void* SqlLogStatement::getFieldValuePtr( int pos )
164 if (innerStmt) return innerStmt->getFieldValuePtr(pos);
165 return NULL;
168 int SqlLogStatement::noOfProjFields()
170 if (innerStmt) return innerStmt->noOfProjFields();
171 return 0;
174 int SqlLogStatement::noOfParamFields()
176 if (innerStmt) return innerStmt->noOfParamFields();
177 return 0;
180 DbRetVal SqlLogStatement::getProjFldInfo (int projpos, FieldInfo *&fInfo)
182 if (innerStmt) return innerStmt->getProjFldInfo(projpos, fInfo);
183 return OK;
186 DbRetVal SqlLogStatement::getParamFldInfo (int parampos, FieldInfo *&fInfo)
188 if (innerStmt) return innerStmt->getParamFldInfo(parampos, fInfo);
189 return OK;
192 DbRetVal SqlLogStatement::free()
194 DbRetVal rv = OK;
195 if (innerStmt) rv = innerStmt->free();
196 //TODO::DEBUG::always innsrStmt->free() returns error
197 //if (rv != OK) return rv;
198 SqlLogConnection* logConn = (SqlLogConnection*)con;
199 if (sid != 0 ) logConn->removePreparePacket(sid);
200 if (!isCached) return rv;
202 //TODO
203 //If statement is freed before the txn commits, it will lead to issue
204 //incase of async mode. when the other site goes down and comes back,
205 //it will not have the cached SqlStatement objects, so in that case
206 //we need to send all the prepare packets again, so we should not free
207 //the statement straight away in client side as well as in server side
211 /*PacketFree *pkt = new PacketFree();
212 pkt->stmtID= sid;
213 pkt->marshall();
214 SqlLogConnection* logConn = (SqlLogConnection*)con;
215 logConn->sendAndReceiveAllPeers(NW_PKT_FREE, pkt->getMarshalledBuffer(), pkt->getBufferSize());
216 delete pkt;*/
217 isCached= false;
218 sid = 0;
219 paramList.reset();
220 return OK;
222 void SqlLogStatement::setShortParam(int paramPos, short value)
224 if (innerStmt) innerStmt->setShortParam(paramPos,value);
225 SqlLogConnection* logConn = (SqlLogConnection*)con;
226 if (logConn->getSyncMode() == OSYNC) return ;
227 BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos);
228 if (bindField->type != typeShort) return;
229 *(short*)(bindField->value) = value;
230 return;
232 void SqlLogStatement::setIntParam(int paramPos, int value)
234 if (innerStmt) innerStmt->setIntParam(paramPos,value);
235 SqlLogConnection* logConn = (SqlLogConnection*)con;
236 if (logConn->getSyncMode() == OSYNC) return ;
237 if (!isCached) return;
238 BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos);
239 if (bindField->type != typeInt) return;
240 *(int*)(bindField->value) = value;
241 return;
244 void SqlLogStatement::setLongParam(int paramPos, long value)
246 if (innerStmt) innerStmt->setLongParam(paramPos,value);
247 SqlLogConnection* logConn = (SqlLogConnection*)con;
248 if (logConn->getSyncMode() == OSYNC) return ;
249 if (!isCached) return;
250 BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos);
251 if (bindField->type != typeLong) return;
252 *(long*)(bindField->value) = value;
253 return;
256 void SqlLogStatement::setLongLongParam(int paramPos, long long value)
258 if (innerStmt) innerStmt->setLongLongParam(paramPos,value);
259 SqlLogConnection* logConn = (SqlLogConnection*)con;
260 if (logConn->getSyncMode() == OSYNC) return ;
261 if (!isCached) return;
262 BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos);
263 if (bindField->type != typeLongLong) return;
264 *(long long*)(bindField->value) = value;
265 return;
267 void SqlLogStatement::setByteIntParam(int paramPos, ByteInt value)
269 if (innerStmt) innerStmt->setByteIntParam(paramPos,value);
270 SqlLogConnection* logConn = (SqlLogConnection*)con;
271 if (logConn->getSyncMode() == OSYNC) return ;
272 if (!isCached) return;
273 BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos);
274 if (bindField->type != typeByteInt) return;
275 *(char*)(bindField->value) = value;
278 void SqlLogStatement::setFloatParam(int paramPos, float value)
280 if (innerStmt) innerStmt->setFloatParam(paramPos,value);
281 SqlLogConnection* logConn = (SqlLogConnection*)con;
282 if (logConn->getSyncMode() == OSYNC) return ;
283 if (!isCached) return;
284 BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos);
285 if (bindField->type != typeFloat) return;
286 *(float*)(bindField->value) = value;
289 void SqlLogStatement::setDoubleParam(int paramPos, double value)
291 if (innerStmt) innerStmt->setDoubleParam(paramPos,value);
292 SqlLogConnection* logConn = (SqlLogConnection*)con;
293 if (logConn->getSyncMode() == OSYNC) return ;
294 if (!isCached) return;
295 BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos);
296 if (bindField->type != typeDouble) return;
297 *(double*)(bindField->value) = value;
300 void SqlLogStatement::setStringParam(int paramPos, char *value)
302 if (innerStmt) innerStmt->setStringParam(paramPos,value);
303 SqlLogConnection* logConn = (SqlLogConnection*)con;
304 if (logConn->getSyncMode() == OSYNC) return ;
305 if (!isCached) return;
306 BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos);
307 if (bindField->type != typeString) return;
308 char *dest = (char*)bindField->value;
309 strncpy(dest, value, bindField->length);
310 dest[ bindField->length - 1] ='\0';
311 return;
313 void SqlLogStatement::setDateParam(int paramPos, Date value)
315 if (innerStmt) innerStmt->setDateParam(paramPos,value);
316 SqlLogConnection* logConn = (SqlLogConnection*)con;
317 if (logConn->getSyncMode() == OSYNC) return ;
318 if (!isCached) return;
319 BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos);
320 if (bindField->type != typeDate) return;
321 *(Date*)(bindField->value) = value;
324 void SqlLogStatement::setTimeParam(int paramPos, Time value)
326 if (innerStmt) innerStmt->setTimeParam(paramPos,value);
327 SqlLogConnection* logConn = (SqlLogConnection*)con;
328 if (logConn->getSyncMode() == OSYNC) return ;
329 if (!isCached) return;
330 BindSqlField *bindField = (BindSqlField*)paramList.get(paramPos);
331 if (bindField->type != typeTime) return;
332 *(Time*)(bindField->value) = value;
335 void SqlLogStatement::setTimeStampParam(int paramPos, TimeStamp value)
337 if (innerStmt) innerStmt->setTimeStampParam(paramPos,value);
338 SqlLogConnection* logConn = (SqlLogConnection*)con;
339 if (logConn->getSyncMode() == OSYNC) return ;
340 if (!isCached) return;
341 BindSqlField *bindField = (BindSqlField*) paramList.get(paramPos);
342 if (bindField->type != typeTimeStamp) return;
343 *(TimeStamp*)(bindField->value) = value;