1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 ***************************************************************************/
17 #include <TableConfig.h>
18 #include <SessionImpl.h>
19 #include <SqlFactory.h>
20 #include <AbsSqlConnection.h>
21 #include <AbsSqlStatement.h>
22 #include <SqlNwConnection.h>
23 #include <SqlOdbcConnection.h>
24 #include <SqlNwStatement.h>
25 #include <SqlConnection.h>
26 #include <SqlStatement.h>
27 #include <SqlFactory.h>
30 #include <SqlLogStatement.h> //for BindSqlField
31 #include <SqlNetworkHandler.h>
33 typedef struct FailedStmtObject
{
38 typedef struct item ITEM
;
// Please don't touch the following code for queueIterator
47 typedef class queueIterator
53 queueIterator(ITEM
*hd
) { head
= iter
= hd
; processed
= NULL
; }
54 inline void *next(ITEM
*hd
)
56 if (head
== NULL
) { head
= iter
= hd
; }
57 if (head
!= hd
) head
= hd
;
59 if (iter
== NULL
&& processed
) {
60 if (processed
->next
!= NULL
) {
61 processed
= processed
->next
;
64 } else { return NULL
; }
67 printDebug(DM_CacheServer
, "Processed ITEM: %X", processed
);
// Array of message indexes processed: the first slot is for the first thread, and so on.
// As and when a message is read from the queue by each of the threads,
// the respective slot is filled with that message's index.
80 long long *processedMsgIndexArray
;
81 long long minProcessedMsgIndex
;
83 long long lastFreedIndex
;
89 nItems
= 0; head
= NULL
; processedMsgIndexArray
= NULL
;
90 qIndex
= 0; qIter
= NULL
; minProcessedMsgIndex
= 0;
93 int size
= sizeof (long long) * asySites
;
94 processedMsgIndexArray
= (long long *) malloc(size
);
95 memset(processedMsgIndexArray
, 0, size
);
96 qIter
= (QITER
**) malloc(sizeof (QITER
*) * asySites
);
97 for (int i
= 0; i
< asySites
; i
++) qIter
[i
] = new QITER(head
);
// Appends one received message to the in-memory queue.
//   log : start of the message; len + sizeof(long) bytes are copied from it.
//   len : size of the message data, excluding the leading long.
// Each queue item's data area is filled, in order, with the next queue index
// (++qIndex, stored as long long), len (stored as int), and the copied bytes.
// Returns 0 on the visible path where the queue was empty.
100 int push(void *log
, int len
)
102 // log includes size of long (msgType) + size of (Msg data);
103 // 2nd parameter len is the size of (Msg data) excluding size of long
105 // long long for Msg Index
106 // int for size of the msg data
108 // len bytes for msg data
109 int logSize
= sizeof(long long) + sizeof(int) + sizeof(long) + len
;
// The allocation is sizeof(ITEM) minus one pointer plus the aligned payload
// size; the payload is then written starting at &item->data.
// NOTE(review): the malloc result is dereferenced below without a NULL check
// in the lines visible here - confirm and handle allocation failure.
110 ITEM
*item
= (ITEM
*) malloc(sizeof(ITEM
) - sizeof(void *)
111 + os::align(logSize
));
113 char *ptr
= (char *) &item
->data
;
// Header: message index first, then the data length.
114 *(long long *) ptr
= ++qIndex
; ptr
+= sizeof (long long);
115 *(int*) ptr
= len
; ptr
+= sizeof (int);
// Payload: the long type tag plus len bytes of data, copied verbatim.
116 int sizeOfMsg
= len
+ sizeof(long);
117 memcpy(ptr
, log
, sizeOfMsg
);
// Empty queue: the new item becomes the head.
118 if (head
== NULL
) { nItems
++; head
= item
; return 0; }
// Otherwise walk the next pointers until the last item is reached.
120 while (p
->next
!= NULL
) p
= p
->next
;
125 int size() { return nItems
; }
126 void *readNextMessage(int thrIndex
)
128 if (head
== NULL
) return NULL
;
129 else return qIter
[thrIndex
]->next(head
);
131 inline void updateProcessedIndex(int thrInd
, int processedIndex
)
133 processedMsgIndexArray
[thrInd
] = processedIndex
;
// Scans the per-thread processed-index slots for the smallest value.
//   asySites : number of slots of processedMsgIndexArray to examine.
// Slot 0 is read unconditionally as the starting minimum, so the array must
// hold at least one entry.
135 inline long long findMinIndexForFree(int asySites
)
137 long long minIndex
= processedMsgIndexArray
[0];
// Compare every remaining slot against the running minimum.
138 for (int i
=1; i
< asySites
; i
++) {
139 if (minIndex
> processedMsgIndexArray
[i
]) {
140 minIndex
= processedMsgIndexArray
[i
];
145 void freeMessagesFromQueue(int asySites
)
147 long long minIndex
= findMinIndexForFree(asySites
);
148 if (minIndex
<= lastFreedIndex
) return;
150 ITEM
*freeFrom
= head
;
151 ITEM
*freeUptoThis
= NULL
;
153 while (elem
!= NULL
) {
154 ind
= * (long long *) &elem
->data
;
155 if (ind
== minIndex
) {
162 ITEM
*toFree
= elem
= freeFrom
;
163 while (elem
!= freeUptoThis
) {
166 if (toFree
) { ::free(toFree
); nItems
--; }
167 printDebug(DM_CacheServer
, "FREED %X", toFree
);
169 if (elem
) { ::free(elem
); nItems
--; }
170 printDebug(DM_CacheServer
, "FREED %X", elem
);
171 lastFreedIndex
= minIndex
;
175 typedef class queue QUE
;
177 class ThreadInputData
181 ThreadInputData() { thrIndex
= 0; }
184 void *startThread(void *p
);
188 printf("Usage: csqlasyncserver \n");
189 printf("Description: Start the csql Async server.\n");
193 DbRetVal
processMessage(void *str
, int len
, void *conn
, void *hashBucketPtr
,
194 SqlApiImplType flag
, List
*prepareFailList
);
195 void *freeMsgFromQueue(void *p
);
196 DbRetVal
handlePrepare(void *str
, void *conn
, void *stmtBuckets
,
197 SqlApiImplType flag
, List
*prepareFailList
);
198 DbRetVal
handleCommit(void *str
, int len
, void *conn
, void *stmtBuckets
,
199 List
*prepareFailList
);
200 DbRetVal
handleFree(void *str
, void *stmtBuckets
, List
*prepareFailList
);
201 DbRetVal
writeToConfResFile(void *data
, int len
, void *stmtBuckets
, char *dsn
);
207 ThreadInputData
**thrInput
;
208 pthread_t freeThrId
= 0;
// Signal handler registered for SIGINT and SIGTERM in main().
// Prints the received signal number, then removes the message queue
// identified by msgKey (IPC_RMID).
// NOTE(review): printf is not on the POSIX list of async-signal-safe
// functions; consider write(2) or deferring the message to the main loop.
210 static void sigTermHandler(int sig
)
212 printf("Received signal %d\nStopping the server\n", sig
);
213 os::msgctl(msgKey
, IPC_RMID
, NULL
);
217 int main(int argc
, char **argv
)
220 while ((c
= getopt(argc
, argv
, "?")) != EOF
) {
222 case '?' : { opt
= 10; break; } //print help
227 if (opt
== 10) { printUsage(); return 0; }
229 os::signal(SIGINT
, sigTermHandler
);
230 os::signal(SIGTERM
, sigTermHandler
);
232 Conf::config
.readAllValues(os::getenv("CSQL_CONFIG_FILE"));
233 msgKey
= os::msgget(Conf::config
.getMsgKey(), 0666);
235 printf("Message Queue creation failed\n");
// Only a single cache site is supported for async updates, hence hard-coded.
242 // Create and Initialize repl server queue
243 que
= new queue(asyncSites
);
245 pthread_t
*thrId
=new pthread_t
[asyncSites
];
246 int thrInfoSize
= sizeof(ThreadInputData
*) * asyncSites
;
247 thrInput
= (ThreadInputData
**) malloc(thrInfoSize
);
250 if(Conf::config
.useCache() && Conf::config
.getCacheMode()==ASYNC_MODE
) {
251 thrInput
[i
] = new ThreadInputData();
252 thrInput
[i
]->thrIndex
= i
;
253 pthread_create(&thrId
[i
], NULL
, &startThread
, thrInput
[i
]);
256 pthread_create(&freeThrId
, NULL
, freeMsgFromQueue
, (void *) asyncSites
);
257 struct timeval timeout
;
258 int msgSize
= Conf::config
.getAsyncMsgMax();
264 os::select(0, 0, 0, 0, &timeout
);
265 printDebug(DM_CacheServer
, "waiting for message");
267 // pick messages from message que with key msgKey
268 long size
= os::msgrcv(msgKey
, str
, msgSize
, 0, 0666|IPC_NOWAIT
);
269 printDebug(DM_CacheServer
, "Received msg size = %d", size
);
270 if (size
== -1 || srvStop
) break;
271 // push the received msg to the repl server queue
272 que
->push(str
, size
);
276 printf("Replication Server Exiting\n");
280 void *startThread(void *thrInfo
)
283 DbRetVal proMsgRetVal
= OK
;
285 ThreadInputData
*thrInput
= (ThreadInputData
*)thrInfo
;
286 List prepareFailList
;
287 SqlApiImplType flag
= CSqlAdapter
;
288 int thrInd
= thrInput
->thrIndex
;
289 printDebug(DM_CacheServer
, "SqlAdapter Thread created");
290 AbsSqlConnection
*conn
= SqlFactory::createConnection(flag
);
292 void *stmtBuckets
= malloc (STMT_BUCKET_SIZE
* sizeof(StmtBucket
));
293 memset(stmtBuckets
, 0, STMT_BUCKET_SIZE
* sizeof(StmtBucket
));
294 printDebug(DM_CacheServer
, "stmtbuckets: %x", stmtBuckets
);
296 struct timeval timeout
, tval
;
299 rv
= conn
->connect(I_USER
, I_PASS
);
301 printError(rv
, "Unable to connect to peer site");
304 os::select(0, 0, 0, 0, &timeout
);
308 if (proMsgRetVal
!= ErrNoConnection
) {
310 msg
= que
->readNextMessage(thrInd
);
312 if (msg
!= NULL
) break;
315 os::select(0, 0, 0, 0, &tval
);
317 long long index
= *(long long *) msg
;
318 printDebug(DM_CacheServer
, "Received message with index: %lld",
320 int length
= *(int *)((char *)msg
+sizeof(long long));
321 char *msgptr
= (char *)msg
+ sizeof(long long) + sizeof(int);
322 printDebug(DM_CacheServer
, "entering process message");
323 proMsgRetVal
= processMessage(msgptr
, length
, conn
, stmtBuckets
,
324 flag
, &prepareFailList
);
325 if (proMsgRetVal
== ErrNoConnection
) break;
326 printDebug(DM_CacheServer
, "Processed message with index: %lld",
328 //store processed index in the processed index array
329 que
->updateProcessedIndex(thrInd
, index
);
330 printDebug(DM_CacheServer
, "Updated processed index %lld", index
);
// Dispatches one queued message to the handler that matches its type tag.
//   str : start of the message; a long type tag followed by the payload.
//   len : forwarded to handleCommit only.
//   conn, stmtBuckets, flag, prepareFailList : passed through to the handlers.
// Type 1 goes to handlePrepare, type 2 to handleCommit, type 3 to handleFree;
// each receives the payload pointer (str advanced past the long tag).
336 DbRetVal
processMessage(void *str
, int len
, void *conn
, void *stmtBuckets
,
337 SqlApiImplType flag
, List
*prepareFailList
)
339 long type
= *(long *) str
;
// NOTE(review): "%d" is given a long here; "%ld" would match the argument.
340 printDebug(DM_CacheServer
, "type = %d\n", type
);
// Skip the type tag so the handlers see only the payload.
341 char *data
= (char *) str
+ sizeof(long);
342 if (type
== 1) return handlePrepare(data
, conn
, stmtBuckets
, flag
,
344 else if (type
== 2) return handleCommit(data
, len
, conn
, stmtBuckets
,
346 else if (type
== 3) return handleFree(data
, stmtBuckets
, prepareFailList
);
// Thread entry point started from main() via pthread_create, which passes
// the number of async sites cast to void*.
// Calls que->freeMessagesFromQueue(asySites) and sleeps via os::select using
// the timeout held in tval.
349 void *freeMsgFromQueue(void *nAsync
)
// NOTE(review): casting a void* straight to int loses bits where pointers are
// wider than int, and g++ rejects it on such targets; casting through an
// integer type of pointer width (e.g. long or intptr_t) first avoids this.
351 int asySites
= (int)nAsync
;
353 printDebug(DM_CacheServer
, "Waiting for free the q elements");
355 que
->freeMessagesFromQueue(asySites
);
358 os::select(0, 0, 0, 0, &tval
);
363 DbRetVal
handlePrepare(void *data
, void *conn
, void *stmtBuckets
,
364 SqlApiImplType flag
, List
*prepareFailList
)
367 AbsSqlConnection
*con
= (AbsSqlConnection
*)conn
;
368 AbsSqlStatement
*stmt
= SqlFactory::createStatement(flag
);
369 stmt
->setConnection(con
);
370 char *ptr
= (char *) data
;
371 int length
= *(int *) ptr
; ptr
+= sizeof(int);
372 int txnId
= *(int *) ptr
; ptr
+= sizeof(int);
373 int stmtId
= *(int *) ptr
; ptr
+= sizeof(int);
374 char *tblName
= ptr
; ptr
+= IDENTIFIER_LENGTH
;
375 char *stmtstr
= (char *)data
+ 3 * sizeof(int) + IDENTIFIER_LENGTH
;
378 unsigned int mode
= TableConf::config
.getTableMode(tblName
);
379 bool isCached
= TableConf::config
.isTableCached(mode
);
381 if ((flag
== CSqlAdapter
) && !isCached
) {
382 FailStmt
*fst
= new FailStmt();
383 fst
->stmtId
= stmtId
;
384 fst
->eType
= ErrNotCached
;
385 prepareFailList
->append(fst
);
389 printDebug(DM_CacheServer
, "stmt str: %s", stmtstr
);
390 rv
= stmt
->prepare(stmtstr
);
392 FailStmt
*fst
= new FailStmt();
393 fst
->stmtId
= stmtId
;
395 prepareFailList
->append(fst
);
398 SqlStatement::addToHashTable(stmtId
, stmt
, stmtBuckets
, stmtstr
);
399 printDebug(DM_CacheServer
, "returning from prepare");
403 DbRetVal
handleCommit(void *data
, int len
, void *conn
, void *stmtBuckets
,
404 List
*prepareFailList
)
407 AbsSqlConnection
*con
= (AbsSqlConnection
*)conn
;
408 // get dsn if adapter to write into conflict resolution file
409 char *dsstring
= NULL
;
410 SqlOdbcConnection
*adCon
= (SqlOdbcConnection
*) con
;
411 dsstring
= adCon
->dsString
;
412 char *ptr
= (char *) data
;
413 int datalen
= *(int *) ptr
; ptr
+= sizeof(int);
414 int txnId
= *(int *) ptr
; ptr
+= sizeof(int);
415 FailStmt
*elem
= NULL
;
416 rv
= con
->beginTrans();
418 printError(rv
, "Begin trans failed");
421 while ((ptr
- (char *)data
) < len
) {
422 int stmtId
= *(int *)ptr
;
424 AbsSqlStatement
*stmt
= SqlStatement::getStmtFromHashTable(stmtId
,
426 printDebug(DM_CacheServer
, "commit: stmtId: %d", stmtId
);
427 printDebug(DM_CacheServer
, "commit: stmtbuckets: %x", stmtBuckets
);
428 printDebug(DM_CacheServer
, "commit: stmt: %x", stmt
);
429 ExecType type
= (ExecType
) (*(int *) ptr
);
431 if (type
== SETPARAM
) {
432 int parampos
= *(int *) ptr
;
434 int isNull
= *(int *) ptr
;
436 DataType dataType
= (DataType
) ( *(int *) ptr
);
438 int length
= *(int *) ptr
;
443 SqlStatement::setParamValues(stmt
, parampos
,
444 dataType
, length
, (char *)value
);
445 } else { if (stmt
!= NULL
) stmt
->setNull(parampos
); }
447 // start executing and committing for all active connections
449 if (stmt
!= NULL
) rv
= stmt
->execute(rows
);
451 printError(rv
, "Execute failed with return value %d", rv
);
452 if (rv
== ErrNoConnection
) return rv
;
454 // write to conflict resolution file
455 writeToConfResFile(data
, len
, stmtBuckets
, dsstring
);
461 ListIterator it
= prepareFailList
->getIterator();
463 while (it
.hasElement()) {
464 elem
= (FailStmt
*) it
.nextElement();
465 if (elem
->stmtId
== stmtId
) { found
= true; break; }
467 if (! found
) continue; // for local table
468 if ((elem
->eType
== ErrNotCached
) ||
469 elem
->eType
== ErrNotExists
)
472 // write to conflict resolution file
473 writeToConfResFile(data
, len
, stmtBuckets
, dsstring
);
481 if (rv
!= OK
) { printDebug(DM_CacheServer
, "commit failed"); }
482 else { printDebug(DM_CacheServer
, "commit passed"); }
// Handles a "free statement" message.
//   data            : payload; three consecutive ints are read from it
//                     (len, txnId, stmtId).
//   stmtBuckets     : statement hash table the statement is removed from.
//   prepareFailList : list of FailStmt entries searched by stmtId.
486 DbRetVal
handleFree(void *data
, void *stmtBuckets
, List
*prepareFailList
)
489 char *ptr
= (char *) data
;
490 int len
= *(int *) ptr
; ptr
+= sizeof(int);
491 int txnId
= *(int *) ptr
; ptr
+= sizeof(int);
492 int stmtId
= *(int *)ptr
;
493 AbsSqlStatement
*stmt
= SqlStatement::getStmtFromHashTable(stmtId
,
495 FailStmt
*elem
= NULL
;
// Look for the failed-prepare record that carries this statement id.
497 ListIterator failListIter
= prepareFailList
->getIterator();
498 while (failListIter
.hasElement()) {
499 elem
= (FailStmt
*) failListIter
.nextElement();
500 if (elem
->stmtId
== stmtId
) break;
// NOTE(review): if the loop ends without a match, elem still points at the
// last element visited; confirm that the remove below cannot drop an
// unrelated entry in that case.
502 failListIter
.reset();
503 prepareFailList
->remove(elem
);
// NOTE(review): the message below spells "value" as "vlaue".
508 printError(rv
, "HandleFree failed with return vlaue %d", rv
);
511 SqlStatement::removeFromHashTable(stmtId
, stmtBuckets
);
512 printDebug(DM_CacheServer
, "Freed the statement from hashTable");
516 DbRetVal
writeToConfResFile(void *data
, int len
, void *stmtBuckets
, char *dsn
)
519 bool isPrmStmt
=false;
520 char confResFile
[1024];
521 sprintf(confResFile
, "%s", Conf::config
.getConflResoFile());
522 int fd
= open(confResFile
, O_WRONLY
|O_CREAT
| O_APPEND
, 0644);
524 printError(ErrOS
, "Could not create conflict Resolution file");
528 char paramStmtString
[1024];
530 char *ptr
= (char *) data
;
531 int datalen
= *(int *) ptr
; ptr
+= sizeof(int);
532 int txnId
= *(int *) ptr
; ptr
+= sizeof(int);
533 strcpy(buffer
, "SET AUTOCOMMIT OFF;\n");
534 int ret
= os::write(fd
, buffer
, strlen(buffer
));
535 if (ret
!= strlen(buffer
)) {
536 printError(ErrOS
, "Write error into conf resolution file");
540 int counter
= 0; // if at all the statement is parameterized
543 while ((ptr
- (char *)data
) < len
) {
544 int stmtId
= *(int *)ptr
;
546 int bucketNo
= stmtId
% STMT_BUCKET_SIZE
;
547 StmtBucket
*buck
= (StmtBucket
*) stmtBuckets
;
548 StmtBucket
*stmtBucket
= &buck
[bucketNo
];
549 StmtNode
*node
= NULL
;
550 ListIterator it
= stmtBucket
->bucketList
.getIterator();
551 while(it
.hasElement()) {
552 node
= (StmtNode
*) it
.nextElement();
553 if(stmtId
== node
->stmtId
) break;
555 printf("DEBUG:node = %x\n", node
);
556 ExecType type
= (ExecType
) (*(int *) ptr
);
558 if (type
== SETPARAM
) {
562 sprintf(paramStmtString
, "%s", node
->stmtstr
);
563 char *it
= node
->stmtstr
;
565 int parampos
= *(int *)ptr
;
567 DataType dataType
= (DataType
) ( *(int *) ptr
);
569 int length
= *(int *) ptr
;
573 char * it
= paramStmtString
;
576 while (*it
!= '\0') {
579 if(pos
!= parampos
) { it
++; continue; }
584 case typeString
: case typeBinary
: case typeDate
:
585 case typeTime
: case typeTimeStamp
:
587 AllDataType::convertToString(it
, value
, dataType
, length
);
588 prntdChars
= AllDataType::printVal(value
, dataType
,length
);
593 AllDataType::convertToString(it
, value
, dataType
, length
);
594 prntdChars
= AllDataType::printVal(value
, dataType
,length
);
598 sprintf(it
, " %s", buffer
);
599 //strcpy(buffer, paramStmtString);
606 sprintf(buffer
, "%s", node
->stmtstr
);
607 buffer
[strlen(buffer
)] = '\n';
608 ret
= os::write(fd
, buffer
, strlen(node
->stmtstr
)+1);
609 if(ret
!= strlen(node
->stmtstr
)+1) {
610 printError(ErrOS
, "Write error into conf resolution file");
614 strcpy(buffer
, paramStmtString
);
618 int strlength
= strlen(buffer
);
619 buffer
[strlen(buffer
)] = '\n';
620 ret
= os::write(fd
, buffer
, strlength
+1);
621 if(ret
!= strlength
+1) {
622 printError(ErrOS
, "Write error into conf resolution file");
628 strcpy(buffer
, "COMMIT;\n\n");
629 ret
= os::write(fd
, buffer
, strlen(buffer
));
630 if(ret
!= strlen(buffer
)) {
631 printError(ErrOS
, "Write error into conf resolution file");