Making JDBCBench generic so as to run csql and mysql benchmark
[csql.git] / src / network / SqlNetworkHandler.cxx
blob6f05a900c59c39628db259755a1fc90ffa928cc4
1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
20 #include <SqlNetworkHandler.h>
21 #include <AbsSqlConnection.h>
22 #include <SqlConnection.h>
23 #include <SqlOdbcConnection.h>
24 #include <AbsSqlStatement.h>
25 #include <SqlStatement.h>
26 #include <SqlOdbcStatement.h>
27 #include <SqlLogStatement.h>
28 #include <Parser.h>
30 List SqlNetworkHandler::stmtList;
31 AbsSqlConnection* SqlNetworkHandler::conn;
32 SqlApiImplType SqlNetworkHandler::type;
33 int SqlNetworkHandler::stmtID;
35 void *SqlNetworkHandler::process(PacketHeader &header, char *buffer)
37 DbRetVal rv = OK;
38 char *ptr = NULL;
39 ResponsePacket *rpkt = NULL;
40 switch(header.packetType)
42 // case NW_PKT_PREPARE:
43 //return processPrepare(header, buffer);
44 // break;
45 //case NW_PKT_COMMIT:
46 //return processCommit(header, buffer);
47 // break;
48 case SQL_NW_PKT_CONNECT:
49 return processSqlConnect(header, buffer);
50 break;
51 case SQL_NW_PKT_PREPARE:
52 return processSqlPrepare(header, buffer);
53 break;
54 case SQL_NW_PKT_EXECUTE:
55 return processSqlExecute(header, buffer);
56 break;
57 case SQL_NW_PKT_FETCH:
58 return processSqlFetch(header, buffer);
59 break;
60 case SQL_NW_PKT_COMMIT:
61 return processSqlCommit(header, buffer);
62 break;
63 case SQL_NW_PKT_ROLLBACK:
64 return processSqlRollback(header, buffer);
65 break;
66 case SQL_NW_PKT_DISCONNECT:
67 conn->rollback();
68 rv = conn->disconnect();
69 rpkt = new ResponsePacket();
70 ptr = (char *) &rpkt->retVal;
71 *ptr = 1;
72 strcpy(rpkt->errorString, "Success");
73 return rpkt;
74 case SQL_NW_PKT_FREE:
75 return processSqlFree(header, buffer);
76 break;
80 void * SqlNetworkHandler::processSqlConnect(PacketHeader &header, char *buffer)
82 ResponsePacket *rpkt = new ResponsePacket();
83 printDebug(DM_Network, "Processing CONNECT");
84 SqlPacketConnect *pkt = new SqlPacketConnect();
85 pkt->setBuffer(buffer);
86 pkt->setBufferSize(header.packetLength);
87 pkt->unmarshall();
88 char *ptr = (char *) &rpkt->retVal;
89 DbRetVal rv=conn->connect(pkt->userName, pkt->passWord);
90 if (rv != OK) {
91 *ptr = 0;
92 strcpy(rpkt->errorString, "Error:Connect failure");
93 return rpkt;
94 printf("connection failure\n");
96 if (rv == OK) {
97 *ptr = 1;
98 rv = conn->beginTrans();
99 return rpkt;
103 void* SqlNetworkHandler::processSqlPrepare(PacketHeader &header, char *buffer)
105 ResponsePacket *rpkt = new ResponsePacket();
106 rpkt->isSelect = false;
107 char *retval = (char *) &rpkt->retVal;
108 SqlPacketPrepare *pkt = new SqlPacketPrepare();
109 pkt->setBuffer(buffer);
110 pkt->setBufferSize(header.packetLength);
111 pkt->unmarshall();
112 printDebug(DM_Network, "PREPARE %s\n", pkt->stmtString);
113 AbsSqlStatement *sqlstmt = SqlFactory::createStatement(type);
114 sqlstmt->setConnection(conn);
115 NetworkStmt *nwStmt = new NetworkStmt();
116 nwStmt->stmtID = ++stmtID;
117 printDebug(DM_Network, "Statement string %s\n", pkt->stmtString);
118 nwStmt->stmt = sqlstmt;
119 DbRetVal rv = sqlstmt->prepare(pkt->stmtString);
120 if (rv != OK)
122 printError(ErrSysInit, "statement prepare failed\n");
123 *retval = 0;
124 strcpy(rpkt->errorString, "Error:Statement prepare failed");
125 return rpkt;
127 int param = sqlstmt->noOfParamFields();
128 int proj = sqlstmt->noOfProjFields();
129 BindSqlField *bindField = NULL;
130 BindSqlProjectField *projField = NULL;
131 //populate paramList
132 FieldInfo * fInfo = new FieldInfo();
133 for (int i = 0; i < param; i++) {
134 bindField = new BindSqlField();
135 sqlstmt->getParamFldInfo(i + 1, fInfo);
136 bindField->type = fInfo->type;
137 bindField->length = fInfo->length;
138 bindField->value = AllDataType::alloc(bindField->type, bindField->length);
139 nwStmt->paramList.append(bindField);
141 delete fInfo;
142 FieldInfo *fldInfo = new FieldInfo();
143 for (int i = 0; i < proj; i++) {
144 projField = new BindSqlProjectField();
145 sqlstmt->getProjFldInfo(i + 1, fldInfo);
146 projField->type = fldInfo->type;
147 projField->length = fldInfo->length;
148 projField->value = AllDataType::alloc(projField->type, projField->length);
149 nwStmt->projList.append(projField);
151 delete fldInfo;
152 stmtList.append(nwStmt);
153 *retval = 1;
154 if(sqlstmt->isSelect()) rpkt->isSelect = true;
155 if (param) *(retval+2) = 1;
156 if (proj) *(retval+3) = 1;
157 rpkt->stmtID = nwStmt->stmtID;
158 strcpy(rpkt->errorString, "Success");
159 return rpkt;
162 void * SqlNetworkHandler::processSqlExecute(PacketHeader &header, char *buffer)
164 ResponsePacket *rpkt = new ResponsePacket();
165 char *retval = (char *) &rpkt->retVal;
166 SqlPacketExecute *pkt = new SqlPacketExecute();
167 pkt->setBuffer(buffer);
168 pkt->setBufferSize(header.packetLength);
169 pkt->setStatementList(stmtList);
170 pkt->unmarshall();
171 printDebug(DM_Network, "PREPARE %d\n", pkt->stmtID);
172 rpkt->stmtID = pkt->stmtID;
173 ListIterator stmtIter = stmtList.getIterator();
174 NetworkStmt *stmt;
175 while (stmtIter.hasElement())
177 stmt = (NetworkStmt*) stmtIter.nextElement();
178 //TODO::Also check the srcNetworkID
179 if (stmt->stmtID == pkt->stmtID ) break;
181 AbsSqlStatement *sqlstmt = stmt->stmt;
182 int rows = 0;
183 for (int i=0; i < pkt->noParams; i++) {
184 BindSqlField *bindField = (BindSqlField *) stmt->paramList.get(i+1);
185 setParamValues(sqlstmt, i+1, bindField->type, bindField->length, (char *)bindField->value);
187 //SqlStatement *st = (SqlStatement *)sqlstmt;
188 if(sqlstmt->isSelect()) {
189 int noProj = stmt->projList.size();
190 for (int i=0; i < noProj; i++) {
191 BindSqlProjectField *prjFld = (BindSqlProjectField *) stmt->projList.get(i+1);
192 sqlstmt->bindField(i+1, prjFld->value);
195 DbRetVal rv = sqlstmt->execute(rows);
196 if (rv != OK) {
197 *retval = 0;
198 strcpy(rpkt->errorString, "Execute failed");
199 return rpkt;
201 *retval = 1;
202 rpkt->rows = rows;
203 strcpy(rpkt->errorString, "Success");
204 return rpkt;
207 void * SqlNetworkHandler::processSqlFetch(PacketHeader &header, char *buffer)
209 ResponsePacket *rpkt = new ResponsePacket();
210 char *retval = (char *) &rpkt->retVal;
211 SqlPacketFetch *pkt = new SqlPacketFetch();
212 pkt->setBuffer(buffer);
213 pkt->unmarshall();
214 rpkt->stmtID = pkt->stmtID;
215 ListIterator stmtIter = stmtList.getIterator();
216 NetworkStmt *stmt;
217 while (stmtIter.hasElement())
219 stmt = (NetworkStmt*) stmtIter.nextElement();
220 //TODO::Also check teh srcNetworkID
221 if (stmt->stmtID == pkt->stmtID ) break;
223 AbsSqlStatement *sqlstmt = stmt->stmt;
224 void *data=NULL;
225 DbRetVal rv = OK;
226 if ((data = sqlstmt->fetch(rv)) != NULL && rv == OK) {
227 *retval = 1;
228 strcpy(rpkt->errorString, "Success");
229 return rpkt;
231 if (data == NULL && rv == OK) {
232 sqlstmt->close();
233 *retval = 1;
234 *(retval + 1) = 1;
235 strcpy(rpkt->errorString, "Success fetch completed");
236 return rpkt;
238 else {
239 *retval = 0;
240 strcpy(rpkt->errorString, "fetch completed");
241 return rpkt;
245 void * SqlNetworkHandler::processSqlFree(PacketHeader &header, char *buffer)
247 ResponsePacket *rpkt = new ResponsePacket();
248 char *retval = (char *) &rpkt->retVal;
249 SqlPacketFree *pkt = new SqlPacketFree();
250 pkt->setBuffer(buffer);
251 pkt->unmarshall();
252 rpkt->stmtID = pkt->stmtID;
253 ListIterator stmtIter = stmtList.getIterator();
254 NetworkStmt *stmt;
255 while (stmtIter.hasElement())
257 stmt = (NetworkStmt*) stmtIter.nextElement();
258 //TODO::Also check teh srcNetworkID
259 if (stmt->stmtID == pkt->stmtID ) break;
261 AbsSqlStatement *sqlstmt = stmt->stmt;
262 sqlstmt->free();
263 ListIterator itprm = stmt->paramList.getIterator();
264 BindSqlField *fld = NULL;
265 while((fld = (BindSqlField *) itprm.nextElement()) != NULL) delete fld;
266 stmt->paramList.reset();
267 ListIterator itprj = stmt->projList.getIterator();
268 BindSqlProjectField *pfld = NULL;
269 while((pfld = (BindSqlProjectField *) itprj.nextElement()) != NULL) delete pfld;
270 stmt->projList.reset();
271 delete stmt->stmt;
272 stmtList.remove(stmt);
273 delete stmt;
274 *retval = 1;
275 strcpy(rpkt->errorString, "Success");
276 return rpkt;
280 void * SqlNetworkHandler::processSqlCommit(PacketHeader &header, char *buffer)
282 ResponsePacket *rpkt = new ResponsePacket();
283 char *retval = (char *) &rpkt->retVal;
284 DbRetVal rv = conn->commit();
285 if (rv != OK) {
286 *retval = 0;
287 strcpy(rpkt->errorString, "Commit failure\n");
288 return rpkt;
290 rv = conn->beginTrans();
291 *retval = 1;
292 strcpy(rpkt->errorString, "Success");
293 return rpkt;
296 void *SqlNetworkHandler::processSqlRollback(PacketHeader &header, char *buffer)
298 ResponsePacket *rpkt = new ResponsePacket();
299 char *retval = (char *) &rpkt->retVal;
301 DbRetVal rv = conn->rollback();
302 if (rv != OK) {
303 *retval = 0;
304 strcpy(rpkt->errorString, "Rollback failure\n");
305 return rpkt;
307 rv = conn->beginTrans();
308 *retval = 1;
309 strcpy(rpkt->errorString, "Success");
310 return rpkt;
313 void *SqlNetworkHandler::processCommit(PacketHeader &header, char *buffer)
315 printDebug(DM_Network, "Processing COMMIT");
316 PacketCommit *pkt = new PacketCommit();
317 pkt->setBuffer(buffer);
318 pkt->setBufferSize(header.packetLength);
319 pkt->unmarshall();
320 List pktList;
321 pkt->getExecPacketList(stmtList, pktList);
322 DbRetVal rv = applyExecPackets(stmtList, pktList);
323 int response = 1;
324 if (rv != OK)
326 printError(ErrSysFatal, "Unable to apply the exec packets\n");
327 response =0;
329 return response;
332 void * SqlNetworkHandler::processFree(PacketHeader &header, char *buffer)
334 PacketFree *pkt = new PacketFree();
335 pkt->setBuffer(buffer);
336 pkt->setBufferSize(header.packetLength);
337 pkt->unmarshall();
338 //printf("FREE %d \n", pkt->stmtID);
339 int response =1;
340 //This wont work for two statement executed in same transaction using same SqlStatement object using free.
341 //so do not delete now and put a flag 'readyfordelete' in NetworkStmt object and delete it during execute
343 ListIterator iter = stmtList.getIterator();
344 NetworkStmt *stmt, *removeStmt = NULL;
345 while (iter.hasElement())
347 stmt = (NetworkStmt*)iter.nextElement();
348 if (stmt->srcNetworkID == header.srcNetworkID
349 && stmt->stmtID == pkt->stmtID)
351 removeStmt = stmt;
352 break;
355 if (removeStmt) stmtList.remove(removeStmt);
356 else printf("Statement id %d not found in list \n", pkt->stmtID);
358 return response;
360 void * SqlNetworkHandler::processPrepare(PacketHeader &header, char *buffer)
362 PacketPrepare *pkt = new PacketPrepare();
363 pkt->setBuffer(buffer);
364 pkt->setBufferSize(header.packetLength);
365 pkt->unmarshall();
366 printDebug(DM_Network, "PREPARE %d %s\n", pkt->stmtID, pkt->stmtString);
367 //for (int i =0 ; i < pkt->noParams; i++)
368 //printf("PREPARE type %d length %d \n", pkt->type[i], pkt->length[i]);
369 int response =1;
370 //TODO::add it to the SqlStatement list
371 AbsSqlStatement *sqlstmt = SqlFactory::createStatement(type);
372 sqlstmt->setConnection(conn);
373 NetworkStmt *nwStmt = new NetworkStmt();
374 printDebug(DM_Network, "Statement string %s\n", pkt->stmtString);
375 nwStmt->srcNetworkID = header.srcNetworkID;
376 nwStmt->stmtID = pkt->stmtID;
377 nwStmt->stmt = sqlstmt;
378 DbRetVal rv = sqlstmt->prepare(pkt->stmtString);
379 if (rv != OK)
381 printError(ErrSysInit, "statement prepare failed\n");
382 response = 0;
383 return response;
385 BindSqlField *bindField = NULL;
386 //populate paramList
387 for (int i = 0; i < pkt->noParams; i++)
389 bindField = new BindSqlField();
390 bindField->type = (DataType) pkt->type[i];
391 bindField->length = pkt->length[i];
392 bindField->value = AllDataType::alloc(bindField->type,
393 bindField->length);
394 nwStmt->paramList.append(bindField);
396 stmtList.append(nwStmt);
397 return response;
401 DbRetVal SqlNetworkHandler::applyExecPackets(List sList, List pList)
403 ListIterator stmtIter = sList.getIterator();
404 NetworkStmt *nwstmt;
405 DbRetVal rv = conn->beginTrans();
406 if (rv != OK) return rv;
407 ListIterator pktIter = pList.getIterator();
408 PacketExecute *pkt;
409 int i = 0;
410 BindSqlField *bindField;
411 while (pktIter.hasElement())
413 pkt = (PacketExecute*) pktIter.nextElement();
414 stmtIter.reset();
415 bool found = false;
416 while (stmtIter.hasElement())
418 nwstmt = (NetworkStmt*) stmtIter.nextElement();
419 if (nwstmt->stmtID == pkt->stmtID) {found = true ; break;}
421 if (!found) {
422 printf("stmt not found in list. Negleting unreplicated table...\n");
423 continue;
425 ListIterator paramIter = nwstmt->paramList.getIterator();
426 i = 0;
427 while (paramIter.hasElement()) {
428 bindField = (BindSqlField*) paramIter.nextElement();
429 setParamValues(nwstmt->stmt, i+1, bindField->type, bindField->length, pkt->paramValues[i]);
430 i++;
432 int rows= 0;
433 DbRetVal rv = nwstmt->stmt->execute(rows);
434 if (rv != OK )
436 printError(ErrSysFatal, "sql execute failed with rv %d\n", rv);
437 //TODO::log all things like SQL statements to a file
438 SqlNetworkHandler::conn->rollback();
439 printError(ErrPeerExecFailed, "Transaction Rolledback\n");
440 return ErrPeerExecFailed;
443 SqlNetworkHandler::conn->commit();
444 return OK;
447 void SqlNetworkHandler::setParamValues(AbsSqlStatement *stmt, int parampos, DataType type,
448 int length, char *value)
450 switch(type)
452 case typeInt:
453 stmt->setIntParam(parampos, *(int*)value);
454 break;
455 case typeLong:
456 stmt->setLongParam(parampos, *(long*)value);
457 break;
458 case typeLongLong:
459 stmt->setLongLongParam(parampos, *(long long*)value);
460 break;
461 case typeShort:
462 stmt->setShortParam(parampos, *(short*)value);
463 break;
464 case typeByteInt:
465 stmt->setByteIntParam(parampos, *(char*)value);
466 break;
467 case typeDouble:
468 stmt->setDoubleParam(parampos, *(double*)value);
469 break;
470 case typeFloat:
471 stmt->setFloatParam(parampos, *(float*)value);
472 break;
473 case typeDate:
474 stmt->setDateParam(parampos, *(Date*)value);
475 break;
476 case typeTime:
477 stmt->setTimeParam(parampos, *(Time*)value);
478 break;
479 case typeTimeStamp:
480 stmt->setTimeStampParam(parampos, *(TimeStamp*)value);
481 break;
482 case typeString:
484 char *d =(char*)value;
485 d[length-1] = '\0';
486 stmt->setStringParam(parampos, (char*)value);
487 break;
489 case typeBinary:
490 stmt->setBinaryParam(parampos, (char *) value);
491 break;
493 return;