code reorg
[csql.git] / src / storage / Transaction.cxx
blob60c1ab6f6b4efccd81f33caf5c6b98a340853e5a
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Transaction.h>
17 #include<Lock.h>
18 #include<Database.h>
19 #include<Allocator.h>
20 #include<CatalogTables.h>
21 #include<Debug.h>
23 //Code assumes that only one thread can work on a transaction
24 DbRetVal Transaction::insertIntoHasList(Database *sysdb, LockHashNode *node)
26 //allocate lock node
27 Chunk *chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
28 DbRetVal rv = OK;
29 TransHasNode *hasNode =(TransHasNode*)chunk->tryAllocate(sysdb, &rv, 1000);
30 if (NULL == hasNode)
32 return rv;
34 printDebug(DM_Transaction, "insertIntoHasList new TransHasNode created:%x",
35 hasNode);
36 hasNode->node_ = node;
37 hasNode->next_ = NULL;
38 if (NULL == hasLockList_)
40 printDebug(DM_Transaction, "hasLockList is null:It is now %x",hasNode);
41 hasLockList_ = hasNode;
42 return OK;
44 TransHasNode *it = hasLockList_;
45 while (NULL != it->next_)
47 it = it->next_;
49 it->next_ = hasNode;
50 printDebug(DM_Transaction, "Added to hasLockList at end:%x",it);
51 logFinest(Conf::logger, "Added locknode:%x to hasLockList", hasNode->node_);
52 return OK;
55 void Transaction::printTotalNodes()
57 TransHasNode *iter = hasLockList_;
58 int cnt=0;
59 while(iter != NULL)
61 cnt++;
62 iter = iter->next_;
64 //printf("TOTAL Lock Nodes %d\n", cnt);
67 DbRetVal Transaction::removeFromHasList(Database *sysdb, void *tuple)
69 Chunk *chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
70 TransHasNode *iter = hasLockList_, *prev = hasLockList_;
71 if (NULL == iter)
73 printError(ErrNotFound, "Fatal:HasList is empty");
74 return ErrNotFound;
76 while (iter != NULL)
78 if (tuple == iter->node_->ptrToTuple_)
80 prev->next_ = iter->next_;
81 chunk->free(sysdb, iter);
82 if (iter == hasLockList_) hasLockList_ = NULL;
83 logFinest(Conf::logger, "Removed locknode:%x from hasLockList",
84 iter->node_);
85 return OK;
87 prev = iter;
88 iter = iter->next_;
90 printStackTrace();
91 printError(ErrNotFound, "Fatal:There is no tuple lock in has list for tuple:%x", tuple);
92 //TEMP::for debugging.do not remove
93 /*iter=hasLockList_;
94 int cnt=0;
95 while (iter != NULL)
97 printError(ErrWarning, "Element in hasList: %d %x: %x:%d\n", cnt, iter->node_, iter->node_->ptrToTuple_ , *(int*)iter->node_->ptrToTuple_);
98 cnt++;
99 iter = iter->next_;
101 return ErrNotFound;
105 DbRetVal Transaction::releaseAllLocks(LockManager *lockManager_)
107 Database *sysdb =lockManager_->systemDatabase_;
108 Chunk *chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
109 TransHasNode *iter = hasLockList_, *prev;
110 DbRetVal rv = OK;
111 while (NULL != iter)
113 prev = iter;
114 iter = iter->next_;
115 printDebug(DM_Transaction, "Releasing lock %x",prev->node_->ptrToTuple_);
116 logFinest(Conf::logger, "Releasing lock for tuple:%x",prev->node_->ptrToTuple_);
117 rv = lockManager_->releaseLock(prev->node_->ptrToTuple_);
118 chunk->free(sysdb, prev);
120 hasLockList_ = NULL;
121 return OK;
124 bool Transaction::findInHasList(Database *sysdb, LockHashNode *node)
126 TransHasNode *iter = hasLockList_;
127 while (NULL != iter)
129 if (iter->node_ == node) return true;
130 iter = iter->next_;
132 return false;
135 DbRetVal Transaction::appendUndoLog(Database *sysdb, OperationType type,
136 void *data, size_t size)
138 DbRetVal rv =OK;
139 UndoLogInfo *logInfo = createUndoLog(sysdb, type, data, size, &rv);
140 if (logInfo == NULL) return rv;
141 if (size) os::memcpy((char*)logInfo + sizeof(UndoLogInfo), data, size);
142 addAtBegin(logInfo);
143 printDebug(DM_Transaction, "creating undo log and append %x optype:%d",
144 logInfo, type);
145 return OK;
150 DbRetVal Transaction::appendLogicalUndoLog(Database *sysdb, OperationType type, void *data, size_t size, void* indexPtr)
152 DbRetVal rv = OK;
153 UndoLogInfo *logInfo = createUndoLog(sysdb, type, data, size, &rv);
154 if (logInfo == NULL) return rv;
155 char **indPtr = (char**)((char*)logInfo + sizeof(UndoLogInfo));
156 *indPtr = (char*) indexPtr;
157 addAtBegin(logInfo);
158 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
159 return rv;
162 DbRetVal Transaction::appendLogicalHashUndoLog(Database *sysdb, OperationType type, void *data, size_t size)
164 DbRetVal rv = OK;
165 HashUndoLogInfo *hInfo = (HashUndoLogInfo *) data;
166 UndoLogInfo *logInfo = createUndoLog(sysdb, type, hInfo->tuple_, size, &rv);
167 if (logInfo == NULL) return rv;
168 memcpy((char*)logInfo + sizeof(UndoLogInfo), data, sizeof(HashUndoLogInfo));
169 addAtBegin(logInfo);
170 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
171 return rv;
174 DbRetVal Transaction::appendLogicalTreeUndoLog(Database *sysdb, OperationType type, void *data, size_t size)
176 DbRetVal rv = OK;
177 TreeUndoLogInfo *hInfo = (TreeUndoLogInfo *) data;
178 UndoLogInfo *logInfo = createUndoLog(sysdb, type, hInfo->tuple_, size, &rv);
179 if (logInfo == NULL) return rv;
180 memcpy((char*)logInfo + sizeof(UndoLogInfo), data, sizeof(TreeUndoLogInfo));
181 addAtBegin(logInfo);
182 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
183 return rv;
186 DbRetVal Transaction::appendLogicalTrieUndoLog(Database *sysdb, OperationType type, void *data, size_t size)
188 DbRetVal rv = OK;
189 TrieUndoLogInfo *hInfo = (TrieUndoLogInfo *) data;
190 UndoLogInfo *logInfo = createUndoLog(sysdb, type, hInfo->tuple_, size, &rv);
191 if (logInfo == NULL) return rv;
192 memcpy((char*)logInfo + sizeof(UndoLogInfo), data, sizeof(TrieUndoLogInfo));
193 addAtBegin(logInfo);
194 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
195 return rv;
198 UndoLogInfo* Transaction::createUndoLog(Database *sysdb, OperationType type, void *data,
199 size_t size, DbRetVal *rv)
201 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
202 int reqSize = size + sizeof(UndoLogInfo);
203 UndoLogInfo *logInfo = NULL;
204 int tries=0;
205 int totalTries = Conf::config.getMutexRetries();
206 while (tries < totalTries)
208 *rv = OK;
209 logInfo= (UndoLogInfo*)chunk->allocate(sysdb, reqSize, rv);
210 if (logInfo !=NULL) break;
211 if (*rv != ErrLockTimeOut)
213 printError(*rv, "Unable to allocate undo log");
214 return NULL;
216 tries++;
219 if (logInfo == NULL) {
220 printError(*rv, "Unable to allocate undo log record after %d retries", tries);
221 return NULL;
223 logInfo->data_ = (char *)logInfo + sizeof(UndoLogInfo);
224 logInfo->opType_ = type;
225 logInfo->size_ = size;
226 logInfo->next_ = NULL;
227 return logInfo;
230 void Transaction::addAtBegin(UndoLogInfo* logInfo)
232 //add it to the begin of the log list
233 logInfo->next_ = firstUndoLog_;
234 firstUndoLog_ = logInfo;
235 return;
238 UndoLogInfo* Transaction::popUndoLog()
240 UndoLogInfo *iter = firstUndoLog_, *prev = firstUndoLog_;
241 if(NULL != iter)
243 prev = iter;
244 iter = iter->next_;
246 firstUndoLog_ = iter;
247 return prev;
251 int Transaction::noOfUndoLogs()
253 UndoLogInfo *iter = firstUndoLog_;
254 int count =0;
255 while(NULL != iter)
257 count++;
258 iter = iter->next_;
260 return count;
263 void Transaction::printDebugInfo(Database *sysdb)
265 printf("<TransactionInfo>\n");
266 if (waitLock_ != NULL)
268 printf("<WaitLock>");
269 waitLock_->print();
270 printf("</WaitLock>");
273 printf("<UndoLogs>\n");
274 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
275 printf(" <TotalPages> %d </TotalPages>\n", chunk->totalPages());
276 UndoLogInfo *iter = firstUndoLog_;
277 int count =0;
278 while(NULL != iter)
280 iter->print();
281 iter = iter->next_;
282 count++;
284 printf("</TotalNodes> %d </TotalNodes>\n", count);
285 printf("</UndoLogs>\n");
287 printf("<TransHasList>\n");
288 chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
289 printf(" <TotalPages> %d </TotalPages>\n", chunk->totalPages());
290 TransHasNode *hasIter = hasLockList_;
291 count =0;
292 while (NULL != hasIter)
294 hasIter->print();
295 hasIter = hasIter->next_;
296 count++;
298 printf("</TotalNodes> %d </TotalNodes>\n", count);
299 printf("</TransHasList>\n");
301 printf("</TransactionInfo>\n");
302 return ;
305 DbRetVal Transaction::removeUndoLogs(Database *sysdb)
307 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
308 UndoLogInfo *logInfo = NULL;
309 while(NULL != (logInfo = popUndoLog()))
311 chunk->free(sysdb, logInfo);
313 return OK;
316 DbRetVal Transaction::applyUndoLogs(Database *sysdb)
318 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
319 UndoLogInfo *logInfo = NULL;
320 while(NULL != (logInfo = popUndoLog()))
322 logFinest(Conf::logger, "Apply undo log type:%d", logInfo->opType_);
323 switch(logInfo->opType_)
325 case InsertOperation:
327 char *ptr = (char *)logInfo->data_;
328 void *ptrToTuple = (void *)*(long *)ptr;
329 ptr += sizeof(void *);
330 InUse *isUsed = ((InUse*)(ptrToTuple) - 1);
331 if (*isUsed == 0) {
332 printError(ErrSysFatal, "Fatal: Row is already not in use");
334 *isUsed = 0;
335 handleVarcharUndoInsert(sysdb, ptr);
336 break;
338 case DeleteOperation:
340 char *ptr = (char *)logInfo->data_;
341 void *ptrToTuple = (void *)*(long *)ptr;
342 ptr += sizeof(void *);
343 InUse *isUsed = ((InUse*)(ptrToTuple) - 1);
344 if (*isUsed == 1) {
345 printError(ErrSysFatal, "Fatal: Row is already in use");
347 *isUsed = 1;
348 //data record will be intact as we have lock on that record
349 handleVarcharUndoDelete(sysdb, ptr);
350 break;
352 case UpdateOperation:
354 char *ptr = (char *)logInfo->data_;
355 void *ptrToTuple = (void *)*(long *)ptr;
356 ptr += sizeof(void *);
357 InUse *isUsed = ((InUse*)(ptrToTuple) - 1);
358 if (*isUsed == 0) {
359 printError(ErrSysFatal, "Fatal: Row is not in use during update rollback");
361 handleVarcharUndoUpdate(sysdb, ptr, ptrToTuple);
362 break;
364 case InsertHashIndexOperation:
366 HashIndex::deleteLogicalUndoLog(sysdb, (char *)logInfo
367 + sizeof(UndoLogInfo));
368 break;
370 case DeleteHashIndexOperation:
372 HashIndex::insertLogicalUndoLog(sysdb, (char *)logInfo
373 + sizeof(UndoLogInfo));
374 break;
376 case InsertTreeIndexOperation:
378 TreeIndex::deleteLogicalUndoLog(sysdb, (char *)logInfo
379 + sizeof(UndoLogInfo));
380 break;
382 case DeleteTreeIndexOperation:
384 TreeIndex::insertLogicalUndoLog(sysdb, (char *)logInfo
385 + sizeof(UndoLogInfo));
386 break;
388 case InsertTrieIndexOperation:
390 TrieIndex::deleteLogicalUndoLog(sysdb, (char *)logInfo
391 + sizeof(UndoLogInfo));
392 break;
394 case DeleteTrieIndexOperation:
396 TrieIndex::insertLogicalUndoLog(sysdb, (char *)logInfo
397 + sizeof(UndoLogInfo));
398 break;
400 default:
402 printError(ErrSysFatal, "Illegal undo log type");
403 break;
406 chunk->free(sysdb, logInfo);
408 return OK;
411 DbRetVal Transaction::handleVarcharUndoInsert(Database *sysdb, char *ptr)
413 // ptr will have following info encapsulated.
414 // metadataPtr + nVarchars + varchar chunk ptr +
415 // ptrs to varchars
417 Database db;
418 void *metaData = (void *)*(long *)ptr;
419 ptr += sizeof(void *);
420 db.setMetaDataPtr((DatabaseMetaData *) metaData);
421 db.setProcSlot(sysdb->procSlot);
423 int noOfVarchar = *(int *)ptr;
424 ptr += sizeof(int);
425 Chunk *vcchunk = (Chunk *) *(long *)ptr;
426 ptr += sizeof(void *);
427 void **ptrToVarchars = (void **) ptr;
428 for (int i = 0; i < noOfVarchar; i++) {
429 if (*(long *) ptrToVarchars[i] != 0L) {
430 vcchunk->free(&db, (void *)*(long *)ptrToVarchars[i]);
431 *(long *) ptrToVarchars[i] = 0L;
434 return OK;
437 DbRetVal Transaction::handleVarcharUndoDelete(Database *sysdb, char *ptr)
439 // ptr will have following info encapsulated.
440 // metadataPtr + nVarchars + varchar chunk ptr + ptrs to varchars +
441 // size and value pairs for varchars
442 void *metaData = (void *)*(long *)ptr;
443 ptr += sizeof(void *);
444 Database db;
445 db.setMetaDataPtr((DatabaseMetaData *) metaData);
446 db.setProcSlot(sysdb->procSlot);
447 DbRetVal rv = OK;
448 int noOfVarchar = *(int *) ptr;
449 ptr+= sizeof(int);
450 Chunk *vcchunk = (Chunk *) *(long *)ptr;
451 ptr += sizeof(void *);
452 void **ptrToVarchars = (void **) ptr;
453 ptr += noOfVarchar * sizeof (void *);
454 char *lenValPtr = (char *) ptr;
455 for (int i = 0; i < noOfVarchar; i++) {
456 int len = *(int *) lenValPtr;
457 lenValPtr += sizeof(int);
458 if (len != 0) {
459 void *ptr = vcchunk->allocate(&db, len, &rv);
460 strcpy((char *)ptr, lenValPtr);
461 lenValPtr += len;
462 *(long *) ptrToVarchars[i] = (long)ptr;
463 } else {
464 *(long *) ptrToVarchars[i] = 0L;
467 return rv;
470 DbRetVal Transaction::handleVarcharUndoUpdate(Database *sysdb, char *ptr, void *ptrToTuple)
472 // logInfo->data_ will have following info encapsulated.
473 // tupleptr + tuple length + actual tuple + metadataPtr +
474 // nVarchars + varchar chunk ptr + ptrs to varchars +
475 // size and value pairs for varchars
477 int tupleLen = *(int *) ptr;
478 ptr += sizeof(int);
479 void *tuple = ptr;
480 ptr += tupleLen;
481 void *metaData = (void *)*(long *)ptr;
482 ptr += sizeof(void *);
483 Database db;
484 db.setMetaDataPtr((DatabaseMetaData *) metaData);
485 db.setProcSlot(sysdb->procSlot);
486 DbRetVal rv = OK;
487 int noOfVarchar = *(int *) ptr;
488 ptr+= sizeof(int);
489 Chunk *vcchunk = (Chunk *) *(long *)ptr;
490 ptr += sizeof(void *);
491 void **ptrToVarchars = (void **) ptr;
492 ptr += noOfVarchar * sizeof (void *);
493 char *lenValPtr = (char *) ptr;
494 for (int i = 0; i < noOfVarchar; i++) {
495 if (*(long *) ptrToVarchars[i] != 0L) {
496 vcchunk->free(&db, (void *)*(long *) ptrToVarchars[i]);
497 *(long *) ptrToVarchars[i] = 0L;
500 os::memcpy(ptrToTuple, tuple, tupleLen);
501 for (int i = 0; i < noOfVarchar; i++) {
502 int len = *(int *) lenValPtr; lenValPtr += sizeof(int);
503 if (len != 0) {
504 void *ptr = vcchunk->allocate(&db, len, &rv);
505 strcpy((char *)ptr, lenValPtr);
506 lenValPtr += len;
507 *(long *) ptrToVarchars[i] = (long) ptr;
508 } else {
509 *(long *) ptrToVarchars[i] = 0L;
512 return rv;