code reorg for Transactionw!
[csql.git] / src / storage / Transaction.cxx
blobcb350c1e8395cdb816abe26b85770af1602b60e6
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Transaction.h>
17 #include<Lock.h>
18 #include<Database.h>
19 #include<Allocator.h>
20 #include<CatalogTables.h>
21 #include<Debug.h>
23 //Code assumes that only one thread can work on a transaction
24 DbRetVal Transaction::insertIntoHasList(Database *sysdb, LockHashNode *node)
26 //allocate lock node
27 Chunk *chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
28 DbRetVal rv = OK;
29 TransHasNode *hasNode = NULL; //(TransHasNode*)chunk->allocate(sysdb, &rv);
30 int tries=0;
31 int totalTries = Conf::config.getMutexRetries();
32 while (tries < totalTries)
34 rv = OK;
35 hasNode= (TransHasNode*)chunk->allocate(sysdb, &rv);
36 if (hasNode !=NULL) break;
37 if (rv != ErrLockTimeOut)
39 printError(rv, "Unable to allocate trans has node");
40 return rv;
42 tries++;
44 if (NULL == hasNode)
46 printError(rv, "Could not allocate Trans has node after %d retry", tries);
47 return rv;
49 printDebug(DM_Transaction, "insertIntoHasList new TransHasNode created:%x",
50 hasNode);
51 hasNode->node_ = node;
52 hasNode->next_ = NULL;
53 if (NULL == hasLockList_)
55 printDebug(DM_Transaction, "hasLockList is null:It is now %x",hasNode);
56 hasLockList_ = hasNode;
57 return OK;
60 TransHasNode *it = hasLockList_;
61 while (NULL != it->next_) { it = it->next_; }
62 it->next_ = hasNode;
63 printDebug(DM_Transaction, "Added to hasLockList at end:%x",it);
64 logFinest(Conf::logger, "Added locknode:%x to hasLockList", hasNode->node_);
65 return OK;
68 DbRetVal Transaction::removeFromHasList(Database *sysdb, void *tuple)
70 Chunk *chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
71 TransHasNode *iter = hasLockList_, *prev = hasLockList_;
72 if (NULL == iter)
74 printError(ErrNotFound, "Fatal:HasList is empty");
75 return ErrNotFound;
77 while (iter != NULL)
79 if (tuple == iter->node_->ptrToTuple_)
81 prev->next_ = iter->next_;
82 chunk->free(sysdb, iter);
83 if (iter == hasLockList_) hasLockList_ = NULL;
84 logFinest(Conf::logger, "Removed locknode:%x from hasLockList",
85 iter->node_);
86 return OK;
88 prev = iter;
89 iter = iter->next_;
91 printStackTrace();
92 printError(ErrNotFound, "Fatal:There is no tuple lock in has list for tuple:%x", tuple);
93 //TEMP::for debugging.do not remove
94 /*iter=hasLockList_;
95 int cnt=0;
96 while (iter != NULL)
98 printError(ErrWarning, "Element in hasList: %d %x: %x:%d\n", cnt, iter->node_, iter->node_->ptrToTuple_ , *(int*)iter->node_->ptrToTuple_);
99 cnt++;
100 iter = iter->next_;
102 return ErrNotFound;
106 DbRetVal Transaction::releaseAllLocks(LockManager *lockManager_)
108 Database *sysdb =lockManager_->systemDatabase_;
109 Chunk *chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
110 TransHasNode *iter = hasLockList_, *prev;
111 DbRetVal rv = OK;
112 while (NULL != iter)
114 prev = iter;
115 iter = iter->next_;
116 printDebug(DM_Transaction, "Releasing lock %x",prev->node_->ptrToTuple_);
117 logFinest(Conf::logger, "Releasing lock for tuple:%x",prev->node_->ptrToTuple_);
118 rv = lockManager_->releaseLock(prev->node_->ptrToTuple_);
119 chunk->free(sysdb, prev);
121 hasLockList_ = NULL;
122 return OK;
124 bool Transaction::findInHasList(Database *sysdb, LockHashNode *node)
126 TransHasNode *iter = hasLockList_;
127 while (NULL != iter)
129 if (iter->node_ == node) return true;
130 iter = iter->next_;
132 return false;
135 DbRetVal Transaction::appendUndoLog(Database *sysdb, OperationType type,
136 void *data, size_t size)
138 DbRetVal rv =OK;
139 UndoLogInfo *logInfo = createUndoLog(sysdb, type, data, size, &rv);
140 if (logInfo == NULL) return rv;
141 if (size) os::memcpy((char*)logInfo + sizeof(UndoLogInfo), data, size);
142 addAtBegin(logInfo);
143 printDebug(DM_Transaction, "creating undo log and append %x optype:%d",
144 logInfo, type);
145 return OK;
150 DbRetVal Transaction::appendLogicalUndoLog(Database *sysdb, OperationType type, void *data, size_t size, void* indexPtr)
152 DbRetVal rv = OK;
153 UndoLogInfo *logInfo = createUndoLog(sysdb, type, data, size, &rv);
154 if (logInfo == NULL) return rv;
155 char **indPtr = (char**)((char*)logInfo + sizeof(UndoLogInfo));
156 *indPtr = (char*) indexPtr;
157 addAtBegin(logInfo);
158 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
159 return rv;
162 DbRetVal Transaction::appendLogicalHashUndoLog(Database *sysdb, OperationType type, void *data, size_t size)
164 DbRetVal rv = OK;
165 HashUndoLogInfo *hInfo = (HashUndoLogInfo *) data;
166 UndoLogInfo *logInfo = createUndoLog(sysdb, type, hInfo->tuple_, size, &rv);
167 if (logInfo == NULL) return rv;
168 memcpy((char*)logInfo + sizeof(UndoLogInfo), data, sizeof(HashUndoLogInfo));
169 addAtBegin(logInfo);
170 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
171 return rv;
173 DbRetVal Transaction::appendLogicalTreeUndoLog(Database *sysdb, OperationType type, void *data, size_t size)
175 DbRetVal rv = OK;
176 TreeUndoLogInfo *hInfo = (TreeUndoLogInfo *) data;
177 UndoLogInfo *logInfo = createUndoLog(sysdb, type, hInfo->tuple_, size, &rv);
178 if (logInfo == NULL) return rv;
179 memcpy((char*)logInfo + sizeof(UndoLogInfo), data, sizeof(TreeUndoLogInfo));
180 addAtBegin(logInfo);
181 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
182 return rv;
185 UndoLogInfo* Transaction::createUndoLog(Database *sysdb, OperationType type, void *data,
186 size_t size, DbRetVal *rv)
188 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
189 int reqSize = size + sizeof(UndoLogInfo);
190 UndoLogInfo *logInfo = NULL;
191 int tries=0;
192 int totalTries = Conf::config.getMutexRetries();
193 while (tries < totalTries)
195 *rv = OK;
196 logInfo= (UndoLogInfo*)chunk->allocate(sysdb, reqSize, rv);
197 if (logInfo !=NULL) break;
198 if (*rv != ErrLockTimeOut)
200 printError(*rv, "Unable to allocate undo log");
201 return NULL;
203 tries++;
206 if (logInfo == NULL) {
207 printError(*rv, "Unable to allocate undo log record after %d retries", tries);
208 return NULL;
210 logInfo->data_ = (char *)logInfo + sizeof(UndoLogInfo);
211 logInfo->opType_ = type;
212 logInfo->size_ = size;
213 logInfo->next_ = NULL;
214 return logInfo;
217 void Transaction::addAtBegin(UndoLogInfo* logInfo)
219 //add it to the begin of the log list
220 logInfo->next_ = firstUndoLog_;
221 firstUndoLog_ = logInfo;
222 return;
225 UndoLogInfo* Transaction::popUndoLog()
227 UndoLogInfo *iter = firstUndoLog_, *prev = firstUndoLog_;
228 if(NULL != iter)
230 prev = iter;
231 iter = iter->next_;
233 firstUndoLog_ = iter;
234 return prev;
238 int Transaction::noOfUndoLogs()
240 UndoLogInfo *iter = firstUndoLog_;
241 int count =0;
242 while(NULL != iter)
244 count++;
245 iter = iter->next_;
247 return count;
249 void Transaction::printDebugInfo(Database *sysdb)
251 printf("<TransactionInfo>\n");
252 if (waitLock_ != NULL)
254 printf("<WaitLock>");
255 waitLock_->print();
256 printf("</WaitLock>");
259 printf("<UndoLogs>\n");
260 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
261 printf(" <TotalPages> %d </TotalPages>\n", chunk->totalPages());
262 UndoLogInfo *iter = firstUndoLog_;
263 int count =0;
264 while(NULL != iter)
266 iter->print();
267 iter = iter->next_;
268 count++;
270 printf("</TotalNodes> %d </TotalNodes>\n", count);
271 printf("</UndoLogs>\n");
273 printf("<TransHasList>\n");
274 chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
275 printf(" <TotalPages> %d </TotalPages>\n", chunk->totalPages());
276 TransHasNode *hasIter = hasLockList_;
277 count =0;
278 while (NULL != hasIter)
280 hasIter->print();
281 hasIter = hasIter->next_;
282 count++;
284 printf("</TotalNodes> %d </TotalNodes>\n", count);
285 printf("</TransHasList>\n");
287 printf("</TransactionInfo>\n");
288 return ;
290 DbRetVal Transaction::removeUndoLogs(Database *sysdb)
292 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
293 UndoLogInfo *logInfo = NULL;
294 while(NULL != (logInfo = popUndoLog()))
296 chunk->free(sysdb, logInfo);
298 return OK;
302 DbRetVal Transaction::applyUndoLogs(Database *sysdb)
304 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
305 UndoLogInfo *logInfo = NULL;
306 while(NULL != (logInfo = popUndoLog()))
308 logFinest(Conf::logger, "Apply undo log type:%d", logInfo->opType_);
309 switch(logInfo->opType_)
311 case InsertOperation:
313 char *ptr = (char *)logInfo->data_;
314 void *ptrToTuple = (void *)*(long *)ptr;
315 ptr += sizeof(void *);
316 InUse *isUsed = ((InUse*)(ptrToTuple) - 1);
317 if (*isUsed == 0) {
318 printError(ErrSysFatal, "Fatal: Row is already not in use");
320 *isUsed = 0;
321 handleVarcharUndoInsert(sysdb, ptr);
322 break;
324 case DeleteOperation:
326 char *ptr = (char *)logInfo->data_;
327 void *ptrToTuple = (void *)*(long *)ptr;
328 ptr += sizeof(void *);
329 InUse *isUsed = ((InUse*)(ptrToTuple) - 1);
330 if (*isUsed == 1) {
331 printError(ErrSysFatal, "Fatal: Row is already in use");
333 *isUsed = 1;
334 //data record will be intact as we have lock on that record
335 handleVarcharUndoDelete(sysdb, ptr);
336 break;
338 case UpdateOperation:
340 char *ptr = (char *)logInfo->data_;
341 void *ptrToTuple = (void *)*(long *)ptr;
342 ptr += sizeof(void *);
343 InUse *isUsed = ((InUse*)(ptrToTuple) - 1);
344 if (*isUsed == 0) {
345 printError(ErrSysFatal, "Fatal: Row is not in use during update rollback");
347 handleVarcharUndoUpdate(sysdb, ptr, ptrToTuple);
348 break;
350 case InsertHashIndexOperation:
352 HashIndex::deleteLogicalUndoLog(sysdb, (char *)logInfo
353 + sizeof(UndoLogInfo));
354 break;
356 case DeleteHashIndexOperation:
358 HashIndex::insertLogicalUndoLog(sysdb, (char *)logInfo
359 + sizeof(UndoLogInfo));
360 break;
362 case InsertTreeIndexOperation:
364 TreeIndex::deleteLogicalUndoLog(sysdb, (char *)logInfo
365 + sizeof(UndoLogInfo));
366 break;
368 case DeleteTreeIndexOperation:
370 TreeIndex::insertLogicalUndoLog(sysdb, (char *)logInfo
371 + sizeof(UndoLogInfo));
372 break;
374 default:
376 printError(ErrSysFatal, "Illegal undo log type");
377 break;
380 chunk->free(sysdb, logInfo);
382 return OK;
384 DbRetVal Transaction::handleVarcharUndoInsert(Database *sysdb, char *ptr)
386 // ptr will have following info encapsulated.
387 // metadataPtr + nVarchars + varchar chunk ptr +
388 // ptrs to varchars
390 Database db;
391 void *metaData = (void *)*(long *)ptr;
392 ptr += sizeof(void *);
393 db.setMetaDataPtr((DatabaseMetaData *) metaData);
394 db.setProcSlot(sysdb->procSlot);
396 int noOfVarchar = *(int *)ptr;
397 ptr += sizeof(int);
398 Chunk *vcchunk = (Chunk *) *(long *)ptr;
399 ptr += sizeof(void *);
400 void **ptrToVarchars = (void **) ptr;
401 for (int i = 0; i < noOfVarchar; i++) {
402 if (*(long *) ptrToVarchars[i] != 0L) {
403 vcchunk->free(&db, (void *)*(long *)ptrToVarchars[i]);
404 *(long *) ptrToVarchars[i] = 0L;
407 return OK;
410 DbRetVal Transaction::handleVarcharUndoDelete(Database *sysdb, char *ptr)
412 // ptr will have following info encapsulated.
413 // metadataPtr + nVarchars + varchar chunk ptr + ptrs to varchars +
414 // size and value pairs for varchars
415 void *metaData = (void *)*(long *)ptr;
416 ptr += sizeof(void *);
417 Database db;
418 db.setMetaDataPtr((DatabaseMetaData *) metaData);
419 db.setProcSlot(sysdb->procSlot);
420 DbRetVal rv = OK;
421 int noOfVarchar = *(int *) ptr;
422 ptr+= sizeof(int);
423 Chunk *vcchunk = (Chunk *) *(long *)ptr;
424 ptr += sizeof(void *);
425 void **ptrToVarchars = (void **) ptr;
426 ptr += noOfVarchar * sizeof (void *);
427 char *lenValPtr = (char *) ptr;
428 for (int i = 0; i < noOfVarchar; i++) {
429 int len = *(int *) lenValPtr;
430 lenValPtr += sizeof(int);
431 if (len != 0) {
432 void *ptr = vcchunk->allocate(&db, len, &rv);
433 strcpy((char *)ptr, lenValPtr);
434 lenValPtr += len;
435 *(long *) ptrToVarchars[i] = (long)ptr;
436 } else {
437 *(long *) ptrToVarchars[i] = 0L;
440 return rv;
442 DbRetVal Transaction::handleVarcharUndoUpdate(Database *sysdb, char *ptr, void *ptrToTuple)
444 // logInfo->data_ will have following info encapsulated.
445 // tupleptr + tuple length + actual tuple + metadataPtr +
446 // nVarchars + varchar chunk ptr + ptrs to varchars +
447 // size and value pairs for varchars
449 int tupleLen = *(int *) ptr;
450 ptr += sizeof(int);
451 void *tuple = ptr;
452 ptr += tupleLen;
453 void *metaData = (void *)*(long *)ptr;
454 ptr += sizeof(void *);
455 Database db;
456 db.setMetaDataPtr((DatabaseMetaData *) metaData);
457 db.setProcSlot(sysdb->procSlot);
458 DbRetVal rv = OK;
459 int noOfVarchar = *(int *) ptr;
460 ptr+= sizeof(int);
461 Chunk *vcchunk = (Chunk *) *(long *)ptr;
462 ptr += sizeof(void *);
463 void **ptrToVarchars = (void **) ptr;
464 ptr += noOfVarchar * sizeof (void *);
465 char *lenValPtr = (char *) ptr;
466 for (int i = 0; i < noOfVarchar; i++) {
467 if (*(long *) ptrToVarchars[i] != 0L) {
468 vcchunk->free(&db, (void *)*(long *) ptrToVarchars[i]);
469 *(long *) ptrToVarchars[i] = 0L;
472 os::memcpy(ptrToTuple, tuple, tupleLen);
473 for (int i = 0; i < noOfVarchar; i++) {
474 int len = *(int *) lenValPtr; lenValPtr += sizeof(int);
475 if (len != 0) {
476 void *ptr = vcchunk->allocate(&db, len, &rv);
477 strcpy((char *)ptr, lenValPtr);
478 lenValPtr += len;
479 *(long *) ptrToVarchars[i] = (long) ptr;
480 } else {
481 *(long *) ptrToVarchars[i] = 0L;
484 return rv;