adding statement length
[csql.git] / src / tools / csqlasyncserver.cxx
blob9d4e909e1e91ed7f38c5b3d714e6f300f859dc57
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include <os.h>
17 #include <TableConfig.h>
18 #include <SessionImpl.h>
19 #include <SqlFactory.h>
20 #include <AbsSqlConnection.h>
21 #include <AbsSqlStatement.h>
22 #include <SqlNwConnection.h>
23 #include <SqlOdbcConnection.h>
24 #include <SqlNwStatement.h>
25 #include <SqlConnection.h>
26 #include <SqlStatement.h>
27 #include <SqlFactory.h>
28 #include <CSql.h>
29 #include <Network.h>
30 #include <SqlLogStatement.h> //for BindSqlField
31 #include <SqlNetworkHandler.h>
// Abstract cursor over the elements stored in the replication log queue.
class AbsCSqlQIterator
{
    public:
    // Virtual destructor: concrete iterators are created with new and held
    // through a pointer to this interface, so destroying them through that
    // pointer must run the derived destructor.
    virtual ~AbsCSqlQIterator() {}

    // Returns the next queued element as an untyped pointer. The callers in
    // this file treat a NULL result as "nothing available right now".
    virtual void *next() = 0;
};
// Abstract queue interface used to buffer replication log messages.
class AbsCSqlQueue
{
    public:
    // Virtual destructor: the queue is owned through a pointer to this
    // interface, so deleting it must reach the concrete implementation.
    virtual ~AbsCSqlQueue() {}

    // Stores a copy of the message pointed to by log; len is the payload
    // length as reported by the message receiver.
    virtual void push(void *log, int len) = 0;

    // Removes an element from the queue.
    virtual void pop() = 0;

