checkpoint server changes
[csql.git] / src / storage / Chunk.cxx
blob23c163b2336a57a9944173238d7b56a4874af744
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Allocator.h>
17 #include<Database.h>
18 #include<os.h>
19 #include<Debug.h>
20 #include<Config.h>
21 #include<CatalogTables.h>
23 // sets the size of the Chunk allocator for fixed size
24 // allocator
25 // we need one integer to store book keeping information
26 // whether the storage allocation unit is used of not
27 // when it is deleted this flag is only set to unused
28 void Chunk::setSize(size_t size)
31 size_t needSize = size + sizeof(InUse);
32 size_t multiple = (size_t) os::floor(needSize / sizeof(size_t));
33 size_t rem = needSize % sizeof(size_t);
34 if (0 == rem)
35 allocSize_ = needSize;
36 else
37 allocSize_ = (multiple + 1) * sizeof(size_t);
40 void* Chunk::allocateForLargeDataSize(Database *db)
42 PageInfo* pageInfo = ((PageInfo*)curPage_);
43 DbRetVal ret = db->getAllocDatabaseMutex();
44 if (ret != 0)
46 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
47 return NULL;
50 //check whether we have space in curPage
51 if (BITSET(pageInfo->flags, HAS_SPACE))
53 char *data = ((char*)curPage_) + sizeof(PageInfo);
54 int oldVal = pageInfo->flags;
55 int newVal = oldVal;
56 CLEARBIT(newVal, HAS_SPACE);
57 int retVal = Mutex::CAS(&pageInfo->flags, oldVal, newVal);
58 if (retVal !=0) printError(ErrSysFatal, "Unable to set flags");
59 *((InUse*)data) = 1;
60 //Mutex::CAS((InUse*)data , 0, 1);
61 db->releaseAllocDatabaseMutex();
62 return data + sizeof(InUse);
65 //no space in curpage , get new page from database
66 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
67 if (NULL == pageInfo)
69 db->releaseAllocDatabaseMutex();
70 printError(ErrNoMemory,"No more free pages in the database");
71 return NULL;
73 printDebug(DM_Alloc, "Chunk ID:%d Large Data Item newPage:%x",
74 chunkID_, pageInfo);
75 int multiple = (int) os::floor(allocSize_ / PAGE_SIZE);
76 int offset = ((multiple + 1) * PAGE_SIZE);
78 pageInfo->setPageAsUsed(offset);
79 setPageDirty(pageInfo);
82 //create the link
83 //((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
84 int retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
85 (long)(((PageInfo*)curPage_)->nextPage_), (long)pageInfo);
86 if(retVal !=0) {
87 printError(ErrLockTimeOut, "Fatal: Unable to create page link.");
90 //Make this as current page
91 //curPage_ = (Page*) pageInfo;
92 retVal = Mutex::CASL((long*) &curPage_, (long)curPage_, (long)pageInfo);
93 if(retVal !=0) {
94 printError(ErrLockTimeOut, "Fatal:Unable to set current page");
97 char* data = ((char*)curPage_) + sizeof(PageInfo);
98 int oldVal = pageInfo->flags;
99 int newVal = oldVal;
100 CLEARBIT(newVal, HAS_SPACE);
101 retVal = Mutex::CAS(&pageInfo->flags, oldVal, newVal);
102 if(retVal !=0) {
103 printError(ErrLockTimeOut, "Fatal:Unable to set flags");
105 *((InUse*)data) = 1;
106 //Mutex::CASL((InUse*)data , 0, 1);
107 db->releaseAllocDatabaseMutex();
108 return data + sizeof(InUse);
112 void* Chunk::allocateFromFirstPage(Database *db, int noOfDataNodes, DbRetVal *status)
114 PageInfo *pageIter = ((PageInfo*)firstPage_);
115 printDebug(DM_Alloc, "Chunk ID:%d. No free page in database",
116 chunkID_);
117 printDebug(DM_Alloc, "Scan from firstPage:%x for free nodes",
118 firstPage_);
119 char *data = NULL;
120 int i = 0;
121 //scan from first page to locate a free node available
122 while(NULL != pageIter)
124 data = ((char*)pageIter) + sizeof(PageInfo);
125 if (BITSET(pageIter->flags, HAS_SPACE))
127 for (i = 0; i< noOfDataNodes ; i++)
129 if (1 == *((InUse*)data))
130 data = data + allocSize_;
131 else break;
133 if (i != noOfDataNodes) break;
135 printDebug(DM_Alloc, "Chunk ID: %d Page :%x does not have free nodes",
136 chunkID_, pageIter);
137 pageIter = (PageInfo*)((PageInfo*) pageIter)->nextPage_;
139 if (NULL == pageIter) {
140 *status = ErrNoMemory;
141 return NULL;
143 printDebug(DM_Alloc,"ChunkID:%d Scan for free node End:Page :%x",
144 chunkID_, pageIter);
145 //*((InUse*)data) = 1;
146 #if defined(__sparcv9)
147 int ret = Mutex::CASL((InUse*)data , 0, 1);
148 #else
149 int ret = Mutex::CAS((InUse*)data , 0, 1);
150 #endif
151 if(ret !=0) {
152 *status = ErrLockTimeOut;
153 //printError(ErrLockTimeOut, "Unable to allocate from first page. Retry...");
154 return NULL;
156 return data + sizeof(InUse);
159 void* Chunk::allocateFromNewPage(Database *db, DbRetVal *status)
161 DbRetVal ret = db->getAllocDatabaseMutex();
162 if (ret != 0)
164 printDebug(DM_Warning,"Unable to acquire alloc database Mutex Chunkid:%d", chunkID_);
165 *status = ErrLockTimeOut;
166 return NULL;
168 //get a new page from db
169 Page *page = db->getFreePage();
170 if (page == NULL) {
171 printError(ErrNoMemory, "Unable to allocate page");
172 db->releaseAllocDatabaseMutex();
173 *status = ErrNoMemory;
174 return NULL;
176 printDebug(DM_Alloc, "ChunkID:%d Normal Data Item newPage:%x",
177 chunkID_, page);
178 //Initialize pageInfo for this new page
179 PageInfo *pInfo = (PageInfo*)page;
180 pInfo->setPageAsUsed(0);
181 setPageDirty(pInfo);
183 char* data = ((char*)page) + sizeof(PageInfo);
184 *((InUse*)data) = 1;
185 //Mutex::CAS((int*)data , 0, 1);
187 //create the link between old page and the newly created page
188 PageInfo* pageInfo = ((PageInfo*)curPage_);
190 long oldPage = (long)pageInfo->nextPage_;
191 //pageInfo->nextPage_ = page;
192 int retVal = Mutex::CASL((long*)&pageInfo->nextPage_ , (long)pageInfo->nextPage_, (long)page);
193 if(retVal !=0) {
194 *((InUse*)data) = 0;
195 pInfo->setPageAsFree();
196 printDebug(DM_Warning, "Unable to get lock to set chunk list.");
197 *status = ErrLockTimeOut;
198 return NULL;
201 //make this new page as the current page
202 //curPage_ = page;
203 retVal = Mutex::CASL((long*)&curPage_ , (long)curPage_, (long)page);
204 if(retVal !=0) {
205 *((InUse*)data) = 0;
206 pInfo->setPageAsFree();
207 retVal = Mutex::CASL((long*)&pageInfo->nextPage_ , (long)pageInfo->nextPage_, (long)oldPage);
208 if (retVal !=0) printError(ErrSysFatal, "Fatal: Unable to reset the nextPage");
209 printDebug(DM_Warning, "Unable to get lock to set curPage");
210 *status = ErrLockTimeOut;
211 return NULL;
214 db->releaseAllocDatabaseMutex();
215 return data + sizeof(InUse);
218 //Allocates memory to store data
219 //TODO::check whether it is locked before allocating.
220 //delete tuple will set the usedflag to true, but locks will be held
221 //till commit and it shall be rolledback.So make sure that it does not
222 //allocate deleted tuple which is yet to be commited.
224 void* Chunk::allocate(Database *db, DbRetVal *status)
226 PageInfo* pageInfo = ((PageInfo*)curPage_);
228 int noOfDataNodes=(int) os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
229 char *data = ((char*)curPage_) + sizeof(PageInfo);
230 printDebug(DM_Alloc, "Chunk::allocate id:%d curPage:%x noOfDataNodes:%d",
231 chunkID_, curPage_, noOfDataNodes);
233 //1.scan through data list and find if any is free to use in current page
234 //2.If there is none then
235 // a) get new free page from db. set the prev->next to point
236 // to this new page
237 //4. b) initialize the free page to zero and get first data ptr.
238 //5.If there is one, return that
240 //For allocation more than PAGE_SIZE
241 if (0 == noOfDataNodes)
243 data = (char*) allocateForLargeDataSize(db);
244 return data;
247 /*int ret = getChunkMutex(db->procSlot);
248 if (ret != 0)
250 if (status != NULL) *status = ErrLockTimeOut;
251 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
252 return NULL;
254 int i = noOfDataNodes;
255 if (BITSET(pageInfo->flags, HAS_SPACE))
257 for (i = 1; i< noOfDataNodes; i++)
259 if (*((InUse*)data) == 1) data = data + allocSize_;
260 else break;
264 printDebug(DM_Alloc, "ChunkID:%d Node which might be free is %d",
265 chunkID_, i);
266 //It comes here if the pageInfo hasFreeSpace
267 //or there are no free data space in this page
268 if (i == noOfDataNodes && *((InUse*)data) == 1)
271 printDebug(DM_Alloc, "ChunkID:%d curPage does not have free nodes.", chunkID_);
272 //there are no free data space in this page
273 int oldVal = pageInfo->flags;
274 int newVal = oldVal;
275 CLEARBIT(newVal, HAS_SPACE);
276 int ret = Mutex::CAS(&pageInfo->flags, oldVal, newVal);
277 if(ret !=0) {
278 *status = ErrLockTimeOut;
279 printDebug(DM_Warning, "Unable to set hasFreespace");
280 return NULL;
282 *status = OK;
283 data = (char*) allocateFromFirstPage(db, noOfDataNodes, status);
284 if (NULL == data && *status != ErrLockTimeOut)
286 *status = OK;
287 data = (char*) allocateFromNewPage(db, status);
288 if (data == NULL && *status != ErrLockTimeOut)
290 printError(ErrNoMemory, "No memory in any of the pages:Increase db size");
291 *status = ErrNoMemory;
294 setPageDirty(db, data);
295 return data;
297 //*((InUse*)data) = 1;
298 #if defined(__sparcv9)
299 int ret = Mutex::CASL((InUse*)data , 0, 1);
300 #else
301 int ret = Mutex::CAS((InUse*)data , 0, 1);
302 #endif
303 if(ret !=0) {
304 *status = ErrLockTimeOut;
305 printDebug(DM_Warning, "Unable to set isUsed : retry...");
306 return NULL;
308 setPageDirty(db, data);
309 return data + sizeof(InUse);
312 void Chunk::setPageDirty(Database *db, void *ptr)
314 if (chunkID_ < LastCatalogID) return;
315 PageInfo *pageInfo = getPageInfo(db, ptr);
316 if (NULL == pageInfo)
318 printError(ErrSysFatal,"Fatal: pageInfo is NULL", pageInfo );
319 return;
321 SETBIT(pageInfo->flags, IS_DIRTY);
322 return;
324 void Chunk::setPageDirty(PageInfo *pInfo)
326 if (chunkID_ < LastCatalogID) return;
327 SETBIT(pInfo->flags, IS_DIRTY);
330 void* Chunk::allocateForLargeDataSize(Database *db, size_t size)
332 //no need to take chunk mutexes for this, as we are taking alloc database mutex
333 int multiple = (int) os::floor(size / PAGE_SIZE);
334 int offset = ((multiple + 1) * PAGE_SIZE);
335 PageInfo* pageInfo = ((PageInfo*)curPage_);
336 if(0==allocSize_)
337 pageInfo = (PageInfo*)db->getFreePage(size);
338 else
339 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
340 if (NULL == pageInfo)
342 printError(ErrNoMemory,"No more free pages in the database:Increase db size");
343 return NULL;
345 printDebug(DM_VarAlloc,"Chunk::alnextPageAfterMerge_locate Large Data Item id:%d Size:%d curPage:%x ",
346 chunkID_, size, curPage_);
347 //TODO:: logic pending
348 if(allocSize_!=0){
349 //large size allocate for FixedSize data
350 pageInfo->nextPageAfterMerge_ = ((char*)pageInfo + offset);
351 ((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
352 curPage_ = (Page*) pageInfo;
353 char* data = ((char*)curPage_) + sizeof(PageInfo);
354 //*((InUse*)data) = 1;
355 #if defined(__sparcv9)
356 int ret = Mutex::CASL((InUse*)data , 0, 1);
357 #else
358 int ret = Mutex::CAS((InUse*)data , 0, 1);
359 #endif
360 if(ret !=0) {
361 printError(ErrLockTimeOut, "Lock Timeout: retry...");
362 return NULL;
364 pageInfo->isUsed_=1;
365 int oldVal = pageInfo->flags;
366 int newVal = oldVal;
367 CLEARBIT(newVal, HAS_SPACE);
368 SETBIT(newVal, IS_DIRTY);
369 ret = Mutex::CAS(&pageInfo->flags, oldVal, newVal);
370 if (ret !=0) printError(ErrSysFatal, "Unable to set flags");
371 return data + sizeof(InUse);
372 }else{
373 //large size allocate for varSize data
374 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)pageInfo) + sizeof(PageInfo));
375 pageInfo->nextPageAfterMerge_ = ((char*)pageInfo + offset);
376 ((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
377 curPage_ = (Page*) pageInfo;
378 varInfo->size_= size;
379 int ret = Mutex::CAS(&varInfo->isUsed_ , varInfo->isUsed_, 1);
380 if(ret !=0) {
381 printError(ErrLockTimeOut, "Unable to get lock for var alloc. Retry...");
382 return NULL;
384 pageInfo->isUsed_=1;
385 SETBIT(pageInfo->flags, IS_DIRTY);
386 return (char *) varInfo + sizeof(VarSizeInfo);
388 //REDESIGN MAY BE REQUIRED:Lets us live with this for now.
389 //what happens to the space lets say 10000 bytes is allocated
390 //it needs 2 pages,= 16000 bytes, 6000 bytes should not be wasted
391 //in this case.So need to take of this.
392 //Will be coded at later stage as this is developed to support
393 //undo logging and currently we shall assume that the logs generated
394 //wont be greater than PAGE_SIZE.
395 return NULL;
400 void* Chunk::allocFromNewPageForVarSize(Database *db, size_t size, int pslot, DbRetVal *rv)
402 //Should be called only for data items <PAGE_SIZE
403 void *vnode = varSizeFirstFitAllocate(size, pslot, rv);
404 if (vnode != NULL)
406 return vnode;
408 printDebug(DM_Warning, "No free space in any of the pages already being used");
409 *rv =OK;
410 DbRetVal ret = db->getAllocDatabaseMutex();
411 if (ret != 0)
413 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
414 return NULL;
416 Page *newPage = db->getFreePage();
417 if (NULL == newPage)
419 db->releaseAllocDatabaseMutex();
420 return NULL;
423 printDebug(DM_VarAlloc, "ChunkID:%d New Page: %x ", chunkID_, newPage);
424 PageInfo *pInfo = (PageInfo*) newPage;
425 pInfo->setPageAsUsed(0);
426 setPageDirty(pInfo);
427 if (1 == createDataBucket(newPage, PAGE_SIZE, size, pslot))
429 printError(ErrSysFatal, "Split failed in new page...Should never happen");
430 *rv = ErrSysFatal;
431 db->releaseAllocDatabaseMutex();
432 return NULL;
434 long oldPage = (long)((PageInfo*)curPage_)->nextPage_;
435 //((PageInfo*)curPage_)->nextPage_ = newPage;
436 int retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
437 (long)(((PageInfo*)curPage_)->nextPage_), (long)newPage);
438 if(retVal !=0) {
439 printError(ErrSysFatal, "Unable to get lock to set chunk next page");
440 pInfo->setPageAsFree();
441 db->releaseAllocDatabaseMutex();
442 *rv = ErrSysFatal;
443 return NULL;
445 //curPage_ = newPage;
446 retVal = Mutex::CASL((long*) &curPage_, (long)curPage_, (long)newPage);
447 if(retVal !=0) {
448 printError(ErrSysFatal, "Unable to get lock to set curPage");
449 retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
450 (long)(((PageInfo*)curPage_)->nextPage_), (long)oldPage);
451 if (retVal !=0 ) printError(ErrSysFatal, "Unable to reset curPage");
452 pInfo->setPageAsFree();
453 db->releaseAllocDatabaseMutex();
454 *rv = ErrSysFatal;
455 return NULL;
457 db->releaseAllocDatabaseMutex();
458 char *data= ((char*)newPage) + sizeof(PageInfo) + sizeof(VarSizeInfo);
459 return data;
462 //Allocates from the current page of the chunk.
463 //Scans through the VarSizeInfo objects in the page and gets the free slot
464 void* Chunk::allocateFromCurPageForVarSize(size_t size, int pslot, DbRetVal *rv)
466 //Should be called only for data items <PAGE_SIZE
467 Page *page = ((PageInfo*)curPage_);
468 printDebug(DM_VarAlloc, "Chunk::allocate Normal Data Item id:%d Size:%d curPage:%x ",
469 chunkID_, size, curPage_);
470 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) +
471 sizeof(PageInfo));
472 if ( 0 != getChunkMutex(pslot)) { *rv = ErrLockTimeOut; return NULL; }
473 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
475 if (0 == varInfo->isUsed_)
477 if( size + sizeof(VarSizeInfo) < varInfo->size_)
479 if (1 == splitDataBucket(varInfo, size, pslot, rv))
481 printDebug(DM_Warning, "Unable to split the data bucket");
482 releaseChunkMutex(pslot);
483 return NULL;
485 printDebug(DM_VarAlloc, "Chunkid:%d splitDataBucket: Size: %d Item:%x ",
486 chunkID_, size, varInfo);
487 releaseChunkMutex(pslot);
488 return (char*)varInfo + sizeof(VarSizeInfo);
490 else if (size == varInfo->size_) {
491 //varInfo->isUsed_ = 1;
492 int ret = Mutex::CAS(&varInfo->isUsed_ , 0, 1);
493 if(ret !=0) {
494 printDebug(DM_Warning, "Unable to get lock for var alloc size:%d ", size);
495 *rv = ErrLockTimeOut;
496 releaseChunkMutex(pslot);
497 return NULL;
499 releaseChunkMutex(pslot);
500 return (char *) varInfo + sizeof(VarSizeInfo);
504 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
505 +varInfo->size_);
507 releaseChunkMutex(pslot);
508 return NULL;
511 //Allocates memory to store data of variable size
512 void* Chunk::allocate(Database *db, size_t size, DbRetVal *status)
514 if (0 == size) return NULL;
515 //check if the size is more than PAGE_SIZE
516 //if it is more than the PAGE_SIZE, then allocate new
517 //page using database and then link the curPage to the
518 //newly allocated page
519 //if it is less than PAGE_SIZE, then check the curpage for
520 //free memory of specified size
521 //if not available, then scan from the firstPage for the free
522 //space
524 //TODO::During the scan, merge nearby nodes if both are free
525 //if not available then allocate new page
527 size_t alignedSize = os::alignLong(size);
528 void *data = NULL;
529 /*int ret = getChunkMutex(db->procSlot);
530 if (ret != 0)
532 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
533 *status = ErrLockTimeOut;
534 return NULL;
536 if (alignedSize > PAGE_SIZE )
538 data = allocateForLargeDataSize(db, alignedSize);
540 else
542 data = allocateFromCurPageForVarSize(alignedSize, db->procSlot, status);
543 if (NULL == data) {
544 *status = OK;
545 //No available spaces in the current page.
546 //allocate new page
547 data= allocFromNewPageForVarSize(db, alignedSize, db->procSlot, status);
548 if (NULL == data && *status !=ErrLockTimeOut) {
549 printError(ErrNoMemory, "No memory in any of the pages:Increase db size");
550 *status = ErrNoMemory;
554 //releaseChunkMutex(db->procSlot);
555 return data;
558 //Assumes chunk mutex is already taken, before calling this
559 void* Chunk::varSizeFirstFitAllocate(size_t size, int pslot, DbRetVal *rv)
561 printDebug(DM_VarAlloc, "Chunk::varSizeFirstFitAllocate size:%d firstPage:%x",
562 size, firstPage_);
564 Page *page = ((PageInfo*)firstPage_);
565 size_t alignedSize = os::alignLong(size);
566 if ( 0 != getChunkMutex(pslot)) { *rv = ErrLockTimeOut; return NULL; }
567 while(NULL != page)
569 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
570 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
572 if (0 == varInfo->isUsed_)
574 if( alignedSize +sizeof(VarSizeInfo) < varInfo->size_)
576 if( 1 == splitDataBucket(varInfo, alignedSize, pslot, rv))
578 printDebug(DM_Warning, "Unable to split the data bucket");
579 releaseChunkMutex(pslot);
580 return NULL;
582 releaseChunkMutex(pslot);
583 return ((char*)varInfo) + sizeof(VarSizeInfo);
585 else if (alignedSize == varInfo->size_) {
586 //varInfo->isUsed_ = 1;
587 int ret = Mutex::CAS((int*)&varInfo->isUsed_, 0, 1);
588 if(ret !=0) {
589 printDebug(DM_Warning,"Unable to get lock to set isUsed flag.");
590 *rv = ErrLockTimeOut;
591 releaseChunkMutex(pslot);
592 return NULL;
594 printDebug(DM_VarAlloc, "VarSizeFirstFitAllocate returning %x", ((char*)varInfo) +sizeof(VarSizeInfo));
595 releaseChunkMutex(pslot);
596 return ((char *) varInfo) + sizeof(VarSizeInfo);
599 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
600 +varInfo->size_);
602 printDebug(DM_VarAlloc, "Chunk:This page does not have free data nodes page:%x", page);
603 page = ((PageInfo*) page)->nextPage_;
605 releaseChunkMutex(pslot);
606 return NULL;
609 void Chunk::freeForVarSizeAllocator(void *ptr, int pslot)
611 /*int ret = getChunkMutex(pslot);
612 if (ret != 0)
614 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
615 return;
617 VarSizeInfo *varInfo = (VarSizeInfo*)((char*)ptr- sizeof(VarSizeInfo));
618 //varInfo->isUsed_ = 0;
619 if(varInfo->size_ > (PAGE_SIZE - (sizeof(VarSizeInfo)+sizeof(PageInfo)))) {
620 PageInfo *pageInfo = (PageInfo*)((char*)varInfo - sizeof(PageInfo));
621 PageInfo *pInfo = (PageInfo*)firstPage_, *prev = (PageInfo*)firstPage_;
622 bool found = false;
623 while(!found)
625 if(NULL==pInfo) break;
626 if (pInfo == pageInfo) {found = true; break; }
627 prev = pInfo;
628 pInfo = (PageInfo*)pInfo->nextPage_;
630 if (!found)
632 printError(ErrSysFatal,"Page %x not found in page list:Logical error", pageInfo );
633 return ;
635 if(curPage_== pageInfo) {curPage_ = prev ; }
636 pageInfo->isUsed_ = 0;
637 pageInfo->nextPageAfterMerge_ = NULL;
638 CLEARBIT(pageInfo->flags, IS_DIRTY);
639 SETBIT(pageInfo->flags, HAS_SPACE);
640 prev->nextPage_ = pageInfo->nextPage_;
642 int ret = Mutex::CAS((int*)&varInfo->isUsed_, 1, 0);
643 if(ret !=0) {
644 printError(ErrAlready, "Fatal: Varsize double free for %x", ptr);
646 //TODO
647 //setPageDirty(ptr);
648 printDebug(DM_VarAlloc,"chunkID:%d Unset isUsed for %x", chunkID_, varInfo);
649 //releaseChunkMutex(pslot);
650 return;
654 void Chunk::freeForLargeAllocator(void *ptr, int pslot)
656 //There will be max only one data element in a page.
657 //PageInfo is stored just before the data.
658 int ret = getChunkMutex(pslot);
659 if (ret != 0)
661 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
662 return;
664 PageInfo *pageInfo = (PageInfo*)(((char*)
665 ptr) - (sizeof(PageInfo) + sizeof(InUse)));
666 PageInfo *pInfo = (PageInfo*)firstPage_, *prev = (PageInfo*)firstPage_;
667 bool found = false;
668 while(!found)
670 if (pInfo == pageInfo) {found = true; break; }
671 prev = pInfo;
672 pInfo = (PageInfo*)pInfo->nextPage_;
674 if (!found)
676 printError(ErrSysFatal,"Page %x not found in page list:Logical error", pageInfo );
677 releaseChunkMutex(pslot);
678 return ;
680 os::memset(((char*)pageInfo+sizeof(PageInfo)), 0 , allocSize_);
681 if(((PageInfo*)firstPage_)->nextPage_ != NULL){
682 pageInfo->nextPageAfterMerge_ = NULL;
683 //pageInfo->isUsed_ = 0;
684 ret = Mutex::CAS((int*)&pageInfo->isUsed_, pageInfo->isUsed_, 0);
685 if (ret != 0) printError(ErrSysFatal, "Unable to set isUsed flag");
686 int oldVal = pageInfo->flags;
687 int newVal = oldVal;
688 SETBIT(newVal, HAS_SPACE);
689 ret = Mutex::CAS((int*)&pageInfo->flags, oldVal, newVal);
690 if (ret != 0) printError(ErrSysFatal, "Unable to set flags");
691 if(pageInfo == firstPage_ && ((PageInfo*)firstPage_)->nextPage_ != NULL)
693 //firstPage_ = pageInfo->nextPage_ ;
694 ret = Mutex::CASL((long*)&firstPage_, (long) firstPage_,
695 (long)pageInfo->nextPage_);
696 if (ret != 0) printError(ErrSysFatal, "Unable to set firstPage");
697 SETBIT(((PageInfo*)firstPage_)->flags , IS_DIRTY);
699 else {
700 //prev->nextPage_ = pageInfo->nextPage_;
701 ret = Mutex::CASL((long*)&prev->nextPage_, (long) prev->nextPage_,
702 (long)pageInfo->nextPage_);
703 if (ret != 0) printError(ErrSysFatal, "Unable to set nextPage");
704 SETBIT(prev->flags, IS_DIRTY);
707 SETBIT(pageInfo->flags, IS_DIRTY);
708 releaseChunkMutex(pslot);
709 return;
712 //Frees the memory pointed by ptr
713 void Chunk::free(Database *db, void *ptr)
715 if (0 == allocSize_)
717 freeForVarSizeAllocator(ptr, db->procSlot);
718 return;
720 int noOfDataNodes =(int) os::floor((PAGE_SIZE - sizeof(PageInfo)) / allocSize_);
722 if (0 == noOfDataNodes)
724 freeForLargeAllocator(ptr, db->procSlot);
725 return;
727 /*int ret = getChunkMutex(db->procSlot);
728 if (ret != 0)
730 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
731 return;
733 //below is the code for freeing in fixed size allocator
735 //unset the used flag
736 //*((int*)ptr -1 ) = 0;
737 if (*((InUse*)ptr -1 ) == 0) {
738 printError(ErrSysFatal, "Fatal:Data node already freed %x Chunk:%d", ptr, chunkID_);
739 //return;
741 #if defined(__sparcv9)
742 int ret = Mutex::CASL(((InUse*)ptr -1), 1, 0);
743 #else
744 int ret = Mutex::CAS(((InUse*)ptr -1), 1, 0);
745 #endif
746 if(ret !=0) {
747 printError(ErrSysFatal, "Unable to get lock to free for %x", ptr);
748 return;
750 PageInfo *pageInfo;
751 pageInfo = getPageInfo(db, ptr);
752 if (NULL == pageInfo)
754 printError(ErrSysFatal,"Fatal: pageInfo is NULL", pageInfo );
755 //releaseChunkMutex(db->procSlot);
756 return;
758 //set the pageinfo where this ptr points
759 int oldVal = pageInfo->flags;
760 int newVal = oldVal;
761 SETBIT(newVal, HAS_SPACE);
762 SETBIT(newVal, IS_DIRTY);
763 ret = Mutex::CAS((int*)&pageInfo->flags, oldVal, newVal);
764 if(ret !=0) {
765 printError(ErrSysFatal, "Unable to get lock to set flags");
767 //releaseChunkMutex(db->procSlot);
768 return;
771 //returns the pageInfo of the page where this ptr points
772 //This works only if the data size is less than PAGE_SIZE
773 //If ptr points to data which is more than PAGE_SIZE,then
774 //calling this might lead to memory corruption
775 //Note:IMPORTANT::assumes db lock is taken before calling this
776 PageInfo* Chunk::getPageInfo(Database *db, void *ptr)
778 if (allocSize_ < PAGE_SIZE - sizeof(PageInfo)) {
779 int rem = (long) ptr % PAGE_SIZE;
780 return (PageInfo*)(((char*)ptr) - rem);
781 } else {
782 //large size allocator
783 char *inPtr = (char*)ptr;
784 PageInfo* pageInfo = ((PageInfo*)firstPage_);
786 while( pageInfo != NULL )
788 if (inPtr > (char*) pageInfo && pageInfo->nextPageAfterMerge_ >inPtr)
789 return pageInfo;
790 pageInfo = (PageInfo*)pageInfo->nextPage_ ;
793 return NULL;
796 //If called on chunk used to store tuples, it returns the total number of rows
797 //present in the table
798 long Chunk::getTotalDataNodes()
800 long totalNodes =0;
801 if (0 == allocSize_) //->variable size allocator
803 Page *page = ((PageInfo*)firstPage_);
804 while(NULL != page)
806 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
807 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
809 if (1 == varInfo->isUsed_) totalNodes++;
810 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
811 +varInfo->size_);
813 page = ((PageInfo*) page)->nextPage_;
815 return totalNodes;
818 //TODO::for large size allocator
819 if (allocSize_ >PAGE_SIZE)//->each page has only one data node
821 Page *page = ((PageInfo*)firstPage_);
822 while(NULL != page)
824 //current it page wise later this will done
825 if(1==*(int*)(((char*)page)+sizeof(PageInfo)))
826 totalNodes++;
827 page = ((PageInfo*) page)->nextPage_;
829 return totalNodes;
832 int noOfDataNodes=(int) os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
833 PageInfo* pageInfo = ((PageInfo*)firstPage_);
834 char *data = ((char*)firstPage_) + sizeof(PageInfo);
835 int i=0;
836 while( pageInfo != NULL )
838 data = ((char*)pageInfo) + sizeof(PageInfo);
839 for (i = 0; i< noOfDataNodes; i++)
841 if (*((InUse*)data) == 1) { totalNodes++;}
842 data = data + allocSize_;
844 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
846 return totalNodes;
849 //TODO::for other type of allocators
850 int Chunk::compact(int procSlot)
852 PageInfo* pageInfo = ((PageInfo*)firstPage_);
853 PageInfo* prevPage = pageInfo;
854 if (NULL == pageInfo)
856 return 0;
858 int ret = getChunkMutex(procSlot);
859 if (ret != 0)
861 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
862 return ret;
864 pageInfo = (PageInfo*)pageInfo->nextPage_;
865 if (0 == allocSize_)
867 while( pageInfo != NULL )
869 bool flag = false;
870 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)pageInfo) +
871 sizeof(PageInfo));
872 while ((char*) varInfo < ((char*)pageInfo + PAGE_SIZE))
874 if (1 == varInfo->isUsed_) {flag=true; break;}
875 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
876 +varInfo->size_);
878 if (!flag) {
879 printDebug(DM_VarAlloc,"Freeing unused page in varsize allocator %x\n", pageInfo);
880 prevPage->nextPage_ = pageInfo->nextPage_;
881 pageInfo->isUsed_ = 0;
883 prevPage = pageInfo;
884 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
885 printDebug(DM_VarAlloc,"compact iter %x\n", pageInfo);
887 }else if (allocSize_ < PAGE_SIZE)
889 while( pageInfo != NULL )
891 bool flag = false;
892 int noOfDataNodes=(int) os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
893 char *data = ((char*)pageInfo) + sizeof(PageInfo);
894 for (int i = 0; i< noOfDataNodes ; i++)
896 if (1 == *((InUse*)data)) { flag = true; break; }
897 data = data +allocSize_;
899 if (!flag) {
900 printDebug(DM_Alloc,"Freeing unused page in fixed allocator %x\n", pageInfo);
901 prevPage->nextPage_ = pageInfo->nextPage_;
902 pageInfo->isUsed_ = 0;
903 pageInfo = (PageInfo*)(((PageInfo*)prevPage)->nextPage_) ;
904 }else{
905 prevPage = pageInfo;
906 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
908 printDebug(DM_Alloc,"compact iter %x\n", pageInfo);
911 releaseChunkMutex(procSlot);
912 return 0;
915 int Chunk::totalPages()
917 //logic is same for variable size and for large data node allocator.
918 PageInfo* pageInfo = ((PageInfo*)firstPage_);
919 int totPages=0;
920 while( pageInfo != NULL )
922 totPages++;
923 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
925 return totPages;
927 int Chunk::totalDirtyPages()
929 PageInfo* pageInfo = ((PageInfo*)firstPage_);
930 int dirtyPages=0;
931 while( pageInfo != NULL )
933 if(BITSET(pageInfo->flags, IS_DIRTY)) dirtyPages++;
934 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
936 return dirtyPages;
940 int Chunk::initMutex()
942 return chunkMutex_.init("Chunk");
944 int Chunk::getChunkMutex(int procSlot)
946 return chunkMutex_.getLock(procSlot);
948 int Chunk::releaseChunkMutex(int procSlot)
950 return chunkMutex_.releaseLock(procSlot);
952 int Chunk::destroyMutex()
954 return chunkMutex_.destroy();
956 int Chunk::splitDataBucket(VarSizeInfo *varInfo, size_t needSize, int pSlot, DbRetVal *rv)
958 int remSpace = varInfo->size_ - sizeof(VarSizeInfo) - needSize;
959 //varInfo->isUsed_ = 1;
960 int ret = Mutex::CAS((int*)&varInfo->isUsed_ , 0, 1);
961 if(ret !=0) {
962 printDebug(DM_Warning, "Unable to set I isUsed flag");
963 *rv = ErrLockTimeOut;
964 return 1;
966 //varInfo->size_ = needSize;
967 ret = Mutex::CAS((int*)&varInfo->size_, varInfo->size_ , needSize);
968 if(ret !=0) {
969 printError(ErrSysFatal, "Unable to set I size flag");
970 ret = Mutex::CAS((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
971 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
972 *rv = ErrSysFatal;
973 return 1;
975 VarSizeInfo *varInfo2 = (VarSizeInfo*)((char*)varInfo +
976 sizeof(VarSizeInfo) + varInfo->size_);
977 //varInfo2->isUsed_ = 0;
978 ret = Mutex::CAS((int*)&varInfo2->isUsed_ , varInfo2->isUsed_, 0);
979 if(ret !=0) {
980 printError(ErrSysFatal, "Unable to set II isUsed flag");
981 ret = Mutex::CAS((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
982 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
983 *rv = ErrSysFatal;
984 return 1;
986 //varInfo2->size_ = remSpace;
987 ret = Mutex::CAS((int*)&varInfo2->size_, varInfo2->size_ , remSpace);
988 if(ret !=0) {
989 printError(ErrSysFatal, "Unable to set II size flag");
990 ret = Mutex::CAS((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
991 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
992 *rv = ErrSysFatal;
993 return 1;
995 printDebug(DM_VarAlloc, "Remaining space is %d\n", remSpace);
996 return 0;
1000 int Chunk::createDataBucket(Page *page, size_t totalSize, size_t needSize, int pslot)
1002 //already db alloc mutex is taken
1003 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
1004 //varInfo->isUsed_ = 0;
1005 int ret = Mutex::CAS((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
1006 if(ret !=0) {
1007 printError(ErrSysFatal, "Fatal:Unable to get lock to set isUsed flag");
1009 //varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
1010 ret = Mutex::CAS((int*)&varInfo->size_, varInfo->size_ ,
1011 PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo));
1012 if(ret !=0) {
1013 printError(ErrSysFatal, "Unable to get lock to set size");
1015 DbRetVal rv =OK;
1016 return splitDataBucket(varInfo, needSize, pslot, &rv);
1018 void Chunk::setChunkNameForSystemDB(int id)
1020 strcpy(chunkName,ChunkName[id]);
1023 void Chunk::print()
1025 printf(" <Chunk Id> %d </Chunk Id> \n",chunkID_);
1026 printf(" <TotalPages> %d </TotalPages> \n",totalPages());
1027 if (Conf::config.useDurability())
1028 printf(" <DirtyPages> %d </DirtyPages> \n",totalDirtyPages());
1029 printf(" <ChunkName > %s </ChunkName> \n",getChunkName());
1030 printf(" <TotalDataNodes> %d </TotalDataNodes> \n",getTotalDataNodes());
1031 printf(" <SizeOfDataNodes> %d </SizeOfDataNodes> \n",getSize());
1032 printf(" <Allocation Type> ");
1033 if(allocType_==0)
1035 printf("FixedSizeAllocator ");
1036 }else if(allocType_==1)
1038 printf("VariableSizeAllocator ");
1039 }else
1041 printf("UnknownAllocator ");
1043 printf("</Allocation Type>\n");