performance fixes and fix for core dump in test tools/csql/test029.ksh
[csql.git] / src / storage / Database.cxx
blob6944e1b92d08ae37dd63d3863bca9b3b2a75f210
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Database.h>
17 #include<os.h>
18 #include<CatalogTables.h>
19 #include<Transaction.h>
20 #include<Lock.h>
21 #include<Debug.h>
22 #include<Config.h>
23 #include<Process.h>
25 const char* Database::getName()
27 return metaData_->dbName_;
30 int Database::getDatabaseID()
32 return metaData_->dbID_;
35 long Database::getMaxSize()
37 return metaData_->maxSize_;
40 long Database::getCurrentSize()
42 return metaData_->curSize_;
45 Page* Database::getCurrentPage()
47 return metaData_->curPage_;
50 Page* Database::getFirstPage()
52 return metaData_->firstPage_;
55 int Database::getNoOfChunks()
57 return metaData_->noOfChunks_;
59 Chunk* Database::getHashIndexChunk()
61 return metaData_->hashIndexChunk_;
64 void Database::setDatabaseID(int id)
66 metaData_->dbID_ = id;
68 void Database::setName(const char *name)
70 strcpy(metaData_->dbName_ , name);
72 void Database::setCurrentSize(long size)
74 metaData_->curSize_ = size;
76 void Database::setCurrentPage(Page *page)
78 metaData_->curPage_ = page;
80 void Database::setFirstPage(Page *page)
82 metaData_->firstPage_ = page;
84 void Database::setMaxSize(long size)
86 metaData_->maxSize_ = size;
88 void Database::setNoOfChunks(int chunks)
90 metaData_->noOfChunks_ = chunks;
92 void Database::setHashIndexChunk(Chunk *ch)
94 metaData_->hashIndexChunk_ = ch;
98 int Database::initAllocDatabaseMutex()
100 return metaData_->dbAllocMutex_.init("allocdb");
102 DbRetVal Database::getAllocDatabaseMutex(bool procAccount)
104 int ret= metaData_->dbAllocMutex_.getLock(procSlot, procAccount);
105 if (ret) return ErrLockTimeOut; else return OK;
107 DbRetVal Database::releaseAllocDatabaseMutex(bool procAccount)
109 metaData_->dbAllocMutex_.releaseLock(procSlot, procAccount);
110 return OK;
115 int Database::initTransTableMutex()
117 return metaData_->dbTransTableMutex_.init("transtable");
119 DbRetVal Database::getTransTableMutex()
121 int ret = metaData_->dbTransTableMutex_.getLock(procSlot);
122 if (ret) return ErrLockTimeOut; else return OK;
124 DbRetVal Database::releaseTransTableMutex()
126 metaData_->dbTransTableMutex_.releaseLock(procSlot);
127 return OK;
132 int Database::initProcessTableMutex()
134 return metaData_->dbProcTableMutex_.init("proctable");
136 DbRetVal Database::getProcessTableMutex(bool procAccount)
138 int ret = metaData_->dbProcTableMutex_.getLock(procSlot, procAccount);
139 if (ret) return ErrLockTimeOut; else return OK;
141 DbRetVal Database::releaseProcessTableMutex(bool procAccount)
143 metaData_->dbProcTableMutex_.releaseLock(procSlot, procAccount);
144 return OK;
149 int Database::initDatabaseMutex()
151 return metaData_->dbMutex_.init("db");
153 DbRetVal Database::getDatabaseMutex(bool procAccount)
155 int ret = metaData_->dbMutex_.getLock(procSlot, procAccount);
156 if (ret) return ErrLockTimeOut; else return OK;
158 DbRetVal Database::releaseDatabaseMutex(bool procAccount)
160 metaData_->dbMutex_.releaseLock(procSlot, procAccount);
161 return OK;
164 // Gets the free page
165 // Each page is segmented by PAGE_SIZE, so it checks the pageInfo
166 // of each page to determine if the page is free
167 // Algorithm is to scan through the pageInfo objects stored at
168 // address (db start address + i * PAGE_SIZE) where i = 1..n till end
169 // database
170 // But in case of large tuples, pages are merged, so there wont be
171 // PageInfo object on pages which are merged.
172 // These pages are skipped by checking the nextPageAfterMerge_ of PageInfo
174 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
175 Page* Database::getFreePage()
177 //Page* page = getFirstPage();
178 Page* page = getCurrentPage();
179 //printDebug(DM_Alloc, "Database::getFreePage firstPage:%x",page);
180 printDebug(DM_Alloc, "Database::getFreePage currentpage:%x",page);
181 PageInfo* pageInfo = ((PageInfo*)page);
182 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
183 int pageSize = PAGE_SIZE;
184 bool isEndAddchk=false;
185 while( 1 == pageInfo->isUsed_)
187 //If any pages are merged to store data larger than PAGE_SIZE
188 //move to the next page after the merge and check whether it is used
189 if ( pageInfo->nextPageAfterMerge_ == NULL) {
190 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
191 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
193 else {
194 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
195 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
198 if((((char*) pageInfo) + pageSize) >= endAddr )
200 if(!isEndAddchk){ isEndAddchk=true; pageInfo=(PageInfo *)getFirstPage(); }
201 else
202 break;
204 if ((char*)pageInfo >= endAddr)
206 //printError(ErrSysInternal,"Invalid address %x",pageInfo);
207 return NULL;
211 if (!isValidAddress(((char*) pageInfo) + pageSize))
213 printError(ErrSysInternal, "Invalid address %x",((char*) pageInfo) + pageSize);
214 return NULL;
216 setCurrentPage((Page*) pageInfo);
217 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
218 return (Page*) pageInfo ;
221 //Used by tuples more than PAGE_SIZE
222 //NOTE::IMPORTANT::assumes alloc database lock is taken before calling this
223 Page* Database::getFreePage(size_t size)
225 Page* page = getFirstPage();
226 PageInfo* pageInfo = ((PageInfo*)page);
227 int multiple = size / PAGE_SIZE;
228 int offset = ((multiple + 1) * PAGE_SIZE);
229 printDebug(DM_Alloc, "Database::getFreePage firstPage:%x size:%ld",page, size);
230 char* endAddr = ((char*)getMetaDataPtr()) + getMaxSize();
231 int pageSize = PAGE_SIZE;
232 while(true){
233 while( 1 == pageInfo->isUsed_)
235 //If any pages are merged to store data larger than PAGE_SIZE
236 //move to the next page after the merge and check whether it is used
237 if ( pageInfo->nextPageAfterMerge_ == NULL) {
238 pageInfo = (PageInfo*)((char*)pageInfo + pageSize);
239 printDebug(DM_Alloc,"Normal Page:Moving to page:%x",pageInfo);
241 else {
242 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
243 printDebug(DM_Alloc,"Merged Page:Moving to page:%x",pageInfo);
246 int i = 0;
247 PageInfo *pInfo = pageInfo;
248 if ((((char*)pInfo) + offset) >= endAddr)
250 printError(ErrSysInternal,"Invalid address %x",((char*)pInfo) + offset);
251 return NULL;
253 for (i = 0; i< multiple + 1; i++)
255 if (1 == pInfo->isUsed_) break;
256 pInfo = (PageInfo*)((char*)pInfo + pageSize);
258 if ( i == (multiple + 1)) break;
259 pageInfo = (PageInfo*)((char*)pInfo + pageSize);
262 printDebug(DM_Alloc,"Database::getFreePage returning page:%x",pageInfo);
263 setCurrentPage((Page*) pageInfo);
264 return (Page*) pageInfo ;
267 void Database::printStatistics()
269 Page* page = getFirstPage();
270 PageInfo* pageInfo = ((PageInfo*)page);
271 int usedPageCount =0, usedMergedPageCount =0, totalPages=0;
272 printf("<DatabaseStatistics>\n");
273 printf(" <Database Name> %s </Database Name>\n", getName());
274 printf(" <Max Size> %ld </Max Size>\n", getMaxSize());
275 printf(" <First Page> %x </First Page>\n", getFirstPage());
276 while(isValidAddress((char*) pageInfo))
278 if (pageInfo == NULL) break;
279 if (1 == pageInfo->isUsed_) {
280 if ( pageInfo->nextPageAfterMerge_ == NULL) {
281 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
282 usedPageCount++; totalPages++;
283 printDebug(DM_Alloc, "Normal Page:Moving to page:%x\n",pageInfo);
284 continue;
286 else {
287 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
288 usedMergedPageCount++; totalPages++;
289 printDebug(DM_Alloc,"Merged Page:Moving to page:%x\n",pageInfo);
290 continue;
293 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
294 printDebug(DM_Alloc,"Normal Page not used:Moving to page:%x\n",pageInfo);
295 totalPages++;
297 printf(" <Total Pages> %d </Total Pages>\n", totalPages);
298 printf(" <Used Normal Pages> %d </Used Normal Pages>\n", usedPageCount);
299 printf(" <Used Merged Pages> %d </Used Merged Pages>\n", usedMergedPageCount);
300 printf(" <Chunks Used> %d </Chunks Used>\n", getNoOfChunks());
301 printf("</DatabaseStatistics>\n");
303 return ;
307 //called only in case of system database to create and initialize the chunk
308 //information
309 DbRetVal Database::createSystemDatabaseChunk(AllocType type, size_t size, int id)
312 Chunk *chunk;
313 if (-1 == id )
315 printError(ErrSysFatal, "Database ID corrupted");
316 return ErrSysFatal;
318 chunk = getSystemDatabaseChunk(id);
320 chunk->setChunkNameForSystemDB(id);
322 if (FixedSizeAllocator == type) chunk->setSize(size);
323 //getDatabaseMutex();
324 if (chunk->allocSize_ > PAGE_SIZE)
325 chunk->curPage_ = getFreePage(chunk->allocSize_);
326 else
327 chunk->curPage_ = getFreePage();
328 if ( chunk->curPage_ == NULL)
330 //releaseDatabaseMutex();
331 printError(ErrNoMemory, "No free pages in database: Database full");
332 return ErrNoMemory;
335 chunk->firstPage_ = chunk->curPage_;
336 PageInfo* firstPageInfo = ((PageInfo*)chunk->firstPage_);
337 firstPageInfo->setFirstPageAsUsed();
338 chunk->setChunkID(id);
339 chunk->setAllocType(type);
340 printDebug(DM_Database, "Creating System Database Chunk:%d Size:%d",id, chunk->allocSize_);
341 if (chunk->allocSize_ > PAGE_SIZE)
343 int multiple = os::floor(chunk->allocSize_ / PAGE_SIZE);
344 int offset = ((multiple + 1) * PAGE_SIZE);
345 firstPageInfo->nextPageAfterMerge_ = ((char*)firstPageInfo)+ offset;
348 if (0 == size)
350 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)firstPageInfo) + sizeof(PageInfo));
351 varInfo->isUsed_ = 0;
352 varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
355 incrementChunk();
356 //releaseDatabaseMutex();
357 return OK;
360 //This is never called currently. If situation arises will be coded later.
361 DbRetVal Database::deleteSystemDatabaseChunk(int id)
364 Chunk *chunk = getSystemDatabaseChunk(id);
365 chunk->setChunkID(-1);
366 chunk->setSize(0);
367 chunk->setAllocType(UnknownAllocator);
368 //TODO::
369 //chunk->pageList_
370 //walk though the pageList ptr and get all the page pointers
371 //then free all the pages used to store this by setting the
372 //start of page to notused
373 chunk->firstPage_ = NULL;
374 chunk->curPage_ = NULL;
375 decrementChunk();
376 return OK;
380 void Database::createAllCatalogTables()
382 //These are special chunks which hold catalog tables and other information
384 // chunk id 0 ->userChunkTable
385 // chunk id 1 ->lockBucketHash
386 // chunk id 2 ->lockTable
388 // chunk id 10->DATABASE
389 // chunk id 11->USER
390 // chunk id 12->TABLE
391 // chunk id 13->FIELD
392 // chunk id 14->ACCESS
394 createSystemTables();
395 createMetaDataTables();
397 void Database::createSystemTables()
399 createSystemDatabaseChunk(FixedSizeAllocator,
400 sizeof(Chunk), UserChunkTableId);
401 createSystemDatabaseChunk(FixedSizeAllocator,
402 sizeof(Bucket) * LOCK_BUCKET_SIZE,
403 LockTableHashBucketId);
404 createSystemDatabaseChunk(FixedSizeAllocator,
405 sizeof(Mutex)* LOCK_BUCKET_SIZE,
406 LockTableMutexId);
407 createSystemDatabaseChunk(FixedSizeAllocator,
408 sizeof(LockHashNode), LockTableId);
409 createSystemDatabaseChunk(FixedSizeAllocator,
410 sizeof(TransHasNode), TransHasTableId);
412 createSystemDatabaseChunk(VariableSizeAllocator,
413 0, UndoLogTableID);
415 void Database::createMetaDataTables()
417 createSystemDatabaseChunk(FixedSizeAllocator,
418 sizeof(CDATABASEFILE), DatabaseTableId);
419 createSystemDatabaseChunk(FixedSizeAllocator,
420 sizeof(CUSER), UserTableId);
421 createSystemDatabaseChunk(FixedSizeAllocator,
422 sizeof(CTABLE), TableTableId);
423 createSystemDatabaseChunk(FixedSizeAllocator,
424 sizeof(CFIELD), FieldTableId);
425 createSystemDatabaseChunk(FixedSizeAllocator,
426 sizeof(CACCESS), AccessTableId);
427 createSystemDatabaseChunk(FixedSizeAllocator,
428 sizeof(CINDEX), IndexTableId);
429 createSystemDatabaseChunk(FixedSizeAllocator,
430 sizeof(CINDEXFIELD), IndexFieldTableId);
433 //used in case of system database
434 Chunk* Database::getSystemDatabaseChunk(int id)
436 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
437 id * sizeof (Chunk);
438 return (Chunk*)(((char*) metaData_) + offset);
442 //used in case of system database
443 Transaction* Database::getSystemDatabaseTrans(int slot)
445 size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
446 os::alignLong(MAX_CHUNKS * sizeof (Chunk)) +
447 slot * sizeof (Transaction);
448 return (Transaction*)(((char*) metaData_) + offset);
451 //used in case of system database
452 ThreadInfo* Database::getThreadInfo(int slot)
454 /* size_t offset = os::alignLong(sizeof (DatabaseMetaData));
455 offset = offset + os::alignLong( MAX_CHUNKS * sizeof (Chunk));
456 offset = offset + os::alignLong( Conf::config.getMaxProcs() * sizeof(Transaction));
457 offset = offset + slot * sizeof (ThreadInfo);
458 return (ThreadInfo*)(((char*) metaData_) + offset);
461 static size_t offset = os::alignLong(sizeof (DatabaseMetaData)) +
462 os::alignLong( MAX_CHUNKS * sizeof (Chunk)) +
463 os::alignLong( Conf::config.getMaxProcs()*sizeof(Transaction));
465 size_t off = offset + slot * sizeof (ThreadInfo);
466 return (ThreadInfo*)(((char*) metaData_) + off);
470 bool Database::isValidAddress(void* addr)
472 if ((char*) addr >= ((char*)getMetaDataPtr()) + getMaxSize())
473 return false;
474 else
475 return true;
478 //should be called only on system database
479 void* Database::allocLockHashBuckets()
481 Chunk *chunk = getSystemDatabaseChunk(LockTableHashBucketId);
482 void *ptr = chunk->allocate(this);
483 if (NULL == ptr)
485 printError(ErrNoMemory, "Chunk Allocation failed for lock hash bucket catalog table");
487 return ptr;
490 Bucket* Database::getLockHashBuckets()
492 Chunk *tChunk = getSystemDatabaseChunk(LockTableHashBucketId);
493 ChunkIterator iter = tChunk->getIterator();
494 return (Bucket*)iter.nextElement();
496 void Database::setUniqueChunkID(int id)
498 (metaData_->chunkUniqueID_).setID(id);
501 int Database::getUniqueIDForChunk()
503 return ((metaData_->chunkUniqueID_).getID());
506 DbRetVal Database::recoverMutex(Mutex *mut)
508 //TODO: operations need to be undone before recovering the mutex.
509 mut->recoverMutex();