code reorg
[csql.git] / src / tools / csqlasyncserver.cxx
blob56b6d30e3aa97904e19e28c86d76b19973eacaa5
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include <os.h>
17 #include <TableConfig.h>
18 #include <SessionImpl.h>
19 #include <SqlFactory.h>
20 #include <AbsSqlConnection.h>
21 #include <AbsSqlStatement.h>
22 #include <SqlNwConnection.h>
23 #include <SqlOdbcConnection.h>
24 #include <SqlNwStatement.h>
25 #include <SqlConnection.h>
26 #include <SqlStatement.h>
27 #include <SqlFactory.h>
28 #include <CSql.h>
29 #include <Network.h>
30 #include <SqlLogStatement.h> //for BindSqlField
31 #include <SqlNetworkHandler.h>
32 #include <Recover.h>
34 typedef struct FailedStmtObject {
35 int stmtId;
36 DbRetVal eType;
37 } FailStmt;
39 typedef struct item ITEM;
41 struct item
43 ITEM *next;
44 void *data;
47 // please dont touch the following code for queIterator
48 typedef class queueIterator
50 ITEM *head;
51 ITEM *iter;
52 ITEM *processed;
53 public:
54 queueIterator(ITEM *hd) { head = iter = hd; processed = NULL; }
55 inline void *next(ITEM *hd)
57 if (head == NULL) { head = iter = hd; }
58 if (head != hd) head = hd;
59 ITEM *elem = iter;
60 if (iter == NULL && processed) {
61 if (processed->next != NULL) {
62 processed = processed->next;
63 iter = processed;
64 elem = iter;
65 } else { return NULL; }
67 processed = elem;
68 printDebug(DM_CacheServer, "Processed ITEM: %X", processed);
69 iter = iter->next;
70 return &elem->data;
72 } QITER;
74 class queue
76 ITEM *head;
77 int nItems;
78 // array of msg indexes processed, First index for first thread and so on.
79 // As and when the msg is read from the queue by each of the threads
80 // respcective slot is filled with that index
81 long long *processedMsgIndexArray;
82 long long minProcessedMsgIndex;
83 long long qIndex;
84 long long lastFreedIndex;
85 QITER **qIter;
86 Mutex qMutex;
87 public:
88 queue(int asySites)
90 nItems = 0; head = NULL; processedMsgIndexArray = NULL;
91 qIndex = 0; qIter = NULL; minProcessedMsgIndex = 0;
92 lastFreedIndex = 0;
93 qMutex.init("Q");
94 int size = sizeof (long long) * asySites;
95 processedMsgIndexArray = (long long *) malloc(size);
96 memset(processedMsgIndexArray, 0, size);
97 qIter = (QITER **) malloc(sizeof (QITER *) * asySites);
98 for (int i = 0; i < asySites; i++) qIter[i] = new QITER(head);
100 ~queue() {}
101 int push(void *log, int len)
103 // log includes size of long (msgType) + size of (Msg data);
104 // 2nd parameter len is the size of (Msg data) excluding size of long
106 // long long for Msg Index
107 // int for size of the msg data
108 // long for msgType
109 // len bytes for msg data
110 int logSize = sizeof(long long) + sizeof(int) + sizeof(long) + len;
111 ITEM *item = (ITEM *) malloc(sizeof(ITEM) - sizeof(void *)
112 + os::align(logSize));
113 item->next = NULL;
114 char *ptr = (char *) &item->data;
115 *(long long *) ptr = ++qIndex; ptr += sizeof (long long);
116 *(int*) ptr= len; ptr += sizeof (int);
117 int sizeOfMsg = len + sizeof(long);
118 memcpy(ptr, log, sizeOfMsg);
119 if (head == NULL) { nItems++; head = item; return 0; }
120 ITEM *p = head;
121 while (p->next != NULL) p = p->next;
122 p->next = item;
123 nItems++;
124 return 0;
126 int size() { return nItems; }
127 void *readNextMessage(int thrIndex)
129 if (head == NULL) return NULL;
130 else return qIter[thrIndex]->next(head);
132 inline void updateProcessedIndex(int thrInd, int processedIndex)
134 processedMsgIndexArray[thrInd] = processedIndex;
136 inline long long findMinIndexForFree(int asySites)
138 long long minIndex = processedMsgIndexArray[0];
139 for (int i=1; i < asySites; i++) {
140 if (minIndex > processedMsgIndexArray[i]) {
141 minIndex = processedMsgIndexArray[i];
144 return minIndex-1;
146 void freeMessagesFromQueue(int asySites)
148 long long minIndex = findMinIndexForFree(asySites);
149 if (minIndex <= lastFreedIndex) return;
150 ITEM *elem = head;
151 ITEM *freeFrom = head;
152 ITEM *freeUptoThis = NULL;
153 long long ind = 0;
154 while (elem != NULL) {
155 ind = * (long long *) &elem->data;
156 if (ind == minIndex) {
157 freeUptoThis = elem;
158 head = elem->next;
159 break;
161 elem = elem->next;
163 ITEM *toFree = elem = freeFrom;
164 while (elem != freeUptoThis) {
165 toFree = elem;
166 elem = elem->next;
167 if (toFree) { ::free(toFree); nItems--; }
168 printDebug(DM_CacheServer, "FREED %X", toFree);
170 if (elem) { ::free(elem); nItems--; }
171 printDebug(DM_CacheServer, "FREED %X", elem);
172 lastFreedIndex = minIndex;
176 typedef class queue QUE;
178 class ThreadInputData
180 public:
181 int thrIndex;
182 ThreadInputData() { thrIndex = 0; }
185 void *startThread(void *p);
187 void printUsage()
189 printf("Usage: csqlasyncserver \n");
190 printf("Description: Start the csql Async server.\n");
191 return;
194 DbRetVal processMessage(void *str, int len, void *conn, void *hashBucketPtr,
195 SqlApiImplType flag, List *prepareFailList);
196 void *freeMsgFromQueue(void *p);
197 DbRetVal handlePrepare(void *str, void *conn, void *stmtBuckets,
198 SqlApiImplType flag, List *prepareFailList);
199 DbRetVal handleCommit(void *str, int len, void *conn, void *stmtBuckets,
200 List *prepareFailList);
201 DbRetVal handleFree(void *str, void *stmtBuckets, List *prepareFailList);
202 DbRetVal writeToConfResFile(void *data, int len, void *stmtBuckets, char *dsn);
204 //Globals
205 QUE *que = NULL;
206 int srvStop =0;
207 int msgKey = 0;
208 ThreadInputData **thrInput;
209 pthread_t freeThrId = 0;
211 static void sigTermHandler(int sig)
213 printf("Received signal %d\nStopping the server\n", sig);
214 os::msgctl(msgKey, IPC_RMID, NULL);
215 srvStop = 1;
218 int main(int argc, char **argv)
220 int c = 0, opt = 0;
221 while ((c = getopt(argc, argv, "?")) != EOF) {
222 switch (c) {
223 case '?' : { opt = 10; break; } //print help
224 default: opt=10;
226 }//while options
228 if (opt == 10) { printUsage(); return 0; }
230 os::signal(SIGINT, sigTermHandler);
231 os::signal(SIGTERM, sigTermHandler);
233 Conf::config.readAllValues(os::getenv("CSQL_CONFIG_FILE"));
234 msgKey = os::msgget(Conf::config.getMsgKey(), 0666);
235 if (msgKey == -1) {
236 printf("Message Queue creation failed\n");
237 return 4;
240 //Only single cache async updation is supported hence hard coded.
241 int asyncSites = 1;
243 // Create and Initialize repl server queue
244 que = new queue(asyncSites);
246 pthread_t *thrId =new pthread_t [asyncSites];
247 int thrInfoSize = sizeof(ThreadInputData *) * asyncSites;
248 thrInput = (ThreadInputData **) malloc(thrInfoSize);
250 int i=0;
251 if(Conf::config.useCache() && Conf::config.getCacheMode()==ASYNC_MODE) {
252 thrInput[i] = new ThreadInputData();
253 thrInput[i]->thrIndex = i;
254 pthread_create(&thrId[i], NULL, &startThread, thrInput[i]);
255 i++;
257 pthread_create(&freeThrId, NULL, freeMsgFromQueue, (void *) asyncSites);
258 struct timeval timeout;
259 int msgSize = Conf::config.getAsyncMsgMax();
260 char str[8192];
262 while (!srvStop) {
263 timeout.tv_sec = 5;
264 timeout.tv_usec = 0;
265 os::select(0, 0, 0, 0, &timeout);
266 printDebug(DM_CacheServer, "waiting for message");
267 while(true) {
268 // pick messages from message que with key msgKey
269 long size = os::msgrcv(msgKey, str, msgSize, 0, 0666|IPC_NOWAIT);
270 printDebug(DM_CacheServer, "Received msg size = %d", size);
271 if (size == -1 || srvStop) break;
272 // push the received msg to the repl server queue
273 que->push(str, size);
276 delete[] thrId;
277 printf("Replication Server Exiting\n");
278 return 0;
281 void *startThread(void *thrInfo)
283 DbRetVal rv = OK;
284 DbRetVal proMsgRetVal = OK;
285 void *msg=NULL;
286 ThreadInputData *thrInput = (ThreadInputData *)thrInfo;
287 List prepareFailList;
288 SqlApiImplType flag = CSqlAdapter;
289 int thrInd = thrInput->thrIndex;
290 printDebug(DM_CacheServer, "SqlAdapter Thread created");
291 AbsSqlConnection *conn = SqlFactory::createConnection(flag);
293 void *stmtBuckets = malloc (STMT_BUCKET_SIZE * sizeof(StmtBucket));
294 memset(stmtBuckets, 0, STMT_BUCKET_SIZE * sizeof(StmtBucket));
295 printDebug(DM_CacheServer, "stmtbuckets: %x", stmtBuckets);
297 struct timeval timeout, tval;
298 while (1) {
299 while (1) {
300 rv = conn->connect(I_USER, I_PASS);
301 if (rv == OK) break;
302 printError(rv, "Unable to connect to peer site");
303 timeout.tv_sec = 10;
304 timeout.tv_usec = 0;
305 os::select(0, 0, 0, 0, &timeout);
307 while (1) {
308 while (1) {
309 if (proMsgRetVal != ErrNoConnection) {
310 msg = NULL;
311 msg = que->readNextMessage(thrInd);
313 if (msg != NULL) break;
314 tval.tv_sec = 5;
315 tval.tv_usec = 1000;
316 os::select(0, 0, 0, 0, &tval);
318 long long index = *(long long *) msg;
319 printDebug(DM_CacheServer, "Received message with index: %lld",
320 index);
321 int length = *(int *)((char *)msg+sizeof(long long));
322 char *msgptr = (char *)msg + sizeof(long long) + sizeof(int);
323 printDebug(DM_CacheServer, "entering process message");
324 proMsgRetVal = processMessage(msgptr, length, conn, stmtBuckets,
325 flag, &prepareFailList);
326 if (proMsgRetVal == ErrNoConnection) break;
327 printDebug(DM_CacheServer, "Processed message with index: %lld",
328 index);
329 //store processed index in the processed index array
330 que->updateProcessedIndex(thrInd, index);
331 printDebug(DM_CacheServer, "Updated processed index %lld", index);
334 return NULL;
337 DbRetVal processMessage(void *str, int len, void *conn, void *stmtBuckets,
338 SqlApiImplType flag, List *prepareFailList)
340 long type = *(long *) str;
341 printDebug(DM_CacheServer, "type = %d\n", type);
342 char *data = (char *) str + sizeof(long);
343 if (type == 1) return handlePrepare(data, conn, stmtBuckets, flag,
344 prepareFailList);
345 else if (type == 2) return handleCommit(data, len, conn, stmtBuckets,
346 prepareFailList);
347 else if (type == 3) return handleFree(data, stmtBuckets, prepareFailList);
350 void *freeMsgFromQueue(void *nAsync)
352 int asySites = (int)(long)nAsync;
353 struct timeval tval;
354 printDebug(DM_CacheServer, "Waiting for free the q elements");
355 while (1) {
356 que->freeMessagesFromQueue(asySites);
357 tval.tv_sec = 5;
358 tval.tv_usec = 0;
359 os::select(0, 0, 0, 0, &tval);
361 return NULL;
364 DbRetVal handlePrepare(void *data, void *conn, void *stmtBuckets,
365 SqlApiImplType flag, List *prepareFailList)
367 DbRetVal rv = OK;
368 AbsSqlConnection *con = (AbsSqlConnection *)conn;
369 AbsSqlStatement *stmt = SqlFactory::createStatement(flag);
370 stmt->setConnection(con);
371 char *ptr = (char *) data;
372 int length = *(int *) ptr; ptr += sizeof(int);
373 int txnId = *(int *) ptr; ptr += sizeof(int);
374 int stmtId = *(int *) ptr; ptr += sizeof(int);
375 char *tblName = ptr; ptr += IDENTIFIER_LENGTH;
376 char *stmtstr = (char *)data + 3 * sizeof(int) + IDENTIFIER_LENGTH;
377 int i = 1;
379 unsigned int mode = TableConf::config.getTableMode(tblName);
380 bool isCached = TableConf::config.isTableCached(mode);
382 if ((flag == CSqlAdapter) && !isCached) {
383 FailStmt *fst = new FailStmt();
384 fst->stmtId = stmtId;
385 fst->eType = ErrNotCached;
386 prepareFailList->append(fst);
387 return OK;
390 printDebug(DM_CacheServer, "stmt str: %s", stmtstr);
391 rv = stmt->prepare(stmtstr);
392 if (rv != OK) {
393 FailStmt *fst = new FailStmt();
394 fst->stmtId = stmtId;
395 fst->eType = rv;
396 prepareFailList->append(fst);
397 return rv;
399 Recovery recovery;
400 recovery.setStmtBucket(stmtBuckets);
401 recovery.addToHashTable(stmtId, stmt, stmtstr);
402 printDebug(DM_CacheServer, "returning from prepare");
403 return rv;
406 DbRetVal handleCommit(void *data, int len, void *conn, void *stmtBuckets,
407 List *prepareFailList)
409 DbRetVal rv = OK;
410 AbsSqlConnection *con = (AbsSqlConnection *)conn;
411 // get dsn if adapter to write into conflict resolution file
412 char *dsstring = NULL;
413 SqlOdbcConnection *adCon = (SqlOdbcConnection *) con;
414 dsstring = adCon->dsString;
415 char *ptr = (char *) data;
416 int datalen = *(int *) ptr; ptr += sizeof(int);
417 int txnId = *(int *) ptr; ptr += sizeof(int);
418 FailStmt *elem = NULL;
419 rv = con->beginTrans();
420 if (rv != OK) {
421 printError(rv, "Begin trans failed");
422 return rv;
424 Recovery recovery;
425 recovery.setStmtBucket(stmtBuckets);
426 while ((ptr - (char *)data) < len) {
427 int stmtId = *(int *)ptr;
428 ptr += sizeof(int);
429 AbsSqlStatement *stmt = recovery.getStmtFromHashTable(stmtId);
430 printDebug(DM_CacheServer, "commit: stmtId: %d", stmtId);
431 printDebug(DM_CacheServer, "commit: stmtbuckets: %x", stmtBuckets);
432 printDebug(DM_CacheServer, "commit: stmt: %x", stmt);
433 ExecType type = (ExecType) (*(int *) ptr);
434 ptr += sizeof(int);
435 if (type == SETPARAM) {
436 int parampos = *(int *) ptr;
437 ptr += sizeof(int);
438 int isNull = *(int *) ptr;
439 if (isNull == 0) {
440 DataType dataType = (DataType) ( *(int *) ptr);
441 ptr += sizeof(int);
442 int length = *(int *) ptr;
443 ptr += sizeof(int);
444 void *value = ptr;
445 ptr += length;
446 if (stmt != NULL)
447 SqlStatement::setParamValues(stmt, parampos,
448 dataType, length, (char *)value);
449 } else { if (stmt != NULL) stmt->setNull(parampos); }
450 } else {
451 // start executing and committing for all active connections
452 int rows;
453 if (stmt != NULL) rv = stmt->execute(rows);
454 if (rv != OK) {
455 printError(rv, "Execute failed with return value %d", rv);
456 if (rv == ErrNoConnection) return rv;
457 else {
458 // write to conflict resolution file
459 writeToConfResFile(data, len, stmtBuckets, dsstring);
460 con->rollback();
461 return OK;
464 if (stmt == NULL) {
465 ListIterator it = prepareFailList->getIterator();
466 bool found = false;
467 while (it.hasElement()) {
468 elem = (FailStmt *) it.nextElement();
469 if (elem->stmtId == stmtId) { found = true; break; }
471 if (! found) continue; // for local table
472 if ((elem->eType == ErrNotCached) ||
473 elem->eType == ErrNotExists)
474 continue;
475 else {
476 // write to conflict resolution file
477 writeToConfResFile(data, len, stmtBuckets, dsstring);
478 con->rollback();
479 return OK;
484 rv = con->commit();
485 if (rv != OK) { printDebug(DM_CacheServer, "commit failed"); }
486 else { printDebug(DM_CacheServer, "commit passed"); }
487 return OK;
490 DbRetVal handleFree(void *data, void *stmtBuckets, List *prepareFailList)
492 DbRetVal rv = OK;
493 char *ptr = (char *) data;
494 int len = *(int *) ptr; ptr += sizeof(int);
495 int txnId = *(int *) ptr; ptr += sizeof(int);
496 int stmtId = *(int *)ptr;
497 Recovery recovery;
498 recovery.setStmtBucket(stmtBuckets);
499 AbsSqlStatement *stmt = recovery.getStmtFromHashTable(stmtId);
500 FailStmt *elem = NULL;
501 if (stmt == NULL) {
502 ListIterator failListIter = prepareFailList->getIterator();
503 while (failListIter.hasElement()) {
504 elem = (FailStmt *) failListIter.nextElement();
505 if (elem->stmtId == stmtId) break;
507 failListIter.reset();
508 prepareFailList->remove(elem);
509 return OK;
511 rv = stmt->free();
512 if (rv != OK) {
513 printError(rv, "HandleFree failed with return vlaue %d", rv);
514 return rv;
516 recovery.removeFromHashTable(stmtId);
517 printDebug(DM_CacheServer, "Freed the statement from hashTable");
518 return OK;
521 DbRetVal writeToConfResFile(void *data, int len, void *stmtBuckets, char *dsn)
523 DbRetVal rv = OK;
524 bool isPrmStmt=false;
525 char confResFile[1024];
526 sprintf(confResFile, "%s", Conf::config.getConflResoFile());
527 int fd = open(confResFile, O_WRONLY|O_CREAT| O_APPEND, 0644);
528 if (fd < 0) {
529 printError(ErrOS, "Could not create conflict Resolution file");
530 return ErrOS;
532 char buffer[1024];
533 char paramStmtString[1024];
535 char *ptr = (char *) data;
536 int datalen = *(int *) ptr; ptr += sizeof(int);
537 int txnId = *(int *) ptr; ptr += sizeof(int);
538 strcpy(buffer, "SET AUTOCOMMIT OFF;\n");
539 int ret = os::write(fd, buffer, strlen(buffer));
540 if (ret != strlen(buffer)) {
541 printError(ErrOS, "Write error into conf resolution file");
542 return ErrOS;
544 bool first = true;
545 int counter = 0; // if at all the statement is parameterized
546 int nop = 0;
547 int pos = 0;
548 while ((ptr - (char *)data) < len) {
549 int stmtId = *(int *)ptr;
550 ptr += sizeof(int);
551 int bucketNo = stmtId % STMT_BUCKET_SIZE;
552 StmtBucket *buck = (StmtBucket *) stmtBuckets;
553 StmtBucket *stmtBucket = &buck[bucketNo];
554 StmtNode *node = NULL;
555 ListIterator it = stmtBucket->bucketList.getIterator();
556 while(it.hasElement()) {
557 node = (StmtNode *) it.nextElement();
558 if(stmtId == node->stmtId) break;
560 printf("DEBUG:node = %x\n", node);
561 ExecType type = (ExecType) (*(int *) ptr);
562 ptr += sizeof(int);
563 if (type == SETPARAM) {
564 isPrmStmt = true;
565 if (first) {
566 first = false;
567 sprintf(paramStmtString, "%s", node->stmtstr);
568 char *it = node->stmtstr;
570 int parampos = *(int *)ptr;
571 ptr += sizeof(int);
572 DataType dataType = (DataType) ( *(int *) ptr);
573 ptr += sizeof(int);
574 int length = *(int *) ptr;
575 ptr += sizeof(int);
576 void *value = ptr;
577 ptr += length;
578 char * it = paramStmtString;
579 int prntdChars = 0;
581 while (*it != '\0') {
582 if (*it == '?') {
583 pos++;
584 if(pos != parampos) { it++; continue; }
585 else {
586 *it++ = ' ';
587 strcpy(buffer,it);
588 switch (dataType) {
589 case typeString: case typeBinary: case typeDate:
590 case typeTime: case typeTimeStamp:
591 *it++ = '\'';
592 AllDataType::convertToString(it, value, dataType, length);
593 prntdChars = AllDataType::printVal(value, dataType,length);
594 it += prntdChars;
595 *it++ = '\'';
596 break;
597 default:
598 AllDataType::convertToString(it, value, dataType, length);
599 prntdChars = AllDataType::printVal(value, dataType,length);
600 it += prntdChars;
603 sprintf(it, " %s", buffer);
604 //strcpy(buffer, paramStmtString);
605 break;
607 } else { it++; }
609 } else {
610 if (!isPrmStmt) {
611 sprintf(buffer, "%s", node->stmtstr);
612 buffer[strlen(buffer)] = '\n';
613 ret = os::write(fd, buffer, strlen(node->stmtstr)+1);
614 if(ret != strlen(node->stmtstr)+1) {
615 printError(ErrOS, "Write error into conf resolution file");
616 return ErrOS;
618 } else {
619 strcpy(buffer, paramStmtString);
620 isPrmStmt = false;
621 first = true;
622 pos = 0;
623 int strlength = strlen(buffer);
624 buffer[strlen(buffer)] = '\n';
625 ret = os::write(fd, buffer, strlength+1);
626 if(ret != strlength+1) {
627 printError(ErrOS, "Write error into conf resolution file");
628 return ErrOS;
633 strcpy(buffer, "COMMIT;\n\n");
634 ret = os::write(fd, buffer, strlen(buffer));
635 if(ret != strlen(buffer)) {
636 printError(ErrOS, "Write error into conf resolution file");
637 return ErrOS;
639 close(fd);