cleanup
[csql.git] / src / storage / Database.cxx
blob91b9f21c08406287efee90f9d629fe208b3a6bd1
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Database.h>
17 #include<os.h>
18 #include<CatalogTables.h>
19 #include<Transaction.h>
20 #include<Lock.h>
21 #include<Debug.h>
22 #include<Config.h>
23 #include<Process.h>
25 const char* Database::getName()
27 return metaData_->dbName_;
30 int Database::getDatabaseID()
32 return metaData_->dbID_;
35 long Database::getMaxSize()
37 return metaData_->maxSize_;
40 long Database::getCurrentSize()
42 return metaData_->curSize_;
45 Page* Database::getCurrentPage()
47 return metaData_->curPage_;
50 Page* Database::getFirstPage()
52 return metaData_->firstPage_;
55 int Database::getNoOfChunks()
57 return metaData_->noOfChunks_;
59 Chunk* Database::getHashIndexChunk()
61 return metaData_->hashIndexChunk_;
64 void Database::setDatabaseID(int id)
66 metaData_->dbID_ = id;
68 void Database::setName(const char *name)
70 strcpy(metaData_->dbName_ , name);
72 void Database::setCurrentSize(long size)
74 metaData_->curSize_ = size;
76 void Database::setCurrentPage(Page *page)
78 //metaData_->curPage_ = page;
79 Mutex::CASL((long*)&metaData_->curPage_, (long)metaData_->curPage_, (long)page);
81 void Database::setFirstPage(Page *page)
83 metaData_->firstPage_ = page;
85 void Database::setMaxSize(long size)
87 metaData_->maxSize_ = size;
89 void Database::setNoOfChunks(int chunks)
91 metaData_->noOfChunks_ = chunks;
93 void Database::setHashIndexChunk(Chunk *ch)
95 metaData_->hashIndexChunk_ = ch;
99 int Database::initAllocDatabaseMutex()
101 return metaData_->dbAllocMutex_.init("allocdb");
103 DbRetVal Database::getAllocDatabaseMutex(bool procAccount)
105 int ret= metaData_->dbAllocMutex_.getLock(procSlot, procAccount);
106 if (ret) return ErrLockTimeOut; else return OK;
108 DbRetVal Database::releaseAllocDatabaseMutex(bool procAccount)
110 metaData_->dbAllocMutex_.releaseLock(procSlot, procAccount);
111 return OK;
114 int Database::initPrepareStmtMutex()
116 return metaData_->dbPrepareStmtMutex_.init("prepstmt");
118 DbRetVal Database::getPrepareStmtMutex(bool procAccount)
120 int ret= metaData_->dbPrepareStmtMutex_.getLock(procSlot, procAccount);
121 if (ret) return ErrLockTimeOut; else return OK;
123 DbRetVal Database::releasePrepareStmtMutex(bool procAccount)
125 metaData_->dbPrepareStmtMutex_.releaseLock(procSlot, procAccount);
126 return OK;
129 int Database::initTransTableMutex()
131 return metaData_->dbTransTableMutex_.init("transtable");
133 DbRetVal Database::getTransTableMutex()
135 int ret = metaData_->dbTransTableMutex_.getLock(procSlot);
136 if (ret) return ErrLockTimeOut; else return OK;
138 DbRetVal Database::releaseTransTableMutex()
140 metaData_->dbTransTableMutex_.releaseLock(procSlot);
141 return OK;
146 int Database::initProcessTableMutex()
148 return metaData_->dbProcTableMutex_.init("proctable");
150 DbRetVal Database::getProcessTableMutex(bool procAccount)
152 int ret = metaData_->dbProcTableMutex_.getLock(procSlot, procAccount);
153 if (ret) return ErrLockTimeOut; else return OK;
155 DbRetVal Database::releaseProcessTableMutex(bool procAccount)
157 metaData_->dbProcTableMutex_.releaseLock(procSlot, procAccount);
158 return OK;
163 int Database::initDatabaseMutex()
165 return metaData_->dbMutex_.init("db");
167 DbRetVal Database::getDatabaseMutex(bool procAccount)
169 int ret = metaData_->dbMutex_.getLock(procSlot, procAccount);
170 if (ret) return ErrLockTimeOut; else return OK;
172 DbRetVal Database::releaseDatabaseMutex(bool procAccount)
174 metaData_->dbMutex_.releaseLock(procSlot, procAccount);
175 return OK;
178 // Gets the free page
179 // Each page is segmented by PAGE_SIZE, so it checks the pageInfo
180 // of each page to determine if the page is free
181 // Algorithm is to scan through the pageInfo objects stored at
182 // address (db start address + i * PAGE_SIZE) where i = 1..n till end
183 // database
184 // But in case of large tuples, pages are merged, so there wont be
185 // PageInfo object on pages which are merged.
186 // These pages are skipped by checking the nextPageAfterMerge_ of PageInfo
188 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
189 Page* Database::getFreePage()
191 //Page* page = getFirstPage();
192 Page* page = getCurrentPage();
193 //printDebug(DM_Alloc, "Database::getFreePage firstPage:%x",page);
194 printDebug(DM_Alloc, "Database::getFreePage currentpage:%x",page);
195 PageInfo* pageInfo = ((PageInfo*)page);
196 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
197 int pageSize = PAGE_SIZE;
198 bool isEndAddchk=false;
199 while( 1 == pageInfo->isUsed_)
201 //If any pages are merged to store data larger than PAGE_SIZE
202 //move to the next page after the merge and check whether it is used
203 if ( pageInfo->nextPageAfterMerge_ == NULL) {
204 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
205 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
207 else {
208 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
209 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
212 if((((char*) pageInfo) + pageSize) >= endAddr )
214 if(!isEndAddchk){ isEndAddchk=true; pageInfo=(PageInfo *)getFirstPage(); }
215 else
216 break;
218 if ((char*)pageInfo >= endAddr)
220 //printError(ErrSysInternal,"Invalid address %x",pageInfo);
221 return NULL;
225 if (!isValidAddress(((char*) pageInfo) + pageSize))
227 printError(ErrSysInternal, "Invalid address %x",((char*) pageInfo) + pageSize);
228 return NULL;
230 setCurrentPage((Page*) pageInfo);
231 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
232 return (Page*) pageInfo ;
235 //Used by tuples more than PAGE_SIZE
236 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
237 Page* Database::getFreePage(size_t size)
239 Page* page = getFirstPage();
240 PageInfo* pageInfo = ((PageInfo*)page);
241 int multiple = size / PAGE_SIZE;
242 int offset = ((multiple + 1) * PAGE_SIZE);
243 printDebug(DM_Alloc, "Database::getFreePage firstPage:%x size:%ld",page, size);
244 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
245 int pageSize = PAGE_SIZE;
246 bool isEndAddchk = false;
247 while(true){
248 while( 1 == pageInfo->isUsed_)
250 //If any pages are merged to store data larger than PAGE_SIZE
251 //move to the next page after the merge and check whether it is used
252 if ( pageInfo->nextPageAfterMerge_ == NULL) {
253 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
254 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
256 else {
257 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
258 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
260 if((((char*) pageInfo) + offset) >= endAddr )
262 if(!isEndAddchk){ isEndAddchk=true; pageInfo=(PageInfo *)getFirstPage(); }
263 else
264 break;
267 int i = 0;
268 PageInfo *pInfo = pageInfo;
269 if ((((char*)pInfo) + offset) >= endAddr)
271 printError(ErrSysInternal,"Invalid address %x",((char*)pInfo) + offset);
272 return NULL;
274 for (i = 0; i< multiple + 1; i++)
276 if (1 == pInfo->isUsed_) break;
277 pInfo = (PageInfo*)((char*)pInfo + pageSize);
279 if ( i == (multiple + 1)) break;
280 pageInfo = (PageInfo*)((char*)pInfo + pageSize);
283 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
284 setCurrentPage((Page*) pageInfo);
285 return (Page*) pageInfo ;
288 void Database::printStatistics()
290 Page* page = getFirstPage();
291 PageInfo* pageInfo = ((PageInfo*)page);
292 int usedPageCount =0, usedMergedPageCount =0, totalPages=0;
293 printf("<DatabaseStatistics>\n");
294 printf(" <Database Name> %s </Database Name>\n", getName());
295 printf(" <Max Size> %ld </Max Size>\n", getMaxSize());
296 printf(" <First Page> %x </First Page>\n", getFirstPage());
297 while(isValidAddress((char*) pageInfo))
299 if (pageInfo == NULL) break;
300 if (1 == pageInfo->isUsed_) {
301 if ( pageInfo->nextPageAfterMerge_ == NULL) {
302 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
303 usedPageCount++; totalPages++;
304 printDebug(DM_Alloc, "Normal Page:Moving to page:%x\n",pageInfo);
305 continue;
307 else {
308 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
309 usedMergedPageCount++; totalPages++;
310 printDebug(DM_Alloc,"Merged Page:Moving to page:%x\n",pageInfo);
311 continue;
314 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
315 printDebug(DM_Alloc,"Normal Page not used:Moving to page:%x\n",pageInfo);
316 totalPages++;
318 printf(" <Total Pages> %d </Total Pages>\n", totalPages);
319 printf(" <Used Normal Pages> %d </Used Normal Pages>\n", usedPageCount);
320 printf(" <Used Merged Pages> %d </Used Merged Pages>\n", usedMergedPageCount);
321 printf(" <Chunks Used> %d </Chunks Used>\n", getNoOfChunks());
322 printf("</DatabaseStatistics>\n");
324 return ;
328 //called only in case of system database to create and initialize the chunk
329 //information
330 DbRetVal Database::createSystemDatabaseChunk(AllocType type, size_t size, int id)
333 Chunk *chunk;
334 if (-1 == id )
336 printError(ErrSysFatal, "Database ID corrupted");
337 return ErrSysFatal;
339 chunk = getSystemDatabaseChunk(id);
341 chunk->setChunkNameForSystemDB(id);
343 if (FixedSizeAllocator == type) chunk->setSize(size);
344 //getDatabaseMutex();
345 if (chunk->allocSize_ > PAGE_SIZE)
346 chunk->curPage_ = getFreePage(chunk->allocSize_);
347 else
348 chunk->curPage_ = getFreePage();
349 if ( chunk->curPage_ == NULL)
351 //releaseDatabaseMutex();
352 printError(ErrNoMemory, "No free pages in database: Database full");
353 return ErrNoMemory;
356 chunk->firstPage_ = chunk->curPage_;
357 PageInfo* firstPageInfo = ((PageInfo*)chunk->firstPage_);
358 firstPageInfo->setFirstPageAsUsed();
359 chunk->setChunkID(id);
360 chunk->setAllocType(type);
361 printDebug(DM_Database, "Creating System Database Chunk:%d Size:%d",id, chunk->allocSize_);
362 if (chunk->allocSize_ > PAGE_SIZE)
364 int multiple = os::floor(chunk->allocSize_ / PAGE_SIZE);
365 int offset = ((multiple + 1) * PAGE_SIZE);
366 firstPageInfo->nextPageAfterMerge_ = ((char*)firstPageInfo)+ offset;
369 if (0 == size)
371 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)firstPageInfo) + sizeof(PageInfo));
372 varInfo->isUsed_ = 0;
373 varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
376 incrementChunk();
377 //releaseDatabaseMutex();
378 return OK;
381 //This is never called currently. If situation arises will be coded later.
382 DbRetVal Database::deleteSystemDatabaseChunk(int id)
385 Chunk *chunk = getSystemDatabaseChunk(id);
386 chunk->setChunkID(-1);
387 chunk->setSize(0);
388 chunk->setAllocType(UnknownAllocator);
389 //TODO::
390 //chunk->pageList_
391 //walk though the pageList ptr and get all the page pointers
392 //then free all the pages used to store this by setting the
393 //start of page to notused
394 chunk->firstPage_ = NULL;
395 chunk->curPage_ = NULL;
396 decrementChunk();
397 return OK;
401 void Database::createAllCatalogTables()
403 //These are special chunks which hold catalog tables and other information
405 // chunk id 0 ->userChunkTable
406 // chunk id 1 ->lockBucketHash
407 // chunk id 2 ->lockTable
409 // chunk id 10->DATABASE
410 // chunk id 11->USER
411 // chunk id 12->TABLE
412 // chunk id 13->FIELD
413 // chunk id 14->ACCESS
415 createSystemTables();
416 createMetaDataTables();
418 void Database::createSystemTables()
420 createSystemDatabaseChunk(FixedSizeAllocator,
421 sizeof(Chunk), UserChunkTableId);
422 createSystemDatabaseChunk(FixedSizeAllocator,
423 sizeof(Bucket) * LOCK_BUCKET_SIZE,
424 LockTableHashBucketId);
425 createSystemDatabaseChunk(FixedSizeAllocator,
426 sizeof(Mutex)* LOCK_BUCKET_SIZE,
427 LockTableMutexId);
428 createSystemDatabaseChunk(FixedSizeAllocator,
429 sizeof(LockHashNode), LockTableId);
430 createSystemDatabaseChunk(FixedSizeAllocator,
431 sizeof(TransHasNode), TransHasTableId);
433 createSystemDatabaseChunk(VariableSizeAllocator,
434 0, UndoLogTableID);
436 void Database::createMetaDataTables()
438 createSystemDatabaseChunk(FixedSizeAllocator,
439 sizeof(CDATABASEFILE), DatabaseTableId);
440 createSystemDatabaseChunk(FixedSizeAllocator,
441 sizeof(CUSER), UserTableId);
442 createSystemDatabaseChunk(FixedSizeAllocator,
443 sizeof(CTABLE), TableTableId);
444 createSystemDatabaseChunk(FixedSizeAllocator,
445 sizeof(CFIELD), FieldTableId);
446 createSystemDatabaseChunk(FixedSizeAllocator,
447 sizeof(CACCESS), AccessTableId);
448 createSystemDatabaseChunk(FixedSizeAllocator,
449 sizeof(CINDEX), IndexTableId);
450 createSystemDatabaseChunk(FixedSizeAllocator,
451 sizeof(CINDEXFIELD), IndexFieldTableId);
452 createSystemDatabaseChunk(FixedSizeAllocator,
453 sizeof(CFK), ForeignKeyTableId);
454 createSystemDatabaseChunk(FixedSizeAllocator,
455 sizeof(CFKFIELD), ForeignKeyFieldTableId);
458 //used in case of system database
459 Chunk* Database::getSystemDatabaseChunk(int id)
461 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
462 id * sizeof (Chunk);
463 return (Chunk*)(((char*) metaData_) + offset);
467 //used in case of system database
468 Transaction* Database::getSystemDatabaseTrans(int slot)
470 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
471 os::alignLong(MAX_CHUNKS * sizeof (Chunk)) +
472 slot * sizeof (Transaction);
473 return (Transaction*)(((char*) metaData_) + offset);
476 bool Database::isValidAddress(void* addr)
478 if ((char*) addr >= ((char*)getMetaDataPtr()) + getMaxSize())
479 return false;
480 else
481 return true;
484 //should be called only on system database
485 void* Database::allocLockHashBuckets()
487 Chunk *chunk = getSystemDatabaseChunk(LockTableHashBucketId);
488 DbRetVal rv=OK;
489 void *ptr = chunk->allocate(this, &rv);
490 if (NULL == ptr)
492 printError(ErrNoMemory, "Chunk Allocation failed for lock hash bucket catalog table");
494 return ptr;
497 Bucket* Database::getLockHashBuckets()
499 Chunk *tChunk = getSystemDatabaseChunk(LockTableHashBucketId);
500 ChunkIterator iter = tChunk->getIterator();
501 return (Bucket*)iter.nextElement();
503 void Database::setUniqueChunkID(int id)
505 (metaData_->chunkUniqueID_).setID(id);
508 int Database::getUniqueIDForChunk()
510 return ((metaData_->chunkUniqueID_).getID());
513 DbRetVal Database::recoverMutex(Mutex *mut)
515 //TODO: operations need to be undone before recovering the mutex.
516 mut->recoverMutex();
517 return OK;
520 DbRetVal Database::checkPoint()
522 char dataFile[1024];
523 char cmd[1024];
524 if (!Conf::config.useMmap()) {
525 // sprintf(dataFile, "%s/db.chkpt.data1", Conf::config.getDbFile());
526 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
527 FILE *fp = NULL;
528 if (fp = fopen(dataFile, "r")) {
529 fclose(fp);
530 int ret = unlink(dataFile);
531 if (ret != OK) {
532 printError(ErrOS, "Unable to delete old chkpt file. Failure");
533 return ErrOS;
536 int fd = open(dataFile, O_WRONLY|O_CREAT, 0644);
537 void *buf = (void *) metaData_;
538 write(fd, buf, Conf::config.getMaxDbSize());
539 close(fd);
540 sprintf(cmd, "cp -f %s/db.chkpt.data %s/db.chkpt.data1", Conf::config.getDbFile(), Conf::config.getDbFile());
541 int ret = system(cmd);
542 if (ret != 0) {
543 printError(ErrOS, "Unable to take checkpoint back up file");
544 return ErrOS;
546 } else {
547 int fd = getChkptfd();
548 if (!fdatasync(fd)) { printf("pages written. checkpoint taken\n"); }
549 sprintf(cmd, "cp -f %s/db.chkpt.data %s/db.chkpt.data1", Conf::config.getDbFile(), Conf::config.getDbFile());
550 int ret = system(cmd);
551 if (ret != 0) {
552 printError(ErrOS, "Unable to take checkpoint back up file");
553 return ErrOS;
556 return OK;
559 //used only by the user database not the system database
560 DbRetVal Database::recoverUserDB()
562 char dataFile[1024];
563 char cmd[1024];
564 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
565 int fd = open(dataFile, O_RDONLY);
566 if (-1 == fd) { return OK; }
567 void *buf = (void *) metaData_;
568 read(fd, buf, Conf::config.getMaxDbSize());
569 close(fd);
570 return OK;
573 //used only by the system database
574 DbRetVal Database::recoverSystemDB()
576 char mapFile[1024];
577 sprintf(mapFile, "%s/db.chkpt.map", Conf::config.getDbFile());
578 int fd = open(mapFile, O_RDONLY);
579 if (-1 == fd) { return OK; }
580 CatalogTableTABLE cTable(this);
581 CatalogTableINDEX cIndex(this);
582 struct Object buf;
583 while (read(fd, &buf, sizeof(buf))) {
584 if (buf.type == Tbl) {
585 cTable.setChunkPtr(buf.name, buf.firstPage, buf.curPage);
588 else if (buf.type == hIdx || buf.type == tIdx) {
589 cIndex.setChunkPtr(buf.name, buf.type, buf.bucketChunk, buf.firstPage, buf.curPage);
592 return OK;