mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / storage / ndb / src / kernel / blocks / dbtup / DbtupScan.cpp
blobb7538d85d26b4663fb3cc1442ecad5c88e9c7855
1 /* Copyright (c) 2003, 2005-2008 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16 #define DBTUP_C
17 #define DBTUP_SCAN_CPP
18 #include "Dbtup.hpp"
19 #include <signaldata/AccScan.hpp>
20 #include <signaldata/NextScan.hpp>
21 #include <signaldata/AccLock.hpp>
22 #include <md5_hash.hpp>
24 #undef jam
25 #undef jamEntry
26 #define jam() { jamLine(32000 + __LINE__); }
27 #define jamEntry() { jamEntryLine(32000 + __LINE__); }
29 #ifdef VM_TRACE
30 #define dbg(x) globalSignalLoggers.log x
31 #else
32 #define dbg(x)
33 #endif
// Handle ACC_SCANREQ from LQH: allocate or attach a ScanOp for one fragment,
// decode request flags into ScanOp bits, and reply with ACC_SCANCONF.  If the
// scan-op pool is exhausted the do-while(0) breaks to the bottom, which sends
// ACC_SCANREF with error code 0x313 (LQH does not handle a normal REF format).
// NOTE(review): this extraction embeds original line numbers and has dropped
// brace-only lines (numbering skips); code is kept byte-identical to the scrape.
35 void
36 Dbtup::execACC_SCANREQ(Signal* signal)
38 jamEntry();
// copy the request out of the signal buffer before it is reused for the reply
39 const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
40 const AccScanReq* const req = &reqCopy;
41 ScanOpPtr scanPtr;
42 scanPtr.i = RNIL;
43 do {
44 // find table and fragment
45 TablerecPtr tablePtr;
46 tablePtr.i = req->tableId;
47 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
48 FragrecordPtr fragPtr;
49 Uint32 fragId = req->fragmentNo;
50 fragPtr.i = RNIL;
51 getFragmentrec(fragPtr, fragId, tablePtr.p);
52 ndbrequire(fragPtr.i != RNIL);
53 Fragrecord& frag = *fragPtr.p;
54 // flags
55 Uint32 bits = 0;
// LCP scans reuse the single pre-seized op (c_lcp_scan_op); all other
// scans seize a fresh op from the pool and link it to the fragment
58 if (AccScanReq::getLcpScanFlag(req->requestInfo))
60 jam();
61 bits |= ScanOp::SCAN_LCP;
62 c_scanOpPool.getPtr(scanPtr, c_lcp_scan_op);
64 else
66 // seize from pool and link to per-fragment list
67 LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
68 if (! list.seize(scanPtr)) {
69 jam();
70 break;
// scan the disk part only when the table has disk attributes and the
// requester did not explicitly disable disk scanning
74 if (!AccScanReq::getNoDiskScanFlag(req->requestInfo)
75 && tablePtr.p->m_no_of_disk_attributes)
77 bits |= ScanOp::SCAN_DD;
// NOTE(review): 'mm' holds the SCAN_DD flag and is used as an index into
// m_attributes — confirm index semantics against the Tablerec layout
80 bool mm = (bits & ScanOp::SCAN_DD);
81 if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) {
82 bits |= ScanOp::SCAN_VS;
84 // disk pages have fixed page format
85 ndbrequire(! (bits & ScanOp::SCAN_DD));
// locking scans: shared lock for read mode 0, exclusive otherwise
87 if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) {
88 if (AccScanReq::getLockMode(req->requestInfo) == 0)
89 bits |= ScanOp::SCAN_LOCK_SH;
90 else
91 bits |= ScanOp::SCAN_LOCK_EX;
// node-restart (copy) scan: record the page limit supplied by the requester
94 if (AccScanReq::getNRScanFlag(req->requestInfo))
96 jam();
97 bits |= ScanOp::SCAN_NR;
// NOTE(review): m_endPage is assigned here but the placement-new below
// (line 120) re-initializes *scanPtr.p — verify ScanOp() does not clobber it
98 scanPtr.p->m_endPage = req->maxPage;
99 if (req->maxPage != RNIL && req->maxPage > frag.noOfPages)
101 ndbout_c("%u %u endPage: %u (noOfPages: %u)",
102 tablePtr.i, fragId,
103 req->maxPage, fragPtr.p->noOfPages);
106 else
108 jam();
109 scanPtr.p->m_endPage = RNIL;
// LCP scans must never touch disk and never take row locks
112 if (AccScanReq::getLcpScanFlag(req->requestInfo))
114 jam();
115 ndbrequire((bits & ScanOp::SCAN_DD) == 0);
116 ndbrequire((bits & ScanOp::SCAN_LOCK) == 0);
119 // set up scan op
120 new (scanPtr.p) ScanOp();
121 ScanOp& scan = *scanPtr.p;
122 scan.m_state = ScanOp::First;
123 scan.m_bits = bits;
124 scan.m_userPtr = req->senderData;
125 scan.m_userRef = req->senderRef;
126 scan.m_tableId = tablePtr.i;
127 scan.m_fragId = frag.fragmentId;
128 scan.m_fragPtrI = fragPtr.i;
129 scan.m_transId1 = req->transId1;
130 scan.m_transId2 = req->transId2;
131 scan.m_savePointId = req->savePointId;
133 // conf
134 AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
135 conf->scanPtr = req->senderData;
136 conf->accPtr = scanPtr.i;
137 conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
138 sendSignal(req->senderRef, GSN_ACC_SCANCONF,
139 signal, AccScanConf::SignalLength, JBB);
140 return;
141 } while (0);
// error path: release anything we seized, then send the magic REF word
142 if (scanPtr.i != RNIL) {
143 jam();
144 releaseScanOp(scanPtr);
146 // LQH does not handle REF
147 signal->theData[0] = 0x313;
148 sendSignal(req->senderRef, GSN_ACC_SCANREF, signal, 1, JBB);
// Handle NEXT_SCANREQ from LQH: advance / commit / close one scan.
// Unless the request is a pure commit or a close, ends by invoking
// ACC_CHECK_SCAN directly to look for the next result row.
// NOTE(review): this extraction embeds original line numbers and has dropped
// brace-only lines (numbering skips); code is kept byte-identical to the scrape.
151 void
152 Dbtup::execNEXT_SCANREQ(Signal* signal)
154 jamEntry();
155 const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
156 const NextScanReq* const req = &reqCopy;
157 ScanOpPtr scanPtr;
158 c_scanOpPool.getPtr(scanPtr, req->accPtr);
159 ScanOp& scan = *scanPtr.p;
160 switch (req->scanFlag) {
161 case NextScanReq::ZSCAN_NEXT:
162 jam();
163 break;
// deliberate fall-through: NEXT_COMMIT shares the unlock logic with COMMIT,
// then continues to fetch the next row (COMMIT returns a 1-word conf instead)
164 case NextScanReq::ZSCAN_NEXT_COMMIT:
165 jam();
166 case NextScanReq::ZSCAN_COMMIT:
167 jam();
168 if ((scan.m_bits & ScanOp::SCAN_LOCK) != 0) {
169 jam();
170 AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
171 lockReq->returnCode = RNIL;
172 lockReq->requestInfo = AccLockReq::Unlock;
173 lockReq->accOpPtr = req->accOperationPtr;
174 EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
175 signal, AccLockReq::UndoSignalLength);
176 jamEntry();
177 ndbrequire(lockReq->returnCode == AccLockReq::Success);
// forget the lock op now that LQH has committed past it
178 removeAccLockOp(scan, req->accOperationPtr);
180 if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
181 NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
182 conf->scanPtr = scan.m_userPtr;
183 unsigned signalLength = 1;
184 sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
185 signal, signalLength, JBB);
186 return;
188 break;
189 case NextScanReq::ZSCAN_CLOSE:
190 jam();
191 if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
192 jam();
193 ndbrequire(scan.m_accLockOp != RNIL);
194 // use ACC_ABORTCONF to flush out any reply in job buffer
195 AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
196 lockReq->returnCode = RNIL;
197 lockReq->requestInfo = AccLockReq::AbortWithConf;
198 lockReq->accOpPtr = scan.m_accLockOp;
199 EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
200 signal, AccLockReq::UndoSignalLength);
201 jamEntry();
202 ndbrequire(lockReq->returnCode == AccLockReq::Success);
// close completes later, when ACC_ABORTCONF arrives
203 scan.m_state = ScanOp::Aborting;
204 return;
// not waiting for a lock: abort any held current-row lock synchronously
206 if (scan.m_state == ScanOp::Locked) {
207 jam();
208 ndbrequire(scan.m_accLockOp != RNIL);
209 AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
210 lockReq->returnCode = RNIL;
211 lockReq->requestInfo = AccLockReq::Abort;
212 lockReq->accOpPtr = scan.m_accLockOp;
213 EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
214 signal, AccLockReq::UndoSignalLength);
215 jamEntry();
216 ndbrequire(lockReq->returnCode == AccLockReq::Success);
217 scan.m_accLockOp = RNIL;
219 scan.m_state = ScanOp::Aborting;
220 scanClose(signal, scanPtr);
221 return;
// ZSCAN_NEXT_ABORT falls through to default — not supported by TUP scans
222 case NextScanReq::ZSCAN_NEXT_ABORT:
223 jam();
224 default:
225 jam();
226 ndbrequire(false);
227 break;
229 // start looking for next scan result
230 AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
231 checkReq->accPtr = scanPtr.i;
232 checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
233 EXECUTE_DIRECT(DBTUP, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
234 jamEntry();
// Handle ACC_CHECK_SCAN (self-sent from execNEXT_SCANREQ or polled by LQH):
// drive the scan state machine one step — start the scan if in First state,
// run scanNext() if in Next state, and reply via scanReply() unless the scan
// time-sliced itself (scanNext returned false).
// NOTE(review): this extraction embeds original line numbers and has dropped
// brace-only lines (numbering skips); code is kept byte-identical to the scrape.
237 void
238 Dbtup::execACC_CHECK_SCAN(Signal* signal)
240 jamEntry();
241 const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
242 const AccCheckScan* const req = &reqCopy;
243 ScanOpPtr scanPtr;
244 c_scanOpPool.getPtr(scanPtr, req->accPtr);
245 ScanOp& scan = *scanPtr.p;
246 // fragment
247 FragrecordPtr fragPtr;
248 fragPtr.i = scan.m_fragPtrI;
249 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
250 Fragrecord& frag = *fragPtr.p;
// let LQH decide whether an LCP needs the CPU before we continue scanning
251 if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
252 jam();
253 signal->theData[0] = scan.m_userPtr;
254 signal->theData[1] = true;
255 EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
256 jamEntry();
257 return;
259 if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
260 jam();
261 // LQH asks if we are waiting for lock and we tell it to ask again
262 NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
263 conf->scanPtr = scan.m_userPtr;
264 conf->accOperationPtr = RNIL; // no tuple returned
265 conf->fragId = frag.fragmentId;
266 unsigned signalLength = 3;
267 // if TC has ordered scan close, it will be detected here
268 sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
269 signal, signalLength, JBB);
270 return; // stop
272 if (scan.m_state == ScanOp::First) {
273 jam();
// scanFirst() positions the scan and moves m_state to Next (or Last)
274 scanFirst(signal, scanPtr);
276 if (scan.m_state == ScanOp::Next) {
277 jam();
278 bool immediate = scanNext(signal, scanPtr);
279 if (! immediate) {
280 jam();
281 // time-slicing via TUP or PGMAN
282 return;
285 scanReply(signal, scanPtr);
// Send the scan result for the current state back to the requester:
//  - Current: a row was found; for locking scans first read its PK via TUX
//    and ask DBACC for a row lock (may block, be refused, or run out of ops),
//  - Locked: report the row in NEXT_SCANCONF and hand any lock over to LQH,
//  - Last/Invalid: report end-of-scan (fragId == RNIL).
// NOTE(review): this extraction embeds original line numbers and has dropped
// brace-only lines (numbering skips); code is kept byte-identical to the scrape.
288 void
289 Dbtup::scanReply(Signal* signal, ScanOpPtr scanPtr)
291 ScanOp& scan = *scanPtr.p;
292 FragrecordPtr fragPtr;
293 fragPtr.i = scan.m_fragPtrI;
294 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
295 Fragrecord& frag = *fragPtr.p;
296 // for reading tuple key in Current state
297 Uint32* pkData = (Uint32*)c_dataBuffer;
298 unsigned pkSize = 0;
299 if (scan.m_state == ScanOp::Current) {
300 // found an entry to return
301 jam();
302 ndbrequire(scan.m_accLockOp == RNIL);
303 if (scan.m_bits & ScanOp::SCAN_LOCK) {
304 jam();
305 // read tuple key - use TUX routine
306 const ScanPos& pos = scan.m_scanPos;
307 const Local_key& key_mm = pos.m_key_mm;
308 int ret = tuxReadPk(fragPtr.i, pos.m_realpid_mm, key_mm.m_page_idx,
309 pkData, true);
310 ndbrequire(ret > 0);
311 pkSize = ret;
312 dbg((DBTUP, "PK size=%d data=%08x", pkSize, pkData[0]));
313 // get read lock or exclusive lock
314 AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
315 lockReq->returnCode = RNIL;
316 lockReq->requestInfo = (scan.m_bits & ScanOp::SCAN_LOCK_SH) ?
317 AccLockReq::LockShared : AccLockReq::LockExclusive;
318 lockReq->accOpPtr = RNIL;
319 lockReq->userPtr = scanPtr.i;
320 lockReq->userRef = reference();
321 lockReq->tableId = scan.m_tableId;
322 lockReq->fragId = frag.fragmentId;
323 lockReq->fragPtrI = RNIL; // no cached frag ptr yet
324 lockReq->hashValue = md5_hash((Uint64*)pkData, pkSize);
325 lockReq->tupAddr = key_mm.ref();
326 lockReq->transId1 = scan.m_transId1;
327 lockReq->transId2 = scan.m_transId2;
328 EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
329 signal, AccLockReq::LockSignalLength);
330 jamEntry();
331 switch (lockReq->returnCode) {
332 case AccLockReq::Success:
333 jam();
334 scan.m_state = ScanOp::Locked;
335 scan.m_accLockOp = lockReq->accOpPtr;
336 break;
337 case AccLockReq::IsBlocked:
338 jam();
339 // normal lock wait
340 scan.m_state = ScanOp::Blocked;
341 scan.m_bits |= ScanOp::SCAN_LOCK_WAIT;
342 scan.m_accLockOp = lockReq->accOpPtr;
343 // LQH will wake us up
344 signal->theData[0] = scan.m_userPtr;
345 signal->theData[1] = true;
346 EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
347 jamEntry();
348 return;
349 break;
350 case AccLockReq::Refused:
351 jam();
352 // we cannot see deleted tuple (assert only)
353 ndbassert(false);
354 // skip it
355 scan.m_state = ScanOp::Next;
356 signal->theData[0] = scan.m_userPtr;
357 signal->theData[1] = true;
358 EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
359 jamEntry();
360 return;
361 break;
362 case AccLockReq::NoFreeOp:
363 jam();
364 // max ops should depend on max scans (assert only)
365 ndbassert(false);
366 // stay in Current state
367 scan.m_state = ScanOp::Current;
368 signal->theData[0] = scan.m_userPtr;
369 signal->theData[1] = true;
370 EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
371 jamEntry();
372 return;
373 break;
374 default:
375 ndbrequire(false);
376 break;
378 } else {
// non-locking scan: no ACC interaction needed, row is ready to report
379 scan.m_state = ScanOp::Locked;
383 if (scan.m_state == ScanOp::Locked) {
384 // we have lock or do not need one
385 jam();
386 // conf signal
387 NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
388 conf->scanPtr = scan.m_userPtr;
389 // the lock is passed to LQH
390 Uint32 accLockOp = scan.m_accLockOp;
391 if (accLockOp != RNIL) {
392 scan.m_accLockOp = RNIL;
393 // remember it until LQH unlocks it
394 addAccLockOp(scan, accLockOp);
395 } else {
396 ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK));
397 // operation RNIL in LQH would signal no tuple returned
398 accLockOp = (Uint32)-1;
400 const ScanPos& pos = scan.m_scanPos;
401 conf->accOperationPtr = accLockOp;
402 conf->fragId = frag.fragmentId;
403 conf->localKey[0] = pos.m_key_mm.ref();
404 conf->localKey[1] = 0;
405 conf->localKeyLength = 1;
406 unsigned signalLength = 6;
// locking scans reply via the job buffer; non-locking take the fast path
407 if (scan.m_bits & ScanOp::SCAN_LOCK) {
408 sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
409 signal, signalLength, JBB);
410 } else {
411 Uint32 blockNo = refToBlock(scan.m_userRef);
412 EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
413 jamEntry();
415 // next time look for next entry
416 scan.m_state = ScanOp::Next;
417 return;
419 if (scan.m_state == ScanOp::Last ||
420 scan.m_state == ScanOp::Invalid) {
421 jam();
// end-of-scan reply: fragId RNIL tells LQH the scan is done
422 NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
423 conf->scanPtr = scan.m_userPtr;
424 conf->accOperationPtr = RNIL;
425 conf->fragId = RNIL;
426 unsigned signalLength = 3;
427 sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
428 signal, signalLength, JBB);
429 return;
// any other state here is a programming error
431 ndbrequire(false);
435 * Lock succeeded (after delay) in ACC. If the lock is for current
436 * entry, set state to Locked. If the lock is for an entry we were
437 * moved away from, simply unlock it. Finally, if we are closing the
438 * scan, do nothing since we have already sent an abort request.
// NOTE(review): this extraction embeds original line numbers and has dropped
// brace-only lines (numbering skips); code is kept byte-identical to the scrape.
440 void
441 Dbtup::execACCKEYCONF(Signal* signal)
443 jamEntry();
444 ScanOpPtr scanPtr;
445 scanPtr.i = signal->theData[0];
446 c_scanOpPool.getPtr(scanPtr);
447 ScanOp& scan = *scanPtr.p;
// this signal only arrives while a lock wait is outstanding
448 ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
449 scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
450 if (scan.m_state == ScanOp::Blocked) {
451 // the lock wait was for current entry
452 jam();
453 scan.m_state = ScanOp::Locked;
454 // LQH has the ball
455 return;
457 if (scan.m_state != ScanOp::Aborting) {
458 // we were moved, release lock
459 jam();
460 AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
461 lockReq->returnCode = RNIL;
462 lockReq->requestInfo = AccLockReq::Abort;
463 lockReq->accOpPtr = scan.m_accLockOp;
464 EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
465 jamEntry();
466 ndbrequire(lockReq->returnCode == AccLockReq::Success);
467 scan.m_accLockOp = RNIL;
468 // LQH has the ball
469 return;
// Aborting: the close path owns the lock now
471 // lose the lock
472 scan.m_accLockOp = RNIL;
473 // continue at ACC_ABORTCONF
477 * Lock failed (after delay) in ACC. Probably means somebody ahead of
478 * us in lock queue deleted the tuple.
// NOTE(review): this extraction embeds original line numbers and has dropped
// brace-only lines (numbering skips); code is kept byte-identical to the scrape.
480 void
481 Dbtup::execACCKEYREF(Signal* signal)
483 jamEntry();
484 ScanOpPtr scanPtr;
485 scanPtr.i = signal->theData[0];
486 c_scanOpPool.getPtr(scanPtr);
487 ScanOp& scan = *scanPtr.p;
// this signal only arrives while a lock wait is outstanding
488 ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
489 scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
490 if (scan.m_state != ScanOp::Aborting) {
491 jam();
492 // release the operation
493 AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
494 lockReq->returnCode = RNIL;
495 lockReq->requestInfo = AccLockReq::Abort;
496 lockReq->accOpPtr = scan.m_accLockOp;
497 EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
498 jamEntry();
499 ndbrequire(lockReq->returnCode == AccLockReq::Success);
500 scan.m_accLockOp = RNIL;
501 // scan position should already have been moved (assert only)
502 if (scan.m_state == ScanOp::Blocked) {
503 jam();
504 //ndbassert(false);
// node-restart scans refetch the same row position; others just move on
505 if (scan.m_bits & ScanOp::SCAN_NR)
507 jam();
508 scan.m_state = ScanOp::Next;
509 scan.m_scanPos.m_get = ScanPos::Get_tuple;
510 ndbout_c("Ignoring scan.m_state == ScanOp::Blocked, refetch");
512 else
514 jam();
515 scan.m_state = ScanOp::Next;
516 ndbout_c("Ignoring scan.m_state == ScanOp::Blocked");
519 // LQH has the ball
520 return;
// Aborting: the close path owns the lock now
522 // lose the lock
523 scan.m_accLockOp = RNIL;
524 // continue at ACC_ABORTCONF
528 * Received when scan is closing. This signal arrives after any
529 * ACCKEYCON or ACCKEYREF which may have been in job buffer.
531 void
532 Dbtup::execACC_ABORTCONF(Signal* signal)
534 jamEntry();
535 ScanOpPtr scanPtr;
536 scanPtr.i = signal->theData[0];
537 c_scanOpPool.getPtr(scanPtr);
538 ScanOp& scan = *scanPtr.p;
539 ndbrequire(scan.m_state == ScanOp::Aborting);
540 // most likely we are still in lock wait
541 if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
542 jam();
543 scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
544 scan.m_accLockOp = RNIL;
546 scanClose(signal, scanPtr);
549 void
550 Dbtup::scanFirst(Signal*, ScanOpPtr scanPtr)
552 ScanOp& scan = *scanPtr.p;
553 ScanPos& pos = scan.m_scanPos;
554 Local_key& key = pos.m_key;
555 const Uint32 bits = scan.m_bits;
556 // fragment
557 FragrecordPtr fragPtr;
558 fragPtr.i = scan.m_fragPtrI;
559 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
560 Fragrecord& frag = *fragPtr.p;
561 // in the future should not pre-allocate pages
562 if (frag.noOfPages == 0 && ((bits & ScanOp::SCAN_NR) == 0)) {
563 jam();
564 scan.m_state = ScanOp::Last;
565 return;
567 if (! (bits & ScanOp::SCAN_DD)) {
568 key.m_file_no = ZNIL;
569 key.m_page_no = 0;
570 pos.m_get = ScanPos::Get_page_mm;
571 // for MM scan real page id is cached for efficiency
572 pos.m_realpid_mm = RNIL;
573 } else {
574 Disk_alloc_info& alloc = frag.m_disk_alloc_info;
575 // for now must check disk part explicitly
576 if (alloc.m_extent_list.firstItem == RNIL) {
577 jam();
578 scan.m_state = ScanOp::Last;
579 return;
581 pos.m_extent_info_ptr_i = alloc.m_extent_list.firstItem;
582 Extent_info* ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
583 key.m_file_no = ext->m_key.m_file_no;
584 key.m_page_no = ext->m_first_page_no;
585 pos.m_get = ScanPos::Get_page_dd;
587 key.m_page_idx = 0;
588 // let scanNext() do the work
589 scan.m_state = ScanOp::Next;
// Core scan iterator: advance the scan position (page by page, tuple by
// tuple, in MM or disk order) until a candidate row is found (state set to
// Current, return true), the scan ends (state Last, return true), or work
// must be time-sliced — via CONTINUEB after 32 loop iterations or via a
// queued PGMAN disk-page request (return false; scanCont()/callback resume).
// LCP scans first drain the fragment's lcp-keep list (found_lcp_keep).
// NOTE(review): this extraction embeds original line numbers and has dropped
// brace-only lines (numbering skips); code is kept byte-identical to the scrape.
592 bool
593 Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
595 ScanOp& scan = *scanPtr.p;
596 ScanPos& pos = scan.m_scanPos;
597 Local_key& key = pos.m_key;
598 const Uint32 bits = scan.m_bits;
599 // table
600 TablerecPtr tablePtr;
601 tablePtr.i = scan.m_tableId;
602 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
603 Tablerec& table = *tablePtr.p;
604 // fragment
605 FragrecordPtr fragPtr;
606 fragPtr.i = scan.m_fragPtrI;
607 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
608 Fragrecord& frag = *fragPtr.p;
609 // tuple found
610 Tuple_header* th = 0;
611 Uint32 thbits = 0;
612 Uint32 loop_count = 0;
613 Uint32 scanGCI = scanPtr.p->m_scanGCI;
614 Uint32 foundGCI;
// NOTE(review): 'mm' holds the SCAN_DD flag and indexes m_offsets — the
// name is misleading; verify against the Tablerec m_offsets layout
616 const bool mm = (bits & ScanOp::SCAN_DD);
617 const bool lcp = (bits & ScanOp::SCAN_LCP);
619 Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
620 Uint32 size = table.m_offsets[mm].m_fix_header_size;
// rows deleted during an LCP are parked on the keep list; return them first
622 if (lcp && lcp_list != RNIL)
623 goto found_lcp_keep;
// after a timeslice the cached page pointer may be stale: refetch the page
625 switch(pos.m_get){
626 case ScanPos::Get_next_tuple:
627 case ScanPos::Get_next_tuple_fs:
628 jam();
629 key.m_page_idx += size;
630 // fall through
631 case ScanPos::Get_tuple:
632 case ScanPos::Get_tuple_fs:
633 jam();
635 * We need to refetch page after timeslice
637 pos.m_get = ScanPos::Get_page;
638 break;
639 default:
640 break;
// main loop: each 'break' of the inner switch counts one iteration,
// each 'continue' re-dispatches on the updated pos.m_get state
643 while (true) {
644 switch (pos.m_get) {
645 case ScanPos::Get_next_page:
646 // move to next page
647 jam();
649 if (! (bits & ScanOp::SCAN_DD))
650 pos.m_get = ScanPos::Get_next_page_mm;
651 else
652 pos.m_get = ScanPos::Get_next_page_dd;
654 continue;
655 case ScanPos::Get_page:
656 // get real page
657 jam();
659 if (! (bits & ScanOp::SCAN_DD))
660 pos.m_get = ScanPos::Get_page_mm;
661 else
662 pos.m_get = ScanPos::Get_page_dd;
664 continue;
665 case ScanPos::Get_next_page_mm:
666 // move to next logical TUP page
667 jam();
669 key.m_page_no++;
670 if (key.m_page_no >= frag.noOfPages) {
671 jam();
// node-restart scan may continue past allocated pages up to m_endPage
673 if ((bits & ScanOp::SCAN_NR) && (scan.m_endPage != RNIL))
675 jam();
676 if (key.m_page_no < scan.m_endPage)
678 jam();
679 ndbout_c("scanning page %u", key.m_page_no);
680 goto cont;
683 // no more pages, scan ends
684 pos.m_get = ScanPos::Get_undef;
685 scan.m_state = ScanOp::Last;
686 return true;
688 cont:
689 key.m_page_idx = 0;
690 pos.m_get = ScanPos::Get_page_mm;
691 // clear cached value
692 pos.m_realpid_mm = RNIL;
694 /*FALLTHRU*/
695 case ScanPos::Get_page_mm:
696 // get TUP real page
697 jam();
699 if (pos.m_realpid_mm == RNIL) {
700 jam();
701 if (key.m_page_no < frag.noOfPages)
702 pos.m_realpid_mm = getRealpid(fragPtr.p, key.m_page_no);
703 else
// only an NR scan may reference a not-yet-allocated page
705 ndbassert(bits & ScanOp::SCAN_NR);
706 goto nopage;
709 PagePtr pagePtr;
710 c_page_pool.getPtr(pagePtr, pos.m_realpid_mm);
712 if (pagePtr.p->page_state == ZEMPTY_MM) {
713 // skip empty page
714 jam();
715 if (! (bits & ScanOp::SCAN_NR))
717 pos.m_get = ScanPos::Get_next_page_mm;
718 break; // incr loop count
720 else
722 jam();
// NR scan still visits the empty page (deleted rowids must be reported)
723 pos.m_realpid_mm = RNIL;
726 nopage:
727 pos.m_page = pagePtr.p;
728 pos.m_get = ScanPos::Get_tuple;
730 continue;
731 case ScanPos::Get_next_page_dd:
732 // move to next disk page
733 jam();
735 Disk_alloc_info& alloc = frag.m_disk_alloc_info;
736 Local_fragment_extent_list list(c_extent_pool, alloc.m_extent_list);
737 Ptr<Extent_info> ext_ptr;
738 c_extent_pool.getPtr(ext_ptr, pos.m_extent_info_ptr_i);
739 Extent_info* ext = ext_ptr.p;
740 key.m_page_no++;
741 if (key.m_page_no >= ext->m_first_page_no + alloc.m_extent_size) {
742 // no more pages in this extent
743 jam();
744 if (! list.next(ext_ptr)) {
745 // no more extents, scan ends
746 jam();
747 pos.m_get = ScanPos::Get_undef;
748 scan.m_state = ScanOp::Last;
749 return true;
750 } else {
751 // move to next extent
752 jam();
753 pos.m_extent_info_ptr_i = ext_ptr.i;
754 ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
755 key.m_file_no = ext->m_key.m_file_no;
756 key.m_page_no = ext->m_first_page_no;
759 key.m_page_idx = 0;
760 pos.m_get = ScanPos::Get_page_dd;
762 read ahead for scan in disk order
763 do read ahead every 8:th page
765 if ((bits & ScanOp::SCAN_DD) &&
766 (((key.m_page_no - ext->m_first_page_no) & 7) == 0))
768 jam();
769 // initialize PGMAN request
770 Page_cache_client::Request preq;
771 preq.m_page = pos.m_key;
772 preq.m_callback = TheNULLCallback;
774 // set maximum read ahead
775 Uint32 read_ahead = m_max_page_read_ahead;
777 while (true)
779 // prepare page read ahead in current extent
780 Uint32 page_no = preq.m_page.m_page_no;
781 Uint32 page_no_limit = page_no + read_ahead;
782 Uint32 limit = ext->m_first_page_no + alloc.m_extent_size;
783 if (page_no_limit > limit)
785 jam();
786 // read ahead crosses extent, set limit for this extent
787 read_ahead = page_no_limit - limit;
788 page_no_limit = limit;
789 // and make sure we only read one extra extent next time around
790 if (read_ahead > alloc.m_extent_size)
791 read_ahead = alloc.m_extent_size;
793 else
795 jam();
796 read_ahead = 0; // no more to read ahead after this
798 // do read ahead pages for this extent
799 while (page_no < page_no_limit)
801 // page request to PGMAN
802 jam();
803 preq.m_page.m_page_no = page_no;
804 int flags = 0;
805 // ignore result
806 m_pgman.get_page(signal, preq, flags);
807 jamEntry();
808 page_no++;
810 if (!read_ahead || !list.next(ext_ptr))
812 // no more extents after this or read ahead done
813 jam();
814 break;
816 // move to next extent and initialize PGMAN request accordingly
817 Extent_info* ext = c_extent_pool.getPtr(ext_ptr.i);
818 preq.m_page.m_file_no = ext->m_key.m_file_no;
819 preq.m_page.m_page_no = ext->m_first_page_no;
821 } // if ScanOp::SCAN_DD read ahead
823 /*FALLTHRU*/
824 case ScanPos::Get_page_dd:
825 // get global page in PGMAN cache
826 jam();
828 // check if page is un-allocated or empty
829 if (likely(! (bits & ScanOp::SCAN_NR)))
831 Tablespace_client tsman(signal, c_tsman,
832 frag.fragTableId,
833 frag.fragmentId,
834 frag.m_tablespace_id);
835 unsigned uncommitted, committed;
836 uncommitted = committed = ~(unsigned)0;
837 int ret = tsman.get_page_free_bits(&key, &uncommitted, &committed);
838 ndbrequire(ret == 0);
839 if (committed == 0 && uncommitted == 0) {
840 // skip empty page
841 jam();
842 pos.m_get = ScanPos::Get_next_page_dd;
843 break; // incr loop count
846 // page request to PGMAN
847 Page_cache_client::Request preq;
848 preq.m_page = pos.m_key;
849 preq.m_callback.m_callbackData = scanPtr.i;
850 preq.m_callback.m_callbackFunction =
851 safe_cast(&Dbtup::disk_page_tup_scan_callback);
852 int flags = 0;
853 int res = m_pgman.get_page(signal, preq, flags);
854 jamEntry();
855 if (res == 0) {
856 jam();
// page not in cache: the callback above resumes the scan later
857 // request queued
858 pos.m_get = ScanPos::Get_tuple;
859 return false;
861 ndbrequire(res > 0);
862 pos.m_page = (Page*)m_pgman.m_ptr.p;
864 pos.m_get = ScanPos::Get_tuple;
865 continue;
866 // get tuple
867 // move to next tuple
868 case ScanPos::Get_next_tuple:
869 case ScanPos::Get_next_tuple_fs:
870 // move to next fixed size tuple
871 jam();
873 key.m_page_idx += size;
874 pos.m_get = ScanPos::Get_tuple_fs;
876 /*FALLTHRU*/
877 case ScanPos::Get_tuple:
878 case ScanPos::Get_tuple_fs:
879 // get fixed size tuple
880 jam();
882 Fix_page* page = (Fix_page*)pos.m_page;
883 if (key.m_page_idx + size <= Fix_page::DATA_WORDS)
885 pos.m_get = ScanPos::Get_next_tuple_fs;
886 th = (Tuple_header*)&page->m_data[key.m_page_idx];
// normal scan: return any non-free row.  NR scan: also compare the row's
// GCI against scanGCI to decide whether to report it or its deleted rowid
888 if (likely(! (bits & ScanOp::SCAN_NR)))
890 jam();
891 thbits = th->m_header_bits;
892 if (! (thbits & Tuple_header::FREE))
894 goto found_tuple;
897 else
899 if (pos.m_realpid_mm == RNIL)
901 jam();
902 foundGCI = 0;
903 goto found_deleted_rowid;
905 thbits = th->m_header_bits;
906 if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
907 foundGCI == 0)
909 if (! (thbits & Tuple_header::FREE))
911 jam();
912 goto found_tuple;
914 else
916 goto found_deleted_rowid;
919 else if (thbits != Fix_page::FREE_RECORD &&
920 th->m_operation_ptr_i != RNIL)
922 jam();
923 goto found_tuple; // Locked tuple...
924 // skip free tuple
927 } else {
928 jam();
929 // no more tuples on this page
930 pos.m_get = ScanPos::Get_next_page;
933 break; // incr loop count
934 found_tuple:
935 // found possible tuple to return
936 jam();
938 // caller has already set pos.m_get to next tuple
939 if (! (bits & ScanOp::SCAN_LCP && thbits & Tuple_header::LCP_SKIP)) {
940 Local_key& key_mm = pos.m_key_mm;
941 if (! (bits & ScanOp::SCAN_DD)) {
942 key_mm = pos.m_key;
943 // real page id is already set
944 } else {
945 key_mm.assref(th->m_base_record_ref);
946 // recompute for each disk tuple
947 pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
949 // TUPKEYREQ handles savepoint stuff
950 scan.m_state = ScanOp::Current;
951 return true;
952 } else {
953 jam();
954 // clear it so that it will show up in next LCP
955 th->m_header_bits = thbits & ~(Uint32)Tuple_header::LCP_SKIP;
956 if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
957 jam();
958 setChecksum(th, tablePtr.p);
962 break;
963 found_deleted_rowid:
964 jam();
// only node-restart scans report deleted rowids (so the starting node
// can delete them); reply is sent inline with gci of the deletion
966 ndbassert(bits & ScanOp::SCAN_NR);
967 Local_key& key_mm = pos.m_key_mm;
968 if (! (bits & ScanOp::SCAN_DD)) {
969 key_mm = pos.m_key;
970 // caller has already set pos.m_get to next tuple
971 // real page id is already set
972 } else {
973 key_mm.assref(th->m_base_record_ref);
974 // recompute for each disk tuple
975 pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
977 Fix_page *mmpage = (Fix_page*)c_page_pool.getPtr(pos.m_realpid_mm);
978 th = (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx);
979 if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
980 foundGCI == 0)
982 if (! (thbits & Tuple_header::FREE))
983 break;
987 NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
988 conf->scanPtr = scan.m_userPtr;
989 conf->accOperationPtr = RNIL;
990 conf->fragId = frag.fragmentId;
991 conf->localKey[0] = pos.m_key_mm.ref();
992 conf->localKey[1] = 0;
993 conf->localKeyLength = 1;
994 conf->gci = foundGCI;
995 Uint32 blockNo = refToBlock(scan.m_userRef);
996 EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
997 jamEntry();
999 // TUPKEYREQ handles savepoint stuff
1000 loop_count = 32;
1001 scan.m_state = ScanOp::Next;
1002 return false;
1004 break; // incr loop count
1005 default:
1006 ndbrequire(false);
1007 break;
// time-slice after 32 iterations to keep the job buffer responsive
1009 if (++loop_count >= 32)
1010 break;
1012 // TODO: at drop table we have to flush and terminate these
1013 jam();
// reschedule ourselves via CONTINUEB; execTUP_SCAN resumes via scanCont()
1014 signal->theData[0] = ZTUP_SCAN;
1015 signal->theData[1] = scanPtr.i;
1016 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
1017 return false;
1019 found_lcp_keep:
// return the first entry of the LCP keep list (a row deleted during the
// LCP, preserved so the checkpoint still sees it), then free it for real
1020 Local_key tmp;
1021 tmp.assref(lcp_list);
1022 tmp.m_page_no = getRealpid(fragPtr.p, tmp.m_page_no);
1024 Ptr<Page> pagePtr;
1025 c_page_pool.getPtr(pagePtr, tmp.m_page_no);
1026 Tuple_header* ptr = (Tuple_header*)
1027 ((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
1028 Uint32 headerbits = ptr->m_header_bits;
1029 ndbrequire(headerbits & Tuple_header::LCP_KEEP);
1031 Uint32 next = ptr->m_operation_ptr_i;
1032 ptr->m_operation_ptr_i = RNIL;
1033 ptr->m_header_bits = headerbits & ~(Uint32)Tuple_header::FREE;
1035 if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
1036 jam();
1037 setChecksum(ptr, tablePtr.p);
1040 NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
1041 conf->scanPtr = scan.m_userPtr;
1042 conf->accOperationPtr = (Uint32)-1;
1043 conf->fragId = frag.fragmentId;
1044 conf->localKey[0] = lcp_list;
1045 conf->localKey[1] = 0;
1046 conf->localKeyLength = 1;
1047 conf->gci = 0;
1048 Uint32 blockNo = refToBlock(scan.m_userRef);
1049 EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
1051 fragPtr.p->m_lcp_keep_list = next;
1052 ptr->m_header_bits |= Tuple_header::FREED; // RESTORE free flag
1053 if (headerbits & Tuple_header::FREED)
1055 if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
1057 jam();
1058 free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
1059 } else {
1060 jam();
1061 free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
1064 return false;
1067 void
1068 Dbtup::scanCont(Signal* signal, ScanOpPtr scanPtr)
1070 bool immediate = scanNext(signal, scanPtr);
1071 if (! immediate) {
1072 jam();
1073 // time-slicing again
1074 return;
1076 scanReply(signal, scanPtr);
1079 void
1080 Dbtup::disk_page_tup_scan_callback(Signal* signal, Uint32 scanPtrI, Uint32 page_i)
1082 ScanOpPtr scanPtr;
1083 c_scanOpPool.getPtr(scanPtr, scanPtrI);
1084 ScanOp& scan = *scanPtr.p;
1085 ScanPos& pos = scan.m_scanPos;
1086 // get cache page
1087 Ptr<GlobalPage> gptr;
1088 m_global_page_pool.getPtr(gptr, page_i);
1089 pos.m_page = (Page*)gptr.p;
1090 // continue
1091 scanCont(signal, scanPtr);
1094 void
1095 Dbtup::scanClose(Signal* signal, ScanOpPtr scanPtr)
1097 ScanOp& scan = *scanPtr.p;
1098 ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) && scan.m_accLockOp == RNIL);
1099 // unlock all not unlocked by LQH
1100 LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1101 ScanLockPtr lockPtr;
1102 while (list.first(lockPtr)) {
1103 jam();
1104 AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
1105 lockReq->returnCode = RNIL;
1106 lockReq->requestInfo = AccLockReq::Abort;
1107 lockReq->accOpPtr = lockPtr.p->m_accLockOp;
1108 EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
1109 jamEntry();
1110 ndbrequire(lockReq->returnCode == AccLockReq::Success);
1111 list.release(lockPtr);
1113 // send conf
1114 NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
1115 conf->scanPtr = scanPtr.p->m_userPtr;
1116 conf->accOperationPtr = RNIL;
1117 conf->fragId = RNIL;
1118 unsigned signalLength = 3;
1119 sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
1120 signal, signalLength, JBB);
1121 releaseScanOp(scanPtr);
1124 void
1125 Dbtup::addAccLockOp(ScanOp& scan, Uint32 accLockOp)
1127 LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1128 ScanLockPtr lockPtr;
1129 #ifdef VM_TRACE
1130 list.first(lockPtr);
1131 while (lockPtr.i != RNIL) {
1132 ndbrequire(lockPtr.p->m_accLockOp != accLockOp);
1133 list.next(lockPtr);
1135 #endif
1136 bool ok = list.seize(lockPtr);
1137 ndbrequire(ok);
1138 lockPtr.p->m_accLockOp = accLockOp;
1141 void
1142 Dbtup::removeAccLockOp(ScanOp& scan, Uint32 accLockOp)
1144 LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1145 ScanLockPtr lockPtr;
1146 list.first(lockPtr);
1147 while (lockPtr.i != RNIL) {
1148 if (lockPtr.p->m_accLockOp == accLockOp) {
1149 jam();
1150 break;
1152 list.next(lockPtr);
1154 ndbrequire(lockPtr.i != RNIL);
1155 list.release(lockPtr);
1158 void
1159 Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
1161 FragrecordPtr fragPtr;
1162 fragPtr.i = scanPtr.p->m_fragPtrI;
1163 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
1165 if(scanPtr.p->m_bits & ScanOp::SCAN_LCP)
1167 jam();
1168 fragPtr.p->m_lcp_scan_op = RNIL;
1169 scanPtr.p->m_fragPtrI = RNIL;
1171 else
1173 jam();
1174 LocalDLList<ScanOp> list(c_scanOpPool, fragPtr.p->m_scanList);
1175 list.release(scanPtr);
// Handle LCP_FRAG_ORD: for tables with disk attributes, attach the global
// LCP scan op (c_lcp_scan_op) to the fragment so the LCP scan can run.
// NOTE(review): this extraction embeds original line numbers and has dropped
// brace-only lines (numbering skips); code is kept byte-identical to the scrape.
1179 void
1180 Dbtup::execLCP_FRAG_ORD(Signal* signal)
1182 LcpFragOrd* req= (LcpFragOrd*)signal->getDataPtr();
1184 TablerecPtr tablePtr;
1185 tablePtr.i = req->tableId;
1186 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
// only disk-data tables need a TUP-side LCP scan here
1188 if (tablePtr.p->m_no_of_disk_attributes)
1190 jam();
1191 FragrecordPtr fragPtr;
1192 Uint32 fragId = req->fragmentId;
1193 fragPtr.i = RNIL;
1194 getFragmentrec(fragPtr, fragId, tablePtr.p);
1195 ndbrequire(fragPtr.i != RNIL);
1196 Fragrecord& frag = *fragPtr.p;
// the single LCP scan op must be free (no concurrent fragment LCP)
1198 ndbrequire(frag.m_lcp_scan_op == RNIL && c_lcp_scan_op != RNIL);
1199 frag.m_lcp_scan_op = c_lcp_scan_op;
1200 ScanOpPtr scanPtr;
1201 c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
1202 ndbrequire(scanPtr.p->m_fragPtrI == RNIL);
1203 scanPtr.p->m_fragPtrI = fragPtr.i;
// NOTE(review): scanFirst() sets m_state to Next/Last, but it is reset to
// First right after — ACC_CHECK_SCAN will then run scanFirst() again.
// Verify the pre-positioning here is intended (e.g. for its side effects).
1205 scanFirst(signal, scanPtr);
1206 scanPtr.p->m_state = ScanOp::First;