switching current checkpoint so that if system crashes during checkpoint it pick
[csql.git] / src / storage / Database.cxx
blobe91f78ba1f4aac3a2835b373a9bc74c5a3e2c436
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Database.h>
17 #include<os.h>
18 #include<CatalogTables.h>
19 #include<Transaction.h>
20 #include<Lock.h>
21 #include<Debug.h>
22 #include<Config.h>
23 #include<Process.h>
25 const char* Database::getName()
27 return metaData_->dbName_;
30 int Database::getDatabaseID()
32 return metaData_->dbID_;
35 long Database::getMaxSize()
37 return metaData_->maxSize_;
40 long Database::getCurrentSize()
42 return metaData_->curSize_;
45 Page* Database::getCurrentPage()
47 return metaData_->curPage_;
50 Page* Database::getFirstPage()
52 return metaData_->firstPage_;
55 int Database::getNoOfChunks()
57 return metaData_->noOfChunks_;
59 Chunk* Database::getHashIndexChunk()
61 return metaData_->hashIndexChunk_;
64 void Database::setDatabaseID(int id)
66 metaData_->dbID_ = id;
68 void Database::setName(const char *name)
70 strcpy(metaData_->dbName_ , name);
72 void Database::setCurrentSize(long size)
74 metaData_->curSize_ = size;
76 void Database::setCurrentPage(Page *page)
78 //metaData_->curPage_ = page;
79 Mutex::CASL((long*)&metaData_->curPage_, (long)metaData_->curPage_, (long)page);
81 void Database::setFirstPage(Page *page)
83 metaData_->firstPage_ = page;
85 void Database::setMaxSize(long size)
87 metaData_->maxSize_ = size;
89 void Database::setNoOfChunks(int chunks)
91 metaData_->noOfChunks_ = chunks;
93 void Database::setHashIndexChunk(Chunk *ch)
95 metaData_->hashIndexChunk_ = ch;
99 int Database::initAllocDatabaseMutex()
101 return metaData_->dbAllocMutex_.init("allocdb");
103 DbRetVal Database::getAllocDatabaseMutex(bool procAccount)
105 int ret= metaData_->dbAllocMutex_.getLock(procSlot, procAccount);
106 if (ret) return ErrLockTimeOut; else return OK;
108 DbRetVal Database::releaseAllocDatabaseMutex(bool procAccount)
110 metaData_->dbAllocMutex_.releaseLock(procSlot, procAccount);
111 return OK;
114 int Database::initPrepareStmtMutex()
116 return metaData_->dbPrepareStmtMutex_.init("prepstmt");
118 DbRetVal Database::getPrepareStmtMutex(bool procAccount)
120 int ret= metaData_->dbPrepareStmtMutex_.getLock(procSlot, procAccount);
121 if (ret) return ErrLockTimeOut; else return OK;
123 DbRetVal Database::releasePrepareStmtMutex(bool procAccount)
125 metaData_->dbPrepareStmtMutex_.releaseLock(procSlot, procAccount);
126 return OK;
129 int Database::initTransTableMutex()
131 return metaData_->dbTransTableMutex_.init("transtable");
133 DbRetVal Database::getTransTableMutex()
135 int ret = metaData_->dbTransTableMutex_.getLock(procSlot);
136 if (ret) return ErrLockTimeOut; else return OK;
138 DbRetVal Database::releaseTransTableMutex()
140 metaData_->dbTransTableMutex_.releaseLock(procSlot);
141 return OK;
146 int Database::initProcessTableMutex()
148 return metaData_->dbProcTableMutex_.init("proctable");
150 DbRetVal Database::getProcessTableMutex(bool procAccount)
152 int ret = metaData_->dbProcTableMutex_.getLock(procSlot, procAccount);
153 if (ret) return ErrLockTimeOut; else return OK;
155 DbRetVal Database::releaseProcessTableMutex(bool procAccount)
157 metaData_->dbProcTableMutex_.releaseLock(procSlot, procAccount);
158 return OK;
163 int Database::initCheckpointMutex()
165 return metaData_->ckptMutex_.init("checkpoint");
167 DbRetVal Database::getSCheckpointMutex(bool procAccount)
169 int ret = metaData_->ckptMutex_.getShareLock(procSlot, procAccount);
170 if (ret) return ErrLockTimeOut; else return OK;
172 DbRetVal Database::getXCheckpointMutex(bool procAccount)
174 int ret = metaData_->ckptMutex_.getExclusiveLock(procSlot, procAccount);
175 if (ret) return ErrLockTimeOut; else return OK;
177 DbRetVal Database::releaseCheckpointMutex(bool procAccount)
179 metaData_->ckptMutex_.releaseShareLock(procSlot, procAccount);
180 return OK;
183 // Gets the free page
184 // Each page is segmented by PAGE_SIZE, so it checks the pageInfo
185 // of each page to determine if the page is free
186 // Algorithm is to scan through the pageInfo objects stored at
187 // address (db start address + i * PAGE_SIZE) where i = 1..n till end
188 // database
189 // But in case of large tuples, pages are merged, so there wont be
190 // PageInfo object on pages which are merged.
191 // These pages are skipped by checking the nextPageAfterMerge_ of PageInfo
193 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
194 Page* Database::getFreePage()
196 Page* page = getFirstPage();
197 //Page* page = getCurrentPage();
198 //printDebug(DM_Alloc, "Database::getFreePage firstPage:%x",page);
199 printDebug(DM_Alloc, "Database::getFreePage currentpage:%x",page);
200 PageInfo* pageInfo = ((PageInfo*)page);
201 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
202 int pageSize = PAGE_SIZE;
203 bool isEndAddchk=false;
204 while( 1 == pageInfo->isUsed_)
206 //If any pages are merged to store data larger than PAGE_SIZE
207 //move to the next page after the merge and check whether it is used
208 if ( pageInfo->nextPageAfterMerge_ == NULL) {
209 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
210 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
212 else {
213 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
214 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
217 if((((char*) pageInfo) + pageSize) >= endAddr )
219 if(!isEndAddchk){ isEndAddchk=true; pageInfo=(PageInfo *)getFirstPage(); }
220 else
221 break;
223 if ((char*)pageInfo >= endAddr)
225 //printError(ErrSysInternal,"Invalid address %x",pageInfo);
226 return NULL;
230 if (!isValidAddress(((char*) pageInfo) + pageSize))
232 printError(ErrSysInternal, "Invalid address %x",((char*) pageInfo) + pageSize);
233 return NULL;
235 setCurrentPage((Page*) pageInfo);
236 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
237 return (Page*) pageInfo ;
240 //Used by tuples more than PAGE_SIZE
241 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
242 Page* Database::getFreePage(size_t size)
244 Page* page = getFirstPage();
245 PageInfo* pageInfo = ((PageInfo*)page);
246 int multiple = size / PAGE_SIZE;
247 int offset = ((multiple + 1) * PAGE_SIZE);
248 printDebug(DM_Alloc, "Database::getFreePage firstPage:%x size:%ld",page, size);
249 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
250 int pageSize = PAGE_SIZE;
251 bool isEndAddchk = false;
252 while(true){
253 while( 1 == pageInfo->isUsed_)
255 //If any pages are merged to store data larger than PAGE_SIZE
256 //move to the next page after the merge and check whether it is used
257 if ( pageInfo->nextPageAfterMerge_ == NULL) {
258 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
259 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
261 else {
262 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
263 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
265 if((((char*) pageInfo) + offset) >= endAddr )
267 if(!isEndAddchk){ isEndAddchk=true; pageInfo=(PageInfo *)getFirstPage(); }
268 else
269 break;
272 int i = 0;
273 PageInfo *pInfo = pageInfo;
274 if ((((char*)pInfo) + offset) >= endAddr)
276 printError(ErrSysInternal,"Invalid address %x",((char*)pInfo) + offset);
277 return NULL;
279 for (i = 0; i< multiple + 1; i++)
281 if (1 == pInfo->isUsed_) break;
282 pInfo = (PageInfo*)((char*)pInfo + pageSize);
284 if ( i == (multiple + 1)) break;
285 pageInfo = (PageInfo*)((char*)pInfo + pageSize);
288 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
289 setCurrentPage((Page*) pageInfo);
290 return (Page*) pageInfo ;
293 void Database::printStatistics()
295 Page* page = getFirstPage();
296 PageInfo* pageInfo = ((PageInfo*)page);
297 int usedPageCount =0, usedMergedPageCount =0, totalPages=0;
298 int totalDirtyPages=0;
299 printf("<DatabaseStatistics>\n");
300 printf(" <Database Name> %s </Database Name>\n", getName());
301 printf(" <Max Size> %ld </Max Size>\n", getMaxSize());
302 printf(" <First Page> %x </First Page>\n", getFirstPage());
303 while(isValidAddress((char*) pageInfo))
305 if (pageInfo == NULL) break;
306 //if (pageInfo > getCurrentPage()) break;
307 if (1 == pageInfo->isUsed_) {
308 if ( pageInfo->nextPageAfterMerge_ == NULL) {
309 if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
310 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
311 usedPageCount++; totalPages++;
312 printDebug(DM_Alloc, "Normal Page:Moving to page:%x\n",pageInfo);
313 continue;
315 else {
316 if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
317 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
318 usedMergedPageCount++; totalPages++;
319 printDebug(DM_Alloc,"Merged Page:Moving to page:%x\n",pageInfo);
320 continue;
322 } else if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
323 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
324 printDebug(DM_Alloc,"Normal Page not used:Moving to page:%x\n",pageInfo);
325 totalPages++;
327 printf(" <Total Pages> %d </Total Pages>\n", totalPages);
328 if (Conf::config.useDurability())
329 printf(" <Dirty Pages> %d </Dirty Pages>\n", totalDirtyPages);
330 printf(" <Used Normal Pages> %d </Used Normal Pages>\n", usedPageCount);
331 printf(" <Used Merged Pages> %d </Used Merged Pages>\n", usedMergedPageCount);
332 printf(" <Chunks Used> %d </Chunks Used>\n", getNoOfChunks());
333 printf("</DatabaseStatistics>\n");
335 return ;
339 //called only in case of system database to create and initialize the chunk
340 //information
341 DbRetVal Database::createSystemDatabaseChunk(AllocType type, size_t size, int id)
344 Chunk *chunk;
345 if (-1 == id )
347 printError(ErrSysFatal, "Database ID corrupted");
348 return ErrSysFatal;
350 chunk = getSystemDatabaseChunk(id);
352 chunk->setChunkNameForSystemDB(id);
354 if (FixedSizeAllocator == type) chunk->setSize(size);
355 //getDatabaseMutex();
356 if (chunk->allocSize_ > PAGE_SIZE)
357 chunk->curPage_ = getFreePage(chunk->allocSize_);
358 else
359 chunk->curPage_ = getFreePage();
360 if ( chunk->curPage_ == NULL)
362 //releaseDatabaseMutex();
363 printError(ErrNoMemory, "No free pages in database: Database full");
364 return ErrNoMemory;
367 chunk->firstPage_ = chunk->curPage_;
368 PageInfo* firstPageInfo = ((PageInfo*)chunk->firstPage_);
369 firstPageInfo->setFirstPageAsUsed();
370 chunk->setChunkID(id);
371 chunk->setAllocType(type);
372 printDebug(DM_Database, "Creating System Database Chunk:%d Size:%d",id, chunk->allocSize_);
373 if (chunk->allocSize_ > PAGE_SIZE)
375 int multiple = os::floor(chunk->allocSize_ / PAGE_SIZE);
376 int offset = ((multiple + 1) * PAGE_SIZE);
377 firstPageInfo->nextPageAfterMerge_ = ((char*)firstPageInfo)+ offset;
380 if (0 == size)
382 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)firstPageInfo) + sizeof(PageInfo));
383 varInfo->isUsed_ = 0;
384 varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
387 incrementChunk();
388 //releaseDatabaseMutex();
389 return OK;
392 //This is never called currently. If situation arises will be coded later.
393 DbRetVal Database::deleteSystemDatabaseChunk(int id)
396 Chunk *chunk = getSystemDatabaseChunk(id);
397 chunk->setChunkID(-1);
398 chunk->setSize(0);
399 chunk->setAllocType(UnknownAllocator);
400 //TODO::
401 //chunk->pageList_
402 //walk though the pageList ptr and get all the page pointers
403 //then free all the pages used to store this by setting the
404 //start of page to notused
405 chunk->firstPage_ = NULL;
406 chunk->curPage_ = NULL;
407 decrementChunk();
408 return OK;
412 void Database::createAllCatalogTables()
414 //These are special chunks which hold catalog tables and other information
416 // chunk id 0 ->userChunkTable
417 // chunk id 1 ->lockBucketHash
418 // chunk id 2 ->lockTable
420 // chunk id 10->DATABASE
421 // chunk id 11->USER
422 // chunk id 12->TABLE
423 // chunk id 13->FIELD
424 // chunk id 14->ACCESS
426 createSystemTables();
427 createMetaDataTables();
429 void Database::createSystemTables()
431 createSystemDatabaseChunk(FixedSizeAllocator,
432 sizeof(Chunk), UserChunkTableId);
433 createSystemDatabaseChunk(FixedSizeAllocator,
434 sizeof(Bucket) * LOCK_BUCKET_SIZE,
435 LockTableHashBucketId);
436 createSystemDatabaseChunk(FixedSizeAllocator,
437 sizeof(Mutex)* LOCK_BUCKET_SIZE,
438 LockTableMutexId);
439 createSystemDatabaseChunk(FixedSizeAllocator,
440 sizeof(LockHashNode), LockTableId);
441 createSystemDatabaseChunk(FixedSizeAllocator,
442 sizeof(TransHasNode), TransHasTableId);
444 createSystemDatabaseChunk(VariableSizeAllocator,
445 0, UndoLogTableID);
447 void Database::createMetaDataTables()
449 createSystemDatabaseChunk(FixedSizeAllocator,
450 sizeof(CDATABASEFILE), DatabaseTableId);
451 createSystemDatabaseChunk(FixedSizeAllocator,
452 sizeof(CUSER), UserTableId);
453 createSystemDatabaseChunk(FixedSizeAllocator,
454 sizeof(CTABLE), TableTableId);
455 createSystemDatabaseChunk(FixedSizeAllocator,
456 sizeof(CFIELD), FieldTableId);
457 createSystemDatabaseChunk(FixedSizeAllocator,
458 sizeof(CACCESS), AccessTableId);
459 createSystemDatabaseChunk(FixedSizeAllocator,
460 sizeof(CINDEX), IndexTableId);
461 createSystemDatabaseChunk(FixedSizeAllocator,
462 sizeof(CINDEXFIELD), IndexFieldTableId);
463 createSystemDatabaseChunk(FixedSizeAllocator,
464 sizeof(CFK), ForeignKeyTableId);
465 createSystemDatabaseChunk(FixedSizeAllocator,
466 sizeof(CFKFIELD), ForeignKeyFieldTableId);
469 //used in case of system database
470 Chunk* Database::getSystemDatabaseChunk(int id)
472 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
473 id * sizeof (Chunk);
474 return (Chunk*)(((char*) metaData_) + offset);
478 //used in case of system database
479 Transaction* Database::getSystemDatabaseTrans(int slot)
481 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
482 os::alignLong(MAX_CHUNKS * sizeof (Chunk)) +
483 slot * sizeof (Transaction);
484 return (Transaction*)(((char*) metaData_) + offset);
487 bool Database::isValidAddress(void* addr)
489 if ((char*) addr >= ((char*)getMetaDataPtr()) + getMaxSize())
490 return false;
491 else
492 return true;
495 //should be called only on system database
496 void* Database::allocLockHashBuckets()
498 Chunk *chunk = getSystemDatabaseChunk(LockTableHashBucketId);
499 DbRetVal rv=OK;
500 void *ptr = chunk->allocate(this, &rv);
501 if (NULL == ptr)
503 printError(ErrNoMemory, "Chunk Allocation failed for lock hash bucket catalog table");
505 return ptr;
508 Bucket* Database::getLockHashBuckets()
510 Chunk *tChunk = getSystemDatabaseChunk(LockTableHashBucketId);
511 ChunkIterator iter = tChunk->getIterator();
512 return (Bucket*)iter.nextElement();
514 void Database::setUniqueChunkID(int id)
516 (metaData_->chunkUniqueID_).setID(id);
519 int Database::getUniqueIDForChunk()
521 return ((metaData_->chunkUniqueID_).getID());
524 DbRetVal Database::recoverMutex(Mutex *mut)
526 //TODO: operations need to be undone before recovering the mutex.
527 mut->recoverMutex();
528 return OK;
530 DbRetVal Database::writeDirtyPages(char *dataFile)
532 int fd = open(dataFile, O_WRONLY|O_CREAT, 0644);
533 lseek(fd, 0, SEEK_SET);
534 void *buf = (void *) metaData_;
535 int sizeToWrite = os::alignLong(sizeof(DatabaseMetaData));
536 ssize_t retSize = os::write(fd, (char*)buf, sizeToWrite);
537 if (-1 == retSize)
539 printError(ErrWarning, "Warning:Unable to write metadata");
540 return ErrSysInternal;
542 PageInfo *pageInfo = (PageInfo*) getFirstPage();
543 long pageSize =PAGE_SIZE;
544 int pagesWritten=0, writeOffset=0;
545 long long totalBytesWritten=0;
546 while(isValidAddress((char*) pageInfo))
548 if ( NULL == pageInfo ) break;
549 if (pageInfo > getCurrentPage()) {
550 char *a="0";
551 lseek(fd, getMaxSize() -1, SEEK_SET);
552 if ( -1 == os::write(fd, a, 1)) {
553 printError(ErrSysInternal, "Unable to extend chkpt file");
554 close(fd);
555 return ErrSysInternal;
557 break;
559 if (BITSET(pageInfo->flags, IS_DIRTY)) {
560 if (NULL == pageInfo->nextPageAfterMerge_)
561 pageSize = PAGE_SIZE;
562 else
563 pageSize = (long)pageInfo->nextPageAfterMerge_ - (long)pageInfo;
564 writeOffset = (long) pageInfo - (long) metaData_;
565 lseek(fd, writeOffset, SEEK_SET);
566 CLEARBIT(pageInfo->flags, IS_DIRTY);
567 retSize = os::write(fd, (char*)pageInfo, pageSize);
568 if ( -1 == retSize ) {
569 printError(ErrSysInternal, "Unable to write dirty page %x", pageInfo);
570 close(fd);
571 return ErrSysInternal;
573 totalBytesWritten= totalBytesWritten + retSize;
574 pagesWritten++;
576 if ( pageInfo->nextPageAfterMerge_ == NULL) {
577 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
578 } else {
579 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
582 printf("Total Dirty pages written %d %lld\n", pagesWritten, totalBytesWritten);
583 logFine(Conf::logger, "Total Dirty pages written %d\n", pagesWritten);
584 close(fd);
585 return OK;
588 DbRetVal Database::checkPoint()
590 char dataFile[MAX_FILE_LEN];
591 char cmd[MAX_FILE_LEN];
592 char dbRedoFileName[MAX_FILE_LEN];
593 sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile());
594 if (!Conf::config.useMmap()) {
595 // sprintf(dataFile, "%s/db.chkpt.data1", Conf::config.getDbFile());
596 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
597 FILE *fp = NULL;
598 if (fp = fopen(dataFile, "r")) {
599 fclose(fp);
600 int ret = unlink(dataFile);
601 if (ret != OK) {
602 printError(ErrOS, "Unable to delete old chkpt file. Failure");
603 return ErrOS;
606 int fd = open(dataFile, O_WRONLY|O_CREAT, 0644);
607 void *buf = (void *) metaData_;
608 lseek(fd, 0, SEEK_SET);
609 write(fd, buf, Conf::config.getMaxDbSize());
610 close(fd);
611 sprintf(cmd, "cp -f %s/db.chkpt.data %s/db.chkpt.data1", Conf::config.getDbFile(), Conf::config.getDbFile());
612 int ret = system(cmd);
613 if (ret != 0) {
614 printError(ErrOS, "Unable to take checkpoint back up file");
615 return ErrOS;
617 } else {
618 int fd = getChkptfd();
619 if (!os::fdatasync(fd)) {
620 logFine(Conf::logger, "fsync succedded");
622 int ret = unlink(dbRedoFileName);
623 if (ret != 0) {
624 close(fd);
625 printError(ErrSysInternal, "Unable to delete redo log file");
626 printError(ErrSysInternal, "Delete %s manually and restart the server", dbRedoFileName);
627 return ErrOS;
629 //switch the checkpoint so that during recovery, fsynced checkpoint is
630 //used during recovery if the below step(writeDirtyPages)
631 //is not completed succesfully.
632 if (Database::getCheckpointID() == 0)
633 Database::setCheckpointID(1);
634 else
635 Database::setCheckpointID(0);
637 int val=Database::getCheckpointID();
639 sprintf(dataFile, "%s/db.chkpt.data%d", Conf::config.getDbFile(), val);
640 DbRetVal rv = writeDirtyPages(dataFile);
641 if (OK != rv)
643 printError(ErrSysInternal, "Unable to write dirty pages");
644 close(fd);
645 return rv;
648 //Note: do not change order, chkpt id should be switched only after
649 //all dirty pages are written to disk. otherwise(if server crashes
650 //when it writes these dirty pages) recovery should use
651 //mapped file as fsync is already done on that file.
652 if (Database::getCheckpointID() == 0)
653 Database::setCheckpointID(1);
654 else
655 Database::setCheckpointID(0);
657 close(fd);
658 return OK;
660 int ret = unlink(dbRedoFileName);
661 if (ret != 0) {
662 printError(ErrSysInternal, "Unable to delete redo log file. Delete and restart the server\n");
663 return ErrOS;
665 return OK;
667 int Database::getCheckpointID()
669 int id=0;
670 char curCkptFile[MAX_FILE_LEN];
671 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
672 FILE *fp = fopen(curCkptFile, "r");
673 if (NULL == fp) { setCheckpointID(0); return 0; }
674 fscanf(fp, "%d", &id);
675 fclose(fp);
676 return id;
678 void Database::setCheckpointID(int id)
680 char curCkptFile[MAX_FILE_LEN];
681 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
682 FILE *fp = fopen(curCkptFile, "w");
683 if (NULL == fp) {
685 printError(ErrSysInternal, "Unable to set checkpointID");
686 return;
688 fprintf(fp, "%d", id);
689 logFine(Conf::logger, "Current checkpoint set to %d", id);
690 fclose(fp);
691 return;
695 //used only by the user database not the system database
696 DbRetVal Database::recoverUserDB()
698 char dataFile[MAX_FILE_LEN];
699 char cmd[MAX_FILE_LEN];
700 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
701 int fd = open(dataFile, O_RDONLY);
702 if (-1 == fd) { return OK; }
703 void *buf = (void *) metaData_;
704 read(fd, buf, Conf::config.getMaxDbSize());
705 close(fd);
706 return OK;
709 //used only by the system database
710 DbRetVal Database::recoverSystemDB()
712 char mapFile[MAX_FILE_LEN];
713 sprintf(mapFile, "%s/db.chkpt.map", Conf::config.getDbFile());
714 int fd = open(mapFile, O_RDONLY);
715 if (-1 == fd) { return OK; }
716 CatalogTableTABLE cTable(this);
717 CatalogTableINDEX cIndex(this);
718 struct Object buf;
719 while (read(fd, &buf, sizeof(buf))) {
720 if (buf.type == Tbl) {
721 cTable.setChunkPtr(buf.name, buf.firstPage, buf.curPage);
724 else if (buf.type == hIdx || buf.type == tIdx) {
725 cIndex.setChunkPtr(buf.name, buf.type, buf.bucketChunk, buf.firstPage, buf.curPage);
728 return OK;