*** empty log message ***
[csql.git] / src / storage / LockManager.cxx
blob9944d77f348df4d493050dbb813f05f4fd4483a8
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Lock.h>
17 #include<Allocator.h>
18 #include<Database.h>
19 #include<CatalogTables.h>
20 #include<Transaction.h>
21 #include<Debug.h>
22 #include<Config.h>
23 #include<Process.h>
24 LockManager::LockManager(Database *sysDb_)
26 systemDatabase_ = sysDb_;
27 lockBuckets = systemDatabase_->getLockHashBuckets();
29 Bucket* LockManager::getLockBucket(void *tuple)
31 //Bucket* buckets = systemDatabase_->getLockHashBuckets();
32 unsigned long key =(unsigned long)tuple ;
33 int bucketNo = key % LOCK_BUCKET_SIZE;
35 //Bucket *bucket = &(buckets[bucketNo]);
36 printDebug(DM_Lock, "getLockBucket bucketno:%d",bucketNo);
37 return &(lockBuckets[bucketNo]);
40 void LockManager::printUsageStatistics()
42 Bucket* buckets = systemDatabase_->getLockHashBuckets();
43 Bucket* bucket;
44 LockHashNode *lockNode;
45 int nodeCount =0, bucketCount =0;
46 for (int i =0; i< LOCK_BUCKET_SIZE; i++)
48 bucket = &(buckets[i]);
49 lockNode = (LockHashNode*) bucket->bucketList_;
50 if (lockNode) bucketCount++; else continue;
51 while (NULL != lockNode) { nodeCount++; lockNode = lockNode->next_; }
53 printf("<LockTable>\n");
54 printf(" <TotalBuckets> %d </TotalBuckets>\n", LOCK_BUCKET_SIZE);
55 printf(" <UsedBuckets> %d </UsedBuckets>\n", bucketCount);
56 printf(" <TotalLockNodes> %d </TotalLockNodes>\n", nodeCount);
57 printf("</LockTable>\n");
61 void LockManager::printDebugInfo()
63 Bucket* buckets = systemDatabase_->getLockHashBuckets();
64 Bucket* bucket;
65 LockHashNode *lockNode;
66 int nodeCount =0, bucketCount =0;
67 printf("<LockTable>\n");
68 for (int i =0; i< LOCK_BUCKET_SIZE; i++)
70 nodeCount =0;
71 bucket = &(buckets[i]);
72 //if (bucket) bucketCount++; else continue;
73 lockNode = (LockHashNode*) bucket->bucketList_;
75 while (NULL != lockNode)
77 nodeCount++;
78 lockNode->print();
79 lockNode = lockNode->next_;
81 if (nodeCount) {
82 bucketCount++;
83 printf(" <LockBucket> \n");
84 printf(" <BucketNo> %d </BucketNo> \n", i);
85 printf(" <TotalNodes> %d </TotalNodes>\n", nodeCount);
86 printf(" <LockBucket>\n");
90 printf(" <TotalUsedBuckets> %d </TotalUsedBuckets>\n", bucketCount);
91 Chunk *chunk = systemDatabase_->getSystemDatabaseChunk(LockTableId);
92 printf(" <TotalPages> %d </TotalPages>\n", chunk->totalPages());
93 printf("</LockTable>\n");
97 DbRetVal LockManager::getSharedLock(void *tuple, Transaction **trans)
99 //get the bucket list
100 //take the bucket mutex for read
101 //go the the next level bucket list
102 //get the bucket iterator
103 //go the node where the lock info resides
104 //check which mode the lock is taken
105 // if shared then
106 // upgrade the bucket mutex to write
107 // take it and increment the readers count
108 // release bucket mutex and exit
109 // if exclusive then
110 // go into the loop
111 // upgrade the bucket mutex to write
112 // increment waitReaders count
113 // release the bucket mutex
114 // wait for timeout period or (takes shared lock and release it ) till it becomes free.
115 // if times out
116 // take bucket mutex for write
117 // decrement waitReaders count
118 // releaese bucket mutex
120 // return
121 // if it becomes free
122 // take bucket mutex for write
123 // increment readers
124 // releaese bucket mutex
126 // return
127 LockInfo linfo;
128 linfo.noOfReaders_ = 1;
129 //keeping it ready for the allocation, because when
130 //lock node is not present in the list, then it means we are the first
131 //to acquire lock so for sure we will get it.
132 printDebug(DM_Lock, "LockManager::getSharedLock Begin");
133 Bucket *bucket = getLockBucket(tuple);
134 LockHashNode *lockNode = (LockHashNode*) bucket->bucketList_;
135 if (NULL == lockNode)
137 DbRetVal rv = OK;
138 LockHashNode *node = allocLockNode(linfo, tuple, &rv);
139 if (NULL == node)
141 printError(rv, "Could not allocate Lock node");
142 return rv;
144 printDebug(DM_Lock, "Bucket list is null: Allocating new LockHashNode %x", node);
145 rv = OK;
146 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, node);
147 if (rv !=OK) {
148 deallocLockNode(node);
149 printError(ErrLockTimeOut,"Unable to insert into hasList. Timeout.Retry...");
150 return ErrLockTimeOut;
153 //bucket->bucketList_ = (void*)node; //make it as head
154 int ret = Mutex::CASL((long*)&bucket->bucketList_, 0 , (long)node);
155 if (ret != 0) {
156 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
157 deallocLockNode(node);
158 printError(ErrLockTimeOut, "Unable to set lock list head. Timeout. Retry...");
159 return ErrLockTimeOut;
161 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
162 printDebug(DM_Lock, "LockManager::getSharedLock End");
163 return rv;
165 LockHashNode *cachedLockNode = NULL;
167 LockHashNode *iter = lockNode;
168 //Iterate though the list and find the element's lock info
169 while(iter != NULL)
171 if(iter->ptrToTuple_ == tuple)
173 if (iter->lInfo_.noOfReaders_ == -1)
176 //iter->lInfo_.waitReaders_++;
177 int ret = Mutex::CASGen(&iter->lInfo_.waitReaders_,
178 iter->lInfo_.waitReaders_,
179 iter->lInfo_.waitReaders_+1);
180 if (ret !=0) {
181 printError(ErrLockTimeOut, "Unable to inc waitReaders:%d : Timeout. Retry..", iter->lInfo_.waitReaders_);
182 return ErrLockTimeOut;
184 cachedLockNode = iter;
185 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
186 if (trans != NULL) (*trans)->updateWaitLock(iter);
187 printDebug(DM_Lock, "lock node:%x exclusive locked",iter);
188 break;
190 else if (iter->lInfo_.noOfReaders_ == 0)
192 if(iter->lInfo_.waitWriters_ >0)
194 //iter->lInfo_.waitReaders_++;
195 int ret = Mutex::CASGen(&iter->lInfo_.waitReaders_,
196 iter->lInfo_.waitReaders_,
197 iter->lInfo_.waitReaders_+1);
198 if (ret !=0) {
199 printError(ErrLockTimeOut, "Unable to inc waitReaders:%d : Timeout. Retry..", iter->lInfo_.waitReaders_);
200 return ErrLockTimeOut;
202 cachedLockNode = iter;
203 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
204 if (trans != NULL) (*trans)->updateWaitLock(iter);
205 printDebug(DM_Lock, "lock node:%x Writers waiting.",iter);
206 break;
208 else
210 DbRetVal rv = OK;
211 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, iter);
212 if (rv != OK) {
213 printError(ErrLockTimeOut,"Unable to insert into hasList. Timeout. Retry..");
214 return ErrLockTimeOut;
216 //iter->lInfo_.noOfReaders_++;
217 int ret = Mutex::CASGen(&iter->lInfo_.noOfReaders_,
218 iter->lInfo_.noOfReaders_,
219 iter->lInfo_.noOfReaders_+1);
220 if (ret !=0) {
221 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
222 printError(ErrLockTimeOut, "Unable to inc noOfReaders:%d : Timeout. Retry..", iter->lInfo_.noOfReaders_);
223 return ErrLockTimeOut;
225 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
226 printDebug(DM_Lock, "lock node:%x First to take shared lock",
227 iter);
228 printDebug(DM_Lock, "LockManager::getSharedLock End");
229 return rv;
231 }else {
232 DbRetVal rv = OK;
233 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, iter);
234 if (rv != OK) {
235 printError(ErrLockTimeOut, "Unable to insert into hasList. Timeout : Retry..");
236 return ErrLockTimeOut;
238 //iter->lInfo_.noOfReaders_++;
239 int ret = Mutex::CASGen(&iter->lInfo_.noOfReaders_,
240 iter->lInfo_.noOfReaders_,
241 iter->lInfo_.noOfReaders_+1);
242 if (ret !=0) {
243 if (trans!=NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
244 printError(ErrLockTimeOut, "Unable to take S lock. Timeout : Retry..");
245 return ErrLockTimeOut;
247 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
248 printDebug(DM_Lock, "lock node:%x incr readers",iter);
249 printDebug(DM_Lock, "LockManager::getSharedLock End");
250 return rv;
253 printDebug(DM_Lock, "Finding the lock node. iter:%x",iter);
254 iter = iter->next_;
256 if (NULL == cachedLockNode)
258 DbRetVal rv =OK;
259 LockHashNode *node = allocLockNode(linfo, tuple, &rv);
260 if (NULL == node)
262 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
263 printError(rv, "Could not allocate Lock node");
264 if (trans != NULL) (*trans)->removeWaitLock();
265 return rv;
267 printDebug(DM_Lock,"Not Found.Created new lock node:%x",node);
268 rv = OK;
269 if (trans != NULL)
270 rv = (*trans)->insertIntoHasList(systemDatabase_, node);
271 if (rv != OK) {
272 deallocLockNode(node);
273 if (trans != NULL) (*trans)->removeWaitLock();
274 printError(ErrLockTimeOut, "Unable to add to hasList : Timeout. Retry..");
275 return ErrLockTimeOut;
277 DbRetVal lockRet = getBucketMutex(bucket, systemDatabase_->procSlot);
278 if (lockRet != OK)
280 printError(ErrLockTimeOut, "Unable to acquire bucket mutex");
281 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
282 deallocLockNode(node);
283 if (trans != NULL) (*trans)->removeWaitLock();
284 return ErrLockTimeOut;
287 LockHashNode *it = (LockHashNode*) bucket->bucketList_;
289 if (NULL == it) {
290 int ret = Mutex::CASL((long*)&bucket->bucketList_, 0 , (long)node);
291 if (ret != 0) {
292 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
293 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
294 deallocLockNode(node);
295 if (trans != NULL) (*trans)->removeWaitLock();
296 printError(ErrLockTimeOut, "Unable to set Lock Bucket. Timeout: retry...");
297 return ErrLockTimeOut;
299 } else {
300 while (NULL != it->next_) it = it->next_;
301 //it->next_ = node;
302 int ret = Mutex::CASL((long*)&it->next_, 0, (long)node);
303 if (ret !=0) {
304 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
305 if (trans !=NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
306 deallocLockNode(node);
307 if (trans != NULL) (*trans)->removeWaitLock();
308 printError(ErrLockTimeOut, "Unable to add to lock table list : Retry..");
309 return ErrLockTimeOut;
312 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
313 if (trans != NULL) (*trans)->removeWaitLock();
314 printDebug(DM_Lock, "LockManager::getSharedLock End");
315 return OK;
317 //bucket->mutex_.releaseLock();
318 int tries = 0;
319 int ret = 0;
320 struct timeval timeout;
321 timeout.tv_sec = Conf::config.getLockSecs();
322 timeout.tv_usec = Conf::config.getLockUSecs();
324 //printDebug(DM_Lock, "Trying to get mutex: for bucket %x\n", bucket);
325 while (tries < Conf::config.getLockRetries())
327 /*lockRet = getBucketMutex(bucket, systemDatabase_->procSlot);
328 if (lockRet != 0)
330 printDebug(DM_Lock, "Mutex is waiting for long time:May be deadlock");
331 printDebug(DM_Lock, "LockManager::getSharedLock End");
332 printError(ErrLockTimeOut, "Unable to get bucket mutex");
333 if (trans != NULL) (*trans)->removeWaitLock();
334 return ErrLockTimeOut;
336 InUse oldValue = cachedLockNode->lInfo_.noOfReaders_;
337 if (cachedLockNode->lInfo_.noOfReaders_ == 0)
339 //if there are waiters allow then to take the lock
340 if (cachedLockNode->lInfo_.waitWriters_ <0)
342 DbRetVal rv = OK;
343 if (trans != NULL) rv=(*trans)->insertIntoHasList(systemDatabase_, cachedLockNode);
344 if (rv !=OK) {
345 if (trans != NULL) (*trans)->removeWaitLock();
346 printError(ErrLockTimeOut, "Unable to add to hasList. TimeOut : Retry..");
347 return ErrLockTimeOut;
350 //cachedLockNode->lInfo_.noOfReaders_++;
351 int ret = Mutex::CASGen(&cachedLockNode->lInfo_.noOfReaders_,
352 0,cachedLockNode->lInfo_.noOfReaders_+1);
353 if (ret !=0) {
354 printError(ErrLockTimeOut, "Unable to take S lock. TimeOut : Retry..");
355 if (trans != NULL) (*trans)->removeWaitLock();
356 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
357 return ErrLockTimeOut;
359 //cachedLockNode->lInfo_.waitReaders_--;
360 ret = Mutex::CASGen(&cachedLockNode->lInfo_.waitReaders_,
361 cachedLockNode->lInfo_.waitReaders_,
362 cachedLockNode->lInfo_.waitReaders_-1);
363 if (ret !=0) {
364 printError(ErrLockTimeOut, "Fatal:Unable to dec waitReaders. Timeout : Retry..");
365 //return ErrLockTimeOut;
367 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
368 if (trans != NULL) (*trans)->removeWaitLock();
369 printDebug(DM_Lock, "LockManager::getSharedLock End");
370 return OK;
372 } else if (cachedLockNode->lInfo_.noOfReaders_ == -1)
374 if (trans !=NULL && (*trans)->findInHasList(systemDatabase_, cachedLockNode))
376 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
377 if (trans != NULL) (*trans)->removeWaitLock();
378 printDebug(DM_Lock, "LockManager::getSharedLock End");
379 return OK;
381 } else
383 DbRetVal rv =OK;
384 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, cachedLockNode);
385 if (rv !=OK) {
386 if (trans != NULL) (*trans)->removeWaitLock();
387 printError(ErrLockTimeOut, "Unable to add to hasList. Timeout : Retry..");
388 return ErrLockTimeOut;
390 //cachedLockNode->lInfo_.noOfReaders_++;
391 int ret = Mutex::CASGen(&cachedLockNode->lInfo_.noOfReaders_,
392 oldValue, oldValue+1);
393 if (ret !=0) {
394 printError(ErrLockTimeOut, "Unable to take S lock. Timeout : Retry..");
395 if (trans != NULL) (*trans)->removeWaitLock();
396 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
397 return ErrLockTimeOut;
399 //cachedLockNode->lInfo_.waitReaders_--;
400 ret = Mutex::CASGen(&cachedLockNode->lInfo_.waitReaders_,
401 cachedLockNode->lInfo_.waitReaders_,
402 cachedLockNode->lInfo_.waitReaders_-1);
403 if (ret !=0) {
404 printError(ErrLockTimeOut, "Unable to dec waitReaders timeout : Retry..");
405 //return ErrLockTimeOut;
407 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
408 if (trans != NULL) (*trans)->removeWaitLock();
409 printDebug(DM_Lock, "LockManager::getSharedLock End");
410 return OK;
413 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
414 os::select(0, 0, 0, 0, &timeout);
415 tries++;
416 printDebug(DM_Lock, "Trying to lock the lock node:%x iteration:%d",cachedLockNode, tries);
418 printDebug(DM_Lock, "Mutex is waiting for long time:May be deadlock");
419 printDebug(DM_Lock, "LockManager::getSharedLock End");
420 printError(ErrLockTimeOut, "Unable to acquire lock for long time.Timed out");
421 if (trans != NULL) (*trans)->removeWaitLock();
422 return ErrLockTimeOut;
426 DbRetVal LockManager::getExclusiveLock(void *tuple, Transaction **trans)
428 LockInfo linfo;
429 linfo.noOfReaders_ = -1;
430 printDebug(DM_Lock, "LockManager::getExclusiveLock Begin");
431 //keeping it ready for the allocation, because when
432 //lock node is not present in the list, then it means we are the first
433 //to acquire lock so for sure we will get it.
435 Bucket *bucket = getLockBucket(tuple);
436 /*int lockRet = getBucketMutex(bucket, systemDatabase_->procSlot);
437 if (lockRet != 0)
439 printDebug(DM_Lock, "Unable to acquire bucket mutex:May be deadlock");
440 printError(ErrLockTimeOut, "Unable to acquire bucket mutex");
441 return ErrLockTimeOut;
443 LockHashNode *lockNode = (LockHashNode*) bucket->bucketList_;
444 if (NULL == lockNode)
446 DbRetVal rv = OK;
447 LockHashNode *node = allocLockNode(linfo, tuple, &rv);
448 if (NULL == node) return rv;
449 rv =OK;
450 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, node);
451 if (rv !=OK) {
452 printError(ErrLockTimeOut, "Unable to add to hasList. Timeout: retry...");
453 deallocLockNode(node);
454 return ErrLockTimeOut;
456 printDebug(DM_Lock, "No head. So new lock node allocated:%x",node);
457 //bucket->bucketList_ = (void*)node; //make it as head
458 int ret = Mutex::CASL((long*)&bucket->bucketList_, 0 , (long)node);
459 if (ret != 0) {
460 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
461 deallocLockNode(node);
462 printError(ErrLockTimeOut, "Unable to set Lock Bucket. Timeout: retry...%x", tuple);
463 return ErrLockTimeOut;
465 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
466 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
467 return OK;
470 LockHashNode *cachedLockNode = NULL;
472 LockHashNode *iter = lockNode;
473 //Iterate though the list and find the element's lock info
474 while(iter != NULL)
476 if(iter->ptrToTuple_ == tuple)
478 if (iter->lInfo_.noOfReaders_ != 0)
480 //iter->lInfo_.waitWriters_++;
481 int ret = Mutex::CASGen(&iter->lInfo_.waitWriters_,
482 iter->lInfo_.waitWriters_,
483 iter->lInfo_.waitWriters_+1);
484 if (ret !=0) {
485 printError(ErrLockTimeOut, "Unable to inc waitWriters. Timeout : Retry..");
486 return ErrLockTimeOut;
489 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
490 if (trans != NULL) (*trans)->updateWaitLock(iter);
491 cachedLockNode = iter;
492 printDebug(DM_Lock, "Either some one has exclusive or shared lock:%x",iter);
493 break;
495 else
497 DbRetVal rv =OK;
498 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, iter);
499 if (rv != OK) {
500 printError(ErrLockTimeOut, "Unable to add to hasList. Timeout : Retry..");
501 return ErrLockTimeOut;
503 //iter->lInfo_.noOfReaders_ = -1;
504 int ret = Mutex::CASGen(&iter->lInfo_.noOfReaders_, 0, -1);
505 if (ret !=0) {
506 if (trans!= NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
507 printError(ErrLockTimeOut, "Unable to take X lock on tuple. Timeout : Retry..");
508 return ErrLockTimeOut;
510 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
511 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
512 return rv;
515 printDebug(DM_Lock, "Finding the lock node. iter:%x",iter);
516 iter = iter->next_;
518 if (NULL == cachedLockNode)
520 DbRetVal rv =OK;
521 LockHashNode *node = allocLockNode(linfo, tuple, &rv);
522 if (NULL == node)
524 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
525 if (trans != NULL) (*trans)->removeWaitLock();
526 printError(rv, "Could not allocate Lock node");
527 return rv;
529 printDebug(DM_Lock, "Not Found:Creating new lock node:%x",node);
530 rv = OK;
531 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, node);
532 if (rv != OK) {
533 deallocLockNode(node);
534 if (trans != NULL) (*trans)->removeWaitLock();
535 printError(ErrLockTimeOut, "Unable to add to hasList. Timeout : Retry..");
536 return ErrLockTimeOut;
538 int lockRet = getBucketMutex(bucket, systemDatabase_->procSlot);
539 if (lockRet != 0)
541 printError(ErrLockTimeOut, "Unable to acquire bucket mutex");
542 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
543 deallocLockNode(node);
544 if (trans != NULL) (*trans)->removeWaitLock();
545 return ErrLockTimeOut;
547 LockHashNode *it = (LockHashNode*) bucket->bucketList_;
548 if (NULL == it) {
549 int ret = Mutex::CASL((long*)&bucket->bucketList_, 0 , (long)node);
550 if (ret != 0) {
551 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
552 deallocLockNode(node);
553 if (trans != NULL) (*trans)->removeWaitLock();
554 printError(ErrLockTimeOut, "Unable to set Lock Bucket. Timeout: retry...");
555 return ErrLockTimeOut;
559 } else {
560 while (NULL != it->next_) it = it->next_;
561 //it->next_ = node;
562 int ret = Mutex::CASL((long*)&it->next_, 0, (long)node);
563 if (ret !=0) {
564 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
565 deallocLockNode(node);
566 if (trans != NULL) (*trans)->removeWaitLock();
567 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
568 printError(ErrLockTimeOut, "Unable to add to lock list. Timeout : Retry..");
569 return ErrLockTimeOut;
572 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
573 if (trans != NULL) (*trans)->removeWaitLock();
574 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
575 return rv;
577 //bucket->mutex_.releaseLock();
578 int tries = 0;
579 int ret = 0;
580 struct timeval timeout;
581 timeout.tv_sec = Conf::config.getLockSecs();
582 timeout.tv_usec = Conf::config.getLockUSecs();
584 while (tries < Conf::config.getLockRetries())
586 /*lockRet = getBucketMutex(bucket, systemDatabase_->procSlot);
587 if (lockRet != 0)
589 printError(ErrLockTimeOut, "Unable to get bucket mutex");
590 return ErrLockTimeOut;
592 InUse oldValue = cachedLockNode->lInfo_.noOfReaders_;
593 if (cachedLockNode->lInfo_.noOfReaders_ == 0)
595 DbRetVal rv ;
596 if (trans != NULL) rv = (*trans)->insertIntoHasList(systemDatabase_, cachedLockNode);
597 if (rv != OK) {
598 if (trans != NULL) (*trans)->removeWaitLock();
599 printError(ErrLockTimeOut, "Unable to add to hasList: Timeout: Retry..");
600 return ErrLockTimeOut;
602 //cachedLockNode->lInfo_.noOfReaders_ = -1;
603 int ret = Mutex::CASGen(&cachedLockNode->lInfo_.noOfReaders_,0,-1);
604 if (ret !=0) {
605 if (trans != NULL) (*trans)->removeFromHasList(systemDatabase_, tuple);
606 if (trans != NULL) (*trans)->removeWaitLock();
607 printError(ErrLockTimeOut, "Unable to take X lock: Timeout: Retry..");
608 return ErrLockTimeOut;
611 //cachedLockNode->lInfo_.waitWriters_--;
612 ret = Mutex::CASGen(&cachedLockNode->lInfo_.waitWriters_,
613 cachedLockNode->lInfo_.waitWriters_,
614 cachedLockNode->lInfo_.waitWriters_-1);
615 if (ret !=0) {
616 printError(ErrLockTimeOut, "Fatal:Unable to dec waitWriters. TimeOut : Retry.. waitWriters:%d",cachedLockNode->lInfo_.waitWriters_);
617 //do not do error return as this is OK to continue.
618 //return ErrLockTimeOut;
620 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
621 if (trans != NULL) (*trans)->removeWaitLock();
622 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
623 return rv;
624 }else if ( cachedLockNode->lInfo_.noOfReaders_ == 1)
626 if (trans !=NULL && (*trans)->findInHasList(systemDatabase_, cachedLockNode))
628 printDebug(DM_Lock, "upgrading shared to exclusive lock:%x",
629 cachedLockNode);
630 //upgrade it to exclusive lock
631 //cachedLockNode->lInfo_.noOfReaders_ = -1;
632 int ret = Mutex::CASGen(&cachedLockNode->lInfo_.noOfReaders_, 1, -1);
633 if (ret !=0) {
634 if (trans != NULL) (*trans)->removeWaitLock();
635 printError(ErrLockTimeOut, "Unable to upgrade lock. Timeout : Retry..");
636 return ErrLockTimeOut;
639 //cachedLockNode->lInfo_.waitWriters_--;
640 ret = Mutex::CASGen(&cachedLockNode->lInfo_.waitWriters_,
641 cachedLockNode->lInfo_.waitWriters_,
642 cachedLockNode->lInfo_.waitWriters_-1);
643 if (ret !=0) {
644 printError(ErrLockTimeOut, "Fatal:Unable to dec waitWriters:Timeout : Retry.. waitwriters:%d", cachedLockNode->lInfo_.waitWriters_);
645 //do not do error return as this is OK to contine
646 //return ErrLockTimeOut;
648 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
649 if (trans != NULL) (*trans)->removeWaitLock();
650 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
651 return OK;
653 if (trans ==NULL && ProcessManager::hasLockList.exists(cachedLockNode->ptrToTuple_))
655 printDebug(DM_Lock, "upgrading shared to exclusive lock:%x",
656 cachedLockNode);
657 //upgrade it to exclusive lock
658 //cachedLockNode->lInfo_.noOfReaders_ = -1;
659 int ret = Mutex::CASGen(&cachedLockNode->lInfo_.noOfReaders_, 1, -1);
660 if (ret !=0) {
661 printError(ErrLockTimeOut, "Unable to upgrade lock. Timeout : Retry..");
662 if (trans != NULL) (*trans)->removeWaitLock();
663 return ErrLockTimeOut;
666 //cachedLockNode->lInfo_.waitWriters_--;
667 ret = Mutex::CASGen(&cachedLockNode->lInfo_.waitWriters_,
668 cachedLockNode->lInfo_.waitWriters_,
669 cachedLockNode->lInfo_.waitWriters_-1);
670 if (ret !=0) {
671 printError(ErrLockTimeOut, "Fatal:Unable to dec waitWriters:Timeout : Retry.. waitwriters:%d", cachedLockNode->lInfo_.waitWriters_);
672 //do not do error return as this is OK to contine
673 //return ErrLockTimeOut;
675 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
676 if (trans != NULL) (*trans)->removeWaitLock();
677 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
678 return OK;
680 }else if ( cachedLockNode->lInfo_.noOfReaders_ == -1)
682 if (trans !=NULL && (*trans)->findInHasList(systemDatabase_, cachedLockNode))
684 printDebug(DM_Lock, "You already have exclusive lock:%x",
685 cachedLockNode);
686 //cachedLockNode->lInfo_.waitWriters_--;
687 ret = Mutex::CASGen(&cachedLockNode->lInfo_.waitWriters_,
688 cachedLockNode->lInfo_.waitWriters_,
689 cachedLockNode->lInfo_.waitWriters_-1);
690 if (ret !=0) {
691 printError(ErrLockTimeOut, "Fatal:Unable to dec waitWriters:Timeout : Retry.. waitwriters:%d", cachedLockNode->lInfo_.waitWriters_);
692 //do not do error return as this is OK to contine
693 //return ErrLockTimeOut;
695 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
696 if (trans != NULL) (*trans)->removeWaitLock();
697 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
698 return OK;
700 if (trans ==NULL && ProcessManager::hasLockList.exists(cachedLockNode->ptrToTuple_))
702 printDebug(DM_Lock, "You already have exclusive lock:%x",
703 cachedLockNode);
704 //cachedLockNode->lInfo_.waitWriters_--;
705 ret = Mutex::CASGen(&cachedLockNode->lInfo_.waitWriters_,
706 cachedLockNode->lInfo_.waitWriters_,
707 cachedLockNode->lInfo_.waitWriters_-1);
708 if (ret !=0) {
709 printError(ErrLockTimeOut, "Fatal:Unable to dec waitWriters:Timeout : Retry.. waitwriters:%d", cachedLockNode->lInfo_.waitWriters_);
710 //do not do error return as this is OK to contine
711 //return ErrLockTimeOut;
713 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
714 if (trans != NULL) (*trans)->removeWaitLock();
715 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
716 return OK;
719 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
720 os::select(0, 0, 0, 0, &timeout);
721 tries++;
722 printDebug(DM_Lock, "Trying to lock the lock node:%x iteration:%d",cachedLockNode, tries);
724 printDebug(DM_Lock, "LockManager::getExclusiveLock End");
725 if (trans != NULL) (*trans)->removeWaitLock();
726 printError(ErrLockTimeOut, "Unable to acquire lock for long time.Timed out");
727 return ErrLockTimeOut;
730 DbRetVal LockManager::releaseLock(void *tuple)
732 printDebug(DM_Lock, "LockManager:releaseLock Start");
733 Bucket *bucket = getLockBucket(tuple);
734 printDebug(DM_Lock,"Bucket is %x", bucket);
735 /*int lockRet = getBucketMutex(bucket, systemDatabase_->procSlot);
736 if (lockRet != 0)
738 printDebug(DM_Lock, "Mutex is waiting for long time:May be deadlock");
739 printDebug(DM_Lock, "LockManager:releaseLock End");
740 printError(ErrLockTimeOut, "Unable to get bucket mutex");
741 return ErrLockTimeOut;
743 LockHashNode *lockNode = (LockHashNode*) bucket->bucketList_;
744 if (NULL == lockNode)
746 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
747 printDebug(DM_Lock, "LockManager:releaseLock End");
748 //printError(ErrSysFatal, "Fatal:Lock Bucket and Element Not found for tuple: %x:", tuple);
749 printStackTrace();
750 printError(ErrSysFatal, "Fatal:Lock Bucket and Element Not found for tuple: %x:%d", tuple, *(int*)tuple);
751 return ErrSysFatal;
753 DbRetVal rv = OK;
754 LockHashNode *iter = lockNode;
755 //Iterate though the list and find the element's lock info
756 while(iter != NULL)
758 if(iter->ptrToTuple_ == tuple)
760 InUse oldValue = iter->lInfo_.noOfReaders_;
761 if (iter->lInfo_.noOfReaders_ == -1)
763 //iter->lInfo_.noOfReaders_ = 0;
764 int ret = Mutex::CASGen(&iter->lInfo_.noOfReaders_, -1, 0);
765 if (ret !=0) {
766 printError(ErrLockTimeOut, "Unable to release X lock taken : Retry..");
767 return ErrLockTimeOut;
770 if (iter->lInfo_.waitWriters_ == 0 || iter->lInfo_.waitReaders_ ==0)
772 //TODO::above condition is not atomic
773 //put waitReaders_, WaitReaders in one integer
774 DbRetVal lockRet = getBucketMutex(bucket,systemDatabase_->procSlot);
775 if (lockRet == OK) {
776 int tries = Conf::config.getMutexRetries();
777 do {
778 rv = deallocLockNode(iter, bucket);
779 if (tries == 0) {
780 printError(ErrWarning, "Fatal:Leak: Unable to dealloc lock node %d tries", Conf::config.getMutexRetries());
781 break;
783 tries--;
784 }while (rv == ErrLockTimeOut);
785 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
787 printDebug(DM_Lock, "Releasing exclusive lock and dealloc node:%x", iter);
788 printDebug(DM_Lock, "LockManager:releaseLock End");
789 return OK;
791 else
793 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
794 printDebug(DM_Lock, "Releasing exclusive lock");
795 printDebug(DM_Lock, "LockManager:releaseLock End");
796 return OK;
799 else if (iter->lInfo_.noOfReaders_ == 1)
801 //iter->lInfo_.noOfReaders_ = 0;
802 int ret = Mutex::CASGen(&iter->lInfo_.noOfReaders_, 1, 0);
803 if (ret !=0) {
804 printError(ErrLockTimeOut, "Unable to release S lock taken. Timeout : Retry..");
805 return ErrLockTimeOut;
807 if (iter->lInfo_.waitWriters_ == 0 || iter->lInfo_.waitReaders_ ==0)
809 DbRetVal lockRet = getBucketMutex(bucket, systemDatabase_->procSlot);
810 if (lockRet == OK) {
811 int tries = Conf::config.getMutexRetries();
812 do {
813 rv = deallocLockNode(iter, bucket);
814 if (tries == 0) {
815 printError(ErrWarning, "Fatal:Leak:Unable to dealloc lock node");
817 tries--;
818 }while (rv == ErrLockTimeOut);
819 bucket->mutex_.releaseLock(systemDatabase_->procSlot);
821 printDebug(DM_Lock, "Releasing read lock and dealloc node:%x",iter);
822 printDebug(DM_Lock, "LockManager:releaseLock End");
823 return OK;
825 else
827 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
828 printDebug(DM_Lock, "Releasing read lock");
829 printDebug(DM_Lock, "LockManager:releaseLock End");
830 return OK;
833 else
835 //iter->lInfo_.noOfReaders_--;
836 int ret = Mutex::CASGen(&iter->lInfo_.noOfReaders_, oldValue, oldValue - 1);
837 if (ret !=0) {
838 printError(ErrLockTimeOut, "Unable to release S lock taken : Retry..");
839 return ErrLockTimeOut;
841 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
842 printDebug(DM_Lock, "Decrementing read lock:%x",iter);
843 printDebug(DM_Lock, "LockManager:releaseLock End");
844 return OK;
848 printDebug(DM_Lock, "Finding the lock node. iter:%x",iter);
849 iter = iter->next_;
851 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
852 printError(ErrSysFatal, "Fatal:Lock Element Not found for tuple:%x", tuple);
853 return ErrSysFatal;
856 DbRetVal LockManager::isExclusiveLocked(void *tuple, Transaction **trans, bool &status)
858 Bucket *bucket = getLockBucket(tuple);
859 printDebug(DM_Lock,"Bucket is %x", bucket);
860 LockHashNode *lockNode = (LockHashNode*) bucket->bucketList_;
861 if (NULL == lockNode)
863 printDebug(DM_Lock, "bucketList is empty. so data element not locked");
864 status = false;
865 return OK;
867 /*int lockRet = getBucketMutex(bucket, systemDatabase_->procSlot);
868 if (lockRet != 0)
870 printDebug(DM_Lock, "Mutex is waiting for long time:May be deadlock");
871 printDebug(DM_Lock, "LockManager:releaseLock End");
872 printError(ErrLockTimeOut, "Unable to get bucket mutex");
873 return ErrLockTimeOut;
876 LockHashNode *iter = lockNode;
877 //Iterate though the list and find the element's lock info
878 //Only exclusive locks are checked. shared locks are not considered for this
879 while(iter != NULL)
881 if(iter->ptrToTuple_ == tuple)
883 if (iter->lInfo_.noOfReaders_ == -1)
885 if (trans != NULL && (*trans)->findInHasList(systemDatabase_, iter))
887 printDebug(DM_Lock, "You already have exclusive Lock: %x", iter);
888 status = false;
890 else
891 status = true;
892 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
893 return OK;
894 }else break;
896 printDebug(DM_Lock, "Finding the lock node. iter:%x",iter);
897 if (iter == iter->next_) {
898 printError(ErrSysFatal, "Fatal:Unable to find lock node. cyclic list found");
899 return ErrSysFatal;
901 iter = iter->next_;
903 //bucket->mutex_.releaseLock(systemDatabase_->procSlot);
904 status = false;
905 return OK;
908 LockHashNode* LockManager::allocLockNode(LockInfo &info, void *tuple, DbRetVal *rv)
910 //allocate lock node
911 Chunk *chunk = systemDatabase_->getSystemDatabaseChunk(LockTableId);
912 LockHashNode *node = NULL; //(LockHashNode*)chunk->allocate(systemDatabase_, rv);
913 int tries=0;
914 int totalTries = Conf::config.getMutexRetries();
915 while (tries < totalTries)
917 *rv = OK;
918 node= (LockHashNode*)chunk->allocate(systemDatabase_, rv);
919 if (node !=NULL) break;
920 if (*rv != ErrLockTimeOut)
922 printError(*rv, "Unable to allocate hash index node");
923 return NULL;
925 tries++;
927 if (NULL == node)
929 printError(*rv, "Unable to allocate lock node after %d retry", tries);
930 return NULL;
932 node->ptrToTuple_ = tuple;
933 node->lInfo_ = info;
934 node->next_ = NULL;
935 return node;
937 void LockManager::deallocLockNode(LockHashNode *node)
939 Chunk *chunk = systemDatabase_->getSystemDatabaseChunk(LockTableId);
940 chunk->free(systemDatabase_, node);
941 return;
943 DbRetVal LockManager::deallocLockNode(LockHashNode *node, Bucket *bucket)
945 Chunk *chunk = systemDatabase_->getSystemDatabaseChunk(LockTableId);
946 LockHashNode *nodeList = (LockHashNode*) bucket->bucketList_;
947 LockHashNode *iter = nodeList, *prev = nodeList;
948 if (NULL == nodeList)
950 printError(ErrSysFatal, "Fatal:Lock Bucket is NULL");
951 return ErrSysFatal;
953 //If it is the first node, then make the bucket point to the next node
954 //in the list
955 if (nodeList == node)
957 //bucket->bucketList_ = node->next_;
958 if ( 0 != Mutex::CASL((long*)&bucket->bucketList_, (long)node,
959 (long)node->next_)) {
960 printError(ErrLockTimeOut, "Unable to remove lock node and set lock list head\n");
961 return ErrLockTimeOut;
963 chunk->free(systemDatabase_, node);
964 return OK;
966 void *val = NULL;
967 while(iter != node)
969 if (iter == NULL) {
970 printError(ErrSysFatal, "Fatal: Lock node not found in bucket");
971 printStackTrace();
972 return ErrSysFatal;
974 prev = iter;
975 val = prev->next_;
976 iter = iter->next_;
978 //delete the node by making previous element point to the next element
979 //of the deleted element in the list
980 //prev->next_ = iter->next_;
981 if ( 0 != Mutex::CASL((long*)&prev->next_, (long)val, (long)iter->next_)) {
982 printError(ErrLockTimeOut, "Unable to remove lock node \n");
983 return ErrLockTimeOut;
985 chunk->free(systemDatabase_, node);
986 return OK;
988 DbRetVal LockManager::getBucketMutex(Bucket *bucket, int procSlot)
990 struct timeval timeout, timeval;
991 timeout.tv_sec = Conf::config.getMutexSecs();
992 timeout.tv_usec = Conf::config.getMutexUSecs();
993 int tries=0;
994 int totalTries = Conf::config.getMutexRetries() *2;
995 int ret =0;
996 while (tries < totalTries)
998 ret = bucket->mutex_.getLock(procSlot, true);
999 if (ret == 0) break;
1000 timeval.tv_sec = timeout.tv_sec;
1001 timeval.tv_usec = timeout.tv_usec;
1002 os::select(0, 0, 0, 0, &timeval);
1003 tries++;
1005 if (tries >= totalTries) return ErrLockTimeOut;
1006 return OK;