windows changes
[csql.git] / src / storage / Database.cxx
blob2488230ffeddb1ea430685d65d30d8c2bfbed1ea
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Database.h>
17 #include<os.h>
18 #include<CatalogTables.h>
19 #include<Transaction.h>
20 #include<Lock.h>
21 #include<Debug.h>
22 #include<Config.h>
23 #include<Process.h>
24 #include<HeapAllocator.h>
26 const char* Database::getName()
28 return metaData_->dbName_;
31 int Database::getDatabaseID()
33 return metaData_->dbID_;
36 long Database::getMaxSize()
38 return metaData_->maxSize_;
41 long Database::getCurrentSize()
43 return metaData_->curSize_;
46 Page* Database::getCurrentPage()
48 return metaData_->curPage_;
51 Page* Database::getFirstPage()
53 return metaData_->firstPage_;
56 int Database::getNoOfChunks()
58 return metaData_->noOfChunks_;
60 Chunk* Database::getHashIndexChunk()
62 return metaData_->hashIndexChunk_;
65 void Database::setDatabaseID(int id)
67 metaData_->dbID_ = id;
69 void Database::setName(const char *name)
71 strcpy(metaData_->dbName_ , name);
73 void Database::setCurrentSize(long size)
75 metaData_->curSize_ = size;
77 void Database::setCurrentPage(Page *page)
79 //metaData_->curPage_ = page;
80 Mutex::CASL((long*)&metaData_->curPage_, (long)metaData_->curPage_, (long)page);
82 void Database::setFirstPage(Page *page)
84 metaData_->firstPage_ = page;
86 void Database::setMaxSize(long size)
88 metaData_->maxSize_ = size;
90 void Database::setNoOfChunks(int chunks)
92 metaData_->noOfChunks_ = chunks;
94 void Database::setHashIndexChunk(Chunk *ch)
96 metaData_->hashIndexChunk_ = ch;
100 int Database::initAllocDatabaseMutex()
102 return metaData_->dbAllocMutex_.init("allocdb");
104 DbRetVal Database::getAllocDatabaseMutex(bool procAccount)
106 struct timeval timeout, timeval;
107 timeout.tv_sec = Conf::config.getMutexSecs();
108 timeout.tv_usec = Conf::config.getMutexUSecs();
109 int tries=0;
110 int totalTries = Conf::config.getMutexRetries() *2;
111 int ret =0;
112 while (tries < totalTries)
114 ret = metaData_->dbAllocMutex_.getLock(procSlot, procAccount);
115 if (ret == 0) break;
116 timeval.tv_sec = timeout.tv_sec;
117 timeval.tv_usec = timeout.tv_usec;
118 os::select(0, 0, 0, 0, &timeval);
119 tries++;
121 if (tries >= totalTries) return ErrLockTimeOut;
122 return OK;
124 DbRetVal Database::releaseAllocDatabaseMutex(bool procAccount)
126 metaData_->dbAllocMutex_.releaseLock(procSlot, procAccount);
127 return OK;
130 int Database::initPrepareStmtMutex()
132 return metaData_->dbPrepareStmtMutex_.init("prepstmt");
134 DbRetVal Database::getPrepareStmtMutex(bool procAccount)
136 struct timeval timeout, timeval;
137 timeout.tv_sec = Conf::config.getMutexSecs();
138 timeout.tv_usec = Conf::config.getMutexUSecs();
139 int tries=0;
140 int totalTries = Conf::config.getMutexRetries() *2;
141 int ret =0;
142 while (tries < totalTries)
144 ret = metaData_->dbPrepareStmtMutex_.getLock(procSlot, procAccount);
145 if (ret == 0) break;
146 timeval.tv_sec = timeout.tv_sec;
147 timeval.tv_usec = timeout.tv_usec;
148 os::select(0, 0, 0, 0, &timeval);
149 tries++;
151 if (tries >= totalTries) return ErrLockTimeOut;
152 return OK;
155 DbRetVal Database::releasePrepareStmtMutex(bool procAccount)
157 metaData_->dbPrepareStmtMutex_.releaseLock(procSlot, procAccount);
158 return OK;
161 int Database::initTransTableMutex()
163 return metaData_->dbTransTableMutex_.init("transtable");
165 DbRetVal Database::getTransTableMutex()
167 struct timeval timeout, timeval;
168 timeout.tv_sec = Conf::config.getMutexSecs();
169 timeout.tv_usec = Conf::config.getMutexUSecs();
170 int tries=0;
171 int totalTries = Conf::config.getMutexRetries() *2;
172 int ret =0;
173 while (tries < totalTries)
175 ret = metaData_->dbTransTableMutex_.getLock(procSlot);
176 if (ret == 0) break;
177 timeval.tv_sec = timeout.tv_sec;
178 timeval.tv_usec = timeout.tv_usec;
179 os::select(0, 0, 0, 0, &timeval);
180 tries++;
182 if (tries >= totalTries) return ErrLockTimeOut;
183 return OK;
186 DbRetVal Database::releaseTransTableMutex()
188 metaData_->dbTransTableMutex_.releaseLock(procSlot);
189 return OK;
194 int Database::initProcessTableMutex()
196 return metaData_->dbProcTableMutex_.init("proctable");
198 DbRetVal Database::getProcessTableMutex(bool procAccount)
200 struct timeval timeout, timeval;
201 timeout.tv_sec = Conf::config.getMutexSecs();
202 timeout.tv_usec = Conf::config.getMutexUSecs();
203 int tries=0;
204 int totalTries = Conf::config.getMutexRetries() *2;
205 int ret =0;
206 while (tries < totalTries)
208 ret = metaData_->dbProcTableMutex_.getLock(procSlot, procAccount);
209 if (ret == 0) break;
210 timeval.tv_sec = timeout.tv_sec;
211 timeval.tv_usec = timeout.tv_usec;
212 os::select(0, 0, 0, 0, &timeval);
213 tries++;
215 if (tries >= totalTries) return ErrLockTimeOut;
216 return OK;
219 DbRetVal Database::releaseProcessTableMutex(bool procAccount)
221 metaData_->dbProcTableMutex_.releaseLock(procSlot, procAccount);
222 return OK;
225 int Database::initCheckpointMutex()
227 return metaData_->ckptMutex_.init("checkpoint");
229 DbRetVal Database::getSCheckpointMutex(bool procAccount)
231 struct timeval timeout, timeval;
232 timeout.tv_sec = Conf::config.getMutexSecs();
233 timeout.tv_usec = Conf::config.getMutexUSecs();
234 int tries=0;
235 int totalTries = Conf::config.getMutexRetries() *2;
236 int ret =0;
237 while (tries < totalTries)
239 ret = metaData_->ckptMutex_.getShareLock(procSlot, procAccount);
240 if (ret == 0) break;
241 timeval.tv_sec = timeout.tv_sec;
242 timeval.tv_usec = timeout.tv_usec;
243 os::select(0, 0, 0, 0, &timeval);
244 tries++;
246 if (tries >= totalTries) return ErrLockTimeOut;
247 return OK;
250 DbRetVal Database::getXCheckpointMutex(bool procAccount)
252 struct timeval timeout, timeval;
253 timeout.tv_sec = Conf::config.getMutexSecs();
254 timeout.tv_usec = Conf::config.getMutexUSecs();
255 int tries=0;
256 int totalTries = Conf::config.getMutexRetries() *2;
257 int ret =0;
258 while (tries < totalTries)
260 ret = metaData_->ckptMutex_.getExclusiveLock(procSlot, procAccount);
261 if (ret == 0) break;
262 timeval.tv_sec = timeout.tv_sec;
263 timeval.tv_usec = timeout.tv_usec;
264 os::select(0, 0, 0, 0, &timeval);
265 tries++;
267 if (tries >= totalTries) return ErrLockTimeOut;
268 return OK;
271 DbRetVal Database::releaseCheckpointMutex(bool procAccount)
273 metaData_->ckptMutex_.releaseShareLock(procSlot, procAccount);
274 return OK;
277 // Gets the free page
278 // Each page is segmented by PAGE_SIZE, so it checks the pageInfo
279 // of each page to determine if the page is free
280 // Algorithm is to scan through the pageInfo objects stored at
281 // address (db start address + i * PAGE_SIZE) where i = 1..n till end
282 // database
283 // But in case of large tuples, pages are merged, so there wont be
284 // PageInfo object on pages which are merged.
285 // These pages are skipped by checking the nextPageAfterMerge_ of PageInfo
287 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
288 Page* Database::getFreePage()
290 Page* page = getFirstPage();
291 //Page* page = getCurrentPage();
292 //printDebug(DM_Alloc, "Database::getFreePage firstPage:%x",page);
293 printDebug(DM_Alloc, "Database::getFreePage currentpage:%x",page);
294 PageInfo* pageInfo = ((PageInfo*)page);
295 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
296 int pageSize = PAGE_SIZE;
297 bool isEndAddchk=false;
298 while( 1 == pageInfo->isUsed_)
300 //If any pages are merged to store data larger than PAGE_SIZE
301 //move to the next page after the merge and check whether it is used
302 if ( pageInfo->nextPageAfterMerge_ == NULL) {
303 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
304 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
306 else {
307 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
308 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
311 if((((char*) pageInfo) + pageSize) >= endAddr )
313 if(!isEndAddchk){
314 isEndAddchk=true;
315 pageInfo=(PageInfo *)getFirstPage();
317 else
318 break;
320 if ((char*)pageInfo >= endAddr)
322 //printError(ErrSysInternal,"Invalid address %x",pageInfo);
323 return NULL;
327 if (!isValidAddress(((char*) pageInfo) + pageSize))
329 printError(ErrSysInternal, "Invalid address %x",((char*) pageInfo) + pageSize);
330 return NULL;
332 setCurrentPage((Page*) pageInfo);
333 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
334 return (Page*) pageInfo ;
337 //Used by tuples more than PAGE_SIZE
338 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
339 Page* Database::getFreePage(size_t size)
341 Page* page = getFirstPage();
342 PageInfo* pageInfo = ((PageInfo*)page);
343 int multiple = size / PAGE_SIZE;
344 int offset = ((multiple + 1) * PAGE_SIZE);
345 printDebug(DM_Alloc, "Database::getFreePage firstPage:%x size:%ld",page, size);
346 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
347 int pageSize = PAGE_SIZE;
348 bool isEndAddchk = false;
349 while(true){
350 while( 1 == pageInfo->isUsed_)
352 //If any pages are merged to store data larger than PAGE_SIZE
353 //move to the next page after the merge and check whether it is used
354 if ( pageInfo->nextPageAfterMerge_ == NULL) {
355 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
356 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
358 else {
359 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
360 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
362 if((((char*) pageInfo) + offset) >= endAddr )
364 if(!isEndAddchk){
365 isEndAddchk=true;
366 pageInfo=(PageInfo *)getFirstPage();
368 else
369 break;
372 int i = 0;
373 PageInfo *pInfo = pageInfo;
374 if ((((char*)pInfo) + offset) >= endAddr)
376 printError(ErrSysInternal,"Invalid address %x",((char*)pInfo) + offset);
377 return NULL;
379 for (i = 0; i< multiple + 1; i++)
381 if (1 == pInfo->isUsed_) break;
382 pInfo = (PageInfo*)((char*)pInfo + pageSize);
384 if ( i == (multiple + 1)) break;
385 pageInfo = (PageInfo*)((char*)pInfo + pageSize);
388 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
389 setCurrentPage((Page*) pageInfo);
390 return (Page*) pageInfo ;
393 void Database::printStatistics()
395 Page* page = getFirstPage();
396 PageInfo* pageInfo = ((PageInfo*)page);
397 int usedPageCount =0, usedMergedPageCount =0, totalPages=0;
398 int totalDirtyPages=0;
399 printf("<DatabaseStatistics>\n");
400 printf(" <Database Name> %s </Database Name>\n", getName());
401 printf(" <Max Size> %ld </Max Size>\n", getMaxSize());
402 printf(" <First Page> %x </First Page>\n", getFirstPage());
403 while(isValidAddress((char*) pageInfo))
405 if (pageInfo == NULL) break;
406 //if (pageInfo > getCurrentPage()) break;
407 if (1 == pageInfo->isUsed_) {
408 if ( pageInfo->nextPageAfterMerge_ == NULL) {
409 if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
410 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
411 usedPageCount++; totalPages++;
412 printDebug(DM_Alloc, "Normal Page:Moving to page:%x\n",pageInfo);
413 continue;
415 else {
416 if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
417 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
418 usedMergedPageCount++; totalPages++;
419 printDebug(DM_Alloc,"Merged Page:Moving to page:%x\n",pageInfo);
420 continue;
422 } else if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
423 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
424 printDebug(DM_Alloc,"Normal Page not used:Moving to page:%x\n",pageInfo);
425 totalPages++;
427 printf(" <Total Pages> %d </Total Pages>\n", totalPages);
428 if (Conf::config.useDurability())
429 printf(" <Dirty Pages> %d </Dirty Pages>\n", totalDirtyPages);
430 printf(" <Used Normal Pages> %d </Used Normal Pages>\n", usedPageCount);
431 printf(" <Used Merged Pages> %d </Used Merged Pages>\n", usedMergedPageCount);
432 printf(" <Chunks Used> %d </Chunks Used>\n", getNoOfChunks());
433 printf("</DatabaseStatistics>\n");
435 return ;
439 //called only in case of system database to create and initialize the chunk
440 //information
441 DbRetVal Database::createSystemDatabaseChunk(AllocType type, size_t size, int id)
444 Chunk *chunk;
445 if (-1 == id )
447 printError(ErrSysFatal, "Database ID corrupted");
448 return ErrSysFatal;
450 chunk = getSystemDatabaseChunk(id);
452 chunk->setChunkNameForSystemDB(id);
454 if (FixedSizeAllocator == type) chunk->setSize(size);
455 //getDatabaseMutex();
456 if (chunk->allocSize_ > PAGE_SIZE)
457 chunk->curPage_ = getFreePage(chunk->allocSize_);
458 else
459 chunk->curPage_ = getFreePage();
460 if ( chunk->curPage_ == NULL)
462 //releaseDatabaseMutex();
463 printError(ErrNoMemory, "No free pages in database: Database full");
464 return ErrNoMemory;
467 chunk->firstPage_ = chunk->curPage_;
468 PageInfo* firstPageInfo = ((PageInfo*)chunk->firstPage_);
469 firstPageInfo->setFirstPageAsUsed();
470 chunk->setChunkID(id);
471 chunk->setAllocType(type);
472 printDebug(DM_Database, "Creating System Database Chunk:%d Size:%d",id, chunk->allocSize_);
473 if (chunk->allocSize_ > PAGE_SIZE)
475 int multiple = os::floor(chunk->allocSize_ / PAGE_SIZE);
476 int offset = ((multiple + 1) * PAGE_SIZE);
477 firstPageInfo->nextPageAfterMerge_ = ((char*)firstPageInfo)+ offset;
480 if (0 == size)
482 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)firstPageInfo) + sizeof(PageInfo));
483 varInfo->isUsed_ = 0;
484 varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
487 incrementChunk();
488 //releaseDatabaseMutex();
489 return OK;
492 //This is never called currently. If situation arises will be coded later.
493 DbRetVal Database::deleteSystemDatabaseChunk(int id)
496 Chunk *chunk = getSystemDatabaseChunk(id);
497 chunk->setChunkID(-1);
498 chunk->setSize(0);
499 chunk->setAllocType(UnknownAllocator);
500 //TODO::
501 //chunk->pageList_
502 //walk though the pageList ptr and get all the page pointers
503 //then free all the pages used to store this by setting the
504 //start of page to notused
505 chunk->firstPage_ = NULL;
506 chunk->curPage_ = NULL;
507 decrementChunk();
508 return OK;
512 void Database::createAllCatalogTables()
514 //These are special chunks which hold catalog tables and other information
516 // chunk id 0 ->userChunkTable
517 // chunk id 1 ->lockBucketHash
518 // chunk id 2 ->lockTable
520 // chunk id 10->DATABASE
521 // chunk id 11->USER
522 // chunk id 12->TABLE
523 // chunk id 13->FIELD
524 // chunk id 14->ACCESS
526 createSystemTables();
527 createMetaDataTables();
529 void Database::createSystemTables()
531 createSystemDatabaseChunk(FixedSizeAllocator,
532 sizeof(Chunk), UserChunkTableId);
533 createSystemDatabaseChunk(FixedSizeAllocator,
534 sizeof(Bucket) * LOCK_BUCKET_SIZE,
535 LockTableHashBucketId);
536 createSystemDatabaseChunk(FixedSizeAllocator,
537 sizeof(Mutex)* LOCK_BUCKET_SIZE,
538 LockTableMutexId);
539 createSystemDatabaseChunk(FixedSizeAllocator,
540 sizeof(LockHashNode), LockTableId);
541 createSystemDatabaseChunk(FixedSizeAllocator,
542 sizeof(TransHasNode), TransHasTableId);
544 createSystemDatabaseChunk(VariableSizeAllocator,
545 0, UndoLogTableID);
547 void Database::createMetaDataTables()
549 createSystemDatabaseChunk(FixedSizeAllocator,
550 sizeof(CDATABASEFILE), DatabaseTableId);
551 createSystemDatabaseChunk(FixedSizeAllocator,
552 sizeof(CUSER), UserTableId);
553 createSystemDatabaseChunk(FixedSizeAllocator,
554 sizeof(CTABLE), TableTableId);
555 createSystemDatabaseChunk(FixedSizeAllocator,
556 sizeof(CFIELD), FieldTableId);
557 createSystemDatabaseChunk(FixedSizeAllocator,
558 sizeof(CACCESS), AccessTableId);
559 createSystemDatabaseChunk(FixedSizeAllocator,
560 sizeof(CINDEX), IndexTableId);
561 createSystemDatabaseChunk(FixedSizeAllocator,
562 sizeof(CINDEXFIELD), IndexFieldTableId);
563 createSystemDatabaseChunk(FixedSizeAllocator,
564 sizeof(CFK), ForeignKeyTableId);
565 createSystemDatabaseChunk(FixedSizeAllocator,
566 sizeof(CFKFIELD), ForeignKeyFieldTableId);
569 //used in case of system database
570 Chunk* Database::getSystemDatabaseChunk(int id)
572 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
573 id * sizeof (Chunk);
574 return (Chunk*)(((char*) metaData_) + offset);
578 //used in case of system database
579 Transaction* Database::getSystemDatabaseTrans(int slot)
581 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
582 os::alignLong(MAX_CHUNKS * sizeof (Chunk)) +
583 slot * sizeof (Transaction);
584 return (Transaction*)(((char*) metaData_) + offset);
587 bool Database::isValidAddress(void* addr)
589 if ((char*) addr >= ((char*)getMetaDataPtr()) + getMaxSize())
590 return false;
591 else
592 return true;
595 //should be called only on system database
596 void* Database::allocLockHashBuckets()
598 Chunk *chunk = getSystemDatabaseChunk(LockTableHashBucketId);
599 DbRetVal rv=OK;
600 void *ptr = chunk->allocate(this, &rv);
601 if (NULL == ptr)
603 printError(ErrNoMemory, "Chunk Allocation failed for lock hash bucket catalog table");
605 return ptr;
608 Bucket* Database::getLockHashBuckets()
610 Chunk *tChunk = getSystemDatabaseChunk(LockTableHashBucketId);
611 ChunkIterator iter = tChunk->getIterator();
612 return (Bucket*)iter.nextElement();
614 void Database::setUniqueChunkID(int id)
616 (metaData_->chunkUniqueID_).setID(id);
619 int Database::getUniqueIDForChunk()
621 return ((metaData_->chunkUniqueID_).getID());
624 DbRetVal Database::recoverMutex(Mutex *mut)
626 //TODO: operations need to be undone before recovering the mutex.
627 mut->recoverMutex();
628 return OK;
630 DbRetVal Database::writeDirtyPages(char *dataFile)
632 int fd = os::openFile(dataFile, fileOpenCreat, 0);
633 lseek(fd, 0, SEEK_SET);
634 void *buf = (void *) metaData_;
635 int sizeToWrite = os::alignLong(sizeof(DatabaseMetaData));
636 size_t retSize = os::write(fd, (char*)buf, sizeToWrite);
637 if (-1 == retSize)
639 printError(ErrWarning, "Warning:Unable to write metadata");
640 return ErrSysInternal;
642 PageInfo *pageInfo = (PageInfo*) getFirstPage();
643 long pageSize =PAGE_SIZE;
644 int pagesWritten=0, writeOffset=0;
645 long long totalBytesWritten=0;
646 while(isValidAddress((char*) pageInfo))
648 if ( NULL == pageInfo ) break;
649 if (pageInfo > getCurrentPage()) {
650 char *a="0";
651 ::lseek(fd, getMaxSize() -1, SEEK_SET);
652 if ( -1 == os::write(fd, a, 1)) {
653 printError(ErrSysInternal, "Unable to extend chkpt file");
654 os::close(fd);
655 return ErrSysInternal;
657 break;
659 if (BITSET(pageInfo->flags, IS_DIRTY)) {
660 if (NULL == pageInfo->nextPageAfterMerge_)
661 pageSize = PAGE_SIZE;
662 else
663 pageSize = (long)pageInfo->nextPageAfterMerge_ - (long)pageInfo;
664 writeOffset = (long) pageInfo - (long) metaData_;
665 ::lseek(fd, writeOffset, SEEK_SET);
666 CLEARBIT(pageInfo->flags, IS_DIRTY);
667 retSize = os::write(fd, (char*)pageInfo, pageSize);
668 if ( -1 == retSize ) {
669 printError(ErrSysInternal, "Unable to write dirty page %x", pageInfo);
670 os::close(fd);
671 return ErrSysInternal;
673 totalBytesWritten= totalBytesWritten + retSize;
674 pagesWritten++;
676 if ( pageInfo->nextPageAfterMerge_ == NULL) {
677 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
678 } else {
679 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
682 //printf("Total Dirty pages written %d %lld\n", pagesWritten, totalBytesWritten);
683 logFine(Conf::logger, "Total Dirty pages written %d\n", pagesWritten);
684 os::close(fd);
685 return OK;
688 DbRetVal Database::checkPoint()
690 char dataFile[MAX_FILE_LEN];
691 char cmd[MAX_FILE_LEN];
692 char dbRedoFileName[MAX_FILE_LEN];
693 sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile());
694 if (!Conf::config.useMmap()) {
695 // sprintf(dataFile, "%s/db.chkpt.data1", Conf::config.getDbFile());
696 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
697 FILE *fp = NULL;
698 if (fp = fopen(dataFile, "r")) {
699 fclose(fp);
700 int ret = ::unlink(dataFile);
701 if (ret != OK) {
702 printError(ErrOS, "Unable to delete old chkpt file. Failure");
703 return ErrOS;
706 int fd = ::open(dataFile, O_WRONLY|O_CREAT, 0644);
707 void *buf = (void *) metaData_;
708 os::lseek(fd, 0, SEEK_SET);
709 os::write(fd, (char*) buf, Conf::config.getMaxDbSize());
710 os::close(fd);
711 sprintf(cmd, "cp -f %s/db.chkpt.data %s/db.chkpt.data1", Conf::config.getDbFile(), Conf::config.getDbFile());
712 int ret = system(cmd);
713 if (ret != 0) {
714 printError(ErrOS, "Unable to take checkpoint back up file");
715 return ErrOS;
717 } else {
718 file_desc fd = getChkptfd();
719 if (!os::fdatasync(fd)) {
720 logFine(Conf::logger, "fsync succedded");
722 filterAndRemoveStmtLogs();
723 int ret = os::truncate(dbRedoFileName);
724 if (ret != 0) {
725 os::close(fd);
726 printError(ErrSysInternal, "Unable to truncate redo log file");
727 printError(ErrSysInternal, "Delete %s manually and restart the server", dbRedoFileName);
728 return ErrOS;
730 //switch the checkpoint so that during recovery, fsynced checkpoint is
731 //used during recovery if the below step(writeDirtyPages)
732 //is not completed succesfully.
733 if (Database::getCheckpointID() == 0)
734 Database::setCheckpointID(1);
735 else
736 Database::setCheckpointID(0);
738 int val=Database::getCheckpointID();
740 sprintf(dataFile, "%s/db.chkpt.data%d", Conf::config.getDbFile(), val);
741 DbRetVal rv = writeDirtyPages(dataFile);
742 if (OK != rv)
744 printError(ErrSysInternal, "Unable to write dirty pages");
745 os::closeFile(fd);
746 return rv;
749 //Note: do not change order, chkpt id should be switched only after
750 //all dirty pages are written to disk. otherwise(if server crashes
751 //when it writes these dirty pages) recovery should use
752 //mapped file as fsync is already done on that file.
753 if (Database::getCheckpointID() == 0)
754 Database::setCheckpointID(1);
755 else
756 Database::setCheckpointID(0);
758 os::closeFile(fd);
759 return OK;
761 filterAndRemoveStmtLogs();
762 int ret = os::truncate(dbRedoFileName);
763 if (ret != 0) {
764 printError(ErrSysInternal, "Unable to truncate redo log file. Delete and restart the server\n");
765 return ErrOS;
767 return OK;
769 DbRetVal Database::filterAndRemoveStmtLogs()
771 struct stat st;
772 char fName[MAX_FILE_LEN];
773 sprintf(fName, "%s/csql.db.stmt", Conf::config.getDbFile());
774 file_desc fdRead = os::openFile(fName, fileOpenReadOnly,0);
775 if ((file_desc)-1 == fdRead) { return OK; }
776 if (::stat(fName, &st) == -1) {
777 printError(ErrSysInternal, "Unable to retrieve stmt log file size");
778 os::closeFile(fdRead);
779 return ErrSysInternal;
781 if (st.st_size ==0) {
782 os::closeFile(fdRead);
783 return OK;
785 void *startAddr = os::mmap(NULL, st.st_size, mapProtRead, mapPrivate, fdRead, 0);
786 if ((void*) MAP_FAILED == startAddr) {
787 printError(ErrSysInternal, "Unable to mmap stmt log file\n");
788 return ErrSysInternal;
790 sprintf(fName, "%s/csql.db.stmt1", Conf::config.getDbFile());
791 int fd = os::openFileForAppend(fName, O_CREAT|O_TRUNC);
792 char *iter = (char*)startAddr;
793 char *logStart = NULL, *logEnd = NULL;
794 int logType;
795 int stmtID;
796 int len =0, ret =0;
797 int txnID, loglen;
798 DbRetVal rv = OK;
799 HashMap stmtMap;
800 stmtMap.setKeySize(sizeof(int));
801 //PASS-I load all prepare stmts and free them
802 while(true) {
803 if (iter - (char*)startAddr >= st.st_size) break;
804 logType = *(int*)iter;
805 logStart = iter;
806 if (logType == -1) { //prepare
807 iter = iter + sizeof(int);
808 len = *(int*) iter;
809 iter = iter + 2 * sizeof(int);
810 stmtID = *(int*)iter;
811 stmtMap.insert(iter);
812 iter = logStart+ len;
813 ret =0;
815 else if(logType == -3) { //free
816 iter = iter + sizeof(int);
817 txnID = *(int*) iter; iter += sizeof(int);
818 loglen = *(int*) iter; iter += sizeof(int);
819 stmtID = *(int*)iter;
820 stmtMap.remove(iter);
821 iter = iter + sizeof(int);
822 }else{
823 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
824 rv = ErrSysInternal;
825 break;
828 //PASS-II take the prepared statements which are not freed into another backup file
829 while(true) {
830 if (iter - (char*)startAddr >= st.st_size) break;
831 logType = *(int*)iter;
832 logStart = iter;
833 if (logType == -1) { //prepare
834 iter = iter + sizeof(int);
835 len = *(int*) iter;
836 iter = iter + 2 * sizeof(int);
837 stmtID = *(int*)iter;
838 iter = logStart+ len;
839 ret =0;
840 if (stmtMap.find(&stmtID))
841 ret = os::write(fd, logStart, len);
842 if (-1 == ret) {
843 printError(ErrSysInternal, "Unable to write statement logs");
846 else if(logType == -3) { //free
847 iter = logStart + 4 *sizeof(int);
848 //neglet free stmt logs in this pass
849 }else{
850 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
851 rv = ErrSysInternal;
852 break;
856 os::close(fd);
857 os::munmap((char*)startAddr, st.st_size);
858 os::closeFile(fdRead);
859 stmtMap.removeAll();
860 char cmd[MAX_FILE_LEN *2];
861 sprintf(cmd, "mv %s/csql.db.stmt1 %s/csql.db.stmt",
862 Conf::config.getDbFile(), Conf::config.getDbFile());
863 ret = system(cmd);
864 return rv;
866 int Database::getCheckpointID()
868 int id=0;
869 char curCkptFile[MAX_FILE_LEN];
870 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
871 FILE *fp = fopen(curCkptFile, "r");
872 if (NULL == fp) { setCheckpointID(0); return 0; }
873 fscanf(fp, "%d", &id);
874 fclose(fp);
875 return id;
877 void Database::setCheckpointID(int id)
879 char curCkptFile[MAX_FILE_LEN];
880 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
881 FILE *fp = fopen(curCkptFile, "w");
882 if (NULL == fp) {
884 printError(ErrSysInternal, "Unable to set checkpointID");
885 return;
887 fprintf(fp, "%d", id);
888 logFine(Conf::logger, "Current checkpoint set to %d", id);
889 fclose(fp);
890 return;
894 //used only by the user database not the system database
895 DbRetVal Database::recoverUserDB()
897 char dataFile[MAX_FILE_LEN];
898 char cmd[MAX_FILE_LEN];
899 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
900 int fd = os::openFile(dataFile, fileOpenReadOnly, 0);
901 if (-1 == fd) { return OK; }
902 void *buf = (void *) metaData_;
903 int readbytes = read(fd, buf, Conf::config.getMaxDbSize());
904 if (readbytes == -1) { os::closeFile(fd); return ErrOS; }
905 os::closeFile(fd);
906 return OK;
909 //used only by the system database
910 DbRetVal Database::recoverSystemDB()
912 char mapFile[MAX_FILE_LEN];
913 sprintf(mapFile, "%s/db.chkpt.map", Conf::config.getDbFile());
914 int fd = open(mapFile, O_RDONLY);
915 if (-1 == fd) { return OK; }
916 CatalogTableTABLE cTable(this);
917 CatalogTableINDEX cIndex(this);
918 struct Object buf;
919 while (read(fd, &buf, sizeof(buf))) {
920 if (buf.type == Tbl) {
921 cTable.setChunkPtr(buf.name, buf.firstPage, buf.curPage);
923 else if (buf.type == hIdx || buf.type == tIdx) {
924 cIndex.setChunkPtr(buf.name, buf.type, buf.bucketChunk, buf.firstPage, buf.curPage);
927 return OK;