1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 ***************************************************************************/
17 #include <TableConfig.h>
18 #include <SessionImpl.h>
19 #include <SqlFactory.h>
20 #include <AbsSqlConnection.h>
21 #include <AbsSqlStatement.h>
22 #include <SqlNwConnection.h>
23 #include <SqlOdbcConnection.h>
24 #include <SqlNwStatement.h>
25 #include <SqlConnection.h>
26 #include <SqlStatement.h>
27 #include <SqlFactory.h>
30 #include <SqlLogStatement.h> //for BindSqlField
31 #include <SqlNetworkHandler.h>
33 class AbsCSqlQIterator
36 virtual void *next() = 0;
42 virtual void push(void *log
, int len
) = 0;
43 virtual void pop() = 0;
44 virtual int size() = 0;
47 class ListIter
: public AbsCSqlQIterator
51 ListIter(List
&list
) { iter
= list
.getIterator(); }
52 void *next() { return iter
.nextElementInQueue(); }
55 typedef struct FailedStmtObject
{
63 class ListAsQueue
: public AbsCSqlQueue
68 void push(void *log
, int len
)
70 int logSize
= sizeof(long long) + len
+ sizeof(int) + sizeof(long);
71 char *logElem
= (char *) malloc(os::align(logSize
));
72 *(long long *) logElem
= ++qIndex
;
73 *(int*)(logElem
+ sizeof(long long))= len
;
74 char *ptr
= logElem
+ sizeof(int) + sizeof(long long);
75 memcpy(ptr
, log
, len
+sizeof(long)); //long for msg type
77 printDebug(DM_ReplServer
, "Pushed Element: %x", logElem
);
79 int size() { return list
.size(); }
87 AbsCSqlQIterator
*qIter
;
89 ThreadInputData() { qIter
= NULL
; indexPtr
= NULL
; }
92 void *startThread(void *p
);
97 printf("Usage: csqlasyncserver \n");
98 printf("Description: Start the csql Async server.\n");
102 DbRetVal
processMessage(void *str
, int len
, void *conn
, void *hashBucketPtr
,
103 SqlApiImplType flag
, List
*prepareFailList
);
104 void *freeMsgFromQueue(void *p
);
105 DbRetVal
handlePrepare(void *str
, void *conn
, void *stmtBuckets
,
106 SqlApiImplType flag
, List
*prepareFailList
);
107 DbRetVal
handleCommit(void *str
, int len
, void *conn
, void *stmtBuckets
,
108 List
*prepareFailList
);
109 DbRetVal
handleFree(void *str
, void *stmtBuckets
, List
*prepareFailList
);
110 AbsSqlStatement
*getStmtFromHashTable(int stmtId
, void *stmtBuckets
);
111 DbRetVal
writeToConfResFile(void *data
, int len
, void *stmtBuckets
,
114 int getHashBucket(int stmtid
)
116 return (stmtid
% STMT_BUCKET_SIZE
);
119 AbsCSqlQueue
*csqlQ
= NULL
;
124 ThreadInputData
**thrInput
;
129 long long * indexCountPtr
= NULL
;
130 pthread_t freeThrId
= 0;
132 static void sigTermHandler(int sig
)
134 printf("Received signal %d\nStopping the server\n", sig
);
135 os::msgctl(msgKey
, IPC_RMID
, NULL
);
139 int main(int argc
, char **argv
)
142 while ((c
= getopt(argc
, argv
, "?")) != EOF
) {
144 case '?' : { opt
= 10; break; } //print help
148 if (opt
== 10) { printUsage();
152 os::signal(SIGINT
, sigTermHandler
);
153 os::signal(SIGTERM
, sigTermHandler
);
155 Conf::config
.readAllValues(os::getenv("CSQL_CONFIG_FILE"));
156 if (( !Conf::config
.useCache() &&
157 Conf::config
.getCacheMode() == SYNC_MODE
) ) {
158 printf("Replication server not started\n");
163 //printf("config id = %d\n", Conf::config.getSiteID());
165 if ((!Conf::config
.useCache() &&
166 Conf::config
.getCacheMode()==SYNC_MODE
)) {
167 printf("There are no async sites\n");
171 msgKey
= os::msgget(Conf::config
.getMsgKey(), 0666);
173 printf("Message Queue creation failed\n");
177 csqlQ
= new ListAsQueue();
178 ListIterator itr
= ((ListAsQueue
*)csqlQ
)->list
.getIterator();
180 if (Conf::config
.useCache() && Conf::config
.getCacheMode()==ASYNC_MODE
) {
183 pthread_t
*thrId
=new pthread_t
[asyncSites
];
184 thrInput
= (ThreadInputData
**) malloc(sizeof(ThreadInputData
*) * asyncSites
);
185 indexCountPtr
= (long long *) malloc(sizeof(long long) * asyncSites
);
186 memset(indexCountPtr
, 0, sizeof(long long) * asyncSites
);
188 if(Conf::config
.useCache() && Conf::config
.getCacheMode()==ASYNC_MODE
) {
189 thrInput
[i
] = new ThreadInputData();
190 thrInput
[i
]->qIter
= NULL
;
191 thrInput
[i
]->indexPtr
= &indexCountPtr
[i
];
192 pthread_create(&thrId
[i
], NULL
, &startThread
, thrInput
[i
]);
195 pthread_create(&freeThrId
, NULL
, freeMsgFromQueue
, NULL
);
196 struct timeval timeout
, tval
;
199 int msgSize
= Conf::config
.getAsyncMsgMax();
201 // printf("Replication Server Started");
203 tval
.tv_sec
= timeout
.tv_sec
;
204 tval
.tv_usec
= timeout
.tv_usec
;
205 os::select(0, 0, 0, 0, &tval
);
206 printDebug(DM_ReplServer
, "waiting for message");
208 long size
= os::msgrcv(msgKey
, str
, msgSize
, 0, 0666|IPC_NOWAIT
);// process logs
209 printDebug(DM_ReplServer
, "Received msg size = %d", size
);
210 if (size
== -1 || srvStop
) break;
211 csqlQ
->push(str
, size
); // long for mtype of msg
215 printf("Replication Server Exiting\n");
219 void *startThread(void *thrInfo
)
222 ThreadInputData
*thrInput
= (ThreadInputData
*)thrInfo
;
223 List prepareFailList
;
226 printDebug(DM_ReplServer
, "SqlAdapter Thread created");
227 AbsCSqlQIterator
*iter
= thrInput
->qIter
;
228 while (1) { if (csqlQ
->size()) break; }
229 iter
= new ListIter(((ListAsQueue
*)csqlQ
)->list
);
230 AbsSqlConnection
*conn
= SqlFactory::createConnection(flag
);
231 void *stmtBuckets
= malloc (STMT_BUCKET_SIZE
* sizeof(StmtBucket
));
232 memset(stmtBuckets
, 0, STMT_BUCKET_SIZE
* sizeof(StmtBucket
));
233 printDebug(DM_ReplServer
, "stmtbuckets: %x", stmtBuckets
);
234 struct timeval timeout
, tval
;
238 rv
= conn
->connect(I_USER
, I_PASS
);
240 printError(rv
, "Unable to connect to peer site");
243 os::select(0, 0, 0, 0, &timeout
);
248 msg
= iter
->next(); //receive msg from csqlQ
249 if (msg
!= NULL
) break;
252 os::select(0, 0, 0, 0, &tval
);
254 long long index
= *(long long *) msg
;
255 int length
= *(int *)((char *)msg
+sizeof(long long));
256 char *msgptr
= (char *)msg
+ sizeof(long long) + sizeof(int);
257 printDebug(DM_ReplServer
, "entering process message");
258 rv
= processMessage(msgptr
, length
, conn
, stmtBuckets
, flag
,
260 if (rv
== ErrNoConnection
) break;
261 printDebug(DM_ReplServer
, "processed message");
262 *(long long *) thrInput
->indexPtr
= index
;
263 printDebug(DM_ReplServer
, "index %d is stored in Main index log array\n", index
);
269 DbRetVal
processMessage(void *str
, int len
, void *conn
, void *stmtBuckets
,
270 SqlApiImplType flag
, List
*prepareFailList
)
272 long type
= *(long *) str
;
273 printDebug(DM_ReplServer
, "type = %d\n", type
);
274 char *data
= (char *) str
+ sizeof(long);
275 if (type
== 1) return handlePrepare(data
, conn
, stmtBuckets
, flag
,
277 else if (type
== 2) return handleCommit(data
, len
, conn
, stmtBuckets
,
279 else if (type
== 3) return handleFree(data
, stmtBuckets
, prepareFailList
);
282 void *freeMsgFromQueue(void *indCntPtr
)
284 long long minIndex
= 0;
285 struct timeval timeout
, tval
;
287 printDebug(DM_ReplServer
, "waiting for free the q elements");
289 if (csqlQ
->size()) break;
290 /// printError(ErrWarning, "List is empty");
293 os::select(0, 0, 0, 0, &tval
);
295 AbsCSqlQIterator
*iter
= new ListIter(((ListAsQueue
*)csqlQ
)->list
);
297 minIndex
= indexCountPtr
[0];
298 for (int i
=0; i
< asyncSites
; i
++) {
299 if (minIndex
> indexCountPtr
[i
]) minIndex
= indexCountPtr
[i
];
303 msg
= iter
->next(); //receive msg from csqlQ
304 if (msg
== NULL
) break;
305 if( *(long long *) msg
<= minIndex
) {
306 long long num
= *(long long *) msg
;
307 free (msg
); msg
= NULL
;
311 tval
.tv_usec
= 100000;
312 os::select(0, 0, 0, 0, &tval
);
317 DbRetVal
handlePrepare(void *data
, void *conn
, void *stmtBuckets
,
318 SqlApiImplType flag
, List
*prepareFailList
)
321 AbsSqlConnection
*con
= (AbsSqlConnection
*)conn
;
322 AbsSqlStatement
*stmt
= SqlFactory::createStatement(flag
);
323 stmt
->setConnection(con
);
324 char *ptr
= (char *) data
;
325 int length
= *(int *) ptr
; ptr
+= sizeof(int);
326 int txnId
= *(int *) ptr
; ptr
+= sizeof(int);
327 int stmtId
= *(int *) ptr
; ptr
+= sizeof(int);
328 char *tblName
= ptr
; ptr
+= IDENTIFIER_LENGTH
;
329 char *stmtstr
= (char *)data
+ 3 * sizeof(int) + IDENTIFIER_LENGTH
;
332 unsigned int mode
= TableConf::config
.getTableMode(tblName
);
333 bool isCached
= TableConf::config
.isTableCached(mode
);
335 if ((flag
== CSqlAdapter
) && !isCached
) {
336 FailStmt
*fst
= new FailStmt();
337 fst
->stmtId
= stmtId
;
338 fst
->eType
= ErrNotCached
;
339 prepareFailList
->append(fst
);
343 printDebug(DM_ReplServer
, "stmt str: %s", stmtstr
);
344 rv
= stmt
->prepare(stmtstr
);
346 FailStmt
*fst
= new FailStmt();
347 fst
->stmtId
= stmtId
;
349 prepareFailList
->append(fst
);
352 int bucketNo
= getHashBucket(stmtId
);
353 printDebug(DM_ReplServer
, "PrepHdl: stmtBuckets: %x", stmtBuckets
);
354 printDebug(DM_ReplServer
, "PrepHdl: bucketno: %d", bucketNo
);
355 StmtBucket
*buck
= (StmtBucket
*) stmtBuckets
;
356 StmtBucket
*stmtBucket
= &buck
[bucketNo
];
357 printDebug(DM_ReplServer
, "PrepHdl: bucket addr: %x", stmtBucket
);
358 StmtNode
*node
= new StmtNode();
359 printDebug(DM_ReplServer
, "PredHdl: stmtNode addr: %x", node
);
360 node
->stmtId
= stmtId
;
362 strcpy(node
->stmtstr
, stmtstr
);
363 printDebug(DM_ReplServer
, "PrepHdl: stmt id: %d stmt %x", node
->stmtId
, node
->stmt
);
364 stmtBucket
->bucketList
.append(node
);
366 printDebug(DM_ReplServer
, "returning from prepare");
370 DbRetVal
handleCommit(void *data
, int len
, void *conn
, void *stmtBuckets
,
371 List
*prepareFailList
)
374 AbsSqlConnection
*con
= (AbsSqlConnection
*)conn
;
375 // get dsn if adapter to write into conflict resolution file
377 SqlOdbcConnection
*adCon
= (SqlOdbcConnection
*) con
;
379 char *ptr
= (char *) data
;
380 int datalen
= *(int *) ptr
; ptr
+= sizeof(int);
381 int txnId
= *(int *) ptr
; ptr
+= sizeof(int);
382 FailStmt
*elem
= NULL
;
383 rv
= con
->beginTrans();
385 printError(rv
, "Begin trans failed");
388 while ((ptr
- (char *)data
) < len
) {
389 int stmtId
= *(int *)ptr
;
391 AbsSqlStatement
*stmt
= getStmtFromHashTable(stmtId
, stmtBuckets
);
392 printDebug(DM_ReplServer
, "commit: stmtId: %d", stmtId
);
393 printDebug(DM_ReplServer
, "commit: stmtbuckets: %x", stmtBuckets
);
394 printDebug(DM_ReplServer
, "commit: stmt: %x", stmt
);
395 ExecType type
= (ExecType
) (*(int *) ptr
);
397 if (type
== SETPARAM
) {
398 int parampos
= *(int *) ptr
;
400 DataType dataType
= (DataType
) ( *(int *) ptr
);
402 int length
= *(int *) ptr
;
407 SqlNetworkHandler::setParamValues(stmt
, parampos
, dataType
,
408 length
, (char *)value
);
410 // start executing and committing for all active connections
412 if (stmt
!= NULL
) rv
= stmt
->execute(rows
);
414 printError(rv
, "Execute failed with return value %d", rv
);
415 if (rv
== ErrNoConnection
) return rv
;
417 // write to conflict resolution file
418 writeToConfResFile(data
, len
, stmtBuckets
, dsn
);
424 ListIterator it
= prepareFailList
->getIterator();
426 while (it
.hasElement()) {
427 elem
= (FailStmt
*) it
.nextElement();
428 if (elem
->stmtId
== stmtId
) { found
= true; break; }
430 if (! found
) continue; // for local table
431 if ((elem
->eType
== ErrNotCached
) ||
432 elem
->eType
== ErrNotExists
)
435 // write to conflict resolution file
436 writeToConfResFile(data
, len
, stmtBuckets
, dsn
);
444 if (rv
!= OK
) { printDebug(DM_ReplServer
, "commit failed"); }
445 else { printDebug(DM_ReplServer
, "commit passed"); }
449 DbRetVal
handleFree(void *data
, void *stmtBuckets
, List
*prepareFailList
)
452 char *ptr
= (char *) data
;
453 int len
= *(int *) ptr
; ptr
+= sizeof(int);
454 int txnId
= *(int *) ptr
; ptr
+= sizeof(int);
455 int stmtId
= *(int *)ptr
;
456 AbsSqlStatement
*stmt
= getStmtFromHashTable(stmtId
, stmtBuckets
);
457 FailStmt
*elem
= NULL
;
459 ListIterator failListIter
= prepareFailList
->getIterator();
460 while (failListIter
.hasElement()) {
461 elem
= (FailStmt
*) failListIter
.nextElement();
462 if (elem
->stmtId
== stmtId
) break;
464 failListIter
.reset();
465 prepareFailList
->remove(elem
);
470 printError(rv
, "HandleFree failed with return vlaue %d", rv
);
473 int bucketNo
= getHashBucket(stmtId
);
474 StmtBucket
*buck
= (StmtBucket
*) stmtBuckets
;
475 StmtBucket
*stmtBucket
= &buck
[bucketNo
];
476 StmtNode
*node
= NULL
;
477 ListIterator it
= stmtBucket
->bucketList
.getIterator();
478 while(it
.hasElement()) {
479 node
= (StmtNode
*) it
.nextElement();
480 if(stmtId
== node
->stmtId
) break;
483 printDebug(DM_ReplServer
, "GSFHT: node: %x", node
);
484 printDebug(DM_ReplServer
, "GSFHT: stmtId: %d", node
->stmtId
);
485 printDebug(DM_ReplServer
, "GSFHT stmt: %x", node
->stmt
);
486 stmtBucket
->bucketList
.remove(node
);
487 if (node
->stmt
) delete stmt
;
488 delete node
; node
= NULL
;
492 AbsSqlStatement
*getStmtFromHashTable(int stmtId
, void *stmtBuckets
)
494 int bucketNo
= getHashBucket(stmtId
);
495 printDebug(DM_ReplServer
, "GSFHT: bucketNo: %d", bucketNo
);
496 StmtBucket
*buck
= (StmtBucket
*) stmtBuckets
;
497 StmtBucket
*stmtBucket
= &buck
[bucketNo
];
498 printDebug(DM_ReplServer
, "GSFHT: bucket: %x", stmtBucket
);
499 StmtNode
*node
= NULL
;
500 ListIterator it
= stmtBucket
->bucketList
.getIterator();
501 while(it
.hasElement()) {
502 node
= (StmtNode
*) it
.nextElement();
503 printDebug(DM_ReplServer
, "GSFHT: node: %x", node
);
504 if(stmtId
== node
->stmtId
) {
505 printDebug(DM_ReplServer
, "GSFHT: stmtId: %d", node
->stmtId
);
506 printDebug(DM_ReplServer
, "GSFHT stmt: %x", node
->stmt
);
513 DbRetVal
writeToConfResFile(void *data
, int len
, void *stmtBuckets
,
517 bool isPrmStmt
=false;
518 char confResFile
[1024];
519 sprintf(confResFile
, "%s", Conf::config
.getConflResoFile());
520 int fd
= open(confResFile
, O_WRONLY
|O_CREAT
| O_APPEND
, 0644);
522 printError(ErrOS
, "Could not create conflict Resolution file");
526 char paramStmtString
[1024];
528 char *ptr
= (char *) data
;
529 int datalen
= *(int *) ptr
; ptr
+= sizeof(int);
530 int txnId
= *(int *) ptr
; ptr
+= sizeof(int);
531 strcpy(buffer
, "SET AUTOCOMMIT OFF;\n");
532 int ret
= os::write(fd
, buffer
, strlen(buffer
));
533 if (ret
!= strlen(buffer
)) {
534 printError(ErrOS
, "Write error into conf resolution file");
538 int counter
= 0; // if at all the statement is parameterized
541 while ((ptr
- (char *)data
) < len
) {
542 int stmtId
= *(int *)ptr
;
544 int bucketNo
= getHashBucket(stmtId
);
545 StmtBucket
*buck
= (StmtBucket
*) stmtBuckets
;
546 StmtBucket
*stmtBucket
= &buck
[bucketNo
];
547 StmtNode
*node
= NULL
;
548 ListIterator it
= stmtBucket
->bucketList
.getIterator();
549 while(it
.hasElement()) {
550 node
= (StmtNode
*) it
.nextElement();
551 if(stmtId
== node
->stmtId
) break;
553 printf("DEBUG:node = %x\n", node
);
554 ExecType type
= (ExecType
) (*(int *) ptr
);
556 if (type
== SETPARAM
) {
560 sprintf(paramStmtString
, "%s", node
->stmtstr
);
561 char *it
= node
->stmtstr
;
563 int parampos
= *(int *)ptr
;
565 DataType dataType
= (DataType
) ( *(int *) ptr
);
567 int length
= *(int *) ptr
;
571 char * it
= paramStmtString
;
574 while (*it
!= '\0') {
577 if(pos
!= parampos
) { it
++; continue; }
582 case typeString
: case typeBinary
: case typeDate
:
583 case typeTime
: case typeTimeStamp
:
585 AllDataType::convertToString(it
, value
, dataType
, length
);
586 prntdChars
= AllDataType::printVal(value
, dataType
,length
);
591 AllDataType::convertToString(it
, value
, dataType
, length
);
592 prntdChars
= AllDataType::printVal(value
, dataType
,length
);
596 sprintf(it
, " %s", buffer
);
597 //strcpy(buffer, paramStmtString);
604 sprintf(buffer
, "%s", node
->stmtstr
);
605 buffer
[strlen(buffer
)] = '\n';
606 ret
= os::write(fd
, buffer
, strlen(node
->stmtstr
)+1);
607 if(ret
!= strlen(node
->stmtstr
)+1) {
608 printError(ErrOS
, "Write error into conf resolution file");
612 strcpy(buffer
, paramStmtString
);
616 int strlength
= strlen(buffer
);
617 buffer
[strlen(buffer
)] = '\n';
618 ret
= os::write(fd
, buffer
, strlength
+1);
619 if(ret
!= strlength
+1) {
620 printError(ErrOS
, "Write error into conf resolution file");
626 strcpy(buffer
, "COMMIT;\n\n");
627 ret
= os::write(fd
, buffer
, strlen(buffer
));
628 if(ret
!= strlen(buffer
)) {
629 printError(ErrOS
, "Write error into conf resolution file");