*** empty log message ***
[csql.git] / src / storage / Chunk.cxx
blob10bc512b4177a29ec4e83c13d9b351cd11b5ea1b
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Allocator.h>
17 #include<Database.h>
18 #include<os.h>
19 #include<Debug.h>
20 #include<Config.h>
21 #include<CatalogTables.h>
23 // sets the size of the Chunk allocator for fixed size
24 // allocator
25 // we need one integer to store book keeping information
26 // whether the storage allocation unit is used of not
27 // when it is deleted this flag is only set to unused
28 void Chunk::setSize(size_t size)
31 size_t needSize = size + sizeof(InUse);
32 size_t multiple = (size_t) os::floor(needSize / sizeof(size_t));
33 size_t rem = needSize % sizeof(size_t);
34 if (0 == rem)
35 allocSize_ = needSize;
36 else
37 allocSize_ = (multiple + 1) * sizeof(size_t);
40 void* Chunk::allocateForLargeDataSize(Database *db)
42 PageInfo* pageInfo = ((PageInfo*)curPage_);
43 DbRetVal ret = db->getAllocDatabaseMutex();
44 if (ret != 0)
46 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
47 return NULL;
50 //check whether we have space in curPage
51 if (BITSET(pageInfo->flags, HAS_SPACE))
53 char *data = ((char*)curPage_) + sizeof(PageInfo);
54 InUse oldVal = pageInfo->flags;
55 InUse newVal = oldVal;
56 CLEARBIT(newVal, HAS_SPACE);
57 int retVal = Mutex::CASGen(&pageInfo->flags, oldVal, newVal);
58 if (retVal !=0) printError(ErrSysFatal, "Unable to set flags");
59 *((InUse*)data) = 1;
60 db->releaseAllocDatabaseMutex();
61 return data + sizeof(InUse);
64 //no space in curpage , get new page from database
65 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
66 if (NULL == pageInfo)
68 db->releaseAllocDatabaseMutex();
69 printError(ErrNoMemory,"No more free pages in the database");
70 return NULL;
72 printDebug(DM_Alloc, "Chunk ID:%d Large Data Item newPage:%x",
73 chunkID_, pageInfo);
74 int multiple = (int) os::floor(allocSize_ / PAGE_SIZE);
75 int offset = ((multiple + 1) * PAGE_SIZE);
77 pageInfo->setPageAsUsed(offset);
78 setPageDirty(pageInfo);
81 //create the link
82 //((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
83 int retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
84 (long)(((PageInfo*)curPage_)->nextPage_), (long)pageInfo);
85 if(retVal !=0) {
86 printError(ErrLockTimeOut, "Fatal: Unable to create page link.");
89 //Make this as current page
90 //curPage_ = (Page*) pageInfo;
91 retVal = Mutex::CASL((long*) &curPage_, (long)curPage_, (long)pageInfo);
92 if(retVal !=0) {
93 printError(ErrLockTimeOut, "Fatal:Unable to set current page");
96 char* data = ((char*)curPage_) + sizeof(PageInfo);
97 InUse oldVal = pageInfo->flags;
98 InUse newVal = oldVal;
99 CLEARBIT(newVal, HAS_SPACE);
100 retVal = Mutex::CASGen(&pageInfo->flags, oldVal, newVal);
101 if(retVal !=0) {
102 printError(ErrLockTimeOut, "Fatal:Unable to set flags");
104 *((InUse*)data) = 1;
105 db->releaseAllocDatabaseMutex();
106 return data + sizeof(InUse);
110 void* Chunk::allocateFromFirstPage(Database *db, int noOfDataNodes, DbRetVal *status)
112 PageInfo *pageIter = ((PageInfo*)firstPage_);
113 printDebug(DM_Alloc, "Chunk ID:%d. No free page in database",
114 chunkID_);
115 printDebug(DM_Alloc, "Scan from firstPage:%x for free nodes",
116 firstPage_);
117 char *data = NULL;
118 int i = 0;
119 //scan from first page to locate a free node available
120 while(NULL != pageIter)
122 data = ((char*)pageIter) + sizeof(PageInfo);
123 if (BITSET(pageIter->flags, HAS_SPACE))
125 for (i = 0; i< noOfDataNodes ; i++)
127 if (1 == *((InUse*)data))
128 data = data + allocSize_;
129 else break;
131 if (i != noOfDataNodes) break;
133 printDebug(DM_Alloc, "Chunk ID: %d Page :%x does not have free nodes",
134 chunkID_, pageIter);
135 pageIter = (PageInfo*)((PageInfo*) pageIter)->nextPage_;
137 if (NULL == pageIter) {
138 *status = ErrNoMemory;
139 return NULL;
141 printDebug(DM_Alloc,"ChunkID:%d Scan for free node End:Page :%x",
142 chunkID_, pageIter);
143 int ret = Mutex::CASGen(data, 0, 1);
144 if(ret !=0) {
145 *status = ErrLockTimeOut;
146 //printError(ErrLockTimeOut, "Unable to allocate from first page. Retry...");
147 return NULL;
149 setPageDirty(pageIter);
150 return data + sizeof(InUse);
153 void* Chunk::allocateFromNewPage(Database *db, DbRetVal *status)
155 DbRetVal ret = db->getAllocDatabaseMutex();
156 if (ret != 0)
158 printDebug(DM_Warning,"Unable to acquire alloc database Mutex Chunkid:%d", chunkID_);
159 *status = ErrLockTimeOut;
160 return NULL;
162 //get a new page from db
163 Page *page = db->getFreePage();
164 if (page == NULL) {
165 printError(ErrNoMemory, "Unable to allocate page");
166 db->releaseAllocDatabaseMutex();
167 *status = ErrNoMemory;
168 return NULL;
170 printDebug(DM_Alloc, "ChunkID:%d Normal Data Item newPage:%x",
171 chunkID_, page);
172 //Initialize pageInfo for this new page
173 PageInfo *pInfo = (PageInfo*)page;
174 pInfo->setPageAsUsed(0);
175 setPageDirty(pInfo);
177 char* data = ((char*)page) + sizeof(PageInfo);
178 *((InUse*)data) = 1;
180 //create the link between old page and the newly created page
181 PageInfo* pageInfo = ((PageInfo*)curPage_);
183 long oldPage = (long)pageInfo->nextPage_;
184 //pageInfo->nextPage_ = page;
185 int retVal = Mutex::CASL((long*)&pageInfo->nextPage_ , (long)pageInfo->nextPage_, (long)page);
186 if(retVal !=0) {
187 *((InUse*)data) = 0;
188 pInfo->setPageAsFree();
189 printDebug(DM_Warning, "Unable to get lock to set chunk list.");
190 *status = ErrLockTimeOut;
191 db->releaseAllocDatabaseMutex();
192 return NULL;
195 //make this new page as the current page
196 //curPage_ = page;
197 retVal = Mutex::CASL((long*)&curPage_ , (long)curPage_, (long)page);
198 if(retVal !=0) {
199 *((InUse*)data) = 0;
200 pInfo->setPageAsFree();
201 retVal = Mutex::CASL((long*)&pageInfo->nextPage_ , (long)pageInfo->nextPage_, (long)oldPage);
202 if (retVal !=0) printError(ErrSysFatal, "Fatal: Unable to reset the nextPage");
203 printDebug(DM_Warning, "Unable to get lock to set curPage");
204 *status = ErrLockTimeOut;
205 db->releaseAllocDatabaseMutex();
206 return NULL;
209 db->releaseAllocDatabaseMutex();
210 return data + sizeof(InUse);
213 //Allocates memory to store data
214 //TODO::check whether it is locked before allocating.
215 //delete tuple will set the usedflag to true, but locks will be held
216 //till commit and it shall be rolledback.So make sure that it does not
217 //allocate deleted tuple which is yet to be commited.
219 void* Chunk::allocate(Database *db, DbRetVal *status)
221 PageInfo* pageInfo = ((PageInfo*)curPage_);
223 int noOfDataNodes=(int) os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
224 char *data = ((char*)curPage_) + sizeof(PageInfo);
225 printDebug(DM_Alloc, "Chunk::allocate id:%d curPage:%x noOfDataNodes:%d",
226 chunkID_, curPage_, noOfDataNodes);
228 //1.scan through data list and find if any is free to use in current page
229 //2.If there is none then
230 // a) get new free page from db. set the prev->next to point
231 // to this new page
232 //4. b) initialize the free page to zero and get first data ptr.
233 //5.If there is one, return that
235 //For allocation more than PAGE_SIZE
236 if (0 == noOfDataNodes)
238 data = (char*) allocateForLargeDataSize(db);
239 return data;
242 //Chunk Mutex is not required as atomic instructions are instead used.
243 //This improves the concurrency
244 /*int ret = getChunkMutex(db->procSlot);
245 if (ret != 0)
247 if (status != NULL) *status = ErrLockTimeOut;
248 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
249 return NULL;
251 int i = noOfDataNodes;
252 if (BITSET(pageInfo->flags, HAS_SPACE))
254 for (i = 1; i< noOfDataNodes; i++)
256 if (*((InUse*)data) == 1) data = data + allocSize_;
257 else break;
261 printDebug(DM_Alloc, "ChunkID:%d Node which might be free is %d",
262 chunkID_, i);
263 //It comes here if the pageInfo hasFreeSpace
264 //or there are no free data space in this page
265 if (i == noOfDataNodes && *((InUse*)data) == 1)
268 printDebug(DM_Alloc, "ChunkID:%d curPage does not have free nodes.", chunkID_);
269 //there are no free data space in this page
270 InUse oldVal = pageInfo->flags;
271 InUse newVal = oldVal;
272 CLEARBIT(newVal, HAS_SPACE);
273 int ret = Mutex::CASGen(&pageInfo->flags, oldVal, newVal);
274 if(ret !=0) {
275 *status = ErrLockTimeOut;
276 printDebug(DM_Warning, "Unable to set hasFreespace");
277 //releaseChunkMutex(db->procSlot);
278 return NULL;
280 *status = OK;
281 data = (char*) allocateFromFirstPage(db, noOfDataNodes, status);
282 //releaseChunkMutex(db->procSlot);
283 if (NULL == data && *status != ErrLockTimeOut)
285 *status = OK;
286 data = (char*) allocateFromNewPage(db, status);
287 if (data == NULL && *status != ErrLockTimeOut)
289 printError(ErrNoMemory, "No memory in any of the pages:Increase db size");
290 *status = ErrNoMemory;
293 if (*status == OK) setPageDirty(db, data);
294 return data;
296 int retVal = Mutex::CASGen(data, 0, 1);
297 if(retVal !=0) {
298 *status = ErrLockTimeOut;
299 //releaseChunkMutex(db->procSlot);
300 printDebug(DM_Warning, "Unable to set isUsed : retry...");
301 return NULL;
303 setPageDirty(db, data);
304 //releaseChunkMutex(db->procSlot);
305 return data + sizeof(InUse);
307 void* Chunk::tryAllocate(Database *db, DbRetVal *status)
309 int tries=0;
310 int totalTries = Conf::config.getMutexRetries();
311 void *node = NULL;
312 while (tries < totalTries)
314 *status = OK;
315 node= allocate(db, status);
316 if (NULL != node) break;
317 if (*status != ErrLockTimeOut)
319 printError(*status, "Unable to allocate node");
320 return NULL;
322 tries++;
324 return node;
327 void Chunk::setPageDirty(Database *db, void *ptr)
329 if (chunkID_ < LastCatalogID) return;
330 PageInfo *pageInfo = getPageInfo(db, ptr);
331 if (NULL == pageInfo)
333 printError(ErrSysFatal,"Fatal: pageInfo is NULL %x", ptr );
334 return;
336 SETBIT(pageInfo->flags, IS_DIRTY);
337 return;
339 void Chunk::setPageDirty(PageInfo *pInfo)
341 if (chunkID_ < LastCatalogID) return;
342 SETBIT(pInfo->flags, IS_DIRTY);
345 void* Chunk::allocateForLargeDataSize(Database *db, size_t size)
347 //no need to take chunk mutexes for this, as we are taking alloc database mutex
348 int multiple = (int) os::floor(size / PAGE_SIZE);
349 int offset = ((multiple + 1) * PAGE_SIZE);
350 PageInfo* pageInfo = ((PageInfo*)curPage_);
351 if(0==allocSize_)
352 pageInfo = (PageInfo*)db->getFreePage(size);
353 else
354 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
355 if (NULL == pageInfo)
357 printError(ErrNoMemory,"No more free pages in the database:Increase db size");
358 return NULL;
360 printDebug(DM_VarAlloc,"Chunk::alnextPageAfterMerge_locate Large Data Item id:%d Size:%d curPage:%x ",
361 chunkID_, size, curPage_);
362 //TODO:: logic pending
363 if(allocSize_!=0){
364 //large size allocate for FixedSize data
365 pageInfo->nextPageAfterMerge_ = ((char*)pageInfo + offset);
366 ((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
367 curPage_ = (Page*) pageInfo;
368 char* data = ((char*)curPage_) + sizeof(PageInfo);
369 int ret = Mutex::CASGen(data, 0, 1);
370 if(ret !=0) {
371 printError(ErrLockTimeOut, "Lock Timeout: retry...");
372 return NULL;
374 pageInfo->isUsed_=1;
375 InUse oldVal = pageInfo->flags;
376 InUse newVal = oldVal;
377 CLEARBIT(newVal, HAS_SPACE);
378 setPageDirty(pageInfo);
379 ret = Mutex::CASGen(&pageInfo->flags, oldVal, newVal);
380 if (ret !=0) printError(ErrSysFatal, "Unable to set flags");
381 return data + sizeof(InUse);
382 }else{
383 //large size allocate for varSize data
384 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)pageInfo) + sizeof(PageInfo));
385 pageInfo->nextPageAfterMerge_ = ((char*)pageInfo + offset);
386 ((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
387 curPage_ = (Page*) pageInfo;
388 varInfo->size_= size;
389 int ret = Mutex::CASGen(&varInfo->isUsed_ , varInfo->isUsed_, 1);
390 if(ret !=0) {
391 printError(ErrLockTimeOut, "Unable to get lock for var alloc. Retry...");
392 return NULL;
394 pageInfo->isUsed_=1;
395 setPageDirty(pageInfo);
396 return (char *) varInfo + sizeof(VarSizeInfo);
398 //REDESIGN MAY BE REQUIRED:Lets us live with this for now.
399 //what happens to the space lets say 10000 bytes is allocated
400 //it needs 2 pages,= 16000 bytes, 6000 bytes should not be wasted
401 //in this case.So need to take of this.
402 //Will be coded at later stage as this is developed to support
403 //undo logging and currently we shall assume that the logs generated
404 //wont be greater than PAGE_SIZE.
405 return NULL;
410 void* Chunk::allocFromNewPageForVarSize(Database *db, size_t size, int pslot, DbRetVal *rv)
412 //Should be called only for data items <PAGE_SIZE
413 void *vnode = varSizeFirstFitAllocate(size, pslot, rv);
414 if (vnode != NULL)
416 return vnode;
418 printDebug(DM_Warning, "No free space in any of the pages already being used");
419 *rv =OK;
420 DbRetVal ret = db->getAllocDatabaseMutex();
421 if (ret != 0)
423 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
424 return NULL;
426 Page *newPage = db->getFreePage();
427 if (NULL == newPage)
429 db->releaseAllocDatabaseMutex();
430 return NULL;
433 printDebug(DM_VarAlloc, "ChunkID:%d New Page: %x ", chunkID_, newPage);
434 PageInfo *pInfo = (PageInfo*) newPage;
435 pInfo->setPageAsUsed(0);
436 setPageDirty(pInfo);
437 SETBIT(pInfo->flags, HAS_SPACE);
438 if (1 == createDataBucket(newPage, PAGE_SIZE, size, pslot))
440 printError(ErrSysFatal, "Split failed in new page...Should never happen");
441 *rv = ErrSysFatal;
442 db->releaseAllocDatabaseMutex();
443 return NULL;
445 long oldPage = (long)((PageInfo*)curPage_)->nextPage_;
446 //((PageInfo*)curPage_)->nextPage_ = newPage;
447 int retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
448 (long)(((PageInfo*)curPage_)->nextPage_), (long)newPage);
449 if(retVal !=0) {
450 printError(ErrSysFatal, "Unable to get lock to set chunk next page");
451 pInfo->setPageAsFree();
452 db->releaseAllocDatabaseMutex();
453 *rv = ErrSysFatal;
454 return NULL;
456 //curPage_ = newPage;
457 retVal = Mutex::CASL((long*) &curPage_, (long)curPage_, (long)newPage);
458 if(retVal !=0) {
459 printError(ErrSysFatal, "Unable to get lock to set curPage");
460 retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
461 (long)(((PageInfo*)curPage_)->nextPage_), (long)oldPage);
462 if (retVal !=0 ) printError(ErrSysFatal, "Unable to reset curPage");
463 pInfo->setPageAsFree();
464 db->releaseAllocDatabaseMutex();
465 *rv = ErrSysFatal;
466 return NULL;
468 db->releaseAllocDatabaseMutex();
469 char *data= ((char*)newPage) + sizeof(PageInfo) + sizeof(VarSizeInfo);
470 return data;
473 //Allocates from the current page of the chunk.
474 //Scans through the VarSizeInfo objects in the page and gets the free slot
475 void* Chunk::allocateFromCurPageForVarSize(size_t size, int pslot, DbRetVal *rv)
477 //Should be called only for data items <PAGE_SIZE
478 Page *page = ((PageInfo*)curPage_);
479 printDebug(DM_VarAlloc, "Chunk::allocate Normal Data Item id:%d Size:%d curPage:%x ",
480 chunkID_, size, curPage_);
481 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) +
482 sizeof(PageInfo));
483 if ( 0 != getChunkMutex(pslot)) { *rv = ErrLockTimeOut; return NULL; }
484 int pageSize = PAGE_SIZE;
485 bool hasSpace= false;
486 while ((char*) varInfo < ((char*)page + pageSize))
488 if (0 == varInfo->isUsed_)
490 hasSpace= true;
491 if( size + sizeof(VarSizeInfo) < varInfo->size_)
493 if (1 == splitDataBucket(varInfo, size, pslot, rv))
495 printDebug(DM_Warning, "Unable to split the data bucket");
496 releaseChunkMutex(pslot);
497 return NULL;
499 printDebug(DM_VarAlloc, "Chunkid:%d splitDataBucket: Size: %d Item:%x ",
500 chunkID_, size, varInfo);
501 releaseChunkMutex(pslot);
502 return (char*)varInfo + sizeof(VarSizeInfo);
504 else if (size == varInfo->size_) {
505 //varInfo->isUsed_ = 1;
506 int ret = Mutex::CASGen(&varInfo->isUsed_ , 0, 1);
507 if(ret !=0) {
508 printDebug(DM_Warning, "Unable to get lock for var alloc size:%d ", size);
509 *rv = ErrLockTimeOut;
510 releaseChunkMutex(pslot);
511 return NULL;
513 releaseChunkMutex(pslot);
514 return (char *) varInfo + sizeof(VarSizeInfo);
518 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
519 +varInfo->size_);
521 if (!hasSpace) CLEARBIT(((PageInfo*)curPage_)->flags, HAS_SPACE);
522 if (hasSpace && size < MIN_VARCHAR_ALLOC_SIZE)
523 CLEARBIT(((PageInfo*)curPage_)->flags, HAS_SPACE);
524 releaseChunkMutex(pslot);
525 return NULL;
528 //Allocates memory to store data of variable size
529 void* Chunk::allocate(Database *db, size_t size, DbRetVal *status)
531 if (0 == size) return NULL;
532 //check if the size is more than PAGE_SIZE
533 //if it is more than the PAGE_SIZE, then allocate new
534 //page using database and then link the curPage to the
535 //newly allocated page
536 //if it is less than PAGE_SIZE, then check the curpage for
537 //free memory of specified size
538 //if not available, then scan from the firstPage for the free
539 //space
541 //TODO::During the scan, merge nearby nodes if both are free
542 //if not available then allocate new page
544 size_t alignedSize = os::alignLong(size);
545 void *data = NULL;
546 /*int ret = getChunkMutex(db->procSlot);
547 if (ret != 0)
549 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
550 *status = ErrLockTimeOut;
551 return NULL;
553 if (alignedSize > PAGE_SIZE )
555 data = allocateForLargeDataSize(db, alignedSize);
557 else
559 data = allocateFromCurPageForVarSize(alignedSize, db->procSlot, status);
560 if (NULL == data) {
561 *status = OK;
562 //No available spaces in the current page.
563 //allocate new page
564 data= allocFromNewPageForVarSize(db, alignedSize, db->procSlot, status);
565 if (NULL == data && *status !=ErrLockTimeOut) {
566 printError(ErrNoMemory, "No memory in any of the pages:Increase db size");
567 *status = ErrNoMemory;
571 //releaseChunkMutex(db->procSlot);
572 return data;
575 //Assumes chunk mutex is already taken, before calling this
576 void* Chunk::varSizeFirstFitAllocate(size_t size, int pslot, DbRetVal *rv)
578 printDebug(DM_VarAlloc, "Chunk::varSizeFirstFitAllocate size:%d firstPage:%x",
579 size, firstPage_);
581 Page *page = ((PageInfo*)firstPage_);
582 size_t alignedSize = os::alignLong(size);
583 if ( 0 != getChunkMutex(pslot)) { *rv = ErrLockTimeOut; return NULL; }
584 int pageSize = PAGE_SIZE;
585 bool hasSpace=false;
586 while(NULL != page)
588 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
589 hasSpace=false;
590 if (BITSET(((PageInfo*)page)->flags, HAS_SPACE)){
591 while ((char*) varInfo < ((char*)page + pageSize))
593 if (0 == varInfo->isUsed_)
595 hasSpace=true;
596 if( alignedSize +sizeof(VarSizeInfo) < varInfo->size_)
598 if( 1 == splitDataBucket(varInfo, alignedSize, pslot, rv))
600 printDebug(DM_Warning, "Unable to split the data bucket");
601 releaseChunkMutex(pslot);
602 return NULL;
604 releaseChunkMutex(pslot);
605 return ((char*)varInfo) + sizeof(VarSizeInfo);
607 else if (alignedSize == varInfo->size_) {
608 //varInfo->isUsed_ = 1;
609 int ret = Mutex::CASGen(&varInfo->isUsed_, 0, 1);
610 if(ret !=0) {
611 printDebug(DM_Warning,"Unable to get lock to set isUsed flag.");
612 *rv = ErrLockTimeOut;
613 releaseChunkMutex(pslot);
614 return NULL;
616 printDebug(DM_VarAlloc, "VarSizeFirstFitAllocate returning %x", ((char*)varInfo) +sizeof(VarSizeInfo));
617 releaseChunkMutex(pslot);
618 return ((char *) varInfo) + sizeof(VarSizeInfo);
621 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
622 +varInfo->size_);
624 if (!hasSpace) CLEARBIT(((PageInfo*)page)->flags, HAS_SPACE);
625 if (hasSpace && size < MIN_VARCHAR_ALLOC_SIZE)
626 CLEARBIT(((PageInfo*)page)->flags, HAS_SPACE);
628 printDebug(DM_VarAlloc, "Chunk:This page does not have free data nodes page:%x", page);
629 page = ((PageInfo*) page)->nextPage_;
631 releaseChunkMutex(pslot);
632 return NULL;
635 void Chunk::freeForVarSizeAllocator(Database *db, void *ptr, int pslot)
637 int ret = getChunkMutex(pslot);
638 if (ret != 0)
640 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
641 return;
643 VarSizeInfo *varInfo = (VarSizeInfo*)((char*)ptr- sizeof(VarSizeInfo));
644 //varInfo->isUsed_ = 0;
645 if(varInfo->size_ > (PAGE_SIZE - (sizeof(VarSizeInfo)+sizeof(PageInfo)))) {
646 PageInfo *pageInfo = (PageInfo*)((char*)varInfo - sizeof(PageInfo));
647 PageInfo *pInfo = (PageInfo*)firstPage_, *prev = (PageInfo*)firstPage_;
648 bool found = false;
649 while(!found)
651 if(NULL==pInfo) break;
652 if (pInfo == pageInfo) {found = true; break; }
653 prev = pInfo;
654 pInfo = (PageInfo*)pInfo->nextPage_;
656 if (!found)
658 printError(ErrSysFatal,"Page %x not found in page list:Logical error", pageInfo );
659 releaseChunkMutex(pslot);
660 return ;
662 if(curPage_== pageInfo) {curPage_ = prev ; }
663 pageInfo->isUsed_ = 0;
664 pageInfo->nextPageAfterMerge_ = NULL;
665 setPageDirty(pageInfo);
666 SETBIT(pageInfo->flags, HAS_SPACE);
667 prev->nextPage_ = pageInfo->nextPage_;
669 int retVal = Mutex::CASGen(&varInfo->isUsed_, 1, 0);
670 if(retVal !=0) {
671 printError(ErrAlready, "Fatal: Varsize double free for %x", ptr);
673 PageInfo *pageInfo = getPageInfo(db, ptr);
674 if (NULL == pageInfo)
676 printError(ErrSysFatal,"Fatal: pageInfo is NULL", pageInfo );
677 releaseChunkMutex(db->procSlot);
678 return;
680 SETBIT(pageInfo->flags, HAS_SPACE);
681 SETBIT(pageInfo->flags, IS_DIRTY);
682 printDebug(DM_VarAlloc,"chunkID:%d Unset isUsed for %x", chunkID_, varInfo);
683 releaseChunkMutex(pslot);
684 return;
688 void Chunk::freeForLargeAllocator(void *ptr, int pslot)
690 //There will be max only one data element in a page.
691 //PageInfo is stored just before the data.
692 int ret = getChunkMutex(pslot);
693 if (ret != 0)
695 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
696 return;
698 PageInfo *pageInfo = (PageInfo*)(((char*)
699 ptr) - (sizeof(PageInfo) + sizeof(InUse)));
700 PageInfo *pInfo = (PageInfo*)firstPage_, *prev = (PageInfo*)firstPage_;
701 bool found = false;
702 while(!found)
704 if (pInfo == pageInfo) {found = true; break; }
705 prev = pInfo;
706 pInfo = (PageInfo*)pInfo->nextPage_;
708 if (!found)
710 printError(ErrSysFatal,"Page %x not found in page list:Logical error", pageInfo );
711 releaseChunkMutex(pslot);
712 return ;
714 os::memset(((char*)pageInfo+sizeof(PageInfo)), 0 , allocSize_);
715 if(((PageInfo*)firstPage_)->nextPage_ != NULL){
716 pageInfo->nextPageAfterMerge_ = NULL;
717 //pageInfo->isUsed_ = 0;
718 ret = Mutex::CASGen(&pageInfo->isUsed_, pageInfo->isUsed_, 0);
719 if (ret != 0) printError(ErrSysFatal, "Unable to set isUsed flag");
720 InUse oldVal = pageInfo->flags;
721 InUse newVal = oldVal;
722 SETBIT(newVal, HAS_SPACE);
723 ret = Mutex::CASGen(&pageInfo->flags, oldVal, newVal);
724 if (ret != 0) printError(ErrSysFatal, "Unable to set flags");
725 if(pageInfo == firstPage_ && ((PageInfo*)firstPage_)->nextPage_ != NULL)
727 //firstPage_ = pageInfo->nextPage_ ;
728 ret = Mutex::CASL((long*)&firstPage_, (long) firstPage_,
729 (long)pageInfo->nextPage_);
730 if (ret != 0) printError(ErrSysFatal, "Unable to set firstPage");
731 setPageDirty(((PageInfo*)firstPage_));
733 else {
734 //prev->nextPage_ = pageInfo->nextPage_;
735 ret = Mutex::CASL((long*)&prev->nextPage_, (long) prev->nextPage_,
736 (long)pageInfo->nextPage_);
737 if (ret != 0) printError(ErrSysFatal, "Unable to set nextPage");
738 setPageDirty(prev);
741 setPageDirty(pageInfo);
742 releaseChunkMutex(pslot);
743 return;
746 //Frees the memory pointed by ptr
747 void Chunk::free(Database *db, void *ptr)
749 if (0 == allocSize_)
751 freeForVarSizeAllocator(db, ptr, db->procSlot);
752 return;
754 int noOfDataNodes =(int) os::floor((PAGE_SIZE - sizeof(PageInfo)) / allocSize_);
756 if (0 == noOfDataNodes)
758 freeForLargeAllocator(ptr, db->procSlot);
759 return;
761 /*int ret = getChunkMutex(db->procSlot);
762 if (ret != 0)
764 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
765 return;
767 //unset the used flag
768 //*((int*)ptr -1 ) = 0;
769 InUse oldValue=1;
770 if (*((InUse*)ptr -1 ) == 0) {
771 printError(ErrSysFatal, "Fatal:Data node already freed %x Chunk:%d value:%d", ptr, chunkID_,
772 *((InUse*)ptr -1 ));
773 oldValue=0;
774 //return;
776 int retVal = Mutex::CASGen(((InUse*)ptr -1), oldValue, 0);
777 if(retVal !=0) {
778 printError(ErrSysFatal, "Unable to get lock to free for %x", ptr);
779 //releaseChunkMutex(db->procSlot);
780 return;
782 PageInfo *pageInfo;
783 pageInfo = getPageInfo(db, ptr);
784 if (NULL == pageInfo)
786 printError(ErrSysFatal,"Fatal: pageInfo is NULL", pageInfo );
787 //releaseChunkMutex(db->procSlot);
788 return;
790 //set the pageinfo where this ptr points
791 InUse oldVal = pageInfo->flags;
792 InUse newVal = oldVal;
793 SETBIT(newVal, HAS_SPACE);
794 retVal = Mutex::CASGen(&pageInfo->flags, oldVal, newVal);
795 if(retVal !=0) {
796 printError(ErrSysFatal, "Unable to get lock to set flags");
798 setPageDirty(pageInfo);
799 //releaseChunkMutex(db->procSlot);
800 return;
803 //returns the pageInfo of the page where this ptr points
804 //This works only if the data size is less than PAGE_SIZE
805 //If ptr points to data which is more than PAGE_SIZE,then
806 //calling this might lead to memory corruption
807 //Note:IMPORTANT::assumes db lock is taken before calling this
808 PageInfo* Chunk::getPageInfo(Database *db, void *ptr)
810 if (allocSize_ < PAGE_SIZE - sizeof(PageInfo)) {
811 int rem = (long) ptr % PAGE_SIZE;
812 return (PageInfo*)(((char*)ptr) - rem);
813 } else {
814 //large size allocator
815 char *inPtr = (char*)ptr;
816 PageInfo* pageInfo = ((PageInfo*)firstPage_);
818 while( pageInfo != NULL )
820 if (inPtr > (char*) pageInfo && pageInfo->nextPageAfterMerge_ >inPtr)
821 return pageInfo;
822 pageInfo = (PageInfo*)pageInfo->nextPage_ ;
825 return NULL;
828 //If called on chunk used to store tuples, it returns the total number of rows
829 //present in the table
830 long Chunk::getTotalDataNodes()
832 long totalNodes =0;
833 if (0 == allocSize_) //->variable size allocator
835 Page *page = ((PageInfo*)firstPage_);
836 while(NULL != page)
838 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
839 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
841 if (1 == varInfo->isUsed_) totalNodes++;
842 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
843 +varInfo->size_);
845 page = ((PageInfo*) page)->nextPage_;
847 return totalNodes;
850 //TODO::for large size allocator
851 if (allocSize_ >PAGE_SIZE)//->each page has only one data node
853 Page *page = ((PageInfo*)firstPage_);
854 while(NULL != page)
856 //current it page wise later this will done
857 if(1==*(int*)(((char*)page)+sizeof(PageInfo)))
858 totalNodes++;
859 page = ((PageInfo*) page)->nextPage_;
861 return totalNodes;
864 int noOfDataNodes=(int) os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
865 PageInfo* pageInfo = ((PageInfo*)firstPage_);
866 char *data = ((char*)firstPage_) + sizeof(PageInfo);
867 int i=0;
868 while( pageInfo != NULL )
870 data = ((char*)pageInfo) + sizeof(PageInfo);
871 for (i = 0; i< noOfDataNodes; i++)
873 if (*((InUse*)data) == 1) { totalNodes++;}
874 data = data + allocSize_;
876 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
878 return totalNodes;
881 //TODO::for other type of allocators
882 int Chunk::compact(int procSlot)
884 PageInfo* pageInfo = ((PageInfo*)firstPage_);
885 PageInfo* prevPage = pageInfo;
886 if (NULL == pageInfo)
888 return 0;
890 int ret = getChunkMutex(procSlot);
891 if (ret != 0)
893 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
894 return ret;
896 pageInfo = (PageInfo*)pageInfo->nextPage_;
897 if (0 == allocSize_)
899 while( pageInfo != NULL )
901 bool flag = false;
902 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)pageInfo) +
903 sizeof(PageInfo));
904 while ((char*) varInfo < ((char*)pageInfo + PAGE_SIZE))
906 if (1 == varInfo->isUsed_) {flag=true; break;}
907 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
908 +varInfo->size_);
910 if (!flag) {
911 printDebug(DM_VarAlloc,"Freeing unused page in varsize allocator %x\n", pageInfo);
912 prevPage->nextPage_ = pageInfo->nextPage_;
913 pageInfo->isUsed_ = 0;
915 prevPage = pageInfo;
916 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
917 printDebug(DM_VarAlloc,"compact iter %x\n", pageInfo);
919 }else if (allocSize_ < PAGE_SIZE)
921 while( pageInfo != NULL )
923 bool flag = false;
924 int noOfDataNodes=(int) os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
925 char *data = ((char*)pageInfo) + sizeof(PageInfo);
926 for (int i = 0; i< noOfDataNodes ; i++)
928 if (1 == *((InUse*)data)) { flag = true; break; }
929 data = data +allocSize_;
931 if (!flag) {
932 printDebug(DM_Alloc,"Freeing unused page in fixed allocator %x\n", pageInfo);
933 prevPage->nextPage_ = pageInfo->nextPage_;
934 pageInfo->isUsed_ = 0;
935 pageInfo = (PageInfo*)(((PageInfo*)prevPage)->nextPage_) ;
936 }else{
937 prevPage = pageInfo;
938 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
940 printDebug(DM_Alloc,"compact iter %x\n", pageInfo);
943 releaseChunkMutex(procSlot);
944 return 0;
947 int Chunk::totalPages()
949 //logic is same for variable size and for large data node allocator.
950 PageInfo* pageInfo = ((PageInfo*)firstPage_);
951 int totPages=0;
952 while( pageInfo != NULL )
954 totPages++;
955 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
957 return totPages;
959 int Chunk::totalDirtyPages()
961 PageInfo* pageInfo = ((PageInfo*)firstPage_);
962 int dirtyPages=0;
963 while( pageInfo != NULL )
965 if(BITSET(pageInfo->flags, IS_DIRTY)) dirtyPages++;
966 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
968 return dirtyPages;
972 int Chunk::initMutex()
974 return chunkMutex_.init("Chunk");
976 int Chunk::getChunkMutex(int procSlot)
978 return chunkMutex_.getLock(procSlot);
980 int Chunk::releaseChunkMutex(int procSlot)
982 return chunkMutex_.releaseLock(procSlot);
984 int Chunk::destroyMutex()
986 return chunkMutex_.destroy();
988 int Chunk::splitDataBucket(VarSizeInfo *varInfo, size_t needSize, int pSlot, DbRetVal *rv)
990 InUse remSpace = varInfo->size_ - sizeof(VarSizeInfo) - needSize;
991 //varInfo->isUsed_ = 1;
992 int ret = Mutex::CASGen(&varInfo->isUsed_ , 0, 1);
993 if(ret !=0) {
994 printDebug(DM_Warning, "Unable to set I isUsed flag");
995 *rv = ErrLockTimeOut;
996 return 1;
998 //varInfo->size_ = needSize;
999 ret = Mutex::CASGen(&varInfo->size_, varInfo->size_ , needSize);
1000 if(ret !=0) {
1001 printError(ErrSysFatal, "Unable to set I size flag");
1002 ret = Mutex::CASGen(&varInfo->isUsed_ , varInfo->isUsed_, 0);
1003 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
1004 *rv = ErrSysFatal;
1005 return 1;
1007 VarSizeInfo *varInfo2 = (VarSizeInfo*)((char*)varInfo +
1008 sizeof(VarSizeInfo) + varInfo->size_);
1009 //varInfo2->isUsed_ = 0;
1010 ret = Mutex::CASGen(&varInfo2->isUsed_ , varInfo2->isUsed_, 0);
1011 if(ret !=0) {
1012 printError(ErrSysFatal, "Unable to set II isUsed flag");
1013 ret = Mutex::CASGen(&varInfo->isUsed_ , varInfo->isUsed_, 0);
1014 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
1015 *rv = ErrSysFatal;
1016 return 1;
1018 //varInfo2->size_ = remSpace;
1019 ret = Mutex::CASGen(&varInfo2->size_, varInfo2->size_ , remSpace);
1020 if(ret !=0) {
1021 printError(ErrSysFatal, "Unable to set II size flag");
1022 ret = Mutex::CASGen((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
1023 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
1024 *rv = ErrSysFatal;
1025 return 1;
1027 printDebug(DM_VarAlloc, "Remaining space is %d\n", remSpace);
1028 return 0;
1032 int Chunk::createDataBucket(Page *page, size_t totalSize, size_t needSize, int pslot)
1034 //already db alloc mutex is taken
1035 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
1036 //varInfo->isUsed_ = 0;
1037 int ret = Mutex::CASGen(&varInfo->isUsed_ , varInfo->isUsed_, 0);
1038 if(ret !=0) {
1039 printError(ErrSysFatal, "Fatal:Unable to get lock to set isUsed flag");
1041 //varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
1042 ret = Mutex::CASGen(&varInfo->size_, varInfo->size_ ,
1043 PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo));
1044 if(ret !=0) {
1045 printError(ErrSysFatal, "Unable to get lock to set size");
1047 DbRetVal rv =OK;
1048 return splitDataBucket(varInfo, needSize, pslot, &rv);
1050 void Chunk::setChunkNameForSystemDB(int id)
1052 strcpy(chunkName,ChunkName[id]);
1055 void Chunk::print()
1057 printf(" <Chunk Id> %d </Chunk Id> \n",chunkID_);
1058 printf(" <TotalPages> %d </TotalPages> \n",totalPages());
1059 if (Conf::config.useDurability())
1060 printf(" <DirtyPages> %d </DirtyPages> \n",totalDirtyPages());
1061 printf(" <ChunkName > %s </ChunkName> \n",getChunkName());
1062 printf(" <TotalDataNodes> %d </TotalDataNodes> \n",getTotalDataNodes());
1063 printf(" <SizeOfDataNodes> %d </SizeOfDataNodes> \n",getSize());
1064 printf(" <Allocation Type> ");
1065 if(allocType_==0)
1067 printf("FixedSizeAllocator ");
1068 }else if(allocType_==1)
1070 printf("VariableSizeAllocator ");
1071 }else
1073 printf("UnknownAllocator ");
1075 printf("</Allocation Type>\n");