Added more test cases and fixed failures of Table/test001 and UserManager/test001
[csql.git] / src / server / Chunk.cxx
blob269c43a1e0550b5b61fcf4c34adfb1a54fb396cb
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Allocator.h>
17 #include<Database.h>
18 #include<os.h>
19 #include<Debug.h>
20 #include<Config.h>
22 // sets the size of the Chunk allocator for fixed size
23 // allocator
24 // we need one integer to store book keeping information
25 // whether the storage allocation unit is used of not
26 // when it is deleted this flag is only set to unused
27 void Chunk::setSize(size_t size)
30 size_t needSize = size + sizeof(int);
31 size_t multiple = os::floor(needSize / sizeof(size_t));
32 size_t rem = needSize % sizeof(size_t);
33 if (0 == rem)
34 allocSize_ = needSize;
35 else
36 allocSize_ = (multiple + 1) * sizeof(size_t);
39 //Allocates memory to store data
40 //TODO::check whether it is locked before allocating.
41 //delete tuple will set the usedflag to true, but locks will be held
42 //till commit and it shall be rolledback.So make sure that it does not
43 //allocate deleted tuple which is yet to be commited.
45 void* Chunk::allocate(Database *db)
47 int ret = getChunkMutex();
48 if (ret != 0)
50 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
51 return NULL;
53 PageInfo* pageInfo = ((PageInfo*)curPage_);
55 int noOfDataNodes=os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
56 char *data = ((char*)curPage_) + sizeof(PageInfo);
57 printDebug(DM_Alloc, "Chunk::allocate id:%d curPage:%x noOfDataNodes:%d",
58 chunkID_, curPage_, noOfDataNodes);
60 //1.scan through data list and find if any is free to use in current page
61 //2.If there is none then
62 // a) get new free page from db. set the prev->next to point
63 // to this new page
64 //4. b) initialize the free page to zero and get first data ptr.
65 //5.If there is one, return that
67 //For allocation more than PAGE_SIZE
68 if (0 == noOfDataNodes)
70 ret = db->getAllocDatabaseMutex();
71 if (ret != 0)
73 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
74 return NULL;
76 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
77 if (NULL == pageInfo)
79 releaseChunkMutex();
80 db->releaseAllocDatabaseMutex();
81 printError(ErrNoMemory,"No more free pages in the database");
82 return NULL;
84 printDebug(DM_Alloc, "Chunk ID:%d Large Data Item newPage:%x",
85 chunkID_, pageInfo);
86 int multiple = os::floor(allocSize_ / PAGE_SIZE);
87 int offset = ((multiple + 1) * PAGE_SIZE);
89 pageInfo->setPageAsUsed(offset);
91 //create the link
92 ((PageInfo*)curPage_)->nextPage_ = (Page*) pageInfo;
93 //Make this as current page
94 curPage_ = (Page*) pageInfo;
95 data = ((char*)curPage_) + sizeof(PageInfo);
96 //TODO::check whether it is locked
97 *((int*)data) = 1;
98 releaseChunkMutex();
99 db->releaseAllocDatabaseMutex();
100 return data + sizeof(int);
104 int i = noOfDataNodes;
105 if (pageInfo->hasFreeSpace_ == 1)
107 for (i = 1; i< noOfDataNodes; i++)
109 if (*((int*)data) == 1) data = data + allocSize_;
110 else break;
114 printDebug(DM_Alloc, "ChunkID:%d Node which might be free is %d",
115 chunkID_, i);
116 //It comes here if the pageInfo->hasFreeSpace ==0
117 //or there are no free data space in this page
118 if (i == noOfDataNodes && *((int*)data) == 1)
120 ret = db->getAllocDatabaseMutex();
121 if (ret != 0)
123 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
124 return NULL;
127 //there are no free data space in this page
128 pageInfo->hasFreeSpace_ = 0;
129 //get a new page from db
130 printDebug(DM_Alloc, "ChunkID:%d curPage does not have free nodes.", chunkID_);
131 Page *page = db->getFreePage();
132 if (page == NULL)
134 PageInfo *pageIter = ((PageInfo*)firstPage_);
135 printDebug(DM_Alloc, "Chunk ID:%d. No free page in database",
136 chunkID_);
137 printDebug(DM_Alloc, "Scan from firstPage:%x for free nodes",
138 firstPage_);
139 //scan from first page to locate a free node available
140 while(NULL != pageIter)
142 data = ((char*)pageIter) + sizeof(PageInfo);
143 if (pageIter->hasFreeSpace_ == 1)
145 for (i = 0; i< noOfDataNodes -1; i++)
147 if (1 == *((int*)data))
148 data = data + allocSize_;
149 else break;
151 if (i != noOfDataNodes -1) break;
153 printDebug(DM_Alloc, "Chunk ID: %d Page :%x does not have free nodes",
154 chunkID_, pageIter);
155 pageIter = (PageInfo*)((PageInfo*) pageIter)->nextPage_;
157 if (NULL == pageIter)
159 releaseChunkMutex();
160 db->releaseAllocDatabaseMutex();
161 printError(ErrNoMemory,"No more free pages in the database:Increase db size");
162 return NULL;
164 printDebug(DM_Alloc,"ChunkID:%d Scan for free node End:Page :%x",
165 chunkID_, pageIter);
166 *((int*)data) = 1;
167 releaseChunkMutex();
168 db->releaseAllocDatabaseMutex();
169 return data + sizeof(int);
171 db->releaseAllocDatabaseMutex();
172 //os::memset(page, 0, PAGE_SIZE);
173 printDebug(DM_Alloc, "ChunkID:%d Normal Data Item newPage:%x",
174 chunkID_, page);
175 //Initialize pageInfo for this new page
176 PageInfo *pInfo = (PageInfo*)page;
177 pInfo->setPageAsUsed(0);
179 //create the link between old page and the newly created page
180 data = ((char*)page) + sizeof(PageInfo);
181 pageInfo->nextPage_ = page;
183 //make this new page as the current page
184 curPage_ = page;
187 *((int*)data) = 1;
188 releaseChunkMutex();
189 return data + sizeof(int);
192 //Allocates memory to store data
193 void* Chunk::allocate(Database *db, size_t size)
196 if (0 == size) return NULL;
197 //align the size first
198 //check if the size is more than PAGE_SIZE
199 //if it is more than the PAGE_SIZE, then allocate new
200 //page using database and then link the curPage to the
201 //newly allocated page
202 //if it is less than PAGE_SIZE, then check the curpage for
203 //free memory of specified size
204 //if not available, then scan from the firstPage for the free
205 //space
207 //TODO::During the scan, merge nearby nodes if both are free
208 //if not available then allocate new page
209 int ret = getChunkMutex();
210 if (ret != 0)
212 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
213 return NULL;
215 size_t alignedSize = os::align(size);
217 if (size > PAGE_SIZE)
219 int multiple = os::floor(alignedSize / PAGE_SIZE);
220 int offset = ((multiple + 1) * PAGE_SIZE);
221 PageInfo* pageInfo = ((PageInfo*)curPage_);
222 ret = db->getAllocDatabaseMutex();
223 if (ret != 0)
225 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
226 return NULL;
228 pageInfo = (PageInfo*)db->getFreePage(allocSize_);
229 if (NULL == pageInfo)
231 releaseChunkMutex();
232 db->releaseAllocDatabaseMutex();
233 printError(ErrNoMemory,"No more free pages in the database:Increase db size");
234 return NULL;
236 printDebug(DM_VarAlloc,"Chunk::allocate Large Data Item id:%d Size:%d curPage:%x ",
237 chunkID_, alignedSize, curPage_);
238 //TODO:: logic pending
239 //what happens to the space lets say 10000 bytes is allocated
240 //it needs 2 pages,= 16000 bytes, 6000 bytes should not be wasted
241 //in this case.So need to take of this.
242 //Will be coded at later stage as this is developed to support
243 //undo logging and currently we shall assume that the logs generated
244 //wont be greater than PAGE_SIZE.
245 printf("data size greater than page size\n");
246 releaseChunkMutex();
247 db->releaseAllocDatabaseMutex();
248 return NULL;
250 else
252 Page *page = ((PageInfo*)curPage_);
253 printDebug(DM_VarAlloc, "Chunk::allocate Normal Data Item id:%d Size:%d curPage:%x ",
254 chunkID_, alignedSize, curPage_);
255 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) +
256 sizeof(PageInfo));
257 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
259 if (0 == varInfo->isUsed_)
261 if( alignedSize + sizeof(VarSizeInfo) < varInfo->size_)
263 splitDataBucket(varInfo, alignedSize);
264 releaseChunkMutex();
265 printDebug(DM_VarAlloc, "Chunkid:%d splitDataBucket: Size: %d Item:%x ",
266 chunkID_, alignedSize, varInfo);
267 return (char*)varInfo + sizeof(VarSizeInfo);
269 else if (alignedSize == varInfo->size_) {
270 varInfo->isUsed_ = 1;
271 releaseChunkMutex();
272 return (char *) varInfo + sizeof(VarSizeInfo);
276 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
277 +varInfo->size_);
279 //No available spaces in the current page.
280 //allocate new page
281 ret = db->getAllocDatabaseMutex();
282 if (ret != 0)
284 printError(ErrLockTimeOut,"Unable to acquire alloc database Mutex");
285 return NULL;
287 Page *newPage = db->getFreePage();
288 if (NULL == newPage)
290 void *data = varSizeFirstFitAllocate(size);
291 releaseChunkMutex();
292 db->releaseAllocDatabaseMutex();
293 if (NULL == data)
294 printError(ErrNoMemory,"No more free space in the database:Increase db size");
295 return data;
297 db->releaseAllocDatabaseMutex();
299 printDebug(DM_VarAlloc, "ChunkID:%d New Page: %x ", chunkID_, newPage);
300 PageInfo *pInfo = (PageInfo*) newPage;
301 pInfo->setPageAsUsed(0);
302 createDataBucket(newPage, PAGE_SIZE, alignedSize);
304 ((PageInfo*)curPage_)->nextPage_ = newPage;
305 curPage_ = newPage;
306 releaseChunkMutex();
307 char *data= ((char*)newPage) + sizeof(PageInfo) + sizeof(VarSizeInfo);
308 return data;
312 //Assumes chunk mutex is already taken, before calling this
313 void* Chunk::varSizeFirstFitAllocate(size_t size)
315 Page *page = ((PageInfo*)firstPage_);
316 size_t alignedSize = os::align(size);
317 while(NULL != page)
319 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
320 while ((char*) varInfo < ((char*)page + PAGE_SIZE))
322 if (0 == varInfo->isUsed_)
324 if( alignedSize < varInfo->size_)
326 splitDataBucket(varInfo, alignedSize);
327 return (char*)varInfo + sizeof(VarSizeInfo);
330 varInfo = (VarSizeInfo*)((char*)varInfo + sizeof(VarSizeInfo)
331 +varInfo->size_);
333 page = ((PageInfo*) page)->nextPage_;
335 return NULL;
340 //Frees the memory pointed by ptr
341 void Chunk::free(Database *db, void *ptr)
343 int ret = getChunkMutex();
344 if (ret != 0)
346 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
347 return;
349 if (0 == allocSize_)
351 VarSizeInfo *varInfo = (VarSizeInfo*)((char*)ptr- sizeof(VarSizeInfo));
352 varInfo->isUsed_ = 0;
353 printDebug(DM_VarAlloc,"chunkID:%d Unset isUsed for %x", chunkID_, varInfo);
354 //return from here for variable size allocator
355 releaseChunkMutex();
356 return;
358 //unset the used flag
359 *((int*)ptr -1 ) = 0;
360 int noOfDataNodes =os::floor((PAGE_SIZE - sizeof(PageInfo)) / allocSize_);
362 if (0 == noOfDataNodes)
364 //means tuple larger than PAGE_SIZE
365 PageInfo *pageInfo = (PageInfo*)(((char*)
366 ptr) - (sizeof(PageInfo) + sizeof(int)));
367 PageInfo *pInfo = (PageInfo*)firstPage_, *prev = (PageInfo*)firstPage_;
368 bool found = false;
369 while(!found)
371 if (pInfo == pageInfo) {found = true; break; }
372 prev = pInfo;
373 pInfo = (PageInfo*)pInfo->nextPage_;
375 if (!found)
377 printError(ErrSysFatal,"Page %x not found in page list:Logic error", pageInfo );
378 releaseChunkMutex();
379 return ;
381 prev->nextPage_ = pageInfo->nextPage_;
382 pageInfo->nextPageAfterMerge_ = NULL;
383 pageInfo->isUsed_ = 0;
384 os::memset(pageInfo, 0 , allocSize_);
385 pageInfo->hasFreeSpace_ = 1;
386 releaseChunkMutex();
387 return;
389 PageInfo *pageInfo;
390 pageInfo = getPageInfo(db, ptr);
391 if (NULL == pageInfo)
393 printError(ErrSysFatal,"Probable Data corruption: pageInfo is NULL", pageInfo );
394 releaseChunkMutex();
395 return;
397 //set the pageinfo where this ptr points
398 pageInfo->hasFreeSpace_ = 1;
399 releaseChunkMutex();
400 return;
403 //returns the pageInfo of the page where this ptr points
404 //This works only if the data size is less than PAGE_SIZE
405 //If ptr points to data which is more than PAGE_SIZE,then
406 //calling this might lead to memory corruption
407 //Note:IMPORTANT::assumes db lock is taken before calling this
408 PageInfo* Chunk::getPageInfo(Database *db, void *ptr)
410 char *inPtr = (char*)ptr;
411 PageInfo* pageInfo = ((PageInfo*)firstPage_);
413 while( pageInfo != NULL )
415 if (inPtr > (char*) pageInfo && ((char*)pageInfo + PAGE_SIZE) >inPtr)
416 return pageInfo;
417 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
419 return NULL;
422 long Chunk::getTotalDataNodes()
424 int ret = getChunkMutex();
425 if (ret != 0)
427 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
428 return NULL;
431 //TODO:: for variable size allocator
432 if (0 == allocSize_) //->variable size allocator
433 return 0;
435 //TODO::for large size allocator
436 if (allocSize_ >PAGE_SIZE)//->each page has only one data node
437 return 0;
439 int noOfDataNodes=os::floor((PAGE_SIZE - sizeof(PageInfo))/allocSize_);
440 PageInfo* pageInfo = ((PageInfo*)firstPage_);
441 char *data = ((char*)firstPage_) + sizeof(PageInfo);
442 long totalNodes = 0;
443 int i=0;
444 while( pageInfo != NULL )
446 data = ((char*)pageInfo) + sizeof(PageInfo);
447 for (i = 0; i< noOfDataNodes; i++)
449 if (*((int*)data) == 1) { totalNodes++;}
450 data = data + allocSize_;
452 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
454 releaseChunkMutex();
455 return totalNodes;
458 int Chunk::totalPages()
460 int ret = getChunkMutex();
461 if (ret != 0)
463 printError(ErrLockTimeOut,"Unable to acquire chunk Mutex");
464 return NULL;
466 //logic is same for variable size and for large data node allocator.
467 PageInfo* pageInfo = ((PageInfo*)firstPage_);
468 int totPages=0;
469 while( pageInfo != NULL )
471 totPages++;
472 pageInfo = (PageInfo*)(((PageInfo*)pageInfo)->nextPage_) ;
474 releaseChunkMutex();
475 return totPages;
478 int Chunk::initMutex()
480 return chunkMutex_.init();
482 int Chunk::getChunkMutex()
484 return chunkMutex_.tryLock();
486 int Chunk::releaseChunkMutex()
488 return chunkMutex_.releaseLock();
490 int Chunk::destroyMutex()
492 return chunkMutex_.destroy();
494 void Chunk::splitDataBucket(VarSizeInfo *varInfo, size_t needSize)
496 int remSpace = varInfo->size_ - sizeof(VarSizeInfo) - needSize;
497 varInfo->isUsed_ = 1;
498 varInfo->size_ = needSize;
499 varInfo = (VarSizeInfo*)((char*)varInfo +
500 sizeof(VarSizeInfo) + varInfo->size_);
501 varInfo->isUsed_ = 0;
502 varInfo->size_ = remSpace;
503 return;
507 void Chunk::createDataBucket(Page *page, size_t totalSize, size_t needSize)
509 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)page) + sizeof(PageInfo));
510 varInfo->isUsed_ = 0;
511 varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
512 splitDataBucket(varInfo, needSize);
514 return;