1 /* Copyright (c) 2003, 2005-2008 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
17 #define DBTUP_SCAN_CPP
19 #include <signaldata/AccScan.hpp>
20 #include <signaldata/NextScan.hpp>
21 #include <signaldata/AccLock.hpp>
22 #include <md5_hash.hpp>
26 #define jam() { jamLine(32000 + __LINE__); }
27 #define jamEntry() { jamEntryLine(32000 + __LINE__); }
30 #define dbg(x) globalSignalLoggers.log x
// execACC_SCANREQ: start a TUP fragment scan on behalf of LQH. Copies the
// request, locates table+fragment, builds scan option bits (LCP / disk-data /
// varsize / lock mode / node-recovery), initialises a ScanOp and replies with
// ACC_SCANCONF (or ACC_SCANREF on seize failure — LQH does not handle REF,
// hence the 0x313 marker).
// NOTE(review): this extract is line-mangled and interior source lines are
// missing (embedded original line numbers skip); code kept byte-identical,
// comments only.
36 Dbtup::execACC_SCANREQ(Signal
* signal
)
39 const AccScanReq reqCopy
= *(const AccScanReq
*)signal
->getDataPtr();
40 const AccScanReq
* const req
= &reqCopy
;
44 // find table and fragment
46 tablePtr
.i
= req
->tableId
;
47 ptrCheckGuard(tablePtr
, cnoOfTablerec
, tablerec
);
48 FragrecordPtr fragPtr
;
49 Uint32 fragId
= req
->fragmentNo
;
51 getFragmentrec(fragPtr
, fragId
, tablePtr
.p
);
52 ndbrequire(fragPtr
.i
!= RNIL
);
53 Fragrecord
& frag
= *fragPtr
.p
;
// LCP scans reuse the single pre-seized c_lcp_scan_op instead of the pool.
58 if (AccScanReq::getLcpScanFlag(req
->requestInfo
))
61 bits
|= ScanOp::SCAN_LCP
;
62 c_scanOpPool
.getPtr(scanPtr
, c_lcp_scan_op
);
66 // seize from pool and link to per-fragment list
67 LocalDLList
<ScanOp
> list(c_scanOpPool
, frag
.m_scanList
);
68 if (! list
.seize(scanPtr
)) {
// Scan disk part only when the table has disk attributes and the
// caller did not set the no-disk-scan flag.
74 if (!AccScanReq::getNoDiskScanFlag(req
->requestInfo
)
75 && tablePtr
.p
->m_no_of_disk_attributes
)
77 bits
|= ScanOp::SCAN_DD
;
80 bool mm
= (bits
& ScanOp::SCAN_DD
);
81 if (tablePtr
.p
->m_attributes
[mm
].m_no_of_varsize
> 0) {
82 bits
|= ScanOp::SCAN_VS
;
84 // disk pages have fixed page format
85 ndbrequire(! (bits
& ScanOp::SCAN_DD
));
// Locking scans: shared vs exclusive from the request's lock mode.
87 if (! AccScanReq::getReadCommittedFlag(req
->requestInfo
)) {
88 if (AccScanReq::getLockMode(req
->requestInfo
) == 0)
89 bits
|= ScanOp::SCAN_LOCK_SH
;
91 bits
|= ScanOp::SCAN_LOCK_EX
;
// Node-recovery (NR) scan: honour caller-supplied page bound m_endPage.
94 if (AccScanReq::getNRScanFlag(req
->requestInfo
))
97 bits
|= ScanOp::SCAN_NR
;
98 scanPtr
.p
->m_endPage
= req
->maxPage
;
99 if (req
->maxPage
!= RNIL
&& req
->maxPage
> frag
.noOfPages
)
101 ndbout_c("%u %u endPage: %u (noOfPages: %u)",
103 req
->maxPage
, fragPtr
.p
->noOfPages
);
109 scanPtr
.p
->m_endPage
= RNIL
;
112 if (AccScanReq::getLcpScanFlag(req
->requestInfo
))
115 ndbrequire((bits
& ScanOp::SCAN_DD
) == 0);
116 ndbrequire((bits
& ScanOp::SCAN_LOCK
) == 0);
// Placement-new resets the ScanOp, then copy request fields into it.
120 new (scanPtr
.p
) ScanOp();
121 ScanOp
& scan
= *scanPtr
.p
;
122 scan
.m_state
= ScanOp::First
;
124 scan
.m_userPtr
= req
->senderData
;
125 scan
.m_userRef
= req
->senderRef
;
126 scan
.m_tableId
= tablePtr
.i
;
127 scan
.m_fragId
= frag
.fragmentId
;
128 scan
.m_fragPtrI
= fragPtr
.i
;
129 scan
.m_transId1
= req
->transId1
;
130 scan
.m_transId2
= req
->transId2
;
131 scan
.m_savePointId
= req
->savePointId
;
// Success reply to the requesting LQH.
134 AccScanConf
* const conf
= (AccScanConf
*)signal
->getDataPtrSend();
135 conf
->scanPtr
= req
->senderData
;
136 conf
->accPtr
= scanPtr
.i
;
137 conf
->flag
= AccScanConf::ZNOT_EMPTY_FRAGMENT
;
138 sendSignal(req
->senderRef
, GSN_ACC_SCANCONF
,
139 signal
, AccScanConf::SignalLength
, JBB
);
// Error path: release any partially-seized op, then REF with marker.
142 if (scanPtr
.i
!= RNIL
) {
144 releaseScanOp(scanPtr
);
146 // LQH does not handle REF
147 signal
->theData
[0] = 0x313;
148 sendSignal(req
->senderRef
, GSN_ACC_SCANREF
, signal
, 1, JBB
);
// execNEXT_SCANREQ: LQH's per-row driver for a TUP scan. Dispatches on
// scanFlag: NEXT / NEXT_COMMIT / COMMIT unlock the previous row's ACC lock,
// CLOSE aborts any pending/held lock and closes, NEXT_ABORT falls through to
// kick ACC_CHECK_SCAN which looks for the next result.
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
152 Dbtup::execNEXT_SCANREQ(Signal
* signal
)
155 const NextScanReq reqCopy
= *(const NextScanReq
*)signal
->getDataPtr();
156 const NextScanReq
* const req
= &reqCopy
;
158 c_scanOpPool
.getPtr(scanPtr
, req
->accPtr
);
159 ScanOp
& scan
= *scanPtr
.p
;
160 switch (req
->scanFlag
) {
161 case NextScanReq::ZSCAN_NEXT
:
164 case NextScanReq::ZSCAN_NEXT_COMMIT
:
166 case NextScanReq::ZSCAN_COMMIT
:
// Locking scan: release the ACC lock LQH is done with.
168 if ((scan
.m_bits
& ScanOp::SCAN_LOCK
) != 0) {
170 AccLockReq
* const lockReq
= (AccLockReq
*)signal
->getDataPtrSend();
171 lockReq
->returnCode
= RNIL
;
172 lockReq
->requestInfo
= AccLockReq::Unlock
;
173 lockReq
->accOpPtr
= req
->accOperationPtr
;
174 EXECUTE_DIRECT(DBACC
, GSN_ACC_LOCKREQ
,
175 signal
, AccLockReq::UndoSignalLength
);
177 ndbrequire(lockReq
->returnCode
== AccLockReq::Success
);
178 removeAccLockOp(scan
, req
->accOperationPtr
);
// Pure COMMIT: confirm and stop — no next-row fetch.
180 if (req
->scanFlag
== NextScanReq::ZSCAN_COMMIT
) {
181 NextScanConf
* const conf
= (NextScanConf
*)signal
->getDataPtrSend();
182 conf
->scanPtr
= scan
.m_userPtr
;
183 unsigned signalLength
= 1;
184 sendSignal(scanPtr
.p
->m_userRef
, GSN_NEXT_SCANCONF
,
185 signal
, signalLength
, JBB
);
189 case NextScanReq::ZSCAN_CLOSE
:
// Still waiting for a lock: abort-with-conf flushes any in-flight
// ACC reply out of the job buffer before closing.
191 if (scan
.m_bits
& ScanOp::SCAN_LOCK_WAIT
) {
193 ndbrequire(scan
.m_accLockOp
!= RNIL
);
194 // use ACC_ABORTCONF to flush out any reply in job buffer
195 AccLockReq
* const lockReq
= (AccLockReq
*)signal
->getDataPtrSend();
196 lockReq
->returnCode
= RNIL
;
197 lockReq
->requestInfo
= AccLockReq::AbortWithConf
;
198 lockReq
->accOpPtr
= scan
.m_accLockOp
;
199 EXECUTE_DIRECT(DBACC
, GSN_ACC_LOCKREQ
,
200 signal
, AccLockReq::UndoSignalLength
);
202 ndbrequire(lockReq
->returnCode
== AccLockReq::Success
);
203 scan
.m_state
= ScanOp::Aborting
;
// Holding a lock for the current row: plain abort, then close now.
206 if (scan
.m_state
== ScanOp::Locked
) {
208 ndbrequire(scan
.m_accLockOp
!= RNIL
);
209 AccLockReq
* const lockReq
= (AccLockReq
*)signal
->getDataPtrSend();
210 lockReq
->returnCode
= RNIL
;
211 lockReq
->requestInfo
= AccLockReq::Abort
;
212 lockReq
->accOpPtr
= scan
.m_accLockOp
;
213 EXECUTE_DIRECT(DBACC
, GSN_ACC_LOCKREQ
,
214 signal
, AccLockReq::UndoSignalLength
);
216 ndbrequire(lockReq
->returnCode
== AccLockReq::Success
);
217 scan
.m_accLockOp
= RNIL
;
219 scan
.m_state
= ScanOp::Aborting
;
220 scanClose(signal
, scanPtr
);
222 case NextScanReq::ZSCAN_NEXT_ABORT
:
229 // start looking for next scan result
230 AccCheckScan
* checkReq
= (AccCheckScan
*)signal
->getDataPtrSend();
231 checkReq
->accPtr
= scanPtr
.i
;
232 checkReq
->checkLcpStop
= AccCheckScan::ZNOT_CHECK_LCP_STOP
;
233 EXECUTE_DIRECT(DBTUP
, GSN_ACC_CHECK_SCAN
, signal
, AccCheckScan::SignalLength
);
// execACC_CHECK_SCAN: advance the scan state machine one step. Handles the
// LQH "check LCP stop" poll, tells LQH to re-ask while a lock wait is
// pending, runs scanFirst/scanNext as needed, then reports via scanReply.
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
238 Dbtup::execACC_CHECK_SCAN(Signal
* signal
)
241 const AccCheckScan reqCopy
= *(const AccCheckScan
*)signal
->getDataPtr();
242 const AccCheckScan
* const req
= &reqCopy
;
244 c_scanOpPool
.getPtr(scanPtr
, req
->accPtr
);
245 ScanOp
& scan
= *scanPtr
.p
;
247 FragrecordPtr fragPtr
;
248 fragPtr
.i
= scan
.m_fragPtrI
;
249 ptrCheckGuard(fragPtr
, cnoOfFragrec
, fragrecord
);
250 Fragrecord
& frag
= *fragPtr
.p
;
// Periodic LCP-stop check requested by LQH.
251 if (req
->checkLcpStop
== AccCheckScan::ZCHECK_LCP_STOP
) {
253 signal
->theData
[0] = scan
.m_userPtr
;
254 signal
->theData
[1] = true;
255 EXECUTE_DIRECT(DBLQH
, GSN_CHECK_LCP_STOP
, signal
, 2);
259 if (scan
.m_bits
& ScanOp::SCAN_LOCK_WAIT
) {
261 // LQH asks if we are waiting for lock and we tell it to ask again
262 NextScanConf
* const conf
= (NextScanConf
*)signal
->getDataPtrSend();
263 conf
->scanPtr
= scan
.m_userPtr
;
264 conf
->accOperationPtr
= RNIL
; // no tuple returned
265 conf
->fragId
= frag
.fragmentId
;
266 unsigned signalLength
= 3;
267 // if TC has ordered scan close, it will be detected here
268 sendSignal(scan
.m_userRef
, GSN_NEXT_SCANCONF
,
269 signal
, signalLength
, JBB
);
272 if (scan
.m_state
== ScanOp::First
) {
274 scanFirst(signal
, scanPtr
);
276 if (scan
.m_state
== ScanOp::Next
) {
278 bool immediate
= scanNext(signal
, scanPtr
);
281 // time-slicing via TUP or PGMAN
285 scanReply(signal
, scanPtr
);
// scanReply: report scan progress to LQH. In Current state, optionally take
// the row lock via ACC (reading the PK with the TUX helper to compute the
// hash); handles Success / IsBlocked / Refused / NoFreeOp outcomes. In Locked
// state, hand the lock to LQH with NEXT_SCANCONF. In Last/Invalid state,
// signal end-of-scan (accOperationPtr == RNIL).
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
289 Dbtup::scanReply(Signal
* signal
, ScanOpPtr scanPtr
)
291 ScanOp
& scan
= *scanPtr
.p
;
292 FragrecordPtr fragPtr
;
293 fragPtr
.i
= scan
.m_fragPtrI
;
294 ptrCheckGuard(fragPtr
, cnoOfFragrec
, fragrecord
);
295 Fragrecord
& frag
= *fragPtr
.p
;
296 // for reading tuple key in Current state
297 Uint32
* pkData
= (Uint32
*)c_dataBuffer
;
299 if (scan
.m_state
== ScanOp::Current
) {
300 // found an entry to return
302 ndbrequire(scan
.m_accLockOp
== RNIL
);
303 if (scan
.m_bits
& ScanOp::SCAN_LOCK
) {
305 // read tuple key - use TUX routine
306 const ScanPos
& pos
= scan
.m_scanPos
;
307 const Local_key
& key_mm
= pos
.m_key_mm
;
308 int ret
= tuxReadPk(fragPtr
.i
, pos
.m_realpid_mm
, key_mm
.m_page_idx
,
312 dbg((DBTUP
, "PK size=%d data=%08x", pkSize
, pkData
[0]));
313 // get read lock or exclusive lock
314 AccLockReq
* const lockReq
= (AccLockReq
*)signal
->getDataPtrSend();
315 lockReq
->returnCode
= RNIL
;
316 lockReq
->requestInfo
= (scan
.m_bits
& ScanOp::SCAN_LOCK_SH
) ?
317 AccLockReq::LockShared
: AccLockReq::LockExclusive
;
318 lockReq
->accOpPtr
= RNIL
;
319 lockReq
->userPtr
= scanPtr
.i
;
320 lockReq
->userRef
= reference();
321 lockReq
->tableId
= scan
.m_tableId
;
322 lockReq
->fragId
= frag
.fragmentId
;
323 lockReq
->fragPtrI
= RNIL
; // no cached frag ptr yet
324 lockReq
->hashValue
= md5_hash((Uint64
*)pkData
, pkSize
);
325 lockReq
->tupAddr
= key_mm
.ref();
326 lockReq
->transId1
= scan
.m_transId1
;
327 lockReq
->transId2
= scan
.m_transId2
;
328 EXECUTE_DIRECT(DBACC
, GSN_ACC_LOCKREQ
,
329 signal
, AccLockReq::LockSignalLength
);
// Dispatch on ACC's synchronous answer.
331 switch (lockReq
->returnCode
) {
332 case AccLockReq::Success
:
334 scan
.m_state
= ScanOp::Locked
;
335 scan
.m_accLockOp
= lockReq
->accOpPtr
;
337 case AccLockReq::IsBlocked
:
// Queued behind another holder: remember op, wait for ACCKEYCONF/REF.
340 scan
.m_state
= ScanOp::Blocked
;
341 scan
.m_bits
|= ScanOp::SCAN_LOCK_WAIT
;
342 scan
.m_accLockOp
= lockReq
->accOpPtr
;
343 // LQH will wake us up
344 signal
->theData
[0] = scan
.m_userPtr
;
345 signal
->theData
[1] = true;
346 EXECUTE_DIRECT(DBLQH
, GSN_CHECK_LCP_STOP
, signal
, 2);
350 case AccLockReq::Refused
:
352 // we cannot see deleted tuple (assert only)
355 scan
.m_state
= ScanOp::Next
;
356 signal
->theData
[0] = scan
.m_userPtr
;
357 signal
->theData
[1] = true;
358 EXECUTE_DIRECT(DBLQH
, GSN_CHECK_LCP_STOP
, signal
, 2);
362 case AccLockReq::NoFreeOp
:
364 // max ops should depend on max scans (assert only)
366 // stay in Current state
367 scan
.m_state
= ScanOp::Current
;
368 signal
->theData
[0] = scan
.m_userPtr
;
369 signal
->theData
[1] = true;
370 EXECUTE_DIRECT(DBLQH
, GSN_CHECK_LCP_STOP
, signal
, 2);
// Non-locking scan: no lock needed, go straight to Locked.
379 scan
.m_state
= ScanOp::Locked
;
383 if (scan
.m_state
== ScanOp::Locked
) {
384 // we have lock or do not need one
387 NextScanConf
* const conf
= (NextScanConf
*)signal
->getDataPtrSend();
388 conf
->scanPtr
= scan
.m_userPtr
;
389 // the lock is passed to LQH
390 Uint32 accLockOp
= scan
.m_accLockOp
;
391 if (accLockOp
!= RNIL
) {
392 scan
.m_accLockOp
= RNIL
;
393 // remember it until LQH unlocks it
394 addAccLockOp(scan
, accLockOp
);
396 ndbrequire(! (scan
.m_bits
& ScanOp::SCAN_LOCK
));
397 // operation RNIL in LQH would signal no tuple returned
398 accLockOp
= (Uint32
)-1;
400 const ScanPos
& pos
= scan
.m_scanPos
;
401 conf
->accOperationPtr
= accLockOp
;
402 conf
->fragId
= frag
.fragmentId
;
403 conf
->localKey
[0] = pos
.m_key_mm
.ref();
404 conf
->localKey
[1] = 0;
405 conf
->localKeyLength
= 1;
406 unsigned signalLength
= 6;
// Locking scans reply asynchronously; others short-cut via EXECUTE_DIRECT.
407 if (scan
.m_bits
& ScanOp::SCAN_LOCK
) {
408 sendSignal(scan
.m_userRef
, GSN_NEXT_SCANCONF
,
409 signal
, signalLength
, JBB
);
411 Uint32 blockNo
= refToBlock(scan
.m_userRef
);
412 EXECUTE_DIRECT(blockNo
, GSN_NEXT_SCANCONF
, signal
, signalLength
);
415 // next time look for next entry
416 scan
.m_state
= ScanOp::Next
;
419 if (scan
.m_state
== ScanOp::Last
||
420 scan
.m_state
== ScanOp::Invalid
) {
422 NextScanConf
* const conf
= (NextScanConf
*)signal
->getDataPtrSend();
423 conf
->scanPtr
= scan
.m_userPtr
;
424 conf
->accOperationPtr
= RNIL
;
426 unsigned signalLength
= 3;
427 sendSignal(scanPtr
.p
->m_userRef
, GSN_NEXT_SCANCONF
,
428 signal
, signalLength
, JBB
);
435 * Lock succeeded (after delay) in ACC. If the lock is for current
436 * entry, set state to Locked. If the lock is for an entry we were
437 * moved away from, simply unlock it. Finally, if we are closing the
438 * scan, do nothing since we have already sent an abort request.
// execACCKEYCONF: delayed lock grant from ACC. If still Blocked on the same
// entry, become Locked. If the scan has moved on (and is not Aborting), the
// lock is stale — abort it immediately. If Aborting, just drop the op ref
// and wait for ACC_ABORTCONF.
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
441 Dbtup::execACCKEYCONF(Signal
* signal
)
445 scanPtr
.i
= signal
->theData
[0];
446 c_scanOpPool
.getPtr(scanPtr
);
447 ScanOp
& scan
= *scanPtr
.p
;
448 ndbrequire(scan
.m_bits
& ScanOp::SCAN_LOCK_WAIT
&& scan
.m_accLockOp
!= RNIL
);
449 scan
.m_bits
&= ~ ScanOp::SCAN_LOCK_WAIT
;
450 if (scan
.m_state
== ScanOp::Blocked
) {
451 // the lock wait was for current entry
453 scan
.m_state
= ScanOp::Locked
;
457 if (scan
.m_state
!= ScanOp::Aborting
) {
458 // we were moved, release lock
460 AccLockReq
* const lockReq
= (AccLockReq
*)signal
->getDataPtrSend();
461 lockReq
->returnCode
= RNIL
;
462 lockReq
->requestInfo
= AccLockReq::Abort
;
463 lockReq
->accOpPtr
= scan
.m_accLockOp
;
464 EXECUTE_DIRECT(DBACC
, GSN_ACC_LOCKREQ
, signal
, AccLockReq::UndoSignalLength
);
466 ndbrequire(lockReq
->returnCode
== AccLockReq::Success
);
467 scan
.m_accLockOp
= RNIL
;
// Aborting: forget the op; close completes at ACC_ABORTCONF.
472 scan
.m_accLockOp
= RNIL
;
473 // continue at ACC_ABORTCONF
477 * Lock failed (after delay) in ACC. Probably means somebody ahead of
478 * us in lock queue deleted the tuple.
// execACCKEYREF: delayed lock refusal from ACC (e.g. the tuple was deleted
// while we queued). Unless Aborting, release the failed operation and keep
// scanning; NR scans refetch the same rowid, others just move on. If
// Aborting, drop the op ref and wait for ACC_ABORTCONF.
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
481 Dbtup::execACCKEYREF(Signal
* signal
)
485 scanPtr
.i
= signal
->theData
[0];
486 c_scanOpPool
.getPtr(scanPtr
);
487 ScanOp
& scan
= *scanPtr
.p
;
488 ndbrequire(scan
.m_bits
& ScanOp::SCAN_LOCK_WAIT
&& scan
.m_accLockOp
!= RNIL
);
489 scan
.m_bits
&= ~ ScanOp::SCAN_LOCK_WAIT
;
490 if (scan
.m_state
!= ScanOp::Aborting
) {
492 // release the operation
493 AccLockReq
* const lockReq
= (AccLockReq
*)signal
->getDataPtrSend();
494 lockReq
->returnCode
= RNIL
;
495 lockReq
->requestInfo
= AccLockReq::Abort
;
496 lockReq
->accOpPtr
= scan
.m_accLockOp
;
497 EXECUTE_DIRECT(DBACC
, GSN_ACC_LOCKREQ
, signal
, AccLockReq::UndoSignalLength
);
499 ndbrequire(lockReq
->returnCode
== AccLockReq::Success
);
500 scan
.m_accLockOp
= RNIL
;
501 // scan position should already have been moved (assert only)
502 if (scan
.m_state
== ScanOp::Blocked
) {
// NR scan: re-read the same tuple position; others skip ahead.
505 if (scan
.m_bits
& ScanOp::SCAN_NR
)
508 scan
.m_state
= ScanOp::Next
;
509 scan
.m_scanPos
.m_get
= ScanPos::Get_tuple
;
510 ndbout_c("Ignoring scan.m_state == ScanOp::Blocked, refetch");
515 scan
.m_state
= ScanOp::Next
;
516 ndbout_c("Ignoring scan.m_state == ScanOp::Blocked");
523 scan
.m_accLockOp
= RNIL
;
524 // continue at ACC_ABORTCONF
528 * Received when scan is closing. This signal arrives after any
529 * ACCKEYCONF or ACCKEYREF which may have been in job buffer.
// execACC_ABORTCONF: final abort acknowledgement from ACC during scan close.
// Arrives after any ACCKEYCONF/ACCKEYREF still in the job buffer; clears a
// possible remaining lock-wait state and finishes with scanClose.
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
532 Dbtup::execACC_ABORTCONF(Signal
* signal
)
536 scanPtr
.i
= signal
->theData
[0];
537 c_scanOpPool
.getPtr(scanPtr
);
538 ScanOp
& scan
= *scanPtr
.p
;
539 ndbrequire(scan
.m_state
== ScanOp::Aborting
);
540 // most likely we are still in lock wait
541 if (scan
.m_bits
& ScanOp::SCAN_LOCK_WAIT
) {
543 scan
.m_bits
&= ~ ScanOp::SCAN_LOCK_WAIT
;
544 scan
.m_accLockOp
= RNIL
;
546 scanClose(signal
, scanPtr
);
// scanFirst: position the scan at its starting point. Empty fragments (for
// non-NR scans) end immediately. Memory scans start at page-cursor state
// Get_page_mm; disk scans start at the first page of the first extent
// (Get_page_dd), or end at once if the extent list is empty. Actual
// iteration is left to scanNext().
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
550 Dbtup::scanFirst(Signal
*, ScanOpPtr scanPtr
)
552 ScanOp
& scan
= *scanPtr
.p
;
553 ScanPos
& pos
= scan
.m_scanPos
;
554 Local_key
& key
= pos
.m_key
;
555 const Uint32 bits
= scan
.m_bits
;
557 FragrecordPtr fragPtr
;
558 fragPtr
.i
= scan
.m_fragPtrI
;
559 ptrCheckGuard(fragPtr
, cnoOfFragrec
, fragrecord
);
560 Fragrecord
& frag
= *fragPtr
.p
;
561 // in the future should not pre-allocate pages
562 if (frag
.noOfPages
== 0 && ((bits
& ScanOp::SCAN_NR
) == 0)) {
564 scan
.m_state
= ScanOp::Last
;
567 if (! (bits
& ScanOp::SCAN_DD
)) {
568 key
.m_file_no
= ZNIL
;
570 pos
.m_get
= ScanPos::Get_page_mm
;
571 // for MM scan real page id is cached for efficiency
572 pos
.m_realpid_mm
= RNIL
;
574 Disk_alloc_info
& alloc
= frag
.m_disk_alloc_info
;
575 // for now must check disk part explicitly
576 if (alloc
.m_extent_list
.firstItem
== RNIL
) {
578 scan
.m_state
= ScanOp::Last
;
581 pos
.m_extent_info_ptr_i
= alloc
.m_extent_list
.firstItem
;
582 Extent_info
* ext
= c_extent_pool
.getPtr(pos
.m_extent_info_ptr_i
);
583 key
.m_file_no
= ext
->m_key
.m_file_no
;
584 key
.m_page_no
= ext
->m_first_page_no
;
585 pos
.m_get
= ScanPos::Get_page_dd
;
588 // let scanNext() do the work
589 scan
.m_state
= ScanOp::Next
;
// scanNext: the core scan loop — a state machine over ScanPos::m_get that
// walks pages (memory pages by logical page number, disk pages by extent
// list with PGMAN read-ahead) and fixed-size tuple slots within a page.
// Time-slices itself via CONTINUEB after 32 iterations. For LCP scans it
// first drains the fragment's lcp-keep list, returning those rowids directly
// to the requester and freeing the kept tuples afterwards.
// NOTE(review): extract is line-mangled and missing many interior lines
// (return statements, braces, labels); code kept byte-identical, comments
// only. Some comment lines below (e.g. "read ahead for scan in disk order")
// lost their comment markers in extraction — left untouched.
593 Dbtup::scanNext(Signal
* signal
, ScanOpPtr scanPtr
)
595 ScanOp
& scan
= *scanPtr
.p
;
596 ScanPos
& pos
= scan
.m_scanPos
;
597 Local_key
& key
= pos
.m_key
;
598 const Uint32 bits
= scan
.m_bits
;
600 TablerecPtr tablePtr
;
601 tablePtr
.i
= scan
.m_tableId
;
602 ptrCheckGuard(tablePtr
, cnoOfTablerec
, tablerec
);
603 Tablerec
& table
= *tablePtr
.p
;
605 FragrecordPtr fragPtr
;
606 fragPtr
.i
= scan
.m_fragPtrI
;
607 ptrCheckGuard(fragPtr
, cnoOfFragrec
, fragrecord
);
608 Fragrecord
& frag
= *fragPtr
.p
;
610 Tuple_header
* th
= 0;
612 Uint32 loop_count
= 0;
613 Uint32 scanGCI
= scanPtr
.p
->m_scanGCI
;
// mm indexes the [main-memory|disk] variant of per-table offsets.
616 const bool mm
= (bits
& ScanOp::SCAN_DD
);
617 const bool lcp
= (bits
& ScanOp::SCAN_LCP
);
619 Uint32 lcp_list
= fragPtr
.p
->m_lcp_keep_list
;
620 Uint32 size
= table
.m_offsets
[mm
].m_fix_header_size
;
// LCP scan with kept tuples pending: serve those first (handler below).
622 if (lcp
&& lcp_list
!= RNIL
)
626 case ScanPos::Get_next_tuple
:
627 case ScanPos::Get_next_tuple_fs
:
629 key
.m_page_idx
+= size
;
631 case ScanPos::Get_tuple
:
632 case ScanPos::Get_tuple_fs
:
635 * We need to refetch page after timeslice
637 pos
.m_get
= ScanPos::Get_page
;
645 case ScanPos::Get_next_page
:
// Generic "next page" resolves to memory vs disk variant.
649 if (! (bits
& ScanOp::SCAN_DD
))
650 pos
.m_get
= ScanPos::Get_next_page_mm
;
652 pos
.m_get
= ScanPos::Get_next_page_dd
;
655 case ScanPos::Get_page
:
659 if (! (bits
& ScanOp::SCAN_DD
))
660 pos
.m_get
= ScanPos::Get_page_mm
;
662 pos
.m_get
= ScanPos::Get_page_dd
;
665 case ScanPos::Get_next_page_mm
:
666 // move to next logical TUP page
670 if (key
.m_page_no
>= frag
.noOfPages
) {
// NR scans may continue up to the caller-supplied m_endPage bound.
673 if ((bits
& ScanOp::SCAN_NR
) && (scan
.m_endPage
!= RNIL
))
676 if (key
.m_page_no
< scan
.m_endPage
)
679 ndbout_c("scanning page %u", key
.m_page_no
);
683 // no more pages, scan ends
684 pos
.m_get
= ScanPos::Get_undef
;
685 scan
.m_state
= ScanOp::Last
;
690 pos
.m_get
= ScanPos::Get_page_mm
;
691 // clear cached value
692 pos
.m_realpid_mm
= RNIL
;
695 case ScanPos::Get_page_mm
:
// Resolve and cache the real page id for the current logical page.
699 if (pos
.m_realpid_mm
== RNIL
) {
701 if (key
.m_page_no
< frag
.noOfPages
)
702 pos
.m_realpid_mm
= getRealpid(fragPtr
.p
, key
.m_page_no
);
705 ndbassert(bits
& ScanOp::SCAN_NR
);
710 c_page_pool
.getPtr(pagePtr
, pos
.m_realpid_mm
);
712 if (pagePtr
.p
->page_state
== ZEMPTY_MM
) {
715 if (! (bits
& ScanOp::SCAN_NR
))
717 pos
.m_get
= ScanPos::Get_next_page_mm
;
718 break; // incr loop count
723 pos
.m_realpid_mm
= RNIL
;
727 pos
.m_page
= pagePtr
.p
;
728 pos
.m_get
= ScanPos::Get_tuple
;
731 case ScanPos::Get_next_page_dd
:
732 // move to next disk page
735 Disk_alloc_info
& alloc
= frag
.m_disk_alloc_info
;
736 Local_fragment_extent_list
list(c_extent_pool
, alloc
.m_extent_list
);
737 Ptr
<Extent_info
> ext_ptr
;
738 c_extent_pool
.getPtr(ext_ptr
, pos
.m_extent_info_ptr_i
);
739 Extent_info
* ext
= ext_ptr
.p
;
741 if (key
.m_page_no
>= ext
->m_first_page_no
+ alloc
.m_extent_size
) {
742 // no more pages in this extent
744 if (! list
.next(ext_ptr
)) {
745 // no more extents, scan ends
747 pos
.m_get
= ScanPos::Get_undef
;
748 scan
.m_state
= ScanOp::Last
;
751 // move to next extent
753 pos
.m_extent_info_ptr_i
= ext_ptr
.i
;
754 ext
= c_extent_pool
.getPtr(pos
.m_extent_info_ptr_i
);
755 key
.m_file_no
= ext
->m_key
.m_file_no
;
756 key
.m_page_no
= ext
->m_first_page_no
;
760 pos
.m_get
= ScanPos::Get_page_dd
;
762 read ahead for scan in disk order
763 do read ahead every 8:th page
765 if ((bits
& ScanOp::SCAN_DD
) &&
766 (((key
.m_page_no
- ext
->m_first_page_no
) & 7) == 0))
769 // initialize PGMAN request
770 Page_cache_client::Request preq
;
771 preq
.m_page
= pos
.m_key
;
772 preq
.m_callback
= TheNULLCallback
;
774 // set maximum read ahead
775 Uint32 read_ahead
= m_max_page_read_ahead
;
779 // prepare page read ahead in current extent
780 Uint32 page_no
= preq
.m_page
.m_page_no
;
781 Uint32 page_no_limit
= page_no
+ read_ahead
;
782 Uint32 limit
= ext
->m_first_page_no
+ alloc
.m_extent_size
;
783 if (page_no_limit
> limit
)
786 // read ahead crosses extent, set limit for this extent
787 read_ahead
= page_no_limit
- limit
;
788 page_no_limit
= limit
;
789 // and make sure we only read one extra extent next time around
790 if (read_ahead
> alloc
.m_extent_size
)
791 read_ahead
= alloc
.m_extent_size
;
796 read_ahead
= 0; // no more to read ahead after this
798 // do read ahead pages for this extent
799 while (page_no
< page_no_limit
)
801 // page request to PGMAN
803 preq
.m_page
.m_page_no
= page_no
;
806 m_pgman
.get_page(signal
, preq
, flags
);
810 if (!read_ahead
|| !list
.next(ext_ptr
))
812 // no more extents after this or read ahead done
816 // move to next extent and initialize PGMAN request accordingly
817 Extent_info
* ext
= c_extent_pool
.getPtr(ext_ptr
.i
);
818 preq
.m_page
.m_file_no
= ext
->m_key
.m_file_no
;
819 preq
.m_page
.m_page_no
= ext
->m_first_page_no
;
821 } // if ScanOp::SCAN_DD read ahead
824 case ScanPos::Get_page_dd
:
825 // get global page in PGMAN cache
828 // check if page is un-allocated or empty
829 if (likely(! (bits
& ScanOp::SCAN_NR
)))
831 Tablespace_client
tsman(signal
, c_tsman
,
834 frag
.m_tablespace_id
);
835 unsigned uncommitted
, committed
;
836 uncommitted
= committed
= ~(unsigned)0;
837 int ret
= tsman
.get_page_free_bits(&key
, &uncommitted
, &committed
);
838 ndbrequire(ret
== 0);
839 if (committed
== 0 && uncommitted
== 0) {
842 pos
.m_get
= ScanPos::Get_next_page_dd
;
843 break; // incr loop count
846 // page request to PGMAN
847 Page_cache_client::Request preq
;
848 preq
.m_page
= pos
.m_key
;
849 preq
.m_callback
.m_callbackData
= scanPtr
.i
;
850 preq
.m_callback
.m_callbackFunction
=
851 safe_cast(&Dbtup::disk_page_tup_scan_callback
);
853 int res
= m_pgman
.get_page(signal
, preq
, flags
);
858 pos
.m_get
= ScanPos::Get_tuple
;
862 pos
.m_page
= (Page
*)m_pgman
.m_ptr
.p
;
864 pos
.m_get
= ScanPos::Get_tuple
;
867 // move to next tuple
868 case ScanPos::Get_next_tuple
:
869 case ScanPos::Get_next_tuple_fs
:
870 // move to next fixed size tuple
873 key
.m_page_idx
+= size
;
874 pos
.m_get
= ScanPos::Get_tuple_fs
;
877 case ScanPos::Get_tuple
:
878 case ScanPos::Get_tuple_fs
:
879 // get fixed size tuple
882 Fix_page
* page
= (Fix_page
*)pos
.m_page
;
883 if (key
.m_page_idx
+ size
<= Fix_page::DATA_WORDS
)
885 pos
.m_get
= ScanPos::Get_next_tuple_fs
;
886 th
= (Tuple_header
*)&page
->m_data
[key
.m_page_idx
];
// Normal scans return live tuples; NR scans additionally report
// rowids deleted/changed since scanGCI (found_deleted_rowid path).
888 if (likely(! (bits
& ScanOp::SCAN_NR
)))
891 thbits
= th
->m_header_bits
;
892 if (! (thbits
& Tuple_header::FREE
))
899 if (pos
.m_realpid_mm
== RNIL
)
903 goto found_deleted_rowid
;
905 thbits
= th
->m_header_bits
;
906 if ((foundGCI
= *th
->get_mm_gci(tablePtr
.p
)) > scanGCI
||
909 if (! (thbits
& Tuple_header::FREE
))
916 goto found_deleted_rowid
;
919 else if (thbits
!= Fix_page::FREE_RECORD
&&
920 th
->m_operation_ptr_i
!= RNIL
)
923 goto found_tuple
; // Locked tuple...
929 // no more tuples on this page
930 pos
.m_get
= ScanPos::Get_next_page
;
933 break; // incr loop count
935 // found possible tuple to return
938 // caller has already set pos.m_get to next tuple
939 if (! (bits
& ScanOp::SCAN_LCP
&& thbits
& Tuple_header::LCP_SKIP
)) {
940 Local_key
& key_mm
= pos
.m_key_mm
;
941 if (! (bits
& ScanOp::SCAN_DD
)) {
943 // real page id is already set
945 key_mm
.assref(th
->m_base_record_ref
);
946 // recompute for each disk tuple
947 pos
.m_realpid_mm
= getRealpid(fragPtr
.p
, key_mm
.m_page_no
);
949 // TUPKEYREQ handles savepoint stuff
950 scan
.m_state
= ScanOp::Current
;
954 // clear it so that it will show up in next LCP
955 th
->m_header_bits
= thbits
& ~(Uint32
)Tuple_header::LCP_SKIP
;
956 if (tablePtr
.p
->m_bits
& Tablerec::TR_Checksum
) {
958 setChecksum(th
, tablePtr
.p
);
// found_deleted_rowid handling: NR scans report the deleted rowid
// directly to the requester with gci so the starting node can delete.
966 ndbassert(bits
& ScanOp::SCAN_NR
);
967 Local_key
& key_mm
= pos
.m_key_mm
;
968 if (! (bits
& ScanOp::SCAN_DD
)) {
970 // caller has already set pos.m_get to next tuple
971 // real page id is already set
973 key_mm
.assref(th
->m_base_record_ref
);
974 // recompute for each disk tuple
975 pos
.m_realpid_mm
= getRealpid(fragPtr
.p
, key_mm
.m_page_no
);
977 Fix_page
*mmpage
= (Fix_page
*)c_page_pool
.getPtr(pos
.m_realpid_mm
);
978 th
= (Tuple_header
*)(mmpage
->m_data
+ key_mm
.m_page_idx
);
979 if ((foundGCI
= *th
->get_mm_gci(tablePtr
.p
)) > scanGCI
||
982 if (! (thbits
& Tuple_header::FREE
))
987 NextScanConf
* const conf
= (NextScanConf
*)signal
->getDataPtrSend();
988 conf
->scanPtr
= scan
.m_userPtr
;
989 conf
->accOperationPtr
= RNIL
;
990 conf
->fragId
= frag
.fragmentId
;
991 conf
->localKey
[0] = pos
.m_key_mm
.ref();
992 conf
->localKey
[1] = 0;
993 conf
->localKeyLength
= 1;
994 conf
->gci
= foundGCI
;
995 Uint32 blockNo
= refToBlock(scan
.m_userRef
);
996 EXECUTE_DIRECT(blockNo
, GSN_NEXT_SCANCONF
, signal
, 7);
999 // TUPKEYREQ handles savepoint stuff
1001 scan
.m_state
= ScanOp::Next
;
1004 break; // incr loop count
// Time-slice: after 32 loop iterations reschedule via CONTINUEB.
1009 if (++loop_count
>= 32)
1012 // TODO: at drop table we have to flush and terminate these
1014 signal
->theData
[0] = ZTUP_SCAN
;
1015 signal
->theData
[1] = scanPtr
.i
;
1016 sendSignal(reference(), GSN_CONTINUEB
, signal
, 2, JBB
);
// LCP keep-list handler: return one kept tuple, unlink it, then free it.
1021 tmp
.assref(lcp_list
);
1022 tmp
.m_page_no
= getRealpid(fragPtr
.p
, tmp
.m_page_no
);
1025 c_page_pool
.getPtr(pagePtr
, tmp
.m_page_no
);
1026 Tuple_header
* ptr
= (Tuple_header
*)
1027 ((Fix_page
*)pagePtr
.p
)->get_ptr(tmp
.m_page_idx
, 0);
1028 Uint32 headerbits
= ptr
->m_header_bits
;
1029 ndbrequire(headerbits
& Tuple_header::LCP_KEEP
);
1031 Uint32 next
= ptr
->m_operation_ptr_i
;
1032 ptr
->m_operation_ptr_i
= RNIL
;
1033 ptr
->m_header_bits
= headerbits
& ~(Uint32
)Tuple_header::FREE
;
1035 if (tablePtr
.p
->m_bits
& Tablerec::TR_Checksum
) {
1037 setChecksum(ptr
, tablePtr
.p
);
1040 NextScanConf
* const conf
= (NextScanConf
*)signal
->getDataPtrSend();
1041 conf
->scanPtr
= scan
.m_userPtr
;
1042 conf
->accOperationPtr
= (Uint32
)-1;
1043 conf
->fragId
= frag
.fragmentId
;
1044 conf
->localKey
[0] = lcp_list
;
1045 conf
->localKey
[1] = 0;
1046 conf
->localKeyLength
= 1;
1048 Uint32 blockNo
= refToBlock(scan
.m_userRef
);
1049 EXECUTE_DIRECT(blockNo
, GSN_NEXT_SCANCONF
, signal
, 7);
1051 fragPtr
.p
->m_lcp_keep_list
= next
;
1052 ptr
->m_header_bits
|= Tuple_header::FREED
; // RESTORE free flag
1053 if (headerbits
& Tuple_header::FREED
)
1055 if (tablePtr
.p
->m_attributes
[MM
].m_no_of_varsize
)
1058 free_var_rec(fragPtr
.p
, tablePtr
.p
, &tmp
, pagePtr
);
1061 free_fix_rec(fragPtr
.p
, tablePtr
.p
, &tmp
, (Fix_page
*)pagePtr
.p
);
// scanCont: resume a time-sliced scan (e.g. after CONTINUEB or a PGMAN
// callback) — run scanNext and, unless it time-sliced again, report via
// scanReply.
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
1068 Dbtup::scanCont(Signal
* signal
, ScanOpPtr scanPtr
)
1070 bool immediate
= scanNext(signal
, scanPtr
);
1073 // time-slicing again
1076 scanReply(signal
, scanPtr
);
// disk_page_tup_scan_callback: PGMAN completion callback for a disk page
// requested by scanNext (Get_page_dd). Stores the now-cached page in the
// scan position and resumes via scanCont.
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
1080 Dbtup::disk_page_tup_scan_callback(Signal
* signal
, Uint32 scanPtrI
, Uint32 page_i
)
1083 c_scanOpPool
.getPtr(scanPtr
, scanPtrI
);
1084 ScanOp
& scan
= *scanPtr
.p
;
1085 ScanPos
& pos
= scan
.m_scanPos
;
1087 Ptr
<GlobalPage
> gptr
;
1088 m_global_page_pool
.getPtr(gptr
, page_i
);
1089 pos
.m_page
= (Page
*)gptr
.p
;
1091 scanCont(signal
, scanPtr
);
// scanClose: finish a scan. Aborts every ACC lock still tracked on the
// scan's lock list (locks LQH never released), sends the final
// NEXT_SCANCONF (fragId == RNIL marks scan end to LQH), and returns the
// ScanOp to its pool.
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
1095 Dbtup::scanClose(Signal
* signal
, ScanOpPtr scanPtr
)
1097 ScanOp
& scan
= *scanPtr
.p
;
1098 ndbrequire(! (scan
.m_bits
& ScanOp::SCAN_LOCK_WAIT
) && scan
.m_accLockOp
== RNIL
);
1099 // unlock all not unlocked by LQH
1100 LocalDLFifoList
<ScanLock
> list(c_scanLockPool
, scan
.m_accLockOps
);
1101 ScanLockPtr lockPtr
;
1102 while (list
.first(lockPtr
)) {
1104 AccLockReq
* const lockReq
= (AccLockReq
*)signal
->getDataPtrSend();
1105 lockReq
->returnCode
= RNIL
;
1106 lockReq
->requestInfo
= AccLockReq::Abort
;
1107 lockReq
->accOpPtr
= lockPtr
.p
->m_accLockOp
;
1108 EXECUTE_DIRECT(DBACC
, GSN_ACC_LOCKREQ
, signal
, AccLockReq::UndoSignalLength
);
1110 ndbrequire(lockReq
->returnCode
== AccLockReq::Success
);
1111 list
.release(lockPtr
);
// Final confirmation: accOperationPtr RNIL / fragId RNIL = scan closed.
1114 NextScanConf
* const conf
= (NextScanConf
*)signal
->getDataPtrSend();
1115 conf
->scanPtr
= scanPtr
.p
->m_userPtr
;
1116 conf
->accOperationPtr
= RNIL
;
1117 conf
->fragId
= RNIL
;
1118 unsigned signalLength
= 3;
1119 sendSignal(scanPtr
.p
->m_userRef
, GSN_NEXT_SCANCONF
,
1120 signal
, signalLength
, JBB
);
1121 releaseScanOp(scanPtr
);
// addAccLockOp: record an ACC lock operation (handed to LQH) on the scan's
// FIFO lock list so scanClose can abort it if LQH never unlocks it. Asserts
// the op is not already present.
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
1125 Dbtup::addAccLockOp(ScanOp
& scan
, Uint32 accLockOp
)
1127 LocalDLFifoList
<ScanLock
> list(c_scanLockPool
, scan
.m_accLockOps
);
1128 ScanLockPtr lockPtr
;
// Duplicate check over the existing list (assert-style).
1130 list
.first(lockPtr
);
1131 while (lockPtr
.i
!= RNIL
) {
1132 ndbrequire(lockPtr
.p
->m_accLockOp
!= accLockOp
);
1136 bool ok
= list
.seize(lockPtr
);
1138 lockPtr
.p
->m_accLockOp
= accLockOp
;
// removeAccLockOp: find the given ACC lock operation on the scan's lock list
// (it must be there — ndbrequire) and release the list entry after LQH has
// unlocked it.
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
1142 Dbtup::removeAccLockOp(ScanOp
& scan
, Uint32 accLockOp
)
1144 LocalDLFifoList
<ScanLock
> list(c_scanLockPool
, scan
.m_accLockOps
);
1145 ScanLockPtr lockPtr
;
1146 list
.first(lockPtr
);
1147 while (lockPtr
.i
!= RNIL
) {
1148 if (lockPtr
.p
->m_accLockOp
== accLockOp
) {
1154 ndbrequire(lockPtr
.i
!= RNIL
);
1155 list
.release(lockPtr
);
// releaseScanOp: return a ScanOp to its owner. The single LCP scan op is
// merely detached (fragment's m_lcp_scan_op and the op's m_fragPtrI reset
// to RNIL); ordinary ops are released back to the per-fragment scan list /
// pool.
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
1159 Dbtup::releaseScanOp(ScanOpPtr
& scanPtr
)
1161 FragrecordPtr fragPtr
;
1162 fragPtr
.i
= scanPtr
.p
->m_fragPtrI
;
1163 ptrCheckGuard(fragPtr
, cnoOfFragrec
, fragrecord
);
1165 if(scanPtr
.p
->m_bits
& ScanOp::SCAN_LCP
)
1168 fragPtr
.p
->m_lcp_scan_op
= RNIL
;
1169 scanPtr
.p
->m_fragPtrI
= RNIL
;
1174 LocalDLList
<ScanOp
> list(c_scanOpPool
, fragPtr
.p
->m_scanList
);
1175 list
.release(scanPtr
);
// execLCP_FRAG_ORD: order to prepare an LCP scan of one fragment (only
// relevant when the table has disk attributes). Attaches the global
// c_lcp_scan_op to the fragment, positions it with scanFirst, and leaves it
// in state First awaiting the actual scan start.
// NOTE(review): extract is line-mangled with interior lines missing; code
// kept byte-identical, comments only.
1180 Dbtup::execLCP_FRAG_ORD(Signal
* signal
)
1182 LcpFragOrd
* req
= (LcpFragOrd
*)signal
->getDataPtr();
1184 TablerecPtr tablePtr
;
1185 tablePtr
.i
= req
->tableId
;
1186 ptrCheckGuard(tablePtr
, cnoOfTablerec
, tablerec
);
1188 if (tablePtr
.p
->m_no_of_disk_attributes
)
1191 FragrecordPtr fragPtr
;
1192 Uint32 fragId
= req
->fragmentId
;
1194 getFragmentrec(fragPtr
, fragId
, tablePtr
.p
);
1195 ndbrequire(fragPtr
.i
!= RNIL
);
1196 Fragrecord
& frag
= *fragPtr
.p
;
// Exactly one LCP scan op exists; the fragment must not already own it.
1198 ndbrequire(frag
.m_lcp_scan_op
== RNIL
&& c_lcp_scan_op
!= RNIL
);
1199 frag
.m_lcp_scan_op
= c_lcp_scan_op
;
1201 c_scanOpPool
.getPtr(scanPtr
, frag
.m_lcp_scan_op
);
1202 ndbrequire(scanPtr
.p
->m_fragPtrI
== RNIL
);
1203 scanPtr
.p
->m_fragPtrI
= fragPtr
.i
;
1205 scanFirst(signal
, scanPtr
);
1206 scanPtr
.p
->m_state
= ScanOp::First
;