submitting patch from enterprise version
[csql.git] / src / storage / TableImpl.cxx
blob71a494050f7488edcc95f0c8f74a4d0a3ddf8641
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Index.h>
17 #include<CatalogTables.h>
18 #include<Lock.h>
19 #include<Debug.h>
20 #include<Table.h>
21 #include<TableImpl.h>
22 #include<Predicate.h>
23 #include<PredicateImpl.h>
24 #include<Index.h>
25 #include<Config.h>
26 #include<AggTableImpl.h> //for AggType
28 void Table::getFieldNameAlone(char *fname, char *name) {
29 bool dotFound= false;
30 char *fullname = fname;
31 while(*fullname != '\0')
33 if (*fullname == '.') { dotFound = true; break; }
34 fullname++;
36 if (dotFound) strcpy(name, ++fullname); else strcpy(name, fname);
39 void Table::getTableNameAlone(char *fname, char *name) {
40 strcpy(name, fname);
41 char *start = name;
42 bool dotFound = false;
43 while(*name != '\0')
45 if (*name == '.') { *name='\0'; dotFound = true; break; }
46 name++;
48 if (!dotFound) strcpy(start, "");
49 return;
52 DbRetVal TableImpl::bindFld(const char *name, void *val)
54 if (name[0] == '*' ) return OK;
55 //set it in the field list
56 char fieldName[IDENTIFIER_LENGTH];
57 getFieldNameAlone((char*)name, fieldName);
58 DbRetVal rv = fldList_.updateBindVal(fieldName, val);
59 if (OK != rv) {
60 printError(ErrNotExists, "Field %s does not exist", fieldName);
61 return rv;
63 return OK;
66 bool TableImpl::isFldNull(const char *name){
67 if (name[0] == '*') return false;
68 char fieldName[IDENTIFIER_LENGTH];
69 getFieldNameAlone((char*)name, fieldName);
70 int colpos = fldList_.getFieldPosition(fieldName);
71 if (-1 == colpos)
73 printError(ErrNotExists, "Field %s does not exist", name);
74 return false;
77 return isFldNull(colpos);
80 int TableImpl::getFldPos(char *name)
82 return fldList_.getFieldPosition(name);
84 bool TableImpl::isFldNull(int colpos)
86 if (!curTuple_) return false;
87 if (colpos <1 || colpos > numFlds_) return false;
88 if (isIntUsedForNULL) {
89 int nullVal = *(int*)((char*)curTuple_ + (length_ - 4));
90 if (BITSET(nullVal, colpos)) return true;
92 else {
93 char *nullOffset = (char*)curTuple_ - os::align(numFlds_);
94 if (nullOffset[colpos-1]) return true;
96 return false;
98 void TableImpl::resetNullinfo()
100 if (isIntUsedForNULL) {
101 iNullInfo =0;
103 else {
104 int i=0;
105 while(i < numFlds_) { cNullInfo[0] = 0;}
108 DbRetVal TableImpl::markFldNull(char const* name)
110 DbRetVal rv = OK;
111 int colpos = fldList_.getFieldPosition(name);
112 if (-1 == colpos)
114 printError(ErrNotExists, "Field %s does not exist", name);
115 return ErrNotExists;
117 rv = markFldNull(colpos);
118 return rv;
121 DbRetVal TableImpl::markFldNull(int fldpos)
123 if (fldpos <1 || fldpos > numFlds_) return ErrBadArg;
124 bool isBitSet = false;
125 if (isIntUsedForNULL) {
126 if (!BITSET(iNotNullInfo, fldpos)) {
127 SETBIT(iNullInfo, fldpos);
128 isBitSet = true;
130 else {
131 printError(ErrNullViolation, "NOT NULL constraint violation");
132 return ErrNullViolation;
135 else {
136 if (!BITSET(iNotNullInfo, fldpos)) cNullInfo[fldpos-1] = 1;
137 else {
138 printError(ErrNullViolation, "NOT NULL constraint violation");
139 return ErrNullViolation;
142 return OK;
145 void TableImpl::clearFldNull(const char *name)
147 int colpos = fldList_.getFieldPosition(name);
148 if (-1 == colpos)
150 printError(ErrNotExists, "Field %s does not exist", name);
151 return;
154 clearFldNull(colpos);
157 void TableImpl::clearFldNull(int colpos)
159 if (colpos <1 || colpos > numFlds_) return;
160 if (isIntUsedForNULL) {
161 CLEARBIT(iNullInfo, colpos);
163 else
164 cNullInfo[colpos-1] = 0;
165 return;
168 bool TableImpl::hasIndex(char* fName)
170 if (NULL == indexPtr_) return false;
171 for (int i =0; i < numIndexes_; i++)
173 HashIndexInfo* info = (HashIndexInfo*) idxInfo[i];
174 FieldIterator iter = info->idxFldList.getIterator();
175 if(iter.hasElement())
177 FieldDef *def = iter.nextElement();
178 if(strcmp(def->fldName_, fName) == 0)
179 if(!iter.hasElement())//neglet if it is composite index
180 return true;
183 return false;
186 IndexType TableImpl::getIndexType(char *fName, int *pos)
188 if (NULL == indexPtr_) return unknownIndex;
189 for (int i =0; i < numIndexes_; i++)
191 HashIndexInfo* info = (HashIndexInfo*) idxInfo[i];
192 FieldIterator iter = info->idxFldList.getIterator();
193 if(iter.hasElement())
195 FieldDef *def = iter.nextElement();
196 if(strcmp(def->fldName_, fName) == 0)
197 if(!iter.hasElement()) {//neglet if it is composite index
198 *(int*)pos = i;
199 return info->indType;
203 *(int*)pos = -1;
204 return unknownIndex;
206 void TableImpl::addPredicate(char *fName, ComparisionOp op, void *buf)
208 char fieldName[IDENTIFIER_LENGTH];
209 Table::getFieldNameAlone(fName, fieldName);
210 PredicateImpl *pred = (PredicateImpl*) pred_;
211 PredicateImpl *newPred = new PredicateImpl();
212 newPred->setTerm(fName, op, buf);
213 if (NULL == pred) { pred_ = newPred; return; }
214 if (pred->isSingleTerm())
216 bool res = pred->appendIfSameFld(fName, op, buf);
217 if(res) {
218 delete newPred;
219 return;
222 PredicateImpl *bothPred = new PredicateImpl();
223 bothPred->setTerm(pred, OpAnd, newPred);
224 pred_ = bothPred;
227 DbRetVal TableImpl::optimize()
229 //table ptr is set in predicate because it needs to access the
230 //type and length to evaluate
231 if( NULL != pred_)
233 PredicateImpl *pred = (PredicateImpl*) pred_;
234 pred->setTable(this);
235 pred->setProjectionList(NULL);
236 pred->setOffsetAndType();
238 return createPlan();
241 DbRetVal TableImpl::execute()
243 if (NULL != iter)
245 //printError(ErrAlready,"Scan already open:Close and re execute");
246 return ErrAlready;
248 DbRetVal ret = OK;
249 ret = optimize();
250 if (OK != ret)
252 printError(ErrSysInternal,"Unable to create the plan");
253 return ErrSysInternal;
255 if (useIndex_ >= 0)
256 iter = new TupleIterator(pred_, scanType_, idxInfo[useIndex_], chunkPtr_, sysDB_->procSlot,isBetween,isPointLook);
257 else if (scanType_ == fullTableScan)
258 iter = new TupleIterator(pred_, scanType_, NULL, chunkPtr_, sysDB_->procSlot,isBetween,isPointLook);
259 else
261 printError(ErrSysFatal,"Unable to create tuple iterator");//should never happen
262 return ErrSysFatal;
264 ret = iter->open();
265 if (OK != ret)
267 printError(ret,"Unable to open the iterator");
268 return ret;
270 return OK;
274 DbRetVal TableImpl::createPlan()
276 if (isPlanCreated) {
277 //will do early return here. plan is generated only when setPredicate is called.
278 if (scanType_ == unknownScan) return ErrSysFatal; //this should never happen
279 else return OK;
281 isBetween=false;
282 isPointLook = false;
283 useIndex_ = -1;
285 FieldIterator fIter = fldList_.getIterator();
286 FieldDef *def = NULL;
287 while ((def = fIter.nextElement())!= NULL) {
288 if (NULL != def->bindVal_) bindList_.append(def);
290 numBindFlds_ = bindList_.size();
291 bindListArray_ = (void **) malloc(numBindFlds_ * sizeof (void *));
292 void *elem = NULL;
293 int i = 0;
294 ListIterator it = bindList_.getIterator();
295 while ((elem = it.nextElement()) != NULL) bindListArray_[i++] = elem;
297 //if there are no predicates then go for full scan
298 //if there are no indexes then go for full scan
299 if (NULL == pred_ || NULL == indexPtr_)
301 scanType_ = fullTableScan;
302 isPlanCreated = true;
303 return OK;
305 if (NULL != indexPtr_)
307 PredicateImpl *pred = (PredicateImpl*)pred_;
308 printDebug(DM_Predicate, "predicate does not involve NOT , OR operator");
309 if (!pred->isNotOrInvolved())
311 printDebug(DM_Predicate, "predicate does not involve NOT , OR operator");
312 for (int i =0; i < numIndexes_; i++)
314 HashIndexInfo* info = (HashIndexInfo*) idxInfo[i];
315 FieldIterator iter = info->idxFldList.getIterator();
316 while(iter.hasElement())
318 FieldDef *def = iter.nextElement();
319 if (pred->pointLookupInvolved(def->fldName_))
321 printDebug(DM_Predicate, "point lookup involved for field %s",def->fldName_);
322 if(hashIndex == info->indType) scanType_ = hashIndexScan;
323 else scanType_ = treeIndexScan;
324 isPlanCreated = true;
325 isPointLook = true;
326 useIndex_ = i;
328 else if (pred->isBetweenInvolved(def->fldName_))
330 if (treeIndex == info->indType)
332 scanType_ = treeIndexScan;
333 isPlanCreated = true;
334 useIndex_ = i;
335 isBetween=true;
336 break; //no composite index for tree index
339 else if (pred->rangeQueryInvolved(def->fldName_))
341 printDebug(DM_Predicate, "range lookup involved for field %s",def->fldName_);
342 if (treeIndex == info->indType)
344 scanType_ = treeIndexScan;
345 isPlanCreated = true;
346 useIndex_ = i;
347 break; //no composite index for tree index
349 }else {
350 useIndex_ = -1;
351 break;
353 }//while iter.hasElement()
354 if (useIndex_ != -1) return OK;
355 }//for
358 scanType_ = fullTableScan;
359 isPlanCreated = true;
360 return OK;
363 void* TableImpl::fetch()
365 fetchNoBind();
366 if (NULL == curTuple_) return curTuple_;
367 copyValuesToBindBuffer(curTuple_);
368 return curTuple_;
370 void* TableImpl::fetch(DbRetVal &rv)
372 fetchNoBind(rv);
373 if (NULL == curTuple_) return curTuple_;
374 copyValuesToBindBuffer(curTuple_);
375 return curTuple_;
378 void* TableImpl::fetchNoBind()
380 if (NULL == iter)
382 printError(ErrNotOpen,"Scan not open or Scan is closed\n");
383 return NULL;
385 void *prevTuple = curTuple_;
386 curTuple_ = iter->next();
387 if (NULL == curTuple_)
389 return NULL;
391 DbRetVal lockRet = OK;
392 if(!loadFlag) {
393 if ((*trans)->isoLevel_ == READ_COMMITTED)
395 //if iso level is read committed, operation duration lock is sufficent so release it here itself.
396 int tries = 5;
397 struct timeval timeout;
398 timeout.tv_sec = Conf::config.getMutexSecs();
399 timeout.tv_usec = Conf::config.getMutexUSecs();
401 bool status = false;
402 while(true) {
403 lockRet = lMgr_->isExclusiveLocked( curTuple_, trans, status);
404 if (OK != lockRet)
406 printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_);
407 curTuple_ = prevTuple;
408 return NULL;
410 if (!status) break;
411 tries--;
412 if (tries == 0) break;
413 os::select(0, 0, 0, 0, &timeout);
415 if (tries == 0)
417 printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_);
418 curTuple_ = prevTuple;
419 return NULL;
422 else if ((*trans)->isoLevel_ == READ_REPEATABLE) {
423 lockRet = lMgr_->getSharedLock(curTuple_, trans);
424 if (OK != lockRet)
426 printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_);
427 curTuple_ = prevTuple;
428 return NULL;
432 return curTuple_;
435 void* TableImpl::fetchNoBind(DbRetVal &rv)
437 rv = OK;
438 if (NULL == iter)
440 printError(ErrNotOpen,"Scan not open or Scan is closed\n");
441 rv = ErrNotOpen;
442 return NULL;
444 void *prevTuple = curTuple_;
445 curTuple_ = iter->next();
446 if (NULL == curTuple_)
448 return NULL;
450 DbRetVal lockRet = OK;
451 if(!loadFlag) {
452 if ((*trans)->isoLevel_ == READ_REPEATABLE) {
453 lockRet = lMgr_->getSharedLock(curTuple_, trans);
454 if (OK != lockRet)
456 printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_);
457 rv = ErrLockTimeOut;
458 curTuple_ = prevTuple;
459 return NULL;
462 else if ((*trans)->isoLevel_ == READ_COMMITTED)
464 //if iso level is read committed, operation duration lock is sufficent so release it here itself.
465 int tries = 5;
466 struct timeval timeout;
467 timeout.tv_sec = Conf::config.getMutexSecs();
468 timeout.tv_usec = Conf::config.getMutexUSecs();
470 bool status = false;
471 while(true) {
472 lockRet = lMgr_->isExclusiveLocked( curTuple_, trans, status);
473 if (OK != lockRet)
475 printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_);
476 curTuple_ = prevTuple;
477 rv = ErrLockTimeOut;
478 return NULL;
480 if (!status) break;
481 tries--;
482 if (tries == 0) break;
483 os::select(0, 0, 0, 0, &timeout);
485 if (tries == 0)
487 printError(lockRet, "Unable to get the lock for the tuple %x", curTuple_);
488 curTuple_ = prevTuple;
489 rv = ErrLockTimeOut;
490 return NULL;
494 return curTuple_;
496 DbRetVal TableImpl::fetchAgg(const char * fldName, AggType aType, void *buf)
498 FieldInfo *info = new FieldInfo();
499 DbRetVal rv = getFieldInfo(fldName, info);
500 if (OK != rv) return rv;
501 bool res= false;
502 if (AGG_MIN == aType || AGG_MAX == aType) {
503 int pos =0;
504 IndexType iType = getIndexType((char*)fldName, &pos);
505 if(treeIndex == iType && pos >=0) {
506 if (AGG_MIN == aType) {
507 HashIndexInfo* hInfo = (HashIndexInfo*) idxInfo[pos];
508 CINDEX *iptr = (CINDEX*) hInfo->indexPtr;
509 TreeIter *iter = new TreeIter((TreeNode*)iptr->hashNodeChunk_);
510 char *tuple = (char*) iter->getFirstElement();
511 if (tuple != NULL) {
512 AllDataType::copyVal(buf,(void*)(tuple+info->offset),
513 info->type, info->length);
514 delete iter;
515 return OK;
517 delete iter;
519 else if (AGG_MAX == aType) {
520 HashIndexInfo* hInfo = (HashIndexInfo*) idxInfo[pos];
521 CINDEX *iptr = (CINDEX*) hInfo->indexPtr;
522 TreeIter *iter = new TreeIter((TreeNode*)iptr->hashNodeChunk_);
523 char *tuple = (char*) iter->getLastElement();
524 if (tuple != NULL) {
525 AllDataType::copyVal(buf,(void*)(tuple+info->offset),
526 info->type, info->length);
527 delete iter;
528 return OK;
530 delete iter;
535 DataType type = info->type;
536 int length = info->length;
537 int offset = info->offset;
539 if (NULL == pred_ && typeInt == type)
540 { //perf opt
541 ChunkIterator cIter = ((Chunk*)chunkPtr_)->getIterator();
542 char *tuple =(char*)cIter.nextElement();
543 if (NULL == tuple) return OK;
544 int count =1;
545 AllDataType::copyVal(buf, (void*) (tuple+offset), type, length);
546 while(1) {
547 tuple = (char*)cIter.nextElement();
548 if (NULL == tuple) break;
549 switch(aType) {
550 case AGG_MIN:
552 if (*(int*)buf >= *((int*)(tuple+offset)))
553 *(int*)buf = *((int*)(tuple+offset));
554 break;
556 case AGG_MAX:
558 if (*(int*)buf <= *((int*)(tuple+offset)))
559 *(int*)buf = *((int*)(tuple+offset));
560 break;
562 case AGG_SUM:
564 *(int*)buf = *(int*)buf + *((int*)(tuple+offset));
565 break;
567 case AGG_AVG:
569 *(int*)buf = *(int*)buf + *((int*)(tuple+offset));
570 count++;
571 break;
573 case AGG_COUNT:
575 count++;
576 break;
580 if( AGG_AVG == aType) AllDataType::divVal(buf, &count, type);
581 else if (AGG_COUNT == aType) (*(int*)buf) = count;
582 delete info;
583 return OK;
586 char *tuple = (char*) fetchNoBind(rv);
587 if ( NULL == tuple) return OK;
588 int count =1;
589 AllDataType::copyVal(buf, (void*) (tuple+offset), type, length);
590 while(1) {
591 tuple = (char*) fetchNoBind(rv);
592 if (NULL == tuple) break;
593 switch(aType) {
594 case AGG_MIN:
596 res = AllDataType::compareVal(buf, (void*) (tuple+offset),
597 OpGreaterThanEquals,
598 type, length);
599 if (res) AllDataType::copyVal(buf, (void*) (tuple+offset),
600 type, length);
601 break;
603 case AGG_MAX:
605 res = AllDataType::compareVal(buf, (void*) (tuple+offset),
606 OpLessThanEquals,
607 type, length);
608 if (res) AllDataType::copyVal(buf, (void*) (tuple+offset),
609 type, length);
610 break;
612 case AGG_SUM:
614 AllDataType::addVal(buf, (void*) (tuple+offset),
615 type);
616 break;
618 case AGG_AVG:
620 AllDataType::addVal(buf, (void*) (tuple+offset),
621 type);
622 count++;
623 break;
625 case AGG_COUNT:
627 count++;
628 break;
632 switch(aType) {
633 case AGG_AVG:
635 AllDataType::divVal(buf, &count,type);
636 break;
638 case AGG_COUNT:
640 (*(int*)buf) = count;
641 break;
644 delete info;
645 return OK;
// Inserts one tuple built from the currently bound field values.
//
// Steps, in order: allocate a slot from the table chunk, take an exclusive
// lock on it (skipped when loadFlag is set), copy the bound values in, store
// the null information at the tail of the tuple, add an entry to every index
// and finally append an undo log record (skipped when loadFlag is set).
// Every failure after the allocation rolls back what was done so far (index
// entries, lock, slot) before returning.
//
// Returns OK on success; ErrLockTimeOut when the lock cannot be taken;
// otherwise the error code of the step that failed.
// NOTE(review): curTuple_ is assigned before the copy and is not reset on the
// failure paths that free the slot - confirm callers never use it afterwards.
647 DbRetVal TableImpl::insertTuple()
649 DbRetVal ret =OK;
650 void *tptr = ((Chunk*)chunkPtr_)->allocate(db_, &ret);
651 if (NULL == tptr)
653 printError(ret, "Unable to allocate record from chunk");
654 return ret;
656 if (!loadFlag) {
657 ret = lMgr_->getExclusiveLock(tptr, trans);
658 if (OK != ret)
// lock refused: give the freshly allocated slot back
660 ((Chunk*)chunkPtr_)->free(db_, tptr);
661 printError(ret, "Could not get lock for the insert tuple %x", tptr);
662 return ErrLockTimeOut;
665 curTuple_ = tptr;
667 ret = copyValuesFromBindBuffer(tptr);
668 if (ret != OK)
670 printError(ret, "Unable to copy values from bind buffer");
671 if (!loadFlag) {
672 (*trans)->removeFromHasList(db_, tptr);
673 lMgr_->releaseLock(tptr);
675 ((Chunk*)chunkPtr_)->free(db_, tptr);
676 return ret;
// Null information occupies the last addSize bytes of the tuple: a 4-byte int
// when numFlds_ < 31, otherwise os::align(numFlds_) bytes copied from cNullInfo.
678 int addSize = 0;
679 if (numFlds_ < 31)
681 addSize = 4;
682 *(int*)((char*)(tptr) + (length_-addSize)) = iNullInfo;
684 else
686 addSize = os::align(numFlds_);
687 os::memcpy(((char*)(tptr) + (length_-addSize)), cNullInfo, addSize);
690 //int tupleSize = length_ + addSize;
691 if (NULL != indexPtr_)
693 int i;
694 //it has index
695 for (i = 0; i < numIndexes_ ; i++)
697 ret = insertIndexNode(*trans, indexPtr_[i], idxInfo[i], tptr);
698 if (ret != OK) { printError(ret, "Error in inserting to index"); break;}
// i stops short of numIndexes_ only when an index insert failed;
// indexes 0..i-1 already hold the tuple and are undone here
700 if (i != numIndexes_ )
702 for (int j = 0; j < i ; j++) {
703 printError(ErrWarning, "Deleting index node");
704 deleteIndexNode(*trans, indexPtr_[j], idxInfo[j], tptr);
706 if (!loadFlag) {
707 (*trans)->removeFromHasList(db_, tptr);
708 lMgr_->releaseLock(tptr);
710 ((Chunk*)chunkPtr_)->free(db_, tptr);
711 printError(ret, "Unable to insert index node for tuple %x ", tptr);
712 return ret;
715 if (!loadFlag)
716 ret = (*trans)->appendUndoLog(sysDB_, InsertOperation, tptr, length_);
// undo log could not be written: remove the tuple from every index and free it
717 if (ret != OK) {
718 printError(ret, "Unable to create undo log for %x %d", tptr, *(int*)tptr);
719 for (int j = 0; j < numIndexes_ ; j++) {
720 printError(ErrWarning, "Deleting index node");
721 deleteIndexNode(*trans, indexPtr_[j], idxInfo[j], tptr);
723 if (!loadFlag) {
724 (*trans)->removeFromHasList(db_, tptr);
725 lMgr_->releaseLock(tptr);
727 ((Chunk*)chunkPtr_)->free(db_, tptr);
729 return ret;
732 DbRetVal TableImpl::deleteTuple()
734 if (NULL == curTuple_)
736 printError(ErrNotOpen, "Scan not open: No Current tuple");
737 return ErrNotOpen;
739 DbRetVal ret = OK;
740 if (!loadFlag) {
741 ret = lMgr_->getExclusiveLock(curTuple_, trans);
742 if (OK != ret)
744 printError(ret, "Could not get lock for the delete tuple %x", curTuple_);
745 return ErrLockTimeOut;
749 if (NULL != indexPtr_)
751 int i;
752 //it has index
753 for (i = 0; i < numIndexes_ ; i++)
755 ret = deleteIndexNode(*trans, indexPtr_[i], idxInfo[i], curTuple_);
756 if (ret != OK) break;
758 if (i != numIndexes_ )
760 for (int j = 0; j < i ; j++)
761 insertIndexNode(*trans, indexPtr_[j], idxInfo[j], curTuple_);
762 if (!loadFlag) {
763 lMgr_->releaseLock(curTuple_);
764 (*trans)->removeFromHasList(db_, curTuple_);
766 printError(ret, "Unable to insert index node for tuple %x", curTuple_);
767 return ret;
770 ((Chunk*)chunkPtr_)->free(db_, curTuple_);
771 if (!loadFlag)
772 ret = (*trans)->appendUndoLog(sysDB_, DeleteOperation, curTuple_, length_);
773 iter->prev();
774 return ret;
777 int TableImpl::deleteWhere()
779 int tuplesDeleted = 0;
780 DbRetVal rv = OK;
781 rv = execute();
782 if (rv !=OK) return (int) rv;
783 while(true){
784 fetchNoBind( rv);
785 if (rv != OK) { tuplesDeleted = (int)rv; break; }
786 if (NULL == curTuple_) break;
787 rv = deleteTuple();
788 if (rv != OK) {
789 printError(rv, "Error: Could only delete %d tuples", tuplesDeleted);
790 closeScan();
791 return (int) rv;
793 tuplesDeleted++;
795 closeScan();
796 return tuplesDeleted;
799 int TableImpl::truncate()
801 //take exclusive lock on the table
802 //get the chunk ptr of the table
803 //traverse the tablechunks and free all the pages except the first one
804 //get the chunk ptr of all its indexes
805 //traverse the indexchunks and free all the pages except the first one
806 //release table lock
808 //TEMPORARY FIX
809 DbRetVal rv = OK;
810 Predicate* tmpPred = pred_;
811 pred_ = NULL;
812 isPlanCreated = false;
813 int tuplesDeleted = deleteWhere();
814 isPlanCreated = false;
815 pred_ = tmpPred;
816 return tuplesDeleted;
819 DbRetVal TableImpl::updateTuple()
821 if (NULL == curTuple_)
823 printError(ErrNotOpen, "Scan not open: No Current tuple");
824 return ErrNotOpen;
826 DbRetVal ret = OK;
827 if (!loadFlag) {
828 ret = lMgr_->getExclusiveLock(curTuple_, trans);
829 if (OK != ret)
831 printError(ret, "Could not get lock for the update tuple %x", curTuple_);
832 return ErrLockTimeOut;
835 if (NULL != indexPtr_)
837 //it has index
838 //TODO::If it fails while updating index node, we have to undo all the updates
839 //on other indexes on the table.Currently it will leave the database in an
840 //inconsistent state.
841 for (int i = 0; i < numIndexes_ ; i++)
843 ret = updateIndexNode(*trans, indexPtr_[i], idxInfo[i], curTuple_);
844 if (ret != OK)
846 if (!loadFlag) {
847 lMgr_->releaseLock(curTuple_);
848 (*trans)->removeFromHasList(db_, curTuple_);
850 printError(ret, "Unable to update index node for tuple %x", curTuple_);
851 return ret;
855 if (!loadFlag)
856 ret = (*trans)->appendUndoLog(sysDB_, UpdateOperation, curTuple_, length_);
857 if (ret != OK) return ret;
858 int addSize = 0;
859 int iNullVal=iNullInfo;
860 if (numFlds_ < 31){
861 addSize=4;
862 if(!iNullVal){
863 iNullInfo = *(int*)((char*)(curTuple_) + (length_- addSize));
865 else
867 *(int*)((char*)(curTuple_) + (length_-addSize)) |= iNullInfo;
870 DbRetVal rv = copyValuesFromBindBuffer(curTuple_, false);
871 if (rv != OK && !loadFlag) {
872 lMgr_->releaseLock(curTuple_);
873 (*trans)->removeFromHasList(db_, curTuple_);
874 return rv;
877 if (numFlds_ < 31)
879 if (!iNullVal) {
880 *(int*)((char*)(curTuple_) + (length_-addSize)) = iNullInfo;
881 iNullInfo=0;
883 else iNullInfo=iNullVal;
885 else
887 addSize = os::align(numFlds_);
888 //TODO::Do not do blind memcpy. It should OR each and every char
889 //os::memcpy(((char*)(curTuple_) + (length_-addSize)), cNullInfo, addSize);
892 return OK;
895 void TableImpl::printInfo()
897 printf(" <TableName> %s </TableName>\n", tblName_);
898 printf(" <TupleCount> %d </TupleCount>\n", numTuples());
899 printf(" <PagesUsed> %d </PagesUsed>\n", pagesUsed());
900 printf(" <SpaceUsed> %d </SpaceUsed>\n", spaceUsed());
901 printf(" <Indexes> %d <Indexes>\n", numIndexes_);
902 printf(" <TupleLength> %d </TupleLength>\n", length_);
903 printf(" <Fields> %d </Fields>\n", numFlds_);
904 printf(" <Indexes>\n");
905 for (int i =0; i<numIndexes_; i++)
906 printf("<IndexName> %s </IndexName>\n", CatalogTableINDEX::getName(indexPtr_[i]));
907 printf(" </Indexes>\n");
911 DbRetVal TableImpl::copyValuesFromBindBuffer(void *tuplePtr, bool isInsert)
913 //Iterate through the bind list and copy the value here
914 FieldIterator fIter = fldList_.getIterator();
915 char *colPtr = (char*) tuplePtr;
916 int fldpos=1;
917 while (fIter.hasElement())
919 FieldDef *def = fIter.nextElement();
920 if(def->isAutoIncrement_ && isInsert)
922 void *dest = AllDataType::alloc(def->type_, def->length_);
923 AllDataType::copyVal(dest,ptrToAuto, def->type_, def->length_);
924 if(def->bindVal_==NULL)
926 AllDataType::increment(colPtr, dest , def->type_);
927 AllDataType::copyVal(ptrToAuto,colPtr, def->type_, def->length_);
928 colPtr = colPtr + def->length_;
929 fldpos++;
930 free(dest);
931 continue;
932 }else {
933 if(AllDataType::compareVal(def->bindVal_, dest, OpGreaterThan, def->type_)){
934 AllDataType::copyVal(ptrToAuto,def->bindVal_, def->type_, def->length_);
936 free(dest);
939 if (def->isNull_ && !def->isDefault_ && NULL == def->bindVal_ && isInsert)
941 printError(ErrNullViolation, "NOT NULL constraint violation for field %s\n", def->fldName_);
942 return ErrNullViolation;
944 if (def->isDefault_ && NULL == def->bindVal_ && isInsert)
946 void *dest = AllDataType::alloc(def->type_, def->length_);
947 AllDataType::convert(typeString, def->defaultValueBuf_, def->type_, dest, def->length_);
948 AllDataType::copyVal(colPtr, dest, def->type_, def->length_);
949 colPtr = colPtr + def->length_;
950 fldpos++;
951 free (dest);
952 continue;
954 switch(def->type_)
956 case typeString:
957 if (NULL != def->bindVal_)
959 if(!isInsert && isFldNull(fldpos)){clearNullBit(fldpos);}
960 strcpy((char*)colPtr, (char*)def->bindVal_);
961 *(((char*)colPtr) + (def->length_-1)) = '\0';
963 else if (!def->isNull_ && !def->bindVal_ && isInsert) setNullBit(fldpos);
964 colPtr = colPtr + def->length_;
965 break;
966 case typeBinary:
967 if (NULL != def->bindVal_ )
969 if(!isInsert && isFldNull(fldpos)){clearNullBit(fldpos);}
970 DbRetVal rv = AllDataType::strToValue(colPtr, (char *) def->bindVal_, def->type_, def->length_);
971 if (rv != OK) return ErrBadArg;
973 else if (!def->isNull_ && isInsert && !def->bindVal_) setNullBit(fldpos);
974 colPtr = colPtr + def->length_;
975 break;
976 default:
977 if (NULL != def->bindVal_){
978 if(!isInsert && isFldNull(fldpos)){clearNullBit(fldpos);}
979 AllDataType::copyVal(colPtr, def->bindVal_, def->type_);}
980 else { if (!def->isNull_ && isInsert) setNullBit(fldpos); }
981 colPtr = colPtr + def->length_;
982 break;
984 fldpos++;
986 return OK;
988 void TableImpl::clearNullBit(int fldpos)
990 if (isIntUsedForNULL){
991 CLEARBIT(iNullInfo, fldpos);}
992 else
993 cNullInfo[fldpos-1] = 0;
995 void TableImpl::setNullBit(int fldpos)
997 if (isIntUsedForNULL)
998 SETBIT(iNullInfo, fldpos);
999 else
1000 cNullInfo[fldpos-1] = 1;
1002 DbRetVal TableImpl::copyValuesToBindBuffer(void *tuplePtr)
1004 //Iterate through the bind list and copy the value here
1005 char *colPtr = (char*) tuplePtr;
1006 FieldDef *def = NULL;
1007 for (int i = 0; i < numBindFlds_; i++) {
1008 def = (FieldDef *) bindListArray_[i];
1009 colPtr = (char *) tuplePtr + def->offset_;
1010 AllDataType::copyVal(def->bindVal_, colPtr, def->type_, def->length_);
1012 return OK;
1015 //-1 index not supported
1016 DbRetVal TableImpl::insertIndexNode(Transaction *tr, void *indexPtr, IndexInfo *info, void *tuple, bool loadFlag)
1018 CINDEX *iptr = (CINDEX*)indexPtr;
1019 DbRetVal ret = OK;
1020 printDebug(DM_Table, "Inside insertIndexNode type %d", iptr->indexType_);
1021 Index* idx = Index::getIndex(iptr->indexType_);
1022 ret = idx->insert(this, tr, indexPtr, info, tuple, loadFlag);
1023 return ret;
1026 DbRetVal TableImpl::deleteIndexNode(Transaction *tr, void *indexPtr, IndexInfo *info, void *tuple)
1028 CINDEX *iptr = (CINDEX*)indexPtr;
1029 DbRetVal ret = OK;
1030 Index* idx = Index::getIndex(iptr->indexType_);
1031 ret = idx->remove(this, tr, indexPtr, info, tuple, loadFlag);
1032 return ret;
1034 void TableImpl::printSQLIndexString()
1036 CatalogTableINDEXFIELD cIndexField(sysDB_);
1037 char fName[IDENTIFIER_LENGTH];
1038 char *fldName = fName;
1039 char idxName[IDENTIFIER_LENGTH];
1040 DataType type;
1041 for (int i = 0; i < numIndexes_ ; i++)
1043 CINDEX *iptr = (CINDEX*) indexPtr_[i];
1044 sprintf(idxName,"%s_idx_Auto_increment",getName());
1045 if(strcmp(iptr->indName_,idxName)==0){ continue; }
1046 printf("CREATE INDEX %s on %s ( ", iptr->indName_, getName());
1047 FieldList fldList;
1048 cIndexField.getFieldInfo(iptr, fldList);
1049 FieldIterator fIter = fldList.getIterator();
1050 bool firstFld = true;
1051 while(fIter.hasElement())
1053 FieldDef *def = fIter.nextElement();
1054 if (firstFld) { printf(" %s ", def->fldName_); firstFld = false; }
1055 else printf(" ,%s ", def->fldName_);
1057 printf(" ) ");
1058 if (iptr->indexType_ == hashIndex) printf(" HASH ");
1059 else printf(" TREE ");
1060 if (((HashIndexInfo*) idxInfo[i])->isUnique) printf(" UNIQUE");
1061 if(((HashIndexInfo*) idxInfo[i])->noOfBuckets != 1009 ) printf(" SIZE %d ",((HashIndexInfo*) idxInfo[i])->noOfBuckets );
1062 printf(";\n");
// Updates one index for a modified tuple by dispatching to the Index
// implementation selected by the catalog entry's index type.
//   tr       - transaction the change belongs to
//   indexPtr - catalog entry (CINDEX) of the index
//   info     - index information passed through to the index
//   tuple    - tuple being updated
// Returns the value left in ret (initialised to OK).
// NOTE(review): original lines 1074 and 1076 are not visible here and the
// TODO below talks about "commenting the whole index update code"; confirm
// whether the idx->update() call is live or enclosed in a block comment.
1067 DbRetVal TableImpl::updateIndexNode(Transaction *tr, void *indexPtr, IndexInfo *info, void *tuple)
1069 CINDEX *iptr = (CINDEX*)indexPtr;
1070 DbRetVal ret = OK;
1071 Index* idx = Index::getIndex(iptr->indexType_);
1072 //TODO::currently it updates irrespective of whether the key changed or not
1073 //because of this commenting the whole index update code. relook at it and uncomment
1075 ret = idx->update(this, tr, indexPtr, info, tuple, loadFlag);
1077 return ret;
1081 void TableImpl::setTableInfo(char *name, int tblid, size_t length,
1082 int numFld, int numIdx, void *chunk)
1084 strcpy(tblName_, name);
1085 tblID_ = tblid;
1086 length_ = length;
1087 numFlds_ = numFld;
1088 numIndexes_ = numIdx;
1089 chunkPtr_ = chunk;
1092 long TableImpl::spaceUsed()
1094 Chunk *chk = (Chunk*)chunkPtr_;
1095 long totSize = chk->getTotalDataNodes() * chk->getSize();
1096 totSize = totSize + (chk->totalPages() * sizeof (PageInfo));
1097 return totSize;
1100 int TableImpl::pagesUsed()
1102 Chunk *chk = (Chunk*)chunkPtr_;
1103 return chk->totalPages();
1106 long TableImpl::numTuples()
1108 return ((Chunk*)chunkPtr_)->getTotalDataNodes();
1111 List TableImpl::getFieldNameList()
1113 List fldNameList;
1114 FieldIterator fIter = fldList_.getIterator();
1115 char fieldName[IDENTIFIER_LENGTH];
1116 while (fIter.hasElement())
1118 FieldDef *def = fIter.nextElement();
1119 Identifier *elem = new Identifier();
1120 Table::getFieldNameAlone(def->fldName_, fieldName);
1121 sprintf(elem->name, "%s.%s", getName(), fieldName);
1122 fldNameList.append(elem);
1124 return fldNameList;
1126 DbRetVal TableImpl::close()
1128 if (iter) { iter->close(); delete iter; iter = NULL; }
1129 printDebug(DM_Database,"Closing table handle: %x", this);
1130 //table->unlock();
1131 delete pred_;
1132 delete this;
1133 logFinest(Conf::logger, "Closing Table");
1134 return OK;
1137 DbRetVal TableImpl::closeScan()
1139 //do not throw scan not open error
1140 //this function will be called by table handle
1141 if (iter) {
1142 // iter->reset();
1143 //PRABA::TEMP::otherwise fails.check with kishor
1144 delete iter;
1145 iter = NULL;
1147 return OK;
// Takes a table-level lock, keyed on the table's chunk pointer.
//   shared - true requests a shared lock, false an exclusive lock
// On success the chunk pointer is recorded in ProcessManager::hasLockList
// unless it is already there. Returns the value left in ret (initialised
// to OK).
// NOTE(review): original lines 1153 and 1166 are not visible here; if they
// are block-comment delimiters the whole locking body is disabled and the
// function only returns OK - confirm against the real file.
1149 DbRetVal TableImpl::lock(bool shared)
1152 DbRetVal ret = OK;
1154 if (shared)
1155 ret = lMgr_->getSharedLock(chunkPtr_, NULL);
1156 else
1157 ret = lMgr_->getExclusiveLock(chunkPtr_, NULL);
1158 if (OK != ret)
1160 printError(ret, "Could not exclusive lock on the table %x", chunkPtr_);
1161 }else {
1162 //do not append for S to X upgrade
1163 if (!ProcessManager::hasLockList.exists(chunkPtr_))
1164 ProcessManager::hasLockList.append(chunkPtr_);
1167 return ret;
// Releases the table-level lock keyed on the table's chunk pointer.
// Does nothing when the chunk pointer is not in ProcessManager::hasLockList;
// on a successful release the entry is removed from that list.
// Always returns OK, even when releaseLock() reports an error (the error is
// only printed).
// NOTE(review): original lines 1171 and 1181 are not visible here; if they
// are block-comment delimiters the whole body is disabled - confirm against
// the real file.
1169 DbRetVal TableImpl::unlock()
1172 if (!ProcessManager::hasLockList.exists(chunkPtr_)) return OK;
1173 DbRetVal ret = lMgr_->releaseLock(chunkPtr_);
1174 if (OK != ret)
1176 printError(ret, "Could not release exclusive lock on the table %x", chunkPtr_);
1177 }else
1179 ProcessManager::hasLockList.remove(chunkPtr_);
1182 return OK;
1185 TableImpl::~TableImpl()
1187 if (NULL != iter ) { delete iter; iter = NULL; }
1188 if (NULL != indexPtr_) { delete[] indexPtr_; indexPtr_ = NULL; }
1189 if (NULL != idxInfo)
1191 for (int i = 0; i < numIndexes_; i++) delete idxInfo[i];
1192 delete[] idxInfo;
1193 idxInfo = NULL;
1195 if (numFlds_ > 31 && cNullInfo != NULL) { free(cNullInfo); cNullInfo = NULL; }
1196 if (bindList_.size()) bindList_.reset();
1197 if (bindListArray_) { free (bindListArray_); bindListArray_ = NULL; }
1198 fldList_.removeAll();
1202 void *TableImpl::getBindFldAddr(const char *name)
1204 return fldList_.getBindField(name);
1206 bool TableImpl::isTableInvolved(char *tblName)
1208 //printf("Table isTableInvolved called for %s with %s\n", tblName, getName());
1209 if (0 == strcmp(getName(), tblName)) return true; else return false;
1211 bool TableImpl::pushPredicate(Predicate *pred)
1213 bool ret = false;
1214 PredicateImpl *pImpl = (PredicateImpl*) pred;
1215 char tableName[IDENTIFIER_LENGTH];
1216 Table::getTableNameAlone(pImpl->getFldName1(), tableName);
1217 //printf("predicate tbl name %s\n", tableName);
1219 //if predicate is of form t1.f1=t2.f1 then do not push here
1220 if (0 != strcmp(pImpl->getFldName2(),"")) return ret;
1222 if (0 == strcmp(getName(), tableName))
1224 setPredicate(pred);
1225 //printf("PRABA::pushed predicate in tablehdl %s\n", getName());
1226 ret = true;
1228 return ret;
1230 void TableImpl::setPredicate(Predicate *pred)
1232 if (NULL == pred_) { pred_ = pred; return; }
1233 Predicate *curPred = pred_;
1234 PredicateImpl *newPred = new PredicateImpl();
1235 newPred->setTerm(curPred, OpAnd, pred);
1236 newPred->setTable(this);
1237 pred_ = newPred;
1238 return;
1240 void TableImpl::printPlan(int space)
1242 char spaceBuf[IDENTIFIER_LENGTH];
1243 memset(spaceBuf, 32, IDENTIFIER_LENGTH);
1244 spaceBuf[space] = '\0';
1245 printf("%s <TABLE-NODE>\n", spaceBuf);
1246 printf("%s <NAME> %s </NAME>\n", spaceBuf, getName());
1247 printf("%s <ScanType> %s </ScanType>\n", spaceBuf, ScanTypeNames[scanType_]);
1248 PredicateImpl *pred = (PredicateImpl*)pred_;
1249 if (pred) pred->print(space+2);
1250 printf("%s </TABLE-NODE>\n", spaceBuf);