Feature Request ID: 1669025
[csql.git] / src / server / Chunk.c
blob50e397e19ea4c1428475106ea410346d7d1a2ded
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Allocator.h>
17 #include<Database.h>
18 #include<os.h>
19 #include<Debug.h>
20 #include<Config.h>
22 // sets the size of the Chunk allocator for fixed size
23 // allocator
24 // we need one integer to store book keeping information
25 // whether the storage allocation unit is used of not
26 // when it is deleted this flag is only set to unused
27 void Chunk::setSize(size_t size)
30 size_t needSize = size + sizeof(int);
31 size_t multiple = os::floor(needSize / sizeof(size_t));
32 size_t rem = needSize % sizeof(size_t);
33 if (0 == rem)
34 allocSize_ = needSize;
35 else
36 allocSize_ = (multiple + 1) * sizeof(size_t);
39 //Allocates memory to store data
40 //TODO::check whether it is locked before allocating.
41 //delete tuple will set the usedflag to true, but locks will be held
42 //till commit and it shall be rolledback.So make sure that it does not
43 //allocate deleted tuple which is yet to be commited.
45 void* Chunk::allocate(Database *db)
47 int ret = getChunkMutex();
48 if (ret != 0)
50 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
51 return NULL;
53 PageInfo* pageInfo = ((PageInfo*)curPage_);
55 int noOfDataNodes=os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
56 char *data = ((char*)curPage_) + sizeof(PageInfo);
57 printDebug(DM_Alloc, "Chunk::allocate id:%d curPage:%x noOfDataNodes:%d",
58 chunkID_, curPage_, noOfDataNodes);
60 //1.scan through data list and find if any is free to use in current page
61 //2.If there is none then
62 // a) get new free page from db. set the prev->next to point
63 // to this new page
64 //4. b) initialize the free page to zero and get first data ptr.
65 //5.If there is one, return that
67 //For allocation more than PAGE_SIZE
68 if (0 == noOfDataNodes)
70 ret = db->getAllocDatabaseMutex();
71 if (ret != 0)
73 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
74 return NULL;
76 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
77 if (NULL == pageInfo)
79 releaseChunkMutex();
80 db->releaseAllocDatabaseMutex();
81 printError(ErrNoMemory,"No more free pages in the database");
82 return NULL;
84 printDebug(DM_Alloc, "Chunk ID:%d Large Data Item newPage:%x",
85 chunkID_, pageInfo);
86 int multiple = os::floor(allocSize_ / PAGE_SIZE);
87 int offset = ((multiple + 1) * PAGE_SIZE);
89 pageInfo->setPageAsUsed(offset);
91 //create the link
92 ((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
93 //Make this as current page
94 curPage_ = (Page*) pageInfo;
95 data = ((char*)curPage_) + sizeof(PageInfo);
96 //TODO::check whether it is locked
97 *((int*)data) = 1;
98 releaseChunkMutex();
99 db->releaseAllocDatabaseMutex();
100 return data + sizeof(int);
104 int i = noOfDataNodes;
105 if (pageInfo->hasFreeSpace_ == 1)
107 for (i = 1; i< noOfDataNodes; i++)
109 if (*((int*)data) == 1) data = data + allocSize_;
110 else break;
114 printDebug(DM_Alloc, "ChunkID:%d Node which might be free is %d",
115 chunkID_, i);
116 //It comes here if the pageInfo->hasFreeSpace ==0
117 //or there are no free data space in this page
118 if (i == noOfDataNodes && *((int*)data) == 1)
120 ret = db->getAllocDatabaseMutex();
121 if (ret != 0)
123 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
124 return NULL;
127 //there are no free data space in this page
128 pageInfo->hasFreeSpace_ = 0;
129 //get a new page from db
130 printDebug(DM_Alloc, "ChunkID:%d curPage does not have free nodes.", chunkID_);
131 Page *page = db->getFreePage();
132 if (page == NULL)
134 PageInfo *pageIter = ((PageInfo*)firstPage_);
135 printDebug(DM_Alloc, "Chunk ID:%d. No free page in database",
136 chunkID_);
137 printDebug(DM_Alloc, "Scan from firstPage:%x for free nodes",
138 firstPage_);
139 //scan from first page to locate a free node available
140 while(NULL != pageIter)
142 data = ((char*)pageIter) + sizeof(PageInfo);
143 if (pageIter->hasFreeSpace_ == 1)
145 for (i = 0; i< noOfDataNodes -1; i++)
147 if (1 == *((int*)data))
148 data = data + allocSize_;
149 else break;
151 if (i != noOfDataNodes -1) break;
153 printDebug(DM_Alloc, "Chunk ID: %d Page :%x does not have free nodes",
154 chunkID_, pageIter);
155 pageIter = (PageInfo*)((PageInfo*) pageIter)->nextPage_;
157 if (NULL == pageIter)
159 releaseChunkMutex();
160 db->releaseAllocDatabaseMutex();
161 printError(ErrNoMemory,"No more free pages in the database:Increase db size");
162 return NULL;
164 printDebug(DM_Alloc,"ChunkID:%d Scan for free node End:Page :%x",
165 chunkID_, pageIter);
166 *((int*)data) = 1;
167 releaseChunkMutex();
168 db->releaseAllocDatabaseMutex();
169 return data + sizeof(int);
171 db->releaseAllocDatabaseMutex();
172 //os::memset(page, 0, PAGE_SIZE);
173 printDebug(DM_Alloc, "ChunkID:%d Normal Data Item newPage:%x",
174 chunkID_, page);
175 //Initialize pageInfo for this new page
176 PageInfo *pInfo = (PageInfo*)page;
177 pInfo->setPageAsUsed(0);
179 //create the link between old page and the newly created page
180 data = ((char*)page) + sizeof(PageInfo);
181 pageInfo->nextPage_ = page;
183 //make this new page as the current page
184 curPage_ = page;
187 *((int*)data) = 1;
188 releaseChunkMutex();
189 return data + sizeof(int);
192 //Allocates memory to store data
193 void* Chunk::allocate(Database *db, size_t size)
196 if (0 == size) return NULL;
197 //align the size first
198 //check if the size is more than PAGE_SIZE
199 //if it is more than the PAGE_SIZE, then allocate new
200 //page using database and then link the curPage to the
201 //newly allocated page
202 //if it is less than PAGE_SIZE, then check the curpage for
203 //free memory of specified size
204 //if not available, then scan from the firstPage for the free
205 //space
207 //TODO::During the scan, merge nearby nodes if both are free
208 //if not available then allocate new page
209 int ret = getChunkMutex();
210 if (ret != 0)
212 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
213 return NULL;
215 size_t alignedSize = os::align(size);
217 if (size > PAGE_SIZE)
219 int multiple = os::floor(alignedSize / PAGE_SIZE);
220 int offset = ((multiple + 1) * PAGE_SIZE);
221 PageInfo* pageInfo = ((PageInfo*)curPage_);
222 ret = db->getAllocDatabaseMutex();
223 if (ret != 0)
225 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
226 return NULL;
228 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
229 if (NULL == pageInfo)
231 releaseChunkMutex();
232 db->releaseAllocDatabaseMutex();
233 printError(ErrNoMemory,"No more free pages in the database:Increase db size");
234 return NULL;
236 printDebug(DM_VarAlloc,"Chunk::allocate Large Data Item id:%d Size:%d curPage:%x ",
237 chunkID_, alignedSize, curPage_);
238 //TODO:: logic pending
239 //what happens to the space lets say 10000 bytes is allocated
240 //it needs 2 pages,= 16000 bytes, 6000 bytes should not be wasted
241 //in this case.So need to take of this.
242 //Will be coded at later stage as this is developed to support
243 //undo logging and currently we shall assume that the logs generated
244 //wont be greater than PAGE_SIZE.
245 printf("data size greater than page size\n");
246 releaseChunkMutex();
247 db->releaseAllocDatabaseMutex();
248 return NULL;
250 else
252 Page *page = ((PageInfo*)curPage_);
253 printDebug(DM_VarAlloc, "Chunk::allocate Normal Data Item id:%d Size:%d curPage:%x ",
254 chunkID_, alignedSize, curPage_);
255 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) +
256 sizeof(PageInfo));
257 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
259 if (0 == varInfo->isUsed_)
261 if( alignedSize < varInfo->size_)
263 splitDataBucket(varInfo, alignedSize);
264 releaseChunkMutex();
265 printDebug(DM_VarAlloc, "Chunkid:%d splitDataBucket: Size: %d Item:%x ",
266 chunkID_, alignedSize, varInfo);
267 return (char*)varInfo + sizeof(VarSizeInfo);
270 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
271 +varInfo->size_);
273 //No available spaces in the current page.
274 //allocate new page
275 ret = db->getAllocDatabaseMutex();
276 if (ret != 0)
278 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
279 return NULL;
281 Page *newPage = db->getFreePage();
282 if (NULL == newPage)
284 void *data = varSizeFirstFitAllocate(size);
285 releaseChunkMutex();
286 db->releaseAllocDatabaseMutex();
287 if (NULL == data)
288 printError(ErrNoMemory,"No more free space in the database:Increase db size");
289 return data;
291 db->releaseAllocDatabaseMutex();
293 printDebug(DM_VarAlloc, "ChunkID:%d New Page: %x ", chunkID_, newPage);
294 PageInfo *pInfo = (PageInfo*) newPage;
295 pInfo->setPageAsUsed(0);
296 createDataBucket(newPage, PAGE_SIZE, alignedSize);
298 ((PageInfo*)curPage_)->nextPage_ = newPage;
299 curPage_ = newPage;
300 releaseChunkMutex();
301 char *data= ((char*)newPage) + sizeof(PageInfo) + sizeof(VarSizeInfo);
302 return data;
306 //Assumes chunk mutex is already taken, before calling this
307 void* Chunk::varSizeFirstFitAllocate(size_t size)
309 Page *page = ((PageInfo*)firstPage_);
310 size_t alignedSize = os::align(size);
311 while(NULL != page)
313 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
314 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
316 if (0 == varInfo->isUsed_)
318 if( alignedSize < varInfo->size_)
320 splitDataBucket(varInfo, alignedSize);
321 return (char*)varInfo + sizeof(VarSizeInfo);
324 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
325 +varInfo->size_);
327 page = ((PageInfo*) page)->nextPage_;
329 return NULL;
334 //Frees the memory pointed by ptr
335 void Chunk::free(Database *db, void *ptr)
337 int ret = getChunkMutex();
338 if (ret != 0)
340 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
341 return;
343 if (0 == allocSize_)
345 VarSizeInfo *varInfo = (VarSizeInfo*)((char*)ptr- sizeof(VarSizeInfo));
346 varInfo->isUsed_ = 0;
347 printDebug(DM_VarAlloc,"chunkID:%d Unset isUsed for %x", chunkID_, varInfo);
348 //return from here for variable size allocator
349 releaseChunkMutex();
350 return;
352 //unset the used flag
353 *((int*)ptr -1 ) = 0;
354 int noOfDataNodes =os::floor((PAGE_SIZE - sizeof(PageInfo)) / allocSize_);
356 if (0 == noOfDataNodes)
358 //means tuple larger than PAGE_SIZE
359 PageInfo *pageInfo = (PageInfo*)(((char*)
360 ptr) - (sizeof(PageInfo) + sizeof(int)));
361 PageInfo *pInfo = (PageInfo*)firstPage_, *prev = (PageInfo*)firstPage_;
362 bool found = false;
363 while(!found)
365 if (pInfo == pageInfo) {found = true; break; }
366 prev = pInfo;
367 pInfo = (PageInfo*)pInfo->nextPage_;
369 if (!found)
371 printError(ErrSysFatal,"Page %x not found in page list:Logic error", pageInfo );
372 releaseChunkMutex();
373 return ;
375 prev->nextPage_ = pageInfo->nextPage_;
376 pageInfo->nextPageAfterMerge_ = NULL;
377 pageInfo->isUsed_ = 0;
378 os::memset(pageInfo, 0 , allocSize_);
379 pageInfo->hasFreeSpace_ = 1;
380 releaseChunkMutex();
381 return;
383 PageInfo *pageInfo;
384 pageInfo = getPageInfo(db, ptr);
385 if (NULL == pageInfo)
387 printError(ErrSysFatal,"Probable Data corruption: pageInfo is NULL", pageInfo );
388 releaseChunkMutex();
389 return;
391 //set the pageinfo where this ptr points
392 pageInfo->hasFreeSpace_ = 1;
393 releaseChunkMutex();
394 return;
397 //returns the pageInfo of the page where this ptr points
398 //This works only if the data size is less than PAGE_SIZE
399 //If ptr points to data which is more than PAGE_SIZE,then
400 //calling this might lead to memory corruption
401 //Note:IMPORTANT::assumes db lock is taken before calling this
402 PageInfo* Chunk::getPageInfo(Database *db, void *ptr)
404 char *inPtr = (char*)ptr;
405 PageInfo* pageInfo = ((PageInfo*)firstPage_);
407 while( pageInfo != NULL )
409 if (inPtr > (char*) pageInfo && ((char*)pageInfo + PAGE_SIZE) >inPtr)
410 return pageInfo;
411 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
413 return NULL;
416 long Chunk::getTotalDataNodes()
418 int ret = getChunkMutex();
419 if (ret != 0)
421 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
422 return NULL;
425 //TODO:: for variable size allocator
426 if (0 == allocSize_) //->variable size allocator
427 return 0;
429 //TODO::for large size allocator
430 if (allocSize_ >PAGE_SIZE)//->each page has only one data node
431 return 0;
433 int noOfDataNodes=os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
434 PageInfo* pageInfo = ((PageInfo*)firstPage_);
435 char *data = ((char*)firstPage_) + sizeof(PageInfo);
436 long totalNodes = 0;
437 int i=0;
438 while( pageInfo != NULL )
440 data = ((char*)pageInfo) + sizeof(PageInfo);
441 for (i = 0; i< noOfDataNodes; i++)
443 if (*((int*)data) == 1) { totalNodes++;}
444 data = data + allocSize_;
446 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
448 releaseChunkMutex();
449 return totalNodes;
452 int Chunk::totalPages()
454 int ret = getChunkMutex();
455 if (ret != 0)
457 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
458 return NULL;
460 //logic is same for variable size and for large data node allocator.
461 PageInfo* pageInfo = ((PageInfo*)firstPage_);
462 int totPages=0;
463 while( pageInfo != NULL )
465 totPages++;
466 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
468 releaseChunkMutex();
469 return totPages;
472 int Chunk::initMutex()
474 return chunkMutex_.init();
476 int Chunk::getChunkMutex()
478 return chunkMutex_.tryLock();
480 int Chunk::releaseChunkMutex()
482 return chunkMutex_.releaseLock();
484 int Chunk::destroyMutex()
486 return chunkMutex_.destroy();
488 void Chunk::splitDataBucket(VarSizeInfo *varInfo, size_t needSize)
490 int remSpace = varInfo->size_ - sizeof(VarSizeInfo) - needSize;
491 varInfo->isUsed_ = 1;
492 varInfo->size_ = needSize;
493 varInfo = (VarSizeInfo*)((char*)varInfo +
494 sizeof(VarSizeInfo) + varInfo->size_);
495 varInfo->isUsed_ = 0;
496 varInfo->size_ = remSpace;
497 return;
501 void Chunk::createDataBucket(Page *page, size_t totalSize, size_t needSize)
503 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
504 varInfo->isUsed_ = 0;
505 varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
506 splitDataBucket(varInfo, needSize);
508 return;