1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 ***************************************************************************/
17 #include <Statement.h>
18 #include <SqlFactory.h>
19 #include <SqlStatement.h>
// Forward declaration: walks a memory-mapped statement log of `size` bytes
// starting at startAddr. The definition appears later in this file.
21 DbRetVal
iterateStmtLogs(void *startAddr
, int size
);
// Connection shared by the routines in this file; main() assigns it from
// SqlFactory::createConnection() and it is attached to every replayed statement.
22 AbsSqlConnection
*conn
;
// Base address of the statement hash table. The helpers cast it to
// StmtBucket* and index it with (stmtID % STMT_BUCKET_SIZE). NULL until allocated.
23 void *stmtBuckets
= NULL
;
// Turned on by the 'i' command-line option; enables extra printf tracing.
25 bool interactive
=false;
// Path of the redo log to replay; taken from the 'f' option or defaulted in main().
26 char fileName
[MAX_FILE_LEN
];
27 void addToHashTable(int stmtID
, AbsSqlStatement
* sHdl
)
29 int bucketNo
= stmtID
% STMT_BUCKET_SIZE
;
30 StmtBucket
*buck
= (StmtBucket
*) stmtBuckets
;
31 StmtBucket
*stmtBucket
= &buck
[bucketNo
];
32 StmtNode
*node
= new StmtNode();
33 node
->stmtId
= stmtID
;
35 stmtBucket
->bucketList
.append(node
);
// Unlinks the node registered for stmtID from its bucket list.
//
//   stmtID : statement identifier; the bucket is stmtID % STMT_BUCKET_SIZE.
//
// Nothing is removed when no node in the bucket carries that id.
// NOTE(review): stmtBuckets is indexed without a NULL check and a negative
// stmtID would give a negative bucket index - confirm callers rule both out.
38 void removeFromHashTable(int stmtID
)
40 int bucketNo
= stmtID
% STMT_BUCKET_SIZE
;
41 StmtBucket
*buck
= (StmtBucket
*) stmtBuckets
;
42 StmtBucket
*stmtBucket
= &buck
[bucketNo
];
43 StmtNode
*node
= NULL
, *delNode
= NULL
;
44 ListIterator it
= stmtBucket
->bucketList
.getIterator();
// Scan the bucket and stop at the first node whose id matches.
45 while(it
.hasElement()) {
46 node
= (StmtNode
*) it
.nextElement();
47 if(stmtID
== node
->stmtId
) { delNode
= node
; break; }
// The removal happens after the scan, not while iterating.
// NOTE(review): whether the StmtNode itself is released after remove() is not
// visible in this view - confirm the node is not leaked.
50 if (delNode
!= NULL
) {
51 stmtBucket
->bucketList
.remove(delNode
);
56 AbsSqlStatement
*getStmtFromHashTable(int stmtId
)
58 int bucketNo
= stmtId
% STMT_BUCKET_SIZE
;
59 StmtBucket
*buck
= (StmtBucket
*) stmtBuckets
;
60 StmtBucket
*stmtBucket
= &buck
[bucketNo
];
61 if (stmtBucket
== NULL
) return NULL
;
62 StmtNode
*node
= NULL
;
63 ListIterator it
= stmtBucket
->bucketList
.getIterator();
64 while(it
.hasElement()) {
65 node
= (StmtNode
*) it
.nextElement();
66 if(stmtId
== node
->stmtId
) {
67 SqlStatement
*sqlStmt
= (SqlStatement
*)node
->stmt
;
68 if (!sqlStmt
->isPrepared()) sqlStmt
->prepare();
75 bool isStmtInHashTable(int stmtId
)
77 int bucketNo
= stmtId
% STMT_BUCKET_SIZE
;
78 StmtBucket
*buck
= (StmtBucket
*) stmtBuckets
;
79 StmtBucket
*stmtBucket
= &buck
[bucketNo
];
80 if (stmtBucket
== NULL
) return false;
81 StmtNode
*node
= NULL
;
82 ListIterator it
= stmtBucket
->bucketList
.getIterator();
83 while(it
.hasElement()) {
84 node
= (StmtNode
*) it
.nextElement();
85 if(stmtId
== node
->stmtId
) {
86 SqlStatement
*sqlStmt
= (SqlStatement
*)node
->stmt
;
87 if (sqlStmt
->isPrepared()) return true;
// Visits every StmtNode in every bucket of the statement hash table.
// Does nothing when the table was never allocated.
// NOTE(review): the per-node cleanup statements are not visible in this view;
// presumably each statement handle and node is released here - confirm.
95 void freeAllStmtHandles()
97 if (NULL
== stmtBuckets
) return;
98 StmtBucket
*buck
= (StmtBucket
*) stmtBuckets
;
99 StmtNode
*node
= NULL
;
// One pass per bucket, STMT_BUCKET_SIZE buckets in total.
100 for (int i
=0; i
<STMT_BUCKET_SIZE
; i
++)
102 StmtBucket
*stmtBucket
= &buck
[i
];
// NOTE(review): &buck[i] cannot be NULL once stmtBuckets is non-NULL, so this
// test looks like it never fires - confirm.
103 if (stmtBucket
== NULL
) continue;
104 ListIterator it
= stmtBucket
->bucketList
.getIterator();
105 while(it
.hasElement()) {
106 node
= (StmtNode
*) it
.nextElement();
// Binds one parameter value on a statement, using the typed setter that
// corresponds to `type`.
//
//   stmt   : statement whose parameter is set.
//   pos    : parameter position passed through to the setter.
//   type   : data type of the value; selects which setter is called.
//   length : byte length; only forwarded for binary parameters.
//   value  : pointer to the raw value. It is reinterpreted as the C type that
//            matches the setter and dereferenced, except for string and binary
//            values, which are passed as pointers.
//
// NOTE(review): the selection construct (presumably a switch on `type`) is not
// visible in this view; only the setter calls are.
113 void setParam(AbsSqlStatement
*stmt
, int pos
, DataType type
, int length
, void *value
)
// Integer family: the value is read through a pointer of the matching width.
118 stmt
->setIntParam(pos
, *(int*)value
);
121 stmt
->setLongParam(pos
, *(long*) value
);
124 stmt
->setLongLongParam(pos
, *(long long*)value
);
127 stmt
->setShortParam(pos
, *(short*)value
);
130 stmt
->setByteIntParam(pos
, *(ByteInt
*)value
);
// Floating point values.
133 stmt
->setDoubleParam(pos
, *(double*)value
);
136 stmt
->setFloatParam(pos
, *(float*)value
);
// Date/time values are copied out of the buffer as Date, Time or TimeStamp objects.
139 stmt
->setDateParam(pos
, *(Date
*)value
);
142 stmt
->setTimeParam(pos
, *(Time
*)value
);
145 stmt
->setTimeStampParam(pos
, *(TimeStamp
*)value
);
// Strings are passed by pointer, not dereferenced.
148 stmt
->setStringParam(pos
, (char*)value
);
// Binary is the only case that needs the explicit length.
151 stmt
->setBinaryParam(pos
, value
, length
);
// Fallback message; presumably reached for a type with no matching setter.
154 printf("unknown type\n");
// Loads the statement log (<dbfile>/csql.db.stmt) and feeds it to
// iterateStmtLogs() so the statement hash table is populated.
//
// Returns OK when the log file cannot be opened, and ErrSysInternal when
// fstat() or mmap() fails or when stmtBuckets is already non-NULL.
// NOTE(review): the declaration of `st`, the final return statement and any
// close(fd) calls are not visible in this view.
160 DbRetVal
readAndPopulateStmts()
163 char fName
[MAX_FILE_LEN
];
164 sprintf(fName
, "%s/csql.db.stmt", Conf::config
.getDbFile());
165 printf("Statement Redo log filename is :%s\n", fName
);
166 int fd
= open(fName
, O_RDONLY
);
// A log file that cannot be opened is not treated as an error.
167 if (-1 == fd
) { return OK
; }
168 if (fstat(fd
, &st
) == -1) {
169 printError(ErrSysInternal
, "Unable to retrieve stmt log file size");
171 return ErrSysInternal
;
// The table may only be built once per process.
173 if (NULL
!= stmtBuckets
)
175 printError(ErrSysInternal
, "stmtBuckets already populated");
176 return ErrSysInternal
;
// Allocate and zero the bucket array.
// NOTE(review): the malloc result is passed to memset without a NULL check in
// the visible lines - confirm.
178 stmtBuckets
= malloc (STMT_BUCKET_SIZE
* sizeof(StmtBucket
));
179 memset(stmtBuckets
, 0, STMT_BUCKET_SIZE
* sizeof(StmtBucket
));
// An empty log only produces a note.
180 if (st
.st_size
==0) {
181 printError(ErrNote
, "No Statement logs found during recovery");
// Map the whole file read-only and private.
185 void *startAddr
= mmap(NULL
, st
.st_size
, PROT_READ
, MAP_PRIVATE
, fd
, 0);
186 if (MAP_FAILED
== startAddr
) {
187 printError(ErrSysInternal
, "Unable to mmap stmt log file\n");
188 return ErrSysInternal
;
// Replay the mapped records, then drop the mapping.
190 DbRetVal rv
= iterateStmtLogs(startAddr
, st
.st_size
);
191 munmap((char*)startAddr
, st
.st_size
);
// Rewrites the statement log, keeping only the prepare records whose statement
// id is still reported by isStmtInHashTable().
//
// Reads <dbfile>/csql.db.stmt through a read-only mapping, writes the
// surviving records to <dbfile>/csql.db.stmt1, and builds an `mv` command that
// renames the new file over the old one.
// Returns OK when the log cannot be opened, and ErrSysInternal when fstat() or
// mmap() fails.
// NOTE(review): the loop header, several local declarations, the handling of
// an empty file, the execution of `cmd` and the final return are not visible
// in this view.
195 DbRetVal
filterAndWriteStmtLogs()
198 char fName
[MAX_FILE_LEN
];
199 sprintf(fName
, "%s/csql.db.stmt", Conf::config
.getDbFile());
200 int fdRead
= open(fName
, O_RDONLY
);
// Nothing to filter when the log cannot be opened.
201 if (-1 == fdRead
) { return OK
; }
202 if (fstat(fdRead
, &st
) == -1) {
203 printError(ErrSysInternal
, "Unable to retrieve stmt log file size");
205 return ErrSysInternal
;
207 if (st
.st_size
==0) {
211 void *startAddr
= mmap(NULL
, st
.st_size
, PROT_READ
, MAP_PRIVATE
, fdRead
, 0);
212 if (MAP_FAILED
== startAddr
) {
213 printError(ErrSysInternal
, "Unable to mmap stmt log file\n");
214 return ErrSysInternal
;
// fName is reused for the output file, which is created or truncated.
216 sprintf(fName
, "%s/csql.db.stmt1", Conf::config
.getDbFile());
217 int fd
= os::openFileForAppend(fName
, O_CREAT
|O_TRUNC
);
218 char *iter
= (char*)startAddr
;
219 char *logStart
= NULL
, *logEnd
= NULL
;
// Stop once the cursor has consumed the whole mapping.
225 if (iter
- (char*)startAddr
>= st
.st_size
) break;
// Each record starts with an int type tag.
226 logType
= *(int*)iter
;
228 if (logType
== -1) { //prepare
229 iter
= iter
+ sizeof(int);
// Skip two more ints to reach the statement id.
231 iter
= iter
+ 2 * sizeof(int);
232 stmtID
= *(int*)iter
;
// Advance to the next record; len is the record length measured from logStart.
233 iter
= logStart
+ len
;
// Copy the record to the new file only if the statement is still registered and prepared.
235 if (isStmtInHashTable(stmtID
))
236 ret
= os::write(fd
, logStart
, len
);
238 printError(ErrSysInternal
, "Unable to write statement logs");
// Free records are skipped: the cursor moves past four ints and nothing is written.
241 else if(logType
== -3) { //free
242 iter
= logStart
+ 4 *sizeof(int);
// Any other tag is reported as corruption.
244 printError(ErrSysInternal
, "Stmt Redo log file corrupted: logType:%d", logType
);
250 munmap((char*)startAddr
, st
.st_size
);
// Shell command that replaces the old log with the filtered one.
252 char cmd
[MAX_FILE_LEN
*2];
253 sprintf(cmd
, "mv %s/csql.db.stmt1 %s/csql.db.stmt",
254 Conf::config
.getDbFile(), Conf::config
.getDbFile());
// Walks the mapped statement log and rebuilds the statement hash table from it.
//
//   startAddr : start of the mapped log.
//   size      : number of mapped bytes; the walk stops when the cursor reaches it.
//
// Record tags handled here: -1 (prepare) and -3 (free). Any other tag is
// reported as a corrupted log.
// NOTE(review): the loop header, several local declarations, the assignment of
// `len` and the return statements are not visible in this view.
258 DbRetVal
iterateStmtLogs(void *startAddr
, int size
)
260 char *iter
= (char*)startAddr
;
265 int len
, ret
, retVal
=0;
267 char stmtString
[SQL_STMT_LEN
];
270 if (iter
- (char*)startAddr
>= size
) break;
// Each record starts with an int type tag.
271 logType
= *(int*)iter
;
// Prepare record: tag, txnID, loglen, stmtID, then further fields that
// include the statement text.
272 if (logType
== -1) { //prepare
273 iter
= iter
+ sizeof(int);
274 txnID
= *(int*) iter
; iter
+= sizeof(int);
275 loglen
= *(int*) iter
; iter
+= sizeof(int);
276 stmtID
= *(int*)iter
;
277 iter
= iter
+ sizeof(int);
279 iter
= iter
+ sizeof(int);
// NOTE(review): len comes from the log and is not visibly checked against
// SQL_STMT_LEN before this copy - confirm it cannot overflow stmtString.
280 strncpy(stmtString
, iter
, len
);
283 printf("PREPARE: SID:%d %s\n", stmtID
, stmtString
);
286 if (interactive
) printf("STMTLOG PREPARE SID:%d %s\n", stmtID
, stmtString
);
// The statement is created and given its SQL text but not prepared here;
// getStmtFromHashTable() prepares it on first lookup.
287 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlDirect
);
288 stmt
->setConnection(conn
);
289 SqlStatement
*sqlStmt
= (SqlStatement
*)stmt
;
290 sqlStmt
->setStmtString(stmtString
);
291 addToHashTable(stmtID
, stmt
);
// Free record: tag, txnID, loglen, stmtID.
293 else if(logType
== -3) { //free
294 iter
= iter
+ sizeof(int);
295 txnID
= *(int*) iter
; iter
+= sizeof(int);
296 loglen
= *(int*) iter
; iter
+= sizeof(int);
297 stmtID
= *(int*)iter
;
298 iter
= iter
+ sizeof(int);
300 printf("FREE: SID:%d TID:%d \n", stmtID
, txnID
);
304 printError(ErrSysInternal
, "Stmt Redo log file corrupted: logType:%d", logType
);
// Entry point of the redo-log recovery tool.
//
// Parses the options f:ail?, loads the configuration, maps the redo log
// read-only and replays its records against a direct SQL connection. Handled
// record tags: -1 prepare, -2 commit (execute and set-parameter entries),
// -3 free, -4 prepare-and-execute. Afterwards the statement log is filtered
// and the statement hash table is visited by freeAllStmtHandles().
// NOTE(review): many lines of this function (declarations, loop header, error
// branches, returns) are not visible in this view; the comments below describe
// only what the visible statements do.
312 int main(int argc
, char **argv
)
315 strcpy(fileName
, "");
317 while ((c
= getopt(argc
, argv
, "f:ail?")) != EOF
) {
319 case '?' : { opt
= 1; break; } //print help
320 case 'a' : { opt
= 2; break; }
321 case 'i' : { interactive
= true; break; }
322 case 'l' : { list
= true; break; }
// NOTE(review): the file name is copied from argv[optind - 1] with an
// unbounded strcpy into fileName[MAX_FILE_LEN]; getopt's optarg is the
// conventional source for an option argument - confirm.
323 case 'f' : {strcpy(fileName
, argv
[optind
- 1]); break;}
324 default: printf("Wrong args\n"); exit(1);
329 printf("This is an internal csql command with i and f <filename> options.");
// The environment variable CSQL_INTERACTIVE set to "true" is reported as verbose mode.
332 char *verbose
= os::getenv("CSQL_INTERACTIVE");
333 if (verbose
!=NULL
&& strcmp(verbose
, "true") == 0)
335 printf("VERBOSE ON %s\n", verbose
);
// Configuration is read from the file named by CSQL_CONFIG_FILE.
340 Conf::config
.readAllValues(os::getenv("CSQL_CONFIG_FILE"));
// Without an explicit file name, fall back to <dbfile>/csql.db.cur.
341 if (strcmp(fileName
, "") ==0) {
342 sprintf(fileName
, "%s/csql.db.cur", Conf::config
.getDbFile());
344 int fd
= open(fileName
, O_RDONLY
);
// A redo log that cannot be opened ends the program with OK.
345 if (-1 == fd
) { return OK
; }
346 if (fstat(fd
, &st
) == -1) {
347 printError(ErrSysInternal
, "Unable to retrieve undo log file size");
// With an empty redo log only the statement log is loaded.
351 if (st
.st_size
==0) {
352 printError(ErrNote
, "No Redo logs found during recovery");
353 readAndPopulateStmts();
// Open the direct connection used for every replayed statement.
357 conn
= SqlFactory::createConnection(CSqlDirect
);
358 DbRetVal rv
= conn
->connect(I_USER
, I_PASS
);
359 SqlConnection
*sCon
= (SqlConnection
*) conn
;
// List mode does not take the exclusive lock.
360 if(!list
) rv
= sCon
->getExclusiveLock();
361 //during connection close, this exclusive lock will be automatically released
368 void *startAddr
= mmap(NULL
, st
.st_size
, PROT_READ
, MAP_PRIVATE
, fd
, 0);
369 if (MAP_FAILED
== startAddr
) {
370 printf("Unable to read undo log file:mmap failed.\n");
// Load the statement log before replaying the redo log.
375 rv
= readAndPopulateStmts();
378 printf("Unable to read stmt log file\n");
384 printf("Redo log filename is :%s\n", fileName
);
385 char *iter
= (char*)startAddr
;
390 int len
, ret
, retVal
=0;
392 char stmtString
[SQL_STMT_LEN
];
393 //printf("size of file %d\n", st.st_size);
395 //printf("OFFSET HERE %d\n", iter - (char*)startAddr);
// Stop once the cursor has consumed the whole mapping.
396 if (iter
- (char*)startAddr
>= st
.st_size
) break;
// Each record starts with an int type tag.
397 logType
= *(int*)iter
;
// Prepare record: tag, txnID, loglen, stmtID, then further fields that
// include the statement text.
398 if (logType
== -1) { //prepare
399 iter
= iter
+ sizeof(int);
400 txnID
= *(int*) iter
; iter
+= sizeof(int);
401 loglen
= *(int*) iter
; iter
+= sizeof(int);
402 stmtID
= *(int*)iter
;
403 iter
= iter
+ sizeof(int);
405 iter
= iter
+ sizeof(int);
// NOTE(review): len comes from the log and is not visibly checked against
// SQL_STMT_LEN before this copy - confirm.
406 strncpy(stmtString
, iter
, len
);
409 printf("PREPARE: SID:%d %s\n", stmtID
, stmtString
);
// Unlike iterateStmtLogs(), the statement is prepared immediately here.
412 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlDirect
);
413 stmt
->setConnection(conn
);
414 if (interactive
) printf("PREPARE %d : %s\n", stmtID
, stmtString
);
415 rv
= stmt
->prepare(stmtString
);
417 printError(ErrSysInternal
, "unable to prepare stmt:%s", stmtString
);
421 SqlStatement
*sqlStmt
= (SqlStatement
*)stmt
;
422 sqlStmt
->setLoading(true);
423 addToHashTable(stmtID
, stmt
);
// Commit record: tag, txnID, loglen, followed by entries that each start with
// a statement id.
425 else if(logType
== -2) { //commit
427 iter
= iter
+ sizeof(int);
428 txnID
= *(int*) iter
; iter
+= sizeof(int);
429 loglen
= *(int*) iter
; iter
+= sizeof(int);
432 //printf("Iter length %d\n", iter - curPtr);
433 if (iter
- (char*)startAddr
>= st
.st_size
) {
435 //printf("Redo log file end\n");
439 stmtID
= *(int*)iter
;
440 //printf("stmtid %d\n", stmtID);
441 if (interactive
) printf("EXEC %d :\n", stmtID
);
442 iter
= iter
+ sizeof(int);
444 //printf("eType is %d\n", eType);
// Resolve the statement this entry refers to.
445 AbsSqlStatement
*stmt
= NULL
;
447 stmt
= getStmtFromHashTable(stmtID
);
449 printError(ErrSysInternal
, "Unable to find in stmt hashtable");
// eType 0: execute the statement.
454 if (0 == eType
) { //execute type
455 iter
= iter
+ sizeof(int);
457 printf("EXEC SID:%d TID:%d\n", stmtID
, txnID
);
// A negative int at the cursor ends the entry loop; record tags are negative.
458 if (*(int*)iter
<0) break;
462 rv
= stmt
->execute(ret
);
464 printError(ErrSysInternal
, "unable to execute");
469 printError(ErrSysInternal
, "statement not found for %d\n",stmtID
);
471 if (*(int*)iter
<0) break;
// eType 1: bind a parameter. The entry carries position, data type and length.
472 } else if ( 1 == eType
) { //set type
473 iter
=iter
+sizeof(int);
474 int pos
= *(int*) iter
;
475 iter
=iter
+sizeof(int);
476 DataType type
= (DataType
)(*(int*)iter
);
477 iter
=iter
+sizeof(int);
478 int len
= *(int*) iter
;
479 iter
=iter
+sizeof(int);
483 printf("SET SID:%d POS:%d TYPE:%d LEN:%d Value:", stmtID
, pos
, type
, len
);
484 AllDataType::printVal(value
, type
, len
);
486 if (*(int*)iter
<0) break;
489 setParam(stmt
, pos
, type
, len
, value
);
490 if (*(int*)iter
<0) break;
// Free record: tag, txnID, loglen, stmtID. The statement is looked up and its
// entry removed from the hash table.
495 else if(logType
== -3) { //free
496 iter
= iter
+ sizeof(int);
497 txnID
= *(int*) iter
; iter
+= sizeof(int);
498 loglen
= *(int*) iter
; iter
+= sizeof(int);
499 stmtID
= *(int*)iter
;
500 iter
= iter
+ sizeof(int);
502 printf("FREE SID:%d \n", stmtID
);
505 if (interactive
) printf("FREE %d:\n", stmtID
);
506 AbsSqlStatement
*stmt
= getStmtFromHashTable(stmtID
);
510 removeFromHashTable(stmtID
);
511 } else { printError(ErrSysInternal
, "statement not found for %d\n",stmtID
);}
// Prepare-and-execute record: same header as a prepare record, then the
// statement is prepared and executed in one step without being registered.
513 else if(logType
== -4) { //prepare and execute
514 iter
= iter
+ sizeof(int);
515 txnID
= *(int*) iter
; iter
+= sizeof(int);
516 loglen
= *(int*) iter
; iter
+= sizeof(int);
517 stmtID
= *(int*)iter
;
518 iter
= iter
+ sizeof(int);
520 iter
= iter
+ sizeof(int);
521 strncpy(stmtString
, iter
, len
);
// NOTE(review): the terminator is written at index len+1, which leaves
// stmtString[len] untouched after copying len bytes; this looks like an
// off-by-one unless len already covers a terminator - confirm.
522 stmtString
[len
+1] ='\0';
525 printf("EXECDIRECT SID:%d TID:%d STMT:%s\n", stmtID
, txnID
, stmtString
);
528 AbsSqlStatement
*stmt
= SqlFactory::createStatement(CSqlDirect
);
530 printError(ErrSysInternal
, "unable to prepare:%s", stmtString
);
534 stmt
->setConnection(conn
);
535 if (interactive
) printf("EXECDIRECT %d : %s\n", stmtID
, stmtString
);
536 rv
= stmt
->prepare(stmtString
);
538 printError(ErrSysInternal
, "unable to prepare:%s", stmtString
);
544 rv
= stmt
->execute(ret
);
// Statements starting with CREATE or DROP are singled out after execution;
// what is done for them is not visible in this view.
546 if (strlen(stmtString
) > 6 &&
547 ( (strncasecmp(stmtString
,"CREATE", 6) == 0) ||
548 (strncasecmp(stmtString
,"DROP", 4) == 0)) ) {
549 // conn->disconnect();
553 printError(ErrSysInternal
, "unable to execute %s", stmtString
);
// Any other tag is reported as corruption.
561 printError(ErrSysInternal
, "Redo log file corrupted: logType:%d", logType
);
// Release the mapping, rewrite the statement log and walk the hash table.
566 munmap((char*)startAddr
, st
.st_size
);
569 filterAndWriteStmtLogs();
570 freeAllStmtHandles();