updated to work for gateway.
[csql.git] / src / network / SqlNetworkHandler.cxx
blobed3458a7dc72f8fc128d3abe6b45000f00f5a580
1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
20 #include <SqlNetworkHandler.h>
21 #include <AbsSqlConnection.h>
22 #include <SqlConnection.h>
23 #include <SqlOdbcConnection.h>
24 #include <AbsSqlStatement.h>
25 #include <SqlStatement.h>
26 #include <SqlOdbcStatement.h>
27 #include <SqlLogStatement.h>
28 #include <Parser.h>
30 List SqlNetworkHandler::stmtList;
31 AbsSqlConnection* SqlNetworkHandler::conn;
32 SqlApiImplType SqlNetworkHandler::type;
33 int SqlNetworkHandler::stmtID;
35 void *SqlNetworkHandler::process(PacketHeader &header, char *buffer)
37 DbRetVal rv = OK;
38 char *ptr = NULL;
39 ResponsePacket *rpkt = NULL;
40 switch(header.packetType)
42 // case NW_PKT_PREPARE:
43 //return processPrepare(header, buffer);
44 // break;
45 //case NW_PKT_COMMIT:
46 //return processCommit(header, buffer);
47 // break;
48 case SQL_NW_PKT_CONNECT:
49 return processSqlConnect(header, buffer);
50 break;
51 case SQL_NW_PKT_PREPARE:
52 return processSqlPrepare(header, buffer);
53 break;
54 case SQL_NW_PKT_EXECUTE:
55 return processSqlExecute(header, buffer);
56 break;
57 case SQL_NW_PKT_FETCH:
58 return processSqlFetch(header, buffer);
59 break;
60 case SQL_NW_PKT_COMMIT:
61 return processSqlCommit(header, buffer);
62 break;
63 case SQL_NW_PKT_ROLLBACK:
64 return processSqlRollback(header, buffer);
65 break;
66 case SQL_NW_PKT_DISCONNECT:
67 conn->rollback();
68 rv = conn->disconnect();
69 rpkt = new ResponsePacket();
70 ptr = (char *) &rpkt->retVal;
71 *ptr = 1;
72 strcpy(rpkt->errorString, "Success");
73 return rpkt;
74 case SQL_NW_PKT_FREE:
75 return processSqlFree(header, buffer);
76 break;
80 void * SqlNetworkHandler::processSqlConnect(PacketHeader &header, char *buffer)
82 ResponsePacket *rpkt = new ResponsePacket();
83 printDebug(DM_Network, "Processing CONNECT");
84 SqlPacketConnect *pkt = new SqlPacketConnect();
85 pkt->setBuffer(buffer);
86 pkt->setBufferSize(header.packetLength);
87 pkt->unmarshall();
88 char *ptr = (char *) &rpkt->retVal;
89 DbRetVal rv=conn->connect(pkt->userName, pkt->passWord);
90 if (rv != OK) {
91 *ptr = 0;
92 strcpy(rpkt->errorString, "Error:Connect failure");
93 return rpkt;
94 printf("connection failure\n");
96 if (rv == OK) {
97 *ptr = 1;
98 rv = conn->beginTrans();
99 return rpkt;
103 void* SqlNetworkHandler::processSqlPrepare(PacketHeader &header, char *buffer)
105 ResponsePacket *rpkt = new ResponsePacket();
106 char *retval = (char *) &rpkt->retVal;
107 SqlPacketPrepare *pkt = new SqlPacketPrepare();
108 pkt->setBuffer(buffer);
109 pkt->setBufferSize(header.packetLength);
110 pkt->unmarshall();
111 printDebug(DM_Network, "PREPARE %s\n", pkt->stmtString);
112 AbsSqlStatement *sqlstmt = SqlFactory::createStatement(type);
113 sqlstmt->setConnection(conn);
114 NetworkStmt *nwStmt = new NetworkStmt();
115 nwStmt->stmtID = ++stmtID;
116 printDebug(DM_Network, "Statement string %s\n", pkt->stmtString);
117 nwStmt->stmt = sqlstmt;
118 DbRetVal rv = sqlstmt->prepare(pkt->stmtString);
119 if (rv != OK)
121 printError(ErrSysInit, "statement prepare failed\n");
122 *retval = 0;
123 strcpy(rpkt->errorString, "Error:Statement prepare failed");
124 return rpkt;
126 int param = sqlstmt->noOfParamFields();
127 int proj = sqlstmt->noOfProjFields();
128 BindSqlField *bindField = NULL;
129 BindSqlProjectField *projField = NULL;
130 //populate paramList
131 FieldInfo * fInfo = new FieldInfo();
132 for (int i = 0; i < param; i++) {
133 bindField = new BindSqlField();
134 sqlstmt->getParamFldInfo(i + 1, fInfo);
135 bindField->type = fInfo->type;
136 bindField->length = fInfo->length;
137 bindField->value = AllDataType::alloc(bindField->type, bindField->length);
138 nwStmt->paramList.append(bindField);
140 delete fInfo;
141 FieldInfo *fldInfo = new FieldInfo();
142 for (int i = 0; i < proj; i++) {
143 projField = new BindSqlProjectField();
144 sqlstmt->getProjFldInfo(i + 1, fldInfo);
145 projField->type = fldInfo->type;
146 projField->length = fldInfo->length;
147 projField->value = AllDataType::alloc(projField->type, projField->length);
148 nwStmt->projList.append(projField);
150 delete fldInfo;
151 stmtList.append(nwStmt);
152 *retval = 1;
153 if (param) *(retval+2) = 1;
154 if (proj) *(retval+3) = 1;
155 rpkt->stmtID = nwStmt->stmtID;
156 strcpy(rpkt->errorString, "Success");
157 return rpkt;
160 void * SqlNetworkHandler::processSqlExecute(PacketHeader &header, char *buffer)
162 ResponsePacket *rpkt = new ResponsePacket();
163 char *retval = (char *) &rpkt->retVal;
164 SqlPacketExecute *pkt = new SqlPacketExecute();
165 pkt->setBuffer(buffer);
166 pkt->setBufferSize(header.packetLength);
167 pkt->setStatementList(stmtList);
168 pkt->unmarshall();
169 printDebug(DM_Network, "PREPARE %d\n", pkt->stmtID);
170 rpkt->stmtID = pkt->stmtID;
171 ListIterator stmtIter = stmtList.getIterator();
172 NetworkStmt *stmt;
173 while (stmtIter.hasElement())
175 stmt = (NetworkStmt*) stmtIter.nextElement();
176 //TODO::Also check the srcNetworkID
177 if (stmt->stmtID == pkt->stmtID ) break;
179 AbsSqlStatement *sqlstmt = stmt->stmt;
180 int rows = 0;
181 for (int i=0; i < pkt->noParams; i++) {
182 BindSqlField *bindField = (BindSqlField *) stmt->paramList.get(i+1);
183 setParamValues(sqlstmt, i+1, bindField->type, bindField->length, (char *)bindField->value);
185 //SqlStatement *st = (SqlStatement *)sqlstmt;
186 if(sqlstmt->isSelect()) {
187 int noProj = stmt->projList.size();
188 for (int i=0; i < noProj; i++) {
189 BindSqlProjectField *prjFld = (BindSqlProjectField *) stmt->projList.get(i+1);
190 sqlstmt->bindField(i+1, prjFld->value);
193 DbRetVal rv = sqlstmt->execute(rows);
194 if (rv != OK) {
195 *retval = 0;
196 strcpy(rpkt->errorString, "Execute failed");
197 return rpkt;
199 *retval = 1;
200 strcpy(rpkt->errorString, "Success");
201 return rpkt;
204 void * SqlNetworkHandler::processSqlFetch(PacketHeader &header, char *buffer)
206 ResponsePacket *rpkt = new ResponsePacket();
207 char *retval = (char *) &rpkt->retVal;
208 SqlPacketFetch *pkt = new SqlPacketFetch();
209 pkt->setBuffer(buffer);
210 pkt->unmarshall();
211 rpkt->stmtID = pkt->stmtID;
212 ListIterator stmtIter = stmtList.getIterator();
213 NetworkStmt *stmt;
214 while (stmtIter.hasElement())
216 stmt = (NetworkStmt*) stmtIter.nextElement();
217 //TODO::Also check teh srcNetworkID
218 if (stmt->stmtID == pkt->stmtID ) break;
220 AbsSqlStatement *sqlstmt = stmt->stmt;
221 void *data=NULL;
222 DbRetVal rv = OK;
223 if ((data = sqlstmt->fetch(rv)) != NULL && rv == OK) {
224 *retval = 1;
225 strcpy(rpkt->errorString, "Success");
226 return rpkt;
228 if (data == NULL && rv == OK) {
229 sqlstmt->close();
230 *retval = 1;
231 *(retval + 1) = 1;
232 strcpy(rpkt->errorString, "Success fetch completed");
233 return rpkt;
235 else {
236 *retval = 0;
237 strcpy(rpkt->errorString, "fetch completed");
238 return rpkt;
242 void * SqlNetworkHandler::processSqlFree(PacketHeader &header, char *buffer)
244 ResponsePacket *rpkt = new ResponsePacket();
245 char *retval = (char *) &rpkt->retVal;
246 SqlPacketFree *pkt = new SqlPacketFree();
247 pkt->setBuffer(buffer);
248 pkt->unmarshall();
249 rpkt->stmtID = pkt->stmtID;
250 ListIterator stmtIter = stmtList.getIterator();
251 NetworkStmt *stmt;
252 while (stmtIter.hasElement())
254 stmt = (NetworkStmt*) stmtIter.nextElement();
255 //TODO::Also check teh srcNetworkID
256 if (stmt->stmtID == pkt->stmtID ) break;
258 AbsSqlStatement *sqlstmt = stmt->stmt;
259 sqlstmt->free();
260 ListIterator itprm = stmt->paramList.getIterator();
261 BindSqlField *fld = NULL;
262 while((fld = (BindSqlField *) itprm.nextElement()) != NULL) delete fld;
263 stmt->paramList.reset();
264 ListIterator itprj = stmt->projList.getIterator();
265 BindSqlProjectField *pfld = NULL;
266 while((pfld = (BindSqlProjectField *) itprj.nextElement()) != NULL) delete pfld;
267 stmt->projList.reset();
268 delete stmt->stmt;
269 stmtList.remove(stmt);
270 delete stmt;
271 *retval = 1;
272 strcpy(rpkt->errorString, "Success");
273 return rpkt;
277 void * SqlNetworkHandler::processSqlCommit(PacketHeader &header, char *buffer)
279 ResponsePacket *rpkt = new ResponsePacket();
280 char *retval = (char *) &rpkt->retVal;
281 DbRetVal rv = conn->commit();
282 if (rv != OK) {
283 *retval = 0;
284 strcpy(rpkt->errorString, "Commit failure\n");
285 return rpkt;
287 rv = conn->beginTrans();
288 *retval = 1;
289 strcpy(rpkt->errorString, "Success");
290 return rpkt;
293 void *SqlNetworkHandler::processSqlRollback(PacketHeader &header, char *buffer)
295 ResponsePacket *rpkt = new ResponsePacket();
296 char *retval = (char *) &rpkt->retVal;
298 DbRetVal rv = conn->rollback();
299 if (rv != OK) {
300 *retval = 0;
301 strcpy(rpkt->errorString, "Rollback failure\n");
302 return rpkt;
304 rv = conn->beginTrans();
305 *retval = 1;
306 strcpy(rpkt->errorString, "Success");
307 return rpkt;
310 void *SqlNetworkHandler::processCommit(PacketHeader &header, char *buffer)
312 printDebug(DM_Network, "Processing COMMIT");
313 PacketCommit *pkt = new PacketCommit();
314 pkt->setBuffer(buffer);
315 pkt->setBufferSize(header.packetLength);
316 pkt->unmarshall();
317 List pktList;
318 pkt->getExecPacketList(stmtList, pktList);
319 DbRetVal rv = applyExecPackets(stmtList, pktList);
320 int response = 1;
321 if (rv != OK)
323 printError(ErrSysFatal, "Unable to apply the exec packets\n");
324 response =0;
326 return response;
329 void * SqlNetworkHandler::processFree(PacketHeader &header, char *buffer)
331 PacketFree *pkt = new PacketFree();
332 pkt->setBuffer(buffer);
333 pkt->setBufferSize(header.packetLength);
334 pkt->unmarshall();
335 //printf("FREE %d \n", pkt->stmtID);
336 int response =1;
337 //This wont work for two statement executed in same transaction using same SqlStatement object using free.
338 //so do not delete now and put a flag 'readyfordelete' in NetworkStmt object and delete it during execute
340 ListIterator iter = stmtList.getIterator();
341 NetworkStmt *stmt, *removeStmt = NULL;
342 while (iter.hasElement())
344 stmt = (NetworkStmt*)iter.nextElement();
345 if (stmt->srcNetworkID == header.srcNetworkID
346 && stmt->stmtID == pkt->stmtID)
348 removeStmt = stmt;
349 break;
352 if (removeStmt) stmtList.remove(removeStmt);
353 else printf("Statement id %d not found in list \n", pkt->stmtID);
355 return response;
357 void * SqlNetworkHandler::processPrepare(PacketHeader &header, char *buffer)
359 PacketPrepare *pkt = new PacketPrepare();
360 pkt->setBuffer(buffer);
361 pkt->setBufferSize(header.packetLength);
362 pkt->unmarshall();
363 printDebug(DM_Network, "PREPARE %d %s\n", pkt->stmtID, pkt->stmtString);
364 //for (int i =0 ; i < pkt->noParams; i++)
365 //printf("PREPARE type %d length %d \n", pkt->type[i], pkt->length[i]);
366 int response =1;
367 //TODO::add it to the SqlStatement list
368 AbsSqlStatement *sqlstmt = SqlFactory::createStatement(type);
369 sqlstmt->setConnection(conn);
370 NetworkStmt *nwStmt = new NetworkStmt();
371 printDebug(DM_Network, "Statement string %s\n", pkt->stmtString);
372 nwStmt->srcNetworkID = header.srcNetworkID;
373 nwStmt->stmtID = pkt->stmtID;
374 nwStmt->stmt = sqlstmt;
375 DbRetVal rv = sqlstmt->prepare(pkt->stmtString);
376 if (rv != OK)
378 printError(ErrSysInit, "statement prepare failed\n");
379 response = 0;
380 return response;
382 BindSqlField *bindField = NULL;
383 //populate paramList
384 for (int i = 0; i < pkt->noParams; i++)
386 bindField = new BindSqlField();
387 bindField->type = (DataType) pkt->type[i];
388 bindField->length = pkt->length[i];
389 bindField->value = AllDataType::alloc(bindField->type,
390 bindField->length);
391 nwStmt->paramList.append(bindField);
393 stmtList.append(nwStmt);
394 return response;
398 DbRetVal SqlNetworkHandler::applyExecPackets(List sList, List pList)
400 ListIterator stmtIter = sList.getIterator();
401 NetworkStmt *nwstmt;
402 DbRetVal rv = conn->beginTrans();
403 if (rv != OK) return rv;
404 ListIterator pktIter = pList.getIterator();
405 PacketExecute *pkt;
406 int i = 0;
407 BindSqlField *bindField;
408 while (pktIter.hasElement())
410 pkt = (PacketExecute*) pktIter.nextElement();
411 stmtIter.reset();
412 bool found = false;
413 while (stmtIter.hasElement())
415 nwstmt = (NetworkStmt*) stmtIter.nextElement();
416 if (nwstmt->stmtID == pkt->stmtID) {found = true ; break;}
418 if (!found) {
419 printf("stmt not found in list. Negleting unreplicated table...\n");
420 continue;
422 ListIterator paramIter = nwstmt->paramList.getIterator();
423 i = 0;
424 while (paramIter.hasElement()) {
425 bindField = (BindSqlField*) paramIter.nextElement();
426 setParamValues(nwstmt->stmt, i+1, bindField->type, bindField->length, pkt->paramValues[i]);
427 i++;
429 int rows= 0;
430 DbRetVal rv = nwstmt->stmt->execute(rows);
431 if (rv != OK )
433 printError(ErrSysFatal, "sql execute failed with rv %d\n", rv);
434 //TODO::log all things like SQL statements to a file
435 SqlNetworkHandler::conn->rollback();
436 printError(ErrPeerExecFailed, "Transaction Rolledback\n");
437 return ErrPeerExecFailed;
440 SqlNetworkHandler::conn->commit();
441 return OK;
444 void SqlNetworkHandler::setParamValues(AbsSqlStatement *stmt, int parampos, DataType type,
445 int length, char *value)
447 switch(type)
449 case typeInt:
450 stmt->setIntParam(parampos, *(int*)value);
451 break;
452 case typeLong:
453 stmt->setLongParam(parampos, *(long*)value);
454 break;
455 case typeLongLong:
456 stmt->setLongLongParam(parampos, *(long long*)value);
457 break;
458 case typeShort:
459 stmt->setShortParam(parampos, *(short*)value);
460 break;
461 case typeByteInt:
462 stmt->setByteIntParam(parampos, *(char*)value);
463 break;
464 case typeDouble:
465 stmt->setDoubleParam(parampos, *(double*)value);
466 break;
467 case typeFloat:
468 stmt->setFloatParam(parampos, *(float*)value);
469 break;
470 case typeDate:
471 stmt->setDateParam(parampos, *(Date*)value);
472 break;
473 case typeTime:
474 stmt->setTimeParam(parampos, *(Time*)value);
475 break;
476 case typeTimeStamp:
477 stmt->setTimeStampParam(parampos, *(TimeStamp*)value);
478 break;
479 case typeString:
481 char *d =(char*)value;
482 d[length-1] = '\0';
483 stmt->setStringParam(parampos, (char*)value);
484 break;
486 case typeBinary:
487 stmt->setBinaryParam(parampos, (char *) value);
488 break;
490 return;