*** empty log message ***
[csql.git] / src / storage / Transaction.cxx
blobb6e2fa70b7caa2c19ae73d165b8bf4a45fc1d941
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Transaction.h>
17 #include<Lock.h>
18 #include<Database.h>
19 #include<Allocator.h>
20 #include<CatalogTables.h>
21 #include<Debug.h>
23 //Code assumes that only one thread can work on a transaction
24 DbRetVal Transaction::insertIntoHasList(Database *sysdb, LockHashNode *node)
26 //allocate lock node
27 Chunk *chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
28 DbRetVal rv = OK;
29 TransHasNode *hasNode = NULL; //(TransHasNode*)chunk->allocate(sysdb, &rv);
30 int tries=0;
31 int totalTries = Conf::config.getMutexRetries();
32 while (tries < totalTries)
34 rv = OK;
35 hasNode= (TransHasNode*)chunk->allocate(sysdb, &rv);
36 if (hasNode !=NULL) break;
37 if (rv != ErrLockTimeOut)
39 printError(rv, "Unable to allocate trans has node");
40 return rv;
42 tries++;
44 if (NULL == hasNode)
46 printError(rv, "Could not allocate Trans has node after %d retry", tries);
47 return rv;
49 printDebug(DM_Transaction, "insertIntoHasList new TransHasNode created:%x",
50 hasNode);
51 hasNode->node_ = node;
52 hasNode->next_ = NULL;
53 if (NULL == hasLockList_)
55 printDebug(DM_Transaction, "hasLockList is null:It is now %x",hasNode);
56 hasLockList_ = hasNode;
57 return OK;
60 TransHasNode *it = hasLockList_;
61 while (NULL != it->next_) { it = it->next_; }
62 it->next_ = hasNode;
63 printDebug(DM_Transaction, "Added to hasLockList at end:%x",it);
64 logFinest(Conf::logger, "Added locknode:%x to hasLockList", hasNode->node_);
65 return OK;
68 DbRetVal Transaction::removeFromHasList(Database *sysdb, void *tuple)
70 Chunk *chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
71 TransHasNode *iter = hasLockList_, *prev = hasLockList_;
72 if (NULL == iter)
74 printError(ErrNotFound, "Fatal:HasList is empty");
75 return ErrNotFound;
77 while (iter != NULL)
79 if (tuple == iter->node_->ptrToTuple_)
81 prev->next_ = iter->next_;
82 chunk->free(sysdb, iter);
83 if (iter == hasLockList_) hasLockList_ = NULL;
84 logFinest(Conf::logger, "Removed locknode:%x from hasLockList",
85 iter->node_);
86 return OK;
88 prev = iter;
89 iter = iter->next_;
91 printStackTrace();
92 printError(ErrNotFound, "Fatal:There is no tuple lock in has list for tuple:%x", tuple);
93 //TEMP::for debugging.do not remove
94 /*iter=hasLockList_;
95 int cnt=0;
96 while (iter != NULL)
98 printError(ErrWarning, "Element in hasList: %d %x: %x:%d\n", cnt, iter->node_, iter->node_->ptrToTuple_ , *(int*)iter->node_->ptrToTuple_);
99 cnt++;
100 iter = iter->next_;
102 return ErrNotFound;
106 DbRetVal Transaction::releaseAllLocks(LockManager *lockManager_)
108 Database *sysdb =lockManager_->systemDatabase_;
109 Chunk *chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
110 TransHasNode *iter = hasLockList_, *prev;
111 DbRetVal rv = OK;
112 while (NULL != iter)
114 prev = iter;
115 iter = iter->next_;
116 printDebug(DM_Transaction, "Releasing lock %x",prev->node_->ptrToTuple_);
117 logFinest(Conf::logger, "Releasing lock for tuple:%x",prev->node_->ptrToTuple_);
118 rv = lockManager_->releaseLock(prev->node_->ptrToTuple_);
119 chunk->free(sysdb, prev);
121 hasLockList_ = NULL;
122 return OK;
124 bool Transaction::findInHasList(Database *sysdb, LockHashNode *node)
126 TransHasNode *iter = hasLockList_;
127 while (NULL != iter)
129 if (iter->node_ == node) return true;
130 iter = iter->next_;
132 return false;
135 DbRetVal Transaction::appendUndoLog(Database *sysdb, OperationType type,
136 void *data, size_t size)
138 DbRetVal rv =OK;
139 UndoLogInfo *logInfo = createUndoLog(sysdb, type, data, size, &rv);
140 if (logInfo == NULL) return rv;
141 if (size) os::memcpy((char*)logInfo + sizeof(UndoLogInfo), data, size);
142 addAtBegin(logInfo);
143 printDebug(DM_Transaction, "creating undo log and append %x optype:%d",
144 logInfo, type);
145 return OK;
150 DbRetVal Transaction::appendLogicalUndoLog(Database *sysdb, OperationType type, void *data, size_t size, void* indexPtr)
152 DbRetVal rv = OK;
153 UndoLogInfo *logInfo = createUndoLog(sysdb, type, data, size, &rv);
154 if (logInfo == NULL) return rv;
155 char **indPtr = (char**)((char*)logInfo + sizeof(UndoLogInfo));
156 *indPtr = (char*) indexPtr;
157 addAtBegin(logInfo);
158 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
159 return rv;
162 DbRetVal Transaction::appendLogicalHashUndoLog(Database *sysdb, OperationType type, void *data, size_t size)
164 DbRetVal rv = OK;
165 HashUndoLogInfo *hInfo = (HashUndoLogInfo *) data;
166 UndoLogInfo *logInfo = createUndoLog(sysdb, type, hInfo->tuple_, size, &rv);
167 if (logInfo == NULL) return rv;
168 memcpy((char*)logInfo + sizeof(UndoLogInfo), data, sizeof(HashUndoLogInfo));
169 addAtBegin(logInfo);
170 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
171 return rv;
173 DbRetVal Transaction::appendLogicalTreeUndoLog(Database *sysdb, OperationType type, void *data, size_t size)
175 DbRetVal rv = OK;
176 TreeUndoLogInfo *hInfo = (TreeUndoLogInfo *) data;
177 UndoLogInfo *logInfo = createUndoLog(sysdb, type, hInfo->tuple_, size, &rv);
178 if (logInfo == NULL) return rv;
179 memcpy((char*)logInfo + sizeof(UndoLogInfo), data, sizeof(TreeUndoLogInfo));
180 addAtBegin(logInfo);
181 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
182 return rv;
184 DbRetVal Transaction::appendLogicalTrieUndoLog(Database *sysdb, OperationType type, void *data, size_t size)
186 DbRetVal rv = OK;
187 TrieUndoLogInfo *hInfo = (TrieUndoLogInfo *) data;
188 UndoLogInfo *logInfo = createUndoLog(sysdb, type, hInfo->tuple_, size, &rv);
189 if (logInfo == NULL) return rv;
190 memcpy((char*)logInfo + sizeof(UndoLogInfo), data, sizeof(TrieUndoLogInfo));
191 addAtBegin(logInfo);
192 printDebug(DM_Transaction, "creating logical undo log and append %x optype:%d", logInfo, type);
193 return rv;
196 UndoLogInfo* Transaction::createUndoLog(Database *sysdb, OperationType type, void *data,
197 size_t size, DbRetVal *rv)
199 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
200 int reqSize = size + sizeof(UndoLogInfo);
201 UndoLogInfo *logInfo = NULL;
202 int tries=0;
203 int totalTries = Conf::config.getMutexRetries();
204 while (tries < totalTries)
206 *rv = OK;
207 logInfo= (UndoLogInfo*)chunk->allocate(sysdb, reqSize, rv);
208 if (logInfo !=NULL) break;
209 if (*rv != ErrLockTimeOut)
211 printError(*rv, "Unable to allocate undo log");
212 return NULL;
214 tries++;
217 if (logInfo == NULL) {
218 printError(*rv, "Unable to allocate undo log record after %d retries", tries);
219 return NULL;
221 logInfo->data_ = (char *)logInfo + sizeof(UndoLogInfo);
222 logInfo->opType_ = type;
223 logInfo->size_ = size;
224 logInfo->next_ = NULL;
225 return logInfo;
228 void Transaction::addAtBegin(UndoLogInfo* logInfo)
230 //add it to the begin of the log list
231 logInfo->next_ = firstUndoLog_;
232 firstUndoLog_ = logInfo;
233 return;
236 UndoLogInfo* Transaction::popUndoLog()
238 UndoLogInfo *iter = firstUndoLog_, *prev = firstUndoLog_;
239 if(NULL != iter)
241 prev = iter;
242 iter = iter->next_;
244 firstUndoLog_ = iter;
245 return prev;
249 int Transaction::noOfUndoLogs()
251 UndoLogInfo *iter = firstUndoLog_;
252 int count =0;
253 while(NULL != iter)
255 count++;
256 iter = iter->next_;
258 return count;
260 void Transaction::printDebugInfo(Database *sysdb)
262 printf("<TransactionInfo>\n");
263 if (waitLock_ != NULL)
265 printf("<WaitLock>");
266 waitLock_->print();
267 printf("</WaitLock>");
270 printf("<UndoLogs>\n");
271 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
272 printf(" <TotalPages> %d </TotalPages>\n", chunk->totalPages());
273 UndoLogInfo *iter = firstUndoLog_;
274 int count =0;
275 while(NULL != iter)
277 iter->print();
278 iter = iter->next_;
279 count++;
281 printf("</TotalNodes> %d </TotalNodes>\n", count);
282 printf("</UndoLogs>\n");
284 printf("<TransHasList>\n");
285 chunk = sysdb->getSystemDatabaseChunk(TransHasTableId);
286 printf(" <TotalPages> %d </TotalPages>\n", chunk->totalPages());
287 TransHasNode *hasIter = hasLockList_;
288 count =0;
289 while (NULL != hasIter)
291 hasIter->print();
292 hasIter = hasIter->next_;
293 count++;
295 printf("</TotalNodes> %d </TotalNodes>\n", count);
296 printf("</TransHasList>\n");
298 printf("</TransactionInfo>\n");
299 return ;
301 DbRetVal Transaction::removeUndoLogs(Database *sysdb)
303 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
304 UndoLogInfo *logInfo = NULL;
305 while(NULL != (logInfo = popUndoLog()))
307 chunk->free(sysdb, logInfo);
309 return OK;
313 DbRetVal Transaction::applyUndoLogs(Database *sysdb)
315 Chunk *chunk = sysdb->getSystemDatabaseChunk(UndoLogTableID);
316 UndoLogInfo *logInfo = NULL;
317 while(NULL != (logInfo = popUndoLog()))
319 logFinest(Conf::logger, "Apply undo log type:%d", logInfo->opType_);
320 switch(logInfo->opType_)
322 case InsertOperation:
324 char *ptr = (char *)logInfo->data_;
325 void *ptrToTuple = (void *)*(long *)ptr;
326 ptr += sizeof(void *);
327 InUse *isUsed = ((InUse*)(ptrToTuple) - 1);
328 if (*isUsed == 0) {
329 printError(ErrSysFatal, "Fatal: Row is already not in use");
331 *isUsed = 0;
332 handleVarcharUndoInsert(sysdb, ptr);
333 break;
335 case DeleteOperation:
337 char *ptr = (char *)logInfo->data_;
338 void *ptrToTuple = (void *)*(long *)ptr;
339 ptr += sizeof(void *);
340 InUse *isUsed = ((InUse*)(ptrToTuple) - 1);
341 if (*isUsed == 1) {
342 printError(ErrSysFatal, "Fatal: Row is already in use");
344 *isUsed = 1;
345 //data record will be intact as we have lock on that record
346 handleVarcharUndoDelete(sysdb, ptr);
347 break;
349 case UpdateOperation:
351 char *ptr = (char *)logInfo->data_;
352 void *ptrToTuple = (void *)*(long *)ptr;
353 ptr += sizeof(void *);
354 InUse *isUsed = ((InUse*)(ptrToTuple) - 1);
355 if (*isUsed == 0) {
356 printError(ErrSysFatal, "Fatal: Row is not in use during update rollback");
358 handleVarcharUndoUpdate(sysdb, ptr, ptrToTuple);
359 break;
361 case InsertHashIndexOperation:
363 HashIndex::deleteLogicalUndoLog(sysdb, (char *)logInfo
364 + sizeof(UndoLogInfo));
365 break;
367 case DeleteHashIndexOperation:
369 HashIndex::insertLogicalUndoLog(sysdb, (char *)logInfo
370 + sizeof(UndoLogInfo));
371 break;
373 case InsertTreeIndexOperation:
375 TreeIndex::deleteLogicalUndoLog(sysdb, (char *)logInfo
376 + sizeof(UndoLogInfo));
377 break;
379 case DeleteTreeIndexOperation:
381 TreeIndex::insertLogicalUndoLog(sysdb, (char *)logInfo
382 + sizeof(UndoLogInfo));
383 break;
385 case InsertTrieIndexOperation:
387 TrieIndex::deleteLogicalUndoLog(sysdb, (char *)logInfo
388 + sizeof(UndoLogInfo));
389 break;
391 case DeleteTrieIndexOperation:
393 TrieIndex::insertLogicalUndoLog(sysdb, (char *)logInfo
394 + sizeof(UndoLogInfo));
395 break;
397 default:
399 printError(ErrSysFatal, "Illegal undo log type");
400 break;
403 chunk->free(sysdb, logInfo);
405 return OK;
407 DbRetVal Transaction::handleVarcharUndoInsert(Database *sysdb, char *ptr)
409 // ptr will have following info encapsulated.
410 // metadataPtr + nVarchars + varchar chunk ptr +
411 // ptrs to varchars
413 Database db;
414 void *metaData = (void *)*(long *)ptr;
415 ptr += sizeof(void *);
416 db.setMetaDataPtr((DatabaseMetaData *) metaData);
417 db.setProcSlot(sysdb->procSlot);
419 int noOfVarchar = *(int *)ptr;
420 ptr += sizeof(int);
421 Chunk *vcchunk = (Chunk *) *(long *)ptr;
422 ptr += sizeof(void *);
423 void **ptrToVarchars = (void **) ptr;
424 for (int i = 0; i < noOfVarchar; i++) {
425 if (*(long *) ptrToVarchars[i] != 0L) {
426 vcchunk->free(&db, (void *)*(long *)ptrToVarchars[i]);
427 *(long *) ptrToVarchars[i] = 0L;
430 return OK;
433 DbRetVal Transaction::handleVarcharUndoDelete(Database *sysdb, char *ptr)
435 // ptr will have following info encapsulated.
436 // metadataPtr + nVarchars + varchar chunk ptr + ptrs to varchars +
437 // size and value pairs for varchars
438 void *metaData = (void *)*(long *)ptr;
439 ptr += sizeof(void *);
440 Database db;
441 db.setMetaDataPtr((DatabaseMetaData *) metaData);
442 db.setProcSlot(sysdb->procSlot);
443 DbRetVal rv = OK;
444 int noOfVarchar = *(int *) ptr;
445 ptr+= sizeof(int);
446 Chunk *vcchunk = (Chunk *) *(long *)ptr;
447 ptr += sizeof(void *);
448 void **ptrToVarchars = (void **) ptr;
449 ptr += noOfVarchar * sizeof (void *);
450 char *lenValPtr = (char *) ptr;
451 for (int i = 0; i < noOfVarchar; i++) {
452 int len = *(int *) lenValPtr;
453 lenValPtr += sizeof(int);
454 if (len != 0) {
455 void *ptr = vcchunk->allocate(&db, len, &rv);
456 strcpy((char *)ptr, lenValPtr);
457 lenValPtr += len;
458 *(long *) ptrToVarchars[i] = (long)ptr;
459 } else {
460 *(long *) ptrToVarchars[i] = 0L;
463 return rv;
465 DbRetVal Transaction::handleVarcharUndoUpdate(Database *sysdb, char *ptr, void *ptrToTuple)
467 // logInfo->data_ will have following info encapsulated.
468 // tupleptr + tuple length + actual tuple + metadataPtr +
469 // nVarchars + varchar chunk ptr + ptrs to varchars +
470 // size and value pairs for varchars
472 int tupleLen = *(int *) ptr;
473 ptr += sizeof(int);
474 void *tuple = ptr;
475 ptr += tupleLen;
476 void *metaData = (void *)*(long *)ptr;
477 ptr += sizeof(void *);
478 Database db;
479 db.setMetaDataPtr((DatabaseMetaData *) metaData);
480 db.setProcSlot(sysdb->procSlot);
481 DbRetVal rv = OK;
482 int noOfVarchar = *(int *) ptr;
483 ptr+= sizeof(int);
484 Chunk *vcchunk = (Chunk *) *(long *)ptr;
485 ptr += sizeof(void *);
486 void **ptrToVarchars = (void **) ptr;
487 ptr += noOfVarchar * sizeof (void *);
488 char *lenValPtr = (char *) ptr;
489 for (int i = 0; i < noOfVarchar; i++) {
490 if (*(long *) ptrToVarchars[i] != 0L) {
491 vcchunk->free(&db, (void *)*(long *) ptrToVarchars[i]);
492 *(long *) ptrToVarchars[i] = 0L;
495 os::memcpy(ptrToTuple, tuple, tupleLen);
496 for (int i = 0; i < noOfVarchar; i++) {
497 int len = *(int *) lenValPtr; lenValPtr += sizeof(int);
498 if (len != 0) {
499 void *ptr = vcchunk->allocate(&db, len, &rv);
500 strcpy((char *)ptr, lenValPtr);
501 lenValPtr += len;
502 *(long *) ptrToVarchars[i] = (long) ptr;
503 } else {
504 *(long *) ptrToVarchars[i] = 0L;
507 return rv;