changing long to int as it affects 64 bit linux tests
[csql.git] / src / storage / Chunk.cxx
blobc4110482fc16597e11d4db715fcf5404f445d6ec
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Allocator.h>
17 #include<Database.h>
18 #include<os.h>
19 #include<Debug.h>
20 #include<Config.h>
21 #include<CatalogTables.h>
23 // sets the size of the Chunk allocator for fixed size
24 // allocator
25 // we need one integer to store book keeping information
26 // whether the storage allocation unit is used of not
27 // when it is deleted this flag is only set to unused
28 void Chunk::setSize(size_t size)
31 size_t needSize = size + sizeof(int);
32 size_t multiple = (size_t) os::floor(needSize / sizeof(size_t));
33 size_t rem = needSize % sizeof(size_t);
34 if (0 == rem)
35 allocSize_ = needSize;
36 else
37 allocSize_ = (multiple + 1) * sizeof(size_t);
40 void* Chunk::allocateForLargeDataSize(Database *db)
42 PageInfo* pageInfo = ((PageInfo*)curPage_);
43 DbRetVal ret = db->getAllocDatabaseMutex();
44 if (ret != 0)
46 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
47 return NULL;
50 //check whether we have space in curPage
51 if (pageInfo->hasFreeSpace_ == 1)
53 char *data = ((char*)curPage_) + sizeof(PageInfo);
54 //pageInfo->hasFreeSpace_ =0;
55 int retVal = Mutex::CAS(&pageInfo->hasFreeSpace_, pageInfo->hasFreeSpace_, 0);
56 if (retVal !=0) printError(ErrSysFatal, "Unable to set hashFreeSpace flag");
57 *((InUse*)data) = 1;
58 //Mutex::CAS((InUse*)data , 0, 1);
59 db->releaseAllocDatabaseMutex();
60 return data + sizeof(InUse);
63 //no space in curpage , get new page from database
64 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
65 if (NULL == pageInfo)
67 db->releaseAllocDatabaseMutex();
68 printError(ErrNoMemory,"No more free pages in the database");
69 return NULL;
71 printDebug(DM_Alloc, "Chunk ID:%d Large Data Item newPage:%x",
72 chunkID_, pageInfo);
73 int multiple = (int) os::floor(allocSize_ / PAGE_SIZE);
74 int offset = ((multiple + 1) * PAGE_SIZE);
76 pageInfo->setPageAsUsed(offset);
78 //create the link
79 //((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
80 int retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
81 (long)(((PageInfo*)curPage_)->nextPage_), (long)pageInfo);
82 if(retVal !=0) {
83 printError(ErrLockTimeOut, "Fatal: Unable to create page link.");
86 //Make this as current page
87 //curPage_ = (Page*) pageInfo;
88 retVal = Mutex::CASL((long*) &curPage_, (long)curPage_, (long)pageInfo);
89 if(retVal !=0) {
90 printError(ErrLockTimeOut, "Fatal:Unable to set current page");
93 char* data = ((char*)curPage_) + sizeof(PageInfo);
94 retVal = Mutex::CAS(&pageInfo->hasFreeSpace_, pageInfo->hasFreeSpace_, 0);
95 if(retVal !=0) {
96 printError(ErrLockTimeOut, "Fatal:Unable to set hasFreeSpace");
98 *((InUse*)data) = 1;
99 //Mutex::CAS((InUse*)data , 0, 1);
100 db->releaseAllocDatabaseMutex();
101 return data + sizeof(InUse);
105 void* Chunk::allocateFromFirstPage(Database *db, int noOfDataNodes, DbRetVal *status)
107 PageInfo *pageIter = ((PageInfo*)firstPage_);
108 printDebug(DM_Alloc, "Chunk ID:%d. No free page in database",
109 chunkID_);
110 printDebug(DM_Alloc, "Scan from firstPage:%x for free nodes",
111 firstPage_);
112 char *data = NULL;
113 int i = 0;
114 //scan from first page to locate a free node available
115 while(NULL != pageIter)
117 data = ((char*)pageIter) + sizeof(PageInfo);
118 if (pageIter->hasFreeSpace_ == 1)
120 for (i = 0; i< noOfDataNodes ; i++)
122 if (1 == *((InUse*)data))
123 data = data + allocSize_;
124 else break;
126 if (i != noOfDataNodes) break;
128 printDebug(DM_Alloc, "Chunk ID: %d Page :%x does not have free nodes",
129 chunkID_, pageIter);
130 pageIter = (PageInfo*)((PageInfo*) pageIter)->nextPage_;
132 if (NULL == pageIter) {
133 *status = ErrNoMemory;
134 return NULL;
136 printDebug(DM_Alloc,"ChunkID:%d Scan for free node End:Page :%x",
137 chunkID_, pageIter);
138 //*((InUse*)data) = 1;
139 int ret = Mutex::CAS((InUse*)data , 0, 1);
140 if(ret !=0) {
141 *status = ErrLockTimeOut;
142 //printError(ErrLockTimeOut, "Unable to allocate from first page. Retry...");
143 return NULL;
145 return data + sizeof(InUse);
148 void* Chunk::allocateFromNewPage(Database *db, DbRetVal *status)
150 DbRetVal ret = db->getAllocDatabaseMutex();
151 if (ret != 0)
153 printDebug(DM_Warning,"Unable to acquire alloc database Mutex Chunkid:%d", chunkID_);
154 *status = ErrLockTimeOut;
155 return NULL;
157 //get a new page from db
158 Page *page = db->getFreePage();
159 if (page == NULL) {
160 printError(ErrNoMemory, "Unable to allocate page");
161 db->releaseAllocDatabaseMutex();
162 *status = ErrNoMemory;
163 return NULL;
165 printDebug(DM_Alloc, "ChunkID:%d Normal Data Item newPage:%x",
166 chunkID_, page);
167 //Initialize pageInfo for this new page
168 PageInfo *pInfo = (PageInfo*)page;
169 pInfo->setPageAsUsed(0);
171 char* data = ((char*)page) + sizeof(PageInfo);
172 *((InUse*)data) = 1;
173 //Mutex::CAS((int*)data , 0, 1);
175 //create the link between old page and the newly created page
176 PageInfo* pageInfo = ((PageInfo*)curPage_);
178 long oldPage = (long)pageInfo->nextPage_;
179 //pageInfo->nextPage_ = page;
180 int retVal = Mutex::CASL((long*)&pageInfo->nextPage_ , (long)pageInfo->nextPage_, (long)page);
181 if(retVal !=0) {
182 *((InUse*)data) = 0;
183 pInfo->setPageAsFree();
184 printDebug(DM_Warning, "Unable to get lock to set chunk list.");
185 *status = ErrLockTimeOut;
186 return NULL;
189 //make this new page as the current page
190 //curPage_ = page;
191 retVal = Mutex::CASL((long*)&curPage_ , (long)curPage_, (long)page);
192 if(retVal !=0) {
193 *((InUse*)data) = 0;
194 pInfo->setPageAsFree();
195 retVal = Mutex::CASL((long*)&pageInfo->nextPage_ , (long)pageInfo->nextPage_, (long)oldPage);
196 if (retVal !=0) printError(ErrSysFatal, "Fatal: Unable to reset the nextPage");
197 printDebug(DM_Warning, "Unable to get lock to set curPage");
198 *status = ErrLockTimeOut;
199 return NULL;
202 db->releaseAllocDatabaseMutex();
203 return data + sizeof(InUse);
206 //Allocates memory to store data
207 //TODO::check whether it is locked before allocating.
208 //delete tuple will set the usedflag to true, but locks will be held
209 //till commit and it shall be rolledback.So make sure that it does not
210 //allocate deleted tuple which is yet to be commited.
212 void* Chunk::allocate(Database *db, DbRetVal *status)
214 PageInfo* pageInfo = ((PageInfo*)curPage_);
216 int noOfDataNodes=(int) os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
217 char *data = ((char*)curPage_) + sizeof(PageInfo);
218 printDebug(DM_Alloc, "Chunk::allocate id:%d curPage:%x noOfDataNodes:%d",
219 chunkID_, curPage_, noOfDataNodes);
221 //1.scan through data list and find if any is free to use in current page
222 //2.If there is none then
223 // a) get new free page from db. set the prev->next to point
224 // to this new page
225 //4. b) initialize the free page to zero and get first data ptr.
226 //5.If there is one, return that
228 //For allocation more than PAGE_SIZE
229 if (0 == noOfDataNodes)
231 data = (char*) allocateForLargeDataSize(db);
232 return data;
235 /*int ret = getChunkMutex(db->procSlot);
236 if (ret != 0)
238 if (status != NULL) *status = ErrLockTimeOut;
239 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
240 return NULL;
242 int i = noOfDataNodes;
243 if (pageInfo->hasFreeSpace_ == 1)
245 for (i = 1; i< noOfDataNodes; i++)
247 if (*((InUse*)data) == 1) data = data + allocSize_;
248 else break;
252 printDebug(DM_Alloc, "ChunkID:%d Node which might be free is %d",
253 chunkID_, i);
254 //It comes here if the pageInfo->hasFreeSpace ==0
255 //or there are no free data space in this page
256 if (i == noOfDataNodes && *((InUse*)data) == 1)
259 printDebug(DM_Alloc, "ChunkID:%d curPage does not have free nodes.", chunkID_);
260 //there are no free data space in this page
261 //pageInfo->hasFreeSpace_ = 0;
262 int ret = Mutex::CAS(&pageInfo->hasFreeSpace_, pageInfo->hasFreeSpace_, 0);
263 if(ret !=0) {
264 *status = ErrLockTimeOut;
265 printDebug(DM_Warning, "Unable to set hasFreespace");
266 return NULL;
268 //if (chunkID_ == LockTableId || chunkID_ == TransHasTableId)
270 *status = OK;
271 data = (char*) allocateFromFirstPage(db, noOfDataNodes, status);
272 if (NULL == data && *status != ErrLockTimeOut)
274 *status = OK;
275 data = (char*) allocateFromNewPage(db, status);
276 if (data == NULL && *status != ErrLockTimeOut)
278 printError(ErrNoMemory, "No memory in any of the pages:Increase db size");
279 *status = ErrNoMemory;
283 /*else
285 data = (char*) allocateFromNewPage(db, status);
286 if (NULL == data && *status != ErrLockTimeOut)
288 data = (char*) allocateFromFirstPage(db, noOfDataNodes);
289 if (data == NULL)
291 printError(ErrNoMemory, "No memory in any of the pages:Increase db size");
292 *status = ErrNoMemory;
296 //releaseChunkMutex(db->procSlot);
297 return data;
299 //*((InUse*)data) = 1;
300 int ret = Mutex::CAS((InUse*)data , 0, 1);
301 if(ret !=0) {
302 *status = ErrLockTimeOut;
303 printDebug(DM_Warning, "Unable to set isUsed : retry...");
304 return NULL;
306 //releaseChunkMutex(db->procSlot);
307 return data + sizeof(InUse);
311 void* Chunk::allocateForLargeDataSize(Database *db, size_t size)
313 //no need to take chunk mutexes for this, as we are taking alloc database mutex
314 int multiple = (int) os::floor(size / PAGE_SIZE);
315 int offset = ((multiple + 1) * PAGE_SIZE);
316 PageInfo* pageInfo = ((PageInfo*)curPage_);
317 if(0==allocSize_)
318 pageInfo = (PageInfo*)db->getFreePage(size);
319 else
320 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
321 if (NULL == pageInfo)
323 printError(ErrNoMemory,"No more free pages in the database:Increase db size");
324 return NULL;
326 printDebug(DM_VarAlloc,"Chunk::alnextPageAfterMerge_locate Large Data Item id:%d Size:%d curPage:%x ",
327 chunkID_, size, curPage_);
328 //TODO:: logic pending
329 if(allocSize_!=0){
330 //large size allocate for FixedSize data
331 pageInfo->nextPageAfterMerge_ = ((char*)pageInfo + offset);
332 ((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
333 curPage_ = (Page*) pageInfo;
334 char* data = ((char*)curPage_) + sizeof(PageInfo);
335 //*((InUse*)data) = 1;
336 int ret = Mutex::CAS((InUse*)data , 0, 1);
337 if(ret !=0) {
338 printError(ErrLockTimeOut, "Lock Timeout: retry...");
339 return NULL;
341 pageInfo->isUsed_=1;
342 ret = Mutex::CAS(&pageInfo->hasFreeSpace_, pageInfo->hasFreeSpace_, 0);
343 if (ret !=0) printError(ErrSysFatal, "Unable to set hashFreeSpace");
344 return data + sizeof(InUse);
345 }else{
346 //large size allocate for varSize data
347 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)pageInfo) + sizeof(PageInfo));
348 pageInfo->nextPageAfterMerge_ = ((char*)pageInfo + offset);
349 ((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
350 curPage_ = (Page*) pageInfo;
351 varInfo->size_= size;
352 int ret = Mutex::CAS(&varInfo->isUsed_ , varInfo->isUsed_, 1);
353 if(ret !=0) {
354 printError(ErrLockTimeOut, "Unable to get lock for var alloc. Retry...");
355 return NULL;
357 pageInfo->isUsed_=1;
358 return (char *) varInfo + sizeof(VarSizeInfo);
360 //REDESIGN MAY BE REQUIRED:Lets us live with this for now.
361 //what happens to the space lets say 10000 bytes is allocated
362 //it needs 2 pages,= 16000 bytes, 6000 bytes should not be wasted
363 //in this case.So need to take of this.
364 //Will be coded at later stage as this is developed to support
365 //undo logging and currently we shall assume that the logs generated
366 //wont be greater than PAGE_SIZE.
367 return NULL;
373 void* Chunk::allocFromNewPageForVarSize(Database *db, size_t size, int pslot, DbRetVal *rv)
375 //Should be called only for data items <PAGE_SIZE
376 void *vnode = varSizeFirstFitAllocate(size, pslot, rv);
377 if (vnode != NULL)
379 return vnode;
381 printDebug(DM_Warning, "No free space in any of the pages already being used");
382 *rv =OK;
383 DbRetVal ret = db->getAllocDatabaseMutex();
384 if (ret != 0)
386 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
387 return NULL;
389 Page *newPage = db->getFreePage();
390 if (NULL == newPage)
392 db->releaseAllocDatabaseMutex();
393 return NULL;
396 printDebug(DM_VarAlloc, "ChunkID:%d New Page: %x ", chunkID_, newPage);
397 PageInfo *pInfo = (PageInfo*) newPage;
398 pInfo->setPageAsUsed(0);
399 if (1 == createDataBucket(newPage, PAGE_SIZE, size, pslot))
401 printError(ErrSysFatal, "Split failed in new page...Should never happen");
402 *rv = ErrSysFatal;
403 db->releaseAllocDatabaseMutex();
404 return NULL;
406 long oldPage = (long)((PageInfo*)curPage_)->nextPage_;
407 //((PageInfo*)curPage_)->nextPage_ = newPage;
408 int retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
409 (long)(((PageInfo*)curPage_)->nextPage_), (long)newPage);
410 if(retVal !=0) {
411 printError(ErrSysFatal, "Unable to get lock to set chunk next page");
412 pInfo->setPageAsFree();
413 db->releaseAllocDatabaseMutex();
414 *rv = ErrSysFatal;
415 return NULL;
417 //curPage_ = newPage;
418 retVal = Mutex::CASL((long*) &curPage_, (long)curPage_, (long)newPage);
419 if(retVal !=0) {
420 printError(ErrSysFatal, "Unable to get lock to set curPage");
421 retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
422 (long)(((PageInfo*)curPage_)->nextPage_), (long)oldPage);
423 if (retVal !=0 ) printError(ErrSysFatal, "Unable to reset curPage");
424 pInfo->setPageAsFree();
425 db->releaseAllocDatabaseMutex();
426 *rv = ErrSysFatal;
427 return NULL;
429 db->releaseAllocDatabaseMutex();
430 char *data= ((char*)newPage) + sizeof(PageInfo) + sizeof(VarSizeInfo);
431 return data;
434 //Allocates from the current page of the chunk.
435 //Scans through the VarSizeInfo objects in the page and gets the free slot
436 void* Chunk::allocateFromCurPageForVarSize(size_t size, int pslot, DbRetVal *rv)
438 //Should be called only for data items <PAGE_SIZE
439 Page *page = ((PageInfo*)curPage_);
440 printDebug(DM_VarAlloc, "Chunk::allocate Normal Data Item id:%d Size:%d curPage:%x ",
441 chunkID_, size, curPage_);
442 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) +
443 sizeof(PageInfo));
444 if ( 0 != getChunkMutex(pslot)) { *rv = ErrLockTimeOut; return NULL; }
445 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
447 if (0 == varInfo->isUsed_)
449 if( size + sizeof(VarSizeInfo) < varInfo->size_)
451 if (1 == splitDataBucket(varInfo, size, pslot, rv))
453 printDebug(DM_Warning, "Unable to split the data bucket");
454 releaseChunkMutex(pslot);
455 return NULL;
457 printDebug(DM_VarAlloc, "Chunkid:%d splitDataBucket: Size: %d Item:%x ",
458 chunkID_, size, varInfo);
459 releaseChunkMutex(pslot);
460 return (char*)varInfo + sizeof(VarSizeInfo);
462 else if (size == varInfo->size_) {
463 //varInfo->isUsed_ = 1;
464 int ret = Mutex::CAS(&varInfo->isUsed_ , 0, 1);
465 if(ret !=0) {
466 printDebug(DM_Warning, "Unable to get lock for var alloc size:%d ", size);
467 *rv = ErrLockTimeOut;
468 releaseChunkMutex(pslot);
469 return NULL;
471 releaseChunkMutex(pslot);
472 return (char *) varInfo + sizeof(VarSizeInfo);
476 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
477 +varInfo->size_);
479 releaseChunkMutex(pslot);
480 return NULL;
483 //Allocates memory to store data of variable size
484 void* Chunk::allocate(Database *db, size_t size, DbRetVal *status)
486 if (0 == size) return NULL;
487 //check if the size is more than PAGE_SIZE
488 //if it is more than the PAGE_SIZE, then allocate new
489 //page using database and then link the curPage to the
490 //newly allocated page
491 //if it is less than PAGE_SIZE, then check the curpage for
492 //free memory of specified size
493 //if not available, then scan from the firstPage for the free
494 //space
496 //TODO::During the scan, merge nearby nodes if both are free
497 //if not available then allocate new page
499 size_t alignedSize = os::alignLong(size);
500 void *data = NULL;
501 /*int ret = getChunkMutex(db->procSlot);
502 if (ret != 0)
504 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
505 *status = ErrLockTimeOut;
506 return NULL;
508 if (alignedSize > PAGE_SIZE )
510 data = allocateForLargeDataSize(db, alignedSize);
512 else
514 data = allocateFromCurPageForVarSize(alignedSize, db->procSlot, status);
515 if (NULL == data) {
516 *status = OK;
517 //No available spaces in the current page.
518 //allocate new page
519 data= allocFromNewPageForVarSize(db, alignedSize, db->procSlot, status);
520 if (NULL == data && *status !=ErrLockTimeOut) {
521 printError(ErrNoMemory, "No memory in any of the pages:Increase db size");
522 *status = ErrNoMemory;
526 //releaseChunkMutex(db->procSlot);
527 return data;
530 //Assumes chunk mutex is already taken, before calling this
531 void* Chunk::varSizeFirstFitAllocate(size_t size, int pslot, DbRetVal *rv)
533 printDebug(DM_VarAlloc, "Chunk::varSizeFirstFitAllocate size:%d firstPage:%x",
534 size, firstPage_);
536 Page *page = ((PageInfo*)firstPage_);
537 size_t alignedSize = os::alignLong(size);
538 if ( 0 != getChunkMutex(pslot)) { *rv = ErrLockTimeOut; return NULL; }
539 while(NULL != page)
541 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
542 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
544 if (0 == varInfo->isUsed_)
546 if( alignedSize +sizeof(VarSizeInfo) < varInfo->size_)
548 if( 1 == splitDataBucket(varInfo, alignedSize, pslot, rv))
550 printDebug(DM_Warning, "Unable to split the data bucket");
551 releaseChunkMutex(pslot);
552 return NULL;
554 releaseChunkMutex(pslot);
555 return ((char*)varInfo) + sizeof(VarSizeInfo);
557 else if (alignedSize == varInfo->size_) {
558 //varInfo->isUsed_ = 1;
559 int ret = Mutex::CAS((int*)&varInfo->isUsed_, 0, 1);
560 if(ret !=0) {
561 printDebug(DM_Warning,"Unable to get lock to set isUsed flag.");
562 *rv = ErrLockTimeOut;
563 releaseChunkMutex(pslot);
564 return NULL;
566 printDebug(DM_VarAlloc, "VarSizeFirstFitAllocate returning %x", ((char*)varInfo) +sizeof(VarSizeInfo));
567 releaseChunkMutex(pslot);
568 return ((char *) varInfo) + sizeof(VarSizeInfo);
571 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
572 +varInfo->size_);
574 printDebug(DM_VarAlloc, "Chunk:This page does not have free data nodes page:%x", page);
575 page = ((PageInfo*) page)->nextPage_;
577 releaseChunkMutex(pslot);
578 return NULL;
581 void Chunk::freeForVarSizeAllocator(void *ptr, int pslot)
583 /*int ret = getChunkMutex(pslot);
584 if (ret != 0)
586 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
587 return;
589 VarSizeInfo *varInfo = (VarSizeInfo*)((char*)ptr- sizeof(VarSizeInfo));
590 //varInfo->isUsed_ = 0;
591 if(varInfo->size_ > (PAGE_SIZE - (sizeof(VarSizeInfo)+sizeof(PageInfo)))) {
592 PageInfo *pageInfo = (PageInfo*)((char*)varInfo - sizeof(PageInfo));
593 PageInfo *pInfo = (PageInfo*)firstPage_, *prev = (PageInfo*)firstPage_;
594 bool found = false;
595 while(!found)
597 if(NULL==pInfo) break;
598 if (pInfo == pageInfo) {found = true; break; }
599 prev = pInfo;
600 pInfo = (PageInfo*)pInfo->nextPage_;
602 if (!found)
604 printError(ErrSysFatal,"Page %x not found in page list:Logical error", pageInfo );
605 return ;
607 if(curPage_== pageInfo) {curPage_ = prev ; }
608 pageInfo->isUsed_ = 0;
609 pageInfo->nextPageAfterMerge_ = NULL;
610 pageInfo->hasFreeSpace_ = 1;
611 prev->nextPage_ = pageInfo->nextPage_;
613 int ret = Mutex::CAS((int*)&varInfo->isUsed_, 1, 0);
614 if(ret !=0) {
615 printError(ErrAlready, "Fatal: Varsize double free for %x", ptr);
617 printDebug(DM_VarAlloc,"chunkID:%d Unset isUsed for %x", chunkID_, varInfo);
618 //releaseChunkMutex(pslot);
619 return;
623 void Chunk::freeForLargeAllocator(void *ptr, int pslot)
625 //There will be max only one data element in a page.
626 //PageInfo is stored just before the data.
627 int ret = getChunkMutex(pslot);
628 if (ret != 0)
630 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
631 return;
633 PageInfo *pageInfo = (PageInfo*)(((char*)
634 ptr) - (sizeof(PageInfo) + sizeof(int)));
635 PageInfo *pInfo = (PageInfo*)firstPage_, *prev = (PageInfo*)firstPage_;
636 bool found = false;
637 while(!found)
639 if (pInfo == pageInfo) {found = true; break; }
640 prev = pInfo;
641 pInfo = (PageInfo*)pInfo->nextPage_;
643 if (!found)
645 printError(ErrSysFatal,"Page %x not found in page list:Logical error", pageInfo );
646 releaseChunkMutex(pslot);
647 return ;
649 os::memset(((char*)pageInfo+sizeof(PageInfo)), 0 , allocSize_);
650 if(((PageInfo*)firstPage_)->nextPage_ != NULL){
651 pageInfo->nextPageAfterMerge_ = NULL;
652 //pageInfo->isUsed_ = 0;
653 ret = Mutex::CAS((int*)&pageInfo->isUsed_, pageInfo->isUsed_, 0);
654 if (ret != 0) printError(ErrSysFatal, "Unable to set isUsed flag");
655 //pageInfo->hasFreeSpace_ = 1;
656 ret = Mutex::CAS((int*)&pageInfo->hasFreeSpace_, pageInfo->hasFreeSpace_, 1);
657 if (ret != 0) printError(ErrSysFatal, "Unable to set hasFreeSpace flag");
658 if(pageInfo == firstPage_ && ((PageInfo*)firstPage_)->nextPage_ != NULL)
659 //firstPage_ = pageInfo->nextPage_ ;
660 ret = Mutex::CASL((long*)&firstPage_, (long) firstPage_,
661 (long)pageInfo->nextPage_);
662 if (ret != 0) printError(ErrSysFatal, "Unable to set firstPage");
663 else {
664 //prev->nextPage_ = pageInfo->nextPage_;
665 ret = Mutex::CASL((long*)&prev->nextPage_, (long) prev->nextPage_,
666 (long)pageInfo->nextPage_);
667 if (ret != 0) printError(ErrSysFatal, "Unable to set nextPage");
670 releaseChunkMutex(pslot);
671 return;
674 //Frees the memory pointed by ptr
675 void Chunk::free(Database *db, void *ptr)
677 if (0 == allocSize_)
679 freeForVarSizeAllocator(ptr, db->procSlot);
680 return;
682 int noOfDataNodes =(int) os::floor((PAGE_SIZE - sizeof(PageInfo)) / allocSize_);
684 if (0 == noOfDataNodes)
686 freeForLargeAllocator(ptr, db->procSlot);
687 return;
689 /*int ret = getChunkMutex(db->procSlot);
690 if (ret != 0)
692 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
693 return;
695 //below is the code for freeing in fixed size allocator
697 //unset the used flag
698 //*((int*)ptr -1 ) = 0;
699 if (*((InUse*)ptr -1 ) == 0) {
700 printError(ErrSysFatal, "Fatal:Data node already freed %x Chunk:%d", ptr, chunkID_);
701 //return;
703 int ret = Mutex::CAS(((InUse*)ptr -1), 1, 0);
704 if(ret !=0) {
705 printError(ErrSysFatal, "Unable to get lock to free for %x", ptr);
706 return;
708 PageInfo *pageInfo;
709 pageInfo = getPageInfo(db, ptr);
710 if (NULL == pageInfo)
712 printError(ErrSysFatal,"Fatal: pageInfo is NULL", pageInfo );
713 //releaseChunkMutex(db->procSlot);
714 return;
716 //set the pageinfo where this ptr points
717 //pageInfo->hasFreeSpace_ = 1;
718 if (! (pageInfo->hasFreeSpace_ == 0 || pageInfo->hasFreeSpace_ == 1)) {
719 printError(ErrSysFatal, "Fatal: hasFreeSpace has invalid value:%d for page %x", pageInfo->hasFreeSpace_, pageInfo);
720 return;
722 ret = Mutex::CAS((int*)&pageInfo->hasFreeSpace_, pageInfo->hasFreeSpace_, 1);
723 if(ret !=0) {
724 printError(ErrSysFatal, "Unable to get lock to set hasFreeSpace");
726 //releaseChunkMutex(db->procSlot);
727 return;
730 //returns the pageInfo of the page where this ptr points
731 //This works only if the data size is less than PAGE_SIZE
732 //If ptr points to data which is more than PAGE_SIZE,then
733 //calling this might lead to memory corruption
734 //Note:IMPORTANT::assumes db lock is taken before calling this
735 PageInfo* Chunk::getPageInfo(Database *db, void *ptr)
737 if (allocSize_ < PAGE_SIZE - sizeof(PageInfo)) {
738 int rem = (long) ptr % PAGE_SIZE;
739 return (PageInfo*)(((char*)ptr) - rem);
740 } else {
741 //large size allocator
742 char *inPtr = (char*)ptr;
743 PageInfo* pageInfo = ((PageInfo*)firstPage_);
745 while( pageInfo != NULL )
747 if (inPtr > (char*) pageInfo && pageInfo->nextPageAfterMerge_ >inPtr)
748 return pageInfo;
749 pageInfo = (PageInfo*)pageInfo->nextPage_ ;
752 return NULL;
755 //If called on chunk used to store tuples, it returns the total number of rows
756 //present in the table
757 long Chunk::getTotalDataNodes()
759 long totalNodes =0;
760 if (0 == allocSize_) //->variable size allocator
762 Page *page = ((PageInfo*)firstPage_);
763 while(NULL != page)
765 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
766 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
768 if (1 == varInfo->isUsed_) totalNodes++;
769 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
770 +varInfo->size_);
772 page = ((PageInfo*) page)->nextPage_;
774 return totalNodes;
777 //TODO::for large size allocator
778 if (allocSize_ >PAGE_SIZE)//->each page has only one data node
780 Page *page = ((PageInfo*)firstPage_);
781 while(NULL != page)
783 //current it page wise later this will done
784 if(1==*(int*)(((char*)page)+sizeof(PageInfo)))
785 totalNodes++;
786 page = ((PageInfo*) page)->nextPage_;
788 return totalNodes;
791 int noOfDataNodes=(int) os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
792 PageInfo* pageInfo = ((PageInfo*)firstPage_);
793 char *data = ((char*)firstPage_) + sizeof(PageInfo);
794 int i=0;
795 while( pageInfo != NULL )
797 data = ((char*)pageInfo) + sizeof(PageInfo);
798 for (i = 0; i< noOfDataNodes; i++)
800 if (*((InUse*)data) == 1) { totalNodes++;}
801 data = data + allocSize_;
803 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
805 return totalNodes;
808 //TODO::for other type of allocators
809 int Chunk::compact(int procSlot)
811 PageInfo* pageInfo = ((PageInfo*)firstPage_);
812 PageInfo* prevPage = pageInfo;
813 if (NULL == pageInfo)
815 return 0;
817 int ret = getChunkMutex(procSlot);
818 if (ret != 0)
820 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
821 return ret;
823 pageInfo = (PageInfo*)pageInfo->nextPage_;
824 if (0 == allocSize_)
826 while( pageInfo != NULL )
828 bool flag = false;
829 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)pageInfo) +
830 sizeof(PageInfo));
831 while ((char*) varInfo < ((char*)pageInfo + PAGE_SIZE))
833 if (1 == varInfo->isUsed_) {flag=true; break;}
834 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
835 +varInfo->size_);
837 if (!flag) {
838 printDebug(DM_VarAlloc,"Freeing unused page in varsize allocator %x\n", pageInfo);
839 prevPage->nextPage_ = pageInfo->nextPage_;
840 pageInfo->isUsed_ = 0;
842 prevPage = pageInfo;
843 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
844 printDebug(DM_VarAlloc,"compact iter %x\n", pageInfo);
846 }else if (allocSize_ < PAGE_SIZE)
848 while( pageInfo != NULL )
850 bool flag = false;
851 int noOfDataNodes=(int) os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
852 char *data = ((char*)pageInfo) + sizeof(PageInfo);
853 for (int i = 0; i< noOfDataNodes ; i++)
855 if (1 == *((InUse*)data)) { flag = true; break; }
856 data = data +allocSize_;
858 if (!flag) {
859 printDebug(DM_Alloc,"Freeing unused page in fixed allocator %x\n", pageInfo);
860 prevPage->nextPage_ = pageInfo->nextPage_;
861 pageInfo->isUsed_ = 0;
862 pageInfo = (PageInfo*)(((PageInfo*)prevPage)->nextPage_) ;
863 }else{
864 prevPage = pageInfo;
865 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
867 printDebug(DM_Alloc,"compact iter %x\n", pageInfo);
870 releaseChunkMutex(procSlot);
871 return 0;
874 int Chunk::totalPages()
876 //logic is same for variable size and for large data node allocator.
877 PageInfo* pageInfo = ((PageInfo*)firstPage_);
878 int totPages=0;
879 while( pageInfo != NULL )
881 totPages++;
882 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
884 return totPages;
887 int Chunk::initMutex()
889 return chunkMutex_.init("Chunk");
891 int Chunk::getChunkMutex(int procSlot)
893 return chunkMutex_.getLock(procSlot);
895 int Chunk::releaseChunkMutex(int procSlot)
897 return chunkMutex_.releaseLock(procSlot);
899 int Chunk::destroyMutex()
901 return chunkMutex_.destroy();
903 int Chunk::splitDataBucket(VarSizeInfo *varInfo, size_t needSize, int pSlot, DbRetVal *rv)
905 int remSpace = varInfo->size_ - sizeof(VarSizeInfo) - needSize;
906 //varInfo->isUsed_ = 1;
907 int ret = Mutex::CAS((int*)&varInfo->isUsed_ , 0, 1);
908 if(ret !=0) {
909 printDebug(DM_Warning, "Unable to set I isUsed flag");
910 *rv = ErrLockTimeOut;
911 return 1;
913 //varInfo->size_ = needSize;
914 ret = Mutex::CAS((int*)&varInfo->size_, varInfo->size_ , needSize);
915 if(ret !=0) {
916 printError(ErrSysFatal, "Unable to set I size flag");
917 ret = Mutex::CAS((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
918 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
919 *rv = ErrSysFatal;
920 return 1;
922 VarSizeInfo *varInfo2 = (VarSizeInfo*)((char*)varInfo +
923 sizeof(VarSizeInfo) + varInfo->size_);
924 //varInfo2->isUsed_ = 0;
925 ret = Mutex::CAS((int*)&varInfo2->isUsed_ , varInfo2->isUsed_, 0);
926 if(ret !=0) {
927 printError(ErrSysFatal, "Unable to set II isUsed flag");
928 ret = Mutex::CAS((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
929 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
930 *rv = ErrSysFatal;
931 return 1;
933 //varInfo2->size_ = remSpace;
934 ret = Mutex::CAS((int*)&varInfo2->size_, varInfo2->size_ , remSpace);
935 if(ret !=0) {
936 printError(ErrSysFatal, "Unable to set II size flag");
937 ret = Mutex::CAS((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
938 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
939 *rv = ErrSysFatal;
940 return 1;
942 printDebug(DM_VarAlloc, "Remaining space is %d\n", remSpace);
943 return 0;
947 int Chunk::createDataBucket(Page *page, size_t totalSize, size_t needSize, int pslot)
949 //already db alloc mutex is taken
950 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
951 //varInfo->isUsed_ = 0;
952 int ret = Mutex::CAS((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
953 if(ret !=0) {
954 printError(ErrSysFatal, "Fatal:Unable to get lock to set isUsed flag");
956 //varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
957 ret = Mutex::CAS((int*)&varInfo->size_, varInfo->size_ ,
958 PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo));
959 if(ret !=0) {
960 printError(ErrSysFatal, "Unable to get lock to set size");
962 DbRetVal rv =OK;
963 return splitDataBucket(varInfo, needSize, pslot, &rv);
965 void Chunk::setChunkNameForSystemDB(int id)
967 strcpy(chunkName,ChunkName[id]);
970 void Chunk::print()
972 printf(" <Chunk Id> %d </Chunk Id> \n",chunkID_);
973 printf(" <TotalPages> %d </TotalPages> \n",totalPages());
974 printf(" <ChunkName > %s </ChunkName> \n",getChunkName());
975 printf(" <TotalDataNodes> %d </TotalDataNodes> \n",getTotalDataNodes());
976 printf(" <SizeOfDataNodes> %d </SizeOfDataNodes> \n",getSize());
977 printf(" <Allocation Type> ");
978 if(allocType_==0)
980 printf("FixedSizeAllocator ");
981 }else if(allocType_==1)
983 printf("VariableSizeAllocator ");
984 }else
986 printf("UnknownAllocator ");
989 printf("</Allocation Type>\n");