redo log changes. prepare with param will go to stmt logs and other stmts go to redo...
[csql.git] / src / tools / redo.cxx
blob675c17fe0f57227d5399927a72023a3731e41be6
/***************************************************************************
 *   Copyright (C) 2007 by www.databasecache.com                           *
 *   Contact: praba_tuty@databasecache.com                                 *
 *                                                                         *
 *   This program is free software; you can redistribute it and/or modify  *
 *   it under the terms of the GNU General Public License as published by  *
 *   the Free Software Foundation; either version 2 of the License, or     *
 *   (at your option) any later version.                                   *
 *                                                                         *
 *   This program is distributed in the hope that it will be useful,       *
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
 *   GNU General Public License for more details.                          *
 *                                                                         *
 ***************************************************************************/
16 #include <os.h>
17 #include <Statement.h>
18 #include <SqlFactory.h>
19 #include <SqlStatement.h>
21 DbRetVal iterateStmtLogs(void *startAddr, int size);
22 AbsSqlConnection *conn;
23 void *stmtBuckets = NULL;
24 bool list = false;
25 bool interactive=false;
26 char fileName[MAX_FILE_LEN];
27 void addToHashTable(int stmtID, AbsSqlStatement* sHdl)
29 int bucketNo = stmtID % STMT_BUCKET_SIZE;
30 StmtBucket *buck = (StmtBucket *) stmtBuckets;
31 StmtBucket *stmtBucket = &buck[bucketNo];
32 StmtNode *node = new StmtNode();
33 node->stmtId = stmtID;
34 node->stmt = sHdl;
35 stmtBucket->bucketList.append(node);
36 return;
38 void removeFromHashTable(int stmtID)
40 int bucketNo = stmtID % STMT_BUCKET_SIZE;
41 StmtBucket *buck = (StmtBucket *) stmtBuckets;
42 StmtBucket *stmtBucket = &buck[bucketNo];
43 StmtNode *node = NULL, *delNode = NULL;
44 ListIterator it = stmtBucket->bucketList.getIterator();
45 while(it.hasElement()) {
46 node = (StmtNode *) it.nextElement();
47 if(stmtID == node->stmtId) { delNode = node; break; }
49 it.reset();
50 if (delNode != NULL) {
51 stmtBucket->bucketList.remove(delNode);
52 delete delNode;
54 return;
56 AbsSqlStatement *getStmtFromHashTable(int stmtId)
58 int bucketNo = stmtId % STMT_BUCKET_SIZE;
59 StmtBucket *buck = (StmtBucket *) stmtBuckets;
60 StmtBucket *stmtBucket = &buck[bucketNo];
61 if (stmtBucket == NULL) return NULL;
62 StmtNode *node = NULL;
63 ListIterator it = stmtBucket->bucketList.getIterator();
64 while(it.hasElement()) {
65 node = (StmtNode *) it.nextElement();
66 if(stmtId == node->stmtId) {
67 SqlStatement *sqlStmt = (SqlStatement*)node->stmt;
68 if (!sqlStmt->isPrepared()) sqlStmt->prepare();
69 return node->stmt;
72 return NULL;
75 bool isStmtInHashTable(int stmtId)
77 int bucketNo = stmtId % STMT_BUCKET_SIZE;
78 StmtBucket *buck = (StmtBucket *) stmtBuckets;
79 StmtBucket *stmtBucket = &buck[bucketNo];
80 if (stmtBucket == NULL) return false;
81 StmtNode *node = NULL;
82 ListIterator it = stmtBucket->bucketList.getIterator();
83 while(it.hasElement()) {
84 node = (StmtNode *) it.nextElement();
85 if(stmtId == node->stmtId) {
86 SqlStatement *sqlStmt = (SqlStatement*)node->stmt;
87 if (sqlStmt->isPrepared()) return true;
88 else break;
91 return false;
95 void freeAllStmtHandles()
97 if (NULL == stmtBuckets) return;
98 StmtBucket *buck = (StmtBucket *) stmtBuckets;
99 StmtNode *node = NULL;
100 for (int i=0; i <STMT_BUCKET_SIZE; i++)
102 StmtBucket *stmtBucket = &buck[i];
103 if (stmtBucket == NULL) continue;
104 ListIterator it = stmtBucket->bucketList.getIterator();
105 while(it.hasElement()) {
106 node = (StmtNode *) it.nextElement();
107 node->stmt->free();
108 delete node->stmt;
111 ::free(stmtBuckets);
113 void setParam(AbsSqlStatement *stmt, int pos, DataType type , int length, void *value)
115 switch(type)
117 case typeInt:
118 stmt->setIntParam(pos, *(int*)value);
119 break;
120 case typeLong:
121 stmt->setLongParam(pos, *(long*) value);
122 break;
123 case typeLongLong:
124 stmt->setLongLongParam(pos, *(long long*)value);
125 break;
126 case typeShort:
127 stmt->setShortParam(pos, *(short*)value);
128 break;
129 case typeByteInt:
130 stmt->setByteIntParam(pos, *(ByteInt*)value);
131 break;
132 case typeDouble:
133 stmt->setDoubleParam(pos, *(double*)value);
134 break;
135 case typeFloat:
136 stmt->setFloatParam(pos, *(float*)value);
137 break;
138 case typeDate:
139 stmt->setDateParam(pos, *(Date*)value);
140 break;
141 case typeTime:
142 stmt->setTimeParam(pos, *(Time*)value);
143 break;
144 case typeTimeStamp:
145 stmt->setTimeStampParam(pos, *(TimeStamp*)value);
146 break;
147 case typeString:
148 stmt->setStringParam(pos, (char*)value);
149 break;
150 case typeBinary:
151 stmt->setBinaryParam(pos, value, length);
152 break;
153 default:
154 printf("unknown type\n");
155 break;
157 return;
160 DbRetVal readAndPopulateStmts()
162 struct stat st;
163 char fName[MAX_FILE_LEN];
164 sprintf(fName, "%s/csql.db.stmt", Conf::config.getDbFile());
165 printf("Statement Redo log filename is :%s\n", fName);
166 int fd = open(fName, O_RDONLY);
167 if (-1 == fd) { return OK; }
168 if (fstat(fd, &st) == -1) {
169 printError(ErrSysInternal, "Unable to retrieve stmt log file size");
170 close(fd);
171 return ErrSysInternal;
173 if (NULL != stmtBuckets)
175 printError(ErrSysInternal, "stmtBuckets already populated");
176 return ErrSysInternal;
178 stmtBuckets = malloc (STMT_BUCKET_SIZE * sizeof(StmtBucket));
179 memset(stmtBuckets, 0, STMT_BUCKET_SIZE * sizeof(StmtBucket));
180 if (st.st_size ==0) {
181 printError(ErrNote, "No Statement logs found during recovery");
182 close(fd);
183 return OK;
185 void *startAddr = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
186 if (MAP_FAILED == startAddr) {
187 printError(ErrSysInternal, "Unable to mmap stmt log file\n");
188 return ErrSysInternal;
190 DbRetVal rv = iterateStmtLogs(startAddr, st.st_size);
191 munmap((char*)startAddr, st.st_size);
192 close(fd);
193 return rv;
195 DbRetVal filterAndWriteStmtLogs()
197 struct stat st;
198 char fName[MAX_FILE_LEN];
199 sprintf(fName, "%s/csql.db.stmt", Conf::config.getDbFile());
200 int fdRead = open(fName, O_RDONLY);
201 if (-1 == fdRead) { return OK; }
202 if (fstat(fdRead, &st) == -1) {
203 printError(ErrSysInternal, "Unable to retrieve stmt log file size");
204 close(fdRead);
205 return ErrSysInternal;
207 if (st.st_size ==0) {
208 close(fdRead);
209 return OK;
211 void *startAddr = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fdRead, 0);
212 if (MAP_FAILED == startAddr) {
213 printError(ErrSysInternal, "Unable to mmap stmt log file\n");
214 return ErrSysInternal;
216 sprintf(fName, "%s/csql.db.stmt1", Conf::config.getDbFile());
217 int fd = os::openFileForAppend(fName, O_CREAT|O_TRUNC);
218 char *iter = (char*)startAddr;
219 char *logStart = NULL, *logEnd = NULL;
220 int logType;
221 int stmtID;
222 int len =0, ret =0;
223 DbRetVal rv = OK;
224 while(true) {
225 if (iter - (char*)startAddr >= st.st_size) break;
226 logType = *(int*)iter;
227 logStart = iter;
228 if (logType == -1) { //prepare
229 iter = iter + sizeof(int);
230 len = *(int*) iter;
231 iter = iter + 2 * sizeof(int);
232 stmtID = *(int*)iter;
233 iter = logStart+ len;
234 ret =0;
235 if (isStmtInHashTable(stmtID))
236 ret = os::write(fd, logStart, len);
237 if (-1 == ret) {
238 printError(ErrSysInternal, "Unable to write statement logs");
241 else if(logType == -3) { //free
242 iter = logStart + 4 *sizeof(int);
243 }else{
244 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
245 rv = ErrSysInternal;
246 break;
249 os::closeFile(fd);
250 munmap((char*)startAddr, st.st_size);
251 close(fdRead);
252 char cmd[MAX_FILE_LEN *2];
253 sprintf(cmd, "mv %s/csql.db.stmt1 %s/csql.db.stmt",
254 Conf::config.getDbFile(), Conf::config.getDbFile());
255 ret = system(cmd);
256 return rv;
258 DbRetVal iterateStmtLogs(void *startAddr, int size)
260 char *iter = (char*)startAddr;
261 void *value = NULL;
262 int logType, eType;
263 int stmtID;
264 int txnID;
265 int len, ret, retVal =0;
266 int loglen;
267 char stmtString[SQL_STMT_LEN];
268 DbRetVal rv = OK;
269 while(true) {
270 if (iter - (char*)startAddr >= size) break;
271 logType = *(int*)iter;
272 if (logType == -1) { //prepare
273 iter = iter + sizeof(int);
274 txnID = *(int*) iter; iter += sizeof(int);
275 loglen = *(int*) iter; iter += sizeof(int);
276 stmtID = *(int*)iter;
277 iter = iter + sizeof(int);
278 len = *(int*)iter;
279 iter = iter + sizeof(int);
280 strncpy(stmtString, iter, len);
281 iter = iter + len;
282 if (list) {
283 printf("PREPARE: SID:%d %s\n", stmtID, stmtString);
284 continue;
286 if (interactive) printf("STMTLOG PREPARE SID:%d %s\n", stmtID, stmtString);
287 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlDirect);
288 stmt->setConnection(conn);
289 SqlStatement *sqlStmt = (SqlStatement*)stmt;
290 sqlStmt->setStmtString(stmtString);
291 addToHashTable(stmtID, stmt);
293 else if(logType == -3) { //free
294 iter = iter + sizeof(int);
295 txnID = *(int*) iter; iter += sizeof(int);
296 loglen = *(int*) iter; iter += sizeof(int);
297 stmtID = *(int*)iter;
298 iter = iter + sizeof(int);
299 if (list) {
300 printf("FREE: SID:%d TID:%d \n", stmtID, txnID);
301 continue;
303 }else{
304 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
305 rv = ErrSysInternal;
306 break;
309 return rv;
312 int main(int argc, char **argv)
314 struct stat st;
315 strcpy(fileName, "");
316 int c = 0, opt=0;
317 while ((c = getopt(argc, argv, "f:ail?")) != EOF) {
318 switch (c) {
319 case '?' : { opt = 1; break; } //print help
320 case 'a' : { opt = 2; break; }
321 case 'i' : { interactive = true; break; }
322 case 'l' : { list = true; break; }
323 case 'f' : {strcpy(fileName , argv[optind - 1]); break;}
324 default: printf("Wrong args\n"); exit(1);
327 }//while options
328 if (2 !=opt) {
329 printf("This is an internal csql command with i and f <filename> options.");
330 exit(1);
332 char *verbose = os::getenv("CSQL_INTERACTIVE");
333 if (verbose !=NULL && strcmp(verbose, "true") == 0)
335 printf("VERBOSE ON %s\n", verbose);
336 interactive=true;
340 Conf::config.readAllValues(os::getenv("CSQL_CONFIG_FILE"));
341 if (strcmp(fileName, "") ==0) {
342 sprintf(fileName, "%s/csql.db.cur", Conf::config.getDbFile());
344 int fd = open(fileName, O_RDONLY);
345 if (-1 == fd) { return OK; }
346 if (fstat(fd, &st) == -1) {
347 printError(ErrSysInternal, "Unable to retrieve undo log file size");
348 close(fd);
349 return 1;
351 if (st.st_size ==0) {
352 printError(ErrNote, "No Redo logs found during recovery");
353 readAndPopulateStmts();
354 close(fd);
355 return 0;
357 conn = SqlFactory::createConnection(CSqlDirect);
358 DbRetVal rv = conn->connect(I_USER, I_PASS);
359 SqlConnection *sCon = (SqlConnection*) conn;
360 if(!list) rv = sCon->getExclusiveLock();
361 //during connection close, this exclusive lock will be automatically released
362 if (rv != OK) {
363 close(fd);
364 conn->disconnect();
365 delete conn;
366 return 1;
368 void *startAddr = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
369 if (MAP_FAILED == startAddr) {
370 printf("Unable to read undo log file:mmap failed.\n");
371 conn->disconnect();
372 delete conn;
373 return 2;
375 rv = readAndPopulateStmts();
376 if (OK != rv)
378 printf("Unable to read stmt log file\n");
379 conn->disconnect();
380 delete conn;
381 return 2;
384 printf("Redo log filename is :%s\n", fileName);
385 char *iter = (char*)startAddr;
386 void *value = NULL;
387 int logType, eType;
388 int stmtID;
389 int txnID;
390 int len, ret, retVal =0;
391 int loglen;
392 char stmtString[SQL_STMT_LEN];
393 //printf("size of file %d\n", st.st_size);
394 while(true) {
395 //printf("OFFSET HERE %d\n", iter - (char*)startAddr);
396 if (iter - (char*)startAddr >= st.st_size) break;
397 logType = *(int*)iter;
398 if (logType == -1) { //prepare
399 iter = iter + sizeof(int);
400 txnID = *(int*) iter; iter += sizeof(int);
401 loglen = *(int*) iter; iter += sizeof(int);
402 stmtID = *(int*)iter;
403 iter = iter + sizeof(int);
404 len = *(int*)iter;
405 iter = iter + sizeof(int);
406 strncpy(stmtString, iter, len);
407 iter = iter + len;
408 if (list) {
409 printf("PREPARE: SID:%d %s\n", stmtID, stmtString);
410 continue;
412 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlDirect);
413 stmt->setConnection(conn);
414 if (interactive) printf("PREPARE %d : %s\n", stmtID, stmtString);
415 rv = stmt->prepare(stmtString);
416 if (rv != OK) {
417 printError(ErrSysInternal, "unable to prepare stmt:%s", stmtString);
418 retVal=1;
419 break;
421 SqlStatement *sqlStmt = (SqlStatement*)stmt;
422 sqlStmt->setLoading(true);
423 addToHashTable(stmtID, stmt);
425 else if(logType == -2) { //commit
426 conn->beginTrans();
427 iter = iter + sizeof(int);
428 txnID = *(int*) iter; iter += sizeof(int);
429 loglen = *(int*) iter; iter += sizeof(int);
430 char *curPtr = iter;
431 while(true) {
432 //printf("Iter length %d\n", iter - curPtr);
433 if (iter - (char*)startAddr >= st.st_size) {
434 //file end reached
435 //printf("Redo log file end\n");
436 retVal=0;
437 break;
439 stmtID = *(int*)iter;
440 //printf("stmtid %d\n", stmtID);
441 if (interactive) printf("EXEC %d :\n", stmtID);
442 iter = iter + sizeof(int);
443 eType = *(int*)iter;
444 //printf("eType is %d\n", eType);
445 AbsSqlStatement *stmt = NULL;
446 if (!list) {
447 stmt = getStmtFromHashTable(stmtID);
448 if (NULL == stmt) {
449 printError(ErrSysInternal, "Unable to find in stmt hashtable");
450 retVal=2;
451 break;
454 if (0 == eType) { //execute type
455 iter = iter + sizeof(int);
456 if (list) {
457 printf("EXEC SID:%d TID:%d\n", stmtID, txnID);
458 if (*(int*)iter <0) break;
459 continue;
461 if (stmt) {
462 rv = stmt->execute(ret);
463 if (rv != OK) {
464 printError(ErrSysInternal, "unable to execute");
465 retVal=2;
466 break;
468 } else {
469 printError(ErrSysInternal, "statement not found for %d\n",stmtID);
471 if (*(int*)iter <0) break;
472 } else if ( 1 == eType) { //set type
473 iter=iter+sizeof(int);
474 int pos = *(int*) iter;
475 iter=iter+sizeof(int);
476 DataType type = (DataType)(*(int*)iter);
477 iter=iter+sizeof(int);
478 int len = *(int*) iter;
479 iter=iter+sizeof(int);
480 value = iter;
481 iter=iter+len;
482 if (list) {
483 printf("SET SID:%d POS:%d TYPE:%d LEN:%d Value:", stmtID, pos, type, len);
484 AllDataType::printVal(value, type, len);
485 printf("\n");
486 if (*(int*)iter <0) break;
487 continue;
489 setParam(stmt, pos, type, len, value);
490 if (*(int*)iter <0) break;
493 conn->commit();
495 else if(logType == -3) { //free
496 iter = iter + sizeof(int);
497 txnID = *(int*) iter; iter += sizeof(int);
498 loglen = *(int*) iter; iter += sizeof(int);
499 stmtID = *(int*)iter;
500 iter = iter + sizeof(int);
501 if (list) {
502 printf("FREE SID:%d \n", stmtID);
503 continue;
505 if (interactive) printf("FREE %d:\n", stmtID);
506 AbsSqlStatement *stmt = getStmtFromHashTable(stmtID);
507 if (stmt) {
508 stmt->free();
509 delete stmt;
510 removeFromHashTable(stmtID);
511 } else { printError(ErrSysInternal, "statement not found for %d\n",stmtID);}
513 else if(logType == -4) { //prepare and execute
514 iter = iter + sizeof(int);
515 txnID = *(int*) iter; iter += sizeof(int);
516 loglen = *(int*) iter; iter += sizeof(int);
517 stmtID = *(int*)iter;
518 iter = iter + sizeof(int);
519 len = *(int*)iter;
520 iter = iter + sizeof(int);
521 strncpy(stmtString, iter, len);
522 stmtString[len+1] ='\0';
523 iter = iter + len;
524 if (list) {
525 printf("EXECDIRECT SID:%d TID:%d STMT:%s\n", stmtID, txnID, stmtString);
526 continue;
528 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlDirect);
529 if ( NULL == stmt) {
530 printError(ErrSysInternal, "unable to prepare:%s", stmtString);
531 retVal=3;
532 break;
534 stmt->setConnection(conn);
535 if (interactive) printf("EXECDIRECT %d : %s\n", stmtID, stmtString);
536 rv = stmt->prepare(stmtString);
537 if (rv != OK) {
538 printError(ErrSysInternal, "unable to prepare:%s", stmtString);
539 stmt->free();
540 delete stmt;
541 retVal=4;
542 break;
544 rv = stmt->execute(ret);
545 if (rv != OK) {
546 if (strlen(stmtString) > 6 &&
547 ( (strncasecmp(stmtString,"CREATE", 6) == 0) ||
548 (strncasecmp(stmtString,"DROP", 4) == 0)) ) {
549 // conn->disconnect();
550 // return OK;
551 continue;
553 printError(ErrSysInternal, "unable to execute %s", stmtString);
554 stmt->free();
555 retVal=5;
556 break;
558 stmt->free();
559 delete stmt;
560 }else{
561 printError(ErrSysInternal, "Redo log file corrupted: logType:%d", logType);
562 retVal=6;
563 break;
566 munmap((char*)startAddr, st.st_size);
567 close(fd);
568 if (!list) {
569 filterAndWriteStmtLogs();
570 freeAllStmtHandles();
572 conn->disconnect();
573 delete conn;
574 return retVal;