reverting back the changes. causes core dump
[csql.git] / src / tools / csqlasyncserver.cxx
blob834bf7c9954fd30b4fd47be7a0fb84d6df134fbc
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
#include <signal.h>
#include <stdint.h>
#include <os.h>
#include <TableConfig.h>
#include <SessionImpl.h>
#include <SqlFactory.h>
#include <AbsSqlConnection.h>
#include <AbsSqlStatement.h>
#include <SqlNwConnection.h>
#include <SqlOdbcConnection.h>
#include <SqlNwStatement.h>
#include <SqlConnection.h>
#include <SqlStatement.h>
#include <SqlFactory.h>
#include <CSql.h>
#include <Network.h>
#include <SqlLogStatement.h> //for BindSqlField
#include <SqlNetworkHandler.h>
33 typedef struct FailedStmtObject {
34 int stmtId;
35 DbRetVal eType;
36 } FailStmt;
// Node of the singly linked replication queue.
// queue::push() allocates each node larger than sizeof(ITEM) and writes the
// message in place starting at the address of `data`, so `data` marks where
// the payload begins rather than holding a pointer to it.
struct item
{
    struct item *next;  // following node, NULL at the tail
    void *data;         // first bytes of the in-place payload
};
typedef struct item ITEM;
46 // please dont touch the following code for queIterator
47 typedef class queueIterator
49 ITEM *head;
50 ITEM *iter;
51 ITEM *processed;
52 public:
53 queueIterator(ITEM *hd) { head = iter = hd; processed = NULL; }
54 inline void *next(ITEM *hd)
56 if (head == NULL) { head = iter = hd; }
57 if (head != hd) head = hd;
58 ITEM *elem = iter;
59 if (iter == NULL && processed) {
60 if (processed->next != NULL) {
61 processed = processed->next;
62 iter = processed;
63 elem = iter;
64 } else { return NULL; }
66 processed = elem;
67 printDebug(DM_CacheServer, "Processed ITEM: %X", processed);
68 iter = iter->next;
69 return &elem->data;
71 } QITER;
73 class queue
75 ITEM *head;
76 int nItems;
77 // array of msg indexes processed, First index for first thread and so on.
78 // As and when the msg is read from the queue by each of the threads
79 // respcective slot is filled with that index
80 long long *processedMsgIndexArray;
81 long long minProcessedMsgIndex;
82 long long qIndex;
83 long long lastFreedIndex;
84 QITER **qIter;
85 Mutex qMutex;
86 public:
87 queue(int asySites)
89 nItems = 0; head = NULL; processedMsgIndexArray = NULL;
90 qIndex = 0; qIter = NULL; minProcessedMsgIndex = 0;
91 lastFreedIndex = 0;
92 qMutex.init();
93 int size = sizeof (long long) * asySites;
94 processedMsgIndexArray = (long long *) malloc(size);
95 memset(processedMsgIndexArray, 0, size);
96 qIter = (QITER **) malloc(sizeof (QITER *) * asySites);
97 for (int i = 0; i < asySites; i++) qIter[i] = new QITER(head);
99 ~queue() {}
100 int push(void *log, int len)
102 // log includes size of long (msgType) + size of (Msg data);
103 // 2nd parameter len is the size of (Msg data) excluding size of long
105 // long long for Msg Index
106 // int for size of the msg data
107 // long for msgType
108 // len bytes for msg data
109 int logSize = sizeof(long long) + sizeof(int) + sizeof(long) + len;
110 ITEM *item = (ITEM *) malloc(sizeof(ITEM) - sizeof(void *)
111 + os::align(logSize));
112 item->next = NULL;
113 char *ptr = (char *) &item->data;
114 *(long long *) ptr = ++qIndex; ptr += sizeof (long long);
115 *(int*) ptr= len; ptr += sizeof (int);
116 int sizeOfMsg = len + sizeof(long);
117 memcpy(ptr, log, sizeOfMsg);
118 if (head == NULL) { nItems++; head = item; return 0; }
119 ITEM *p = head;
120 while (p->next != NULL) p = p->next;
121 p->next = item;
122 nItems++;
123 return 0;
125 int size() { return nItems; }
126 void *readNextMessage(int thrIndex)
128 if (head == NULL) return NULL;
129 else return qIter[thrIndex]->next(head);
131 inline void updateProcessedIndex(int thrInd, int processedIndex)
133 processedMsgIndexArray[thrInd] = processedIndex;
135 inline long long findMinIndexForFree(int asySites)
137 long long minIndex = processedMsgIndexArray[0];
138 for (int i=1; i < asySites; i++) {
139 if (minIndex > processedMsgIndexArray[i]) {
140 minIndex = processedMsgIndexArray[i];
143 return minIndex-1;
145 void freeMessagesFromQueue(int asySites)
147 long long minIndex = findMinIndexForFree(asySites);
148 if (minIndex <= lastFreedIndex) return;
149 ITEM *elem = head;
150 ITEM *freeFrom = head;
151 ITEM *freeUptoThis = NULL;
152 long long ind = 0;
153 while (elem != NULL) {
154 ind = * (long long *) &elem->data;
155 if (ind == minIndex) {
156 freeUptoThis = elem;
157 head = elem->next;
158 break;
160 elem = elem->next;
162 ITEM *toFree = elem = freeFrom;
163 while (elem != freeUptoThis) {
164 toFree = elem;
165 elem = elem->next;
166 if (toFree) { ::free(toFree); nItems--; }
167 printDebug(DM_CacheServer, "FREED %X", toFree);
169 if (elem) { ::free(elem); nItems--; }
170 printDebug(DM_CacheServer, "FREED %X", elem);
171 lastFreedIndex = minIndex;
175 typedef class queue QUE;
// Argument block handed to a consumer thread at creation time.
class ThreadInputData
{
    public:
    int thrIndex;  // slot of the thread in the queue's per-consumer arrays

    ThreadInputData() : thrIndex(0) {}
};
184 void *startThread(void *p);
// Prints the command line help text to standard output.
void printUsage()
{
    fputs("Usage: csqlasyncserver \n", stdout);
    fputs("Description: Start the csql Async server.\n", stdout);
}
// Forward declarations for the message handling routines defined later in
// this file.

// Reads the long message type at the start of str and dispatches to
// handlePrepare (1), handleCommit (2) or handleFree (3).
193 DbRetVal processMessage(void *str, int len, void *conn, void *hashBucketPtr,
194 SqlApiImplType flag, List *prepareFailList);
// Thread entry point that periodically asks the queue to release messages;
// p carries the number of async sites encoded as a pointer value.
195 void *freeMsgFromQueue(void *p);
// Prepares the statement carried in a prepare message and stores it in the
// statement hash table, or records the failure in prepareFailList.
196 DbRetVal handlePrepare(void *str, void *conn, void *stmtBuckets,
197 SqlApiImplType flag, List *prepareFailList);
// Replays the parameter settings and executions of one transaction and
// commits it on the given connection.
198 DbRetVal handleCommit(void *str, int len, void *conn, void *stmtBuckets,
199 List *prepareFailList);
// Frees the statement named in a free message, or drops its entry from
// prepareFailList when no statement was registered for that id.
200 DbRetVal handleFree(void *str, void *stmtBuckets, List *prepareFailList);
// Appends the statements of a transaction to the conflict resolution file
// named by the configuration.
201 DbRetVal writeToConfResFile(void *data, int len, void *stmtBuckets, char *dsn);
203 //Globals
204 QUE *que = NULL;
205 int srvStop =0;
206 int msgKey = 0;
207 ThreadInputData **thrInput;
208 pthread_t freeThrId = 0;
210 static void sigTermHandler(int sig)
212 printf("Received signal %d\nStopping the server\n", sig);
213 os::msgctl(msgKey, IPC_RMID, NULL);
214 srvStop = 1;
217 int main(int argc, char **argv)
219 int c = 0, opt = 0;
220 while ((c = getopt(argc, argv, "?")) != EOF) {
221 switch (c) {
222 case '?' : { opt = 10; break; } //print help
223 default: opt=10;
225 }//while options
227 if (opt == 10) { printUsage(); return 0; }
229 os::signal(SIGINT, sigTermHandler);
230 os::signal(SIGTERM, sigTermHandler);
232 Conf::config.readAllValues(os::getenv("CSQL_CONFIG_FILE"));
233 msgKey = os::msgget(Conf::config.getMsgKey(), 0666);
234 if (msgKey == -1) {
235 printf("Message Queue creation failed\n");
236 return 4;
239 //Only single cache async updation is supported hence hard coded.
240 int asyncSites = 1;
242 // Create and Initialize repl server queue
243 que = new queue(asyncSites);
245 pthread_t *thrId =new pthread_t [asyncSites];
246 int thrInfoSize = sizeof(ThreadInputData *) * asyncSites;
247 thrInput = (ThreadInputData **) malloc(thrInfoSize);
249 int i=0;
250 if(Conf::config.useCache() && Conf::config.getCacheMode()==ASYNC_MODE) {
251 thrInput[i] = new ThreadInputData();
252 thrInput[i]->thrIndex = i;
253 pthread_create(&thrId[i], NULL, &startThread, thrInput[i]);
254 i++;
256 pthread_create(&freeThrId, NULL, freeMsgFromQueue, (void *) asyncSites);
257 struct timeval timeout;
258 int msgSize = Conf::config.getAsyncMsgMax();
259 char str[8192];
261 while (!srvStop) {
262 timeout.tv_sec = 5;
263 timeout.tv_usec = 0;
264 os::select(0, 0, 0, 0, &timeout);
265 printDebug(DM_CacheServer, "waiting for message");
266 while(true) {
267 // pick messages from message que with key msgKey
268 long size = os::msgrcv(msgKey, str, msgSize, 0, 0666|IPC_NOWAIT);
269 printDebug(DM_CacheServer, "Received msg size = %d", size);
270 if (size == -1 || srvStop) break;
271 // push the received msg to the repl server queue
272 que->push(str, size);
275 delete[] thrId;
276 printf("Replication Server Exiting\n");
277 return 0;
280 void *startThread(void *thrInfo)
282 DbRetVal rv = OK;
283 DbRetVal proMsgRetVal = OK;
284 void *msg=NULL;
285 ThreadInputData *thrInput = (ThreadInputData *)thrInfo;
286 List prepareFailList;
287 SqlApiImplType flag = CSqlAdapter;
288 int thrInd = thrInput->thrIndex;
289 printDebug(DM_CacheServer, "SqlAdapter Thread created");
290 AbsSqlConnection *conn = SqlFactory::createConnection(flag);
292 void *stmtBuckets = malloc (STMT_BUCKET_SIZE * sizeof(StmtBucket));
293 memset(stmtBuckets, 0, STMT_BUCKET_SIZE * sizeof(StmtBucket));
294 printDebug(DM_CacheServer, "stmtbuckets: %x", stmtBuckets);
296 struct timeval timeout, tval;
297 while (1) {
298 while (1) {
299 rv = conn->connect(I_USER, I_PASS);
300 if (rv == OK) break;
301 printError(rv, "Unable to connect to peer site");
302 timeout.tv_sec = 10;
303 timeout.tv_usec = 0;
304 os::select(0, 0, 0, 0, &timeout);
306 while (1) {
307 while (1) {
308 if (proMsgRetVal != ErrNoConnection) {
309 msg = NULL;
310 msg = que->readNextMessage(thrInd);
312 if (msg != NULL) break;
313 tval.tv_sec = 5;
314 tval.tv_usec = 1000;
315 os::select(0, 0, 0, 0, &tval);
317 long long index = *(long long *) msg;
318 printDebug(DM_CacheServer, "Received message with index: %lld",
319 index);
320 int length = *(int *)((char *)msg+sizeof(long long));
321 char *msgptr = (char *)msg + sizeof(long long) + sizeof(int);
322 printDebug(DM_CacheServer, "entering process message");
323 proMsgRetVal = processMessage(msgptr, length, conn, stmtBuckets,
324 flag, &prepareFailList);
325 if (proMsgRetVal == ErrNoConnection) break;
326 printDebug(DM_CacheServer, "Processed message with index: %lld",
327 index);
328 //store processed index in the processed index array
329 que->updateProcessedIndex(thrInd, index);
330 printDebug(DM_CacheServer, "Updated processed index %lld", index);
333 return NULL;
336 DbRetVal processMessage(void *str, int len, void *conn, void *stmtBuckets,
337 SqlApiImplType flag, List *prepareFailList)
339 long type = *(long *) str;
340 printDebug(DM_CacheServer, "type = %d\n", type);
341 char *data = (char *) str + sizeof(long);
342 if (type == 1) return handlePrepare(data, conn, stmtBuckets, flag,
343 prepareFailList);
344 else if (type == 2) return handleCommit(data, len, conn, stmtBuckets,
345 prepareFailList);
346 else if (type == 3) return handleFree(data, stmtBuckets, prepareFailList);
349 void *freeMsgFromQueue(void *nAsync)
351 int asySites = (int)nAsync;
352 struct timeval tval;
353 printDebug(DM_CacheServer, "Waiting for free the q elements");
354 while (1) {
355 que->freeMessagesFromQueue(asySites);
356 tval.tv_sec = 5;
357 tval.tv_usec = 0;
358 os::select(0, 0, 0, 0, &tval);
360 return NULL;
363 DbRetVal handlePrepare(void *data, void *conn, void *stmtBuckets,
364 SqlApiImplType flag, List *prepareFailList)
366 DbRetVal rv = OK;
367 AbsSqlConnection *con = (AbsSqlConnection *)conn;
368 AbsSqlStatement *stmt = SqlFactory::createStatement(flag);
369 stmt->setConnection(con);
370 char *ptr = (char *) data;
371 int length = *(int *) ptr; ptr += sizeof(int);
372 int txnId = *(int *) ptr; ptr += sizeof(int);
373 int stmtId = *(int *) ptr; ptr += sizeof(int);
374 char *tblName = ptr; ptr += IDENTIFIER_LENGTH;
375 char *stmtstr = (char *)data + 3 * sizeof(int) + IDENTIFIER_LENGTH;
376 int i = 1;
378 unsigned int mode = TableConf::config.getTableMode(tblName);
379 bool isCached = TableConf::config.isTableCached(mode);
381 if ((flag == CSqlAdapter) && !isCached) {
382 FailStmt *fst = new FailStmt();
383 fst->stmtId = stmtId;
384 fst->eType = ErrNotCached;
385 prepareFailList->append(fst);
386 return OK;
389 printDebug(DM_CacheServer, "stmt str: %s", stmtstr);
390 rv = stmt->prepare(stmtstr);
391 if (rv != OK) {
392 FailStmt *fst = new FailStmt();
393 fst->stmtId = stmtId;
394 fst->eType = rv;
395 prepareFailList->append(fst);
396 return rv;
398 SqlStatement::addToHashTable(stmtId, stmt, stmtBuckets, stmtstr);
399 printDebug(DM_CacheServer, "returning from prepare");
400 return rv;
403 DbRetVal handleCommit(void *data, int len, void *conn, void *stmtBuckets,
404 List *prepareFailList)
406 DbRetVal rv = OK;
407 AbsSqlConnection *con = (AbsSqlConnection *)conn;
408 // get dsn if adapter to write into conflict resolution file
409 char *dsstring = NULL;
410 SqlOdbcConnection *adCon = (SqlOdbcConnection *) con;
411 dsstring = adCon->dsString;
412 char *ptr = (char *) data;
413 int datalen = *(int *) ptr; ptr += sizeof(int);
414 int txnId = *(int *) ptr; ptr += sizeof(int);
415 FailStmt *elem = NULL;
416 rv = con->beginTrans();
417 if (rv != OK) {
418 printError(rv, "Begin trans failed");
419 return rv;
421 while ((ptr - (char *)data) < len) {
422 int stmtId = *(int *)ptr;
423 ptr += sizeof(int);
424 AbsSqlStatement *stmt = SqlStatement::getStmtFromHashTable(stmtId,
425 stmtBuckets);
426 printDebug(DM_CacheServer, "commit: stmtId: %d", stmtId);
427 printDebug(DM_CacheServer, "commit: stmtbuckets: %x", stmtBuckets);
428 printDebug(DM_CacheServer, "commit: stmt: %x", stmt);
429 ExecType type = (ExecType) (*(int *) ptr);
430 ptr += sizeof(int);
431 if (type == SETPARAM) {
432 int parampos = *(int *) ptr;
433 ptr += sizeof(int);
434 int isNull = *(int *) ptr;
435 if (isNull == 0) {
436 DataType dataType = (DataType) ( *(int *) ptr);
437 ptr += sizeof(int);
438 int length = *(int *) ptr;
439 ptr += sizeof(int);
440 void *value = ptr;
441 ptr += length;
442 if (stmt != NULL)
443 SqlStatement::setParamValues(stmt, parampos,
444 dataType, length, (char *)value);
445 } else { if (stmt != NULL) stmt->setNull(parampos); }
446 } else {
447 // start executing and committing for all active connections
448 int rows;
449 if (stmt != NULL) rv = stmt->execute(rows);
450 if (rv != OK) {
451 printError(rv, "Execute failed with return value %d", rv);
452 if (rv == ErrNoConnection) return rv;
453 else {
454 // write to conflict resolution file
455 writeToConfResFile(data, len, stmtBuckets, dsstring);
456 con->rollback();
457 return OK;
460 if (stmt == NULL) {
461 ListIterator it = prepareFailList->getIterator();
462 bool found = false;
463 while (it.hasElement()) {
464 elem = (FailStmt *) it.nextElement();
465 if (elem->stmtId == stmtId) { found = true; break; }
467 if (! found) continue; // for local table
468 if ((elem->eType == ErrNotCached) ||
469 elem->eType == ErrNotExists)
470 continue;
471 else {
472 // write to conflict resolution file
473 writeToConfResFile(data, len, stmtBuckets, dsstring);
474 con->rollback();
475 return OK;
480 rv = con->commit();
481 if (rv != OK) { printDebug(DM_CacheServer, "commit failed"); }
482 else { printDebug(DM_CacheServer, "commit passed"); }
483 return OK;
486 DbRetVal handleFree(void *data, void *stmtBuckets, List *prepareFailList)
488 DbRetVal rv = OK;
489 char *ptr = (char *) data;
490 int len = *(int *) ptr; ptr += sizeof(int);
491 int txnId = *(int *) ptr; ptr += sizeof(int);
492 int stmtId = *(int *)ptr;
493 AbsSqlStatement *stmt = SqlStatement::getStmtFromHashTable(stmtId,
494 stmtBuckets);
495 FailStmt *elem = NULL;
496 if (stmt == NULL) {
497 ListIterator failListIter = prepareFailList->getIterator();
498 while (failListIter.hasElement()) {
499 elem = (FailStmt *) failListIter.nextElement();
500 if (elem->stmtId == stmtId) break;
502 failListIter.reset();
503 prepareFailList->remove(elem);
504 return OK;
506 rv = stmt->free();
507 if (rv != OK) {
508 printError(rv, "HandleFree failed with return vlaue %d", rv);
509 return rv;
511 SqlStatement::removeFromHashTable(stmtId, stmtBuckets);
512 printDebug(DM_CacheServer, "Freed the statement from hashTable");
513 return OK;
// Appends the statements of one transaction to the conflict resolution file
// so that they can be replayed by hand.
//
// The output starts with "SET AUTOCOMMIT OFF;", followed by one line per
// executed statement, and ends with "COMMIT;". For parameterised statements
// the '?' markers of the statement text are replaced by the values carried
// in the SETPARAM records before the line is written.
//   data        - same buffer handleCommit() walks: int datalen, int txnId,
//                 then records starting with int stmtId and int ExecType
//   len         - number of bytes of data to walk through
//   stmtBuckets - statement hash table; supplies the statement text
//   dsn         - not used in the visible code
// Returns ErrOS when the file cannot be opened or a write is short.
// NOTE(review): every "return ErrOS" after the open() leaves without calling
// close(fd), and no return statement is visible after the final close(fd) -
// confirm against the complete function.
516 DbRetVal writeToConfResFile(void *data, int len, void *stmtBuckets, char *dsn)
518 DbRetVal rv = OK;
519 bool isPrmStmt=false;
520 char confResFile[1024];
521 sprintf(confResFile, "%s", Conf::config.getConflResoFile());
522 int fd = open(confResFile, O_WRONLY|O_CREAT| O_APPEND, 0644);
523 if (fd < 0) {
524 printError(ErrOS, "Could not create conflict Resolution file");
525 return ErrOS;
// buffer holds the line being written; paramStmtString holds the statement
// text while its parameter markers are being filled in.
// NOTE(review): both are fixed 1024 byte arrays filled with sprintf/strcpy
// without a length check.
527 char buffer[1024];
528 char paramStmtString[1024];
530 char *ptr = (char *) data;
// Step over datalen and txnId; neither value is used afterwards.
531 int datalen = *(int *) ptr; ptr += sizeof(int);
532 int txnId = *(int *) ptr; ptr += sizeof(int);
533 strcpy(buffer, "SET AUTOCOMMIT OFF;\n");
534 int ret = os::write(fd, buffer, strlen(buffer));
535 if (ret != strlen(buffer)) {
536 printError(ErrOS, "Write error into conf resolution file");
537 return ErrOS;
// first: true until the statement text has been copied for the current
// parameterised statement. pos: count of '?' markers seen so far for it.
539 bool first = true;
540 int counter = 0; // if at all the statement is parameterized
541 int nop = 0;
542 int pos = 0;
543 while ((ptr - (char *)data) < len) {
544 int stmtId = *(int *)ptr;
545 ptr += sizeof(int);
// Look the statement up in its bucket of the hash table.
546 int bucketNo = stmtId % STMT_BUCKET_SIZE;
547 StmtBucket *buck = (StmtBucket *) stmtBuckets;
548 StmtBucket *stmtBucket = &buck[bucketNo];
549 StmtNode *node = NULL;
550 ListIterator it = stmtBucket->bucketList.getIterator();
551 while(it.hasElement()) {
552 node = (StmtNode *) it.nextElement();
553 if(stmtId == node->stmtId) break;
// NOTE(review): when no node matches, node is NULL (empty bucket) or the
// last node visited, and it is dereferenced below without a check.
555 printf("DEBUG:node = %x\n", node);
556 ExecType type = (ExecType) (*(int *) ptr);
557 ptr += sizeof(int);
558 if (type == SETPARAM) {
559 isPrmStmt = true;
560 if (first) {
561 first = false;
562 sprintf(paramStmtString, "%s", node->stmtstr);
563 char *it = node->stmtstr;
// SETPARAM record as parsed here: int parampos, int dataType, int length,
// then length bytes of value.
565 int parampos = *(int *)ptr;
566 ptr += sizeof(int);
567 DataType dataType = (DataType) ( *(int *) ptr);
568 ptr += sizeof(int);
569 int length = *(int *) ptr;
570 ptr += sizeof(int);
571 void *value = ptr;
572 ptr += length;
573 char * it = paramStmtString;
574 int prntdChars = 0;
// Scan for the '?' whose running count equals parampos and replace it with
// the value; the text that followed the marker is saved in buffer and
// appended again after the value.
576 while (*it != '\0') {
577 if (*it == '?') {
578 pos++;
579 if(pos != parampos) { it++; continue; }
580 else {
581 *it++ = ' ';
582 strcpy(buffer,it);
583 switch (dataType) {
// String, binary, date and time values are wrapped in single quotes.
584 case typeString: case typeBinary: case typeDate:
585 case typeTime: case typeTimeStamp:
586 *it++ = '\'';
587 AllDataType::convertToString(it, value, dataType, length);
// The return value of printVal is used as the number of characters to skip.
588 prntdChars = AllDataType::printVal(value, dataType,length);
589 it += prntdChars;
590 *it++ = '\'';
591 break;
592 default:
593 AllDataType::convertToString(it, value, dataType, length);
594 prntdChars = AllDataType::printVal(value, dataType,length);
595 it += prntdChars;
598 sprintf(it, " %s", buffer);
599 //strcpy(buffer, paramStmtString);
600 break;
602 } else { it++; }
604 } else {
// Any other record type: write out the statement line.
605 if (!isPrmStmt) {
// Statement without parameters: write its text followed by a newline.
606 sprintf(buffer, "%s", node->stmtstr);
// The terminating '\0' is overwritten by the newline; the write below passes
// an explicit length.
607 buffer[strlen(buffer)] = '\n';
608 ret = os::write(fd, buffer, strlen(node->stmtstr)+1);
609 if(ret != strlen(node->stmtstr)+1) {
610 printError(ErrOS, "Write error into conf resolution file");
611 return ErrOS;
613 } else {
// Parameterised statement: write the filled-in text and reset the
// per-statement state for the next one.
614 strcpy(buffer, paramStmtString);
615 isPrmStmt = false;
616 first = true;
617 pos = 0;
618 int strlength = strlen(buffer);
619 buffer[strlen(buffer)] = '\n';
620 ret = os::write(fd, buffer, strlength+1);
621 if(ret != strlength+1) {
622 printError(ErrOS, "Write error into conf resolution file");
623 return ErrOS;
628 strcpy(buffer, "COMMIT;\n\n");
629 ret = os::write(fd, buffer, strlen(buffer));
630 if(ret != strlen(buffer)) {
631 printError(ErrOS, "Write error into conf resolution file");
632 return ErrOS;
634 close(fd);