    // Returns the number of elements currently held.
    virtual int size() = 0;
};
47 class ListIter : public AbsCSqlQIterator
49 public:
50 ListIterator iter;
51 ListIter(List &list) { iter = list.getIterator(); }
52 void *next() { return iter.nextElementInQueue(); }
55 typedef struct FailedStmtObject {
56 int stmtId;
57 DbRetVal eType;
58 } FailStmt;
// Running sequence number; incremented for every element pushed onto the
// queue and stored at the front of that element.
long long qIndex = 0LL;
63 class ListAsQueue : public AbsCSqlQueue
65 public:
66 List list;
67 ListAsQueue() {}
68 void push(void *log, int len)
70 int logSize = sizeof(long long) + len + sizeof(int) + sizeof(long);
71 char *logElem = (char *) malloc(os::align(logSize));
72 *(long long *) logElem = ++qIndex;
73 *(int*)(logElem + sizeof(long long))= len;
74 char *ptr = logElem + sizeof(int) + sizeof(long long);
75 memcpy(ptr, log, len+sizeof(long)); //long for msg type
76 list.append(logElem);
77 printDebug(DM_ReplServer, "Pushed Element: %x", logElem);
79 int size() { return list.size(); }
80 void pop(){};
84 class ThreadInputData
86 public:
87 AbsCSqlQIterator *qIter;
88 long long *indexPtr;
89 ThreadInputData() { qIter = NULL; indexPtr = NULL; }
// Entry point of the worker thread that applies queued messages; the
// argument is the ThreadInputData created for that thread.
void *startThread(void *thrInfo);
// Writes the command line help text to standard output.
void printUsage()
{
    fputs("Usage: csqlasyncserver \n", stdout);
    fputs("Description: Start the csql Async server.\n", stdout);
}
102 DbRetVal processMessage(void *str, int len, void *conn, void *hashBucketPtr,
103 SqlApiImplType flag, List *prepareFailList);
104 void *freeMsgFromQueue(void *p);
105 DbRetVal handlePrepare(void *str, void *conn, void *stmtBuckets,
106 SqlApiImplType flag, List *prepareFailList);
107 DbRetVal handleCommit(void *str, int len, void *conn, void *stmtBuckets,
108 List *prepareFailList);
109 DbRetVal handleFree(void *str, void *stmtBuckets, List *prepareFailList);
110 AbsSqlStatement *getStmtFromHashTable(int stmtId, void *stmtBuckets);
111 DbRetVal writeToConfResFile(void *data, int len, void *stmtBuckets,
112 char *dsn);
114 int getHashBucket(int stmtid)
116 return (stmtid % STMT_BUCKET_SIZE);
119 AbsCSqlQueue *csqlQ = NULL;
121 int srvStop =0;
122 int msgKey = 0;
124 ThreadInputData **thrInput;
127 int asyncSites = 0;
128 int syncSites = 0;
129 long long * indexCountPtr = NULL;
130 pthread_t freeThrId = 0;
132 static void sigTermHandler(int sig)
134 printf("Received signal %d\nStopping the server\n", sig);
135 os::msgctl(msgKey, IPC_RMID, NULL);
136 srvStop = 1;
139 int main(int argc, char **argv)
141 int c = 0, opt = 0;
142 while ((c = getopt(argc, argv, "?")) != EOF) {
143 switch (c) {
144 case '?' : { opt = 10; break; } //print help
145 default: opt=10;
147 }//while options
148 if (opt == 10) { printUsage();
149 return 0;
152 os::signal(SIGINT, sigTermHandler);
153 os::signal(SIGTERM, sigTermHandler);
155 Conf::config.readAllValues(os::getenv("CSQL_CONFIG_FILE"));
156 if (( !Conf::config.useCache() &&
157 Conf::config.getCacheMode() == SYNC_MODE) ) {
158 printf("Replication server not started\n");
159 return 1;
162 bool found =false;
163 //printf("config id = %d\n", Conf::config.getSiteID());
165 if ((!Conf::config.useCache() &&
166 Conf::config.getCacheMode()==SYNC_MODE)) {
167 printf("There are no async sites\n");
168 return 4;
171 msgKey = os::msgget(Conf::config.getMsgKey(), 0666);
172 if (msgKey == -1) {
173 printf("Message Queue creation failed\n");
174 return 4;
177 csqlQ = new ListAsQueue();
178 ListIterator itr = ((ListAsQueue *)csqlQ)->list.getIterator();
180 if (Conf::config.useCache() && Conf::config.getCacheMode()==ASYNC_MODE) {
181 asyncSites++;
183 pthread_t *thrId =new pthread_t [asyncSites];
184 thrInput = (ThreadInputData **) malloc(sizeof(ThreadInputData *) * asyncSites);
185 indexCountPtr = (long long *) malloc(sizeof(long long) * asyncSites);
186 memset(indexCountPtr, 0, sizeof(long long) * asyncSites);
187 int i=0;
188 if(Conf::config.useCache() && Conf::config.getCacheMode()==ASYNC_MODE) {
189 thrInput[i] = new ThreadInputData();
190 thrInput[i]->qIter = NULL;
191 thrInput[i]->indexPtr = &indexCountPtr[i];
192 pthread_create(&thrId[i], NULL, &startThread, thrInput[i]);
193 i++;
195 pthread_create(&freeThrId, NULL, freeMsgFromQueue, NULL);
196 struct timeval timeout, tval;
197 timeout.tv_sec = 5;
198 timeout.tv_usec = 0;
199 int msgSize = Conf::config.getAsyncMsgMax();
200 char str[8192];
201 // printf("Replication Server Started");
202 while (!srvStop) {
203 tval.tv_sec = timeout.tv_sec;
204 tval.tv_usec = timeout.tv_usec;
205 os::select(0, 0, 0, 0, &tval);
206 printDebug(DM_ReplServer, "waiting for message");
207 while(true) {
208 long size = os::msgrcv(msgKey, str, msgSize, 0, 0666|IPC_NOWAIT);// process logs
209 printDebug(DM_ReplServer, "Received msg size = %d", size);
210 if (size == -1 || srvStop) break;
211 csqlQ->push(str, size); // long for mtype of msg
214 delete[] thrId;
215 printf("Replication Server Exiting\n");
216 return 0;
219 void *startThread(void *thrInfo)
221 DbRetVal rv = OK;
222 ThreadInputData *thrInput = (ThreadInputData *)thrInfo;
223 List prepareFailList;
224 SqlApiImplType flag;
225 flag = CSqlAdapter;
226 printDebug(DM_ReplServer, "SqlAdapter Thread created");
227 AbsCSqlQIterator *iter = thrInput->qIter;
228 while (1) { if (csqlQ->size()) break; }
229 iter = new ListIter(((ListAsQueue *)csqlQ)->list);
230 AbsSqlConnection *conn = SqlFactory::createConnection(flag);
231 void *stmtBuckets = malloc (STMT_BUCKET_SIZE * sizeof(StmtBucket));
232 memset(stmtBuckets, 0, STMT_BUCKET_SIZE * sizeof(StmtBucket));
233 printDebug(DM_ReplServer, "stmtbuckets: %x", stmtBuckets);
234 struct timeval timeout, tval;
235 timeout.tv_sec = 0;
236 while (1) {
237 while (1) {
238 rv = conn->connect(I_USER, I_PASS);
239 if (rv == OK) break;
240 printError(rv, "Unable to connect to peer site");
241 timeout.tv_sec = 10;
242 timeout.tv_usec = 0;
243 os::select(0, 0, 0, 0, &timeout);
245 while (1) {
246 void *msg = NULL;
247 while (1) {
248 msg = iter->next(); //receive msg from csqlQ
249 if (msg != NULL) break;
250 tval.tv_sec = 5;
251 tval.tv_usec = 1000;
252 os::select(0, 0, 0, 0, &tval);
254 long long index = *(long long *) msg;
255 int length = *(int *)((char *)msg+sizeof(long long));
256 char *msgptr = (char *)msg + sizeof(long long) + sizeof(int);
257 printDebug(DM_ReplServer, "entering process message");
258 rv = processMessage(msgptr, length, conn, stmtBuckets, flag,
259 &prepareFailList);
260 if (rv == ErrNoConnection) break;
261 printDebug(DM_ReplServer, "processed message");
262 *(long long *) thrInput->indexPtr = index;
263 printDebug(DM_ReplServer, "index %d is stored in Main index log array\n", index);
266 return NULL;
269 DbRetVal processMessage(void *str, int len, void *conn, void *stmtBuckets,
270 SqlApiImplType flag, List *prepareFailList)
272 long type = *(long *) str;
273 printDebug(DM_ReplServer, "type = %d\n", type);
274 char *data = (char *) str + sizeof(long);
275 if (type == 1) return handlePrepare(data, conn, stmtBuckets, flag,
276 prepareFailList);
277 else if (type == 2) return handleCommit(data, len, conn, stmtBuckets,
278 prepareFailList);
279 else if (type == 3) return handleFree(data, stmtBuckets, prepareFailList);
282 void *freeMsgFromQueue(void *indCntPtr)
284 long long minIndex = 0;
285 struct timeval timeout, tval;
286 timeout.tv_sec = 0;
287 printDebug(DM_ReplServer, "waiting for free the q elements");
288 while(1) {
289 if (csqlQ->size()) break;
290 /// printError(ErrWarning, "List is empty");
291 tval.tv_sec = 1;
292 tval.tv_usec = 1000;
293 os::select(0, 0, 0, 0, &tval);
295 AbsCSqlQIterator *iter = new ListIter(((ListAsQueue *)csqlQ)->list);
296 while (1) {
297 minIndex = indexCountPtr[0];
298 for (int i=0; i < asyncSites; i++) {
299 if (minIndex > indexCountPtr[i]) minIndex = indexCountPtr[i];
301 void *msg = NULL;
302 while (1) {
303 msg = iter->next(); //receive msg from csqlQ
304 if (msg == NULL) break;
305 if( *(long long *) msg <= minIndex) {
306 long long num = *(long long *) msg;
307 free (msg); msg = NULL;
308 } else break;
310 tval.tv_sec = 0;
311 tval.tv_usec = 100000;
312 os::select(0, 0, 0, 0, &tval);
314 return NULL;
317 DbRetVal handlePrepare(void *data, void *conn, void *stmtBuckets,
318 SqlApiImplType flag, List *prepareFailList)
320 DbRetVal rv = OK;
321 AbsSqlConnection *con = (AbsSqlConnection *)conn;
322 AbsSqlStatement *stmt = SqlFactory::createStatement(flag);
323 stmt->setConnection(con);
324 char *ptr = (char *) data;
325 int length = *(int *) ptr; ptr += sizeof(int);
326 int txnId = *(int *) ptr; ptr += sizeof(int);
327 int stmtId = *(int *) ptr; ptr += sizeof(int);
328 char *tblName = ptr; ptr += IDENTIFIER_LENGTH;
329 char *stmtstr = (char *)data + 3 * sizeof(int) + IDENTIFIER_LENGTH;
330 int i = 1;
332 unsigned int mode = TableConf::config.getTableMode(tblName);
333 bool isCached = TableConf::config.isTableCached(mode);
335 if ((flag == CSqlAdapter) && !isCached) {
336 FailStmt *fst = new FailStmt();
337 fst->stmtId = stmtId;
338 fst->eType = ErrNotCached;
339 prepareFailList->append(fst);
340 return OK;
343 printDebug(DM_ReplServer, "stmt str: %s", stmtstr);
344 rv = stmt->prepare(stmtstr);
345 if (rv != OK) {
346 FailStmt *fst = new FailStmt();
347 fst->stmtId = stmtId;
348 fst->eType = rv;
349 prepareFailList->append(fst);
350 return rv;
352 int bucketNo = getHashBucket(stmtId);
353 printDebug(DM_ReplServer, "PrepHdl: stmtBuckets: %x", stmtBuckets);
354 printDebug(DM_ReplServer, "PrepHdl: bucketno: %d", bucketNo);
355 StmtBucket *buck = (StmtBucket *) stmtBuckets;
356 StmtBucket *stmtBucket = &buck[bucketNo];
357 printDebug(DM_ReplServer, "PrepHdl: bucket addr: %x", stmtBucket);
358 StmtNode *node = new StmtNode();
359 printDebug(DM_ReplServer, "PredHdl: stmtNode addr: %x", node);
360 node->stmtId = stmtId;
361 node->stmt = stmt;
362 strcpy(node->stmtstr, stmtstr);
363 printDebug(DM_ReplServer, "PrepHdl: stmt id: %d stmt %x", node->stmtId, node->stmt);
364 stmtBucket->bucketList.append(node);
366 printDebug(DM_ReplServer, "returning from prepare");
367 return rv;
370 DbRetVal handleCommit(void *data, int len, void *conn, void *stmtBuckets,
371 List *prepareFailList)
373 DbRetVal rv = OK;
374 AbsSqlConnection *con = (AbsSqlConnection *)conn;
375 // get dsn if adapter to write into conflict resolution file
376 char *dsn = NULL;
377 SqlOdbcConnection *adCon = (SqlOdbcConnection *) con;
378 dsn = adCon->dsn;
379 char *ptr = (char *) data;
380 int datalen = *(int *) ptr; ptr += sizeof(int);
381 int txnId = *(int *) ptr; ptr += sizeof(int);
382 FailStmt *elem = NULL;
383 rv = con->beginTrans();
384 if (rv != OK) {
385 printError(rv, "Begin trans failed");
386 return rv;
388 while ((ptr - (char *)data) < len) {
389 int stmtId = *(int *)ptr;
390 ptr += sizeof(int);
391 AbsSqlStatement *stmt = getStmtFromHashTable(stmtId, stmtBuckets);
392 printDebug(DM_ReplServer, "commit: stmtId: %d", stmtId);
393 printDebug(DM_ReplServer, "commit: stmtbuckets: %x", stmtBuckets);
394 printDebug(DM_ReplServer, "commit: stmt: %x", stmt);
395 ExecType type = (ExecType) (*(int *) ptr);
396 ptr += sizeof(int);
397 if (type == SETPARAM) {
398 int parampos = *(int *) ptr;
399 ptr += sizeof(int);
400 DataType dataType = (DataType) ( *(int *) ptr);
401 ptr += sizeof(int);
402 int length = *(int *) ptr;
403 ptr += sizeof(int);
404 void *value = ptr;
405 ptr += length;
406 if (stmt != NULL)
407 SqlNetworkHandler::setParamValues(stmt, parampos, dataType,
408 length, (char *)value);
409 } else {
410 // start executing and committing for all active connections
411 int rows;
412 if (stmt != NULL) rv = stmt->execute(rows);
413 if (rv != OK) {
414 printError(rv, "Execute failed with return value %d", rv);
415 if (rv == ErrNoConnection) return rv;
416 else {
417 // write to conflict resolution file
418 writeToConfResFile(data, len, stmtBuckets, dsn);
419 con->rollback();
420 return OK;
423 if (stmt == NULL) {
424 ListIterator it = prepareFailList->getIterator();
425 bool found = false;
426 while (it.hasElement()) {
427 elem = (FailStmt *) it.nextElement();
428 if (elem->stmtId == stmtId) { found = true; break; }
430 if (! found) continue; // for local table
431 if ((elem->eType == ErrNotCached) ||
432 elem->eType == ErrNotExists)
433 continue;
434 else {
435 // write to conflict resolution file
436 writeToConfResFile(data, len, stmtBuckets, dsn);
437 con->rollback();
438 return OK;
443 rv = con->commit();
444 if (rv != OK) { printDebug(DM_ReplServer, "commit failed"); }
445 else { printDebug(DM_ReplServer, "commit passed"); }
446 return OK;
449 DbRetVal handleFree(void *data, void *stmtBuckets, List *prepareFailList)
451 DbRetVal rv = OK;
452 char *ptr = (char *) data;
453 int len = *(int *) ptr; ptr += sizeof(int);
454 int txnId = *(int *) ptr; ptr += sizeof(int);
455 int stmtId = *(int *)ptr;
456 AbsSqlStatement *stmt = getStmtFromHashTable(stmtId, stmtBuckets);
457 FailStmt *elem = NULL;
458 if (stmt == NULL) {
459 ListIterator failListIter = prepareFailList->getIterator();
460 while (failListIter.hasElement()) {
461 elem = (FailStmt *) failListIter.nextElement();
462 if (elem->stmtId == stmtId) break;
464 failListIter.reset();
465 prepareFailList->remove(elem);
466 return OK;
468 rv = stmt->free();
469 if (rv != OK) {
470 printError(rv, "HandleFree failed with return vlaue %d", rv);
471 return rv;
473 int bucketNo = getHashBucket(stmtId);
474 StmtBucket *buck = (StmtBucket *) stmtBuckets;
475 StmtBucket *stmtBucket = &buck[bucketNo];
476 StmtNode *node = NULL;
477 ListIterator it = stmtBucket->bucketList.getIterator();
478 while(it.hasElement()) {
479 node = (StmtNode *) it.nextElement();
480 if(stmtId == node->stmtId) break;
482 it.reset();
483 printDebug(DM_ReplServer, "GSFHT: node: %x", node);
484 printDebug(DM_ReplServer, "GSFHT: stmtId: %d", node->stmtId);
485 printDebug(DM_ReplServer, "GSFHT stmt: %x", node->stmt);
486 stmtBucket->bucketList.remove(node);
487 if (node->stmt) delete stmt;
488 delete node; node = NULL;
489 return OK;
492 AbsSqlStatement *getStmtFromHashTable(int stmtId, void *stmtBuckets)
494 int bucketNo = getHashBucket(stmtId);
495 printDebug(DM_ReplServer, "GSFHT: bucketNo: %d", bucketNo);
496 StmtBucket *buck = (StmtBucket *) stmtBuckets;
497 StmtBucket *stmtBucket = &buck[bucketNo];
498 printDebug(DM_ReplServer, "GSFHT: bucket: %x", stmtBucket);
499 StmtNode *node = NULL;
500 ListIterator it = stmtBucket->bucketList.getIterator();
501 while(it.hasElement()) {
502 node = (StmtNode *) it.nextElement();
503 printDebug(DM_ReplServer, "GSFHT: node: %x", node);
504 if(stmtId == node->stmtId) {
505 printDebug(DM_ReplServer, "GSFHT: stmtId: %d", node->stmtId);
506 printDebug(DM_ReplServer, "GSFHT stmt: %x", node->stmt);
507 return node->stmt;
510 return NULL;
513 DbRetVal writeToConfResFile(void *data, int len, void *stmtBuckets,
514 char *dsn)
516 DbRetVal rv = OK;
517 bool isPrmStmt=false;
518 char confResFile[1024];
519 sprintf(confResFile, "%s", Conf::config.getConflResoFile());
520 int fd = open(confResFile, O_WRONLY|O_CREAT| O_APPEND, 0644);
521 if (fd < 0) {
522 printError(ErrOS, "Could not create conflict Resolution file");
523 return ErrOS;
525 char buffer[1024];
526 char paramStmtString[1024];
528 char *ptr = (char *) data;
529 int datalen = *(int *) ptr; ptr += sizeof(int);
530 int txnId = *(int *) ptr; ptr += sizeof(int);
531 strcpy(buffer, "SET AUTOCOMMIT OFF;\n");
532 int ret = os::write(fd, buffer, strlen(buffer));
533 if (ret != strlen(buffer)) {
534 printError(ErrOS, "Write error into conf resolution file");
535 return ErrOS;
537 bool first = true;
538 int counter = 0; // if at all the statement is parameterized
539 int nop = 0;
540 int pos = 0;
541 while ((ptr - (char *)data) < len) {
542 int stmtId = *(int *)ptr;
543 ptr += sizeof(int);
544 int bucketNo = getHashBucket(stmtId);
545 StmtBucket *buck = (StmtBucket *) stmtBuckets;
546 StmtBucket *stmtBucket = &buck[bucketNo];
547 StmtNode *node = NULL;
548 ListIterator it = stmtBucket->bucketList.getIterator();
549 while(it.hasElement()) {
550 node = (StmtNode *) it.nextElement();
551 if(stmtId == node->stmtId) break;
553 printf("DEBUG:node = %x\n", node);
554 ExecType type = (ExecType) (*(int *) ptr);
555 ptr += sizeof(int);
556 if (type == SETPARAM) {
557 isPrmStmt = true;
558 if (first) {
559 first = false;
560 sprintf(paramStmtString, "%s", node->stmtstr);
561 char *it = node->stmtstr;
563 int parampos = *(int *)ptr;
564 ptr += sizeof(int);
565 DataType dataType = (DataType) ( *(int *) ptr);
566 ptr += sizeof(int);
567 int length = *(int *) ptr;
568 ptr += sizeof(int);
569 void *value = ptr;
570 ptr += length;
571 char * it = paramStmtString;
572 int prntdChars = 0;
574 while (*it != '\0') {
575 if (*it == '?') {
576 pos++;
577 if(pos != parampos) { it++; continue; }
578 else {
579 *it++ = ' ';
580 strcpy(buffer,it);
581 switch (dataType) {
582 case typeString: case typeBinary: case typeDate:
583 case typeTime: case typeTimeStamp:
584 *it++ = '\'';
585 AllDataType::convertToString(it, value, dataType, length);
586 prntdChars = AllDataType::printVal(value, dataType,length);
587 it += prntdChars;
588 *it++ = '\'';
589 break;
590 default:
591 AllDataType::convertToString(it, value, dataType, length);
592 prntdChars = AllDataType::printVal(value, dataType,length);
593 it += prntdChars;
596 sprintf(it, " %s", buffer);
597 //strcpy(buffer, paramStmtString);
598 break;
600 } else { it++; }
602 } else {
603 if (!isPrmStmt) {
604 sprintf(buffer, "%s", node->stmtstr);
605 buffer[strlen(buffer)] = '\n';
606 ret = os::write(fd, buffer, strlen(node->stmtstr)+1);
607 if(ret != strlen(node->stmtstr)+1) {
608 printError(ErrOS, "Write error into conf resolution file");
609 return ErrOS;
611 } else {
612 strcpy(buffer, paramStmtString);
613 isPrmStmt = false;
614 first = true;
615 pos = 0;
616 int strlength = strlen(buffer);
617 buffer[strlen(buffer)] = '\n';
618 ret = os::write(fd, buffer, strlength+1);
619 if(ret != strlength+1) {
620 printError(ErrOS, "Write error into conf resolution file");
621 return ErrOS;
626 strcpy(buffer, "COMMIT;\n\n");
627 ret = os::write(fd, buffer, strlen(buffer));
628 if(ret != strlen(buffer)) {
629 printError(ErrOS, "Write error into conf resolution file");
630 return ErrOS;
632 close(fd);