code reorg
[csql.git] / src / storage / VarChunk.cxx
bloba55381496c2bf65f3751296336b1a66e2fbbb840
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Allocator.h>
17 #include<Database.h>
18 #include<os.h>
19 #include<Debug.h>
20 #include<Config.h>
21 #include<CatalogTables.h>
24 //Will check from first page to current page and do first fit allocate
25 //if no space available, allocates new page
26 void* Chunk::allocFromNewPageForVarSize(Database *db, size_t size, int pslot, DbRetVal *rv)
28 //Should be called only for data items <PAGE_SIZE
29 void *vnode = varSizeFirstFitAllocate(size, pslot, rv);
30 if (vnode != NULL)
32 return vnode;
34 printDebug(DM_Warning, "No free space in any of the pages already being used");
35 *rv =OK;
36 DbRetVal ret = db->getAllocDatabaseMutex();
37 if (ret != 0)
39 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
40 return NULL;
42 Page *newPage = db->getFreePage();
43 if (NULL == newPage)
45 db->releaseAllocDatabaseMutex();
46 return NULL;
49 printDebug(DM_VarAlloc, "ChunkID:%d New Page: %x ", chunkID_, newPage);
50 PageInfo *pInfo = (PageInfo*) newPage;
51 pInfo->setPageAsUsed(0);
52 setPageDirty(pInfo);
53 SETBIT(pInfo->flags, HAS_SPACE);
54 if (1 == createDataBucket(newPage, PAGE_SIZE, size, pslot))
56 printError(ErrSysFatal, "Split failed in new page...Should never happen");
57 *rv = ErrSysFatal;
58 db->releaseAllocDatabaseMutex();
59 return NULL;
61 long oldPage = (long)((PageInfo*)curPage_)->nextPage_;
62 //((PageInfo*)curPage_)->nextPage_ = newPage;
63 int retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
64 (long)(((PageInfo*)curPage_)->nextPage_), (long)newPage);
65 if(retVal !=0) {
66 printError(ErrSysFatal, "Unable to get lock to set chunk next page");
67 pInfo->setPageAsFree();
68 db->releaseAllocDatabaseMutex();
69 *rv = ErrSysFatal;
70 return NULL;
72 //curPage_ = newPage;
73 retVal = Mutex::CASL((long*) &curPage_, (long)curPage_, (long)newPage);
74 if(retVal !=0) {
75 printError(ErrSysFatal, "Unable to get lock to set curPage");
76 retVal = Mutex::CASL((long*) &(((PageInfo*)curPage_)->nextPage_),
77 (long)(((PageInfo*)curPage_)->nextPage_), (long)oldPage);
78 if (retVal !=0 ) printError(ErrSysFatal, "Unable to reset curPage");
79 pInfo->setPageAsFree();
80 db->releaseAllocDatabaseMutex();
81 *rv = ErrSysFatal;
82 return NULL;
84 db->releaseAllocDatabaseMutex();
85 char *data= ((char*)newPage) + sizeof(PageInfo) + sizeof(VarSizeInfo);
86 return data;
89 //Allocates from the current page of the chunk.
90 //Scans through the VarSizeInfo objects in the page and gets the free slot
91 void* Chunk::allocateFromCurPageForVarSize(size_t size, int pslot, DbRetVal *rv)
93 //Should be called only for data items <PAGE_SIZE
94 Page *page = ((PageInfo*)curPage_);
95 printDebug(DM_VarAlloc, "Chunk::allocate Normal Data Item id:%d Size:%d curPage:%x ",
96 chunkID_, size, curPage_);
97 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) +
98 sizeof(PageInfo));
99 //if ( 0 != getChunkMutex(pslot)) { *rv = ErrLockTimeOut; return NULL; }
100 int pageSize = PAGE_SIZE;
101 bool hasSpace= false;
102 while ((char*) varInfo < ((char*)page + pageSize))
104 if (0 == varInfo->isUsed_)
106 hasSpace= true;
107 if( size + sizeof(VarSizeInfo) < varInfo->size_)
109 if (1 == splitDataBucket(varInfo, size, pslot, rv))
111 printDebug(DM_Warning, "Unable to split the data bucket");
112 //releaseChunkMutex(pslot);
113 return NULL;
115 printDebug(DM_VarAlloc, "Chunkid:%d splitDataBucket: Size: %d Item:%x ",
116 chunkID_, size, varInfo);
117 //releaseChunkMutex(pslot);
118 return (char*)varInfo + sizeof(VarSizeInfo);
120 else if (size == varInfo->size_) {
121 //varInfo->isUsed_ = 1;
122 int ret = Mutex::CASGen(&varInfo->isUsed_ , 0, 1);
123 if(ret !=0) {
124 printDebug(DM_Warning, "Unable to get lock for var alloc size:%d ", size);
125 *rv = ErrLockTimeOut;
126 //releaseChunkMutex(pslot);
127 return NULL;
129 //releaseChunkMutex(pslot);
130 return (char *) varInfo + sizeof(VarSizeInfo);
134 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
135 +varInfo->size_);
137 if (!hasSpace) CLEARBIT(((PageInfo*)curPage_)->flags, HAS_SPACE);
138 if (hasSpace && size < MIN_VARCHAR_ALLOC_SIZE)
139 CLEARBIT(((PageInfo*)curPage_)->flags, HAS_SPACE);
140 //releaseChunkMutex(pslot);
141 return NULL;
144 void* Chunk::allocateForVarLargeSize(PageInfo *pageInfo, size_t size, int offset)
146 //large size allocate for varSize data
147 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)pageInfo) + sizeof(PageInfo));
148 pageInfo->nextPageAfterMerge_ = ((char*)pageInfo + offset);
149 ((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
150 curPage_ = (Page*) pageInfo;
151 varInfo->size_= size;
152 int ret = Mutex::CASGen(&varInfo->isUsed_ , varInfo->isUsed_, 1);
153 if(ret !=0) {
154 printError(ErrLockTimeOut, "Unable to get lock for var alloc. Retry...");
155 return NULL;
157 pageInfo->isUsed_=1;
158 setPageDirty(pageInfo);
159 return (char *) varInfo + sizeof(VarSizeInfo);
162 //Allocates memory to store data of variable size
163 void* Chunk::allocate(Database *db, size_t size, DbRetVal *status)
165 if (0 == size) return NULL;
166 //check if the size is more than PAGE_SIZE
167 //if it is more than the PAGE_SIZE, then allocate new
168 //page using database and then link the curPage to the
169 //newly allocated page
170 //if it is less than PAGE_SIZE, then check the curpage for
171 //free memory of specified size
172 //if not available, then scan from the firstPage for the free
173 //space
175 //TODO::During the scan, merge nearby nodes if both are free
176 //if not available then allocate new page
178 size_t alignedSize = os::alignLong(size);
179 void *data = NULL;
180 int ret = getChunkMutex(db->procSlot);
181 if (ret != 0)
183 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
184 *status = ErrLockTimeOut;
185 return NULL;
187 if (alignedSize > PAGE_SIZE )
189 data = allocateForLargeDataSize(db, alignedSize);
191 else
193 data = allocateFromCurPageForVarSize(alignedSize, db->procSlot, status);
194 if (NULL == data) {
195 *status = OK;
196 //No available spaces in the current page.
197 //allocate new page
198 data= allocFromNewPageForVarSize(db, alignedSize, db->procSlot, status);
199 if (NULL == data && *status !=ErrLockTimeOut) {
200 printError(ErrNoMemory, "No memory in any of the pages:Increase db size");
201 *status = ErrNoMemory;
205 releaseChunkMutex(db->procSlot);
206 return data;
209 //Assumes chunk mutex is already taken, before calling this
210 void* Chunk::varSizeFirstFitAllocate(size_t size, int pslot, DbRetVal *rv)
212 printDebug(DM_VarAlloc, "Chunk::varSizeFirstFitAllocate size:%d firstPage:%x",
213 size, firstPage_);
215 Page *page = ((PageInfo*)firstPage_);
216 size_t alignedSize = os::alignLong(size);
217 //if ( 0 != getChunkMutex(pslot)) { *rv = ErrLockTimeOut; return NULL; }
218 int pageSize = PAGE_SIZE;
219 bool hasSpace=false;
220 while(NULL != page)
222 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
223 hasSpace=false;
224 if (BITSET(((PageInfo*)page)->flags, HAS_SPACE)){
225 while ((char*) varInfo < ((char*)page + pageSize))
227 if (0 == varInfo->isUsed_)
229 hasSpace=true;
230 if( alignedSize +sizeof(VarSizeInfo) < varInfo->size_)
232 if( 1 == splitDataBucket(varInfo, alignedSize, pslot, rv))
234 printDebug(DM_Warning, "Unable to split the data bucket");
235 //releaseChunkMutex(pslot);
236 return NULL;
238 //releaseChunkMutex(pslot);
239 return ((char*)varInfo) + sizeof(VarSizeInfo);
241 else if (alignedSize == varInfo->size_) {
242 //varInfo->isUsed_ = 1;
243 int ret = Mutex::CASGen(&varInfo->isUsed_, 0, 1);
244 if(ret !=0) {
245 printDebug(DM_Warning,"Unable to get lock to set isUsed flag.");
246 *rv = ErrLockTimeOut;
247 //releaseChunkMutex(pslot);
248 return NULL;
250 printDebug(DM_VarAlloc, "VarSizeFirstFitAllocate returning %x", ((char*)varInfo) +sizeof(VarSizeInfo));
251 //releaseChunkMutex(pslot);
252 return ((char *) varInfo) + sizeof(VarSizeInfo);
255 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
256 +varInfo->size_);
258 if (!hasSpace) CLEARBIT(((PageInfo*)page)->flags, HAS_SPACE);
259 if (hasSpace && size < MIN_VARCHAR_ALLOC_SIZE)
260 CLEARBIT(((PageInfo*)page)->flags, HAS_SPACE);
262 printDebug(DM_VarAlloc, "Chunk:This page does not have free data nodes page:%x", page);
263 page = ((PageInfo*) page)->nextPage_;
265 //releaseChunkMutex(pslot);
266 return NULL;
269 void Chunk::freeForVarSizeAllocator(Database *db, void *ptr, int pslot)
271 int ret = getChunkMutex(pslot);
272 if (ret != 0)
274 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
275 return;
277 VarSizeInfo *varInfo = (VarSizeInfo*)((char*)ptr- sizeof(VarSizeInfo));
278 //varInfo->isUsed_ = 0;
279 if(varInfo->size_ > (PAGE_SIZE - (sizeof(VarSizeInfo)+sizeof(PageInfo)))) {
280 PageInfo *pageInfo = (PageInfo*)((char*)varInfo - sizeof(PageInfo));
281 PageInfo *pInfo = (PageInfo*)firstPage_, *prev = (PageInfo*)firstPage_;
282 bool found = false;
283 while(!found)
285 if(NULL==pInfo) break;
286 if (pInfo == pageInfo) {found = true; break; }
287 prev = pInfo;
288 pInfo = (PageInfo*)pInfo->nextPage_;
290 if (!found)
292 printError(ErrSysFatal,"Page %x not found in page list:Logical error", pageInfo );
293 releaseChunkMutex(pslot);
294 return ;
296 if(curPage_== pageInfo) {curPage_ = prev ; }
297 pageInfo->isUsed_ = 0;
298 pageInfo->nextPageAfterMerge_ = NULL;
299 setPageDirty(pageInfo);
300 SETBIT(pageInfo->flags, HAS_SPACE);
301 prev->nextPage_ = pageInfo->nextPage_;
303 int retVal = Mutex::CASGen(&varInfo->isUsed_, 1, 0);
304 if(retVal !=0) {
305 printError(ErrAlready, "Fatal: Varsize double free for %x", ptr);
307 PageInfo *pageInfo = getPageInfo(db, ptr);
308 if (NULL == pageInfo)
310 printError(ErrSysFatal,"Fatal: pageInfo is NULL", pageInfo );
311 releaseChunkMutex(db->procSlot);
312 return;
314 SETBIT(pageInfo->flags, HAS_SPACE);
315 SETBIT(pageInfo->flags, IS_DIRTY);
316 printDebug(DM_VarAlloc,"chunkID:%d Unset isUsed for %x", chunkID_, varInfo);
317 releaseChunkMutex(pslot);
318 return;
322 int Chunk::splitDataBucket(VarSizeInfo *varInfo, size_t needSize, int pSlot, DbRetVal *rv)
324 InUse remSpace = varInfo->size_ - sizeof(VarSizeInfo) - needSize;
325 //varInfo->isUsed_ = 1;
326 int ret = Mutex::CASGen(&varInfo->isUsed_ , 0, 1);
327 if(ret !=0) {
328 printDebug(DM_Warning, "Unable to set I isUsed flag");
329 *rv = ErrLockTimeOut;
330 return 1;
332 //varInfo->size_ = needSize;
333 ret = Mutex::CASGen(&varInfo->size_, varInfo->size_ , needSize);
334 if(ret !=0) {
335 printError(ErrSysFatal, "Unable to set I size flag");
336 ret = Mutex::CASGen(&varInfo->isUsed_ , varInfo->isUsed_, 0);
337 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
338 *rv = ErrSysFatal;
339 return 1;
341 VarSizeInfo *varInfo2 = (VarSizeInfo*)((char*)varInfo +
342 sizeof(VarSizeInfo) + varInfo->size_);
343 //varInfo2->isUsed_ = 0;
344 ret = Mutex::CASGen(&varInfo2->isUsed_ , varInfo2->isUsed_, 0);
345 if(ret !=0) {
346 printError(ErrSysFatal, "Unable to set II isUsed flag");
347 ret = Mutex::CASGen(&varInfo->isUsed_ , varInfo->isUsed_, 0);
348 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
349 *rv = ErrSysFatal;
350 return 1;
352 //varInfo2->size_ = remSpace;
353 ret = Mutex::CASGen(&varInfo2->size_, varInfo2->size_ , remSpace);
354 if(ret !=0) {
355 printError(ErrSysFatal, "Unable to set II size flag");
356 ret = Mutex::CASGen((int*)&varInfo->isUsed_ , varInfo->isUsed_, 0);
357 if (ret !=0) printError(ErrSysFatal, "Unable to reset isUsed flag");
358 *rv = ErrSysFatal;
359 return 1;
361 printDebug(DM_VarAlloc, "Remaining space is %d\n", remSpace);
362 return 0;
365 int Chunk::createDataBucket(Page *page, size_t totalSize, size_t needSize, int pslot)
367 //already db alloc mutex is taken
368 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
369 //varInfo->isUsed_ = 0;
370 int ret = Mutex::CASGen(&varInfo->isUsed_ , varInfo->isUsed_, 0);
371 if(ret !=0) {
372 printError(ErrSysFatal, "Fatal:Unable to get lock to set isUsed flag");
374 //varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
375 ret = Mutex::CASGen(&varInfo->size_, varInfo->size_ ,
376 PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo));
377 if(ret !=0) {
378 printError(ErrSysFatal, "Unable to get lock to set size");
380 DbRetVal rv =OK;
381 return splitDataBucket(varInfo, needSize, pslot, &rv);
384 long Chunk::getVarTotalDataNodes()
386 long totalNodes=0;
387 Page *page = ((PageInfo*)firstPage_);
388 while(NULL != page)
390 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
391 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
393 if (1 == varInfo->isUsed_) totalNodes++;
394 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
395 +varInfo->size_);
397 page = ((PageInfo*) page)->nextPage_;
399 return totalNodes;
402 void Chunk:varCompact()
404 while( pageInfo != NULL )
406 bool flag = false;
407 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)pageInfo) +
408 sizeof(PageInfo));
409 while ((char*) varInfo < ((char*)pageInfo + PAGE_SIZE))
411 if (1 == varInfo->isUsed_) {flag=true; break;}
412 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
413 +varInfo->size_);
415 if (!flag) {
416 printDebug(DM_VarAlloc,"Freeing unused page in varsize allocator %x\n", pageInfo);
417 prevPage->nextPage_ = pageInfo->nextPage_;
418 pageInfo->isUsed_ = 0;
420 prevPage = pageInfo;
421 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
422 printDebug(DM_VarAlloc,"compact iter %x\n", pageInfo);
424 return;