// allocator fixes
// [csql.git] / src / storage / Chunk.cxx
// blob a6e7588029e33c9c145f196d69b9e087061f7f56
/***************************************************************************
 *   Copyright (C) 2007 by www.databasecache.com                           *
 *   Contact: praba_tuty@databasecache.com                                 *
 *                                                                         *
 *   This program is free software; you can redistribute it and/or modify  *
 *   it under the terms of the GNU General Public License as published by  *
 *   the Free Software Foundation; either version 2 of the License, or     *
 *   (at your option) any later version.                                   *
 *                                                                         *
 *   This program is distributed in the hope that it will be useful,       *
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
 *   GNU General Public License for more details.                          *
 *                                                                         *
 ***************************************************************************/
#include<Allocator.h>
#include<Database.h>
#include<os.h>
#include<Debug.h>
#include<Config.h>
#include<CatalogTables.h>
23 // sets the size of the Chunk allocator for fixed size
24 // allocator
25 // we need one integer to store book keeping information
26 // whether the storage allocation unit is used of not
27 // when it is deleted this flag is only set to unused
28 void Chunk::setSize(size_t size)
31 size_t needSize = size + sizeof(InUse);
32 size_t multiple = (size_t) os::floor(needSize / sizeof(size_t));
33 size_t rem = needSize % sizeof(size_t);
34 if (0 == rem)
35 allocSize_ = needSize;
36 else
37 allocSize_ = (multiple + 1) * sizeof(size_t);
40 void* Chunk::allocateForLargeDataSize(Database *db)
42 PageInfo* pageInfo = ((PageInfo*)curPage_);
43 DbRetVal ret = db->getAllocDatabaseMutex();
44 if (ret != 0)
46 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
47 return NULL;
50 //check whether we have space in curPage
51 if (BITSET(pageInfo->flags, HAS_SPACE))
53 char *data = ((char*)curPage_) + sizeof(PageInfo);
54 int oldVal = pageInfo->flags;
55 int newVal = oldVal;
56 CLEARBIT(newVal, HAS_SPACE);
57 int retVal = Mutex::CAS(&pageInfo->flags, oldVal, newVal);
58 if (retVal !=0) printError(ErrSysFatal, "Unable to set flags");
59 *((InUse*)data) = 1;
60 //Mutex::CAS((InUse*)data , 0, 1);
61 db->releaseAllocDatabaseMutex();
62 return data + sizeof(InUse);
65 //no space in curpage , get new page from database
66 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
67 if (NULL == pageInfo)
69 db->releaseAllocDatabaseMutex();
70 printError(ErrNoMemory,"No more free pages in the database");
71 return NULL;
73 printDebug(DM_Alloc, "Chunk ID:%d Large Data Item newPage:%x",
74 chunkID_, pageInfo);
75 int multiple = (int) os::floor(allocSize_ / PAGE_SIZE);
76 int offset = ((multiple + 1) * PAGE_SIZE);
78 pageInfo->setPageAsUsed(offset);
79 setPageDirty(pageInfo);
82 //create the link
83 //((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
84 int retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
85 (long)(((PageInfo*)curPage_)->nextPage_), (long)pageInfo);
86 if(retVal !=0) {
87 printError(ErrLockTimeOut, "Fatal: Unable to create page link.");
90 //Make this as current page
91 //curPage_ = (Page*) pageInfo;
92 retVal = Mutex::CASL((long*) &curPage_, (long)curPage_, (long)pageInfo);
93 if(retVal !=0) {
94 printError(ErrLockTimeOut, "Fatal:Unable to set current page");
97 char* data = ((char*)curPage_) + sizeof(PageInfo);
98 int oldVal = pageInfo->flags;
99 int newVal = oldVal;
100 CLEARBIT(newVal, HAS_SPACE);
101 retVal = Mutex::CAS(&pageInfo->flags, oldVal, newVal);
102 if(retVal !=0) {
103 printError(ErrLockTimeOut, "Fatal:Unable to set flags");
105 *((InUse*)data) = 1;
106 //Mutex::CASL((InUse*)data , 0, 1);
107 db->releaseAllocDatabaseMutex();
108 return data + sizeof(InUse);
112 void* Chunk::allocateFromFirstPage(Database *db, int noOfDataNodes, DbRetVal *status)
114 PageInfo *pageIter = ((PageInfo*)firstPage_);
115 printDebug(DM_Alloc, "Chunk ID:%d. No free page in database",
116 chunkID_);
117 printDebug(DM_Alloc, "Scan from firstPage:%x for free nodes",
118 firstPage_);
119 char *data = NULL;
120 int i = 0;
121 //scan from first page to locate a free node available
122 while(NULL != pageIter)
124 data = ((char*)pageIter) + sizeof(PageInfo);
125 if (BITSET(pageIter->flags, HAS_SPACE))
127 for (i = 0; i< noOfDataNodes ; i++)
129 if (1 == *((InUse*)data))
130 data = data + allocSize_;
131 else break;
133 if (i != noOfDataNodes) break;
135 printDebug(DM_Alloc, "Chunk ID: %d Page :%x does not have free nodes",
136 chunkID_, pageIter);
137 pageIter = (PageInfo*)((PageInfo*) pageIter)->nextPage_;
139 if (NULL == pageIter) {
140 *status = ErrNoMemory;
141 return NULL;
143 printDebug(DM_Alloc,"ChunkID:%d Scan for free node End:Page :%x",
144 chunkID_, pageIter);
145 //*((InUse*)data) = 1;
146 #if defined(__sparcv9)
147 int ret = Mutex::CASL((InUse*)data , 0, 1);
148 #else
149 int ret = Mutex::CAS((InUse*)data , 0, 1);
150 #endif
151 if(ret !=0) {
152 *status = ErrLockTimeOut;
153 //printError(ErrLockTimeOut, "Unable to allocate from first page. Retry...");
154 return NULL;
156 setPageDirty(pageIter);
157 return data + sizeof(InUse);
160 void* Chunk::allocateFromNewPage(Database *db, DbRetVal *status)
162 DbRetVal ret = db->getAllocDatabaseMutex();
163 if (ret != 0)
165 printDebug(DM_Warning,"Unable to acquire alloc database Mutex Chunkid:%d", chunkID_);
166 *status = ErrLockTimeOut;
167 return NULL;
169 //get a new page from db
170 Page *page = db->getFreePage();
171 if (page == NULL) {
172 printError(ErrNoMemory, "Unable to allocate page");
173 db->releaseAllocDatabaseMutex();
174 *status = ErrNoMemory;
175 return NULL;
177 printDebug(DM_Alloc, "ChunkID:%d Normal Data Item newPage:%x",
178 chunkID_, page);
179 //Initialize pageInfo for this new page
180 PageInfo *pInfo = (PageInfo*)page;
181 pInfo->setPageAsUsed(0);
182 setPageDirty(pInfo);
184 char* data = ((char*)page) + sizeof(PageInfo);
185 *((InUse*)data) = 1;
186 //Mutex::CAS((int*)data , 0, 1);
188 //create the link between old page and the newly created page
189 PageInfo* pageInfo = ((PageInfo*)curPage_);
191 long oldPage = (long)pageInfo->nextPage_;
192 //pageInfo->nextPage_ = page;
193 int retVal = Mutex::CASL((long*)&pageInfo->nextPage_ , (long)pageInfo->nextPage_, (long)page);
194 if(retVal !=0) {
195 *((InUse*)data) = 0;
196 pInfo->setPageAsFree();
197 printDebug(DM_Warning, "Unable to get lock to set chunk list.");
198 *status = ErrLockTimeOut;
199 db->releaseAllocDatabaseMutex();
200 return NULL;
203 //make this new page as the current page
204 //curPage_ = page;
205 retVal = Mutex::CASL((long*)&curPage_ , (long)curPage_, (long)page);
206 if(retVal !=0) {
207 *((InUse*)data) = 0;
208 pInfo->setPageAsFree();
209 retVal = Mutex::CASL((long*)&pageInfo->nextPage_ , (long)pageInfo->nextPage_, (long)oldPage);
210 if (retVal !=0) printError(ErrSysFatal, "Fatal: Unable to reset the nextPage");
211 printDebug(DM_Warning, "Unable to get lock to set curPage");
212 *status = ErrLockTimeOut;
213 db->releaseAllocDatabaseMutex();
214 return NULL;
217 db->releaseAllocDatabaseMutex();
218 return data + sizeof(InUse);
221 //Allocates memory to store data
222 //TODO::check whether it is locked before allocating.
223 //delete tuple will set the usedflag to true, but locks will be held
224 //till commit and it shall be rolledback.So make sure that it does not
225 //allocate deleted tuple which is yet to be commited.
227 void* Chunk::allocate(Database *db, DbRetVal *status)
229 PageInfo* pageInfo = ((PageInfo*)curPage_);
231 int noOfDataNodes=(int) os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
232 char *data = ((char*)curPage_) + sizeof(PageInfo);
233 printDebug(DM_Alloc, "Chunk::allocate id:%d curPage:%x noOfDataNodes:%d",
234 chunkID_, curPage_, noOfDataNodes);
236 //1.scan through data list and find if any is free to use in current page
237 //2.If there is none then
238 // a) get new free page from db. set the prev->next to point
239 // to this new page
240 //4. b) initialize the free page to zero and get first data ptr.
241 //5.If there is one, return that
243 //For allocation more than PAGE_SIZE
244 if (0 == noOfDataNodes)
246 data = (char*) allocateForLargeDataSize(db);
247 return data;
250 //Chunk Mutex is not required as atomic instructions are instead used.
251 //This improves the concurrency
252 /*int ret = getChunkMutex(db->procSlot);
253 if (ret != 0)
255 if (status != NULL) *status = ErrLockTimeOut;
256 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
257 return NULL;
259 int i = noOfDataNodes;
260 if (BITSET(pageInfo->flags, HAS_SPACE))
262 for (i = 1; i< noOfDataNodes; i++)
264 if (*((InUse*)data) == 1) data = data + allocSize_;
265 else break;
269 printDebug(DM_Alloc, "ChunkID:%d Node which might be free is %d",
270 chunkID_, i);
271 //It comes here if the pageInfo hasFreeSpace
272 //or there are no free data space in this page
273 if (i == noOfDataNodes && *((InUse*)data) == 1)
276 printDebug(DM_Alloc, "ChunkID:%d curPage does not have free nodes.", chunkID_);
277 //there are no free data space in this page
278 int oldVal = pageInfo->flags;
279 int newVal = oldVal;
280 CLEARBIT(newVal, HAS_SPACE);
281 int ret = Mutex::CAS(&pageInfo->flags, oldVal, newVal);
282 if(ret !=0) {
283 *status = ErrLockTimeOut;
284 printDebug(DM_Warning, "Unable to set hasFreespace");
285 //releaseChunkMutex(db->procSlot);
286 return NULL;
288 *status = OK;
289 data = (char*) allocateFromFirstPage(db, noOfDataNodes, status);
290 //releaseChunkMutex(db->procSlot);
291 if (NULL == data && *status != ErrLockTimeOut)
293 *status = OK;
294 data = (char*) allocateFromNewPage(db, status);
295 if (data == NULL && *status != ErrLockTimeOut)
297 printError(ErrNoMemory, "No memory in any of the pages:Increase db size");
298 *status = ErrNoMemory;
301 if (*status == OK) setPageDirty(db, data);
302 return data;
304 //*((InUse*)data) = 1;
305 #if defined(__sparcv9)
306 int retVal = Mutex::CASL((InUse*)data , 0, 1);
307 #else
308 int retVal = Mutex::CAS((InUse*)data , 0, 1);
309 #endif
310 if(retVal !=0) {
311 *status = ErrLockTimeOut;
312 //releaseChunkMutex(db->procSlot);
313 printDebug(DM_Warning, "Unable to set isUsed : retry...");
314 return NULL;
316 setPageDirty(db, data);
317 //releaseChunkMutex(db->procSlot);
318 return data + sizeof(InUse);
321 void Chunk::setPageDirty(Database *db, void *ptr)
323 if (chunkID_ < LastCatalogID) return;
324 PageInfo *pageInfo = getPageInfo(db, ptr);
325 if (NULL == pageInfo)
327 printError(ErrSysFatal,"Fatal: pageInfo is NULL %x", ptr );
328 return;
330 SETBIT(pageInfo->flags, IS_DIRTY);
331 return;
333 void Chunk::setPageDirty(PageInfo *pInfo)
335 if (chunkID_ < LastCatalogID) return;
336 SETBIT(pInfo->flags, IS_DIRTY);
339 void* Chunk::allocateForLargeDataSize(Database *db, size_t size)
341 //no need to take chunk mutexes for this, as we are taking alloc database mutex
342 int multiple = (int) os::floor(size / PAGE_SIZE);
343 int offset = ((multiple + 1) * PAGE_SIZE);
344 PageInfo* pageInfo = ((PageInfo*)curPage_);
345 if(0==allocSize_)
346 pageInfo = (PageInfo*)db->getFreePage(size);
347 else
348 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
349 if (NULL == pageInfo)
351 printError(ErrNoMemory,"No more free pages in the database:Increase db size");
352 return NULL;
354 printDebug(DM_VarAlloc,"Chunk::alnextPageAfterMerge_locate Large Data Item id:%d Size:%d curPage:%x ",
355 chunkID_, size, curPage_);
356 //TODO:: logic pending
357 if(allocSize_!=0){
358 //large size allocate for FixedSize data
359 pageInfo->nextPageAfterMerge_ = ((char*)pageInfo + offset);
360 ((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
361 curPage_ = (Page*) pageInfo;
362 char* data = ((char*)curPage_) + sizeof(PageInfo);
363 //*((InUse*)data) = 1;
364 #if defined(__sparcv9)
365 int ret = Mutex::CASL((InUse*)data , 0, 1);
366 #else
367 int ret = Mutex::CAS((InUse*)data , 0, 1);
368 #endif
369 if(ret !=0) {
370 printError(ErrLockTimeOut, "Lock Timeout: retry...");
371 return NULL;
373 pageInfo->isUsed_=1;
374 int oldVal = pageInfo->flags;
375 int newVal = oldVal;
376 CLEARBIT(newVal, HAS_SPACE);
377 setPageDirty(pageInfo);
378 ret = Mutex::CAS(&pageInfo->flags, oldVal, newVal);
379 if (ret !=0) printError(ErrSysFatal, "Unable to set flags");
380 return data + sizeof(InUse);
381 }else{
382 //large size allocate for varSize data
383 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)pageInfo) + sizeof(PageInfo));
384 pageInfo->nextPageAfterMerge_ = ((char*)pageInfo + offset);
385 ((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
386 curPage_ = (Page*) pageInfo;
387 varInfo->size_= size;
388 int ret = Mutex::CAS(&varInfo->isUsed_ , varInfo->isUsed_, 1);
389 if(ret !=0) {
390 printError(ErrLockTimeOut, "Unable to get lock for var alloc. Retry...");
391 return NULL;
393 pageInfo->isUsed_=1;
394 setPageDirty(pageInfo);
395 return (char *) varInfo + sizeof(VarSizeInfo);
397 //REDESIGN MAY BE REQUIRED:Lets us live with this for now.
398 //what happens to the space lets say 10000 bytes is allocated
399 //it needs 2 pages,= 16000 bytes, 6000 bytes should not be wasted
400 //in this case.So need to take of this.
401 //Will be coded at later stage as this is developed to support
402 //undo logging and currently we shall assume that the logs generated
403 //wont be greater than PAGE_SIZE.
404 return NULL;
409 void* Chunk::allocFromNewPageForVarSize(Database *db, size_t size, int pslot, DbRetVal *rv)
411 //Should be called only for data items <PAGE_SIZE
412 void *vnode = varSizeFirstFitAllocate(size, pslot, rv);
413 if (vnode != NULL)
415 return vnode;
417 printDebug(DM_Warning, "No free space in any of the pages already being used");
418 *rv =OK;
419 DbRetVal ret = db->getAllocDatabaseMutex();
420 if (ret != 0)
422 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
423 return NULL;
425 Page *newPage = db->getFreePage();
426 if (NULL == newPage)
428 db->releaseAllocDatabaseMutex();
429 return NULL;
432 printDebug(DM_VarAlloc, "ChunkID:%d New Page: %x ", chunkID_, newPage);
433 PageInfo *pInfo = (PageInfo*) newPage;
434 pInfo->setPageAsUsed(0);
435 setPageDirty(pInfo);
436 SETBIT(pInfo->flags, HAS_SPACE);
437 if (1 == createDataBucket(newPage, PAGE_SIZE, size, pslot))
439 printError(ErrSysFatal, "Split failed in new page...Should never happen");
440 *rv = ErrSysFatal;
441 db->releaseAllocDatabaseMutex();
442 return NULL;
444 long oldPage = (long)((PageInfo*)curPage_)->nextPage_;
445 //((PageInfo*)curPage_)->nextPage_ = newPage;
446 int retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
447 (long)(((PageInfo*)curPage_)->nextPage_), (long)newPage);
448 if(retVal !=0) {
449 printError(ErrSysFatal, "Unable to get lock to set chunk next page");
450 pInfo->setPageAsFree();
451 db->releaseAllocDatabaseMutex();
452 *rv = ErrSysFatal;
453 return NULL;
455 //curPage_ = newPage;
456 retVal = Mutex::CASL((long*) &curPage_, (long)curPage_, (long)newPage);
457 if(retVal !=0) {
458 printError(ErrSysFatal, "Unable to get lock to set curPage");
459 retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
460 (long)(((PageInfo*)curPage_)->nextPage_), (long)oldPage);
461 if (retVal !=0 ) printError(ErrSysFatal, "Unable to reset curPage");
462 pInfo->setPageAsFree();
463 db->releaseAllocDatabaseMutex();
464 *rv = ErrSysFatal;
465 return NULL;
467 db->releaseAllocDatabaseMutex();
468 char *data= ((char*)newPage) + sizeof(PageInfo) + sizeof(VarSizeInfo);
469 return data;
472 //Allocates from the current page of the chunk.
473 //Scans through the VarSizeInfo objects in the page and gets the free slot
474 void* Chunk::allocateFromCurPageForVarSize(size_t size, int pslot, DbRetVal *rv)
476 //Should be called only for data items <PAGE_SIZE
477 Page *page = ((PageInfo*)curPage_);
478 printDebug(DM_VarAlloc, "Chunk::allocate Normal Data Item id:%d Size:%d curPage:%x ",
479 chunkID_, size, curPage_);
480 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) +
481 sizeof(PageInfo));
482 if ( 0 != getChunkMutex(pslot)) { *rv = ErrLockTimeOut; return NULL; }
483 int pageSize = PAGE_SIZE;
484 bool hasSpace= false;
485 while ((char*) varInfo < ((char*)page + pageSize))
487 if (0 == varInfo->isUsed_)
489 hasSpace= true;
490 if( size + sizeof(VarSizeInfo) < varInfo->size_)
492 if (1 == splitDataBucket(varInfo, size, pslot, rv))
494 printDebug(DM_Warning, "Unable to split the data bucket");
495 releaseChunkMutex(pslot);
496 return NULL;
498 printDebug(DM_VarAlloc, "Chunkid:%d splitDataBucket: Size: %d Item:%x ",
499 chunkID_, size, varInfo);
500 releaseChunkMutex(pslot);
501 return (char*)varInfo + sizeof(VarSizeInfo);
503 else if (size == varInfo->size_) {
504 //varInfo->isUsed_ = 1;
505 int ret = Mutex::CAS(&varInfo->isUsed_ , 0, 1);
506 if(ret !=0) {
507 printDebug(DM_Warning, "Unable to get lock for var alloc size:%d ", size);
508 *rv = ErrLockTimeOut;
509 releaseChunkMutex(pslot);
510 return NULL;
512 releaseChunkMutex(pslot);
513 return (char *) varInfo + sizeof(VarSizeInfo);
517 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
518 +varInfo->size_);
520 if (!hasSpace) CLEARBIT(((PageInfo*)curPage_)->flags, HAS_SPACE);
521 if (hasSpace && size < MIN_VARCHAR_ALLOC_SIZE)
522 CLEARBIT(((PageInfo*)curPage_)->flags, HAS_SPACE);
523 releaseChunkMutex(pslot);
524 return NULL;
527 //Allocates memory to store data of variable size
528 void* Chunk::allocate(Database *db, size_t size, DbRetVal *status)
530 if (0 == size) return NULL;
531 //check if the size is more than PAGE_SIZE
532 //if it is more than the PAGE_SIZE, then allocate new
533 //page using database and then link the curPage to the
534 //newly allocated page
535 //if it is less than PAGE_SIZE, then check the curpage for
536 //free memory of specified size
537 //if not available, then scan from the firstPage for the free
538 //space
540 //TODO::During the scan, merge nearby nodes if both are free
541 //if not available then allocate new page
543 size_t alignedSize = os::alignLong(size);
544 void *data = NULL;
545 /*int ret = getChunkMutex(db->procSlot);
546 if (ret != 0)
548 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
549 *status = ErrLockTimeOut;
550 return NULL;
552 if (alignedSize > PAGE_SIZE )
554 data = allocateForLargeDataSize(db, alignedSize);
556 else
558 data = allocateFromCurPageForVarSize(alignedSize, db->procSlot, status);
559 if (NULL == data) {
560 *status = OK;
561 //No available spaces in the current page.
562 //allocate new page
563 data= allocFromNewPageForVarSize(db, alignedSize, db->procSlot, status);
564 if (NULL == data && *status !=ErrLockTimeOut) {
565 printError(ErrNoMemory, "No memory in any of the pages:Increase db size");
566 *status = ErrNoMemory;
570 //releaseChunkMutex(db->procSlot);
571 return data;
574 //Assumes chunk mutex is already taken, before calling this
575 void* Chunk::varSizeFirstFitAllocate(size_t size, int pslot, DbRetVal *rv)
577 printDebug(DM_VarAlloc, "Chunk::varSizeFirstFitAllocate size:%d firstPage:%x",
578 size, firstPage_);
580 Page *page = ((PageInfo*)firstPage_);
581 size_t alignedSize = os::alignLong(size);
582 if ( 0 != getChunkMutex(pslot)) { *rv = ErrLockTimeOut; return NULL; }
583 int pageSize = PAGE_SIZE;
584 bool hasSpace=false;
585 while(NULL != page)
587 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
588 hasSpace=false;
589 if (BITSET(((PageInfo*)page)->flags, HAS_SPACE)){
590 while ((char*) varInfo < ((char*)page + pageSize))
592 if (0 == varInfo->isUsed_)
594 hasSpace=true;
595 if( alignedSize +sizeof(VarSizeInfo) < varInfo->size_)
597 if( 1 == splitDataBucket(varInfo, alignedSize, pslot, rv))
599 printDebug(DM_Warning, "Unable to split the data bucket");
600 releaseChunkMutex(pslot);
601 return NULL;
603 releaseChunkMutex(pslot);
604 return ((char*)varInfo) + sizeof(VarSizeInfo);
606 else if (alignedSize == varInfo->size_) {
607 //varInfo->isUsed_ = 1;
608 int ret = Mutex::CAS((int*)&varInfo->isUsed_, 0, 1);
609 if(ret !=0) {
610 printDebug(DM_Warning,"Unable to get lock to set isUsed flag.");
611 *rv = ErrLockTimeOut;
612 releaseChunkMutex(pslot);
613 return NULL;
615 printDebug(DM_VarAlloc, "VarSizeFirstFitAllocate returning %x", ((char*)varInfo) +sizeof(VarSizeInfo));
616 releaseChunkMutex(pslot);
617 return ((char *) varInfo) + sizeof(VarSizeInfo);
620 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
621 +varInfo->size_);
623 if (!hasSpace) CLEARBIT(((PageInfo*)page)->flags, HAS_SPACE);
624 if (hasSpace && size < MIN_VARCHAR_ALLOC_SIZE)
625 CLEARBIT(((PageInfo*)page)->flags, HAS_SPACE);
627 printDebug(DM_VarAlloc, "Chunk:This page does not have free data nodes page:%x", page);
628 page = ((PageInfo*) page)->nextPage_;
630 releaseChunkMutex(pslot);
631 return NULL;
634 void Chunk::freeForVarSizeAllocator(Database *db, void *ptr, int pslot)
636 int ret = getChunkMutex(pslot);
637 if (ret != 0)
639 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
640 return;
642 VarSizeInfo *varInfo = (VarSizeInfo*)((char*)ptr- sizeof(VarSizeInfo));
643 //varInfo->isUsed_ = 0;
644 if(varInfo->size_ > (PAGE_SIZE - (sizeof(VarSizeInfo)+sizeof(PageInfo)))) {
645 PageInfo *pageInfo = (PageInfo*)((char*)varInfo - sizeof(PageInfo));
646 PageInfo *pInfo = (PageInfo*)firstPage_, *prev = (PageInfo*)firstPage_;
647 bool found = false;
648 while(!found)
650 if(NULL==pInfo) break;
651 if (pInfo == pageInfo) {found = true; break; }
652 prev = pInfo;
653 pInfo = (PageInfo*)pInfo->nextPage_;
655 if (!found)
657 printError(ErrSysFatal,"Page %x not found in page list:Logical error", pageInfo );
658 releaseChunkMutex(pslot);
659 return ;
661 if(curPage_== pageInfo) {curPage_ = prev ; }
662 pageInfo->isUsed_ = 0;
663 pageInfo->nextPageAfterMerge_ = NULL;
664 setPageDirty(pageInfo);
665 SETBIT(pageInfo->flags, HAS_SPACE);
666 prev->nextPage_ = pageInfo->nextPage_;
668 int retVal = Mutex::CAS((int*)&varInfo->isUsed_, 1, 0);
669 if(retVal !=0) {
670 printError(ErrAlready, "Fatal: Varsize double free for %x", ptr);
672 PageInfo *pageInfo = getPageInfo(db, ptr);
673 if (NULL == pageInfo)
675 printError(ErrSysFatal,"Fatal: pageInfo is NULL", pageInfo );
676 releaseChunkMutex(db->procSlot);
677 return;
679 SETBIT(pageInfo->flags, HAS_SPACE);
680 SETBIT(pageInfo->flags, IS_DIRTY);
681 printDebug(DM_VarAlloc,"chunkID:%d Unset isUsed for %x", chunkID_, varInfo);
682 releaseChunkMutex(pslot);
683 return;
687 void Chunk::freeForLargeAllocator(void *ptr, int pslot)
689 //There will be max only one data element in a page.
690 //PageInfo is stored just before the data.
691 int ret = getChunkMutex(pslot);
692 if (ret != 0)
694 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
695 return;
697 PageInfo *pageInfo = (PageInfo*)(((char*)
698 ptr) - (sizeof(PageInfo) + sizeof(InUse)));
699 PageInfo *pInfo = (PageInfo*)firstPage_, *prev = (PageInfo*)firstPage_;
700 bool found = false;
701 while(!found)
703 if (pInfo == pageInfo) {found = true; break; }
704 prev = pInfo;
705 pInfo = (PageInfo*)pInfo->nextPage_;
707 if (!found)
709 printError(ErrSysFatal,"Page %x not found in page list:Logical error", pageInfo );
710 releaseChunkMutex(pslot);
711 return ;
713 os::memset(((char*)pageInfo+sizeof(PageInfo)), 0 , allocSize_);
714 if(((PageInfo*)firstPage_)->nextPage_ != NULL){
715 pageInfo->nextPageAfterMerge_ = NULL;
716 //pageInfo->isUsed_ = 0;
717 ret = Mutex::CAS((int*)&pageInfo->isUsed_, pageInfo->isUsed_, 0);
718 if (ret != 0) printError(ErrSysFatal, "Unable to set isUsed flag");
719 int oldVal = pageInfo->flags;
720 int newVal = oldVal;
721 SETBIT(newVal, HAS_SPACE);
722 ret = Mutex::CAS((int*)&pageInfo->flags, oldVal, newVal);
723 if (ret != 0) printError(ErrSysFatal, "Unable to set flags");
724 if(pageInfo == firstPage_ && ((PageInfo*)firstPage_)->nextPage_ != NULL)
726 //firstPage_ = pageInfo->nextPage_ ;
727 ret = Mutex::CASL((long*)&firstPage_, (long) firstPage_,
728 (long)pageInfo->nextPage_);
729 if (ret != 0) printError(ErrSysFatal, "Unable to set firstPage");
730 setPageDirty(((PageInfo*)firstPage_));
732 else {
733 //prev->nextPage_ = pageInfo->nextPage_;
734 ret = Mutex::CASL((long*)&prev->nextPage_, (long) prev->nextPage_,
735 (long)pageInfo->nextPage_);
736 if (ret != 0) printError(ErrSysFatal, "Unable to set nextPage");
737 setPageDirty(prev);
740 setPageDirty(pageInfo);
741 releaseChunkMutex(pslot);
742 return;
745 //Frees the memory pointed by ptr
746 void Chunk::free(Database *db, void *ptr)
748 if (0 == allocSize_)
750 freeForVarSizeAllocator(db, ptr, db->procSlot);
751 return;
753 int noOfDataNodes =(int) os::floor((PAGE_SIZE - sizeof(PageInfo)) / allocSize_);
755 if (0 == noOfDataNodes)
757 freeForLargeAllocator(ptr, db->procSlot);
758 return;
760 /*int ret = getChunkMutex(db->procSlot);
761 if (ret != 0)
763 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
764 return;
766 //unset the used flag
767 //*((int*)ptr -1 ) = 0;
768 int oldValue=1;
769 if (*((InUse*)ptr -1 ) == 0) {
770 printError(ErrSysFatal, "Fatal:Data node already freed %x Chunk:%d value:%d", ptr, chunkID_,
771 *((InUse*)ptr -1 ));
772 oldValue=0;
773 //return;
775 #if defined(__sparcv9)
776 int retVal = Mutex::CASL(((InUse*)ptr -1), oldValue, 0);
777 #else
778 int retVal = Mutex::CAS(((InUse*)ptr -1), oldValue, 0);
779 #endif
780 if(retVal !=0) {
781 printError(ErrSysFatal, "Unable to get lock to free for %x", ptr);
782 //releaseChunkMutex(db->procSlot);
783 return;
785 PageInfo *pageInfo;
786 pageInfo = getPageInfo(db, ptr);
787 if (NULL == pageInfo)
789 printError(ErrSysFatal,"Fatal: pageInfo is NULL", pageInfo );
790 //releaseChunkMutex(db->procSlot);
791 return;
793 //set the pageinfo where this ptr points
794 int oldVal = pageInfo->flags;
795 int newVal = oldVal;
796 SETBIT(newVal, HAS_SPACE);
797 retVal = Mutex::CAS((int*)&pageInfo->flags, oldVal, newVal);
798 if(retVal !=0) {
799 printError(ErrSysFatal, "Unable to get lock to set flags");
801 setPageDirty(pageInfo);
802 //releaseChunkMutex(db->procSlot);
803 return;
806 //returns the pageInfo of the page where this ptr points
807 //This works only if the data size is less than PAGE_SIZE
808 //If ptr points to data which is more than PAGE_SIZE,then
809 //calling this might lead to memory corruption
810 //Note:IMPORTANT::assumes db lock is taken before calling this
811 PageInfo* Chunk::getPageInfo(Database *db, void *ptr)
813 if (allocSize_ < PAGE_SIZE - sizeof(PageInfo)) {
814 int rem = (long) ptr % PAGE_SIZE;
815 return (PageInfo*)(((char*)ptr) - rem);
816 } else {
817 //large size allocator
818 char *inPtr = (char*)ptr;
819 PageInfo* pageInfo = ((PageInfo*)firstPage_);
821 while( pageInfo != NULL )
823 if (inPtr > (char*) pageInfo && pageInfo->nextPageAfterMerge_ >inPtr)
824 return pageInfo;
825 pageInfo = (PageInfo*)pageInfo->nextPage_ ;
828 return NULL;
831 //If called on chunk used to store tuples, it returns the total number of rows
832 //present in the table
833 long Chunk::getTotalDataNodes()
835 long totalNodes =0;
836 if (0 == allocSize_) //->variable size allocator
838 Page *page = ((PageInfo*)firstPage_);
839 while(NULL != page)
841 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
842 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
844 if (1 == varInfo->isUsed_) totalNodes++;
845 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
846 +varInfo->size_);
848 page = ((PageInfo*) page)->nextPage_;
850 return totalNodes;
853 //TODO::for large size allocator
854 if (allocSize_ >PAGE_SIZE)//->each page has only one data node
856 Page *page = ((PageInfo*)firstPage_);
857 while(NULL != page)
859 //current it page wise later this will done
860 if(1==*(int*)(((char*)page)+sizeof(PageInfo)))
861 totalNodes++;
862 page = ((PageInfo*) page)->nextPage_;
864 return totalNodes;
867 int noOfDataNodes=(int) os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
868 PageInfo* pageInfo = ((PageInfo*)firstPage_);
869 char *data = ((char*)firstPage_) + sizeof(PageInfo);
870 int i=0;
871 while( pageInfo != NULL )
873 data = ((char*)pageInfo) + sizeof(PageInfo);
874 for (i = 0; i< noOfDataNodes; i++)
876 if (*((InUse*)data) == 1) { totalNodes++;}
877 data = data + allocSize_;
879 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
881 return totalNodes;
884 //TODO::for other type of allocators
885 int Chunk::compact(int procSlot)
887 PageInfo* pageInfo = ((PageInfo*)firstPage_);
888 PageInfo* prevPage = pageInfo;
889 if (NULL == pageInfo)
891 return 0;
893 int ret = getChunkMutex(procSlot);
894 if (ret != 0)
896 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
897 return ret;
899 pageInfo = (PageInfo*)pageInfo->nextPage_;
900 if (0 == allocSize_)
902 while( pageInfo != NULL )
904 bool flag = false;
905 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)pageInfo) +
906 sizeof(PageInfo));
907 while ((char*) varInfo < ((char*)pageInfo + PAGE_SIZE))
909 if (1 == varInfo->isUsed_) {flag=true; break;}
910 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
911 +varInfo->size_);
913 if (!flag) {
914 printDebug(DM_VarAlloc,"Freeing unused page in varsize allocator %x\n", pageInfo);
915 prevPage->nextPage_ = pageInfo->nextPage_;
916 pageInfo->isUsed_ = 0;
918 prevPage = pageInfo;
919 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
920 printDebug(DM_VarAlloc,"compact iter %x\n", pageInfo);
922 }else if (allocSize_ < PAGE_SIZE)
924 while( pageInfo != NULL )
926 bool flag = false;
927 int noOfDataNodes=(int) os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
928 char *data = ((char*)pageInfo) + sizeof(PageInfo);
929 for (int i = 0; i< noOfDataNodes ; i++)
931 if (1 == *((InUse*)data)) { flag = true; break; }
932 data = data +allocSize_;
934 if (!flag) {
935 printDebug(DM_Alloc,"Freeing unused page in fixed allocator %x\n", pageInfo);
936 prevPage->nextPage_ = pageInfo->nextPage_;
937 pageInfo->isUsed_ = 0;
938 pageInfo = (PageInfo*)(((PageInfo*)prevPage)->nextPage_) ;
939 }else{
940 prevPage = pageInfo;
941 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
943 printDebug(DM_Alloc,"compact iter %x\n", pageInfo);
946 releaseChunkMutex(procSlot);
947 return 0;
950 int Chunk::totalPages()
952 //logic is same for variable size and for large data node allocator.
953 PageInfo* pageInfo = ((PageInfo*)firstPage_);
954 int totPages=0;
955 while( pageInfo != NULL )
957 totPages++;
958 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
960 return totPages;
962 int Chunk::totalDirtyPages()
964 PageInfo* pageInfo = ((PageInfo*)firstPage_);
965 int dirtyPages=0;
966 while( pageInfo != NULL )
968 if(BITSET(pageInfo->flags, IS_DIRTY)) dirtyPages++;
969 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
971 return dirtyPages;
975 int Chunk::initMutex()
977 return chunkMutex_.init("Chunk");
979 int Chunk::getChunkMutex(int procSlot)
981 return chunkMutex_.getLock(procSlot);
983 int Chunk::releaseChunkMutex(int procSlot)
985 return chunkMutex_.releaseLock(procSlot);
987 int Chunk::destroyMutex()
989 return chunkMutex_.destroy();
991 int Chunk::splitDataBucket(VarSizeInfo *varInfo, size_t needSize, int pSlot, DbRetVal *rv)
993 int remSpace = varInfo->size_ - sizeof(VarSizeInfo) - needSize;
994 //varInfo->isUsed_ = 1;
995 int ret = Mutex::CAS((int*)&varInfo->isUsed_ , 0, 1);
996 if(ret !=0) {
997 printDebug(DM_Warning, "Unable to set I isUsed flag");
998 *rv = ErrLockTimeOut;
999 return 1;
1001 //varInfo->size_ = needSize;
1002 ret = Mutex::CAS((int*)&varInfo->size_, varInfo->size_ , needSize);
1003 if(ret !=0) {
1004 printError(ErrSysFatal, "Unable to set I size flag");
1005 ret = Mutex::CAS((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
1006 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
1007 *rv = ErrSysFatal;
1008 return 1;
1010 VarSizeInfo *varInfo2 = (VarSizeInfo*)((char*)varInfo +
1011 sizeof(VarSizeInfo) + varInfo->size_);
1012 //varInfo2->isUsed_ = 0;
1013 ret = Mutex::CAS((int*)&varInfo2->isUsed_ , varInfo2->isUsed_, 0);
1014 if(ret !=0) {
1015 printError(ErrSysFatal, "Unable to set II isUsed flag");
1016 ret = Mutex::CAS((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
1017 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
1018 *rv = ErrSysFatal;
1019 return 1;
1021 //varInfo2->size_ = remSpace;
1022 ret = Mutex::CAS((int*)&varInfo2->size_, varInfo2->size_ , remSpace);
1023 if(ret !=0) {
1024 printError(ErrSysFatal, "Unable to set II size flag");
1025 ret = Mutex::CAS((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
1026 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
1027 *rv = ErrSysFatal;
1028 return 1;
1030 printDebug(DM_VarAlloc, "Remaining space is %d\n", remSpace);
1031 return 0;
1035 int Chunk::createDataBucket(Page *page, size_t totalSize, size_t needSize, int pslot)
1037 //already db alloc mutex is taken
1038 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
1039 //varInfo->isUsed_ = 0;
1040 int ret = Mutex::CAS((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
1041 if(ret !=0) {
1042 printError(ErrSysFatal, "Fatal:Unable to get lock to set isUsed flag");
1044 //varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
1045 ret = Mutex::CAS((int*)&varInfo->size_, varInfo->size_ ,
1046 PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo));
1047 if(ret !=0) {
1048 printError(ErrSysFatal, "Unable to get lock to set size");
1050 DbRetVal rv =OK;
1051 return splitDataBucket(varInfo, needSize, pslot, &rv);
1053 void Chunk::setChunkNameForSystemDB(int id)
1055 strcpy(chunkName,ChunkName[id]);
1058 void Chunk::print()
1060 printf(" <Chunk Id> %d </Chunk Id> \n",chunkID_);
1061 printf(" <TotalPages> %d </TotalPages> \n",totalPages());
1062 if (Conf::config.useDurability())
1063 printf(" <DirtyPages> %d </DirtyPages> \n",totalDirtyPages());
1064 printf(" <ChunkName > %s </ChunkName> \n",getChunkName());
1065 printf(" <TotalDataNodes> %d </TotalDataNodes> \n",getTotalDataNodes());
1066 printf(" <SizeOfDataNodes> %d </SizeOfDataNodes> \n",getSize());
1067 printf(" <Allocation Type> ");
1068 if(allocType_==0)
1070 printf("FixedSizeAllocator ");
1071 }else if(allocType_==1)
1073 printf("VariableSizeAllocator ");
1074 }else
1076 printf("UnknownAllocator ");
1078 printf("</Allocation Type>\n");