Added more test cases and fixed failures of Table/test001 and UserManager/test001
[csql.git] / src / server / DatabaseManagerImpl.cxx
blob57ce36ce46e86ed08f5d593422f93544dbd8b664
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Database.h>
17 #include<DatabaseManager.h>
18 #include<DatabaseManagerImpl.h>
19 #include<os.h>
20 #include<Table.h>
21 #include<TableImpl.h>
22 #include<Transaction.h>
23 #include<CatalogTables.h>
24 #include<Index.h>
25 #include<Lock.h>
26 #include<Debug.h>
27 #include<Config.h>
28 #include<Process.h>
31 DatabaseManagerImpl::~DatabaseManagerImpl()
33 //Note:Databases are closed by the session interface
34 delete tMgr_;
35 delete lMgr_;
38 void DatabaseManagerImpl::createLockManager()
40 lMgr_ = new LockManager(systemDatabase_);
41 return;
44 void DatabaseManagerImpl::createTransactionManager()
47 tMgr_ = new TransactionManager();
48 tMgr_->setFirstTrans(systemDatabase_->getSystemDatabaseTrans(0));
49 return;
52 DbRetVal DatabaseManagerImpl::openSystemDatabase()
54 DbRetVal rv = openDatabase(SYSTEMDB);
55 if (rv != OK) return rv;
56 systemDatabase_ = db_;
57 db_ = NULL;
58 if (NULL == systemDatabase_)
60 printError(ErrAlready, "Database is already opened");
61 return ErrAlready;
63 printDebug(DM_Database, "Opened system database");
64 logFinest(logger, "Opened system database");
65 return OK;
68 DbRetVal DatabaseManagerImpl::closeSystemDatabase()
70 Database *db = db_;
71 //make them to point to system database file descriptor
72 //and database pointer
73 db_ = systemDatabase_;
74 closeDatabase();
75 db_ = db;
76 printDebug(DM_Database, "Closed system database");
77 logFinest(logger, "Closed System database");
78 return OK;
81 DbRetVal DatabaseManagerImpl::createDatabase(const char *name, size_t size)
83 if (NULL != db_ )
85 printError(ErrAlready, "Database is already created");
86 return ErrAlready;
88 caddr_t rtnAddr = (caddr_t) NULL;
89 shared_memory_id shm_id = 0;
91 char *startaddr = (char*)Conf::config.getMapAddress();
92 shared_memory_key key = 0;
93 if (0 == strcmp(name, SYSTEMDB))
96 key = Conf::config.getSysDbKey();
98 else
100 startaddr = startaddr + Conf::config.getMaxSysDbSize();
101 key = Conf::config.getUserDbKey();
103 shm_id = os::shm_create(key, size, 0666);
104 if (-1 == shm_id)
106 printError(ErrOS, "Shared memory create failed");
107 return ErrOS;
110 void *shm_ptr = os::shm_attach(shm_id, startaddr, SHM_RND);
111 rtnAddr = (caddr_t) shm_ptr;
112 if (rtnAddr < 0 || shm_ptr == (char*)0xffffffff)
114 printError(ErrOS, "Shared memory attach returned -ve value %d", rtnAddr);
115 return ErrOS;
117 memset(shm_ptr, 0, size );
118 db_ = new Database();
119 printDebug(DM_Database, "Creating database:%s",name);
121 //TODO:for user database do not have transtable and processtable mutex
122 db_->setMetaDataPtr((DatabaseMetaData*)rtnAddr);
123 db_->setDatabaseID(1);
124 db_->setName(name);
125 db_->setMaxSize(size);
126 db_->setMaxChunks(MAX_CHUNKS);
127 db_->initAllocDatabaseMutex();
128 db_->initTransTableMutex();
129 db_->initDatabaseMutex();
130 db_->initProcessTableMutex();
132 //compute the first page after book keeping information
133 size_t offset = os::alignLong(sizeof (DatabaseMetaData));
134 //Only for system db chunk array, trans array and proc array will be there
135 if (0 == strcmp(name, SYSTEMDB))
137 offset = offset + os::alignLong( MAX_CHUNKS * sizeof (Chunk));
138 offset = offset + os::alignLong( Conf::config.getMaxTrans() * sizeof(Transaction));
139 offset = offset + os::alignLong( Conf::config.getMaxProcs() * sizeof(ProcInfo));
140 offset = offset + os::alignLong( Conf::config.getMaxProcs() *
141 Conf::config.getMaxThreads() * sizeof(ThreadInfo));
143 int multiple = os::floor(offset / PAGE_SIZE);
144 char *curPage = (((char*)rtnAddr) + ((multiple + 1) * PAGE_SIZE));
146 db_->setCurrentPage(curPage);
147 db_->setFirstPage(curPage);
149 if (0 == strcmp(name, SYSTEMDB)) return OK;
151 //Allocate new chunk to store hash index nodes
152 Chunk *chunkInfo = createUserChunk(sizeof(HashIndexNode));
153 if (NULL == chunkInfo)
155 printError(ErrSysInternal, "Failed to allocate hash index nodes chunk");
156 return ErrSysInternal;
158 printDebug(DM_Database, "Creating Chunk for storing Hash index nodes %x",
159 chunkInfo);
161 db_->setHashIndexChunk(chunkInfo);
162 logFinest(logger, "Created database %s" , name);
164 return OK;
167 DbRetVal DatabaseManagerImpl::deleteDatabase(const char *name)
169 shared_memory_id shm_id = 0;
170 if (0 == strcmp(name, SYSTEMDB))
172 shm_id = os::shm_open(Conf::config.getSysDbKey(), 100, 0666);
173 os::shmctl(shm_id, IPC_RMID);
174 } else {
175 shm_id = os::shm_open(Conf::config.getUserDbKey(), 100, 0666);
176 os::shmctl(shm_id, IPC_RMID);
178 logFinest(logger, "Deleted database %s" , name);
179 return OK;
183 DbRetVal DatabaseManagerImpl::openDatabase(const char *name)
185 long size = Conf::config.getMaxSysDbSize();
186 char *startaddr = (char*)Conf::config.getMapAddress();
187 if (0 == strcmp(name , SYSTEMDB))
189 if (NULL !=systemDatabase_)
191 printError(ErrAlready, "System Database already open");
192 return ErrAlready;
195 else
197 if (NULL ==systemDatabase_)
199 printError(ErrNotOpen, "System Database not open");
200 return ErrNotOpen;
202 size = Conf::config.getMaxDbSize();
203 startaddr = startaddr + Conf::config.getMaxSysDbSize();
205 if (NULL != db_)
207 printError(ErrAlready, "User Database already open");
208 return ErrAlready;
210 caddr_t attAddr;
211 if (0 == strcmp(name, SYSTEMDB))
212 attAddr = ProcessManager::sysAddr;
213 else
214 attAddr = ProcessManager::usrAddr;
216 if (ProcessManager::noThreads == 0)
218 //system db should be opened before user database files
219 caddr_t rtnAddr = (caddr_t) NULL;
221 shared_memory_id shm_id = 0;
222 shared_memory_key key = 0;
224 if (0 == strcmp(name, SYSTEMDB))
225 key = Conf::config.getSysDbKey();
226 else
227 key = Conf::config.getUserDbKey();
228 shm_id = os::shm_open(key, size, 0666);
229 if (shm_id == -1 )
231 printError(ErrOS, "Shared memory open failed");
232 return ErrOS;
235 void *shm_ptr = os::shm_attach(shm_id, startaddr, SHM_RND);
237 rtnAddr = (caddr_t) shm_ptr;
239 if (rtnAddr < 0 || shm_ptr == (char*)0xffffffff)
241 printError(ErrOS, "Shared memory attach returned -ve value %x %d", shm_ptr, errno);
242 return ErrOS;
244 else
246 db_ = new Database();
247 db_->setMetaDataPtr((DatabaseMetaData*)rtnAddr);
248 if (0 == strcmp(name, SYSTEMDB))
249 ProcessManager::sysAddr = rtnAddr;
250 else
251 ProcessManager::usrAddr = rtnAddr;
253 printDebug(DM_Database, "Opening database: %s", name);
254 logFinest(logger, "Opened database %s" , name);
255 return OK;
259 db_ = new Database();
260 printDebug(DM_Database, "Opening database: %s", name);
261 db_->setMetaDataPtr((DatabaseMetaData*)attAddr);
262 logFinest(logger, "Opened database %s" , name);
263 return OK;
266 DbRetVal DatabaseManagerImpl::closeDatabase()
268 printDebug(DM_Database, "Closing database: %s",(char*)db_->getName());
269 logFinest(logger, "Closed database");
270 if (NULL == db_)
272 printError(ErrAlready, "Database is already closed\n");
273 return ErrAlready;
275 //check if this is the last thread to be deregistered
276 //if (ProcessManager::noThreads == 1)
278 os::shm_detach((char*)db_->getMetaDataPtr());
280 delete db_;
281 db_ = NULL;
283 //Assumes that system database mutex is taken before calling this.
284 Chunk* DatabaseManagerImpl::createUserChunk(size_t size)
286 //Allocate new node in system database to store
287 Chunk *chunk = getSystemTableChunk(UserChunkTableId);
288 void *ptr = chunk->allocate(systemDatabase_);
289 if (NULL == ptr)
291 printError(ErrNoMemory, "Allocation failed for User chunk catalog table");
292 return NULL;
294 Chunk *chunkInfo = (Chunk*)ptr;
295 chunkInfo->initMutex();
296 if (0 != size) chunkInfo->setSize(size);
297 if (chunkInfo->allocSize_ > PAGE_SIZE)
298 chunkInfo->curPage_ = db_->getFreePage(chunkInfo->allocSize_);
299 else
300 chunkInfo->curPage_ = db_->getFreePage();
301 if ( NULL == chunkInfo->curPage_)
303 chunkInfo->destroyMutex();
304 chunk->free(db_, ptr);
305 printError(ErrNoMemory, "Database full: No space to allocate from database");
306 return NULL;
308 PageInfo* firstPageInfo = ((PageInfo*)chunkInfo->curPage_);
309 if (chunkInfo->allocSize_ > PAGE_SIZE)
311 int multiple = os::floor(chunkInfo->allocSize_ / PAGE_SIZE);
312 int offset = ((multiple + 1) * PAGE_SIZE);
313 firstPageInfo->setPageAsUsed(offset);
315 else
316 firstPageInfo->setPageAsUsed(chunkInfo->allocSize_);
317 if (0 == size)
319 VarSizeInfo *varInfo = (VarSizeInfo*)(((char*)firstPageInfo) + sizeof(PageInfo));
320 varInfo->isUsed_ = 0;
321 varInfo->size_ = PAGE_SIZE - sizeof(PageInfo) - sizeof(VarSizeInfo);
324 chunkInfo->firstPage_ = chunkInfo->curPage_;
326 if (0 == size)
327 chunkInfo->setAllocType(VariableSizeAllocator);
328 else
329 chunkInfo->setAllocType(FixedSizeAllocator);
331 //TODO::Generate chunkid::use tableid
332 chunkInfo->setChunkID(-1);
333 printDebug(DM_Database, "Creating new User chunk chunkID:%d size: %d firstPage:%x",
334 -1, chunkInfo->allocSize_, firstPageInfo);
336 return chunkInfo;
340 //Assumes that system database mutex is taken before calling this.
341 DbRetVal DatabaseManagerImpl::deleteUserChunk(Chunk *chunk)
343 //Go to the pages and set them to notUsed
344 Page *page = chunk->firstPage_;
345 PageInfo* pageInfo = ((PageInfo*)page);
346 //Here...sure that atleast one page will be there even no tuples
347 //are inserted.so not checking if pageInfo == NULL
348 while( pageInfo->nextPage_ != NULL)
350 PageInfo *prev = pageInfo;
351 pageInfo = (PageInfo*)(pageInfo->nextPage_);
352 //sets pageInfo->isUsed_ = 0 and pageInfo->hasFreeSpace_ = 0
353 //and initializes the page content to zero
354 if(NULL == pageInfo->nextPageAfterMerge_)
355 os::memset(prev, 0, PAGE_SIZE);
356 else
358 int size = (char*) pageInfo->nextPageAfterMerge_ - (char*) pageInfo;
359 os::memset(prev, 0, size);
361 printDebug(DM_Database,"deleting user chunk:%x clearing page %x",chunk, prev);
363 //The above loop wont execute for the last page
364 //and for the case where table has only one page
365 if(NULL == pageInfo->nextPageAfterMerge_)
366 os::memset(pageInfo, 0, PAGE_SIZE);
367 else
369 int size = (char*) pageInfo->nextPageAfterMerge_ - (char*) pageInfo;
370 os::memset(pageInfo, 0, size);
372 printDebug(DM_Database,"deleting user chunk:%x clearing page %x",chunk, pageInfo);
373 chunk->chunkID_ = -1;
374 chunk->allocSize_ = 0;
375 chunk->curPage_ = NULL;
376 chunk->firstPage_ = NULL;
377 chunk->destroyMutex();
378 printDebug(DM_Database,"deleting user chunk:%x",chunk);
379 return OK;
382 //-1 -> Unable to create chunk. No memory
383 //-2 -> Unable to update the catalog tables
384 DbRetVal DatabaseManagerImpl::createTable(const char *name, TableDef &def)
386 DbRetVal rv = OK;
387 size_t sizeofTuple = def.getTupleSize();
388 rv = systemDatabase_->getDatabaseMutex();
389 if (OK != rv ) {
390 printError(rv, "Unable to get Database mutex");
391 return rv;
393 //create a chunk to store the tuples
394 Chunk *ptr = createUserChunk(sizeofTuple);
395 if (NULL == ptr)
397 systemDatabase_->releaseDatabaseMutex();
398 printError(ErrNoResource, "Unable to create user chunk");
399 return ErrNoResource;
401 printDebug(DM_Database,"Created UserChunk:%x", ptr);
402 //add row to TABLE
403 CatalogTableTABLE cTable(systemDatabase_);
404 int tblID = ((Chunk*)ptr)->getChunkID();
405 void *tptr = NULL;
406 rv = cTable.insert(name, tblID, sizeofTuple,
407 def.getFieldCount(), ptr, tptr);
408 if (OK != rv)
410 deleteUserChunk(ptr);
411 systemDatabase_->releaseDatabaseMutex();
412 printError(ErrSysInternal, "Unable to update catalog table TABLE");
413 return ErrSysInternal;
415 printDebug(DM_Database,"Inserted into TABLE:%s",name);
416 //add rows to FIELD
417 FieldIterator iter = def.getFieldIterator();
418 CatalogTableFIELD cField(systemDatabase_);
419 rv = cField.insert(iter, tblID ,tptr);
420 if (OK != rv)
422 deleteUserChunk(ptr);
423 void *cptr, *ttptr;//Dummy as remove below needs both these OUT params
424 cTable.remove(name, cptr, ttptr);
425 systemDatabase_->releaseDatabaseMutex();
426 printError(ErrSysInternal, "Unable to update catalog table FIELD");
427 return ErrSysInternal;
429 printDebug(DM_Database,"Inserted into FIELD:%s",name);
430 systemDatabase_->releaseDatabaseMutex();
431 printDebug(DM_Database,"Table Created:%s",name);
432 logFinest(logger, "Table Created %s" , name);
433 return OK;
436 //TODO::If any operation fails in between, then we may have some
437 //dangling tuples, say we have have rows in INDEX table
438 //which will not have any corresponding entries in TABLE
439 //CHANGE the sequence so that it deletes from the bottom as
440 //opposed to start from top as is written now
441 DbRetVal DatabaseManagerImpl::dropTable(const char *name)
443 void *chunk = NULL;
444 void *tptr =NULL;
445 DbRetVal rv = systemDatabase_->getDatabaseMutex();
446 if (OK != rv) {
447 printError(ErrSysInternal, "Unable to get database mutex");
448 return ErrSysInternal;
451 //remove the entry in TABLE
452 CatalogTableTABLE cTable(systemDatabase_);
453 rv = cTable.remove(name, chunk, tptr);
454 if (OK != rv) {
455 systemDatabase_->releaseDatabaseMutex();
456 printError(ErrSysInternal, "Unable to update catalog table TABLE");
457 return ErrSysInternal;
459 printDebug(DM_Database,"Deleted from TABLE:%s",name);
461 //remove the entries in the FIELD table
462 CatalogTableFIELD cField(systemDatabase_);
463 rv = cField.remove(tptr);
464 if (OK != rv) {
465 systemDatabase_->releaseDatabaseMutex();
466 printError(ErrSysInternal, "Unable to update catalog table FIELD");
467 return ErrSysInternal;
469 printDebug(DM_Database,"Deleted from FIELD:%s",name);
471 rv = deleteUserChunk((Chunk*)chunk);
472 if (OK != rv) {
473 systemDatabase_->releaseDatabaseMutex();
474 printError(rv, "Unable to delete the chunk");
475 return rv;
477 printDebug(DM_Database,"Deleted UserChunk:%x", chunk);
479 //TODO::check whether indexes are available and drop that also.
480 CatalogTableINDEX cIndex(systemDatabase_);
481 int noIndexes = cIndex.getNumIndexes(tptr);
482 char *idxName;
483 for (int i =1 ; i<= noIndexes; i++) {
484 idxName = cIndex.getIndexName(tptr, i);
485 dropIndexInt(idxName, false);
487 Chunk *chunkNode = systemDatabase_->getSystemDatabaseChunk(UserChunkTableId);
488 chunkNode->free(systemDatabase_, (Chunk *) chunk);
489 systemDatabase_->releaseDatabaseMutex();
490 printDebug(DM_Database, "Deleted Table %s" , name);
491 logFinest(logger, "Deleted Table %s" , name);
492 return OK;
495 //Return values: NULL for table not found
496 Table* DatabaseManagerImpl::openTable(const char *name)
498 DbRetVal ret = OK;
499 //TODO::store table handles in list so that if it is
500 //not closed by the application. destructor shall close it.
501 TableImpl *table = new TableImpl();
502 table->setDB(db_);
503 table->setSystemDB(systemDatabase_);
504 table->setLockManager(lMgr_);
505 table->setTrans(&(tMgr_->trans));
507 //to store the chunk pointer of table
508 void *chunk = NULL;
510 //to store the tuple pointer of the table
511 void *tptr =NULL;
513 //TODO::need to take shared lock on the table so that
514 //all ddl operation will be denied on that table
515 //which includes index creation, alter table
517 DbRetVal rv = systemDatabase_->getDatabaseMutex();
518 if (OK != rv) {
519 printError(ErrSysInternal, "Unable to get database mutex");
520 return NULL;
522 CatalogTableTABLE cTable(systemDatabase_);
523 ret = cTable.getChunkAndTblPtr(name, chunk, tptr);
524 if ( OK != ret)
526 printError(ErrNotExists, "Table not exists %s", name);
527 return NULL;
529 TABLE *tTuple = (TABLE*)tptr;
530 table->setTableInfo(tTuple->tblName_, tTuple->tblID_, tTuple->length_,
531 tTuple->numFlds_, tTuple->numIndexes_, tTuple->chunkPtr_);
533 //get field information from FIELD table
534 CatalogTableFIELD cField(systemDatabase_);
535 cField.getFieldInfo(tptr, table->fldList_);
537 //get the number of indexes on this table
538 //and populate the indexPtr array
539 CatalogTableINDEX cIndex(systemDatabase_);
540 table->numIndexes_ = cIndex.getNumIndexes(tptr);
541 if (table->numIndexes_)
542 table->indexPtr_ = new char*[table->numIndexes_];
543 else
544 table->indexPtr_ = NULL;
545 cIndex.getIndexPtrs(tptr, table->indexPtr_);
547 //HACK:Below works only for one hash index on table
548 if (table->numIndexes_ == 1)
550 SingleFieldHashIndexInfo *hIdxInfo = new SingleFieldHashIndexInfo();
551 CatalogTableINDEXFIELD cIndexField(systemDatabase_);
552 cIndexField.getFieldNameAndType(table->indexPtr_[0], hIdxInfo->fldName,
553 hIdxInfo->type);
554 ChunkIterator citer = CatalogTableINDEX::getIterator(table->indexPtr_[0]);
555 hIdxInfo->noOfBuckets = CatalogTableINDEX::getNoOfBuckets(table->indexPtr_[0]);
556 hIdxInfo->buckets = (Bucket*)citer.nextElement();
558 table->idxInfo = (IndexInfo*) hIdxInfo;
560 systemDatabase_->releaseDatabaseMutex();
561 printDebug(DM_Database,"Opening table handle name:%s chunk:%x numIndex:%d",
562 name, chunk, table->numIndexes_);
563 logFinest(logger, "Opening Table %s" , name);
565 return table;
568 //Return values: -1 for table not found
569 void DatabaseManagerImpl::closeTable(Table *table)
571 printDebug(DM_Database,"Closing table handle: %x", table);
572 if (NULL == table) return;
573 delete table;
574 logFinest(logger, "Closing Table");
577 DbRetVal DatabaseManagerImpl::createIndex(const char *indName, IndexInitInfo *info)
579 DbRetVal rv = OK;
580 if (info->indType == hashIndex)
582 //Assumes info is of type HashIndexInitInfo
583 HashIndexInitInfo *hInfo = (HashIndexInitInfo*) info;
584 rv = createHashIndex(indName, info->tableName, info->list, hInfo->bucketSize);
586 else
588 //TODO::tree index
590 return rv;
594 //-1 -> Table does not exists
595 //-2 -> Field does not exists
596 //-3 -> bucketSize is not valid
597 DbRetVal DatabaseManagerImpl::createHashIndex(const char *indName, const char *tblName,
598 FieldNameList &fldList, int bucketSize)
600 //validate the bucket size
601 if (bucketSize < 100 || bucketSize > 200000)
603 printError(ErrBadRange, "Index Bucket size %d not in range 100-200000",
604 bucketSize);
605 return ErrBadRange;
607 void *tptr =NULL;
608 void *chunk = NULL;
609 DbRetVal rv = systemDatabase_->getDatabaseMutex();
610 if (OK != rv)
612 printError(ErrSysInternal, "Unable to get database mutex");
613 return ErrSysInternal;
616 //check whether table exists
617 CatalogTableTABLE cTable(systemDatabase_);
618 cTable.getChunkAndTblPtr(tblName, chunk, tptr);
619 if (NULL == tptr)
621 systemDatabase_->releaseDatabaseMutex();
622 printError(ErrNotExists, "Table does not exist %s", tblName);
623 return ErrNotExists;
625 int totFlds = fldList.size();
627 //check whether field exists
628 char **fptr = new char* [totFlds];
629 CatalogTableFIELD cField(systemDatabase_);
630 rv = cField.getFieldPtrs(fldList, tptr, fptr);
631 if (OK != rv)
633 delete[] fptr;
634 systemDatabase_->releaseDatabaseMutex();
635 printError(ErrNotExists, "Field does not exist");
636 return ErrNotExists;
639 //create chunk to store the meta data of the index created
640 //for latches and bucket pointers
641 Chunk* chunkInfo = createUserChunk(bucketSize * sizeof(Bucket));
642 if (NULL == chunkInfo)
644 delete[] fptr;
645 systemDatabase_->releaseDatabaseMutex();
646 printError(ErrSysInternal, "Unable to create chunk");
647 return ErrSysInternal;
649 //create memory for holding the bucket pointers
650 void *buckets = chunkInfo->allocate(db_);
651 if (NULL == buckets)
653 delete[] fptr;
654 deleteUserChunk(chunkInfo);
655 systemDatabase_->releaseDatabaseMutex();
656 printError(ErrNoMemory, "Unable to allocate memory for bucket");
657 return ErrNoMemory;
659 Bucket *buck = (Bucket*) buckets;
660 initHashBuckets(buck, bucketSize);
662 //create chunk to store the hash index nodes
663 Chunk* hChunk = createUserChunk(sizeof(HashIndexNode));
664 if (NULL == hChunk)
666 delete[] fptr;
667 deleteUserChunk(chunkInfo);
668 systemDatabase_->releaseDatabaseMutex();
669 printError(ErrSysInternal, "Unable to create chunk for storing hash index nodes");
670 return ErrSysInternal;
674 //add row to INDEX
675 void *tupleptr = NULL;
676 CatalogTableINDEX cIndex(systemDatabase_);
677 DbRetVal ret = cIndex.insert(indName, tptr, fldList.size(),
678 chunkInfo, bucketSize, hChunk, tupleptr);
679 if (OK != rv)
681 delete[] fptr;
682 deleteUserChunk(hChunk);
683 deleteUserChunk(chunkInfo);
684 systemDatabase_->releaseDatabaseMutex();
685 printError(ErrSysInternal, "Catalog table updation failed in INDEX table");
686 return ErrSysInternal;
688 //add rows to INDEXFIELD
689 CatalogTableINDEXFIELD cIndexField(systemDatabase_);
690 ret = cIndexField.insert(fldList, tupleptr, tptr, fptr);
692 if (OK != rv)
694 delete[] fptr;
695 deleteUserChunk(hChunk);
696 deleteUserChunk(chunkInfo);
697 systemDatabase_->releaseDatabaseMutex();
698 printError(ErrSysInternal, "Catalog table updation failed in INDEXFIELD table");
699 return ErrSysInternal;
701 delete[] fptr;
702 //TODO::If tuples present in this table, then
703 //create hash index nodes and store it
704 //Take table lock
705 systemDatabase_->releaseDatabaseMutex();
706 printDebug(DM_Database, "Creating Hash Index Name:%s tblname:%s buckets:%x",
707 indName, tblName, buckets);
708 logFinest(logger, "Creating HashIndex %s on %s with bucket size %d",
709 indName, tblName, buckets);
710 return OK;
713 void DatabaseManagerImpl::initHashBuckets(Bucket *buck, int bucketSize)
715 os::memset((void*)buck, 0, bucketSize * sizeof(Bucket));
717 for (int i=0; i < bucketSize ; i++)
719 buck[i].mutex_.init();
721 return;
724 DbRetVal DatabaseManagerImpl::dropIndex(const char *name)
726 return dropIndexInt(name, true);
729 DbRetVal DatabaseManagerImpl::dropIndexInt(const char *name, bool takeLock)
731 DbRetVal rv = OK;
732 void *chunk = NULL, *hchunk = NULL;
733 void *tptr =NULL;
734 int ret = 0;
735 if (takeLock) {
736 rv = systemDatabase_->getDatabaseMutex();
737 if (OK != rv)
739 printError(ErrSysInternal, "Unable to get database mutex");
740 return ErrSysInternal;
744 //remove the entry in INDEX
745 CatalogTableINDEX cIndex(systemDatabase_);
746 rv = cIndex.remove(name, chunk, hchunk, tptr);
747 if (OK != rv)
749 if (takeLock) systemDatabase_->releaseDatabaseMutex();
750 printError(ErrSysInternal, "Catalog table updation failed for INDEX table");
751 return ErrSysInternal;
753 printDebug(DM_Database, "Removing from INDEX %s",name);
754 //remove the entries in the INDEXFIELD table
755 CatalogTableINDEXFIELD cIndexField(systemDatabase_);
756 rv = cIndexField.remove(tptr);
757 if (OK != rv)
759 if (takeLock) systemDatabase_->releaseDatabaseMutex();
760 printError(ErrSysInternal, "Catalog table updation failed for INDEX table");
761 return ErrSysInternal;
763 printDebug(DM_Database, "Removing from INDEXFIELD %s",name);
765 //delete the index chunk
766 rv = deleteUserChunk((Chunk*)chunk);
767 if (OK != rv)
769 if (takeLock) systemDatabase_->releaseDatabaseMutex();
770 printError(ErrSysInternal, "Unable to delete the index chunk");
771 return ErrSysInternal;
773 //delete the index hash node chunk
774 rv = deleteUserChunk((Chunk*)hchunk);
775 if (OK != rv)
777 if (takeLock) systemDatabase_->releaseDatabaseMutex();
778 printError(ErrSysInternal, "Unable to delete the index hash node chunk");
779 return ErrSysInternal;
781 if (takeLock) systemDatabase_->releaseDatabaseMutex();
782 Chunk *chunkNode = systemDatabase_->getSystemDatabaseChunk(UserChunkTableId);
783 chunkNode->free(systemDatabase_, (Chunk *) chunk);
784 chunkNode->free(systemDatabase_, (Chunk *) hchunk);
786 //TODO::If tuples present in this table, then
787 //free all hash index nodes for this table.
788 //free all nodes in list of all buckets
789 //Take table lock
791 printDebug(DM_Database, "Dropped hash index %s",name);
792 logFinest(logger, "Deleted Index %s", name);
793 return OK;
796 DbRetVal DatabaseManagerImpl::registerThread()
798 DbRetVal rv = OK;
799 if (pMgr_ != NULL)
801 printError(ErrAlready, "Process already registered\n");
802 return ErrAlready;
804 pMgr_ = new ProcessManager(sysDb());
805 return pMgr_->registerThread();
808 DbRetVal DatabaseManagerImpl::deregisterThread()
810 if (pMgr_ == NULL)
812 printError(ErrBadCall, "Process already deregistered or never registered\n");
813 return ErrBadCall;
815 DbRetVal rv = pMgr_->deregisterThread();
816 delete pMgr_;
817 pMgr_ = NULL;
818 return rv;
821 ChunkIterator DatabaseManagerImpl::getSystemTableIterator(CatalogTableID id)
823 Chunk *fChunk = systemDatabase_->getSystemDatabaseChunk(id);
824 return fChunk->getIterator();
827 Chunk* DatabaseManagerImpl::getSystemTableChunk(CatalogTableID id)
829 return systemDatabase_->getSystemDatabaseChunk(id);