adding test scripts
[csql.git] / src / storage / Database.cxx
blob46c3055f777b5bed987e1f630fc11e083e702d68
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Database.h>
17 #include<os.h>
18 #include<CatalogTables.h>
19 #include<Transaction.h>
20 #include<Lock.h>
21 #include<Debug.h>
22 #include<Config.h>
23 #include<Process.h>
24 #include<HeapAllocator.h>
26 const char* Database::getName()
28 return metaData_->dbName_;
31 int Database::getDatabaseID()
33 return metaData_->dbID_;
36 long Database::getMaxSize()
38 return metaData_->maxSize_;
41 long Database::getCurrentSize()
43 return metaData_->curSize_;
46 Page* Database::getCurrentPage()
48 return metaData_->curPage_;
51 Page* Database::getFirstPage()
53 return metaData_->firstPage_;
56 int Database::getNoOfChunks()
58 return metaData_->noOfChunks_;
60 Chunk* Database::getHashIndexChunk()
62 return metaData_->hashIndexChunk_;
65 void Database::setDatabaseID(int id)
67 metaData_->dbID_ = id;
69 void Database::setName(const char *name)
71 strcpy(metaData_->dbName_ , name);
73 void Database::setCurrentSize(long size)
75 metaData_->curSize_ = size;
77 void Database::setCurrentPage(Page *page)
79 //metaData_->curPage_ = page;
80 Mutex::CASL((long*)&metaData_->curPage_, (long)metaData_->curPage_, (long)page);
82 void Database::setFirstPage(Page *page)
84 metaData_->firstPage_ = page;
86 void Database::setMaxSize(long size)
88 metaData_->maxSize_ = size;
90 void Database::setNoOfChunks(int chunks)
92 metaData_->noOfChunks_ = chunks;
94 void Database::setHashIndexChunk(Chunk *ch)
96 metaData_->hashIndexChunk_ = ch;
100 int Database::initAllocDatabaseMutex()
102 return metaData_->dbAllocMutex_.init("allocdb");
104 DbRetVal Database::getAllocDatabaseMutex(bool procAccount)
106 struct timeval timeout, timeval;
107 timeout.tv_sec = Conf::config.getMutexSecs();
108 timeout.tv_usec = Conf::config.getMutexUSecs();
109 int tries=0;
110 int totalTries = Conf::config.getMutexRetries() *2;
111 int ret =0;
112 while (tries < totalTries)
114 ret = metaData_->dbAllocMutex_.getLock(procSlot, procAccount);
115 if (ret == 0) break;
116 timeval.tv_sec = timeout.tv_sec;
117 timeval.tv_usec = timeout.tv_usec;
118 os::select(0, 0, 0, 0, &timeval);
119 tries++;
121 if (tries >= totalTries) return ErrLockTimeOut;
122 return OK;
124 DbRetVal Database::releaseAllocDatabaseMutex(bool procAccount)
126 metaData_->dbAllocMutex_.releaseLock(procSlot, procAccount);
127 return OK;
130 int Database::initPrepareStmtMutex()
132 return metaData_->dbPrepareStmtMutex_.init("prepstmt");
134 DbRetVal Database::getPrepareStmtMutex(bool procAccount)
136 struct timeval timeout, timeval;
137 timeout.tv_sec = Conf::config.getMutexSecs();
138 timeout.tv_usec = Conf::config.getMutexUSecs();
139 int tries=0;
140 int totalTries = Conf::config.getMutexRetries() *2;
141 int ret =0;
142 while (tries < totalTries)
144 ret = metaData_->dbPrepareStmtMutex_.getLock(procSlot, procAccount);
145 if (ret == 0) break;
146 timeval.tv_sec = timeout.tv_sec;
147 timeval.tv_usec = timeout.tv_usec;
148 os::select(0, 0, 0, 0, &timeval);
149 tries++;
151 if (tries >= totalTries) return ErrLockTimeOut;
152 return OK;
155 DbRetVal Database::releasePrepareStmtMutex(bool procAccount)
157 metaData_->dbPrepareStmtMutex_.releaseLock(procSlot, procAccount);
158 return OK;
161 int Database::initTransTableMutex()
163 return metaData_->dbTransTableMutex_.init("transtable");
165 DbRetVal Database::getTransTableMutex()
167 struct timeval timeout, timeval;
168 timeout.tv_sec = Conf::config.getMutexSecs();
169 timeout.tv_usec = Conf::config.getMutexUSecs();
170 int tries=0;
171 int totalTries = Conf::config.getMutexRetries() *2;
172 int ret =0;
173 while (tries < totalTries)
175 ret = metaData_->dbTransTableMutex_.getLock(procSlot);
176 if (ret == 0) break;
177 timeval.tv_sec = timeout.tv_sec;
178 timeval.tv_usec = timeout.tv_usec;
179 os::select(0, 0, 0, 0, &timeval);
180 tries++;
182 if (tries >= totalTries) return ErrLockTimeOut;
183 return OK;
186 DbRetVal Database::releaseTransTableMutex()
188 metaData_->dbTransTableMutex_.releaseLock(procSlot);
189 return OK;
194 int Database::initProcessTableMutex()
196 return metaData_->dbProcTableMutex_.init("proctable");
198 DbRetVal Database::getProcessTableMutex(bool procAccount)
200 struct timeval timeout, timeval;
201 timeout.tv_sec = Conf::config.getMutexSecs();
202 timeout.tv_usec = Conf::config.getMutexUSecs();
203 int tries=0;
204 int totalTries = Conf::config.getMutexRetries() *2;
205 int ret =0;
206 while (tries < totalTries)
208 ret = metaData_->dbProcTableMutex_.getLock(procSlot, procAccount);
209 if (ret == 0) break;
210 timeval.tv_sec = timeout.tv_sec;
211 timeval.tv_usec = timeout.tv_usec;
212 os::select(0, 0, 0, 0, &timeval);
213 tries++;
215 if (tries >= totalTries) return ErrLockTimeOut;
216 return OK;
219 DbRetVal Database::releaseProcessTableMutex(bool procAccount)
221 metaData_->dbProcTableMutex_.releaseLock(procSlot, procAccount);
222 return OK;
227 int Database::initCheckpointMutex()
229 return metaData_->ckptMutex_.init("checkpoint");
231 DbRetVal Database::getSCheckpointMutex(bool procAccount)
233 struct timeval timeout, timeval;
234 timeout.tv_sec = Conf::config.getMutexSecs();
235 timeout.tv_usec = Conf::config.getMutexUSecs();
236 int tries=0;
237 int totalTries = Conf::config.getMutexRetries() *2;
238 int ret =0;
239 while (tries < totalTries)
241 ret = metaData_->ckptMutex_.getShareLock(procSlot, procAccount);
242 if (ret == 0) break;
243 timeval.tv_sec = timeout.tv_sec;
244 timeval.tv_usec = timeout.tv_usec;
245 os::select(0, 0, 0, 0, &timeval);
246 tries++;
248 if (tries >= totalTries) return ErrLockTimeOut;
249 return OK;
252 DbRetVal Database::getXCheckpointMutex(bool procAccount)
254 struct timeval timeout, timeval;
255 timeout.tv_sec = Conf::config.getMutexSecs();
256 timeout.tv_usec = Conf::config.getMutexUSecs();
257 int tries=0;
258 int totalTries = Conf::config.getMutexRetries() *2;
259 int ret =0;
260 while (tries < totalTries)
262 ret = metaData_->ckptMutex_.getExclusiveLock(procSlot, procAccount);
263 if (ret == 0) break;
264 timeval.tv_sec = timeout.tv_sec;
265 timeval.tv_usec = timeout.tv_usec;
266 os::select(0, 0, 0, 0, &timeval);
267 tries++;
269 if (tries >= totalTries) return ErrLockTimeOut;
270 return OK;
273 DbRetVal Database::releaseCheckpointMutex(bool procAccount)
275 metaData_->ckptMutex_.releaseShareLock(procSlot, procAccount);
276 return OK;
279 // Gets the free page
280 // Each page is segmented by PAGE_SIZE, so it checks the pageInfo
281 // of each page to determine if the page is free
282 // Algorithm is to scan through the pageInfo objects stored at
283 // address (db start address + i * PAGE_SIZE) where i = 1..n till end
284 // database
285 // But in case of large tuples, pages are merged, so there wont be
286 // PageInfo object on pages which are merged.
287 // These pages are skipped by checking the nextPageAfterMerge_ of PageInfo
289 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
290 Page* Database::getFreePage()
292 Page* page = getFirstPage();
293 //Page* page = getCurrentPage();
294 //printDebug(DM_Alloc, "Database::getFreePage firstPage:%x",page);
295 printDebug(DM_Alloc, "Database::getFreePage currentpage:%x",page);
296 PageInfo* pageInfo = ((PageInfo*)page);
297 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
298 int pageSize = PAGE_SIZE;
299 bool isEndAddchk=false;
300 while( 1 == pageInfo->isUsed_)
302 //If any pages are merged to store data larger than PAGE_SIZE
303 //move to the next page after the merge and check whether it is used
304 if ( pageInfo->nextPageAfterMerge_ == NULL) {
305 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
306 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
308 else {
309 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
310 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
313 if((((char*) pageInfo) + pageSize) >= endAddr )
315 if(!isEndAddchk){ isEndAddchk=true; pageInfo=(PageInfo *)getFirstPage(); }
316 else
317 break;
319 if ((char*)pageInfo >= endAddr)
321 //printError(ErrSysInternal,"Invalid address %x",pageInfo);
322 return NULL;
326 if (!isValidAddress(((char*) pageInfo) + pageSize))
328 printError(ErrSysInternal, "Invalid address %x",((char*) pageInfo) + pageSize);
329 return NULL;
331 setCurrentPage((Page*) pageInfo);
332 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
333 return (Page*) pageInfo ;
336 //Used by tuples more than PAGE_SIZE
337 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
338 Page* Database::getFreePage(size_t size)
340 Page* page = getFirstPage();
341 PageInfo* pageInfo = ((PageInfo*)page);
342 int multiple = size / PAGE_SIZE;
343 int offset = ((multiple + 1) * PAGE_SIZE);
344 printDebug(DM_Alloc, "Database::getFreePage firstPage:%x size:%ld",page, size);
345 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
346 int pageSize = PAGE_SIZE;
347 bool isEndAddchk = false;
348 while(true){
349 while( 1 == pageInfo->isUsed_)
351 //If any pages are merged to store data larger than PAGE_SIZE
352 //move to the next page after the merge and check whether it is used
353 if ( pageInfo->nextPageAfterMerge_ == NULL) {
354 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
355 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
357 else {
358 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
359 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
361 if((((char*) pageInfo) + offset) >= endAddr )
363 if(!isEndAddchk){ isEndAddchk=true; pageInfo=(PageInfo *)getFirstPage(); }
364 else
365 break;
368 int i = 0;
369 PageInfo *pInfo = pageInfo;
370 if ((((char*)pInfo) + offset) >= endAddr)
372 printError(ErrSysInternal,"Invalid address %x",((char*)pInfo) + offset);
373 return NULL;
375 for (i = 0; i< multiple + 1; i++)
377 if (1 == pInfo->isUsed_) break;
378 pInfo = (PageInfo*)((char*)pInfo + pageSize);
380 if ( i == (multiple + 1)) break;
381 pageInfo = (PageInfo*)((char*)pInfo + pageSize);
384 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
385 setCurrentPage((Page*) pageInfo);
386 return (Page*) pageInfo ;
389 void Database::printStatistics()
391 Page* page = getFirstPage();
392 PageInfo* pageInfo = ((PageInfo*)page);
393 int usedPageCount =0, usedMergedPageCount =0, totalPages=0;
394 int totalDirtyPages=0;
395 printf("<DatabaseStatistics>\n");
396 printf(" <Database Name> %s </Database Name>\n", getName());
397 printf(" <Max Size> %ld </Max Size>\n", getMaxSize());
398 printf(" <First Page> %x </First Page>\n", getFirstPage());
399 while(isValidAddress((char*) pageInfo))
401 if (pageInfo == NULL) break;
402 //if (pageInfo > getCurrentPage()) break;
403 if (1 == pageInfo->isUsed_) {
404 if ( pageInfo->nextPageAfterMerge_ == NULL) {
405 if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
406 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
407 usedPageCount++; totalPages++;
408 printDebug(DM_Alloc, "Normal Page:Moving to page:%x\n",pageInfo);
409 continue;
411 else {
412 if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
413 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
414 usedMergedPageCount++; totalPages++;
415 printDebug(DM_Alloc,"Merged Page:Moving to page:%x\n",pageInfo);
416 continue;
418 } else if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
419 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
420 printDebug(DM_Alloc,"Normal Page not used:Moving to page:%x\n",pageInfo);
421 totalPages++;
423 printf(" <Total Pages> %d </Total Pages>\n", totalPages);
424 if (Conf::config.useDurability())
425 printf(" <Dirty Pages> %d </Dirty Pages>\n", totalDirtyPages);
426 printf(" <Used Normal Pages> %d </Used Normal Pages>\n", usedPageCount);
427 printf(" <Used Merged Pages> %d </Used Merged Pages>\n", usedMergedPageCount);
428 printf(" <Chunks Used> %d </Chunks Used>\n", getNoOfChunks());
429 printf("</DatabaseStatistics>\n");
431 return ;
435 //called only in case of system database to create and initialize the chunk
436 //information
437 DbRetVal Database::createSystemDatabaseChunk(AllocType type, size_t size, int id)
440 Chunk *chunk;
441 if (-1 == id )
443 printError(ErrSysFatal, "Database ID corrupted");
444 return ErrSysFatal;
446 chunk = getSystemDatabaseChunk(id);
448 chunk->setChunkNameForSystemDB(id);
450 if (FixedSizeAllocator == type) chunk->setSize(size);
451 //getDatabaseMutex();
452 if (chunk->allocSize_ > PAGE_SIZE)
453 chunk->curPage_ = getFreePage(chunk->allocSize_);
454 else
455 chunk->curPage_ = getFreePage();
456 if ( chunk->curPage_ == NULL)
458 //releaseDatabaseMutex();
459 printError(ErrNoMemory, "No free pages in database: Database full");
460 return ErrNoMemory;
463 chunk->firstPage_ = chunk->curPage_;
464 PageInfo* firstPageInfo = ((PageInfo*)chunk->firstPage_);
465 firstPageInfo->setFirstPageAsUsed();
466 chunk->setChunkID(id);
467 chunk->setAllocType(type);
468 printDebug(DM_Database, "Creating System Database Chunk:%d Size:%d",id, chunk->allocSize_);
469 if (chunk->allocSize_ > PAGE_SIZE)
471 int multiple = os::floor(chunk->allocSize_ / PAGE_SIZE);
472 int offset = ((multiple + 1) * PAGE_SIZE);
473 firstPageInfo->nextPageAfterMerge_ = ((char*)firstPageInfo)+ offset;
476 if (0 == size)
478 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)firstPageInfo) + sizeof(PageInfo));
479 varInfo->isUsed_ = 0;
480 varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
483 incrementChunk();
484 //releaseDatabaseMutex();
485 return OK;
488 //This is never called currently. If situation arises will be coded later.
489 DbRetVal Database::deleteSystemDatabaseChunk(int id)
492 Chunk *chunk = getSystemDatabaseChunk(id);
493 chunk->setChunkID(-1);
494 chunk->setSize(0);
495 chunk->setAllocType(UnknownAllocator);
496 //TODO::
497 //chunk->pageList_
498 //walk though the pageList ptr and get all the page pointers
499 //then free all the pages used to store this by setting the
500 //start of page to notused
501 chunk->firstPage_ = NULL;
502 chunk->curPage_ = NULL;
503 decrementChunk();
504 return OK;
508 void Database::createAllCatalogTables()
510 //These are special chunks which hold catalog tables and other information
512 // chunk id 0 ->userChunkTable
513 // chunk id 1 ->lockBucketHash
514 // chunk id 2 ->lockTable
516 // chunk id 10->DATABASE
517 // chunk id 11->USER
518 // chunk id 12->TABLE
519 // chunk id 13->FIELD
520 // chunk id 14->ACCESS
522 createSystemTables();
523 createMetaDataTables();
525 void Database::createSystemTables()
527 createSystemDatabaseChunk(FixedSizeAllocator,
528 sizeof(Chunk), UserChunkTableId);
529 createSystemDatabaseChunk(FixedSizeAllocator,
530 sizeof(Bucket) * LOCK_BUCKET_SIZE,
531 LockTableHashBucketId);
532 createSystemDatabaseChunk(FixedSizeAllocator,
533 sizeof(Mutex)* LOCK_BUCKET_SIZE,
534 LockTableMutexId);
535 createSystemDatabaseChunk(FixedSizeAllocator,
536 sizeof(LockHashNode), LockTableId);
537 createSystemDatabaseChunk(FixedSizeAllocator,
538 sizeof(TransHasNode), TransHasTableId);
540 createSystemDatabaseChunk(VariableSizeAllocator,
541 0, UndoLogTableID);
543 void Database::createMetaDataTables()
545 createSystemDatabaseChunk(FixedSizeAllocator,
546 sizeof(CDATABASEFILE), DatabaseTableId);
547 createSystemDatabaseChunk(FixedSizeAllocator,
548 sizeof(CUSER), UserTableId);
549 createSystemDatabaseChunk(FixedSizeAllocator,
550 sizeof(CTABLE), TableTableId);
551 createSystemDatabaseChunk(FixedSizeAllocator,
552 sizeof(CFIELD), FieldTableId);
553 createSystemDatabaseChunk(FixedSizeAllocator,
554 sizeof(CACCESS), AccessTableId);
555 createSystemDatabaseChunk(FixedSizeAllocator,
556 sizeof(CINDEX), IndexTableId);
557 createSystemDatabaseChunk(FixedSizeAllocator,
558 sizeof(CINDEXFIELD), IndexFieldTableId);
559 createSystemDatabaseChunk(FixedSizeAllocator,
560 sizeof(CFK), ForeignKeyTableId);
561 createSystemDatabaseChunk(FixedSizeAllocator,
562 sizeof(CFKFIELD), ForeignKeyFieldTableId);
565 //used in case of system database
566 Chunk* Database::getSystemDatabaseChunk(int id)
568 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
569 id * sizeof (Chunk);
570 return (Chunk*)(((char*) metaData_) + offset);
574 //used in case of system database
575 Transaction* Database::getSystemDatabaseTrans(int slot)
577 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
578 os::alignLong(MAX_CHUNKS * sizeof (Chunk)) +
579 slot * sizeof (Transaction);
580 return (Transaction*)(((char*) metaData_) + offset);
583 bool Database::isValidAddress(void* addr)
585 if ((char*) addr >= ((char*)getMetaDataPtr()) + getMaxSize())
586 return false;
587 else
588 return true;
591 //should be called only on system database
592 void* Database::allocLockHashBuckets()
594 Chunk *chunk = getSystemDatabaseChunk(LockTableHashBucketId);
595 DbRetVal rv=OK;
596 void *ptr = chunk->allocate(this, &rv);
597 if (NULL == ptr)
599 printError(ErrNoMemory, "Chunk Allocation failed for lock hash bucket catalog table");
601 return ptr;
604 Bucket* Database::getLockHashBuckets()
606 Chunk *tChunk = getSystemDatabaseChunk(LockTableHashBucketId);
607 ChunkIterator iter = tChunk->getIterator();
608 return (Bucket*)iter.nextElement();
610 void Database::setUniqueChunkID(int id)
612 (metaData_->chunkUniqueID_).setID(id);
615 int Database::getUniqueIDForChunk()
617 return ((metaData_->chunkUniqueID_).getID());
620 DbRetVal Database::recoverMutex(Mutex *mut)
622 //TODO: operations need to be undone before recovering the mutex.
623 mut->recoverMutex();
624 return OK;
626 DbRetVal Database::writeDirtyPages(char *dataFile)
628 int fd = open(dataFile, O_WRONLY|O_CREAT, 0644);
629 lseek(fd, 0, SEEK_SET);
630 void *buf = (void *) metaData_;
631 int sizeToWrite = os::alignLong(sizeof(DatabaseMetaData));
632 ssize_t retSize = os::write(fd, (char*)buf, sizeToWrite);
633 if (-1 == retSize)
635 printError(ErrWarning, "Warning:Unable to write metadata");
636 return ErrSysInternal;
638 PageInfo *pageInfo = (PageInfo*) getFirstPage();
639 long pageSize =PAGE_SIZE;
640 int pagesWritten=0, writeOffset=0;
641 long long totalBytesWritten=0;
642 while(isValidAddress((char*) pageInfo))
644 if ( NULL == pageInfo ) break;
645 if (pageInfo > getCurrentPage()) {
646 char *a="0";
647 lseek(fd, getMaxSize() -1, SEEK_SET);
648 if ( -1 == os::write(fd, a, 1)) {
649 printError(ErrSysInternal, "Unable to extend chkpt file");
650 close(fd);
651 return ErrSysInternal;
653 break;
655 if (BITSET(pageInfo->flags, IS_DIRTY)) {
656 if (NULL == pageInfo->nextPageAfterMerge_)
657 pageSize = PAGE_SIZE;
658 else
659 pageSize = (long)pageInfo->nextPageAfterMerge_ - (long)pageInfo;
660 writeOffset = (long) pageInfo - (long) metaData_;
661 lseek(fd, writeOffset, SEEK_SET);
662 CLEARBIT(pageInfo->flags, IS_DIRTY);
663 retSize = os::write(fd, (char*)pageInfo, pageSize);
664 if ( -1 == retSize ) {
665 printError(ErrSysInternal, "Unable to write dirty page %x", pageInfo);
666 close(fd);
667 return ErrSysInternal;
669 totalBytesWritten= totalBytesWritten + retSize;
670 pagesWritten++;
672 if ( pageInfo->nextPageAfterMerge_ == NULL) {
673 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
674 } else {
675 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
678 //printf("Total Dirty pages written %d %lld\n", pagesWritten, totalBytesWritten);
679 logFine(Conf::logger, "Total Dirty pages written %d\n", pagesWritten);
680 close(fd);
681 return OK;
684 DbRetVal Database::checkPoint()
686 char dataFile[MAX_FILE_LEN];
687 char cmd[MAX_FILE_LEN];
688 char dbRedoFileName[MAX_FILE_LEN];
689 sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile());
690 if (!Conf::config.useMmap()) {
691 // sprintf(dataFile, "%s/db.chkpt.data1", Conf::config.getDbFile());
692 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
693 FILE *fp = NULL;
694 if (fp = fopen(dataFile, "r")) {
695 fclose(fp);
696 int ret = unlink(dataFile);
697 if (ret != OK) {
698 printError(ErrOS, "Unable to delete old chkpt file. Failure");
699 return ErrOS;
702 int fd = open(dataFile, O_WRONLY|O_CREAT, 0644);
703 void *buf = (void *) metaData_;
704 lseek(fd, 0, SEEK_SET);
705 write(fd, buf, Conf::config.getMaxDbSize());
706 close(fd);
707 sprintf(cmd, "cp -f %s/db.chkpt.data %s/db.chkpt.data1", Conf::config.getDbFile(), Conf::config.getDbFile());
708 int ret = system(cmd);
709 if (ret != 0) {
710 printError(ErrOS, "Unable to take checkpoint back up file");
711 return ErrOS;
713 } else {
714 int fd = getChkptfd();
715 if (!os::fdatasync(fd)) {
716 logFine(Conf::logger, "fsync succedded");
718 filterAndRemoveStmtLogs();
719 int ret = truncate(dbRedoFileName,0);
720 if (ret != 0) {
721 close(fd);
722 printError(ErrSysInternal, "Unable to truncate redo log file");
723 printError(ErrSysInternal, "Delete %s manually and restart the server", dbRedoFileName);
724 return ErrOS;
726 //switch the checkpoint so that during recovery, fsynced checkpoint is
727 //used during recovery if the below step(writeDirtyPages)
728 //is not completed succesfully.
729 if (Database::getCheckpointID() == 0)
730 Database::setCheckpointID(1);
731 else
732 Database::setCheckpointID(0);
734 int val=Database::getCheckpointID();
736 sprintf(dataFile, "%s/db.chkpt.data%d", Conf::config.getDbFile(), val);
737 DbRetVal rv = writeDirtyPages(dataFile);
738 if (OK != rv)
740 printError(ErrSysInternal, "Unable to write dirty pages");
741 close(fd);
742 return rv;
745 //Note: do not change order, chkpt id should be switched only after
746 //all dirty pages are written to disk. otherwise(if server crashes
747 //when it writes these dirty pages) recovery should use
748 //mapped file as fsync is already done on that file.
749 if (Database::getCheckpointID() == 0)
750 Database::setCheckpointID(1);
751 else
752 Database::setCheckpointID(0);
754 close(fd);
755 return OK;
757 filterAndRemoveStmtLogs();
758 int ret = ::truncate(dbRedoFileName,0);
759 if (ret != 0) {
760 printError(ErrSysInternal, "Unable to truncate redo log file. Delete and restart the server\n");
761 return ErrOS;
763 return OK;
765 DbRetVal Database::filterAndRemoveStmtLogs()
767 struct stat st;
768 char fName[MAX_FILE_LEN];
769 sprintf(fName, "%s/csql.db.stmt", Conf::config.getDbFile());
770 int fdRead = open(fName, O_RDONLY);
771 if (-1 == fdRead) { return OK; }
772 if (fstat(fdRead, &st) == -1) {
773 printError(ErrSysInternal, "Unable to retrieve stmt log file size");
774 close(fdRead);
775 return ErrSysInternal;
777 if (st.st_size ==0) {
778 close(fdRead);
779 return OK;
781 void *startAddr = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fdRead, 0);
782 if (MAP_FAILED == startAddr) {
783 printError(ErrSysInternal, "Unable to mmap stmt log file\n");
784 return ErrSysInternal;
786 sprintf(fName, "%s/csql.db.stmt1", Conf::config.getDbFile());
787 int fd = os::openFileForAppend(fName, O_CREAT|O_TRUNC);
788 char *iter = (char*)startAddr;
789 char *logStart = NULL, *logEnd = NULL;
790 int logType;
791 int stmtID;
792 int len =0, ret =0;
793 int txnID, loglen;
794 DbRetVal rv = OK;
795 HashMap stmtMap;
796 stmtMap.setKeySize(sizeof(int));
797 //PASS-I load all prepare stmts and free them
798 while(true) {
799 if (iter - (char*)startAddr >= st.st_size) break;
800 logType = *(int*)iter;
801 logStart = iter;
802 if (logType == -1) { //prepare
803 iter = iter + sizeof(int);
804 len = *(int*) iter;
805 iter = iter + 2 * sizeof(int);
806 stmtID = *(int*)iter;
807 stmtMap.insert(iter);
808 iter = logStart+ len;
809 ret =0;
811 else if(logType == -3) { //free
812 iter = iter + sizeof(int);
813 txnID = *(int*) iter; iter += sizeof(int);
814 loglen = *(int*) iter; iter += sizeof(int);
815 stmtID = *(int*)iter;
816 stmtMap.remove(iter);
817 iter = iter + sizeof(int);
818 }else{
819 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
820 rv = ErrSysInternal;
821 break;
824 //PASS-II take the prepared statements which are not freed into another backup file
825 while(true) {
826 if (iter - (char*)startAddr >= st.st_size) break;
827 logType = *(int*)iter;
828 logStart = iter;
829 if (logType == -1) { //prepare
830 iter = iter + sizeof(int);
831 len = *(int*) iter;
832 iter = iter + 2 * sizeof(int);
833 stmtID = *(int*)iter;
834 iter = logStart+ len;
835 ret =0;
836 if (stmtMap.find(&stmtID))
837 ret = os::write(fd, logStart, len);
838 if (-1 == ret) {
839 printError(ErrSysInternal, "Unable to write statement logs");
842 else if(logType == -3) { //free
843 iter = logStart + 4 *sizeof(int);
844 //neglet free stmt logs in this pass
845 }else{
846 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
847 rv = ErrSysInternal;
848 break;
852 os::closeFile(fd);
853 munmap((char*)startAddr, st.st_size);
854 close(fdRead);
855 stmtMap.removeAll();
856 char cmd[MAX_FILE_LEN *2];
857 sprintf(cmd, "mv %s/csql.db.stmt1 %s/csql.db.stmt",
858 Conf::config.getDbFile(), Conf::config.getDbFile());
859 ret = system(cmd);
860 return rv;
862 int Database::getCheckpointID()
864 int id=0;
865 char curCkptFile[MAX_FILE_LEN];
866 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
867 FILE *fp = fopen(curCkptFile, "r");
868 if (NULL == fp) { setCheckpointID(0); return 0; }
869 fscanf(fp, "%d", &id);
870 fclose(fp);
871 return id;
873 void Database::setCheckpointID(int id)
875 char curCkptFile[MAX_FILE_LEN];
876 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
877 FILE *fp = fopen(curCkptFile, "w");
878 if (NULL == fp) {
880 printError(ErrSysInternal, "Unable to set checkpointID");
881 return;
883 fprintf(fp, "%d", id);
884 logFine(Conf::logger, "Current checkpoint set to %d", id);
885 fclose(fp);
886 return;
890 //used only by the user database not the system database
891 DbRetVal Database::recoverUserDB()
893 char dataFile[MAX_FILE_LEN];
894 char cmd[MAX_FILE_LEN];
895 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
896 int fd = open(dataFile, O_RDONLY);
897 if (-1 == fd) { return OK; }
898 void *buf = (void *) metaData_;
899 int readbytes = read(fd, buf, Conf::config.getMaxDbSize());
900 if (readbytes == -1) { close(fd); return ErrOS; }
901 close(fd);
902 return OK;
905 //used only by the system database
906 DbRetVal Database::recoverSystemDB()
908 char mapFile[MAX_FILE_LEN];
909 sprintf(mapFile, "%s/db.chkpt.map", Conf::config.getDbFile());
910 int fd = open(mapFile, O_RDONLY);
911 if (-1 == fd) { return OK; }
912 CatalogTableTABLE cTable(this);
913 CatalogTableINDEX cIndex(this);
914 struct Object buf;
915 while (read(fd, &buf, sizeof(buf))) {
916 if (buf.type == Tbl) {
917 cTable.setChunkPtr(buf.name, buf.firstPage, buf.curPage);
920 else if (buf.type == hIdx || buf.type == tIdx) {
921 cIndex.setChunkPtr(buf.name, buf.type, buf.bucketChunk, buf.firstPage, buf.curPage);
924 return OK;