Removing dependency for Cache module in MMDB build
[csql.git] / src / storage / LockManager.cxx
blob01796457c08f6bb521bcd62ab9a9b53561885b31
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Lock.h>
17 #include<Allocator.h>
18 #include<Database.h>
19 #include<CatalogTables.h>
20 #include<Transaction.h>
21 #include<Debug.h>
22 #include<Config.h>
23 #include<Process.h>
24 LockManager::LockManager(Database *sysDb_)
26 systemDatabase_ = sysDb_;
27 lockBuckets = systemDatabase_->getLockHashBuckets();
29 Bucket* LockManager::getLockBucket(void *tuple)
31 //Bucket* buckets = systemDatabase_->getLockHashBuckets();
32 unsigned long key =(unsigned long)tuple ;
33 int bucketNo = key % LOCK_BUCKET_SIZE;
35 //Bucket *bucket = &(buckets[bucketNo]);
36 printDebug(DM_Lock, "getLockBucket bucketno:%d",bucketNo);
37 return &(lockBuckets[bucketNo]);
40 void LockManager::printUsageStatistics()
42 Bucket* buckets = systemDatabase_->getLockHashBuckets();
43 Bucket* bucket;
44 LockHashNode *lockNode;
45 int nodeCount =0, bucketCount =0;
46 for (int i =0; i< LOCK_BUCKET_SIZE; i++)
48 bucket = &(buckets[i]);
49 lockNode = (LockHashNode*) bucket->bucketList_;
50 if (lockNode) bucketCount++; else continue;
51 while (NULL != lockNode) { nodeCount++; lockNode = lockNode->next_; }
53 printf("<LockTable>\n");
54 printf(" <TotalBuckets> %d </TotalBuckets>\n", LOCK_BUCKET_SIZE);
55 printf(" <UsedBuckets> %d </UsedBuckets>\n", bucketCount);
56 printf(" <TotalLockNodes> %d </TotalLockNodes>\n", nodeCount);
57 printf("</LockTable>\n");
61 void LockManager::printDebugInfo()
63 Bucket* buckets = systemDatabase_->getLockHashBuckets();
64 Bucket* bucket;
65 LockHashNode *lockNode;
66 int nodeCount =0, bucketCount =0;
67 printf("<LockTable>\n");
68 for (int i =0; i< LOCK_BUCKET_SIZE; i++)
70 nodeCount =0;
71 bucket = &(buckets[i]);
72 //if (bucket) bucketCount++; else continue;
73 lockNode = (LockHashNode*) bucket->bucketList_;
75 while (NULL != lockNode)
77 nodeCount++;
78 lockNode->print();
79 lockNode = lockNode->next_;
81 if (nodeCount) {
82 bucketCount++;
83 printf(" <LockBucket> \n");
84 printf(" <BucketNo> %d </BucketNo> \n", i);
85 printf(" <TotalNodes> %d </TotalNodes>\n", nodeCount);
86 printf(" <LockBucket>\n");
90 printf(" <TotalUsedBuckets> %d </TotalUsedBuckets>\n", bucketCount);
91 Chunk *chunk = systemDatabase_->getSystemDatabaseChunk(LockTableId);
92 printf(" <TotalPages> %d </TotalPages>\n", chunk->totalPages());
93 printf("</LockTable>\n");
97 DbRetVal LockManager::getSharedLock(void *tuple, Transaction **trans)
99 //get the bucket list
100 //take the bucket mutex for read
101 //go the the next level bucket list
102 //get the bucket iterator
103 //go the node where the lock info resides
104 //check which mode the lock is taken
105 // if shared then
106 // upgrade the bucket mutex to write
107 // take it and increment the readers count
108 // release bucket mutex and exit
109 // if exclusive then
110 // go into the loop
111 // upgrade the bucket mutex to write
112 // increment waitReaders count
113 // release the bucket mutex
114 // wait for timeout period or (takes shared lock and release it ) till it becomes free.
115 // if times out
116 // take bucket mutex for write
117 // decrement waitReaders count
118 // releaese bucket mutex
120 // return
121 // if it becomes free
122 // take bucket mutex for write
123 // increment readers
124 // releaese bucket mutex
126 // return
127 LockInfo linfo;
128 linfo.noOfReaders_ = 1;
129 //keeping it ready for the allocation, because when
130 //lock node is not present in the list, then it means we are the first
131 //to acquire lock so for sure we will get it.
132 printDebug(DM_Lock, "LockManager::getSharedLock Begin");
133 Bucket *bucket = getLockBucket(tuple);
134 LockHashNode *lockNode = (LockHashNode*) bucket->bucketList_;
135 if (NULL == lockNode)
137 DbRetVal rv = OK;
138 LockHashNode *node = allocLockNode(linfo, tuple, &rv);
139 if (NULL == node)
141 printError(rv, "Could not allocate Lock node");
142 return rv;
144 printDebug(DM_Lock, "Bucket list is null: Allocating new LockHashNode %x", node);
145 rv = OK;
146 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, node);
147 if (rv !=OK) {
148 deallocLockNode(node);
149 printError(ErrLockTimeOut,"Unable to insert into hasList. Timeout.Retry...");
150 return ErrLockTimeOut;
153 //bucket->bucketList_ = (void*)node; //make it as head
154 int ret = Mutex::CASL((long*)&bucket->bucketList_, 0 , (long)node);
155 if (ret != 0) {
156 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
157 deallocLockNode(node);
158 printError(ErrLockTimeOut, "Unable to set lock list head. Timeout. Retry...");
159 return ErrLockTimeOut;
161 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
162 printDebug(DM_Lock, "LockManager::getSharedLock End");
163 return rv;
165 LockHashNode *cachedLockNode = NULL;
167 LockHashNode *iter = lockNode;
168 //Iterate though the list and find the element's lock info
169 while(iter != NULL)
171 if(iter->ptrToTuple_ == tuple)
173 if (iter->lInfo_.noOfReaders_ == -1)
176 //iter->lInfo_.waitReaders_++;
177 int ret = Mutex::CAS((int*)&iter->lInfo_.waitReaders_,
178 iter->lInfo_.waitReaders_,
179 iter->lInfo_.waitReaders_+1);
180 if (ret !=0) {
181 printError(ErrLockTimeOut, "Unable to inc waitReaders:%d : Timeout. Retry..", iter->lInfo_.waitReaders_);
182 return ErrLockTimeOut;
184 cachedLockNode = iter;
185 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
186 if (trans != NULL) (*trans)->updateWaitLock(iter);
187 printDebug(DM_Lock, "lock node:%x exclusive locked",iter);
188 break;
190 else if (iter->lInfo_.noOfReaders_ == 0)
192 if(iter->lInfo_.waitWriters_ >0)
194 //iter->lInfo_.waitReaders_++;
195 int ret = Mutex::CAS((int*)&iter->lInfo_.waitReaders_,
196 iter->lInfo_.waitReaders_,
197 iter->lInfo_.waitReaders_+1);
198 if (ret !=0) {
199 printError(ErrLockTimeOut, "Unable to inc waitReaders:%d : Timeout. Retry..", iter->lInfo_.waitReaders_);
200 return ErrLockTimeOut;
202 cachedLockNode = iter;
203 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
204 if (trans != NULL) (*trans)->updateWaitLock(iter);
205 printDebug(DM_Lock, "lock node:%x Writers waiting.",iter);
206 break;
208 else
210 DbRetVal rv = OK;
211 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, iter);
212 if (rv != OK) {
213 printError(ErrLockTimeOut,"Unable to insert into hasList. Timeout. Retry..");
214 return ErrLockTimeOut;
216 //iter->lInfo_.noOfReaders_++;
217 int ret = Mutex::CAS((int*)&iter->lInfo_.noOfReaders_,
218 iter->lInfo_.noOfReaders_,
219 iter->lInfo_.noOfReaders_+1);
220 if (ret !=0) {
221 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
222 printError(ErrLockTimeOut, "Unable to inc noOfReaders:%d : Timeout. Retry..", iter->lInfo_.noOfReaders_);
223 return ErrLockTimeOut;
225 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
226 printDebug(DM_Lock, "lock node:%x First to take shared lock",
227 iter);
228 printDebug(DM_Lock, "LockManager::getSharedLock End");
229 return rv;
231 }else
233 DbRetVal rv = OK;
234 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, iter);
235 if (rv != OK) {
236 printError(ErrLockTimeOut, "Unable to insert into hasList. Timeout : Retry..");
237 return ErrLockTimeOut;
239 //iter->lInfo_.noOfReaders_++;
240 int ret = Mutex::CAS((int*)&iter->lInfo_.noOfReaders_,
241 iter->lInfo_.noOfReaders_,
242 iter->lInfo_.noOfReaders_+1);
243 if (ret !=0) {
244 if (trans!=NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
245 printError(ErrLockTimeOut, "Unable to take S lock. Timeout : Retry..");
246 return ErrLockTimeOut;
248 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
249 printDebug(DM_Lock, "lock node:%x incr readers",iter);
250 printDebug(DM_Lock, "LockManager::getSharedLock End");
251 return rv;
254 printDebug(DM_Lock, "Finding the lock node. iter:%x",iter);
255 iter = iter->next_;
257 if (NULL == cachedLockNode)
259 DbRetVal rv =OK;
260 LockHashNode *node = allocLockNode(linfo, tuple, &rv);
261 if (NULL == node)
263 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
264 printError(rv, "Could not allocate Lock node");
265 if (trans != NULL) (*trans)->removeWaitLock();
266 return rv;
268 printDebug(DM_Lock,"Not Found.Created new lock node:%x",node);
269 rv = OK;
270 if (trans != NULL)
271 rv = (*trans)->insertIntoHasList(systemDatabase_, node);
272 if (rv != OK) {
273 deallocLockNode(node);
274 if (trans != NULL) (*trans)->removeWaitLock();
275 printError(ErrLockTimeOut, "Unable to add to hasList : Timeout. Retry..");
276 return ErrLockTimeOut;
278 int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot);
279 if (lockRet != 0)
281 printError(ErrLockTimeOut, "Unable to acquire bucket mutex");
282 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
283 deallocLockNode(node);
284 if (trans != NULL) (*trans)->removeWaitLock();
285 return ErrLockTimeOut;
288 LockHashNode *it = (LockHashNode*) bucket->bucketList_;
290 if (NULL == it) {
291 int ret = Mutex::CASL((long*)&bucket->bucketList_, 0 , (long)node);
292 if (ret != 0) {
293 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
294 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
295 deallocLockNode(node);
296 if (trans != NULL) (*trans)->removeWaitLock();
297 printError(ErrLockTimeOut, "Unable to set Lock Bucket. Timeout: retry...");
298 return ErrLockTimeOut;
300 } else {
301 while (NULL != it->next_) it = it->next_;
302 //it->next_ = node;
303 int ret = Mutex::CASL((long*)&it->next_, 0, (long)node);
304 if (ret !=0) {
305 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
306 if (trans !=NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
307 deallocLockNode(node);
308 if (trans != NULL) (*trans)->removeWaitLock();
309 printError(ErrLockTimeOut, "Unable to add to lock table list : Retry..");
310 return ErrLockTimeOut;
313 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
314 if (trans != NULL) (*trans)->removeWaitLock();
315 printDebug(DM_Lock, "LockManager::getSharedLock End");
316 return OK;
318 //bucket->mutex_.releaseLock();
319 int tries = 0;
320 int ret = 0;
321 struct timeval timeout;
322 timeout.tv_sec = Conf::config.getLockSecs();
323 timeout.tv_usec = Conf::config.getLockUSecs();
325 //printDebug(DM_Lock, "Trying to get mutex: for bucket %x\n", bucket);
326 while (tries < Conf::config.getLockRetries())
328 /*lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot);
329 if (lockRet != 0)
331 printDebug(DM_Lock, "Mutex is waiting for long time:May be deadlock");
332 printDebug(DM_Lock, "LockManager::getSharedLock End");
333 printError(ErrLockTimeOut, "Unable to get bucket mutex");
334 if (trans != NULL) (*trans)->removeWaitLock();
335 return ErrLockTimeOut;
337 int oldValue = cachedLockNode->lInfo_.noOfReaders_;
338 if (cachedLockNode->lInfo_.noOfReaders_ == 0)
340 //if there are waiters allow then to take the lock
341 if (cachedLockNode->lInfo_.waitWriters_ <0)
343 DbRetVal rv = OK;
344 if (trans != NULL) rv=(*trans)->insertIntoHasList(systemDatabase_, cachedLockNode);
345 if (rv !=OK) {
346 if (trans != NULL) (*trans)->removeWaitLock();
347 printError(ErrLockTimeOut, "Unable to add to hasList. TimeOut : Retry..");
348 return ErrLockTimeOut;
351 //cachedLockNode->lInfo_.noOfReaders_++;
352 int ret = Mutex::CAS((int*)&cachedLockNode->lInfo_.noOfReaders_,
353 0,cachedLockNode->lInfo_.noOfReaders_+1);
354 if (ret !=0) {
355 printError(ErrLockTimeOut, "Unable to take S lock. TimeOut : Retry..");
356 if (trans != NULL) (*trans)->removeWaitLock();
357 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
358 return ErrLockTimeOut;
360 //cachedLockNode->lInfo_.waitReaders_--;
361 ret = Mutex::CAS((int*)&cachedLockNode->lInfo_.waitReaders_,
362 cachedLockNode->lInfo_.waitReaders_,
363 cachedLockNode->lInfo_.waitReaders_-1);
364 if (ret !=0) {
365 printError(ErrLockTimeOut, "Fatal:Unable to dec waitReaders. Timeout : Retry..");
366 //return ErrLockTimeOut;
368 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
369 if (trans != NULL) (*trans)->removeWaitLock();
370 printDebug(DM_Lock, "LockManager::getSharedLock End");
371 return OK;
373 } else if (cachedLockNode->lInfo_.noOfReaders_ == -1)
375 if (trans !=NULL && (*trans)->findInHasList(systemDatabase_, cachedLockNode))
377 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
378 if (trans != NULL) (*trans)->removeWaitLock();
379 printDebug(DM_Lock, "LockManager::getSharedLock End");
380 return OK;
382 } else
384 DbRetVal rv =OK;
385 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, cachedLockNode);
386 if (rv !=OK) {
387 if (trans != NULL) (*trans)->removeWaitLock();
388 printError(ErrLockTimeOut, "Unable to add to hasList. Timeout : Retry..");
389 return ErrLockTimeOut;
391 //cachedLockNode->lInfo_.noOfReaders_++;
392 int ret = Mutex::CAS((int*)&cachedLockNode->lInfo_.noOfReaders_,
393 oldValue, oldValue+1);
394 if (ret !=0) {
395 printError(ErrLockTimeOut, "Unable to take S lock. Timeout : Retry..");
396 if (trans != NULL) (*trans)->removeWaitLock();
397 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
398 return ErrLockTimeOut;
400 //cachedLockNode->lInfo_.waitReaders_--;
401 ret = Mutex::CAS((int*)&cachedLockNode->lInfo_.waitReaders_,
402 cachedLockNode->lInfo_.waitReaders_,
403 cachedLockNode->lInfo_.waitReaders_-1);
404 if (ret !=0) {
405 printError(ErrLockTimeOut, "Unable to dec waitReaders timeout : Retry..");
406 //return ErrLockTimeOut;
408 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
409 if (trans != NULL) (*trans)->removeWaitLock();
410 printDebug(DM_Lock, "LockManager::getSharedLock End");
411 return OK;
414 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
415 os::select(0, 0, 0, 0, &timeout);
416 tries++;
417 printDebug(DM_Lock, "Trying to lock the lock node:%x iteration:%d",cachedLockNode, tries);
419 printDebug(DM_Lock, "Mutex is waiting for long time:May be deadlock");
420 printDebug(DM_Lock, "LockManager::getSharedLock End");
421 printError(ErrLockTimeOut, "Unable to acquire lock for long time.Timed out");
422 if (trans != NULL) (*trans)->removeWaitLock();
423 return ErrLockTimeOut;
427 DbRetVal LockManager::getExclusiveLock(void *tuple, Transaction **trans)
429 LockInfo linfo;
430 linfo.noOfReaders_ = -1;
431 printDebug(DM_Lock, "LockManager::getExclusiveLock Begin");
432 //keeping it ready for the allocation, because when
433 //lock node is not present in the list, then it means we are the first
434 //to acquire lock so for sure we will get it.
436 Bucket *bucket = getLockBucket(tuple);
437 /*int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot);
438 if (lockRet != 0)
440 printDebug(DM_Lock, "Unable to acquire bucket mutex:May be deadlock");
441 printError(ErrLockTimeOut, "Unable to acquire bucket mutex");
442 return ErrLockTimeOut;
444 LockHashNode *lockNode = (LockHashNode*) bucket->bucketList_;
445 if (NULL == lockNode)
447 DbRetVal rv = OK;
448 LockHashNode *node = allocLockNode(linfo, tuple, &rv);
449 if (NULL == node) return rv;
450 rv =OK;
451 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, node);
452 if (rv !=OK) {
453 printError(ErrLockTimeOut, "Unable to add to hasList. Timeout: retry...");
454 deallocLockNode(node);
455 return ErrLockTimeOut;
457 printDebug(DM_Lock, "No head. So new lock node allocated:%x",node);
458 //bucket->bucketList_ = (void*)node; //make it as head
459 int ret = Mutex::CASL((long*)&bucket->bucketList_, 0 , (long)node);
460 if (ret != 0) {
461 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
462 deallocLockNode(node);
463 printError(ErrLockTimeOut, "Unable to set Lock Bucket. Timeout: retry...%x", tuple);
464 return ErrLockTimeOut;
466 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
467 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
468 return OK;
471 LockHashNode *cachedLockNode = NULL;
473 LockHashNode *iter = lockNode;
474 //Iterate though the list and find the element's lock info
475 while(iter != NULL)
477 if(iter->ptrToTuple_ == tuple)
479 if (iter->lInfo_.noOfReaders_ != 0)
481 //iter->lInfo_.waitWriters_++;
482 int ret = Mutex::CAS((int*)&iter->lInfo_.waitWriters_,
483 iter->lInfo_.waitWriters_,
484 iter->lInfo_.waitWriters_+1);
485 if (ret !=0) {
486 printError(ErrLockTimeOut, "Unable to inc waitWriters. Timeout : Retry..");
487 return ErrLockTimeOut;
490 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
491 if (trans != NULL) (*trans)->updateWaitLock(iter);
492 cachedLockNode = iter;
493 printDebug(DM_Lock, "Either some one has exclusive or shared lock:%x",iter);
494 break;
496 else
498 DbRetVal rv =OK;
499 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, iter);
500 if (rv != OK) {
501 printError(ErrLockTimeOut, "Unable to add to hasList. Timeout : Retry..");
502 return ErrLockTimeOut;
504 //iter->lInfo_.noOfReaders_ = -1;
505 int ret = Mutex::CAS((int*)&iter->lInfo_.noOfReaders_, 0, -1);
506 if (ret !=0) {
507 if (trans!= NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
508 printError(ErrLockTimeOut, "Unable to take X lock on tuple. Timeout : Retry..");
509 return ErrLockTimeOut;
511 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
512 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
513 return rv;
516 printDebug(DM_Lock, "Finding the lock node. iter:%x",iter);
517 iter = iter->next_;
519 if (NULL == cachedLockNode)
521 DbRetVal rv =OK;
522 LockHashNode *node = allocLockNode(linfo, tuple, &rv);
523 if (NULL == node)
525 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
526 if (trans != NULL) (*trans)->removeWaitLock();
527 printError(rv, "Could not allocate Lock node");
528 return rv;
530 printDebug(DM_Lock, "Not Found:Creating new lock node:%x",node);
531 rv = OK;
532 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, node);
533 if (rv != OK) {
534 deallocLockNode(node);
535 if (trans != NULL) (*trans)->removeWaitLock();
536 printError(ErrLockTimeOut, "Unable to add to hasList. Timeout : Retry..");
537 return ErrLockTimeOut;
539 int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot);
540 if (lockRet != 0)
542 printError(ErrLockTimeOut, "Unable to acquire bucket mutex");
543 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
544 deallocLockNode(node);
545 if (trans != NULL) (*trans)->removeWaitLock();
546 return ErrLockTimeOut;
548 LockHashNode *it = (LockHashNode*) bucket->bucketList_;
549 if (NULL == it) {
550 int ret = Mutex::CASL((long*)&bucket->bucketList_, 0 , (long)node);
551 if (ret != 0) {
552 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
553 deallocLockNode(node);
554 if (trans != NULL) (*trans)->removeWaitLock();
555 printError(ErrLockTimeOut, "Unable to set Lock Bucket. Timeout: retry...");
556 return ErrLockTimeOut;
560 } else {
561 while (NULL != it->next_) it = it->next_;
562 //it->next_ = node;
563 int ret = Mutex::CASL((long*)&it->next_, 0, (long)node);
564 if (ret !=0) {
565 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
566 deallocLockNode(node);
567 if (trans != NULL) (*trans)->removeWaitLock();
568 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
569 printError(ErrLockTimeOut, "Unable to add to lock list. Timeout : Retry..");
570 return ErrLockTimeOut;
573 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
574 if (trans != NULL) (*trans)->removeWaitLock();
575 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
576 return rv;
578 //bucket->mutex_.releaseLock();
579 int tries = 0;
580 int ret = 0;
581 struct timeval timeout;
582 timeout.tv_sec = Conf::config.getLockSecs();
583 timeout.tv_usec = Conf::config.getLockUSecs();
585 while (tries < Conf::config.getLockRetries())
587 /*lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot);
588 if (lockRet != 0)
590 printError(ErrLockTimeOut, "Unable to get bucket mutex");
591 return ErrLockTimeOut;
593 int oldValue = cachedLockNode->lInfo_.noOfReaders_;
594 if (cachedLockNode->lInfo_.noOfReaders_ == 0)
596 DbRetVal rv ;
597 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, cachedLockNode);
598 if (rv != OK) {
599 if (trans != NULL) (*trans)->removeWaitLock();
600 printError(ErrLockTimeOut, "Unable to add to hasList: Timeout: Retry..");
601 return ErrLockTimeOut;
603 //cachedLockNode->lInfo_.noOfReaders_ = -1;
604 int ret = Mutex::CAS((int*)&cachedLockNode->lInfo_.noOfReaders_,
605 0, -1);
606 if (ret !=0) {
607 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
608 if (trans != NULL) (*trans)->removeWaitLock();
609 printError(ErrLockTimeOut, "Unable to take X lock: Timeout: Retry..");
610 return ErrLockTimeOut;
613 //cachedLockNode->lInfo_.waitWriters_--;
614 ret = Mutex::CAS((int*)&cachedLockNode->lInfo_.waitWriters_,
615 cachedLockNode->lInfo_.waitWriters_,
616 cachedLockNode->lInfo_.waitWriters_-1);
617 if (ret !=0) {
618 printError(ErrLockTimeOut, "Fatal:Unable to dec waitWriters. TimeOut : Retry.. waitWriters:%d",cachedLockNode->lInfo_.waitWriters_);
619 //do not do error return as this is OK to continue.
620 //return ErrLockTimeOut;
622 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
623 if (trans != NULL) (*trans)->removeWaitLock();
624 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
625 return rv;
626 }else if ( cachedLockNode->lInfo_.noOfReaders_ == 1)
628 if (trans !=NULL && (*trans)->findInHasList(systemDatabase_, cachedLockNode))
630 printDebug(DM_Lock, "upgrading shared to exclusive lock:%x",
631 cachedLockNode);
632 //upgrade it to exclusive lock
633 //cachedLockNode->lInfo_.noOfReaders_ = -1;
634 int ret = Mutex::CAS((int*)&cachedLockNode->lInfo_.noOfReaders_,
635 1, -1);
636 if (ret !=0) {
637 if (trans != NULL) (*trans)->removeWaitLock();
638 printError(ErrLockTimeOut, "Unable to upgrade lock. Timeout : Retry..");
639 return ErrLockTimeOut;
642 //cachedLockNode->lInfo_.waitWriters_--;
643 ret = Mutex::CAS((int*)&cachedLockNode->lInfo_.waitWriters_,
644 cachedLockNode->lInfo_.waitWriters_,
645 cachedLockNode->lInfo_.waitWriters_-1);
646 if (ret !=0) {
647 printError(ErrLockTimeOut, "Fatal:Unable to dec waitWriters:Timeout : Retry.. waitwriters:%d", cachedLockNode->lInfo_.waitWriters_);
648 //do not do error return as this is OK to contine
649 //return ErrLockTimeOut;
651 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
652 if (trans != NULL) (*trans)->removeWaitLock();
653 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
654 return OK;
656 if (trans ==NULL && ProcessManager::hasLockList.exists(cachedLockNode->ptrToTuple_))
658 printDebug(DM_Lock, "upgrading shared to exclusive lock:%x",
659 cachedLockNode);
660 //upgrade it to exclusive lock
661 //cachedLockNode->lInfo_.noOfReaders_ = -1;
662 int ret = Mutex::CAS((int*)&cachedLockNode->lInfo_.noOfReaders_,
663 1, -1);
664 if (ret !=0) {
665 printError(ErrLockTimeOut, "Unable to upgrade lock. Timeout : Retry..");
666 if (trans != NULL) (*trans)->removeWaitLock();
667 return ErrLockTimeOut;
670 //cachedLockNode->lInfo_.waitWriters_--;
671 ret = Mutex::CAS((int*)&cachedLockNode->lInfo_.waitWriters_,
672 cachedLockNode->lInfo_.waitWriters_,
673 cachedLockNode->lInfo_.waitWriters_-1);
674 if (ret !=0) {
675 printError(ErrLockTimeOut, "Fatal:Unable to dec waitWriters:Timeout : Retry.. waitwriters:%d", cachedLockNode->lInfo_.waitWriters_);
676 //do not do error return as this is OK to contine
677 //return ErrLockTimeOut;
679 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
680 if (trans != NULL) (*trans)->removeWaitLock();
681 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
682 return OK;
684 }else if ( cachedLockNode->lInfo_.noOfReaders_ == -1)
686 if (trans !=NULL && (*trans)->findInHasList(systemDatabase_, cachedLockNode))
688 printDebug(DM_Lock, "You already have exclusive lock:%x",
689 cachedLockNode);
690 //cachedLockNode->lInfo_.waitWriters_--;
691 ret = Mutex::CAS((int*)&cachedLockNode->lInfo_.waitWriters_,
692 cachedLockNode->lInfo_.waitWriters_,
693 cachedLockNode->lInfo_.waitWriters_-1);
694 if (ret !=0) {
695 printError(ErrLockTimeOut, "Fatal:Unable to dec waitWriters:Timeout : Retry.. waitwriters:%d", cachedLockNode->lInfo_.waitWriters_);
696 //do not do error return as this is OK to contine
697 //return ErrLockTimeOut;
699 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
700 if (trans != NULL) (*trans)->removeWaitLock();
701 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
702 return OK;
704 if (trans ==NULL && ProcessManager::hasLockList.exists(cachedLockNode->ptrToTuple_))
706 printDebug(DM_Lock, "You already have exclusive lock:%x",
707 cachedLockNode);
708 //cachedLockNode->lInfo_.waitWriters_--;
709 ret = Mutex::CAS((int*)&cachedLockNode->lInfo_.waitWriters_,
710 cachedLockNode->lInfo_.waitWriters_,
711 cachedLockNode->lInfo_.waitWriters_-1);
712 if (ret !=0) {
713 printError(ErrLockTimeOut, "Fatal:Unable to dec waitWriters:Timeout : Retry.. waitwriters:%d", cachedLockNode->lInfo_.waitWriters_);
714 //do not do error return as this is OK to contine
715 //return ErrLockTimeOut;
717 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
718 if (trans != NULL) (*trans)->removeWaitLock();
719 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
720 return OK;
723 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
724 os::select(0, 0, 0, 0, &timeout);
725 tries++;
726 printDebug(DM_Lock, "Trying to lock the lock node:%x iteration:%d",cachedLockNode, tries);
728 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
729 if (trans != NULL) (*trans)->removeWaitLock();
730 printError(ErrLockTimeOut, "Unable to acquire lock for long time.Timed out");
731 return ErrLockTimeOut;
734 DbRetVal LockManager::releaseLock(void *tuple)
736 printDebug(DM_Lock, "LockManager:releaseLock Start");
737 Bucket *bucket = getLockBucket(tuple);
738 printDebug(DM_Lock,"Bucket is %x", bucket);
739 /*int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot);
740 if (lockRet != 0)
742 printDebug(DM_Lock, "Mutex is waiting for long time:May be deadlock");
743 printDebug(DM_Lock, "LockManager:releaseLock End");
744 printError(ErrLockTimeOut, "Unable to get bucket mutex");
745 return ErrLockTimeOut;
747 LockHashNode *lockNode = (LockHashNode*) bucket->bucketList_;
748 if (NULL == lockNode)
750 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
751 printDebug(DM_Lock, "LockManager:releaseLock End");
752 //printError(ErrSysFatal, "Fatal:Lock Bucket and Element Not found for tuple: %x:", tuple);
753 printStackTrace();
754 printError(ErrSysFatal, "Fatal:Lock Bucket and Element Not found for tuple: %x:%d", tuple, *(int*)tuple);
755 return ErrSysFatal;
757 DbRetVal rv = OK;
758 LockHashNode *iter = lockNode;
759 //Iterate though the list and find the element's lock info
760 while(iter != NULL)
762 if(iter->ptrToTuple_ == tuple)
764 int oldValue = iter->lInfo_.noOfReaders_;
765 if (iter->lInfo_.noOfReaders_ == -1)
767 //iter->lInfo_.noOfReaders_ = 0;
768 int ret = Mutex::CAS((int*)&iter->lInfo_.noOfReaders_,
769 -1, 0);
770 if (ret !=0) {
771 printError(ErrLockTimeOut, "Unable to release X lock taken : Retry..");
772 return ErrLockTimeOut;
775 if (iter->lInfo_.waitWriters_ == 0 || iter->lInfo_.waitReaders_ ==0)
777 //TODO::above condition is not atomic
778 //put waitReaders_, WaitReaders in one integer
779 int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot);
780 if (lockRet == 0) {
781 int tries = Conf::config.getMutexRetries();
782 do {
783 rv = deallocLockNode(iter, bucket);
784 if (tries == 0) {
785 printError(ErrWarning, "Fatal:Leak: Unable to dealloc lock node %d tries", Conf::config.getMutexRetries());
786 break;
788 tries--;
789 }while (rv == ErrLockTimeOut);
790 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
792 printDebug(DM_Lock, "Releasing exclusive lock and dealloc node:%x", iter);
793 printDebug(DM_Lock, "LockManager:releaseLock End");
794 return OK;
796 else
798 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
799 printDebug(DM_Lock, "Releasing exclusive lock");
800 printDebug(DM_Lock, "LockManager:releaseLock End");
801 return OK;
804 else if (iter->lInfo_.noOfReaders_ == 1)
806 //iter->lInfo_.noOfReaders_ = 0;
807 int ret = Mutex::CAS((int*)&iter->lInfo_.noOfReaders_,
808 1, 0);
809 if (ret !=0) {
810 printError(ErrLockTimeOut, "Unable to release S lock taken. Timeout : Retry..");
811 return ErrLockTimeOut;
813 if (iter->lInfo_.waitWriters_ == 0 || iter->lInfo_.waitReaders_ ==0)
815 int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot);
816 if (lockRet == 0) {
817 int tries = Conf::config.getMutexRetries();
818 do {
819 rv = deallocLockNode(iter, bucket);
820 if (tries == 0) {
821 printError(ErrWarning, "Fatal:Leak:Unable to dealloc lock node");
823 tries--;
824 }while (rv == ErrLockTimeOut);
825 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
827 printDebug(DM_Lock, "Releasing read lock and dealloc node:%x",iter);
828 printDebug(DM_Lock, "LockManager:releaseLock End");
829 return OK;
831 else
833 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
834 printDebug(DM_Lock, "Releasing read lock");
835 printDebug(DM_Lock, "LockManager:releaseLock End");
836 return OK;
839 else
841 //iter->lInfo_.noOfReaders_--;
842 int ret = Mutex::CAS((int*)&iter->lInfo_.noOfReaders_,
843 oldValue,
844 oldValue - 1);
845 if (ret !=0) {
846 printError(ErrLockTimeOut, "Unable to release S lock taken : Retry..");
847 return ErrLockTimeOut;
849 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
850 printDebug(DM_Lock, "Decrementing read lock:%x",iter);
851 printDebug(DM_Lock, "LockManager:releaseLock End");
852 return OK;
856 printDebug(DM_Lock, "Finding the lock node. iter:%x",iter);
857 iter = iter->next_;
859 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
860 printError(ErrSysFatal, "Fatal:Lock Element Not found for tuple:%x", tuple);
861 return ErrSysFatal;
864 DbRetVal LockManager::isExclusiveLocked(void *tuple, Transaction **trans, bool &status)
866 Bucket *bucket = getLockBucket(tuple);
867 printDebug(DM_Lock,"Bucket is %x", bucket);
868 LockHashNode *lockNode = (LockHashNode*) bucket->bucketList_;
869 if (NULL == lockNode)
871 printDebug(DM_Lock, "bucketList is empty. so data element not locked");
872 status = false;
873 return OK;
875 /*int lockRet = bucket->mutex_.getLock(systemDatabase_->procSlot);
876 if (lockRet != 0)
878 printDebug(DM_Lock, "Mutex is waiting for long time:May be deadlock");
879 printDebug(DM_Lock, "LockManager:releaseLock End");
880 printError(ErrLockTimeOut, "Unable to get bucket mutex");
881 return ErrLockTimeOut;
884 LockHashNode *iter = lockNode;
885 //Iterate though the list and find the element's lock info
886 //Only exclusive locks are checked. shared locks are not considered for this
887 while(iter != NULL)
889 if(iter->ptrToTuple_ == tuple)
891 if (iter->lInfo_.noOfReaders_ == -1)
893 if (trans != NULL && (*trans)->findInHasList(systemDatabase_, iter))
895 printDebug(DM_Lock, "You already have exclusive Lock: %x", iter);
896 status = false;
898 else
899 status = true;
900 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
901 return OK;
904 printDebug(DM_Lock, "Finding the lock node. iter:%x",iter);
905 if (iter == iter->next_) {
906 printError(ErrSysFatal, "Fatal:Unable to find lock node. cyclic list found");
907 return ErrSysFatal;
909 iter = iter->next_;
911 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
912 status = false;
913 return OK;
916 LockHashNode* LockManager::allocLockNode(LockInfo &info, void *tuple, DbRetVal *rv)
918 //allocate lock node
919 Chunk *chunk = systemDatabase_->getSystemDatabaseChunk(LockTableId);
920 LockHashNode *node = NULL; //(LockHashNode*)chunk->allocate(systemDatabase_, rv);
921 int tries=0;
922 int totalTries = Conf::config.getMutexRetries();
923 while (tries < totalTries)
925 *rv = OK;
926 node= (LockHashNode*)chunk->allocate(systemDatabase_, rv);
927 if (node !=NULL) break;
928 if (*rv != ErrLockTimeOut)
930 printError(*rv, "Unable to allocate hash index node");
931 return NULL;
933 tries++;
935 if (NULL == node)
937 printError(*rv, "Unable to allocate lock node after %d retry", tries);
938 return NULL;
940 node->ptrToTuple_ = tuple;
941 node->lInfo_ = info;
942 node->next_ = NULL;
943 return node;
945 void LockManager::deallocLockNode(LockHashNode *node)
947 Chunk *chunk = systemDatabase_->getSystemDatabaseChunk(LockTableId);
948 chunk->free(systemDatabase_, node);
949 return;
951 DbRetVal LockManager::deallocLockNode(LockHashNode *node, Bucket *bucket)
953 Chunk *chunk = systemDatabase_->getSystemDatabaseChunk(LockTableId);
954 LockHashNode *nodeList = (LockHashNode*) bucket->bucketList_;
955 LockHashNode *iter = nodeList, *prev = nodeList;
956 if (NULL == nodeList)
958 printError(ErrSysFatal, "Fatal:Lock Bucket is NULL");
959 return ErrSysFatal;
961 //If it is the first node, then make the bucket point to the next node
962 //in the list
963 if (nodeList == node)
965 //bucket->bucketList_ = node->next_;
966 if ( 0 != Mutex::CASL((long*)&bucket->bucketList_, (long)node,
967 (long)node->next_)) {
968 printError(ErrLockTimeOut, "Unable to remove lock node and set lock list head\n");
969 return ErrLockTimeOut;
971 chunk->free(systemDatabase_, node);
972 return OK;
974 void *val = NULL;
975 while(iter != node)
977 if (iter == NULL) {
978 printError(ErrSysFatal, "Fatal: Lock node not found in bucket");
979 printStackTrace();
980 return ErrSysFatal;
982 prev = iter;
983 val = prev->next_;
984 iter = iter->next_;
986 //delete the node by making previous element point to the next element
987 //of the deleted element in the list
988 //prev->next_ = iter->next_;
989 if ( 0 != Mutex::CASL((long*)&prev->next_, (long)val,
990 (long)iter->next_)) {
991 printError(ErrLockTimeOut, "Unable to remove lock node \n");
992 return ErrLockTimeOut;
994 chunk->free(systemDatabase_, node);
995 return OK;