optimizing single table aggregate queries with no index
[csql.git] / src / storage / TableImpl.cxx
blobab6145ca92f5cb9140840274bb1958bf8d08db5c
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Index.h>
17 #include<CatalogTables.h>
18 #include<Lock.h>
19 #include<Debug.h>
20 #include<Table.h>
21 #include<TableImpl.h>
22 #include<Predicate.h>
23 #include<PredicateImpl.h>
24 #include<Index.h>
25 #include<Config.h>
26 #include<AggTableImpl.h> //for AggType
28 void Table::getFieldNameAlone(char *fname, char *name) {
29 bool dotFound= false;
30 char *fullname = fname;
31 while(*fullname != '\0')
33 if (*fullname == '.') { dotFound = true; break; }
34 fullname++;
36 if (dotFound) strcpy(name, ++fullname); else strcpy(name, fname);
39 void Table::getTableNameAlone(char *fname, char *name) {
40 strcpy(name, fname);
41 char *start = name;
42 bool dotFound = false;
43 while(*name != '\0')
45 if (*name == '.') { *name='\0'; dotFound = true; break; }
46 name++;
48 if (!dotFound) strcpy(start, "");
49 return;
52 DbRetVal TableImpl::bindFld(const char *name, void *val)
54 if (name[0] == '*' ) return OK;
55 //set it in the field list
56 char fieldName[IDENTIFIER_LENGTH];
57 getFieldNameAlone((char*)name, fieldName);
58 DbRetVal rv = fldList_.updateBindVal(fieldName, val);
59 if (OK != rv) {
60 printError(ErrNotExists, "Field %s does not exist", fieldName);
61 return rv;
63 return OK;
66 bool TableImpl::isFldNull(const char *name){
67 if (name[0] == '*') return false;
68 char fieldName[IDENTIFIER_LENGTH];
69 getFieldNameAlone((char*)name, fieldName);
70 int colpos = fldList_.getFieldPosition(fieldName);
71 if (-1 == colpos)
73 printError(ErrNotExists, "Field %s does not exist", name);
74 return false;
77 return isFldNull(colpos);
80 int TableImpl::getFldPos(char *name)
82 return fldList_.getFieldPosition(name);
84 bool TableImpl::isFldNull(int colpos)
86 if (!curTuple_) return false;
87 if (colpos <1 || colpos > numFlds_) return false;
88 if (isIntUsedForNULL) {
89 int nullVal = *(int*)((char*)curTuple_ + (length_ - 4));
90 if (BITSET(nullVal, colpos)) return true;
92 else {
93 char *nullOffset = (char*)curTuple_ - os::align(numFlds_);
94 if (nullOffset[colpos-1]) return true;
96 return false;
98 void TableImpl::resetNullinfo()
100 if (isIntUsedForNULL) {
101 iNullInfo =0;
103 else {
104 int i=0;
105 while(i < numFlds_) { cNullInfo[0] = 0;}
108 DbRetVal TableImpl::markFldNull(char const* name)
110 DbRetVal rv = OK;
111 int colpos = fldList_.getFieldPosition(name);
112 if (-1 == colpos)
114 printError(ErrNotExists, "Field %s does not exist", name);
115 return ErrNotExists;
117 rv = markFldNull(colpos);
118 return rv;
121 DbRetVal TableImpl::markFldNull(int fldpos)
123 if (fldpos <1 || fldpos > numFlds_) return ErrBadArg;
124 bool isBitSet = false;
125 if (isIntUsedForNULL) {
126 if (!BITSET(iNotNullInfo, fldpos)) {
127 SETBIT(iNullInfo, fldpos);
128 isBitSet = true;
130 else {
131 printError(ErrNullViolation, "NOT NULL constraint violation");
132 return ErrNullViolation;
135 else {
136 if (!BITSET(iNotNullInfo, fldpos)) cNullInfo[fldpos-1] = 1;
137 else {
138 printError(ErrNullViolation, "NOT NULL constraint violation");
139 return ErrNullViolation;
142 return OK;
145 void TableImpl::clearFldNull(const char *name)
147 int colpos = fldList_.getFieldPosition(name);
148 if (-1 == colpos)
150 printError(ErrNotExists, "Field %s does not exist", name);
151 return;
154 clearFldNull(colpos);
157 void TableImpl::clearFldNull(int colpos)
159 if (colpos <1 || colpos > numFlds_) return;
160 if (isIntUsedForNULL) {
161 CLEARBIT(iNullInfo, colpos);
163 else
164 cNullInfo[colpos-1] = 0;
165 return;
169 DbRetVal TableImpl::execute()
171 if (NULL != iter)
173 printError(ErrAlready,"Scan already open:Close and re execute");
174 return ErrAlready;
176 //table ptr is set in predicate because it needs to access the
177 //type and length to evaluate
178 if( NULL != pred_)
180 PredicateImpl *pred = (PredicateImpl*) pred_;
181 pred->setTable(this);
182 pred->setProjectionList(NULL);
183 pred->setOffsetAndType();
185 DbRetVal ret = OK;
186 ret = createPlan();
187 if (OK != ret)
189 printError(ErrSysInternal,"Unable to create the plan");
190 return ErrSysInternal;
192 if (useIndex_ >= 0)
193 iter = new TupleIterator(pred_, scanType_, idxInfo[useIndex_], chunkPtr_, sysDB_->procSlot,isBetween,isPointLook);
194 else if (scanType_ == fullTableScan)
195 iter = new TupleIterator(pred_, scanType_, NULL, chunkPtr_, sysDB_->procSlot,isBetween,isPointLook);
196 else
198 printError(ErrSysFatal,"Unable to create tuple iterator");//should never happen
199 return ErrSysFatal;
201 ret = iter->open();
202 if (OK != ret)
204 printError(ret,"Unable to open the iterator");
205 return ret;
207 return OK;
211 DbRetVal TableImpl::createPlan()
213 if (isPlanCreated) {
214 //will do early return here. plan is generated only when setPredicate is called.
215 if (scanType_ == unknownScan) return ErrSysFatal; //this should never happen
216 else return OK;
218 isBetween=false;
219 isPointLook = false;
220 useIndex_ = -1;
221 //if there are no predicates then go for full scan
222 //if there are no indexes then go for full scan
223 if (NULL == pred_ || NULL == indexPtr_)
225 scanType_ = fullTableScan;
226 isPlanCreated = true;
227 return OK;
229 if (NULL != indexPtr_)
231 PredicateImpl *pred = (PredicateImpl*)pred_;
232 printDebug(DM_Predicate, "predicate does not involve NOT , OR operator");
233 if (!pred->isNotOrInvolved())
235 printDebug(DM_Predicate, "predicate does not involve NOT , OR operator");
236 for (int i =0; i < numIndexes_; i++)
238 HashIndexInfo* info = (HashIndexInfo*) idxInfo[i];
239 FieldIterator iter = info->idxFldList.getIterator();
240 while(iter.hasElement())
242 FieldDef def = iter.nextElement();
243 if (pred->pointLookupInvolved(def.fldName_))
245 printDebug(DM_Predicate, "point lookup involved for field %s",def.fldName_);
246 if(hashIndex == info->indType) scanType_ = hashIndexScan;
247 else scanType_ = treeIndexScan;
248 isPlanCreated = true;
249 isPointLook = true;
250 useIndex_ = i;
252 else if (pred->isBetweenInvolved(def.fldName_))
254 if (treeIndex == info->indType)
256 scanType_ = treeIndexScan;
257 isPlanCreated = true;
258 useIndex_ = i;
259 isBetween=true;
260 break; //no composite index for tree index
263 else if (pred->rangeQueryInvolved(def.fldName_))
265 printDebug(DM_Predicate, "range lookup involved for field %s",def.fldName_);
266 if (treeIndex == info->indType)
268 scanType_ = treeIndexScan;
269 isPlanCreated = true;
270 useIndex_ = i;
271 break; //no composite index for tree index
273 }else {
274 useIndex_ = -1;
275 break;
277 }//while iter.hasElement()
278 if (useIndex_ != -1) return OK;
279 }//for
282 scanType_ = fullTableScan;
283 isPlanCreated = true;
284 return OK;
287 void* TableImpl::fetch()
289 fetchNoBind();
290 if (NULL == curTuple_) return curTuple_;
291 copyValuesToBindBuffer(curTuple_);
292 return curTuple_;
294 void* TableImpl::fetch(DbRetVal &rv)
296 fetchNoBind(rv);
297 if (NULL == curTuple_) return curTuple_;
298 copyValuesToBindBuffer(curTuple_);
299 return curTuple_;
302 void* TableImpl::fetchNoBind()
304 if (NULL == iter)
306 printError(ErrNotOpen,"Scan not open or Scan is closed\n");
307 return NULL;
309 void *prevTuple = curTuple_;
310 curTuple_ = iter->next();
311 if (NULL == curTuple_)
313 return NULL;
315 DbRetVal lockRet = OK;
316 if ((*trans)->isoLevel_ == READ_REPEATABLE) {
317 lockRet = lMgr_->getSharedLock(curTuple_, trans);
318 if (OK != lockRet)
320 printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_);
321 curTuple_ = prevTuple;
322 return NULL;
326 else if ((*trans)->isoLevel_ == READ_COMMITTED)
328 //if iso level is read committed, operation duration lock is sufficent
329 //so release it here itself.
330 int tries = 5;
331 struct timeval timeout;
332 timeout.tv_sec = Conf::config.getMutexSecs();
333 timeout.tv_usec = Conf::config.getMutexUSecs();
335 bool status = false;
336 while(true) {
337 lockRet = lMgr_->isExclusiveLocked( curTuple_, trans, status);
338 if (OK != lockRet)
340 printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_);
341 curTuple_ = prevTuple;
342 return NULL;
344 if (!status) break;
345 tries--;
346 if (tries == 0) break;
347 os::select(0, 0, 0, 0, &timeout);
350 if (tries == 0)
352 printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_);
353 curTuple_ = prevTuple;
354 return NULL;
357 return curTuple_;
360 void* TableImpl::fetchNoBind(DbRetVal &rv)
362 rv = OK;
363 if (NULL == iter)
365 printError(ErrNotOpen,"Scan not open or Scan is closed\n");
366 rv = ErrNotOpen;
367 return NULL;
369 void *prevTuple = curTuple_;
370 curTuple_ = iter->next();
371 if (NULL == curTuple_)
373 return NULL;
375 DbRetVal lockRet = OK;
376 if ((*trans)->isoLevel_ == READ_REPEATABLE) {
377 lockRet = lMgr_->getSharedLock(curTuple_, trans);
378 if (OK != lockRet)
380 printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_);
381 rv = ErrLockTimeOut;
382 curTuple_ = prevTuple;
383 return NULL;
387 else if ((*trans)->isoLevel_ == READ_COMMITTED)
389 //if iso level is read committed, operation duration lock is sufficent
390 //so release it here itself.
391 int tries = 5;
392 struct timeval timeout;
393 timeout.tv_sec = Conf::config.getMutexSecs();
394 timeout.tv_usec = Conf::config.getMutexUSecs();
396 bool status = false;
397 while(true) {
398 lockRet = lMgr_->isExclusiveLocked( curTuple_, trans, status);
399 if (OK != lockRet)
401 printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_);
402 curTuple_ = prevTuple;
403 rv = ErrLockTimeOut;
404 return NULL;
406 if (!status) break;
407 tries--;
408 if (tries == 0) break;
409 os::select(0, 0, 0, 0, &timeout);
412 if (tries == 0)
414 printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_);
415 curTuple_ = prevTuple;
416 rv = ErrLockTimeOut;
417 return NULL;
420 return curTuple_;
422 DbRetVal TableImpl::fetchAgg(const char * fldName, AggType aType, void *buf)
424 FieldInfo *info = new FieldInfo();
425 DbRetVal rv = getFieldInfo(fldName, info);
426 if (OK != rv) return rv;
427 bool res= false;
429 char *tuple = (char*) fetchNoBind(rv);
430 if ( NULL == tuple)
432 *(int*)buf = 0; //assuming int. could create porting problems(64 |endian)
433 return OK;
435 int count =1;
436 AllDataType::copyVal(buf, (void*) (tuple+info->offset), info->type, info->length);
437 while(1) {
438 tuple = (char*) fetchNoBind(rv);
439 if (NULL == tuple) break;
440 switch(aType) {
441 case AGG_MIN:
443 res = AllDataType::compareVal(buf, (void*) (tuple+info->offset),
444 OpGreaterThan,
445 info->type, info->length);
446 if (res) AllDataType::copyVal(buf, (void*) (tuple+info->offset),
447 info->type, info->length);
448 break;
450 case AGG_MAX:
452 res = AllDataType::compareVal(buf, (void*) (tuple+info->offset),
453 OpLessThan,
454 info->type, info->length);
455 if (res) AllDataType::copyVal(buf, (void*) (tuple+info->offset),
456 info->type, info->length);
457 break;
459 case AGG_SUM:
461 AllDataType::addVal(buf, (void*) (tuple+info->offset),
462 info->type);
463 break;
465 case AGG_AVG:
467 AllDataType::addVal(buf, (void*) (tuple+info->offset),
468 info->type);
469 count++;
470 break;
472 case AGG_COUNT:
474 count++;
475 break;
479 switch(aType) {
480 case AGG_AVG:
482 AllDataType::divVal(buf, &count,info->type);
483 break;
485 case AGG_COUNT:
487 (*(int*)buf) = count;
488 break;
491 delete info;
492 return OK;
494 DbRetVal TableImpl::insertTuple()
496 DbRetVal ret =OK;
497 void *tptr = ((Chunk*)chunkPtr_)->allocate(db_, &ret);
498 if (NULL == tptr)
500 printError(ret, "Unable to allocate record from chunk");
501 return ret;
503 ret = lMgr_->getExclusiveLock(tptr, trans);
504 if (OK != ret)
506 ((Chunk*)chunkPtr_)->free(db_, tptr);
507 printError(ret, "Could not get lock for the insert tuple %x", tptr);
508 return ErrLockTimeOut;
511 curTuple_ = tptr;
513 ret = copyValuesFromBindBuffer(tptr);
514 if (ret != OK)
516 printError(ret, "Unable to copy values from bind buffer");
517 (*trans)->removeFromHasList(db_, tptr);
518 lMgr_->releaseLock(tptr);
519 ((Chunk*)chunkPtr_)->free(db_, tptr);
520 return ret;
522 int addSize = 0;
523 if (numFlds_ < 31)
525 addSize = 4;
526 *(int*)((char*)(tptr) + (length_-addSize)) = iNullInfo;
528 else
530 addSize = os::align(numFlds_);
531 os::memcpy(((char*)(tptr) + (length_-addSize)), cNullInfo, addSize);
534 //int tupleSize = length_ + addSize;
535 if (NULL != indexPtr_)
537 int i;
538 //it has index
539 for (i = 0; i < numIndexes_ ; i++)
541 ret = insertIndexNode(*trans, indexPtr_[i], idxInfo[i], tptr);
542 if (ret != OK) { printError(ret, "Error in inserting to index"); break;}
544 if (i != numIndexes_ )
546 for (int j = 0; j < i ; j++) {
547 printError(ErrWarning, "Deleting index node");
548 deleteIndexNode(*trans, indexPtr_[j], idxInfo[j], tptr);
550 lMgr_->releaseLock(tptr);
551 (*trans)->removeFromHasList(db_, tptr);
552 ((Chunk*)chunkPtr_)->free(db_, tptr);
553 //PRABA::TEMP
554 //printError(ret, "Unable to insert index node for tuple %x ", tptr);
555 printError(ret, "Unable to insert index node for tuple %x %d", tptr, *(int*)tptr);
556 return ret;
559 if (undoFlag)
560 ret = (*trans)->appendUndoLog(sysDB_, InsertOperation, tptr, length_);
561 if (ret != OK) {
562 printError(ret, "Unable to create undo log for %x %d", tptr, *(int*)tptr);
563 for (int j = 0; j < numIndexes_ ; j++) {
564 printError(ErrWarning, "Deleting index node");
565 deleteIndexNode(*trans, indexPtr_[j], idxInfo[j], tptr);
567 lMgr_->releaseLock(tptr);
568 (*trans)->removeFromHasList(db_, tptr);
569 ((Chunk*)chunkPtr_)->free(db_, tptr);
571 return ret;
574 DbRetVal TableImpl::deleteTuple()
576 if (NULL == curTuple_)
578 printError(ErrNotOpen, "Scan not open: No Current tuple");
579 return ErrNotOpen;
581 DbRetVal ret = lMgr_->getExclusiveLock(curTuple_, trans);
582 if (OK != ret)
584 printError(ret, "Could not get lock for the delete tuple %x", curTuple_);
585 return ErrLockTimeOut;
588 if (NULL != indexPtr_)
590 int i;
591 //it has index
592 for (i = 0; i < numIndexes_ ; i++)
594 ret = deleteIndexNode(*trans, indexPtr_[i], idxInfo[i], curTuple_);
595 if (ret != OK) break;
597 if (i != numIndexes_ )
599 for (int j = 0; j < i ; j++)
600 insertIndexNode(*trans, indexPtr_[j], idxInfo[j], curTuple_);
601 lMgr_->releaseLock(curTuple_);
602 (*trans)->removeFromHasList(db_, curTuple_);
603 printError(ret, "Unable to insert index node for tuple %x", curTuple_);
604 return ret;
607 ((Chunk*)chunkPtr_)->free(db_, curTuple_);
608 if (undoFlag)
609 ret = (*trans)->appendUndoLog(sysDB_, DeleteOperation, curTuple_, length_);
610 iter->prev();
611 return ret;
614 int TableImpl::deleteWhere()
616 int tuplesDeleted = 0;
617 DbRetVal rv = OK;
618 rv = execute();
619 if (rv !=OK) return (int) rv;
620 while(true){
621 fetchNoBind( rv);
622 if (rv != OK) { tuplesDeleted = (int)rv; break; }
623 if (NULL == curTuple_) break;
624 rv = deleteTuple();
625 if (rv != OK) {
626 printError(rv, "Error: Could only delete %d tuples", tuplesDeleted);
627 close();
628 return (int) rv;
630 tuplesDeleted++;
632 close();
633 return tuplesDeleted;
636 int TableImpl::truncate()
638 //take exclusive lock on the table
639 //get the chunk ptr of the table
640 //traverse the tablechunks and free all the pages except the first one
641 //get the chunk ptr of all its indexes
642 //traverse the indexchunks and free all the pages except the first one
643 //release table lock
645 //TEMPORARY FIX
646 DbRetVal rv = OK;
647 Predicate* tmpPred = pred_;
648 pred_ = NULL;
649 isPlanCreated = false;
650 int tuplesDeleted = deleteWhere();
651 isPlanCreated = false;
652 pred_ = tmpPred;
653 return tuplesDeleted;
656 DbRetVal TableImpl::updateTuple()
658 if (NULL == curTuple_)
660 printError(ErrNotOpen, "Scan not open: No Current tuple");
661 return ErrNotOpen;
663 DbRetVal ret = lMgr_->getExclusiveLock(curTuple_, trans);
664 if (OK != ret)
666 printError(ret, "Could not get lock for the update tuple %x", curTuple_);
667 return ErrLockTimeOut;
669 if (NULL != indexPtr_)
671 //it has index
672 //TODO::If it fails while updating index node, we have to undo all the updates
673 //on other indexes on the table.Currently it will leave the database in an
674 //inconsistent state.
675 for (int i = 0; i < numIndexes_ ; i++)
677 ret = updateIndexNode(*trans, indexPtr_[i], idxInfo[i], curTuple_);
678 if (ret != OK)
680 lMgr_->releaseLock(curTuple_);
681 (*trans)->removeFromHasList(db_, curTuple_);
682 printError(ret, "Unable to update index node for tuple %x", curTuple_);
683 return ret;
687 if (undoFlag)
688 ret = (*trans)->appendUndoLog(sysDB_, UpdateOperation, curTuple_, length_);
689 if (ret != OK) return ret;
690 int addSize = 0;
691 int iNullVal=iNullInfo;
692 if (numFlds_ < 31){
693 addSize=4;
694 if(!iNullVal){
695 iNullInfo = *(int*)((char*)(curTuple_) + (length_- addSize));
697 else
699 *(int*)((char*)(curTuple_) + (length_-addSize)) |= iNullInfo;
702 DbRetVal rv = copyValuesFromBindBuffer(curTuple_, false);
703 if (rv != OK) {
704 lMgr_->releaseLock(curTuple_);
705 (*trans)->removeFromHasList(db_, curTuple_);
706 return rv;
709 if (numFlds_ < 31)
711 if (!iNullVal) {
712 *(int*)((char*)(curTuple_) + (length_-addSize)) = iNullInfo;
713 iNullInfo=0;
715 else iNullInfo=iNullVal;
717 else
719 addSize = os::align(numFlds_);
720 //TODO::Do not do blind memcpy. It should OR each and every char
721 //os::memcpy(((char*)(curTuple_) + (length_-addSize)), cNullInfo, addSize);
724 return OK;
727 void TableImpl::printInfo()
729 printf(" <TableName> %s </TableName>\n", tblName_);
730 printf(" <TupleCount> %d </TupleCount>\n", numTuples());
731 printf(" <PagesUsed> %d </PagesUsed>\n", pagesUsed());
732 printf(" <SpaceUsed> %d </SpaceUsed>\n", spaceUsed());
733 printf(" <Indexes> %d <Indexes>\n", numIndexes_);
734 printf(" <TupleLength> %d </TupleLength>\n", length_);
735 printf(" <Fields> %d </Fields>\n", numFlds_);
736 printf(" <Indexes>\n");
737 for (int i =0; i<numIndexes_; i++)
738 printf("<IndexName> %s </IndexName>\n", CatalogTableINDEX::getName(indexPtr_[i]));
739 printf(" </Indexes>\n");
743 DbRetVal TableImpl::copyValuesFromBindBuffer(void *tuplePtr, bool isInsert)
745 //Iterate through the bind list and copy the value here
746 FieldIterator fIter = fldList_.getIterator();
747 char *colPtr = (char*) tuplePtr;
748 int fldpos=1;
749 while (fIter.hasElement())
751 FieldDef def = fIter.nextElement();
752 if (def.isNull_ && !def.isDefault_ && NULL == def.bindVal_ && isInsert)
754 printError(ErrNullViolation, "NOT NULL constraint violation for field %s\n", def.fldName_);
755 return ErrNullViolation;
757 if (def.isDefault_ && NULL == def.bindVal_ && isInsert)
759 void *dest = AllDataType::alloc(def.type_, def.length_);
760 AllDataType::convert(typeString, def.defaultValueBuf_, def.type_, dest, def.length_);
761 AllDataType::copyVal(colPtr, dest, def.type_, def.length_);
762 colPtr = colPtr + os::align(AllDataType::size(def.type_, def.length_));
763 fldpos++;
764 free (dest);
765 continue;
767 switch(def.type_)
769 case typeString:
770 if (NULL != def.bindVal_)
772 if(!isInsert && isFldNull(fldpos)){clearNullBit(fldpos);}
773 strcpy((char*)colPtr, (char*)def.bindVal_);
774 *(((char*)colPtr) + (def.length_-1)) = '\0';
776 else if (!def.isNull_ && !def.bindVal_ && isInsert) setNullBit(fldpos);
777 colPtr = colPtr + os::align(def.length_);
778 break;
779 case typeBinary:
780 if (NULL != def.bindVal_ ) {
781 if(!isInsert && isFldNull(fldpos)){clearNullBit(fldpos);}
782 DbRetVal rv = AllDataType::strToValue(colPtr, (char *) def.bindVal_, def.type_, def.length_);
783 if (rv != OK) return ErrBadArg;
785 else if (!def.isNull_ && isInsert && !def.bindVal_) setNullBit(fldpos);
786 colPtr = colPtr + os::align(def.length_);
787 break;
788 default:
789 if (NULL != def.bindVal_){
790 if(!isInsert && isFldNull(fldpos)){clearNullBit(fldpos);}
791 AllDataType::copyVal(colPtr, def.bindVal_, def.type_);}
792 else { if (!def.isNull_ && isInsert) setNullBit(fldpos); }
793 colPtr = colPtr + os::align(AllDataType::size(def.type_));
794 break;
796 fldpos++;
798 return OK;
800 void TableImpl::clearNullBit(int fldpos)
802 if (isIntUsedForNULL){
803 CLEARBIT(iNullInfo, fldpos);}
804 else
805 cNullInfo[fldpos-1] = 0;
807 void TableImpl::setNullBit(int fldpos)
809 if (isIntUsedForNULL)
810 SETBIT(iNullInfo, fldpos);
811 else
812 cNullInfo[fldpos-1] = 1;
814 DbRetVal TableImpl::copyValuesToBindBuffer(void *tuplePtr)
816 //Iterate through the bind list and copy the value here
817 FieldIterator fIter = fldList_.getIterator();
818 char *colPtr = (char*) tuplePtr;
819 while (fIter.hasElement())
821 FieldDef def = fIter.nextElement();
822 if (NULL != def.bindVal_)
823 AllDataType::copyVal(def.bindVal_, colPtr, def.type_, def.length_);
824 colPtr = colPtr + os::align(AllDataType::size(def.type_, def.length_));
826 return OK;
829 //-1 index not supported
830 DbRetVal TableImpl::insertIndexNode(Transaction *tr, void *indexPtr, IndexInfo *info, void *tuple)
832 CINDEX *iptr = (CINDEX*)indexPtr;
833 DbRetVal ret = OK;
834 printDebug(DM_Table, "Inside insertIndexNode type %d", iptr->indexType_);
835 Index* idx = Index::getIndex(iptr->indexType_);
836 ret = idx->insert(this, tr, indexPtr, info, tuple,undoFlag);
837 return ret;
840 DbRetVal TableImpl::deleteIndexNode(Transaction *tr, void *indexPtr, IndexInfo *info, void *tuple)
842 CINDEX *iptr = (CINDEX*)indexPtr;
843 DbRetVal ret = OK;
844 Index* idx = Index::getIndex(iptr->indexType_);
845 ret = idx->remove(this, tr, indexPtr, info, tuple, undoFlag);
846 return ret;
848 void TableImpl::printSQLIndexString()
850 CatalogTableINDEXFIELD cIndexField(sysDB_);
851 char fName[IDENTIFIER_LENGTH];
852 char *fldName = fName;
853 DataType type;
854 for (int i = 0; i < numIndexes_ ; i++)
856 CINDEX *iptr = (CINDEX*) indexPtr_[i];
857 printf("CREATE INDEX %s on %s ( ", iptr->indName_, getName());
858 FieldList fldList;
859 cIndexField.getFieldInfo(iptr, fldList);
860 FieldIterator fIter = fldList.getIterator();
861 bool firstFld = true;
862 while(fIter.hasElement())
864 FieldDef def = fIter.nextElement();
865 if (firstFld) { printf(" %s ", def.fldName_); firstFld = false; }
866 else printf(" ,%s ", def.fldName_);
868 printf(" ) ");
869 if (iptr->indexType_ == hashIndex) printf(" HASH ");
870 else printf(" TREE ");
871 if (((HashIndexInfo*) idxInfo[i])->isUnique) printf(" UNIQUE;\n"); else printf(";\n");
876 DbRetVal TableImpl::updateIndexNode(Transaction *tr, void *indexPtr, IndexInfo *info, void *tuple)
878 CINDEX *iptr = (CINDEX*)indexPtr;
879 DbRetVal ret = OK;
880 Index* idx = Index::getIndex(iptr->indexType_);
881 //TODO::currently it updates irrespective of whether the key changed or not
882 //because of this commenting the whole index update code. relook at it and uncomment
884 ret = idx->update(this, tr, indexPtr, info, tuple, undoFlag);
886 return ret;
890 void TableImpl::setTableInfo(char *name, int tblid, size_t length,
891 int numFld, int numIdx, void *chunk)
893 strcpy(tblName_, name);
894 tblID_ = tblid;
895 length_ = length;
896 numFlds_ = numFld;
897 numIndexes_ = numIdx;
898 chunkPtr_ = chunk;
901 long TableImpl::spaceUsed()
903 Chunk *chk = (Chunk*)chunkPtr_;
904 long totSize = chk->getTotalDataNodes() * chk->getSize();
905 totSize = totSize + (chk->totalPages() * sizeof (PageInfo));
906 return totSize;
909 int TableImpl::pagesUsed()
911 Chunk *chk = (Chunk*)chunkPtr_;
912 return chk->totalPages();
915 long TableImpl::numTuples()
917 return ((Chunk*)chunkPtr_)->getTotalDataNodes();
920 List TableImpl::getFieldNameList()
922 List fldNameList;
923 FieldIterator fIter = fldList_.getIterator();
924 char fieldName[IDENTIFIER_LENGTH];
925 while (fIter.hasElement())
927 FieldDef def = fIter.nextElement();
928 Identifier *elem = new Identifier();
929 Table::getFieldNameAlone(def.fldName_, fieldName);
930 sprintf(elem->name, "%s.%s", getName(), fieldName);
931 fldNameList.append(elem);
933 return fldNameList;
935 DbRetVal TableImpl::close()
937 if (NULL == iter)
939 //printError(ErrNotOpen,"Scan not open");
940 //return ErrNotOpen;
941 //PRABA::when called multiple times it gives error
942 return OK;
944 iter->close();
945 delete iter;
946 iter = NULL;
947 return OK;
949 DbRetVal TableImpl::closeScan()
951 //do not throw scan not open error
952 //this function will be called by table handle
953 if (iter) {
954 iter->reset();
955 // delete iter;
956 // iter = NULL;
958 return OK;
960 DbRetVal TableImpl::lock(bool shared)
963 DbRetVal ret = OK;
965 if (shared)
966 ret = lMgr_->getSharedLock(chunkPtr_, NULL);
967 else
968 ret = lMgr_->getExclusiveLock(chunkPtr_, NULL);
969 if (OK != ret)
971 printError(ret, "Could not exclusive lock on the table %x", chunkPtr_);
972 }else {
973 //do not append for S to X upgrade
974 if (!ProcessManager::hasLockList.exists(chunkPtr_))
975 ProcessManager::hasLockList.append(chunkPtr_);
978 return ret;
980 DbRetVal TableImpl::unlock()
983 if (!ProcessManager::hasLockList.exists(chunkPtr_)) return OK;
984 DbRetVal ret = lMgr_->releaseLock(chunkPtr_);
985 if (OK != ret)
987 printError(ret, "Could not release exclusive lock on the table %x", chunkPtr_);
988 }else
990 ProcessManager::hasLockList.remove(chunkPtr_);
993 return OK;
996 TableImpl::~TableImpl()
998 if (NULL != iter ) { delete iter; iter = NULL; }
999 if (NULL != indexPtr_) { delete[] indexPtr_; indexPtr_ = NULL; }
1000 if (NULL != idxInfo)
1002 for (int i = 0; i < numIndexes_; i++) delete idxInfo[i];
1003 delete[] idxInfo;
1004 idxInfo = NULL;
1006 if (numFlds_ > 31 && cNullInfo != NULL) { free(cNullInfo); cNullInfo = NULL; }
1008 fldList_.removeAll();
1012 void *TableImpl::getBindFldAddr(const char *name)
1014 return fldList_.getBindField(name);
1016 bool TableImpl::isTableInvolved(char *tblName)
1018 //printf("Table isTableInvolved called for %s with %s\n", tblName, getName());
1019 if (0 == strcmp(getName(), tblName)) return true; else return false;
1021 bool TableImpl::pushPredicate(Predicate *pred)
1023 bool ret = false;
1024 PredicateImpl *pImpl = (PredicateImpl*) pred;
1025 char tableName[IDENTIFIER_LENGTH];
1026 Table::getTableNameAlone(pImpl->getFldName1(), tableName);
1027 //printf("predicate tbl name %s\n", tableName);
1029 //if predicate is of form t1.f1=t2.f1 then do not push here
1030 if (0 != strcmp(pImpl->getFldName2(),"")) return ret;
1032 if (0 == strcmp(getName(), tableName))
1034 setPredicate(pred);
1035 //printf("PRABA::pushed predicate in tablehdl %s\n", getName());
1036 ret = true;
1038 return ret;
1040 void TableImpl::setPredicate(Predicate *pred)
1042 if (NULL == pred_) { pred_ = pred; return; }
1043 Predicate *curPred = pred_;
1044 PredicateImpl *newPred = new PredicateImpl();
1045 newPred->setTerm(curPred, OpAnd, pred);
1046 newPred->setTable(this);
1047 pred_ = newPred;
1048 return;
1050 void TableImpl::printPlan(int space)
1052 char spaceBuf[IDENTIFIER_LENGTH];
1053 memset(spaceBuf, 32, IDENTIFIER_LENGTH);
1054 spaceBuf[space] = '\0';
1055 printf("%s <TABLE-NODE>\n", spaceBuf);
1056 printf("%s <NAME> %s </NAME>\n", spaceBuf, getName());
1057 printf("%s <ScanType> %s </ScanType>\n", spaceBuf, ScanTypeNames[scanType_]);
1058 PredicateImpl *pred = (PredicateImpl*)pred_;
1059 if (pred) pred->print(space+2);
1060 printf("%s </TABLE-NODE>\n", spaceBuf);