changing lock bucket size
[csql.git] / src / storage / Database.cxx
blob478a3c3b29e6ee4584a75f0ba3e8a885516a4c8f
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Database.h>
17 #include<os.h>
18 #include<CatalogTables.h>
19 #include<Transaction.h>
20 #include<Lock.h>
21 #include<Debug.h>
22 #include<Config.h>
23 #include<Process.h>
24 #include<HeapAllocator.h>
26 const char* Database::getName()
28 return metaData_->dbName_;
31 int Database::getDatabaseID()
33 return metaData_->dbID_;
36 long Database::getMaxSize()
38 return metaData_->maxSize_;
41 long Database::getCurrentSize()
43 return metaData_->curSize_;
46 Page* Database::getCurrentPage()
48 return metaData_->curPage_;
51 Page* Database::getFirstPage()
53 return metaData_->firstPage_;
56 int Database::getNoOfChunks()
58 return metaData_->noOfChunks_;
60 Chunk* Database::getHashIndexChunk()
62 return metaData_->hashIndexChunk_;
65 void Database::setDatabaseID(int id)
67 metaData_->dbID_ = id;
69 void Database::setName(const char *name)
71 strcpy(metaData_->dbName_ , name);
73 void Database::setCurrentSize(long size)
75 metaData_->curSize_ = size;
77 void Database::setCurrentPage(Page *page)
79 //metaData_->curPage_ = page;
80 Mutex::CASL((long*)&metaData_->curPage_, (long)metaData_->curPage_, (long)page);
82 void Database::setFirstPage(Page *page)
84 metaData_->firstPage_ = page;
86 void Database::setMaxSize(long size)
88 metaData_->maxSize_ = size;
90 void Database::setNoOfChunks(int chunks)
92 metaData_->noOfChunks_ = chunks;
94 void Database::setHashIndexChunk(Chunk *ch)
96 metaData_->hashIndexChunk_ = ch;
99 void Database::printDebugMutexInfo()
101 metaData_->dbAllocMutex_.print();
102 metaData_->ckptMutex_.print();
103 metaData_->dbTransTableMutex_.print();
104 metaData_->dbProcTableMutex_.print();
105 metaData_->dbPrepareStmtMutex_.print();
106 metaData_->chunkUniqueID_.print();
108 int Database::initAllocDatabaseMutex()
110 return metaData_->dbAllocMutex_.init("allocdb");
112 DbRetVal Database::getAllocDatabaseMutex(bool procAccount)
114 struct timeval timeout, timeval;
115 timeout.tv_sec = Conf::config.getMutexSecs();
116 timeout.tv_usec = Conf::config.getMutexUSecs();
117 int tries=0;
118 int totalTries = Conf::config.getMutexRetries() *2;
119 int ret =0;
120 while (tries < totalTries)
122 ret = metaData_->dbAllocMutex_.getLock(procSlot, procAccount);
123 if (ret == 0) break;
124 timeval.tv_sec = timeout.tv_sec;
125 timeval.tv_usec = timeout.tv_usec;
126 os::select(0, 0, 0, 0, &timeval);
127 tries++;
129 if (tries >= totalTries) return ErrLockTimeOut;
130 return OK;
132 DbRetVal Database::releaseAllocDatabaseMutex(bool procAccount)
134 metaData_->dbAllocMutex_.releaseLock(procSlot, procAccount);
135 return OK;
138 int Database::initPrepareStmtMutex()
140 return metaData_->dbPrepareStmtMutex_.init("prepstmt");
142 DbRetVal Database::getPrepareStmtMutex(bool procAccount)
144 struct timeval timeout, timeval;
145 timeout.tv_sec = Conf::config.getMutexSecs();
146 timeout.tv_usec = Conf::config.getMutexUSecs();
147 int tries=0;
148 int totalTries = Conf::config.getMutexRetries() *2;
149 int ret =0;
150 while (tries < totalTries)
152 ret = metaData_->dbPrepareStmtMutex_.getLock(procSlot, procAccount);
153 if (ret == 0) break;
154 timeval.tv_sec = timeout.tv_sec;
155 timeval.tv_usec = timeout.tv_usec;
156 os::select(0, 0, 0, 0, &timeval);
157 tries++;
159 if (tries >= totalTries) return ErrLockTimeOut;
160 return OK;
163 DbRetVal Database::releasePrepareStmtMutex(bool procAccount)
165 metaData_->dbPrepareStmtMutex_.releaseLock(procSlot, procAccount);
166 return OK;
169 int Database::initTransTableMutex()
171 return metaData_->dbTransTableMutex_.init("transtable");
173 DbRetVal Database::getTransTableMutex()
175 struct timeval timeout, timeval;
176 timeout.tv_sec = Conf::config.getMutexSecs();
177 timeout.tv_usec = Conf::config.getMutexUSecs();
178 int tries=0;
179 int totalTries = Conf::config.getMutexRetries() *2;
180 int ret =0;
181 while (tries < totalTries)
183 ret = metaData_->dbTransTableMutex_.getLock(procSlot);
184 if (ret == 0) break;
185 timeval.tv_sec = timeout.tv_sec;
186 timeval.tv_usec = timeout.tv_usec;
187 os::select(0, 0, 0, 0, &timeval);
188 tries++;
190 if (tries >= totalTries) return ErrLockTimeOut;
191 return OK;
194 DbRetVal Database::releaseTransTableMutex()
196 metaData_->dbTransTableMutex_.releaseLock(procSlot);
197 return OK;
202 int Database::initProcessTableMutex()
204 return metaData_->dbProcTableMutex_.init("proctable");
206 DbRetVal Database::getProcessTableMutex(bool procAccount)
208 struct timeval timeout, timeval;
209 timeout.tv_sec = Conf::config.getMutexSecs();
210 timeout.tv_usec = Conf::config.getMutexUSecs();
211 int tries=0;
212 int totalTries = Conf::config.getMutexRetries() *2;
213 int ret =0;
214 while (tries < totalTries)
216 ret = metaData_->dbProcTableMutex_.getLock(procSlot, procAccount);
217 if (ret == 0) break;
218 timeval.tv_sec = timeout.tv_sec;
219 timeval.tv_usec = timeout.tv_usec;
220 os::select(0, 0, 0, 0, &timeval);
221 tries++;
223 if (tries >= totalTries) return ErrLockTimeOut;
224 return OK;
227 DbRetVal Database::releaseProcessTableMutex(bool procAccount)
229 metaData_->dbProcTableMutex_.releaseLock(procSlot, procAccount);
230 return OK;
233 int Database::initCheckpointMutex()
235 return metaData_->ckptMutex_.init("checkpoint");
237 DbRetVal Database::getSCheckpointMutex(bool procAccount)
239 struct timeval timeout, timeval;
240 timeout.tv_sec = Conf::config.getMutexSecs();
241 timeout.tv_usec = Conf::config.getMutexUSecs();
242 int tries=0;
243 int totalTries = Conf::config.getMutexRetries() *2;
244 int ret =0;
245 while (tries < totalTries)
247 ret = metaData_->ckptMutex_.getShareLock(procSlot, procAccount);
248 if (ret == 0) break;
249 timeval.tv_sec = timeout.tv_sec;
250 timeval.tv_usec = timeout.tv_usec;
251 os::select(0, 0, 0, 0, &timeval);
252 tries++;
254 if (tries >= totalTries) return ErrLockTimeOut;
255 return OK;
258 DbRetVal Database::getXCheckpointMutex(bool procAccount)
260 struct timeval timeout, timeval;
261 timeout.tv_sec = Conf::config.getMutexSecs();
262 timeout.tv_usec = Conf::config.getMutexUSecs();
263 int tries=0;
264 int totalTries = Conf::config.getMutexRetries() *2;
265 int ret =0;
266 while (tries < totalTries)
268 ret = metaData_->ckptMutex_.getExclusiveLock(procSlot, procAccount);
269 if (ret == 0) break;
270 timeval.tv_sec = timeout.tv_sec;
271 timeval.tv_usec = timeout.tv_usec;
272 os::select(0, 0, 0, 0, &timeval);
273 tries++;
275 if (tries >= totalTries) return ErrLockTimeOut;
276 return OK;
279 DbRetVal Database::releaseCheckpointMutex(bool procAccount)
281 metaData_->ckptMutex_.releaseShareLock(procSlot, procAccount);
282 return OK;
285 // Gets the free page
286 // Each page is segmented by PAGE_SIZE, so it checks the pageInfo
287 // of each page to determine if the page is free
288 // Algorithm is to scan through the pageInfo objects stored at
289 // address (db start address + i * PAGE_SIZE) where i = 1..n till end
290 // database
291 // But in case of large tuples, pages are merged, so there wont be
292 // PageInfo object on pages which are merged.
293 // These pages are skipped by checking the nextPageAfterMerge_ of PageInfo
295 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
296 Page* Database::getFreePage()
298 Page* page = getFirstPage();
299 //Page* page = getCurrentPage();
300 //printDebug(DM_Alloc, "Database::getFreePage firstPage:%x",page);
301 printDebug(DM_Alloc, "Database::getFreePage currentpage:%x",page);
302 PageInfo* pageInfo = ((PageInfo*)page);
303 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
304 int pageSize = PAGE_SIZE;
305 bool isEndAddchk=false;
306 while( 1 == pageInfo->isUsed_)
308 //If any pages are merged to store data larger than PAGE_SIZE
309 //move to the next page after the merge and check whether it is used
310 if ( pageInfo->nextPageAfterMerge_ == NULL) {
311 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
312 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
314 else {
315 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
316 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
319 if((((char*) pageInfo) + pageSize) >= endAddr )
321 if(!isEndAddchk){
322 isEndAddchk=true;
323 pageInfo=(PageInfo *)getFirstPage();
325 else
326 break;
328 if ((char*)pageInfo >= endAddr)
330 //printError(ErrSysInternal,"Invalid address %x",pageInfo);
331 return NULL;
335 if (!isValidAddress(((char*) pageInfo) + pageSize))
337 printError(ErrSysInternal, "Invalid address %x",((char*) pageInfo) + pageSize);
338 return NULL;
340 setCurrentPage((Page*) pageInfo);
341 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
342 return (Page*) pageInfo ;
345 //Used by tuples more than PAGE_SIZE
346 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
347 Page* Database::getFreePage(size_t size)
349 Page* page = getFirstPage();
350 PageInfo* pageInfo = ((PageInfo*)page);
351 int multiple = size / PAGE_SIZE;
352 int offset = ((multiple + 1) * PAGE_SIZE);
353 printDebug(DM_Alloc, "Database::getFreePage firstPage:%x size:%ld",page, size);
354 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
355 int pageSize = PAGE_SIZE;
356 bool isEndAddchk = false;
357 while(true){
358 while( 1 == pageInfo->isUsed_)
360 //If any pages are merged to store data larger than PAGE_SIZE
361 //move to the next page after the merge and check whether it is used
362 if ( pageInfo->nextPageAfterMerge_ == NULL) {
363 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
364 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
366 else {
367 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
368 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
370 if((((char*) pageInfo) + offset) >= endAddr )
372 if(!isEndAddchk){
373 isEndAddchk=true;
374 pageInfo=(PageInfo *)getFirstPage();
376 else
377 break;
380 int i = 0;
381 PageInfo *pInfo = pageInfo;
382 if ((((char*)pInfo) + offset) >= endAddr)
384 printError(ErrSysInternal,"Invalid address %x",((char*)pInfo) + offset);
385 return NULL;
387 for (i = 0; i< multiple + 1; i++)
389 if (1 == pInfo->isUsed_) break;
390 pInfo = (PageInfo*)((char*)pInfo + pageSize);
392 if ( i == (multiple + 1)) break;
393 pageInfo = (PageInfo*)((char*)pInfo + pageSize);
396 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
397 setCurrentPage((Page*) pageInfo);
398 return (Page*) pageInfo ;
401 void Database::printStatistics()
403 Page* page = getFirstPage();
404 PageInfo* pageInfo = ((PageInfo*)page);
405 int usedPageCount =0, usedMergedPageCount =0, totalPages=0;
406 int totalDirtyPages=0;
407 printf("<DatabaseStatistics>\n");
408 printf(" <Database Name> %s </Database Name>\n", getName());
409 printf(" <Max Size> %ld </Max Size>\n", getMaxSize());
410 printf(" <First Page> %x </First Page>\n", getFirstPage());
411 while(isValidAddress((char*) pageInfo))
413 if (pageInfo == NULL) break;
414 //if (pageInfo > getCurrentPage()) break;
415 if (1 == pageInfo->isUsed_) {
416 if ( pageInfo->nextPageAfterMerge_ == NULL) {
417 if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
418 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
419 usedPageCount++; totalPages++;
420 printDebug(DM_Alloc, "Normal Page:Moving to page:%x\n",pageInfo);
421 continue;
423 else {
424 if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
425 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
426 usedMergedPageCount++; totalPages++;
427 printDebug(DM_Alloc,"Merged Page:Moving to page:%x\n",pageInfo);
428 continue;
430 } else if (BITSET(pageInfo->flags, IS_DIRTY)) totalDirtyPages++;
431 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
432 printDebug(DM_Alloc,"Normal Page not used:Moving to page:%x\n",pageInfo);
433 totalPages++;
435 printf(" <Total Pages> %d </Total Pages>\n", totalPages);
436 if (Conf::config.useDurability())
437 printf(" <Dirty Pages> %d </Dirty Pages>\n", totalDirtyPages);
438 printf(" <Used Normal Pages> %d </Used Normal Pages>\n", usedPageCount);
439 printf(" <Used Merged Pages> %d </Used Merged Pages>\n", usedMergedPageCount);
440 printf(" <Chunks Used> %d </Chunks Used>\n", getNoOfChunks());
441 printf("</DatabaseStatistics>\n");
443 return ;
447 //called only in case of system database to create and initialize the chunk
448 //information
449 DbRetVal Database::createSystemDatabaseChunk(AllocType type, size_t size, int id)
452 Chunk *chunk;
453 if (-1 == id )
455 printError(ErrSysFatal, "Database ID corrupted");
456 return ErrSysFatal;
458 chunk = getSystemDatabaseChunk(id);
460 chunk->setChunkNameForSystemDB(id);
462 if (FixedSizeAllocator == type) chunk->setSize(size);
463 //getDatabaseMutex();
464 if (chunk->allocSize_ > PAGE_SIZE)
465 chunk->curPage_ = getFreePage(chunk->allocSize_);
466 else
467 chunk->curPage_ = getFreePage();
468 if ( chunk->curPage_ == NULL)
470 //releaseDatabaseMutex();
471 printError(ErrNoMemory, "No free pages in database: Database full");
472 return ErrNoMemory;
475 chunk->firstPage_ = chunk->curPage_;
476 PageInfo* firstPageInfo = ((PageInfo*)chunk->firstPage_);
477 firstPageInfo->setFirstPageAsUsed();
478 chunk->setChunkID(id);
479 chunk->setAllocType(type);
480 chunk->initMutex(id);
481 printDebug(DM_Database, "Creating System Database Chunk:%d Size:%d",id, chunk->allocSize_);
482 if (chunk->allocSize_ > PAGE_SIZE)
484 int multiple = os::floor(chunk->allocSize_ / PAGE_SIZE);
485 int offset = ((multiple + 1) * PAGE_SIZE);
486 firstPageInfo->nextPageAfterMerge_ = ((char*)firstPageInfo)+ offset;
489 if (0 == size)
491 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)firstPageInfo) + sizeof(PageInfo));
492 varInfo->isUsed_ = 0;
493 varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
496 incrementChunk();
497 //releaseDatabaseMutex();
498 return OK;
501 //This is never called currently. If situation arises will be coded later.
502 DbRetVal Database::deleteSystemDatabaseChunk(int id)
505 Chunk *chunk = getSystemDatabaseChunk(id);
506 chunk->setChunkID(-1);
507 chunk->setSize(0);
508 chunk->setAllocType(UnknownAllocator);
509 //TODO::
510 //chunk->pageList_
511 //walk though the pageList ptr and get all the page pointers
512 //then free all the pages used to store this by setting the
513 //start of page to notused
514 chunk->firstPage_ = NULL;
515 chunk->curPage_ = NULL;
516 decrementChunk();
517 return OK;
521 void Database::createAllCatalogTables()
523 //These are special chunks which hold catalog tables and other information
525 // chunk id 0 ->userChunkTable
526 // chunk id 1 ->lockBucketHash
527 // chunk id 2 ->lockTable
529 // chunk id 10->DATABASE
530 // chunk id 11->USER
531 // chunk id 12->TABLE
532 // chunk id 13->FIELD
533 // chunk id 14->ACCESS
535 createSystemTables();
536 createMetaDataTables();
538 void Database::createSystemTables()
540 createSystemDatabaseChunk(FixedSizeAllocator,
541 sizeof(Chunk), UserChunkTableId);
542 createSystemDatabaseChunk(FixedSizeAllocator,
543 sizeof(Bucket) * LOCK_BUCKET_SIZE,
544 LockTableHashBucketId);
545 createSystemDatabaseChunk(FixedSizeAllocator,
546 sizeof(Mutex)* LOCK_BUCKET_SIZE,
547 LockTableMutexId);
548 createSystemDatabaseChunk(FixedSizeAllocator,
549 sizeof(LockHashNode), LockTableId);
550 createSystemDatabaseChunk(FixedSizeAllocator,
551 sizeof(TransHasNode), TransHasTableId);
553 createSystemDatabaseChunk(VariableSizeAllocator,
554 0, UndoLogTableID);
556 void Database::createMetaDataTables()
558 createSystemDatabaseChunk(FixedSizeAllocator,
559 sizeof(CDATABASEFILE), DatabaseTableId);
560 createSystemDatabaseChunk(FixedSizeAllocator,
561 sizeof(CUSER), UserTableId);
562 createSystemDatabaseChunk(FixedSizeAllocator,
563 sizeof(CTABLE), TableTableId);
564 createSystemDatabaseChunk(FixedSizeAllocator,
565 sizeof(CFIELD), FieldTableId);
566 createSystemDatabaseChunk(FixedSizeAllocator,
567 sizeof(CACCESS), AccessTableId);
568 createSystemDatabaseChunk(FixedSizeAllocator,
569 sizeof(CINDEX), IndexTableId);
570 createSystemDatabaseChunk(FixedSizeAllocator,
571 sizeof(CINDEXFIELD), IndexFieldTableId);
572 createSystemDatabaseChunk(FixedSizeAllocator,
573 sizeof(CFK), ForeignKeyTableId);
574 createSystemDatabaseChunk(FixedSizeAllocator,
575 sizeof(CFKFIELD), ForeignKeyFieldTableId);
578 //used in case of system database
579 Chunk* Database::getSystemDatabaseChunk(int id)
581 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
582 id * sizeof (Chunk);
583 return (Chunk*)(((char*) metaData_) + offset);
587 //used in case of system database
588 Transaction* Database::getSystemDatabaseTrans(int slot)
590 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
591 os::alignLong(MAX_CHUNKS * sizeof (Chunk)) +
592 slot * sizeof (Transaction);
593 return (Transaction*)(((char*) metaData_) + offset);
596 bool Database::isValidAddress(void* addr)
598 if ((char*) addr >= ((char*)getMetaDataPtr()) + getMaxSize())
599 return false;
600 else
601 return true;
604 //should be called only on system database
605 void* Database::allocLockHashBuckets()
607 Chunk *chunk = getSystemDatabaseChunk(LockTableHashBucketId);
608 DbRetVal rv=OK;
609 void *ptr = chunk->allocate(this, &rv);
610 if (NULL == ptr)
612 printError(ErrNoMemory, "Chunk Allocation failed for lock hash bucket catalog table");
614 return ptr;
617 Bucket* Database::getLockHashBuckets()
619 Chunk *tChunk = getSystemDatabaseChunk(LockTableHashBucketId);
620 ChunkIterator iter = tChunk->getIterator();
621 return (Bucket*)iter.nextElement();
623 void Database::setUniqueChunkID(int id)
625 (metaData_->chunkUniqueID_).setID(id);
628 int Database::getUniqueIDForChunk()
630 return ((metaData_->chunkUniqueID_).getID());
633 DbRetVal Database::recoverMutex(Mutex *mut)
635 //TODO: operations need to be undone before recovering the mutex.
636 mut->recoverMutex();
637 return OK;
639 DbRetVal Database::writeDirtyPages(char *dataFile)
641 int fd = os::open(dataFile, fileOpenCreat, 0);
642 os::lseek(fd, 0, SEEK_SET);
643 void *buf = (void *) metaData_;
644 int sizeToWrite = os::alignLong(sizeof(DatabaseMetaData));
645 size_t retSize = os::write(fd, (char*)buf, sizeToWrite);
646 if (-1 == retSize)
648 printError(ErrWarning, "Warning:Unable to write metadata");
649 return ErrSysInternal;
651 PageInfo *pageInfo = (PageInfo*) getFirstPage();
652 long pageSize =PAGE_SIZE;
653 int pagesWritten=0, writeOffset=0;
654 long long totalBytesWritten=0;
655 while(isValidAddress((char*) pageInfo))
657 if ( NULL == pageInfo ) break;
658 if (pageInfo > getCurrentPage()) {
659 char *a="0";
660 os::lseek(fd, getMaxSize() -1, SEEK_SET);
661 if ( -1 == os::write(fd, a, 1)) {
662 printError(ErrSysInternal, "Unable to extend chkpt file");
663 os::close(fd);
664 return ErrSysInternal;
666 break;
668 if (BITSET(pageInfo->flags, IS_DIRTY)) {
669 if (NULL == pageInfo->nextPageAfterMerge_)
670 pageSize = PAGE_SIZE;
671 else
672 pageSize = (long)pageInfo->nextPageAfterMerge_ - (long)pageInfo;
673 writeOffset = (long) pageInfo - (long) metaData_;
674 ::lseek(fd, writeOffset, SEEK_SET);
675 CLEARBIT(pageInfo->flags, IS_DIRTY);
676 retSize = os::write(fd, (char*)pageInfo, pageSize);
677 if ( -1 == retSize ) {
678 printError(ErrSysInternal, "Unable to write dirty page %x", pageInfo);
679 os::close(fd);
680 return ErrSysInternal;
682 totalBytesWritten= totalBytesWritten + retSize;
683 pagesWritten++;
685 if ( pageInfo->nextPageAfterMerge_ == NULL) {
686 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
687 } else {
688 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
691 //printf("Total Dirty pages written %d %lld\n", pagesWritten, totalBytesWritten);
692 logFine(Conf::logger, "Total Dirty pages written %d\n", pagesWritten);
693 os::close(fd);
694 return OK;
697 DbRetVal Database::checkPoint()
699 char dataFile[MAX_FILE_LEN];
700 char cmd[MAX_FILE_LEN];
701 char dbRedoFileName[MAX_FILE_LEN];
702 sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile());
703 if (!Conf::config.useMmap()) {
704 // sprintf(dataFile, "%s/db.chkpt.data1", Conf::config.getDbFile());
705 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
706 FILE *fp = NULL;
707 if (fp = fopen(dataFile, "r")) {
708 fclose(fp);
709 int ret = ::unlink(dataFile);
710 if (ret != OK) {
711 printError(ErrOS, "Unable to delete old chkpt file. Failure");
712 return ErrOS;
715 int fd = ::open(dataFile, O_WRONLY|O_CREAT, 0644);
716 void *buf = (void *) metaData_;
717 os::lseek(fd, 0, SEEK_SET);
718 os::write(fd, (char*) buf, Conf::config.getMaxDbSize());
719 os::close(fd);
720 sprintf(cmd, "cp -f %s/db.chkpt.data %s/db.chkpt.data1", Conf::config.getDbFile(), Conf::config.getDbFile());
721 int ret = system(cmd);
722 if (ret != 0) {
723 printError(ErrOS, "Unable to take checkpoint back up file");
724 return ErrOS;
726 } else {
727 file_desc fd = getChkptfd();
728 if (!os::fdatasync(fd)) {
729 logFine(Conf::logger, "fsync succedded");
731 filterAndRemoveStmtLogs();
732 int ret = os::truncate(dbRedoFileName);
733 if (ret != 0) {
734 os::closeFile(fd);
735 printError(ErrSysInternal, "Unable to truncate redo log file");
736 printError(ErrSysInternal, "Delete %s manually and restart the server", dbRedoFileName);
737 return ErrOS;
739 //switch the checkpoint so that during recovery, fsynced checkpoint is
740 //used during recovery if the below step(writeDirtyPages)
741 //is not completed succesfully.
742 if (Database::getCheckpointID() == 0)
743 Database::setCheckpointID(1);
744 else
745 Database::setCheckpointID(0);
747 int val=Database::getCheckpointID();
749 sprintf(dataFile, "%s/db.chkpt.data%d", Conf::config.getDbFile(), val);
750 DbRetVal rv = writeDirtyPages(dataFile);
751 if (OK != rv)
753 printError(ErrSysInternal, "Unable to write dirty pages");
754 os::closeFile(fd);
755 return rv;
758 //Note: do not change order, chkpt id should be switched only after
759 //all dirty pages are written to disk. otherwise(if server crashes
760 //when it writes these dirty pages) recovery should use
761 //mapped file as fsync is already done on that file.
762 if (Database::getCheckpointID() == 0)
763 Database::setCheckpointID(1);
764 else
765 Database::setCheckpointID(0);
767 os::closeFile(fd);
768 return OK;
770 filterAndRemoveStmtLogs();
771 int ret = os::truncate(dbRedoFileName);
772 if (ret != 0) {
773 printError(ErrSysInternal, "Unable to truncate redo log file. Delete and restart the server\n");
774 return ErrOS;
776 return OK;
778 DbRetVal Database::filterAndRemoveStmtLogs()
780 struct stat st;
781 char fName[MAX_FILE_LEN];
782 sprintf(fName, "%s/csql.db.stmt", Conf::config.getDbFile());
783 file_desc fdRead = os::openFile(fName, fileOpenReadOnly,0);
784 if ((file_desc)-1 == fdRead) { return OK; }
785 if (::stat(fName, &st) == -1) {
786 printError(ErrSysInternal, "Unable to retrieve stmt log file size");
787 os::closeFile(fdRead);
788 return ErrSysInternal;
790 if (st.st_size ==0) {
791 os::closeFile(fdRead);
792 return OK;
794 void *startAddr = os::mmap(NULL, st.st_size, mapProtRead, mapPrivate, fdRead, 0);
795 if ((void*) MAP_FAILED == startAddr) {
796 printError(ErrSysInternal, "Unable to mmap stmt log file\n");
797 return ErrSysInternal;
799 sprintf(fName, "%s/csql.db.stmt1", Conf::config.getDbFile());
800 int fd = os::openFileForAppend(fName, O_CREAT|O_TRUNC);
801 char *iter = (char*)startAddr;
802 char *logStart = NULL, *logEnd = NULL;
803 int logType;
804 int stmtID;
805 int len =0, ret =0;
806 int txnID, loglen;
807 DbRetVal rv = OK;
808 HashMap stmtMap;
809 stmtMap.setKeySize(sizeof(int));
810 //PASS-I load all prepare stmts and free them
811 while(true) {
812 if (iter - (char*)startAddr >= st.st_size) break;
813 logType = *(int*)iter;
814 logStart = iter;
815 if (logType == -1) { //prepare
816 iter = iter + sizeof(int);
817 len = *(int*) iter;
818 iter = iter + 2 * sizeof(int);
819 stmtID = *(int*)iter;
820 stmtMap.insert(iter);
821 iter = logStart+ len;
822 ret =0;
824 else if(logType == -3) { //free
825 iter = iter + sizeof(int);
826 txnID = *(int*) iter; iter += sizeof(int);
827 loglen = *(int*) iter; iter += sizeof(int);
828 stmtID = *(int*)iter;
829 stmtMap.remove(iter);
830 iter = iter + sizeof(int);
831 }else{
832 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
833 rv = ErrSysInternal;
834 break;
837 //PASS-II take the prepared statements which are not freed into another backup file
838 while(true) {
839 if (iter - (char*)startAddr >= st.st_size) break;
840 logType = *(int*)iter;
841 logStart = iter;
842 if (logType == -1) { //prepare
843 iter = iter + sizeof(int);
844 len = *(int*) iter;
845 iter = iter + 2 * sizeof(int);
846 stmtID = *(int*)iter;
847 iter = logStart+ len;
848 ret =0;
849 if (stmtMap.find(&stmtID))
850 ret = os::write(fd, logStart, len);
851 if (-1 == ret) {
852 printError(ErrSysInternal, "Unable to write statement logs");
855 else if(logType == -3) { //free
856 iter = logStart + 4 *sizeof(int);
857 //neglet free stmt logs in this pass
858 }else{
859 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
860 rv = ErrSysInternal;
861 break;
865 os::close(fd);
866 os::munmap((char*)startAddr, st.st_size);
867 os::closeFile(fdRead);
868 stmtMap.removeAll();
869 char cmd[MAX_FILE_LEN *2];
870 sprintf(cmd, "mv %s/csql.db.stmt1 %s/csql.db.stmt",
871 Conf::config.getDbFile(), Conf::config.getDbFile());
872 ret = system(cmd);
873 return rv;
875 int Database::getCheckpointID()
877 int id=0;
878 char curCkptFile[MAX_FILE_LEN];
879 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
880 FILE *fp = fopen(curCkptFile, "r");
881 if (NULL == fp) { setCheckpointID(0); return 0; }
882 fscanf(fp, "%d", &id);
883 fclose(fp);
884 return id;
886 void Database::setCheckpointID(int id)
888 char curCkptFile[MAX_FILE_LEN];
889 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
890 FILE *fp = fopen(curCkptFile, "w");
891 if (NULL == fp) {
893 printError(ErrSysInternal, "Unable to set checkpointID");
894 return;
896 fprintf(fp, "%d", id);
897 logFine(Conf::logger, "Current checkpoint set to %d", id);
898 fclose(fp);
899 return;
903 //used only by the user database not the system database
904 DbRetVal Database::recoverUserDB()
906 char dataFile[MAX_FILE_LEN];
907 char cmd[MAX_FILE_LEN];
908 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
909 int fd = os::open(dataFile, fileOpenReadOnly, 0);
910 if (-1 == fd) { return OK; }
911 void *buf = (void *) metaData_;
912 int readbytes = read(fd, buf, Conf::config.getMaxDbSize());
913 if (readbytes == -1) { os::close(fd); return ErrOS; }
914 os::close(fd);
915 return OK;
918 //used only by the system database
919 DbRetVal Database::recoverSystemDB()
921 char mapFile[MAX_FILE_LEN];
922 sprintf(mapFile, "%s/db.chkpt.map", Conf::config.getDbFile());
923 int fd = open(mapFile, O_RDONLY);
924 if (-1 == fd) { return OK; }
925 CatalogTableTABLE cTable(this);
926 CatalogTableINDEX cIndex(this);
927 struct Object buf;
928 while (read(fd, &buf, sizeof(buf))) {
929 if (buf.type == Tbl) {
930 cTable.setChunkPtr(buf.name, buf.firstPage, buf.curPage);
932 else if (buf.type == hIdx || buf.type == tIdx) {
933 cIndex.setChunkPtr(buf.name, buf.type, buf.bucketChunk, buf.firstPage, buf.curPage);
936 return OK;