exp changes
[csql.git] / src / storage / Transaction.cxx
blob9242737fa0ff736899ea8b90a2530bf467a806ab
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Transaction.h>
17 #include<Lock.h>
18 #include<Database.h>
19 #include<Allocator.h>
20 #include<CatalogTables.h>
21 #include<Debug.h>
23 //Code assumes that only one thread can work on a transaction
24 DbRetVal Transaction::insertIntoHasList(Database *sysdb, LockHashNode *node)
26 //allocate lock node
27 Chunk *chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
28 DbRetVal rv = OK;
29 TransHasNode *hasNode =(TransHasNode*)chunk->tryAllocate(sysdb, &rv, 1000);
30 if (NULL == hasNode)
32 return rv;
34 printDebug(DM_Transaction, "insertIntoHasList new TransHasNode created:%x",
35 hasNode);
36 hasNode->node_ = node;
37 hasNode->next_ = NULL;
38 if (NULL == hasLockList_)
40 printDebug(DM_Transaction, "hasLockList is null:It is now %x",hasNode);
41 hasLockList_ = hasNode;
42 return OK;
44 TransHasNode *it = hasLockList_;
45 while (NULL != it->next_)
47 it = it->next_;
49 it->next_ = hasNode;
50 printDebug(DM_Transaction, "Added to hasLockList at end:%x",it);
51 logFinest(Conf::logger, "Added locknode:%x to hasLockList", hasNode->node_);
52 return OK;
54 void Transaction::printTotalNodes()
56 TransHasNode *iter = hasLockList_;
57 int cnt=0;
58 while(iter != NULL)
60 cnt++;
61 iter = iter->next_;
63 //printf("TOTAL Lock Nodes %d\n", cnt);
65 DbRetVal Transaction::removeFromHasList(Database *sysdb, void *tuple)
67 Chunk *chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
68 TransHasNode *iter = hasLockList_, *prev = hasLockList_;
69 if (NULL == iter)
71 printError(ErrNotFound, "Fatal:HasList is empty");
72 return ErrNotFound;
74 while (iter != NULL)
76 if (tuple == iter->node_->ptrToTuple_)
78 prev->next_ = iter->next_;
79 chunk->free(sysdb, iter);
80 if (iter == hasLockList_) hasLockList_ = NULL;
81 logFinest(Conf::logger, "Removed locknode:%x from hasLockList",
82 iter->node_);
83 return OK;
85 prev = iter;
86 iter = iter->next_;
88 printStackTrace();
89 printError(ErrNotFound, "Fatal:There is no tuple lock in has list for tuple:%x", tuple);
90 //TEMP::for debugging.do not remove
91 /*iter=hasLockList_;
92 int cnt=0;
93 while (iter != NULL)
95 printError(ErrWarning, "Element in hasList: %d %x: %x:%d\n", cnt, iter->node_, iter->node_->ptrToTuple_ , *(int*)iter->node_->ptrToTuple_);
96 cnt++;
97 iter = iter->next_;
98 }*/
99 return ErrNotFound;
103 DbRetVal Transaction::releaseAllLocks(LockManager *lockManager_)
105 Database *sysdb =lockManager_->systemDatabase_;
106 Chunk *chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
107 TransHasNode *iter = hasLockList_, *prev;
108 DbRetVal rv = OK;
109 while (NULL != iter)
111 prev = iter;
112 iter = iter->next_;
113 printDebug(DM_Transaction, "Releasing lock %x",prev->node_->ptrToTuple_);
114 logFinest(Conf::logger, "Releasing lock for tuple:%x",prev->node_->ptrToTuple_);
115 rv = lockManager_->releaseLock(prev->node_->ptrToTuple_);
116 chunk->free(sysdb, prev);
118 hasLockList_ = NULL;
119 return OK;
121 bool Transaction::findInHasList(Database *sysdb, LockHashNode *node)
123 TransHasNode *iter = hasLockList_;
124 while (NULL != iter)
126 if (iter->node_ == node) return true;
127 iter = iter->next_;
129 return false;
132 DbRetVal Transaction::appendUndoLog(Database *sysdb, OperationType type,
133 void *data, size_t size)
135 DbRetVal rv =OK;
136 UndoLogInfo *logInfo = createUndoLog(sysdb, type, data, size, &rv);
137 if (logInfo == NULL) return rv;
138 if (size) os::memcpy((char*)logInfo + sizeof(UndoLogInfo), data, size);
139 addAtBegin(logInfo);
140 printDebug(DM_Transaction, "creating undo log and append %x optype:%d",
141 logInfo, type);
142 return OK;
147 DbRetVal Transaction::appendLogicalUndoLog(Database *sysdb, OperationType type, void *data, size_t size, void* indexPtr)
149 DbRetVal rv = OK;
150 UndoLogInfo *logInfo = createUndoLog(sysdb, type, data, size, &rv);
151 if (logInfo == NULL) return rv;
152 char **indPtr = (char**)((char*)logInfo + sizeof(UndoLogInfo));
153 *indPtr = (char*) indexPtr;
154 addAtBegin(logInfo);
155 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
156 return rv;
159 DbRetVal Transaction::appendLogicalHashUndoLog(Database *sysdb, OperationType type, void *data, size_t size)
161 DbRetVal rv = OK;
162 HashUndoLogInfo *hInfo = (HashUndoLogInfo *) data;
163 UndoLogInfo *logInfo = createUndoLog(sysdb, type, hInfo->tuple_, size, &rv);
164 if (logInfo == NULL) return rv;
165 memcpy((char*)logInfo + sizeof(UndoLogInfo), data, sizeof(HashUndoLogInfo));
166 addAtBegin(logInfo);
167 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
168 return rv;
170 DbRetVal Transaction::appendLogicalTreeUndoLog(Database *sysdb, OperationType type, void *data, size_t size)
172 DbRetVal rv = OK;
173 TreeUndoLogInfo *hInfo = (TreeUndoLogInfo *) data;
174 UndoLogInfo *logInfo = createUndoLog(sysdb, type, hInfo->tuple_, size, &rv);
175 if (logInfo == NULL) return rv;
176 memcpy((char*)logInfo + sizeof(UndoLogInfo), data, sizeof(TreeUndoLogInfo));
177 addAtBegin(logInfo);
178 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
179 return rv;
181 DbRetVal Transaction::appendLogicalTrieUndoLog(Database *sysdb, OperationType type, void *data, size_t size)
183 DbRetVal rv = OK;
184 TrieUndoLogInfo *hInfo = (TrieUndoLogInfo *) data;
185 UndoLogInfo *logInfo = createUndoLog(sysdb, type, hInfo->tuple_, size, &rv);
186 if (logInfo == NULL) return rv;
187 memcpy((char*)logInfo + sizeof(UndoLogInfo), data, sizeof(TrieUndoLogInfo));
188 addAtBegin(logInfo);
189 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
190 return rv;
193 UndoLogInfo* Transaction::createUndoLog(Database *sysdb, OperationType type, void *data,
194 size_t size, DbRetVal *rv)
196 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
197 int reqSize = size + sizeof(UndoLogInfo);
198 UndoLogInfo *logInfo = NULL;
199 int tries=0;
200 int totalTries = Conf::config.getMutexRetries();
201 while (tries < totalTries)
203 *rv = OK;
204 logInfo= (UndoLogInfo*)chunk->allocate(sysdb, reqSize, rv);
205 if (logInfo !=NULL) break;
206 if (*rv != ErrLockTimeOut)
208 printError(*rv, "Unable to allocate undo log");
209 return NULL;
211 tries++;
214 if (logInfo == NULL) {
215 printError(*rv, "Unable to allocate undo log record after %d retries", tries);
216 return NULL;
218 logInfo->data_ = (char *)logInfo + sizeof(UndoLogInfo);
219 logInfo->opType_ = type;
220 logInfo->size_ = size;
221 logInfo->next_ = NULL;
222 return logInfo;
225 void Transaction::addAtBegin(UndoLogInfo* logInfo)
227 //add it to the begin of the log list
228 logInfo->next_ = firstUndoLog_;
229 firstUndoLog_ = logInfo;
230 return;
233 UndoLogInfo* Transaction::popUndoLog()
235 UndoLogInfo *iter = firstUndoLog_, *prev = firstUndoLog_;
236 if(NULL != iter)
238 prev = iter;
239 iter = iter->next_;
241 firstUndoLog_ = iter;
242 return prev;
246 int Transaction::noOfUndoLogs()
248 UndoLogInfo *iter = firstUndoLog_;
249 int count =0;
250 while(NULL != iter)
252 count++;
253 iter = iter->next_;
255 return count;
257 void Transaction::printDebugInfo(Database *sysdb)
259 printf("<TransactionInfo>\n");
260 if (waitLock_ != NULL)
262 printf("<WaitLock>");
263 waitLock_->print();
264 printf("</WaitLock>");
267 printf("<UndoLogs>\n");
268 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
269 printf(" <TotalPages> %d </TotalPages>\n", chunk->totalPages());
270 UndoLogInfo *iter = firstUndoLog_;
271 int count =0;
272 while(NULL != iter)
274 iter->print();
275 iter = iter->next_;
276 count++;
278 printf("</TotalNodes> %d </TotalNodes>\n", count);
279 printf("</UndoLogs>\n");
281 printf("<TransHasList>\n");
282 chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
283 printf(" <TotalPages> %d </TotalPages>\n", chunk->totalPages());
284 TransHasNode *hasIter = hasLockList_;
285 count =0;
286 while (NULL != hasIter)
288 hasIter->print();
289 hasIter = hasIter->next_;
290 count++;
292 printf("</TotalNodes> %d </TotalNodes>\n", count);
293 printf("</TransHasList>\n");
295 printf("</TransactionInfo>\n");
296 return ;
298 DbRetVal Transaction::removeUndoLogs(Database *sysdb)
300 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
301 UndoLogInfo *logInfo = NULL;
302 while(NULL != (logInfo = popUndoLog()))
304 chunk->free(sysdb, logInfo);
306 return OK;
310 DbRetVal Transaction::applyUndoLogs(Database *sysdb)
312 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
313 UndoLogInfo *logInfo = NULL;
314 while(NULL != (logInfo = popUndoLog()))
316 logFinest(Conf::logger, "Apply undo log type:%d", logInfo->opType_);
317 switch(logInfo->opType_)
319 case InsertOperation:
321 char *ptr = (char *)logInfo->data_;
322 void *ptrToTuple = (void *)*(long *)ptr;
323 ptr += sizeof(void *);
324 InUse *isUsed = ((InUse*)(ptrToTuple) - 1);
325 if (*isUsed == 0) {
326 printError(ErrSysFatal, "Fatal: Row is already not in use");
328 *isUsed = 0;
329 handleVarcharUndoInsert(sysdb, ptr);
330 break;
332 case DeleteOperation:
334 char *ptr = (char *)logInfo->data_;
335 void *ptrToTuple = (void *)*(long *)ptr;
336 ptr += sizeof(void *);
337 InUse *isUsed = ((InUse*)(ptrToTuple) - 1);
338 if (*isUsed == 1) {
339 printError(ErrSysFatal, "Fatal: Row is already in use");
341 *isUsed = 1;
342 //data record will be intact as we have lock on that record
343 handleVarcharUndoDelete(sysdb, ptr);
344 break;
346 case UpdateOperation:
348 char *ptr = (char *)logInfo->data_;
349 void *ptrToTuple = (void *)*(long *)ptr;
350 ptr += sizeof(void *);
351 InUse *isUsed = ((InUse*)(ptrToTuple) - 1);
352 if (*isUsed == 0) {
353 printError(ErrSysFatal, "Fatal: Row is not in use during update rollback");
355 handleVarcharUndoUpdate(sysdb, ptr, ptrToTuple);
356 break;
358 case InsertHashIndexOperation:
360 HashIndex::deleteLogicalUndoLog(sysdb, (char *)logInfo
361 + sizeof(UndoLogInfo));
362 break;
364 case DeleteHashIndexOperation:
366 HashIndex::insertLogicalUndoLog(sysdb, (char *)logInfo
367 + sizeof(UndoLogInfo));
368 break;
370 case InsertTreeIndexOperation:
372 TreeIndex::deleteLogicalUndoLog(sysdb, (char *)logInfo
373 + sizeof(UndoLogInfo));
374 break;
376 case DeleteTreeIndexOperation:
378 TreeIndex::insertLogicalUndoLog(sysdb, (char *)logInfo
379 + sizeof(UndoLogInfo));
380 break;
382 case InsertTrieIndexOperation:
384 TrieIndex::deleteLogicalUndoLog(sysdb, (char *)logInfo
385 + sizeof(UndoLogInfo));
386 break;
388 case DeleteTrieIndexOperation:
390 TrieIndex::insertLogicalUndoLog(sysdb, (char *)logInfo
391 + sizeof(UndoLogInfo));
392 break;
394 default:
396 printError(ErrSysFatal, "Illegal undo log type");
397 break;
400 chunk->free(sysdb, logInfo);
402 return OK;
404 DbRetVal Transaction::handleVarcharUndoInsert(Database *sysdb, char *ptr)
406 // ptr will have following info encapsulated.
407 // metadataPtr + nVarchars + varchar chunk ptr +
408 // ptrs to varchars
410 Database db;
411 void *metaData = (void *)*(long *)ptr;
412 ptr += sizeof(void *);
413 db.setMetaDataPtr((DatabaseMetaData *) metaData);
414 db.setProcSlot(sysdb->procSlot);
416 int noOfVarchar = *(int *)ptr;
417 ptr += sizeof(int);
418 Chunk *vcchunk = (Chunk *) *(long *)ptr;
419 ptr += sizeof(void *);
420 void **ptrToVarchars = (void **) ptr;
421 for (int i = 0; i < noOfVarchar; i++) {
422 if (*(long *) ptrToVarchars[i] != 0L) {
423 vcchunk->free(&db, (void *)*(long *)ptrToVarchars[i]);
424 *(long *) ptrToVarchars[i] = 0L;
427 return OK;
430 DbRetVal Transaction::handleVarcharUndoDelete(Database *sysdb, char *ptr)
432 // ptr will have following info encapsulated.
433 // metadataPtr + nVarchars + varchar chunk ptr + ptrs to varchars +
434 // size and value pairs for varchars
435 void *metaData = (void *)*(long *)ptr;
436 ptr += sizeof(void *);
437 Database db;
438 db.setMetaDataPtr((DatabaseMetaData *) metaData);
439 db.setProcSlot(sysdb->procSlot);
440 DbRetVal rv = OK;
441 int noOfVarchar = *(int *) ptr;
442 ptr+= sizeof(int);
443 Chunk *vcchunk = (Chunk *) *(long *)ptr;
444 ptr += sizeof(void *);
445 void **ptrToVarchars = (void **) ptr;
446 ptr += noOfVarchar * sizeof (void *);
447 char *lenValPtr = (char *) ptr;
448 for (int i = 0; i < noOfVarchar; i++) {
449 int len = *(int *) lenValPtr;
450 lenValPtr += sizeof(int);
451 if (len != 0) {
452 void *ptr = vcchunk->allocate(&db, len, &rv);
453 strcpy((char *)ptr, lenValPtr);
454 lenValPtr += len;
455 *(long *) ptrToVarchars[i] = (long)ptr;
456 } else {
457 *(long *) ptrToVarchars[i] = 0L;
460 return rv;
462 DbRetVal Transaction::handleVarcharUndoUpdate(Database *sysdb, char *ptr, void *ptrToTuple)
464 // logInfo->data_ will have following info encapsulated.
465 // tupleptr + tuple length + actual tuple + metadataPtr +
466 // nVarchars + varchar chunk ptr + ptrs to varchars +
467 // size and value pairs for varchars
469 int tupleLen = *(int *) ptr;
470 ptr += sizeof(int);
471 void *tuple = ptr;
472 ptr += tupleLen;
473 void *metaData = (void *)*(long *)ptr;
474 ptr += sizeof(void *);
475 Database db;
476 db.setMetaDataPtr((DatabaseMetaData *) metaData);
477 db.setProcSlot(sysdb->procSlot);
478 DbRetVal rv = OK;
479 int noOfVarchar = *(int *) ptr;
480 ptr+= sizeof(int);
481 Chunk *vcchunk = (Chunk *) *(long *)ptr;
482 ptr += sizeof(void *);
483 void **ptrToVarchars = (void **) ptr;
484 ptr += noOfVarchar * sizeof (void *);
485 char *lenValPtr = (char *) ptr;
486 for (int i = 0; i < noOfVarchar; i++) {
487 if (*(long *) ptrToVarchars[i] != 0L) {
488 vcchunk->free(&db, (void *)*(long *) ptrToVarchars[i]);
489 *(long *) ptrToVarchars[i] = 0L;
492 os::memcpy(ptrToTuple, tuple, tupleLen);
493 for (int i = 0; i < noOfVarchar; i++) {
494 int len = *(int *) lenValPtr; lenValPtr += sizeof(int);
495 if (len != 0) {
496 void *ptr = vcchunk->allocate(&db, len, &rv);
497 strcpy((char *)ptr, lenValPtr);
498 lenValPtr += len;
499 *(long *) ptrToVarchars[i] = (long) ptr;
500 } else {
501 *(long *) ptrToVarchars[i] = 0L;
504 return rv;