Renamed server directory to storage directory in src
[csql.git] / src / storage / Chunk.cxx
blob109836235802990e8de9a4ab8f67d38ed1d2ba04
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Allocator.h>
17 #include<Database.h>
18 #include<os.h>
19 #include<Debug.h>
20 #include<Config.h>
21 #include<CatalogTables.h>
23 // sets the size of the Chunk allocator for fixed size
24 // allocator
25 // we need one integer to store book keeping information
26 // whether the storage allocation unit is used of not
27 // when it is deleted this flag is only set to unused
28 void Chunk::setSize(size_t size)
31 size_t needSize = size + sizeof(int);
32 size_t multiple = (size_t) os::floor(needSize / sizeof(size_t));
33 size_t rem = needSize % sizeof(size_t);
34 if (0 == rem)
35 allocSize_ = needSize;
36 else
37 allocSize_ = (multiple + 1) * sizeof(size_t);
40 void* Chunk::allocateForLargeDataSize(Database *db)
42 PageInfo* pageInfo = ((PageInfo*)curPage_);
43 DbRetVal ret = db->getAllocDatabaseMutex();
44 if (ret != 0)
46 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
47 return NULL;
50 //check whether we have space in curPage
51 if (pageInfo->hasFreeSpace_ == 1)
53 char *data = ((char*)curPage_) + sizeof(PageInfo);
54 pageInfo->hasFreeSpace_ =0;
55 *((int*)data) = 1;
56 db->releaseAllocDatabaseMutex();
57 return data + sizeof(int);
61 //no space in curpage , get new page from database
62 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
63 if (NULL == pageInfo)
65 db->releaseAllocDatabaseMutex();
66 printError(ErrNoMemory,"No more free pages in the database");
67 return NULL;
69 printDebug(DM_Alloc, "Chunk ID:%d Large Data Item newPage:%x",
70 chunkID_, pageInfo);
71 int multiple = os::floor(allocSize_ / PAGE_SIZE);
72 int offset = ((multiple + 1) * PAGE_SIZE);
74 pageInfo->setPageAsUsed(offset);
76 //create the link
77 ((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
78 //Make this as current page
79 curPage_ = (Page*) pageInfo;
80 char* data = ((char*)curPage_) + sizeof(PageInfo);
81 //TODO::check whether it is locked
82 *((int*)data) = 1;
83 db->releaseAllocDatabaseMutex();
84 return data + sizeof(int);
88 void* Chunk::allocateFromFirstPage(Database *db, int noOfDataNodes)
90 PageInfo *pageIter = ((PageInfo*)firstPage_);
91 printDebug(DM_Alloc, "Chunk ID:%d. No free page in database",
92 chunkID_);
93 printDebug(DM_Alloc, "Scan from firstPage:%x for free nodes",
94 firstPage_);
95 char *data = NULL;
96 int i = 0;
97 //scan from first page to locate a free node available
98 while(NULL != pageIter)
100 data = ((char*)pageIter) + sizeof(PageInfo);
101 if (pageIter->hasFreeSpace_ == 1)
103 for (i = 0; i< noOfDataNodes -1; i++)
105 if (1 == *((int*)data))
106 data = data + allocSize_;
107 else break;
109 if (i != noOfDataNodes -1) break;
111 printDebug(DM_Alloc, "Chunk ID: %d Page :%x does not have free nodes",
112 chunkID_, pageIter);
113 pageIter = (PageInfo*)((PageInfo*) pageIter)->nextPage_;
115 if (NULL == pageIter) return NULL;
116 printDebug(DM_Alloc,"ChunkID:%d Scan for free node End:Page :%x",
117 chunkID_, pageIter);
118 *((int*)data) = 1;
119 return data + sizeof(int);
123 void* Chunk::allocateFromNewPage(Database *db)
125 DbRetVal ret = db->getAllocDatabaseMutex();
126 if (ret != 0)
128 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
129 return NULL;
131 //get a new page from db
132 Page *page = db->getFreePage();
133 if (page == NULL)
135 db->releaseAllocDatabaseMutex();
136 return NULL;
138 printDebug(DM_Alloc, "ChunkID:%d Normal Data Item newPage:%x",
139 chunkID_, page);
140 //Initialize pageInfo for this new page
141 PageInfo *pInfo = (PageInfo*)page;
142 pInfo->setPageAsUsed(0);
144 //create the link between old page and the newly created page
145 PageInfo* pageInfo = ((PageInfo*)curPage_);
146 pageInfo->nextPage_ = page;
148 //make this new page as the current page
149 curPage_ = page;
151 char* data = ((char*)page) + sizeof(PageInfo);
152 *((int*)data) = 1;
153 db->releaseAllocDatabaseMutex();
154 return data + sizeof(int);
157 //Allocates memory to store data
158 //TODO::check whether it is locked before allocating.
159 //delete tuple will set the usedflag to true, but locks will be held
160 //till commit and it shall be rolledback.So make sure that it does not
161 //allocate deleted tuple which is yet to be commited.
163 void* Chunk::allocate(Database *db, DbRetVal *status)
165 PageInfo* pageInfo = ((PageInfo*)curPage_);
167 int noOfDataNodes=os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
168 char *data = ((char*)curPage_) + sizeof(PageInfo);
169 printDebug(DM_Alloc, "Chunk::allocate id:%d curPage:%x noOfDataNodes:%d",
170 chunkID_, curPage_, noOfDataNodes);
172 //1.scan through data list and find if any is free to use in current page
173 //2.If there is none then
174 // a) get new free page from db. set the prev->next to point
175 // to this new page
176 //4. b) initialize the free page to zero and get first data ptr.
177 //5.If there is one, return that
179 //For allocation more than PAGE_SIZE
180 if (0 == noOfDataNodes)
182 data = (char*) allocateForLargeDataSize(db);
183 return data;
186 int ret = getChunkMutex(db->procSlot);
187 if (ret != 0)
189 if (status != NULL) *status = ErrLockTimeOut;
190 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
191 return NULL;
193 int i = noOfDataNodes;
194 if (pageInfo->hasFreeSpace_ == 1)
196 for (i = 1; i< noOfDataNodes; i++)
198 if (*((int*)data) == 1) data = data + allocSize_;
199 else break;
203 printDebug(DM_Alloc, "ChunkID:%d Node which might be free is %d",
204 chunkID_, i);
205 //It comes here if the pageInfo->hasFreeSpace ==0
206 //or there are no free data space in this page
207 if (i == noOfDataNodes && *((int*)data) == 1)
210 printDebug(DM_Alloc, "ChunkID:%d curPage does not have free nodes.", chunkID_);
211 //there are no free data space in this page
212 pageInfo->hasFreeSpace_ = 0;
213 if (chunkID_ == LockTableId || chunkID_ == TransHasTableId)
215 data = (char*) allocateFromFirstPage(db, noOfDataNodes);
216 if (NULL == data)
218 data = (char*) allocateFromNewPage(db);
219 if (data == NULL)
221 printError(ErrNoMemory, "No memory in any of the pages:Increase db size");
222 if (status != NULL) *status = ErrNoMemory;
226 else
228 data = (char*) allocateFromNewPage(db);
229 if (NULL == data)
231 data = (char*) allocateFromFirstPage(db, noOfDataNodes);
232 if (data == NULL)
234 printError(ErrNoMemory, "No memory in any of the pages:Increase db size");
235 if (status != NULL) *status = ErrNoMemory;
239 releaseChunkMutex(db->procSlot);
240 return data;
242 *((int*)data) = 1;
243 releaseChunkMutex(db->procSlot);
244 return data + sizeof(int);
248 void* Chunk::allocateForLargeDataSize(Database *db, size_t size)
250 //no need to take chunk mutexes for this, as we are taking alloc database mutex
251 int multiple = os::floor(size / PAGE_SIZE);
252 int offset = ((multiple + 1) * PAGE_SIZE);
253 PageInfo* pageInfo = ((PageInfo*)curPage_);
254 DbRetVal ret = db->getAllocDatabaseMutex();
255 if (ret != 0)
257 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
258 return NULL;
260 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
261 if (NULL == pageInfo)
263 db->releaseAllocDatabaseMutex();
264 printError(ErrNoMemory,"No more free pages in the database:Increase db size");
265 return NULL;
267 printDebug(DM_VarAlloc,"Chunk::allocate Large Data Item id:%d Size:%d curPage:%x ",
268 chunkID_, size, curPage_);
269 //TODO:: logic pending
272 //REDESIGN MAY BE REQUIRED:Lets us live with this for now.
273 //what happens to the space lets say 10000 bytes is allocated
274 //it needs 2 pages,= 16000 bytes, 6000 bytes should not be wasted
275 //in this case.So need to take of this.
276 //Will be coded at later stage as this is developed to support
277 //undo logging and currently we shall assume that the logs generated
278 //wont be greater than PAGE_SIZE.
279 db->releaseAllocDatabaseMutex();
280 return NULL;
286 void* Chunk::allocFromNewPageForVarSize(Database *db, size_t size)
288 //Should be called only for data items <PAGE_SIZE
289 DbRetVal ret = db->getAllocDatabaseMutex();
290 if (ret != 0)
292 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
293 return NULL;
296 void *vnode = varSizeFirstFitAllocate(size);
297 if (vnode != NULL)
299 db->releaseAllocDatabaseMutex();
300 return vnode;
303 Page *newPage = db->getFreePage();
304 db->releaseAllocDatabaseMutex();
305 if (NULL == newPage)
307 return NULL;
310 printDebug(DM_VarAlloc, "ChunkID:%d New Page: %x ", chunkID_, newPage);
311 PageInfo *pInfo = (PageInfo*) newPage;
312 pInfo->setPageAsUsed(0);
313 createDataBucket(newPage, PAGE_SIZE, size);
315 ((PageInfo*)curPage_)->nextPage_ = newPage;
316 curPage_ = newPage;
317 char *data= ((char*)newPage) + sizeof(PageInfo) + sizeof(VarSizeInfo);
318 return data;
321 //Allocates from the current page of the chunk.
322 //Scans through the VarSizeInfo objects in the page and gets the free slot
323 void* Chunk::allocateFromCurPageForVarSize(size_t size)
325 //Should be called only for data items <PAGE_SIZE
326 Page *page = ((PageInfo*)curPage_);
327 printDebug(DM_VarAlloc, "Chunk::allocate Normal Data Item id:%d Size:%d curPage:%x ",
328 chunkID_, size, curPage_);
329 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) +
330 sizeof(PageInfo));
331 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
333 if (0 == varInfo->isUsed_)
335 if( size + sizeof(VarSizeInfo) < varInfo->size_)
337 splitDataBucket(varInfo, size);
338 printDebug(DM_VarAlloc, "Chunkid:%d splitDataBucket: Size: %d Item:%x ",
339 chunkID_, size, varInfo);
340 return (char*)varInfo + sizeof(VarSizeInfo);
342 else if (size == varInfo->size_) {
343 varInfo->isUsed_ = 1;
344 return (char *) varInfo + sizeof(VarSizeInfo);
348 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
349 +varInfo->size_);
351 return NULL;
354 //Allocates memory to store data of variable size
355 void* Chunk::allocate(Database *db, size_t size, DbRetVal *status)
357 if (0 == size) return NULL;
358 //check if the size is more than PAGE_SIZE
359 //if it is more than the PAGE_SIZE, then allocate new
360 //page using database and then link the curPage to the
361 //newly allocated page
362 //if it is less than PAGE_SIZE, then check the curpage for
363 //free memory of specified size
364 //if not available, then scan from the firstPage for the free
365 //space
367 //TODO::During the scan, merge nearby nodes if both are free
368 //if not available then allocate new page
370 size_t alignedSize = os::align(size);
371 void *data = NULL;
372 int ret = getChunkMutex(db->procSlot);
373 if (ret != 0)
375 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
376 *status = ErrLockTimeOut;
377 return NULL;
379 if (alignedSize > PAGE_SIZE)
381 data = allocateForLargeDataSize(db, alignedSize);
383 else
385 data = allocateFromCurPageForVarSize(alignedSize);
386 if (NULL == data) {
387 //No available spaces in the current page.
388 //allocate new page
389 data= allocFromNewPageForVarSize(db, alignedSize);
390 if (NULL == data) {
391 printError(ErrNoMemory, "No memory in any of the pages:Increase db size");
392 if (status != NULL) *status = ErrNoMemory;
396 releaseChunkMutex(db->procSlot);
397 return data;
400 //Assumes chunk mutex is already taken, before calling this
401 void* Chunk::varSizeFirstFitAllocate(size_t size)
403 printDebug(DM_VarAlloc, "Chunk::varSizeFirstFitAllocate size:%d firstPage:%x",
404 size, firstPage_);
406 Page *page = ((PageInfo*)firstPage_);
407 size_t alignedSize = os::align(size);
408 while(NULL != page)
410 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
411 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
413 if (0 == varInfo->isUsed_)
415 if( alignedSize +sizeof(VarSizeInfo) < varInfo->size_)
417 splitDataBucket(varInfo, alignedSize);
418 return ((char*)varInfo) + sizeof(VarSizeInfo);
420 else if (alignedSize == varInfo->size_) {
421 varInfo->isUsed_ = 1;
422 printDebug(DM_VarAlloc, "VarSizeFirstFitAllocate returning %x", ((char*)varInfo) +sizeof(VarSizeInfo));
423 return ((char *) varInfo) + sizeof(VarSizeInfo);
426 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
427 +varInfo->size_);
429 printDebug(DM_VarAlloc, "Chunk:This page does not have free data nodes page:%x", page);
430 page = ((PageInfo*) page)->nextPage_;
432 return NULL;
435 void Chunk::freeForVarSizeAllocator(void *ptr, int pslot)
437 int ret = getChunkMutex(pslot);
438 if (ret != 0)
440 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
441 return;
443 VarSizeInfo *varInfo = (VarSizeInfo*)((char*)ptr- sizeof(VarSizeInfo));
444 varInfo->isUsed_ = 0;
445 printDebug(DM_VarAlloc,"chunkID:%d Unset isUsed for %x", chunkID_, varInfo);
446 releaseChunkMutex(pslot);
447 return;
451 void Chunk::freeForLargeAllocator(void *ptr, int pslot)
453 //There will be max only one data element in a page.
454 //PageInfo is stored just before the data.
455 int ret = getChunkMutex(pslot);
456 if (ret != 0)
458 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
459 return;
461 PageInfo *pageInfo = (PageInfo*)(((char*)
462 ptr) - (sizeof(PageInfo) + sizeof(int)));
463 PageInfo *pInfo = (PageInfo*)firstPage_, *prev = (PageInfo*)firstPage_;
464 bool found = false;
465 while(!found)
467 if (pInfo == pageInfo) {found = true; break; }
468 prev = pInfo;
469 pInfo = (PageInfo*)pInfo->nextPage_;
471 if (!found)
473 printError(ErrSysFatal,"Page %x not found in page list:Logical error", pageInfo );
474 releaseChunkMutex(pslot);
475 return ;
477 prev->nextPage_ = pageInfo->nextPage_;
478 pageInfo->nextPageAfterMerge_ = NULL;
479 pageInfo->isUsed_ = 0;
480 os::memset(pageInfo, 0 , allocSize_);
481 pageInfo->hasFreeSpace_ = 1;
482 releaseChunkMutex(pslot);
483 return;
486 //Frees the memory pointed by ptr
487 void Chunk::free(Database *db, void *ptr)
489 if (0 == allocSize_)
491 freeForVarSizeAllocator(ptr, db->procSlot);
492 return;
494 int noOfDataNodes =os::floor((PAGE_SIZE - sizeof(PageInfo)) / allocSize_);
496 if (0 == noOfDataNodes)
498 freeForLargeAllocator(ptr, db->procSlot);
499 return;
501 int ret = getChunkMutex(db->procSlot);
502 if (ret != 0)
504 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
505 return;
507 //below is the code for freeing in fixed size allocator
509 //unset the used flag
510 *((int*)ptr -1 ) = 0;
511 PageInfo *pageInfo;
512 pageInfo = getPageInfo(db, ptr);
513 if (NULL == pageInfo)
515 printError(ErrSysFatal,"Probable Data corruption: pageInfo is NULL", pageInfo );
516 releaseChunkMutex(db->procSlot);
517 return;
519 //set the pageinfo where this ptr points
520 pageInfo->hasFreeSpace_ = 1;
521 releaseChunkMutex(db->procSlot);
522 return;
525 //returns the pageInfo of the page where this ptr points
526 //This works only if the data size is less than PAGE_SIZE
527 //If ptr points to data which is more than PAGE_SIZE,then
528 //calling this might lead to memory corruption
529 //Note:IMPORTANT::assumes db lock is taken before calling this
530 PageInfo* Chunk::getPageInfo(Database *db, void *ptr)
532 if (allocSize_ < PAGE_SIZE - sizeof(PageInfo)) {
533 int rem = (long) ptr % 8192;
534 return (PageInfo*)(((char*)ptr) - rem);
535 } else {
536 //large size allocator
537 char *inPtr = (char*)ptr;
538 PageInfo* pageInfo = ((PageInfo*)firstPage_);
540 while( pageInfo != NULL )
542 if (inPtr > (char*) pageInfo && pageInfo->nextPageAfterMerge_ >inPtr)
543 return pageInfo;
544 pageInfo = (PageInfo*)pageInfo->nextPage_ ;
547 return NULL;
550 //If called on chunk used to store tuples, it returns the total number of rows
551 //present in the table
552 long Chunk::getTotalDataNodes()
554 long totalNodes =0;
555 if (0 == allocSize_) //->variable size allocator
557 Page *page = ((PageInfo*)firstPage_);
558 while(NULL != page)
560 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
561 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
563 if (1 == varInfo->isUsed_) totalNodes++;
564 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
565 +varInfo->size_);
567 page = ((PageInfo*) page)->nextPage_;
569 return totalNodes;
572 //TODO::for large size allocator
573 if (allocSize_ >PAGE_SIZE)//->each page has only one data node
574 return 0;
576 int noOfDataNodes=os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
577 PageInfo* pageInfo = ((PageInfo*)firstPage_);
578 char *data = ((char*)firstPage_) + sizeof(PageInfo);
579 int i=0;
580 while( pageInfo != NULL )
582 data = ((char*)pageInfo) + sizeof(PageInfo);
583 for (i = 0; i< noOfDataNodes; i++)
585 if (*((int*)data) == 1) { totalNodes++;}
586 data = data + allocSize_;
588 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
590 return totalNodes;
593 //TODO::for other type of allocators
594 int Chunk::compact()
596 PageInfo* pageInfo = ((PageInfo*)firstPage_);
597 PageInfo* prevPage = pageInfo;
598 if (NULL == pageInfo)
600 return 0;
602 pageInfo = (PageInfo*)pageInfo->nextPage_;
603 if (0 == allocSize_)
605 while( pageInfo != NULL )
607 bool flag = false;
608 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)pageInfo) +
609 sizeof(PageInfo));
610 while ((char*) varInfo < ((char*)pageInfo + PAGE_SIZE))
612 if (1 == varInfo->isUsed_) {flag=true; break;}
613 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
614 +varInfo->size_);
616 if (!flag) {
617 printDebug(DM_VarAlloc,"Freeing unused page in varsize allocator %x\n", pageInfo);
618 prevPage->nextPage_ = pageInfo->nextPage_;
619 pageInfo->isUsed_ = 0;
621 prevPage = pageInfo;
622 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
623 printDebug(DM_VarAlloc,"compact iter %x\n", pageInfo);
625 }else if (allocSize_ < PAGE_SIZE)
627 while( pageInfo != NULL )
629 bool flag = false;
630 int noOfDataNodes=os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
631 char *data = ((char*)pageInfo) + sizeof(PageInfo);
632 for (int i = 0; i< noOfDataNodes -1; i++)
634 if (1 == *((int*)data)) { flag = true; break; }
635 data = data +allocSize_;
637 if (!flag) {
638 printDebug(DM_Alloc,"Freeing unused page in fixed allocator %x\n", pageInfo);
639 prevPage->nextPage_ = pageInfo->nextPage_;
640 pageInfo->isUsed_ = 0;
642 prevPage = pageInfo;
643 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
644 printDebug(DM_Alloc,"compact iter %x\n", pageInfo);
647 return 0;
650 int Chunk::totalPages()
652 //logic is same for variable size and for large data node allocator.
653 PageInfo* pageInfo = ((PageInfo*)firstPage_);
654 int totPages=0;
655 while( pageInfo != NULL )
657 totPages++;
658 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
660 return totPages;
663 int Chunk::initMutex()
665 return chunkMutex_.init("Chunk");
667 int Chunk::getChunkMutex(int procSlot)
669 return chunkMutex_.getLock(procSlot);
671 int Chunk::releaseChunkMutex(int procSlot)
673 return chunkMutex_.releaseLock(procSlot);
675 int Chunk::destroyMutex()
677 return chunkMutex_.destroy();
679 void Chunk::splitDataBucket(VarSizeInfo *varInfo, size_t needSize)
681 int remSpace = varInfo->size_ - sizeof(VarSizeInfo) - needSize;
682 varInfo->isUsed_ = 1;
683 varInfo->size_ = needSize;
684 varInfo = (VarSizeInfo*)((char*)varInfo +
685 sizeof(VarSizeInfo) + varInfo->size_);
686 varInfo->isUsed_ = 0;
687 varInfo->size_ = remSpace;
688 printDebug(DM_VarAlloc, "Remaining space is %d\n", remSpace);
689 return;
693 void Chunk::createDataBucket(Page *page, size_t totalSize, size_t needSize)
695 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
696 varInfo->isUsed_ = 0;
697 varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
698 splitDataBucket(varInfo, needSize);
699 return;