redo log changes. prepare with param will go to stmt logs and other stmts go to redo...
[csql.git] / src / storage / Database.cxx
blob1ef3d2b219e5ea4f8efd18b3a6f84d5ffa70a616
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Database.h>
17 #include<os.h>
18 #include<CatalogTables.h>
19 #include<Transaction.h>
20 #include<Lock.h>
21 #include<Debug.h>
22 #include<Config.h>
23 #include<Process.h>
24 #include<HeapAllocator.h>
26 const char* Database::getName()
28 return metaData_->dbName_;
31 int Database::getDatabaseID()
33 return metaData_->dbID_;
36 long Database::getMaxSize()
38 return metaData_->maxSize_;
41 long Database::getCurrentSize()
43 return metaData_->curSize_;
46 Page* Database::getCurrentPage()
48 return metaData_->curPage_;
51 Page* Database::getFirstPage()
53 return metaData_->firstPage_;
56 int Database::getNoOfChunks()
58 return metaData_->noOfChunks_;
60 Chunk* Database::getHashIndexChunk()
62 return metaData_->hashIndexChunk_;
65 void Database::setDatabaseID(int id)
67 metaData_->dbID_ = id;
69 void Database::setName(const char *name)
71 strcpy(metaData_->dbName_ , name);
73 void Database::setCurrentSize(long size)
75 metaData_->curSize_ = size;
77 void Database::setCurrentPage(Page *page)
79 //metaData_->curPage_ = page;
80 Mutex::CASL((long*)&metaData_->curPage_, (long)metaData_->curPage_, (long)page);
82 void Database::setFirstPage(Page *page)
84 metaData_->firstPage_ = page;
86 void Database::setMaxSize(long size)
88 metaData_->maxSize_ = size;
90 void Database::setNoOfChunks(int chunks)
92 metaData_->noOfChunks_ = chunks;
94 void Database::setHashIndexChunk(Chunk *ch)
96 metaData_->hashIndexChunk_ = ch;
100 int Database::initAllocDatabaseMutex()
102 return metaData_->dbAllocMutex_.init("allocdb");
104 DbRetVal Database::getAllocDatabaseMutex(bool procAccount)
106 int ret= metaData_->dbAllocMutex_.getLock(procSlot, procAccount);
107 if (ret) return ErrLockTimeOut; else return OK;
109 DbRetVal Database::releaseAllocDatabaseMutex(bool procAccount)
111 metaData_->dbAllocMutex_.releaseLock(procSlot, procAccount);
112 return OK;
115 int Database::initPrepareStmtMutex()
117 return metaData_->dbPrepareStmtMutex_.init("prepstmt");
119 DbRetVal Database::getPrepareStmtMutex(bool procAccount)
121 int ret= metaData_->dbPrepareStmtMutex_.getLock(procSlot, procAccount);
122 if (ret) return ErrLockTimeOut; else return OK;
124 DbRetVal Database::releasePrepareStmtMutex(bool procAccount)
126 metaData_->dbPrepareStmtMutex_.releaseLock(procSlot, procAccount);
127 return OK;
130 int Database::initTransTableMutex()
132 return metaData_->dbTransTableMutex_.init("transtable");
134 DbRetVal Database::getTransTableMutex()
136 int ret = metaData_->dbTransTableMutex_.getLock(procSlot);
137 if (ret) return ErrLockTimeOut; else return OK;
139 DbRetVal Database::releaseTransTableMutex()
141 metaData_->dbTransTableMutex_.releaseLock(procSlot);
142 return OK;
147 int Database::initProcessTableMutex()
149 return metaData_->dbProcTableMutex_.init("proctable");
151 DbRetVal Database::getProcessTableMutex(bool procAccount)
153 int ret = metaData_->dbProcTableMutex_.getLock(procSlot, procAccount);
154 if (ret) return ErrLockTimeOut; else return OK;
156 DbRetVal Database::releaseProcessTableMutex(bool procAccount)
158 metaData_->dbProcTableMutex_.releaseLock(procSlot, procAccount);
159 return OK;
164 int Database::initCheckpointMutex()
166 return metaData_->ckptMutex_.init("checkpoint");
168 DbRetVal Database::getSCheckpointMutex(bool procAccount)
170 int ret = metaData_->ckptMutex_.getShareLock(procSlot, procAccount);
171 if (ret) return ErrLockTimeOut; else return OK;
173 DbRetVal Database::getXCheckpointMutex(bool procAccount)
175 int ret = metaData_->ckptMutex_.getExclusiveLock(procSlot, procAccount);
176 if (ret) return ErrLockTimeOut; else return OK;
178 DbRetVal Database::releaseCheckpointMutex(bool procAccount)
180 metaData_->ckptMutex_.releaseShareLock(procSlot, procAccount);
181 return OK;
184 // Gets the free page
185 // Each page is segmented by PAGE_SIZE, so it checks the pageInfo
186 // of each page to determine if the page is free
187 // Algorithm is to scan through the pageInfo objects stored at
188 // address (db start address + i * PAGE_SIZE) where i = 1..n till end
189 // database
190 // But in case of large tuples, pages are merged, so there wont be
191 // PageInfo object on pages which are merged.
192 // These pages are skipped by checking the nextPageAfterMerge_ of PageInfo
194 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
195 Page* Database::getFreePage()
197 Page* page = getFirstPage();
198 //Page* page = getCurrentPage();
199 //printDebug(DM_Alloc, "Database::getFreePage firstPage:%x",page);
200 printDebug(DM_Alloc, "Database::getFreePage currentpage:%x",page);
201 PageInfo* pageInfo = ((PageInfo*)page);
202 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
203 int pageSize = PAGE_SIZE;
204 bool isEndAddchk=false;
205 while( 1 == pageInfo->isUsed_)
207 //If any pages are merged to store data larger than PAGE_SIZE
208 //move to the next page after the merge and check whether it is used
209 if ( pageInfo->nextPageAfterMerge_ == NULL) {
210 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
211 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
213 else {
214 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
215 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
218 if((((char*) pageInfo) + pageSize) >= endAddr )
220 if(!isEndAddchk){ isEndAddchk=true; pageInfo=(PageInfo *)getFirstPage(); }
221 else
222 break;
224 if ((char*)pageInfo >= endAddr)
226 //printError(ErrSysInternal,"Invalid address %x",pageInfo);
227 return NULL;
231 if (!isValidAddress(((char*) pageInfo) + pageSize))
233 printError(ErrSysInternal, "Invalid address %x",((char*) pageInfo) + pageSize);
234 return NULL;
236 setCurrentPage((Page*) pageInfo);
237 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
238 return (Page*) pageInfo ;
241 //Used by tuples more than PAGE_SIZE
242 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
243 Page* Database::getFreePage(size_t size)
245 Page* page = getFirstPage();
246 PageInfo* pageInfo = ((PageInfo*)page);
247 int multiple = size / PAGE_SIZE;
248 int offset = ((multiple + 1) * PAGE_SIZE);
249 printDebug(DM_Alloc, "Database::getFreePage firstPage:%x size:%ld",page, size);
250 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
251 int pageSize = PAGE_SIZE;
252 bool isEndAddchk = false;
253 while(true){
254 while( 1 == pageInfo->isUsed_)
256 //If any pages are merged to store data larger than PAGE_SIZE
257 //move to the next page after the merge and check whether it is used
258 if ( pageInfo->nextPageAfterMerge_ == NULL) {
259 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
260 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
262 else {
263 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
264 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
266 if((((char*) pageInfo) + offset) >= endAddr )
268 if(!isEndAddchk){ isEndAddchk=true; pageInfo=(PageInfo *)getFirstPage(); }
269 else
270 break;
273 int i = 0;
274 PageInfo *pInfo = pageInfo;
275 if ((((char*)pInfo) + offset) >= endAddr)
277 printError(ErrSysInternal,"Invalid address %x",((char*)pInfo) + offset);
278 return NULL;
280 for (i = 0; i< multiple + 1; i++)
282 if (1 == pInfo->isUsed_) break;
283 pInfo = (PageInfo*)((char*)pInfo + pageSize);
285 if ( i == (multiple + 1)) break;
286 pageInfo = (PageInfo*)((char*)pInfo + pageSize);
289 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
290 setCurrentPage((Page*) pageInfo);
291 return (Page*) pageInfo ;
294 void Database::printStatistics()
296 Page* page = getFirstPage();
297 PageInfo* pageInfo = ((PageInfo*)page);
298 int usedPageCount =0, usedMergedPageCount =0, totalPages=0;
299 int totalDirtyPages=0;
300 printf("<DatabaseStatistics>\n");
301 printf(" <Database Name> %s </Database Name>\n", getName());
302 printf(" <Max Size> %ld </Max Size>\n", getMaxSize());
303 printf(" <First Page> %x </First Page>\n", getFirstPage());
304 while(isValidAddress((char*) pageInfo))
306 if (pageInfo == NULL) break;
307 //if (pageInfo > getCurrentPage()) break;
308 if (1 == pageInfo->isUsed_) {
309 if ( pageInfo->nextPageAfterMerge_ == NULL) {
310 if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
311 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
312 usedPageCount++; totalPages++;
313 printDebug(DM_Alloc, "Normal Page:Moving to page:%x\n",pageInfo);
314 continue;
316 else {
317 if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
318 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
319 usedMergedPageCount++; totalPages++;
320 printDebug(DM_Alloc,"Merged Page:Moving to page:%x\n",pageInfo);
321 continue;
323 } else if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
324 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
325 printDebug(DM_Alloc,"Normal Page not used:Moving to page:%x\n",pageInfo);
326 totalPages++;
328 printf(" <Total Pages> %d </Total Pages>\n", totalPages);
329 if (Conf::config.useDurability())
330 printf(" <Dirty Pages> %d </Dirty Pages>\n", totalDirtyPages);
331 printf(" <Used Normal Pages> %d </Used Normal Pages>\n", usedPageCount);
332 printf(" <Used Merged Pages> %d </Used Merged Pages>\n", usedMergedPageCount);
333 printf(" <Chunks Used> %d </Chunks Used>\n", getNoOfChunks());
334 printf("</DatabaseStatistics>\n");
336 return ;
340 //called only in case of system database to create and initialize the chunk
341 //information
342 DbRetVal Database::createSystemDatabaseChunk(AllocType type, size_t size, int id)
345 Chunk *chunk;
346 if (-1 == id )
348 printError(ErrSysFatal, "Database ID corrupted");
349 return ErrSysFatal;
351 chunk = getSystemDatabaseChunk(id);
353 chunk->setChunkNameForSystemDB(id);
355 if (FixedSizeAllocator == type) chunk->setSize(size);
356 //getDatabaseMutex();
357 if (chunk->allocSize_ > PAGE_SIZE)
358 chunk->curPage_ = getFreePage(chunk->allocSize_);
359 else
360 chunk->curPage_ = getFreePage();
361 if ( chunk->curPage_ == NULL)
363 //releaseDatabaseMutex();
364 printError(ErrNoMemory, "No free pages in database: Database full");
365 return ErrNoMemory;
368 chunk->firstPage_ = chunk->curPage_;
369 PageInfo* firstPageInfo = ((PageInfo*)chunk->firstPage_);
370 firstPageInfo->setFirstPageAsUsed();
371 chunk->setChunkID(id);
372 chunk->setAllocType(type);
373 printDebug(DM_Database, "Creating System Database Chunk:%d Size:%d",id, chunk->allocSize_);
374 if (chunk->allocSize_ > PAGE_SIZE)
376 int multiple = os::floor(chunk->allocSize_ / PAGE_SIZE);
377 int offset = ((multiple + 1) * PAGE_SIZE);
378 firstPageInfo->nextPageAfterMerge_ = ((char*)firstPageInfo)+ offset;
381 if (0 == size)
383 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)firstPageInfo) + sizeof(PageInfo));
384 varInfo->isUsed_ = 0;
385 varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
388 incrementChunk();
389 //releaseDatabaseMutex();
390 return OK;
393 //This is never called currently. If situation arises will be coded later.
394 DbRetVal Database::deleteSystemDatabaseChunk(int id)
397 Chunk *chunk = getSystemDatabaseChunk(id);
398 chunk->setChunkID(-1);
399 chunk->setSize(0);
400 chunk->setAllocType(UnknownAllocator);
401 //TODO::
402 //chunk->pageList_
403 //walk though the pageList ptr and get all the page pointers
404 //then free all the pages used to store this by setting the
405 //start of page to notused
406 chunk->firstPage_ = NULL;
407 chunk->curPage_ = NULL;
408 decrementChunk();
409 return OK;
413 void Database::createAllCatalogTables()
415 //These are special chunks which hold catalog tables and other information
417 // chunk id 0 ->userChunkTable
418 // chunk id 1 ->lockBucketHash
419 // chunk id 2 ->lockTable
421 // chunk id 10->DATABASE
422 // chunk id 11->USER
423 // chunk id 12->TABLE
424 // chunk id 13->FIELD
425 // chunk id 14->ACCESS
427 createSystemTables();
428 createMetaDataTables();
430 void Database::createSystemTables()
432 createSystemDatabaseChunk(FixedSizeAllocator,
433 sizeof(Chunk), UserChunkTableId);
434 createSystemDatabaseChunk(FixedSizeAllocator,
435 sizeof(Bucket) * LOCK_BUCKET_SIZE,
436 LockTableHashBucketId);
437 createSystemDatabaseChunk(FixedSizeAllocator,
438 sizeof(Mutex)* LOCK_BUCKET_SIZE,
439 LockTableMutexId);
440 createSystemDatabaseChunk(FixedSizeAllocator,
441 sizeof(LockHashNode), LockTableId);
442 createSystemDatabaseChunk(FixedSizeAllocator,
443 sizeof(TransHasNode), TransHasTableId);
445 createSystemDatabaseChunk(VariableSizeAllocator,
446 0, UndoLogTableID);
448 void Database::createMetaDataTables()
450 createSystemDatabaseChunk(FixedSizeAllocator,
451 sizeof(CDATABASEFILE), DatabaseTableId);
452 createSystemDatabaseChunk(FixedSizeAllocator,
453 sizeof(CUSER), UserTableId);
454 createSystemDatabaseChunk(FixedSizeAllocator,
455 sizeof(CTABLE), TableTableId);
456 createSystemDatabaseChunk(FixedSizeAllocator,
457 sizeof(CFIELD), FieldTableId);
458 createSystemDatabaseChunk(FixedSizeAllocator,
459 sizeof(CACCESS), AccessTableId);
460 createSystemDatabaseChunk(FixedSizeAllocator,
461 sizeof(CINDEX), IndexTableId);
462 createSystemDatabaseChunk(FixedSizeAllocator,
463 sizeof(CINDEXFIELD), IndexFieldTableId);
464 createSystemDatabaseChunk(FixedSizeAllocator,
465 sizeof(CFK), ForeignKeyTableId);
466 createSystemDatabaseChunk(FixedSizeAllocator,
467 sizeof(CFKFIELD), ForeignKeyFieldTableId);
470 //used in case of system database
471 Chunk* Database::getSystemDatabaseChunk(int id)
473 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
474 id * sizeof (Chunk);
475 return (Chunk*)(((char*) metaData_) + offset);
479 //used in case of system database
480 Transaction* Database::getSystemDatabaseTrans(int slot)
482 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
483 os::alignLong(MAX_CHUNKS * sizeof (Chunk)) +
484 slot * sizeof (Transaction);
485 return (Transaction*)(((char*) metaData_) + offset);
488 bool Database::isValidAddress(void* addr)
490 if ((char*) addr >= ((char*)getMetaDataPtr()) + getMaxSize())
491 return false;
492 else
493 return true;
496 //should be called only on system database
497 void* Database::allocLockHashBuckets()
499 Chunk *chunk = getSystemDatabaseChunk(LockTableHashBucketId);
500 DbRetVal rv=OK;
501 void *ptr = chunk->allocate(this, &rv);
502 if (NULL == ptr)
504 printError(ErrNoMemory, "Chunk Allocation failed for lock hash bucket catalog table");
506 return ptr;
509 Bucket* Database::getLockHashBuckets()
511 Chunk *tChunk = getSystemDatabaseChunk(LockTableHashBucketId);
512 ChunkIterator iter = tChunk->getIterator();
513 return (Bucket*)iter.nextElement();
515 void Database::setUniqueChunkID(int id)
517 (metaData_->chunkUniqueID_).setID(id);
520 int Database::getUniqueIDForChunk()
522 return ((metaData_->chunkUniqueID_).getID());
525 DbRetVal Database::recoverMutex(Mutex *mut)
527 //TODO: operations need to be undone before recovering the mutex.
528 mut->recoverMutex();
529 return OK;
531 DbRetVal Database::writeDirtyPages(char *dataFile)
533 int fd = open(dataFile, O_WRONLY|O_CREAT, 0644);
534 lseek(fd, 0, SEEK_SET);
535 void *buf = (void *) metaData_;
536 int sizeToWrite = os::alignLong(sizeof(DatabaseMetaData));
537 ssize_t retSize = os::write(fd, (char*)buf, sizeToWrite);
538 if (-1 == retSize)
540 printError(ErrWarning, "Warning:Unable to write metadata");
541 return ErrSysInternal;
543 PageInfo *pageInfo = (PageInfo*) getFirstPage();
544 long pageSize =PAGE_SIZE;
545 int pagesWritten=0, writeOffset=0;
546 long long totalBytesWritten=0;
547 while(isValidAddress((char*) pageInfo))
549 if ( NULL == pageInfo ) break;
550 if (pageInfo > getCurrentPage()) {
551 char *a="0";
552 lseek(fd, getMaxSize() -1, SEEK_SET);
553 if ( -1 == os::write(fd, a, 1)) {
554 printError(ErrSysInternal, "Unable to extend chkpt file");
555 close(fd);
556 return ErrSysInternal;
558 break;
560 if (BITSET(pageInfo->flags, IS_DIRTY)) {
561 if (NULL == pageInfo->nextPageAfterMerge_)
562 pageSize = PAGE_SIZE;
563 else
564 pageSize = (long)pageInfo->nextPageAfterMerge_ - (long)pageInfo;
565 writeOffset = (long) pageInfo - (long) metaData_;
566 lseek(fd, writeOffset, SEEK_SET);
567 CLEARBIT(pageInfo->flags, IS_DIRTY);
568 retSize = os::write(fd, (char*)pageInfo, pageSize);
569 if ( -1 == retSize ) {
570 printError(ErrSysInternal, "Unable to write dirty page %x", pageInfo);
571 close(fd);
572 return ErrSysInternal;
574 totalBytesWritten= totalBytesWritten + retSize;
575 pagesWritten++;
577 if ( pageInfo->nextPageAfterMerge_ == NULL) {
578 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
579 } else {
580 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
583 //printf("Total Dirty pages written %d %lld\n", pagesWritten, totalBytesWritten);
584 logFine(Conf::logger, "Total Dirty pages written %d\n", pagesWritten);
585 close(fd);
586 return OK;
589 DbRetVal Database::checkPoint()
591 char dataFile[MAX_FILE_LEN];
592 char cmd[MAX_FILE_LEN];
593 char dbRedoFileName[MAX_FILE_LEN];
594 sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile());
595 if (!Conf::config.useMmap()) {
596 // sprintf(dataFile, "%s/db.chkpt.data1", Conf::config.getDbFile());
597 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
598 FILE *fp = NULL;
599 if (fp = fopen(dataFile, "r")) {
600 fclose(fp);
601 int ret = unlink(dataFile);
602 if (ret != OK) {
603 printError(ErrOS, "Unable to delete old chkpt file. Failure");
604 return ErrOS;
607 int fd = open(dataFile, O_WRONLY|O_CREAT, 0644);
608 void *buf = (void *) metaData_;
609 lseek(fd, 0, SEEK_SET);
610 write(fd, buf, Conf::config.getMaxDbSize());
611 close(fd);
612 sprintf(cmd, "cp -f %s/db.chkpt.data %s/db.chkpt.data1", Conf::config.getDbFile(), Conf::config.getDbFile());
613 int ret = system(cmd);
614 if (ret != 0) {
615 printError(ErrOS, "Unable to take checkpoint back up file");
616 return ErrOS;
618 } else {
619 int fd = getChkptfd();
620 if (!os::fdatasync(fd)) {
621 logFine(Conf::logger, "fsync succedded");
623 filterAndRemoveStmtLogs();
624 int ret = truncate(dbRedoFileName,0);
625 if (ret != 0) {
626 close(fd);
627 printError(ErrSysInternal, "Unable to truncate redo log file");
628 printError(ErrSysInternal, "Delete %s manually and restart the server", dbRedoFileName);
629 return ErrOS;
631 //switch the checkpoint so that during recovery, fsynced checkpoint is
632 //used during recovery if the below step(writeDirtyPages)
633 //is not completed succesfully.
634 if (Database::getCheckpointID() == 0)
635 Database::setCheckpointID(1);
636 else
637 Database::setCheckpointID(0);
639 int val=Database::getCheckpointID();
641 sprintf(dataFile, "%s/db.chkpt.data%d", Conf::config.getDbFile(), val);
642 DbRetVal rv = writeDirtyPages(dataFile);
643 if (OK != rv)
645 printError(ErrSysInternal, "Unable to write dirty pages");
646 close(fd);
647 return rv;
650 //Note: do not change order, chkpt id should be switched only after
651 //all dirty pages are written to disk. otherwise(if server crashes
652 //when it writes these dirty pages) recovery should use
653 //mapped file as fsync is already done on that file.
654 if (Database::getCheckpointID() == 0)
655 Database::setCheckpointID(1);
656 else
657 Database::setCheckpointID(0);
659 close(fd);
660 return OK;
662 filterAndRemoveStmtLogs();
663 int ret = truncate(dbRedoFileName,0);
664 if (ret != 0) {
665 printError(ErrSysInternal, "Unable to truncate redo log file. Delete and restart the server\n");
666 return ErrOS;
668 return OK;
670 DbRetVal Database::filterAndRemoveStmtLogs()
672 struct stat st;
673 char fName[MAX_FILE_LEN];
674 sprintf(fName, "%s/csql.db.stmt", Conf::config.getDbFile());
675 int fdRead = open(fName, O_RDONLY);
676 if (-1 == fdRead) { return OK; }
677 if (fstat(fdRead, &st) == -1) {
678 printError(ErrSysInternal, "Unable to retrieve stmt log file size");
679 close(fdRead);
680 return ErrSysInternal;
682 if (st.st_size ==0) {
683 close(fdRead);
684 return OK;
686 void *startAddr = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fdRead, 0);
687 if (MAP_FAILED == startAddr) {
688 printError(ErrSysInternal, "Unable to mmap stmt log file\n");
689 return ErrSysInternal;
691 sprintf(fName, "%s/csql.db.stmt1", Conf::config.getDbFile());
692 int fd = os::openFileForAppend(fName, O_CREAT|O_TRUNC);
693 char *iter = (char*)startAddr;
694 char *logStart = NULL, *logEnd = NULL;
695 int logType;
696 int stmtID;
697 int len =0, ret =0;
698 int txnID, loglen;
699 DbRetVal rv = OK;
700 HashMap stmtMap;
701 stmtMap.setKeySize(sizeof(int));
702 //PASS-I load all prepare stmts and free them
703 while(true) {
704 if (iter - (char*)startAddr >= st.st_size) break;
705 logType = *(int*)iter;
706 logStart = iter;
707 if (logType == -1) { //prepare
708 iter = iter + sizeof(int);
709 len = *(int*) iter;
710 iter = iter + 2 * sizeof(int);
711 stmtID = *(int*)iter;
712 stmtMap.insert(iter);
713 iter = logStart+ len;
714 ret =0;
716 else if(logType == -3) { //free
717 iter = iter + sizeof(int);
718 txnID = *(int*) iter; iter += sizeof(int);
719 loglen = *(int*) iter; iter += sizeof(int);
720 stmtID = *(int*)iter;
721 stmtMap.remove(iter);
722 iter = iter + sizeof(int);
723 }else{
724 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
725 rv = ErrSysInternal;
726 break;
729 //PASS-II take the prepared statements which are not freed into another backup file
730 while(true) {
731 if (iter - (char*)startAddr >= st.st_size) break;
732 logType = *(int*)iter;
733 logStart = iter;
734 if (logType == -1) { //prepare
735 iter = iter + sizeof(int);
736 len = *(int*) iter;
737 iter = iter + 2 * sizeof(int);
738 stmtID = *(int*)iter;
739 iter = logStart+ len;
740 ret =0;
741 if (stmtMap.find(&stmtID))
742 ret = os::write(fd, logStart, len);
743 if (-1 == ret) {
744 printError(ErrSysInternal, "Unable to write statement logs");
747 else if(logType == -3) { //free
748 iter = logStart + 4 *sizeof(int);
749 }else{
750 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
751 rv = ErrSysInternal;
752 break;
756 os::closeFile(fd);
757 munmap((char*)startAddr, st.st_size);
758 close(fdRead);
759 stmtMap.removeAll();
760 char cmd[MAX_FILE_LEN *2];
761 sprintf(cmd, "mv %s/csql.db.stmt1 %s/csql.db.stmt",
762 Conf::config.getDbFile(), Conf::config.getDbFile());
763 ret = system(cmd);
764 return rv;
768 int Database::getCheckpointID()
770 int id=0;
771 char curCkptFile[MAX_FILE_LEN];
772 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
773 FILE *fp = fopen(curCkptFile, "r");
774 if (NULL == fp) { setCheckpointID(0); return 0; }
775 fscanf(fp, "%d", &id);
776 fclose(fp);
777 return id;
779 void Database::setCheckpointID(int id)
781 char curCkptFile[MAX_FILE_LEN];
782 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
783 FILE *fp = fopen(curCkptFile, "w");
784 if (NULL == fp) {
786 printError(ErrSysInternal, "Unable to set checkpointID");
787 return;
789 fprintf(fp, "%d", id);
790 logFine(Conf::logger, "Current checkpoint set to %d", id);
791 fclose(fp);
792 return;
796 //used only by the user database not the system database
797 DbRetVal Database::recoverUserDB()
799 char dataFile[MAX_FILE_LEN];
800 char cmd[MAX_FILE_LEN];
801 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
802 int fd = open(dataFile, O_RDONLY);
803 if (-1 == fd) { return OK; }
804 void *buf = (void *) metaData_;
805 read(fd, buf, Conf::config.getMaxDbSize());
806 close(fd);
807 return OK;
810 //used only by the system database
811 DbRetVal Database::recoverSystemDB()
813 char mapFile[MAX_FILE_LEN];
814 sprintf(mapFile, "%s/db.chkpt.map", Conf::config.getDbFile());
815 int fd = open(mapFile, O_RDONLY);
816 if (-1 == fd) { return OK; }
817 CatalogTableTABLE cTable(this);
818 CatalogTableINDEX cIndex(this);
819 struct Object buf;
820 while (read(fd, &buf, sizeof(buf))) {
821 if (buf.type == Tbl) {
822 cTable.setChunkPtr(buf.name, buf.firstPage, buf.curPage);
825 else if (buf.type == hIdx || buf.type == tIdx) {
826 cIndex.setChunkPtr(buf.name, buf.type, buf.bucketChunk, buf.firstPage, buf.curPage);
829 return OK;