1 /* Copyright (c) 2003-2008 MySQL AB, 2009 Sun Microsystems, Inc.
2 Use is subject to license terms.
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
17 #include <my_config.h>
20 #include <ndb_version.h>
23 #include <Bitmask.hpp>
24 #include <SimpleProperties.hpp>
26 #include <signaldata/NodeFailRep.hpp>
27 #include <signaldata/ReadNodesConf.hpp>
29 #include <signaldata/ListTables.hpp>
30 #include <signaldata/GetTabInfo.hpp>
31 #include <signaldata/GetTableId.hpp>
32 #include <signaldata/DictTabInfo.hpp>
33 #include <signaldata/SumaImpl.hpp>
34 #include <signaldata/ScanFrag.hpp>
35 #include <signaldata/TransIdAI.hpp>
36 #include <signaldata/CreateTrig.hpp>
37 #include <signaldata/AlterTrig.hpp>
38 #include <signaldata/DropTrig.hpp>
39 #include <signaldata/FireTrigOrd.hpp>
40 #include <signaldata/TrigAttrInfo.hpp>
41 #include <signaldata/CheckNodeGroups.hpp>
42 #include <signaldata/GCPSave.hpp>
43 #include <signaldata/CreateTab.hpp>
44 #include <signaldata/DropTab.hpp>
45 #include <signaldata/AlterTable.hpp>
46 #include <signaldata/AlterTab.hpp>
47 #include <signaldata/DihFragCount.hpp>
48 #include <signaldata/SystemError.hpp>
50 #include <ndbapi/NdbDictionary.hpp>
52 #include <DebuggerNames.hpp>
53 #include <../dbtup/Dbtup.hpp>
54 #include <../dbdih/Dbdih.hpp>
56 //#define HANDOVER_DEBUG
57 //#define NODEFAIL_DEBUG
58 //#define NODEFAIL_DEBUG2
59 //#define DEBUG_SUMA_SEQUENCE
61 //#define EVENT_PH3_DEBUG
62 //#define EVENT_DEBUG2
67 #undef DBUG_VOID_RETURN
69 #define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);}
70 #define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;}
71 #define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); }
72 #define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; }
77 * SUMA crashes if an index is created at the same time as
78 * global replication. Very easy to reproduce using testIndex.
79 * Note: This only happens occasionally, but is quite easy to reprod.
82 Uint32 g_subPtrI
= RNIL
;
83 static const Uint32 SUMA_SEQUENCE
= 0xBABEBABE;
85 static const Uint32 MAX_CONCURRENT_GCP
= 2;
87 /**************************************************************
96 Suma::getNodeGroupMembers(Signal
* signal
)
99 DBUG_ENTER("Suma::getNodeGroupMembers");
101 * Ask DIH for nodeGroupMembers
103 CheckNodeGroups
* sd
= (CheckNodeGroups
*)signal
->getDataPtrSend();
104 sd
->blockRef
= reference();
106 CheckNodeGroups::Direct
|
107 CheckNodeGroups::GetNodeGroupMembers
;
108 sd
->nodeId
= getOwnNodeId();
109 EXECUTE_DIRECT(DBDIH
, GSN_CHECKNODEGROUPSREQ
, signal
,
110 CheckNodeGroups::SignalLength
);
113 c_nodeGroup
= sd
->output
;
114 c_nodes_in_nodegroup_mask
.assign(sd
->mask
);
115 c_noNodesInGroup
= c_nodes_in_nodegroup_mask
.count();
118 for (i
= 0; i
< MAX_NDB_NODES
; i
++) {
121 c_nodesInGroup
[pos
++] = i
;
125 const Uint32 replicas
= c_noNodesInGroup
;
128 for(i
= 1; i
<= replicas
; i
++)
131 for(i
= 0; i
<buckets
; i
++)
133 Bucket
* ptr
= c_buckets
+i
;
134 for(Uint32 j
= 0; j
< replicas
; j
++)
136 ptr
->m_nodes
[j
] = c_nodesInGroup
[(i
+ j
) % replicas
];
140 c_no_of_buckets
= buckets
;
141 ndbrequire(c_noNodesInGroup
> 0); // at least 1 node in the nodegroup
144 for (Uint32 i
= 0; i
< c_noNodesInGroup
; i
++) {
145 DBUG_PRINT("exit",("Suma: NodeGroup %u, me %u, "
147 c_nodeGroup
, getOwnNodeId(),
148 i
, c_nodesInGroup
[i
]));
156 Suma::execREAD_CONFIG_REQ(Signal
* signal
)
160 const ReadConfigReq
* req
= (ReadConfigReq
*)signal
->getDataPtr();
162 Uint32 ref
= req
->senderRef
;
163 Uint32 senderData
= req
->senderData
;
165 const ndb_mgm_configuration_iterator
* p
=
166 m_ctx
.m_config
.getOwnConfigIterator();
170 Uint32 noTables
, noAttrs
;
171 ndb_mgm_get_int_parameter(p
, CFG_DB_NO_TABLES
,
173 ndb_mgm_get_int_parameter(p
, CFG_DB_NO_ATTRIBUTES
,
176 c_tablePool
.setSize(noTables
);
177 c_tables
.setSize(noTables
);
179 c_subscriptions
.setSize(noTables
);
180 c_subscriberPool
.setSize(2*noTables
);
182 c_subscriptionPool
.setSize(noTables
);
183 c_syncPool
.setSize(2);
184 c_dataBufferPool
.setSize(noAttrs
);
186 // Calculate needed gcp pool as 10 records + the ones needed
187 // during a possible api timeout
188 Uint32 dbApiHbInterval
, gcpInterval
;
189 ndb_mgm_get_int_parameter(p
, CFG_DB_API_HEARTBEAT_INTERVAL
,
191 ndb_mgm_get_int_parameter(p
, CFG_DB_GCP_INTERVAL
,
193 c_gcp_pool
.setSize(10 + (4*dbApiHbInterval
)/gcpInterval
);
195 c_page_chunk_pool
.setSize(50);
198 SLList
<SyncRecord
> tmp(c_syncPool
);
200 while(tmp
.seize(ptr
))
201 new (ptr
.p
) SyncRecord(* this, c_dataBufferPool
);
206 c_masterNodeId
= getOwnNodeId();
208 c_nodeGroup
= c_noNodesInGroup
= 0;
209 for (int i
= 0; i
< MAX_REPLICAS
; i
++) {
210 c_nodesInGroup
[i
] = 0;
213 m_first_free_page
= RNIL
;
216 memset(c_buckets
, 0, sizeof(c_buckets
));
217 for(Uint32 i
= 0; i
<NO_OF_BUCKETS
; i
++)
219 Bucket
* bucket
= c_buckets
+i
;
220 bucket
->m_buffer_tail
= RNIL
;
221 bucket
->m_buffer_head
.m_page_id
= RNIL
;
222 bucket
->m_buffer_head
.m_page_pos
= Buffer_page::DATA_WORDS
;
225 m_max_seen_gci
= 0; // FIRE_TRIG_ORD
226 m_max_sent_gci
= 0; // FIRE_TRIG_ORD -> send
227 m_last_complete_gci
= 0; // SUB_GCP_COMPLETE_REP
228 m_gcp_complete_rep_count
= 0;
229 m_out_of_buffer_gci
= 0;
231 c_startup
.m_wait_handover
= false;
232 c_failedApiNodes
.clear();
234 ReadConfigConf
* conf
= (ReadConfigConf
*)signal
->getDataPtrSend();
235 conf
->senderRef
= reference();
236 conf
->senderData
= senderData
;
237 sendSignal(ref
, GSN_READ_CONFIG_CONF
, signal
,
238 ReadConfigConf::SignalLength
, JBB
);
242 Suma::execSTTOR(Signal
* signal
) {
245 DBUG_ENTER("Suma::execSTTOR");
246 const Uint32 startphase
= signal
->theData
[1];
247 const Uint32 typeOfStart
= signal
->theData
[7];
249 DBUG_PRINT("info",("startphase = %u, typeOfStart = %u",
250 startphase
, typeOfStart
));
255 ndbrequire((m_tup
= (Dbtup
*)globalData
.getBlock(DBTUP
)) != 0);
256 signal
->theData
[0] = reference();
257 sendSignal(NDBCNTR_REF
, GSN_READ_NODESREQ
, signal
, 1, JBB
);
263 if (ERROR_INSERTED(13029)) /* Hold startphase 5 */
265 sendSignalWithDelay(SUMA_REF
, GSN_STTOR
, signal
,
266 30, signal
->getLength());
270 c_startup
.m_restart_server_node_id
= 0;
271 getNodeGroupMembers(signal
);
272 if (typeOfStart
== NodeState::ST_NODE_RESTART
||
273 typeOfStart
== NodeState::ST_INITIAL_NODE_RESTART
)
277 send_start_me_req(signal
);
284 if (typeOfStart
!= NodeState::ST_NODE_RESTART
&&
285 typeOfStart
!= NodeState::ST_INITIAL_NODE_RESTART
)
287 for( Uint32 i
= 0; i
< c_no_of_buckets
; i
++)
289 if (get_responsible_node(i
) == getOwnNodeId())
291 // I'm running this bucket
292 DBUG_PRINT("info",("bucket %u set to true", i
));
293 m_active_buckets
.set(i
);
294 ndbout_c("m_active_buckets.set(%d)", i
);
299 if(!m_active_buckets
.isclear())
303 while ((bucket
= m_active_buckets
.find(bucket
)) != Bucket_mask::NotFound
)
305 tmp
.set(get_responsible_node(bucket
, c_nodes_in_nodegroup_mask
));
309 ndbassert(tmp
.get(getOwnNodeId()));
310 m_gcp_complete_rep_count
= tmp
.count();// I contribute 1 gcp complete rep
313 m_gcp_complete_rep_count
= 0; // I contribute 1 gcp complete rep
315 if(typeOfStart
== NodeState::ST_INITIAL_START
&&
316 c_masterNodeId
== getOwnNodeId())
319 createSequence(signal
);
323 if (ERROR_INSERTED(13030))
325 ndbout_c("Dont start handover");
330 if(startphase
== 100)
333 * Allow API's to connect
339 if(startphase
== 101)
341 if (typeOfStart
== NodeState::ST_NODE_RESTART
||
342 typeOfStart
== NodeState::ST_INITIAL_NODE_RESTART
)
347 c_startup
.m_wait_handover
= true;
348 check_start_handover(signal
);
358 Suma::send_start_me_req(Signal
* signal
)
360 Uint32 nodeId
= c_startup
.m_restart_server_node_id
;
362 nodeId
= c_alive_nodes
.find(nodeId
+ 1);
364 if(nodeId
== getOwnNodeId())
366 if(nodeId
== NdbNodeBitmask::NotFound
)
375 infoEvent("Suma: asking node %d to recreate subscriptions on me", nodeId
);
376 c_startup
.m_restart_server_node_id
= nodeId
;
377 sendSignal(calcSumaBlockRef(nodeId
),
378 GSN_SUMA_START_ME_REQ
, signal
, 1, JBB
);
382 Suma::execSUMA_START_ME_REF(Signal
* signal
)
384 const SumaStartMeRef
* ref
= (SumaStartMeRef
*)signal
->getDataPtr();
385 ndbrequire(ref
->errorCode
== SumaStartMeRef::Busy
);
387 infoEvent("Suma: node %d refused %d",
388 c_startup
.m_restart_server_node_id
, ref
->errorCode
);
390 c_startup
.m_restart_server_node_id
++;
391 send_start_me_req(signal
);
395 Suma::execSUMA_START_ME_CONF(Signal
* signal
)
397 infoEvent("Suma: node %d has completed restoring me",
398 c_startup
.m_restart_server_node_id
);
400 c_startup
.m_restart_server_node_id
= 0;
404 Suma::createSequence(Signal
* signal
)
407 DBUG_ENTER("Suma::createSequence");
409 UtilSequenceReq
* req
= (UtilSequenceReq
*)signal
->getDataPtrSend();
411 req
->senderData
= RNIL
;
412 req
->sequenceId
= SUMA_SEQUENCE
;
413 req
->requestType
= UtilSequenceReq::Create
;
414 sendSignal(DBUTIL_REF
, GSN_UTIL_SEQUENCE_REQ
,
415 signal
, UtilSequenceReq::SignalLength
, JBB
);
416 // execUTIL_SEQUENCE_CONF will call createSequenceReply()
421 Suma::createSequenceReply(Signal
* signal
,
422 UtilSequenceConf
* conf
,
423 UtilSequenceRef
* ref
)
429 switch ((UtilSequenceRef::ErrorCode
)ref
->errorCode
)
431 case UtilSequenceRef::NoSuchSequence
:
433 case UtilSequenceRef::TCError
:
436 snprintf(buf
, sizeof(buf
),
437 "Startup failed during sequence creation. TC error %d",
439 progError(__LINE__
, NDBD_EXIT_RESOURCE_ALLOC_ERROR
, buf
);
449 Suma::execREAD_NODESCONF(Signal
* signal
){
451 ReadNodesConf
* const conf
= (ReadNodesConf
*)signal
->getDataPtr();
453 if(getNodeState().getNodeRestartInProgress())
455 c_alive_nodes
.assign(NdbNodeBitmask::Size
, conf
->startedNodes
);
456 c_alive_nodes
.set(getOwnNodeId());
460 c_alive_nodes
.assign(NdbNodeBitmask::Size
, conf
->startingNodes
);
462 tmp
.assign(NdbNodeBitmask::Size
, conf
->startedNodes
);
463 ndbrequire(tmp
.isclear()); // No nodes can be started during SR
466 c_masterNodeId
= conf
->masterNodeId
;
472 Suma::execAPI_START_REP(Signal
* signal
)
474 Uint32 nodeId
= signal
->theData
[0];
475 c_connected_nodes
.set(nodeId
);
477 check_start_handover(signal
);
481 Suma::check_start_handover(Signal
* signal
)
483 if(c_startup
.m_wait_handover
)
486 tmp
.assign(c_connected_nodes
);
487 tmp
.bitAND(c_subscriber_nodes
);
488 if(!c_subscriber_nodes
.equal(tmp
))
493 c_startup
.m_wait_handover
= false;
494 send_handover_req(signal
);
499 Suma::send_handover_req(Signal
* signal
)
501 c_startup
.m_handover_nodes
.assign(c_alive_nodes
);
502 c_startup
.m_handover_nodes
.bitAND(c_nodes_in_nodegroup_mask
);
503 c_startup
.m_handover_nodes
.clear(getOwnNodeId());
504 Uint32 gci
= m_last_complete_gci
+ 3;
506 SumaHandoverReq
* req
= (SumaHandoverReq
*)signal
->getDataPtrSend();
508 c_startup
.m_handover_nodes
.getText(buf
);
509 infoEvent("Suma: initiate handover with nodes %s GCI: %d",
513 req
->nodeId
= getOwnNodeId();
515 NodeReceiverGroup
rg(SUMA
, c_startup
.m_handover_nodes
);
516 sendSignal(rg
, GSN_SUMA_HANDOVER_REQ
, signal
,
517 SumaHandoverReq::SignalLength
, JBB
);
521 Suma::sendSTTORRY(Signal
* signal
){
522 signal
->theData
[0] = 0;
523 signal
->theData
[3] = 1;
524 signal
->theData
[4] = 3;
525 signal
->theData
[5] = 5;
526 signal
->theData
[6] = 7;
527 signal
->theData
[7] = 100;
528 signal
->theData
[8] = 101;
529 signal
->theData
[9] = 255; // No more start phases from missra
530 sendSignal(NDBCNTR_REF
, GSN_STTORRY
, signal
, 10, JBB
);
534 Suma::execNDB_STTOR(Signal
* signal
)
540 Suma::execCONTINUEB(Signal
* signal
){
542 Uint32 type
= signal
->theData
[0];
544 case SumaContinueB::RELEASE_GCI
:
545 release_gci(signal
, signal
->theData
[1], signal
->theData
[2]);
547 case SumaContinueB::RESEND_BUCKET
:
548 resend_bucket(signal
,
554 case SumaContinueB::OUT_OF_BUFFER_RELEASE
:
555 out_of_buffer_release(signal
, signal
->theData
[1]);
560 /*****************************************************************************
562 * Node state handling
564 *****************************************************************************/
566 void Suma::execAPI_FAILREQ(Signal
* signal
)
569 DBUG_ENTER("Suma::execAPI_FAILREQ");
570 Uint32 failedApiNode
= signal
->theData
[0];
571 //BlockReference retRef = signal->theData[1];
573 if (c_startup
.m_restart_server_node_id
&&
574 c_startup
.m_restart_server_node_id
!= RNIL
)
577 sendSignalWithDelay(reference(), GSN_API_FAILREQ
, signal
,
578 200, signal
->getLength());
582 if (c_failedApiNodes
.get(failedApiNode
))
588 if (!c_subscriber_nodes
.get(failedApiNode
))
594 c_failedApiNodes
.set(failedApiNode
);
595 c_connected_nodes
.clear(failedApiNode
);
596 bool found
= removeSubscribersOnNode(signal
, failedApiNode
);
600 c_failedApiNodes
.clear(failedApiNode
);
603 SubGcpCompleteAck
* const ack
= (SubGcpCompleteAck
*)signal
->getDataPtr();
605 for(c_gcp_list
.first(gcp
); !gcp
.isNull(); c_gcp_list
.next(gcp
))
608 ack
->rep
.gci
= gcp
.p
->m_gci
;
609 if(gcp
.p
->m_subscribers
.get(failedApiNode
))
612 gcp
.p
->m_subscribers
.clear(failedApiNode
);
613 ack
->rep
.senderRef
= numberToRef(0, failedApiNode
);
614 sendSignal(SUMA_REF
, GSN_SUB_GCP_COMPLETE_ACK
, signal
,
615 SubGcpCompleteAck::SignalLength
, JBB
);
619 c_subscriber_nodes
.clear(failedApiNode
);
621 check_start_handover(signal
);
627 Suma::removeSubscribersOnNode(Signal
*signal
, Uint32 nodeId
)
629 DBUG_ENTER("Suma::removeSubscribersOnNode");
632 KeyTable
<Table
>::Iterator it
;
633 LINT_INIT(it
.bucket
);
634 LINT_INIT(it
.curr
.p
);
635 for(c_tables
.first(it
);!it
.isNull();c_tables
.next(it
))
637 LocalDLList
<Subscriber
> subbs(c_subscriberPool
,it
.curr
.p
->c_subscribers
);
638 SubscriberPtr i_subbPtr
;
639 for(subbs
.first(i_subbPtr
);!i_subbPtr
.isNull();)
641 SubscriberPtr subbPtr
= i_subbPtr
;
642 subbs
.next(i_subbPtr
);
644 if (refToNode(subbPtr
.p
->m_senderRef
) == nodeId
) {
646 subbs
.remove(subbPtr
);
647 c_removeDataSubscribers
.add(subbPtr
);
658 sendSubStopReq(signal
);
664 Suma::sendSubStopReq(Signal
*signal
, bool unlock
){
665 static bool remove_lock
= false;
667 DBUG_ENTER("Suma::sendSubStopReq");
669 SubscriberPtr subbPtr
;
670 c_removeDataSubscribers
.first(subbPtr
);
671 if (subbPtr
.isNull()){
674 signal
->theData
[0] = failedApiNode
;
675 signal
->theData
[1] = reference();
676 sendSignal(retRef
, GSN_API_FAILCONF
, signal
, 2, JBB
);
678 c_failedApiNodes
.clear();
684 if(remove_lock
&& !unlock
) {
690 SubscriptionPtr subPtr
;
691 c_subscriptions
.getPtr(subPtr
, subbPtr
.p
->m_subPtrI
);
693 SubStopReq
* const req
= (SubStopReq
*)signal
->getDataPtrSend();
694 req
->senderRef
= reference();
695 req
->senderData
= subbPtr
.i
;
696 req
->subscriberRef
= subbPtr
.p
->m_senderRef
;
697 req
->subscriberData
= subbPtr
.p
->m_senderData
;
698 req
->subscriptionId
= subPtr
.p
->m_subscriptionId
;
699 req
->subscriptionKey
= subPtr
.p
->m_subscriptionKey
;
700 req
->part
= SubscriptionData::TableData
;
702 sendSignal(SUMA_REF
,GSN_SUB_STOP_REQ
,signal
,SubStopReq::SignalLength
,JBB
);
707 Suma::execSUB_STOP_CONF(Signal
* signal
){
709 DBUG_ENTER("Suma::execSUB_STOP_CONF");
710 ndbassert(signal
->getNoOfSections() == 0);
711 sendSubStopReq(signal
,true);
716 Suma::execSUB_STOP_REF(Signal
* signal
){
718 DBUG_ENTER("Suma::execSUB_STOP_REF");
719 ndbassert(signal
->getNoOfSections() == 0);
721 SubStopRef
* const ref
= (SubStopRef
*)signal
->getDataPtr();
723 Uint32 senderData
= ref
->senderData
;
724 Uint32 subscriptionId
= ref
->subscriptionId
;
725 Uint32 subscriptionKey
= ref
->subscriptionKey
;
726 Uint32 part
= ref
->part
;
727 Uint32 subscriberData
= ref
->subscriberData
;
728 Uint32 subscriberRef
= ref
->subscriberRef
;
730 if(ref
->errorCode
!= 1411){
734 SubStopReq
* const req
= (SubStopReq
*)signal
->getDataPtrSend();
735 req
->senderRef
= reference();
736 req
->senderData
= senderData
;
737 req
->subscriberRef
= subscriberRef
;
738 req
->subscriberData
= subscriberData
;
739 req
->subscriptionId
= subscriptionId
;
740 req
->subscriptionKey
= subscriptionKey
;
743 sendSignal(SUMA_REF
,GSN_SUB_STOP_REQ
,signal
,SubStopReq::SignalLength
,JBB
);
749 Suma::execNODE_FAILREP(Signal
* signal
){
751 DBUG_ENTER("Suma::execNODE_FAILREP");
752 ndbassert(signal
->getNoOfSections() == 0);
754 const NodeFailRep
* rep
= (NodeFailRep
*)signal
->getDataPtr();
755 NdbNodeBitmask failed
; failed
.assign(NdbNodeBitmask::Size
, rep
->theNodes
);
757 if(failed
.get(Restart
.nodeId
))
759 Restart
.resetRestart(signal
);
762 if (ERROR_INSERTED(13032))
764 Uint32 node
= c_subscriber_nodes
.find(0);
765 if (node
!= NodeBitmask::NotFound
)
767 ndbout_c("Inserting API_FAILREQ node: %u", node
);
768 signal
->theData
[0] = node
;
769 EXECUTE_DIRECT(QMGR
, GSN_API_FAILREQ
, signal
, 1);
773 signal
->theData
[0] = SumaContinueB::RESEND_BUCKET
;
776 tmp
.assign(c_alive_nodes
);
779 NdbNodeBitmask takeover_nodes
;
781 if(c_nodes_in_nodegroup_mask
.overlaps(failed
))
783 for( Uint32 i
= 0; i
< c_no_of_buckets
; i
++)
785 if(m_active_buckets
.get(i
))
787 else if(m_switchover_buckets
.get(i
))
789 Uint32 state
= c_buckets
[i
].m_state
;
790 if((state
& Bucket::BUCKET_HANDOVER
) &&
791 failed
.get(get_responsible_node(i
)))
793 m_active_buckets
.set(i
);
794 m_switchover_buckets
.clear(i
);
795 ndbout_c("aborting handover");
797 else if(state
& Bucket::BUCKET_STARTING
)
799 progError(__LINE__
, NDBD_EXIT_SYSTEM_ERROR
,
800 "Nodefailure during SUMA takeover");
803 else if(get_responsible_node(i
, tmp
) == getOwnNodeId())
805 start_resend(signal
, i
);
810 c_alive_nodes
.assign(tmp
);
816 Suma::execINCL_NODEREQ(Signal
* signal
){
819 const Uint32 senderRef
= signal
->theData
[0];
820 const Uint32 nodeId
= signal
->theData
[1];
822 ndbrequire(!c_alive_nodes
.get(nodeId
));
823 c_alive_nodes
.set(nodeId
);
825 signal
->theData
[0] = nodeId
;
826 signal
->theData
[1] = reference();
827 sendSignal(senderRef
, GSN_INCL_NODECONF
, signal
, 2, JBB
);
831 Suma::execSIGNAL_DROPPED_REP(Signal
* signal
){
836 /********************************************************************
843 count_subscribers(const DLList
<Suma::Subscriber
> &subs
)
846 Suma::SubscriberPtr i_subbPtr
;
847 subs
.first(i_subbPtr
);
848 while(!i_subbPtr
.isNull()){
850 subs
.next(i_subbPtr
);
856 Suma::execDUMP_STATE_ORD(Signal
* signal
){
859 Uint32 tCase
= signal
->theData
[0];
861 if(tCase
>= 8000 && tCase
<= 8003){
862 SubscriptionPtr subPtr
;
863 c_subscriptions
.getPtr(subPtr
, g_subPtrI
);
865 Ptr
<SyncRecord
> syncPtr
;
866 c_syncPool
.getPtr(syncPtr
, subPtr
.p
->m_syncPtrI
);
869 syncPtr
.p
->startMeta(signal
);
873 syncPtr
.p
->startScan(signal
);
877 syncPtr
.p
->startTrigger(signal
);
881 subPtr
.p
->m_subscriptionType
= SubCreateReq::SingleTableScan
;
882 LocalDataBuffer
<15> attrs(c_dataBufferPool
, syncPtr
.p
->m_attributeList
);
884 Uint32 att
[] = { 0, 1, 1 };
885 syncPtr
.p
->m_tableList
.append(&tab
, 1);
886 attrs
.append(att
, 3);
891 infoEvent("Suma: c_subscriberPool size: %d free: %d",
892 c_subscriberPool
.getSize(),
893 c_subscriberPool
.getNoOfFree());
895 infoEvent("Suma: c_tablePool size: %d free: %d",
896 c_tablePool
.getSize(),
897 c_tablePool
.getNoOfFree());
899 infoEvent("Suma: c_subscriptionPool size: %d free: %d",
900 c_subscriptionPool
.getSize(),
901 c_subscriptionPool
.getNoOfFree());
903 infoEvent("Suma: c_syncPool size: %d free: %d",
904 c_syncPool
.getSize(),
905 c_syncPool
.getNoOfFree());
907 infoEvent("Suma: c_dataBufferPool size: %d free: %d",
908 c_dataBufferPool
.getSize(),
909 c_dataBufferPool
.getNoOfFree());
911 infoEvent("Suma: c_metaSubscribers count: %d",
912 count_subscribers(c_metaSubscribers
));
914 infoEvent("Suma: c_dataSubscribers count: %d",
915 count_subscribers(c_dataSubscribers
));
916 infoEvent("Suma: c_prepDataSubscribers count: %d",
917 count_subscribers(c_prepDataSubscribers
));
919 infoEvent("Suma: c_removeDataSubscribers count: %d",
920 count_subscribers(c_removeDataSubscribers
));
925 for(Uint32 i
= 0; i
<c_no_of_buckets
; i
++)
927 Bucket
* ptr
= c_buckets
+ i
;
928 infoEvent("Bucket %d %d%d-%x switch gci: %d max_acked_gci: %d max_gci: %d tail: %d head: %d",
930 m_active_buckets
.get(i
),
931 m_switchover_buckets
.get(i
),
933 ptr
->m_switchover_gci
,
934 ptr
->m_max_acked_gci
,
935 ptr
->m_buffer_head
.m_max_gci
,
937 ptr
->m_buffer_head
.m_page_id
);
943 SET_ERROR_INSERT_VALUE(13029);
948 c_startup
.m_restart_server_node_id
= MAX_NDB_NODES
+ 1;
949 SET_ERROR_INSERT_VALUE(13029);
954 CLEAR_ERROR_INSERT_VALUE
;
959 char buf1
[255], buf2
[255];
960 c_subscriber_nodes
.getText(buf1
);
961 c_connected_nodes
.getText(buf2
);
962 infoEvent("c_subscriber_nodes: %s", buf1
);
963 infoEvent("c_connected_nodes: %s", buf2
);
968 if (ERROR_INSERTED(13030))
970 CLEAR_ERROR_INSERT_VALUE
;
975 SET_ERROR_INSERT_VALUE(13030);
983 Uint32 bucket
= signal
->theData
[1];
984 KeyTable
<Table
>::Iterator it
;
985 if (signal
->getLength() == 1)
989 infoEvent("-- Starting dump of subscribers --");
992 c_tables
.next(bucket
, it
);
993 const Uint32 RT_BREAK
= 16;
994 for(Uint32 i
= 0; i
<RT_BREAK
|| it
.bucket
== bucket
; i
++)
997 if(it
.curr
.i
== RNIL
)
1000 infoEvent("-- Ending dump of subscribers --");
1004 infoEvent("Table: %u ver: %u #n: %u (ref,data,subscritopn)",
1005 it
.curr
.p
->m_tableId
,
1006 it
.curr
.p
->m_schemaVersion
,
1007 it
.curr
.p
->n_subscribers
);
1009 Ptr
<Subscriber
> ptr
;
1010 LocalDLList
<Subscriber
> list(c_subscriberPool
, it
.curr
.p
->c_subscribers
);
1011 for (list
.first(ptr
); !ptr
.isNull(); list
.next(ptr
), i
++)
1014 infoEvent(" [ %x %u %u ]",
1016 ptr
.p
->m_senderData
,
1022 signal
->theData
[0] = tCase
;
1023 signal
->theData
[1] = it
.bucket
;
1024 sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD
, signal
, 100, 2);
1029 /*************************************************************
1031 * Creation of subscription id's
1033 ************************************************************/
1036 Suma::execCREATE_SUBID_REQ(Signal
* signal
)
1039 DBUG_ENTER("Suma::execCREATE_SUBID_REQ");
1040 ndbassert(signal
->getNoOfSections() == 0);
1041 CRASH_INSERTION(13001);
1043 CreateSubscriptionIdReq
const * req
=
1044 (CreateSubscriptionIdReq
*)signal
->getDataPtr();
1045 SubscriberPtr subbPtr
;
1046 if(!c_subscriberPool
.seize(subbPtr
)){
1048 sendSubIdRef(signal
, req
->senderRef
, req
->senderData
, 1412);
1051 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
1052 c_subscriberPool
.getSize(),
1053 c_subscriberPool
.getNoOfFree()));
1055 subbPtr
.p
->m_senderRef
= req
->senderRef
;
1056 subbPtr
.p
->m_senderData
= req
->senderData
;
1058 UtilSequenceReq
* utilReq
= (UtilSequenceReq
*)signal
->getDataPtrSend();
1059 utilReq
->senderData
= subbPtr
.i
;
1060 utilReq
->sequenceId
= SUMA_SEQUENCE
;
1061 utilReq
->requestType
= UtilSequenceReq::NextVal
;
1062 sendSignal(DBUTIL_REF
, GSN_UTIL_SEQUENCE_REQ
,
1063 signal
, UtilSequenceReq::SignalLength
, JBB
);
1069 Suma::execUTIL_SEQUENCE_CONF(Signal
* signal
)
1072 DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF");
1073 ndbassert(signal
->getNoOfSections() == 0);
1074 CRASH_INSERTION(13002);
1076 UtilSequenceConf
* conf
= (UtilSequenceConf
*)signal
->getDataPtr();
1077 if(conf
->requestType
== UtilSequenceReq::Create
) {
1079 createSequenceReply(signal
, conf
, NULL
);
1084 memcpy(&subId
,conf
->sequenceValue
,8);
1085 SubscriberPtr subbPtr
;
1086 c_subscriberPool
.getPtr(subbPtr
,conf
->senderData
);
1088 CreateSubscriptionIdConf
* subconf
= (CreateSubscriptionIdConf
*)conf
;
1089 subconf
->senderRef
= reference();
1090 subconf
->senderData
= subbPtr
.p
->m_senderData
;
1091 subconf
->subscriptionId
= (Uint32
)subId
;
1092 subconf
->subscriptionKey
=(getOwnNodeId() << 16) | (Uint32
)(subId
& 0xFFFF);
1094 sendSignal(subbPtr
.p
->m_senderRef
, GSN_CREATE_SUBID_CONF
, signal
,
1095 CreateSubscriptionIdConf::SignalLength
, JBB
);
1097 c_subscriberPool
.release(subbPtr
);
1098 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
1099 c_subscriberPool
.getSize(),
1100 c_subscriberPool
.getNoOfFree()));
1105 Suma::execUTIL_SEQUENCE_REF(Signal
* signal
)
1108 DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF");
1109 ndbassert(signal
->getNoOfSections() == 0);
1110 UtilSequenceRef
* ref
= (UtilSequenceRef
*)signal
->getDataPtr();
1111 Uint32 err
= ref
->errorCode
;
1113 if(ref
->requestType
== UtilSequenceReq::Create
) {
1115 createSequenceReply(signal
, NULL
, ref
);
1119 Uint32 subData
= ref
->senderData
;
1121 SubscriberPtr subbPtr
;
1122 c_subscriberPool
.getPtr(subbPtr
,subData
);
1123 sendSubIdRef(signal
, subbPtr
.p
->m_senderRef
, subbPtr
.p
->m_senderData
, err
);
1124 c_subscriberPool
.release(subbPtr
);
1125 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
1126 c_subscriberPool
.getSize(),
1127 c_subscriberPool
.getNoOfFree()));
1129 }//execUTIL_SEQUENCE_REF()
1133 Suma::sendSubIdRef(Signal
* signal
,
1134 Uint32 senderRef
, Uint32 senderData
, Uint32 errCode
)
1137 DBUG_ENTER("Suma::sendSubIdRef");
1138 CreateSubscriptionIdRef
* ref
=
1139 (CreateSubscriptionIdRef
*)signal
->getDataPtrSend();
1141 ref
->senderRef
= reference();
1142 ref
->senderData
= senderData
;
1143 ref
->errorCode
= errCode
;
1144 sendSignal(senderRef
,
1145 GSN_CREATE_SUBID_REF
,
1147 CreateSubscriptionIdRef::SignalLength
,
1150 releaseSections(signal
);
1154 /**********************************************************
1155 * Suma participant interface
1157 * Creation of subscriptions
1161 Suma::addTableId(Uint32 tableId
,
1162 SubscriptionPtr subPtr
, SyncRecord
*psyncRec
)
1164 DBUG_ENTER("Suma::addTableId");
1165 DBUG_PRINT("enter",("tableId: %u subPtr.i: %u", tableId
, subPtr
.i
));
1166 subPtr
.p
->m_tableId
= tableId
;
1167 if(psyncRec
!= NULL
)
1168 psyncRec
->m_tableList
.append(&tableId
, 1);
1173 Suma::execSUB_CREATE_REQ(Signal
* signal
)
1176 DBUG_ENTER("Suma::execSUB_CREATE_REQ");
1177 ndbassert(signal
->getNoOfSections() == 0);
1178 CRASH_INSERTION(13003);
1180 const SubCreateReq req
= *(SubCreateReq
*)signal
->getDataPtr();
1182 const Uint32 subRef
= req
.senderRef
;
1183 const Uint32 subData
= req
.senderData
;
1184 const Uint32 subId
= req
.subscriptionId
;
1185 const Uint32 subKey
= req
.subscriptionKey
;
1186 const Uint32 type
= req
.subscriptionType
& SubCreateReq::RemoveFlags
;
1187 const Uint32 flags
= req
.subscriptionType
& SubCreateReq::GetFlags
;
1188 const bool addTableFlag
= (flags
& SubCreateReq::AddTableFlag
) != 0;
1189 const bool restartFlag
= (flags
& SubCreateReq::RestartFlag
) != 0;
1190 const Uint32 reportAll
= (flags
& SubCreateReq::ReportAll
) ?
1191 Subscription::REPORT_ALL
: 0;
1192 const Uint32 reportSubscribe
= (flags
& SubCreateReq::ReportSubscribe
) ?
1193 Subscription::REPORT_SUBSCRIBE
: 0;
1194 const Uint32 tableId
= req
.tableId
;
1195 Subscription::State state
= (Subscription::State
) req
.state
;
1196 if (signal
->getLength() != SubCreateReq::SignalLength2
)
1199 api or restarted by older version
1200 if restarted by old version, do the best we can
1202 state
= Subscription::DEFINED
;
1206 key
.m_subscriptionId
= subId
;
1207 key
.m_subscriptionKey
= subKey
;
1209 DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
1210 key
.m_subscriptionId
, key
.m_subscriptionKey
));
1212 SubscriptionPtr subPtr
;
1215 ndbrequire(restartFlag
); //TODO remove this
1217 if(!c_subscriptions
.find(subPtr
, key
)) {
1219 sendSubCreateRef(signal
, 1407);
1225 ndbrequire(type
!= SubCreateReq::SingleTableScan
);
1226 ndbrequire(req
.tableId
!= subPtr
.p
->m_tableId
);
1227 ndbrequire(type
!= SubCreateReq::TableEvent
);
1228 addTableId(req
.tableId
, subPtr
, 0);
1231 if (c_startup
.m_restart_server_node_id
&&
1232 subRef
!= calcSumaBlockRef(c_startup
.m_restart_server_node_id
))
1235 * only allow "restart_server" Suma's to come through
1236 * for restart purposes
1239 sendSubCreateRef(signal
, 1415);
1242 // Check that id/key is unique
1243 if(c_subscriptions
.find(subPtr
, key
)) {
1245 sendSubCreateRef(signal
, 1415);
1248 if(!c_subscriptions
.seize(subPtr
)) {
1250 sendSubCreateRef(signal
, 1412);
1253 DBUG_PRINT("info",("c_subscriptionPool size: %d free: %d",
1254 c_subscriptionPool
.getSize(),
1255 c_subscriptionPool
.getNoOfFree()));
1257 subPtr
.p
->m_senderRef
= subRef
;
1258 subPtr
.p
->m_senderData
= subData
;
1259 subPtr
.p
->m_subscriptionId
= subId
;
1260 subPtr
.p
->m_subscriptionKey
= subKey
;
1261 subPtr
.p
->m_subscriptionType
= type
;
1262 subPtr
.p
->m_options
= reportSubscribe
| reportAll
;
1263 subPtr
.p
->m_tableId
= tableId
;
1264 subPtr
.p
->m_table_ptrI
= RNIL
;
1265 subPtr
.p
->m_state
= state
;
1266 subPtr
.p
->n_subscribers
= 0;
1267 subPtr
.p
->m_current_sync_ptrI
= RNIL
;
1269 fprintf(stderr
, "table %d options %x\n", subPtr
.p
->m_tableId
, subPtr
.p
->m_options
);
1270 DBUG_PRINT("info",("Added: key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
1271 key
.m_subscriptionId
, key
.m_subscriptionKey
));
1273 c_subscriptions
.add(subPtr
);
1276 SubCreateConf
* const conf
= (SubCreateConf
*)signal
->getDataPtrSend();
1277 conf
->senderRef
= reference();
1278 conf
->senderData
= subPtr
.p
->m_senderData
;
1279 sendSignal(subRef
, GSN_SUB_CREATE_CONF
, signal
, SubCreateConf::SignalLength
, JBB
);
1284 Suma::sendSubCreateRef(Signal
* signal
, Uint32 errCode
)
1287 SubCreateRef
* ref
= (SubCreateRef
*)signal
->getDataPtrSend();
1288 ref
->errorCode
= errCode
;
1289 sendSignal(signal
->getSendersBlockRef(), GSN_SUB_CREATE_REF
, signal
,
1290 SubCreateRef::SignalLength
, JBB
);
1294 /**********************************************************
1296 * Setting upp trigger for subscription
1301 Suma::execSUB_SYNC_REQ(Signal
* signal
)
1304 DBUG_ENTER("Suma::execSUB_SYNC_REQ");
1305 ndbassert(signal
->getNoOfSections() <= 1);
1306 CRASH_INSERTION(13004);
1308 SubSyncReq
* const req
= (SubSyncReq
*)signal
->getDataPtr();
1310 SubscriptionPtr subPtr
;
1312 key
.m_subscriptionId
= req
->subscriptionId
;
1313 key
.m_subscriptionKey
= req
->subscriptionKey
;
1315 DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
1316 key
.m_subscriptionId
, key
.m_subscriptionKey
));
1318 if(!c_subscriptions
.find(subPtr
, key
))
1321 DBUG_PRINT("info",("Not found"));
1322 sendSubSyncRef(signal
, 1407);
1327 SubscriptionData::Part part
= (SubscriptionData::Part
)req
->part
;
1329 Ptr
<SyncRecord
> syncPtr
;
1330 if(!c_syncPool
.seize(syncPtr
))
1333 sendSubSyncRef(signal
, 1416);
1336 DBUG_PRINT("info",("c_syncPool size: %d free: %d",
1337 c_syncPool
.getSize(),
1338 c_syncPool
.getNoOfFree()));
1340 syncPtr
.p
->m_senderRef
= req
->senderRef
;
1341 syncPtr
.p
->m_senderData
= req
->senderData
;
1342 syncPtr
.p
->m_subscriptionPtrI
= subPtr
.i
;
1343 syncPtr
.p
->ptrI
= syncPtr
.i
;
1344 syncPtr
.p
->m_error
= 0;
1346 subPtr
.p
->m_current_sync_ptrI
= syncPtr
.i
;
1350 syncPtr
.p
->m_tableList
.append(&subPtr
.p
->m_tableId
, 1);
1351 if(signal
->getNoOfSections() > 0){
1352 SegmentedSectionPtr
ptr(0,0,0);
1353 signal
->getSection(ptr
, SubSyncReq::ATTRIBUTE_LIST
);
1354 LocalDataBuffer
<15> attrBuf(c_dataBufferPool
,syncPtr
.p
->m_attributeList
);
1355 append(attrBuf
, ptr
, getSectionSegmentPool());
1356 releaseSections(signal
);
1361 initTable(signal
,subPtr
.p
->m_tableId
,tabPtr
,syncPtr
);
1362 tabPtr
.p
->n_subscribers
++;
1363 if (subPtr
.p
->m_options
& Subscription::REPORT_ALL
)
1364 tabPtr
.p
->m_reportAll
= true;
1365 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
1366 tabPtr
.p
->m_tableId
, tabPtr
.p
->n_subscribers
));
1370 case SubscriptionData::MetaData
:
1375 if (subPtr
.p
->m_subscriptionType
== SubCreateReq::DatabaseSnapshot
) {
1376 TableList::DataBufferIterator it
;
1377 syncPtr
.p
->m_tableList
.first(it
);
1380 * Get all tables from dict
1382 ListTablesReq
* req
= (ListTablesReq
*)signal
->getDataPtrSend();
1383 req
->senderRef
= reference();
1384 req
->senderData
= syncPtr
.i
;
1385 req
->requestData
= 0;
1387 * @todo: accomodate scan of index tables?
1389 req
->setTableType(DictTabInfo::UserTable
);
1391 sendSignal(DBDICT_REF
, GSN_LIST_TABLES_REQ
, signal
,
1392 ListTablesReq::SignalLength
, JBB
);
1397 syncPtr
.p
->startMeta(signal
);
1400 case SubscriptionData::TableData
: {
1403 syncPtr
.p
->startScan(signal
);
1412 Suma::sendSubSyncRef(Signal
* signal
, Uint32 errCode
){
1414 SubSyncRef
* ref
= (SubSyncRef
*)signal
->getDataPtrSend();
1415 ref
->errorCode
= errCode
;
1416 releaseSections(signal
);
1417 sendSignal(signal
->getSendersBlockRef(),
1420 SubSyncRef::SignalLength
,
1425 /**********************************************************
1431 Suma::execLIST_TABLES_CONF(Signal
* signal
){
1433 CRASH_INSERTION(13005);
1434 ListTablesConf
* const conf
= (ListTablesConf
*)signal
->getDataPtr();
1435 SyncRecord
* tmp
= c_syncPool
.getPtr(conf
->senderData
);
1436 tmp
->runLIST_TABLES_CONF(signal
);
1441 /*************************************************************************
1447 Suma::Table::runLIST_TABLES_CONF(Signal
* signal
){
1450 ListTablesConf
* const conf
= (ListTablesConf
*)signal
->getDataPtr();
1451 const Uint32 len
= signal
->length() - ListTablesConf::HeaderLength
;
1453 SubscriptionPtr subPtr
;
1454 suma
.c_subscriptions
.getPtr(subPtr
, m_subscriptionPtrI
);
1456 for (unsigned i
= 0; i
< len
; i
++) {
1457 subPtr
.p
->m_maxTables
++;
1458 suma
.addTableId(ListTablesConf::getTableId(conf
->tableData
[i
]), subPtr
, this);
1461 // for (unsigned i = 0; i < len; i++)
1462 // conf->tableData[i] = ListTablesConf::getTableId(conf->tableData[i]);
1463 // m_tableList.append(&conf->tableData[0], len);
1466 TableList::DataBufferIterator it
;
1468 for(m_tableList
.first(it
);!it
.isNull();m_tableList
.next(it
)) {
1469 ndbout_c("%u listtableconf tableid %d", i
++, *it
.data
);
1473 if(len
== ListTablesConf::DataLength
){
1475 // we expect more LIST_TABLE_CONF
1480 subPtr
.p
->m_currentTable
= 0;
1481 subPtr
.p
->m_maxTables
= 0;
1483 TableList::DataBufferIterator it
;
1484 for(m_tableList
.first(it
); !it
.isNull(); m_tableList
.next(it
)) {
1485 subPtr
.p
->m_maxTables
++;
1486 suma
.addTableId(*it
.data
, subPtr
, NULL
);
1487 #ifdef NODEFAIL_DEBUG
1488 ndbout_c(" listtableconf tableid %d",*it
.data
);
1499 Suma::initTable(Signal
*signal
, Uint32 tableId
, TablePtr
&tabPtr
,
1500 SubscriberPtr subbPtr
)
1502 DBUG_ENTER("Suma::initTable SubscriberPtr");
1503 DBUG_PRINT("enter",("tableId: %d", tableId
));
1505 int r
= initTable(signal
,tableId
,tabPtr
);
1508 LocalDLList
<Subscriber
> subscribers(c_subscriberPool
,
1509 tabPtr
.p
->c_subscribers
);
1510 subscribers
.add(subbPtr
);
1513 DBUG_PRINT("info",("added subscriber: %i", subbPtr
.i
));
1518 // we have to wait getting tab info
1522 if (tabPtr
.p
->setupTrigger(signal
, *this))
1525 // we have to wait for triggers to be setup
1529 int ret
= completeOneSubscriber(signal
, tabPtr
, subbPtr
);
1533 LocalDLList
<Subscriber
> subscribers(c_subscriberPool
,
1534 tabPtr
.p
->c_subscribers
);
1535 subscribers
.release(subbPtr
);
1537 completeInitTable(signal
, tabPtr
);
1542 Suma::initTable(Signal
*signal
, Uint32 tableId
, TablePtr
&tabPtr
,
1543 Ptr
<SyncRecord
> syncPtr
)
1546 DBUG_ENTER("Suma::initTable Ptr<SyncRecord>");
1547 DBUG_PRINT("enter",("tableId: %d", tableId
));
1549 int r
= initTable(signal
,tableId
,tabPtr
);
1552 LocalDLList
<SyncRecord
> syncRecords(c_syncPool
,tabPtr
.p
->c_syncRecords
);
1553 syncRecords
.add(syncPtr
);
1558 // we have to wait getting tab info
1561 completeInitTable(signal
, tabPtr
);
1566 Suma::initTable(Signal
*signal
, Uint32 tableId
, TablePtr
&tabPtr
)
1569 DBUG_ENTER("Suma::initTable");
1571 if (!c_tables
.find(tabPtr
, tableId
) ||
1572 tabPtr
.p
->m_state
== Table::DROPPED
||
1573 tabPtr
.p
->m_state
== Table::ALTERED
)
1575 // table not being prepared
1576 // seize a new table, initialize and add to c_tables
1577 ndbrequire(c_tablePool
.seize(tabPtr
));
1578 DBUG_PRINT("info",("c_tablePool size: %d free: %d",
1579 c_tablePool
.getSize(),
1580 c_tablePool
.getNoOfFree()));
1581 new (tabPtr
.p
) Table
;
1583 tabPtr
.p
->m_tableId
= tableId
;
1584 tabPtr
.p
->m_ptrI
= tabPtr
.i
;
1585 tabPtr
.p
->n_subscribers
= 0;
1586 DBUG_PRINT("info",("Suma::Table[%u,i=%u]::n_subscribers: %u",
1587 tabPtr
.p
->m_tableId
, tabPtr
.i
, tabPtr
.p
->n_subscribers
));
1589 tabPtr
.p
->m_reportAll
= false;
1591 tabPtr
.p
->m_error
= 0;
1592 tabPtr
.p
->m_schemaVersion
= RNIL
;
1593 tabPtr
.p
->m_state
= Table::DEFINING
;
1594 tabPtr
.p
->m_drop_subbPtr
.p
= 0;
1595 for (int j
= 0; j
< 3; j
++)
1597 tabPtr
.p
->m_hasTriggerDefined
[j
] = 0;
1598 tabPtr
.p
->m_hasOutstandingTriggerReq
[j
] = 0;
1599 tabPtr
.p
->m_triggerIds
[j
] = ILLEGAL_TRIGGER_ID
;
1602 c_tables
.add(tabPtr
);
1604 GetTabInfoReq
* req
= (GetTabInfoReq
*)signal
->getDataPtrSend();
1605 req
->senderRef
= reference();
1606 req
->senderData
= tabPtr
.i
;
1608 GetTabInfoReq::RequestById
| GetTabInfoReq::LongSignalConf
;
1609 req
->tableId
= tableId
;
1611 DBUG_PRINT("info",("GET_TABINFOREQ id %d", req
->tableId
));
1613 if (ERROR_INSERTED(13031))
1616 CLEAR_ERROR_INSERT_VALUE
;
1617 GetTabInfoRef
* ref
= (GetTabInfoRef
*)signal
->getDataPtrSend();
1618 ref
->tableId
= tableId
;
1619 ref
->senderData
= tabPtr
.i
;
1620 ref
->errorCode
= GetTabInfoRef::TableNotDefined
;
1621 sendSignal(reference(), GSN_GET_TABINFOREF
, signal
,
1622 GetTabInfoRef::SignalLength
, JBB
);
1626 sendSignal(DBDICT_REF
, GSN_GET_TABINFOREQ
, signal
,
1627 GetTabInfoReq::SignalLength
, JBB
);
1630 if (tabPtr
.p
->m_state
== Table::DEFINING
)
1634 // ToDo should be a ref signal instead
1635 ndbrequire(tabPtr
.p
->m_state
== Table::DEFINED
);
1640 Suma::completeOneSubscriber(Signal
*signal
, TablePtr tabPtr
, SubscriberPtr subbPtr
)
1643 DBUG_ENTER("Suma::completeOneSubscriber");
1645 if (tabPtr
.p
->m_error
&&
1646 (c_startup
.m_restart_server_node_id
== 0 ||
1647 tabPtr
.p
->m_state
!= Table::DROPPED
))
1650 sendSubStartRef(signal
,subbPtr
,tabPtr
.p
->m_error
,
1651 SubscriptionData::TableData
);
1652 tabPtr
.p
->n_subscribers
--;
1658 SubscriptionPtr subPtr
;
1659 c_subscriptions
.getPtr(subPtr
, subbPtr
.p
->m_subPtrI
);
1660 subPtr
.p
->m_table_ptrI
= tabPtr
.i
;
1661 sendSubStartComplete(signal
,subbPtr
, m_last_complete_gci
+ 3,
1662 SubscriptionData::TableData
);
1668 Suma::completeAllSubscribers(Signal
*signal
, TablePtr tabPtr
)
1671 DBUG_ENTER("Suma::completeAllSubscribers");
1672 // handle all subscribers
1674 LocalDLList
<Subscriber
> subscribers(c_subscriberPool
,
1675 tabPtr
.p
->c_subscribers
);
1676 SubscriberPtr subbPtr
;
1677 for(subscribers
.first(subbPtr
); !subbPtr
.isNull();)
1680 Ptr
<Subscriber
> tmp
= subbPtr
;
1681 subscribers
.next(subbPtr
);
1682 int ret
= completeOneSubscriber(signal
, tabPtr
, tmp
);
1686 subscribers
.release(tmp
);
1694 Suma::completeInitTable(Signal
*signal
, TablePtr tabPtr
)
1697 DBUG_ENTER("Suma::completeInitTable");
1699 // handle all syncRecords
1700 while (!tabPtr
.p
->c_syncRecords
.isEmpty())
1702 Ptr
<SyncRecord
> syncPtr
;
1704 LocalDLList
<SyncRecord
> syncRecords(c_syncPool
,
1705 tabPtr
.p
->c_syncRecords
);
1706 syncRecords
.first(syncPtr
);
1707 syncRecords
.remove(syncPtr
);
1709 syncPtr
.p
->ptrI
= syncPtr
.i
;
1710 if (tabPtr
.p
->m_error
== 0)
1713 syncPtr
.p
->startScan(signal
);
1718 syncPtr
.p
->completeScan(signal
, tabPtr
.p
->m_error
);
1719 tabPtr
.p
->n_subscribers
--;
1723 if (tabPtr
.p
->m_error
)
1725 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
1726 tabPtr
.p
->m_tableId
, tabPtr
.p
->n_subscribers
));
1727 tabPtr
.p
->checkRelease(*this);
1731 tabPtr
.p
->m_state
= Table::DEFINED
;
1739 Suma::execGET_TABINFOREF(Signal
* signal
){
1741 GetTabInfoRef
* ref
= (GetTabInfoRef
*)signal
->getDataPtr();
1742 Uint32 tableId
= ref
->tableId
;
1743 Uint32 senderData
= ref
->senderData
;
1744 GetTabInfoRef::ErrorCode errorCode
=
1745 (GetTabInfoRef::ErrorCode
) ref
->errorCode
;
1746 int do_resend_request
= 0;
1748 c_tablePool
.getPtr(tabPtr
, senderData
);
1751 case GetTabInfoRef::TableNotDefined
:
1754 case GetTabInfoRef::InvalidTableId
:
1757 case GetTabInfoRef::Busy
:
1758 do_resend_request
= 1;
1760 case GetTabInfoRef::TableNameTooLong
:
1763 case GetTabInfoRef::NoFetchByName
:
1766 if (do_resend_request
)
1768 GetTabInfoReq
* req
= (GetTabInfoReq
*)signal
->getDataPtrSend();
1769 req
->senderRef
= reference();
1770 req
->senderData
= senderData
;
1772 GetTabInfoReq::RequestById
| GetTabInfoReq::LongSignalConf
;
1773 req
->tableId
= tableId
;
1774 sendSignalWithDelay(DBDICT_REF
, GSN_GET_TABINFOREQ
, signal
,
1775 30, GetTabInfoReq::SignalLength
);
1778 tabPtr
.p
->m_state
= Table::DROPPED
;
1779 tabPtr
.p
->m_error
= errorCode
;
1780 completeAllSubscribers(signal
, tabPtr
);
1781 completeInitTable(signal
, tabPtr
);
1785 Suma::execGET_TABINFO_CONF(Signal
* signal
){
1788 CRASH_INSERTION(13006);
1790 if(!assembleFragments(signal
)){
1794 GetTabInfoConf
* conf
= (GetTabInfoConf
*)signal
->getDataPtr();
1795 Uint32 tableId
= conf
->tableId
;
1797 c_tablePool
.getPtr(tabPtr
, conf
->senderData
);
1798 SegmentedSectionPtr
ptr(0,0,0);
1799 signal
->getSection(ptr
, GetTabInfoConf::DICT_TAB_INFO
);
1800 ndbrequire(tabPtr
.p
->parseTable(ptr
, *this));
1801 releaseSections(signal
);
1803 * We need to gather fragment info
1806 DihFragCountReq
* req
= (DihFragCountReq
*)signal
->getDataPtrSend();
1807 req
->m_connectionData
= RNIL
;
1808 req
->m_tableRef
= tableId
;
1809 req
->m_senderData
= tabPtr
.i
;
1810 sendSignal(DBDIH_REF
, GSN_DI_FCOUNTREQ
, signal
,
1811 DihFragCountReq::SignalLength
, JBB
);
1815 Suma::Table::parseTable(SegmentedSectionPtr ptr
,
1818 DBUG_ENTER("Suma::Table::parseTable");
1820 SimplePropertiesSectionReader
it(ptr
, suma
.getSectionSegmentPool());
1822 SimpleProperties::UnpackStatus s
;
1823 DictTabInfo::Table tableDesc
; tableDesc
.init();
1824 s
= SimpleProperties::unpack(it
, &tableDesc
,
1825 DictTabInfo::TableMapping
,
1826 DictTabInfo::TableMappingSize
,
1830 suma
.suma_ndbrequire(s
== SimpleProperties::Break
);
1834 if(m_schemaVersion
!= tableDesc
.TableVersion
){
1839 // oops wrong schema version in stored tabledesc
1840 // we need to find all subscriptions with old table desc
1841 // and all subscribers to this
1843 c_tables
.release(tabPtr
);
1844 DBUG_PRINT("info",("c_tablePool size: %d free: %d",
1845 suma
.c_tablePool
.getSize(),
1846 suma
.c_tablePool
.getNoOfFree()));
1848 DLHashTable
<Suma::Subscription
>::Iterator i_subPtr
;
1849 c_subscriptions
.first(i_subPtr
);
1850 SubscriptionPtr subPtr
;
1851 for(;!i_subPtr
.isNull();c_subscriptions
.next(i_subPtr
)){
1853 c_subscriptions
.getPtr(subPtr
, i_subPtr
.curr
.i
);
1854 SyncRecord
* tmp
= c_syncPool
.getPtr(subPtr
.p
->m_syncPtrI
);
1855 if (tmp
== syncPtr_p
) {
1859 if (subPtr
.p
->m_tables
.get(tableId
)) {
1861 subPtr
.p
->m_tables
.clear(tableId
); // remove this old table reference
1862 TableList::DataBufferIterator it
;
1863 for(tmp
->m_tableList
.first(it
);!it
.isNull();tmp
->m_tableList
.next(it
)) {
1865 if (*it
.data
== tableId
){
1867 Uint32
*pdata
= it
.data
;
1868 tmp
->m_tableList
.next(it
);
1869 for(;!it
.isNull();tmp
->m_tableList
.next(it
)) {
1874 *pdata
= RNIL
; // todo remove this last item...
1883 if(m_attributes
.getSize() != 0){
1889 * Initialize table object
1891 Uint32 noAttribs
= tableDesc
.NoOfAttributes
;
1892 Uint32 notFixed
= (tableDesc
.NoOfNullable
+tableDesc
.NoOfVariable
);
1893 m_schemaVersion
= tableDesc
.TableVersion
;
1895 // The attribute buffer
1896 LocalDataBuffer
<15> attrBuf(suma
.c_dataBufferPool
, m_attributes
);
1899 DataBuffer
<15> theRest(suma
.c_dataBufferPool
);
1901 if(!attrBuf
.seize(noAttribs
)){
1903 suma
.suma_ndbrequire(false);
1907 if(!theRest
.seize(notFixed
)){
1909 suma
.suma_ndbrequire(false);
1913 DataBuffer
<15>::DataBufferIterator attrIt
; // Fixed not nullable
1914 DataBuffer
<15>::DataBufferIterator restIt
; // variable + nullable
1915 attrBuf
.first(attrIt
);
1916 theRest
.first(restIt
);
1918 for(Uint32 i
= 0; i
< noAttribs
; i
++) {
1919 DictTabInfo::Attribute attrDesc
; attrDesc
.init();
1920 s
= SimpleProperties::unpack(it
, &attrDesc
,
1921 DictTabInfo::AttributeMapping
,
1922 DictTabInfo::AttributeMappingSize
,
1925 suma
.suma_ndbrequire(s
== SimpleProperties::Break
);
1927 if (!attrDesc
.AttributeNullableFlag
1928 /* && !attrDesc.AttributeVariableFlag */) {
1930 * attrIt
.data
= attrDesc
.AttributeId
;
1931 attrBuf
.next(attrIt
);
1934 * restIt
.data
= attrDesc
.AttributeId
;
1935 theRest
.next(restIt
);
1938 // Move to next attribute
1943 * Put the rest in end of attrBuf
1945 theRest
.first(restIt
);
1946 for(; !restIt
.isNull(); theRest
.next(restIt
)){
1947 * attrIt
.data
= * restIt
.data
;
1948 attrBuf
.next(attrIt
);
1957 Suma::execDI_FCOUNTREF(Signal
* signal
)
1960 DBUG_ENTER("Suma::execDI_FCOUNTREF");
1961 DihFragCountRef
* const ref
= (DihFragCountRef
*)signal
->getDataPtr();
1962 switch ((DihFragCountRef::ErrorCode
) ref
->m_error
)
1964 case DihFragCountRef::ErroneousTableState
:
1966 if (ref
->m_tableStatus
== Dbdih::TabRecord::TS_CREATING
)
1968 const Uint32 tableId
= ref
->m_senderData
;
1969 const Uint32 tabPtr_i
= ref
->m_tableRef
;
1970 DihFragCountReq
* const req
= (DihFragCountReq
*)signal
->getDataPtrSend();
1972 req
->m_connectionData
= RNIL
;
1973 req
->m_tableRef
= tabPtr_i
;
1974 req
->m_senderData
= tableId
;
1975 sendSignalWithDelay(DBDIH_REF
, GSN_DI_FCOUNTREQ
, signal
,
1976 DihFragCountReq::SignalLength
,
1977 DihFragCountReq::RetryInterval
);
1989 Suma::execDI_FCOUNTCONF(Signal
* signal
)
1992 DBUG_ENTER("Suma::execDI_FCOUNTCONF");
1993 ndbassert(signal
->getNoOfSections() == 0);
1994 DihFragCountConf
* const conf
= (DihFragCountConf
*)signal
->getDataPtr();
1995 const Uint32 userPtr
= conf
->m_connectionData
;
1996 const Uint32 fragCount
= conf
->m_fragmentCount
;
1997 const Uint32 tableId
= conf
->m_tableRef
;
1999 ndbrequire(userPtr
== RNIL
&& signal
->length() == 5);
2002 tabPtr
.i
= conf
->m_senderData
;
2003 ndbrequire((tabPtr
.p
= c_tablePool
.getPtr(tabPtr
.i
)) != 0);
2004 ndbrequire(tabPtr
.p
->m_tableId
== tableId
);
2006 LocalDataBuffer
<15> fragBuf(c_dataBufferPool
, tabPtr
.p
->m_fragments
);
2007 ndbrequire(fragBuf
.getSize() == 0);
2009 tabPtr
.p
->m_fragCount
= fragCount
;
2011 signal
->theData
[0] = RNIL
;
2012 signal
->theData
[1] = tabPtr
.i
;
2013 signal
->theData
[2] = tableId
;
2014 signal
->theData
[3] = 0; // Frag no
2015 sendSignal(DBDIH_REF
, GSN_DIGETPRIMREQ
, signal
, 4, JBB
);
2021 Suma::execDIGETPRIMCONF(Signal
* signal
){
2023 DBUG_ENTER("Suma::execDIGETPRIMCONF");
2024 ndbassert(signal
->getNoOfSections() == 0);
2026 const Uint32 userPtr
= signal
->theData
[0];
2027 const Uint32 nodeCount
= signal
->theData
[6];
2028 const Uint32 tableId
= signal
->theData
[7];
2029 const Uint32 fragNo
= signal
->theData
[8];
2031 ndbrequire(userPtr
== RNIL
&& signal
->length() == 9);
2032 ndbrequire(nodeCount
> 0 && nodeCount
<= MAX_REPLICAS
);
2035 tabPtr
.i
= signal
->theData
[1];
2036 ndbrequire((tabPtr
.p
= c_tablePool
.getPtr(tabPtr
.i
)) != 0);
2037 ndbrequire(tabPtr
.p
->m_tableId
== tableId
);
2040 LocalDataBuffer
<15> fragBuf(c_dataBufferPool
,tabPtr
.p
->m_fragments
);
2043 * Add primary node for fragment to list
2045 FragmentDescriptor fd
;
2046 fd
.m_fragDesc
.m_nodeId
= signal
->theData
[2];
2047 fd
.m_fragDesc
.m_fragmentNo
= fragNo
;
2048 signal
->theData
[2] = fd
.m_dummy
;
2049 fragBuf
.append(&signal
->theData
[2], 1);
2052 const Uint32 nextFrag
= fragNo
+ 1;
2053 if(nextFrag
== tabPtr
.p
->m_fragCount
)
2056 * Complete frag info for table
2057 * table is not up to date
2060 if (tabPtr
.p
->c_subscribers
.isEmpty())
2062 completeInitTable(signal
,tabPtr
);
2065 tabPtr
.p
->setupTrigger(signal
, *this);
2068 signal
->theData
[0] = RNIL
;
2069 signal
->theData
[1] = tabPtr
.i
;
2070 signal
->theData
[2] = tableId
;
2071 signal
->theData
[3] = nextFrag
; // Frag no
2072 sendSignal(DBDIH_REF
, GSN_DIGETPRIMREQ
, signal
, 4, JBB
);
2079 Suma::SyncRecord::completeTableInit(Signal
* signal
)
2082 SubscriptionPtr subPtr
;
2083 suma
.c_subscriptions
.getPtr(subPtr
, m_subscriptionPtrI
);
2086 ndbout_c("GSN_SUB_SYNC_CONF (meta)");
2089 suma
.releaseSections(signal
);
2092 SubSyncRef
* const ref
= (SubSyncRef
*)signal
->getDataPtrSend();
2093 ref
->senderRef
= suma
.reference();
2094 ref
->senderData
= subPtr
.p
->m_senderData
;
2095 ref
->errorCode
= SubSyncRef::Undefined
;
2096 suma
.sendSignal(subPtr
.p
->m_senderRef
, GSN_SUB_SYNC_REF
, signal
,
2097 SubSyncRef::SignalLength
, JBB
);
2099 SubSyncConf
* const conf
= (SubSyncConf
*)signal
->getDataPtrSend();
2100 conf
->senderRef
= suma
.reference();
2101 conf
->senderData
= subPtr
.p
->m_senderData
;
2102 suma
.sendSignal(subPtr
.p
->m_senderRef
, GSN_SUB_SYNC_CONF
, signal
,
2103 SubSyncConf::SignalLength
, JBB
);
2109 /**********************************************************
2116 Suma::SyncRecord::startScan(Signal
* signal
)
2119 DBUG_ENTER("Suma::SyncRecord::startScan");
2125 m_currentFragment
= 0;
2131 Suma::SyncRecord::getNextFragment(TablePtr
* tab
,
2132 FragmentDescriptor
* fd
)
2135 SubscriptionPtr subPtr
;
2136 suma
.c_subscriptions
.getPtr(subPtr
, m_subscriptionPtrI
);
2137 TableList::DataBufferIterator tabIt
;
2138 DataBuffer
<15>::DataBufferIterator fragIt
;
2140 m_tableList
.position(tabIt
, m_currentTable
);
2141 for(; !tabIt
.curr
.isNull(); m_tableList
.next(tabIt
), m_currentTable
++)
2144 ndbrequire(suma
.c_tables
.find(tabPtr
, * tabIt
.data
));
2145 LocalDataBuffer
<15> fragBuf(suma
.c_dataBufferPool
, tabPtr
.p
->m_fragments
);
2147 fragBuf
.position(fragIt
, m_currentFragment
);
2148 for(; !fragIt
.curr
.isNull(); fragBuf
.next(fragIt
), m_currentFragment
++)
2150 FragmentDescriptor tmp
;
2151 tmp
.m_dummy
= * fragIt
.data
;
2152 if(tmp
.m_fragDesc
.m_nodeId
== suma
.getOwnNodeId()){
2158 m_currentFragment
= 0;
2160 tabPtr
.p
->n_subscribers
--;
2161 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
2162 tabPtr
.p
->m_tableId
, tabPtr
.p
->n_subscribers
));
2163 tabPtr
.p
->checkRelease(suma
);
2169 Suma::SyncRecord::nextScan(Signal
* signal
)
2172 DBUG_ENTER("Suma::SyncRecord::nextScan");
2174 FragmentDescriptor fd
;
2175 SubscriptionPtr subPtr
;
2176 if(!getNextFragment(&tabPtr
, &fd
)){
2178 completeScan(signal
);
2181 suma
.c_subscriptions
.getPtr(subPtr
, m_subscriptionPtrI
);
2183 DataBuffer
<15>::Head head
= m_attributeList
;
2184 if(head
.getSize() == 0){
2185 head
= tabPtr
.p
->m_attributes
;
2187 LocalDataBuffer
<15> attrBuf(suma
.c_dataBufferPool
, head
);
2189 ScanFragReq
* req
= (ScanFragReq
*)signal
->getDataPtrSend();
2190 const Uint32 parallelism
= 16;
2191 const Uint32 attrLen
= 5 + attrBuf
.getSize();
2193 req
->senderData
= ptrI
;
2194 req
->resultRef
= suma
.reference();
2195 req
->tableId
= tabPtr
.p
->m_tableId
;
2196 req
->requestInfo
= 0;
2197 req
->savePointId
= 0;
2198 ScanFragReq::setLockMode(req
->requestInfo
, 0);
2199 ScanFragReq::setHoldLockFlag(req
->requestInfo
, 1);
2200 ScanFragReq::setKeyinfoFlag(req
->requestInfo
, 0);
2201 ScanFragReq::setAttrLen(req
->requestInfo
, attrLen
);
2202 req
->fragmentNoKeyLen
= fd
.m_fragDesc
.m_fragmentNo
;
2203 req
->schemaVersion
= tabPtr
.p
->m_schemaVersion
;
2205 req
->transId2
= (SUMA
<< 20) + (suma
.getOwnNodeId() << 8);
2206 req
->clientOpPtr
= (ptrI
<< 16);
2207 req
->batch_size_rows
= parallelism
;
2208 req
->batch_size_bytes
= 0;
2209 suma
.sendSignal(DBLQH_REF
, GSN_SCAN_FRAGREQ
, signal
,
2210 ScanFragReq::SignalLength
, JBB
);
2212 signal
->theData
[0] = ptrI
;
2213 signal
->theData
[1] = 0;
2214 signal
->theData
[2] = (SUMA
<< 20) + (suma
.getOwnNodeId() << 8);
2217 signal
->theData
[3] = attrBuf
.getSize();
2218 signal
->theData
[4] = 0;
2219 signal
->theData
[5] = 0;
2220 signal
->theData
[6] = 0;
2221 signal
->theData
[7] = 0;
2224 DataBuffer
<15>::DataBufferIterator it
;
2225 for(attrBuf
.first(it
); !it
.curr
.isNull(); attrBuf
.next(it
)){
2226 AttributeHeader::init(&signal
->theData
[dataPos
++], * it
.data
, 0);
2228 suma
.sendSignal(DBLQH_REF
, GSN_ATTRINFO
, signal
, 25, JBB
);
2233 suma
.sendSignal(DBLQH_REF
, GSN_ATTRINFO
, signal
, dataPos
, JBB
);
2236 m_currentTableId
= tabPtr
.p
->m_tableId
;
2237 m_currentNoOfAttributes
= attrBuf
.getSize();
2244 Suma::execSCAN_FRAGREF(Signal
* signal
){
2247 // ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr();
2252 Suma::execSCAN_FRAGCONF(Signal
* signal
){
2254 DBUG_ENTER("Suma::execSCAN_FRAGCONF");
2255 ndbassert(signal
->getNoOfSections() == 0);
2256 CRASH_INSERTION(13011);
2258 ScanFragConf
* const conf
= (ScanFragConf
*)signal
->getDataPtr();
2260 const Uint32 completed
= conf
->fragmentCompleted
;
2261 const Uint32 senderData
= conf
->senderData
;
2262 const Uint32 completedOps
= conf
->completedOps
;
2264 Ptr
<SyncRecord
> syncPtr
;
2265 c_syncPool
.getPtr(syncPtr
, senderData
);
2271 SubSyncContinueConf
* const conf
=
2272 (SubSyncContinueConf
*)signal
->getDataPtrSend();
2273 conf
->subscriptionId
= subPtr
.p
->m_subscriptionId
;
2274 conf
->subscriptionKey
= subPtr
.p
->m_subscriptionKey
;
2275 execSUB_SYNC_CONTINUE_CONF(signal
);
2277 SubSyncContinueReq
* const req
= (SubSyncContinueReq
*)signal
->getDataPtrSend();
2278 req
->subscriberData
= syncPtr
.p
->m_senderData
;
2279 req
->noOfRowsSent
= completedOps
;
2280 sendSignal(syncPtr
.p
->m_senderRef
, GSN_SUB_SYNC_CONTINUE_REQ
, signal
,
2281 SubSyncContinueReq::SignalLength
, JBB
);
2286 ndbrequire(completedOps
== 0);
2288 syncPtr
.p
->m_currentFragment
++;
2289 syncPtr
.p
->nextScan(signal
);
2294 Suma::execSUB_SYNC_CONTINUE_CONF(Signal
* signal
){
2296 ndbassert(signal
->getNoOfSections() == 0);
2298 CRASH_INSERTION(13012);
2300 SubSyncContinueConf
* const conf
=
2301 (SubSyncContinueConf
*)signal
->getDataPtr();
2303 SubscriptionPtr subPtr
;
2305 key
.m_subscriptionId
= conf
->subscriptionId
;
2306 key
.m_subscriptionKey
= conf
->subscriptionKey
;
2308 ndbrequire(c_subscriptions
.find(subPtr
, key
));
2310 ScanFragNextReq
* req
= (ScanFragNextReq
*)signal
->getDataPtrSend();
2311 req
->senderData
= subPtr
.p
->m_current_sync_ptrI
;
2314 req
->transId2
= (SUMA
<< 20) + (getOwnNodeId() << 8);
2315 req
->batch_size_rows
= 16;
2316 req
->batch_size_bytes
= 0;
2317 sendSignal(DBLQH_REF
, GSN_SCAN_NEXTREQ
, signal
,
2318 ScanFragNextReq::SignalLength
, JBB
);
2322 Suma::SyncRecord::completeScan(Signal
* signal
, int error
)
2325 DBUG_ENTER("Suma::SyncRecord::completeScan");
2326 // m_tableList.release();
2329 ndbout_c("GSN_SUB_SYNC_CONF (data)");
2333 SubSyncConf
* const conf
= (SubSyncConf
*)signal
->getDataPtrSend();
2334 conf
->senderRef
= suma
.reference();
2335 conf
->senderData
= m_senderData
;
2336 suma
.sendSignal(m_senderRef
, GSN_SUB_SYNC_CONF
, signal
,
2337 SubSyncConf::SignalLength
, JBB
);
2341 SubSyncRef
* const ref
= (SubSyncRef
*)signal
->getDataPtrSend();
2342 ref
->senderRef
= suma
.reference();
2343 ref
->senderData
= m_senderData
;
2344 suma
.sendSignal(m_senderRef
, GSN_SUB_SYNC_REF
, signal
,
2345 SubSyncRef::SignalLength
, JBB
);
2351 Ptr
<Subscription
> subPtr
;
2352 suma
.c_subscriptions
.getPtr(subPtr
, m_subscriptionPtrI
);
2353 ndbrequire(subPtr
.p
->m_current_sync_ptrI
== ptrI
);
2354 subPtr
.p
->m_current_sync_ptrI
= RNIL
;
2356 suma
.c_syncPool
.release(ptrI
);
2357 DBUG_PRINT("info",("c_syncPool size: %d free: %d",
2358 suma
.c_syncPool
.getSize(),
2359 suma
.c_syncPool
.getNoOfFree()));
2364 Suma::execSCAN_HBREP(Signal
* signal
){
2367 ndbout
<< "execSCAN_HBREP" << endl
<< hex
;
2368 for(int i
= 0; i
<signal
->length(); i
++){
2369 ndbout
<< signal
->theData
[i
] << " ";
2370 if(((i
+ 1) % 8) == 0)
2371 ndbout
<< endl
<< hex
;
2377 /**********************************************************
2379 * Suma participant interface
2381 * Creation of subscriber
2386 Suma::execSUB_START_REQ(Signal
* signal
){
2388 ndbassert(signal
->getNoOfSections() == 0);
2389 DBUG_ENTER("Suma::execSUB_START_REQ");
2390 SubStartReq
* const req
= (SubStartReq
*)signal
->getDataPtr();
2392 CRASH_INSERTION(13013);
2393 Uint32 senderRef
= req
->senderRef
;
2394 Uint32 senderData
= req
->senderData
;
2395 Uint32 subscriberData
= req
->subscriberData
;
2396 Uint32 subscriberRef
= req
->subscriberRef
;
2397 SubscriptionData::Part part
= (SubscriptionData::Part
)req
->part
;
2400 key
.m_subscriptionId
= req
->subscriptionId
;
2401 key
.m_subscriptionKey
= req
->subscriptionKey
;
2403 if (c_startup
.m_restart_server_node_id
&&
2404 senderRef
!= calcSumaBlockRef(c_startup
.m_restart_server_node_id
))
2407 * only allow "restart_server" Suma's to come through
2408 * for restart purposes
2411 Uint32 err
= c_startup
.m_restart_server_node_id
!= RNIL
? 1405 :
2412 SubStartRef::NF_FakeErrorREF
;
2414 sendSubStartRef(signal
, err
);
2418 SubscriptionPtr subPtr
;
2419 if(!c_subscriptions
.find(subPtr
, key
)){
2421 sendSubStartRef(signal
, 1407);
2425 if (subPtr
.p
->m_state
== Subscription::LOCKED
) {
2427 DBUG_PRINT("info",("Locked"));
2428 sendSubStartRef(signal
, 1411);
2432 if (subPtr
.p
->m_state
== Subscription::DROPPED
&&
2433 c_startup
.m_restart_server_node_id
== 0) {
2435 DBUG_PRINT("info",("Dropped"));
2436 sendSubStartRef(signal
, 1418);
2440 ndbrequire(subPtr
.p
->m_state
== Subscription::DEFINED
||
2441 c_startup
.m_restart_server_node_id
);
2443 SubscriberPtr subbPtr
;
2444 if(!c_subscriberPool
.seize(subbPtr
)){
2446 sendSubStartRef(signal
, 1412);
2450 if (c_startup
.m_restart_server_node_id
== 0 &&
2451 !c_connected_nodes
.get(refToNode(subscriberRef
)))
2455 c_subscriberPool
.release(subbPtr
);
2456 sendSubStartRef(signal
, SubStartRef::PartiallyConnected
);
2460 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
2461 c_subscriberPool
.getSize(),
2462 c_subscriberPool
.getNoOfFree()));
2464 c_subscriber_nodes
.set(refToNode(subscriberRef
));
2466 // setup subscription record
2467 if (subPtr
.p
->m_state
== Subscription::DEFINED
)
2468 subPtr
.p
->m_state
= Subscription::LOCKED
;
2469 // store these here for later use
2470 subPtr
.p
->m_senderRef
= senderRef
;
2471 subPtr
.p
->m_senderData
= senderData
;
2473 // setup subscriber record
2474 subbPtr
.p
->m_senderRef
= subscriberRef
;
2475 subbPtr
.p
->m_senderData
= subscriberData
;
2476 subbPtr
.p
->m_subPtrI
= subPtr
.i
;
2478 DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
2479 "tableId: %u id: %u key: %u",
2480 subbPtr
.i
, subbPtr
.p
->m_senderRef
, subbPtr
.p
->m_senderData
,
2481 subPtr
.i
, subPtr
.p
->m_senderRef
, subPtr
.p
->m_senderData
,
2482 subPtr
.p
->m_tableId
,
2483 subPtr
.p
->m_subscriptionId
,subPtr
.p
->m_subscriptionKey
));
2487 case SubscriptionData::MetaData
:
2489 c_metaSubscribers
.add(subbPtr
);
2490 sendSubStartComplete(signal
, subbPtr
, 0, part
);
2492 case SubscriptionData::TableData
:
2494 initTable(signal
,subPtr
.p
->m_tableId
,tabPtr
,subbPtr
);
2495 tabPtr
.p
->n_subscribers
++;
2496 if (subPtr
.p
->m_options
& Subscription::REPORT_ALL
)
2497 tabPtr
.p
->m_reportAll
= true;
2498 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
2499 tabPtr
.p
->m_tableId
, tabPtr
.p
->n_subscribers
));
2506 Suma::sendSubStartComplete(Signal
* signal
,
2507 SubscriberPtr subbPtr
,
2509 SubscriptionData::Part part
)
2512 DBUG_ENTER("Suma::sendSubStartComplete");
2514 SubscriptionPtr subPtr
;
2515 c_subscriptions
.getPtr(subPtr
, subbPtr
.p
->m_subPtrI
);
2516 ndbrequire(subPtr
.p
->m_state
== Subscription::LOCKED
||
2517 (subPtr
.p
->m_state
== Subscription::DROPPED
&&
2518 c_startup
.m_restart_server_node_id
));
2519 if (subPtr
.p
->m_state
== Subscription::LOCKED
)
2522 subPtr
.p
->m_state
= Subscription::DEFINED
;
2524 subPtr
.p
->n_subscribers
++;
2526 DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
2527 "tableId: %u[i=%u] id: %u key: %u",
2528 subbPtr
.i
, subbPtr
.p
->m_senderRef
, subbPtr
.p
->m_senderData
,
2529 subPtr
.i
, subPtr
.p
->m_senderRef
, subPtr
.p
->m_senderData
,
2530 subPtr
.p
->m_tableId
, subPtr
.p
->m_table_ptrI
,
2531 subPtr
.p
->m_subscriptionId
,subPtr
.p
->m_subscriptionKey
));
2533 SubStartConf
* const conf
= (SubStartConf
*)signal
->getDataPtrSend();
2535 conf
->senderRef
= reference();
2536 conf
->senderData
= subPtr
.p
->m_senderData
;
2537 conf
->subscriptionId
= subPtr
.p
->m_subscriptionId
;
2538 conf
->subscriptionKey
= subPtr
.p
->m_subscriptionKey
;
2539 conf
->firstGCI
= firstGCI
;
2540 conf
->part
= (Uint32
) part
;
2542 DBUG_PRINT("info",("subscriber: %u id: %u key: %u", subbPtr
.i
,
2543 subPtr
.p
->m_subscriptionId
,subPtr
.p
->m_subscriptionKey
));
2544 sendSignal(subPtr
.p
->m_senderRef
, GSN_SUB_START_CONF
, signal
,
2545 SubStartConf::SignalLength
, JBB
);
2547 reportAllSubscribers(signal
, NdbDictionary::Event::_TE_SUBSCRIBE
,
2554 Suma::sendSubStartRef(Signal
* signal
, Uint32 errCode
)
2557 SubStartRef
* ref
= (SubStartRef
*)signal
->getDataPtrSend();
2558 ref
->senderRef
= reference();
2559 ref
->errorCode
= errCode
;
2560 releaseSections(signal
);
2561 sendSignal(signal
->getSendersBlockRef(), GSN_SUB_START_REF
, signal
,
2562 SubStartRef::SignalLength
, JBB
);
2565 Suma::sendSubStartRef(Signal
* signal
,
2566 SubscriberPtr subbPtr
, Uint32 error
,
2567 SubscriptionData::Part part
)
2571 SubscriptionPtr subPtr
;
2572 c_subscriptions
.getPtr(subPtr
, subbPtr
.p
->m_subPtrI
);
2574 ndbrequire(subPtr
.p
->m_state
== Subscription::LOCKED
||
2575 (subPtr
.p
->m_state
== Subscription::DROPPED
&&
2576 c_startup
.m_restart_server_node_id
));
2577 if (subPtr
.p
->m_state
== Subscription::LOCKED
)
2580 subPtr
.p
->m_state
= Subscription::DEFINED
;
2583 SubStartRef
* ref
= (SubStartRef
*)signal
->getDataPtrSend();
2584 ref
->senderRef
= reference();
2585 ref
->senderData
= subPtr
.p
->m_senderData
;
2586 ref
->subscriptionId
= subPtr
.p
->m_subscriptionId
;
2587 ref
->subscriptionKey
= subPtr
.p
->m_subscriptionKey
;
2588 ref
->part
= (Uint32
) part
;
2589 ref
->errorCode
= error
;
2591 sendSignal(subPtr
.p
->m_senderRef
, GSN_SUB_START_REF
, signal
,
2592 SubStartRef::SignalLength
, JBB
);
2595 /**********************************************************
2596 * Suma participant interface
2598 * Stopping and removing of subscriber
2603 Suma::execSUB_STOP_REQ(Signal
* signal
){
2605 ndbassert(signal
->getNoOfSections() == 0);
2606 DBUG_ENTER("Suma::execSUB_STOP_REQ");
2608 CRASH_INSERTION(13019);
2610 SubStopReq
* const req
= (SubStopReq
*)signal
->getDataPtr();
2611 Uint32 senderRef
= req
->senderRef
;
2612 Uint32 senderData
= req
->senderData
;
2613 Uint32 subscriberRef
= req
->subscriberRef
;
2614 Uint32 subscriberData
= req
->subscriberData
;
2615 SubscriptionPtr subPtr
;
2617 key
.m_subscriptionId
= req
->subscriptionId
;
2618 key
.m_subscriptionKey
= req
->subscriptionKey
;
2619 Uint32 part
= req
->part
;
2621 if (key
.m_subscriptionKey
== 0 &&
2622 key
.m_subscriptionId
== 0 &&
2623 subscriberData
== 0)
2625 SubStopConf
* conf
= (SubStopConf
*)signal
->getDataPtrSend();
2627 conf
->senderRef
= reference();
2628 conf
->senderData
= senderData
;
2629 conf
->subscriptionId
= key
.m_subscriptionId
;
2630 conf
->subscriptionKey
= key
.m_subscriptionKey
;
2631 conf
->subscriberData
= subscriberData
;
2633 sendSignal(senderRef
, GSN_SUB_STOP_CONF
, signal
,
2634 SubStopConf::SignalLength
, JBB
);
2636 removeSubscribersOnNode(signal
, refToNode(senderRef
));
2640 if (c_startup
.m_restart_server_node_id
&&
2641 senderRef
!= calcSumaBlockRef(c_startup
.m_restart_server_node_id
))
2644 * only allow "restart_server" Suma's to come through
2645 * for restart purposes
2648 Uint32 err
= c_startup
.m_restart_server_node_id
!= RNIL
? 1405 :
2649 SubStopRef::NF_FakeErrorREF
;
2651 sendSubStopRef(signal
, err
);
2655 if(!c_subscriptions
.find(subPtr
, key
)){
2657 DBUG_PRINT("error", ("not found"));
2658 sendSubStopRef(signal
, 1407);
2662 if (subPtr
.p
->m_state
== Subscription::LOCKED
) {
2664 DBUG_PRINT("error", ("locked"));
2665 sendSubStopRef(signal
, 1411);
2669 ndbrequire(part
== SubscriptionData::TableData
);
2672 tabPtr
.i
= subPtr
.p
->m_table_ptrI
;
2673 if (tabPtr
.i
== RNIL
||
2674 !(tabPtr
.p
= c_tables
.getPtr(tabPtr
.i
)) ||
2675 tabPtr
.p
->m_tableId
!= subPtr
.p
->m_tableId
)
2678 DBUG_PRINT("error", ("no such table id %u[i=%u]",
2679 subPtr
.p
->m_tableId
, subPtr
.p
->m_table_ptrI
));
2680 sendSubStopRef(signal
, 1417);
2684 if (tabPtr
.p
->m_drop_subbPtr
.p
!= 0) {
2686 DBUG_PRINT("error", ("table locked"));
2687 sendSubStopRef(signal
, 1420);
2691 DBUG_PRINT("info",("subscription: %u tableId: %u[i=%u] id: %u key: %u",
2692 subPtr
.i
, subPtr
.p
->m_tableId
, tabPtr
.i
,
2693 subPtr
.p
->m_subscriptionId
,subPtr
.p
->m_subscriptionKey
));
2695 SubscriberPtr subbPtr
;
2696 if (senderRef
== reference()){
2698 c_subscriberPool
.getPtr(subbPtr
, senderData
);
2699 ndbrequire(subbPtr
.p
->m_subPtrI
== subPtr
.i
&&
2700 subbPtr
.p
->m_senderRef
== subscriberRef
&&
2701 subbPtr
.p
->m_senderData
== subscriberData
);
2702 c_removeDataSubscribers
.remove(subbPtr
);
2707 LocalDLList
<Subscriber
>
2708 subscribers(c_subscriberPool
,tabPtr
.p
->c_subscribers
);
2710 DBUG_PRINT("info",("search: subscription: %u, ref: %u, data: %d",
2711 subPtr
.i
, subscriberRef
, subscriberData
));
2712 for (subscribers
.first(subbPtr
);!subbPtr
.isNull();subscribers
.next(subbPtr
))
2716 ("search: subscription: %u, ref: %u, data: %u, subscriber %u",
2717 subbPtr
.p
->m_subPtrI
, subbPtr
.p
->m_senderRef
,
2718 subbPtr
.p
->m_senderData
, subbPtr
.i
));
2719 if (subbPtr
.p
->m_subPtrI
== subPtr
.i
&&
2720 subbPtr
.p
->m_senderRef
== subscriberRef
&&
2721 subbPtr
.p
->m_senderData
== subscriberData
)
2724 DBUG_PRINT("info",("found"));
2729 * If we didn't find anyone, send ref
2731 if (subbPtr
.isNull()) {
2733 DBUG_PRINT("error", ("subscriber not found"));
2734 sendSubStopRef(signal
, 1407);
2737 subscribers
.remove(subbPtr
);
2740 subPtr
.p
->m_senderRef
= senderRef
; // store ref to requestor
2741 subPtr
.p
->m_senderData
= senderData
; // store ref to requestor
2743 tabPtr
.p
->m_drop_subbPtr
= subbPtr
;
2745 if (subPtr
.p
->m_state
== Subscription::DEFINED
)
2748 subPtr
.p
->m_state
= Subscription::LOCKED
;
2751 if (tabPtr
.p
->m_state
== Table::DROPPED
)
2752 // not ALTERED here since trigger must be removed
2755 tabPtr
.p
->n_subscribers
--;
2756 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
2757 tabPtr
.p
->m_tableId
, tabPtr
.p
->n_subscribers
));
2758 tabPtr
.p
->checkRelease(*this);
2759 sendSubStopComplete(signal
, tabPtr
.p
->m_drop_subbPtr
);
2760 tabPtr
.p
->m_drop_subbPtr
.p
= 0;
2765 tabPtr
.p
->dropTrigger(signal
,*this);
2771 Suma::sendSubStopComplete(Signal
* signal
, SubscriberPtr subbPtr
)
2774 DBUG_ENTER("Suma::sendSubStopComplete");
2775 CRASH_INSERTION(13020);
2777 DBUG_PRINT("info",("removed subscriber: %i", subbPtr
.i
));
2779 SubscriptionPtr subPtr
;
2780 c_subscriptions
.getPtr(subPtr
, subbPtr
.p
->m_subPtrI
);
2782 Uint32 senderRef
= subPtr
.p
->m_senderRef
;
2783 Uint32 senderData
= subPtr
.p
->m_senderData
;
2785 subPtr
.p
->n_subscribers
--;
2786 ndbassert( subPtr
.p
->m_state
== Subscription::LOCKED
||
2787 subPtr
.p
->m_state
== Subscription::DROPPED
);
2788 if ( subPtr
.p
->m_state
== Subscription::LOCKED
)
2791 subPtr
.p
->m_state
= Subscription::DEFINED
;
2792 if (subPtr
.p
->n_subscribers
== 0)
2796 subPtr
.p
->m_table_ptrI
= RNIL
;
2799 tabPtr
.i
= subPtr
.p
->m_table_ptrI
;
2800 if ((tabPtr
.p
= c_tablePool
.getPtr(tabPtr
.i
)) &&
2801 (tabPtr
.p
->m_state
== Table::DROPPED
||
2802 tabPtr
.p
->m_state
== Table::ALTERED
) &&
2805 // last subscriber, and table is dropped
2806 // safe to drop subscription
2807 c_subscriptions
.release(subPtr
);
2808 DBUG_PRINT("info",("c_subscriptionPool size: %d free: %d",
2809 c_subscriptionPool
.getSize(),
2810 c_subscriptionPool
.getNoOfFree()));
2814 subPtr
.p
->m_table_ptrI
= RNIL
;
2816 ndbassert(tabPtr
.p
!= 0);
2820 else if ( subPtr
.p
->n_subscribers
== 0 )
2822 // subscription is marked to be removed
2823 // and there are no subscribers left
2825 ndbassert(subPtr
.p
->m_state
== Subscription::DROPPED
);
2826 completeSubRemove(subPtr
);
2829 // let subscriber know that subscrber is stopped
2831 SubTableData
* data
= (SubTableData
*)signal
->getDataPtrSend();
2832 data
->gci
= m_last_complete_gci
+ 1; // XXX ???
2834 data
->requestInfo
= 0;
2835 SubTableData::setOperation(data
->requestInfo
,
2836 NdbDictionary::Event::_TE_STOP
);
2837 SubTableData::setNdbdNodeId(data
->requestInfo
,
2839 data
->senderData
= subbPtr
.p
->m_senderData
;
2840 sendSignal(subbPtr
.p
->m_senderRef
, GSN_SUB_TABLE_DATA
, signal
,
2841 SubTableData::SignalLength
, JBB
);
2844 SubStopConf
* const conf
= (SubStopConf
*)signal
->getDataPtrSend();
2846 conf
->senderRef
= reference();
2847 conf
->senderData
= senderData
;
2849 sendSignal(senderRef
, GSN_SUB_STOP_CONF
, signal
,
2850 SubStopConf::SignalLength
, JBB
);
2852 c_subscriberPool
.release(subbPtr
);
2853 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
2854 c_subscriberPool
.getSize(),
2855 c_subscriberPool
.getNoOfFree()));
2857 reportAllSubscribers(signal
, NdbDictionary::Event::_TE_UNSUBSCRIBE
,
2863 // report new started subscriber to all other subscribers
2865 Suma::reportAllSubscribers(Signal
*signal
,
2866 NdbDictionary::Event::_TableEvent table_event
,
2867 SubscriptionPtr subPtr
,
2868 SubscriberPtr subbPtr
)
2870 SubTableData
* data
= (SubTableData
*)signal
->getDataPtrSend();
2872 if (table_event
== NdbDictionary::Event::_TE_SUBSCRIBE
&&
2873 !c_startup
.m_restart_server_node_id
)
2875 data
->gci
= m_last_complete_gci
+ 1;
2876 data
->tableId
= subPtr
.p
->m_tableId
;
2877 data
->requestInfo
= 0;
2878 SubTableData::setOperation(data
->requestInfo
,
2879 NdbDictionary::Event::_TE_ACTIVE
);
2880 SubTableData::setNdbdNodeId(data
->requestInfo
, getOwnNodeId());
2881 SubTableData::setReqNodeId(data
->requestInfo
,
2882 refToNode(subbPtr
.p
->m_senderRef
));
2883 data
->changeMask
= 0;
2885 data
->senderData
= subbPtr
.p
->m_senderData
;
2886 sendSignal(subbPtr
.p
->m_senderRef
, GSN_SUB_TABLE_DATA
, signal
,
2887 SubTableData::SignalLength
, JBB
);
2890 if (!(subPtr
.p
->m_options
& Subscription::REPORT_SUBSCRIBE
))
2894 if (subPtr
.p
->n_subscribers
== 0)
2896 ndbrequire(table_event
!= NdbDictionary::Event::_TE_SUBSCRIBE
);
2901 ndbout_c("reportAllSubscribers subPtr.i: %d subPtr.p->n_subscribers: %d",
2902 subPtr
.i
, subPtr
.p
->n_subscribers
);
2904 data
->gci
= m_last_complete_gci
+ 1;
2905 data
->tableId
= subPtr
.p
->m_tableId
;
2906 data
->requestInfo
= 0;
2907 SubTableData::setOperation(data
->requestInfo
, table_event
);
2908 SubTableData::setNdbdNodeId(data
->requestInfo
, getOwnNodeId());
2909 data
->changeMask
= 0;
2913 c_tables
.getPtr(tabPtr
, subPtr
.p
->m_table_ptrI
);
2914 LocalDLList
<Subscriber
> subbs(c_subscriberPool
, tabPtr
.p
->c_subscribers
);
2915 SubscriberPtr i_subbPtr
;
2916 for(subbs
.first(i_subbPtr
); !i_subbPtr
.isNull(); subbs
.next(i_subbPtr
))
2918 if (i_subbPtr
.p
->m_subPtrI
== subPtr
.i
)
2920 SubTableData::setReqNodeId(data
->requestInfo
,
2921 refToNode(subbPtr
.p
->m_senderRef
));
2922 data
->senderData
= i_subbPtr
.p
->m_senderData
;
2923 sendSignal(i_subbPtr
.p
->m_senderRef
, GSN_SUB_TABLE_DATA
, signal
,
2924 SubTableData::SignalLength
, JBB
);
2926 ndbout_c("sent %s(%d) to node %d, req_nodeid: %d senderData: %d",
2927 table_event
== NdbDictionary::Event::_TE_SUBSCRIBE
?
2928 "SUBSCRIBE" : "UNSUBSCRIBE", (int) table_event
,
2929 refToNode(i_subbPtr
.p
->m_senderRef
),
2930 refToNode(subbPtr
.p
->m_senderRef
), data
->senderData
2933 if (i_subbPtr
.i
!= subbPtr
.i
)
2935 SubTableData::setReqNodeId(data
->requestInfo
,
2936 refToNode(i_subbPtr
.p
->m_senderRef
));
2938 data
->senderData
= subbPtr
.p
->m_senderData
;
2939 sendSignal(subbPtr
.p
->m_senderRef
, GSN_SUB_TABLE_DATA
, signal
,
2940 SubTableData::SignalLength
, JBB
);
2942 ndbout_c("sent %s(%d) to node %d, req_nodeid: %d senderData: %d",
2943 table_event
== NdbDictionary::Event::_TE_SUBSCRIBE
?
2944 "SUBSCRIBE" : "UNSUBSCRIBE", (int) table_event
,
2945 refToNode(subbPtr
.p
->m_senderRef
),
2946 refToNode(i_subbPtr
.p
->m_senderRef
), data
->senderData
2955 Suma::sendSubStopRef(Signal
* signal
, Uint32 errCode
)
2958 DBUG_ENTER("Suma::sendSubStopRef");
2959 SubStopRef
* ref
= (SubStopRef
*)signal
->getDataPtrSend();
2960 ref
->senderRef
= reference();
2961 ref
->errorCode
= errCode
;
2962 sendSignal(signal
->getSendersBlockRef(),
2965 SubStopRef::SignalLength
,
2970 /**********************************************************
2972 * Trigger admin interface
2977 Suma::Table::setupTrigger(Signal
* signal
,
2981 DBUG_ENTER("Suma::Table::setupTrigger");
2985 AttributeMask attrMask
;
2986 createAttributeMask(attrMask
, suma
);
2988 for(Uint32 j
= 0; j
<3; j
++)
2990 Uint32 triggerId
= (m_schemaVersion
<< 18) | (j
<< 16) | m_ptrI
;
2991 if(m_hasTriggerDefined
[j
] == 0)
2993 suma
.suma_ndbrequire(m_triggerIds
[j
] == ILLEGAL_TRIGGER_ID
);
2994 DBUG_PRINT("info",("DEFINING trigger on table %u[%u]", m_tableId
, j
));
2995 CreateTrigReq
* const req
= (CreateTrigReq
*)signal
->getDataPtrSend();
2996 req
->setUserRef(SUMA_REF
);
2997 req
->setConnectionPtr(m_ptrI
);
2998 req
->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE
);
2999 req
->setTriggerActionTime(TriggerActionTime::TA_DETACHED
);
3000 req
->setMonitorReplicas(true);
3001 req
->setMonitorAllAttributes(j
== TriggerEvent::TE_DELETE
);
3002 req
->setReceiverRef(SUMA_REF
);
3003 req
->setTriggerId(triggerId
);
3004 req
->setTriggerEvent((TriggerEvent::Value
)j
);
3005 req
->setTableId(m_tableId
);
3006 req
->setAttributeMask(attrMask
);
3007 req
->setReportAllMonitoredAttributes(m_reportAll
);
3008 suma
.sendSignal(DBTUP_REF
, GSN_CREATE_TRIG_REQ
,
3009 signal
, CreateTrigReq::SignalLength
, JBB
);
3014 m_hasTriggerDefined
[j
]++;
3015 DBUG_PRINT("info",("REFCOUNT trigger on table %u[%u] %u",
3016 m_tableId
, j
, m_hasTriggerDefined
[j
]));
3023 Suma::Table::createAttributeMask(AttributeMask
& mask
,
3028 DataBuffer
<15>::DataBufferIterator it
;
3029 LocalDataBuffer
<15> attrBuf(suma
.c_dataBufferPool
, m_attributes
);
3030 for(attrBuf
.first(it
); !it
.curr
.isNull(); attrBuf
.next(it
)){
3031 mask
.set(* it
.data
);
3036 Suma::execCREATE_TRIG_CONF(Signal
* signal
){
3038 DBUG_ENTER("Suma::execCREATE_TRIG_CONF");
3039 ndbassert(signal
->getNoOfSections() == 0);
3040 CreateTrigConf
* const conf
= (CreateTrigConf
*)signal
->getDataPtr();
3041 const Uint32 triggerId
= conf
->getTriggerId();
3042 Uint32 type
= (triggerId
>> 16) & 0x3;
3043 Uint32 tableId
= conf
->getTableId();
3046 DBUG_PRINT("enter", ("type: %u tableId: %u[i=%u==%u]",
3047 type
, tableId
,conf
->getConnectionPtr(),triggerId
& 0xFFFF));
3050 c_tables
.getPtr(tabPtr
, conf
->getConnectionPtr());
3051 ndbrequire(tabPtr
.p
->m_tableId
== tableId
);
3052 ndbrequire(tabPtr
.p
->m_state
== Table::DEFINING
);
3054 ndbrequire(type
< 3);
3055 tabPtr
.p
->m_triggerIds
[type
] = triggerId
;
3056 ndbrequire(tabPtr
.p
->m_hasTriggerDefined
[type
] == 0);
3057 tabPtr
.p
->m_hasTriggerDefined
[type
] = 1;
3061 completeAllSubscribers(signal
, tabPtr
);
3062 completeInitTable(signal
,tabPtr
);
3069 Suma::execCREATE_TRIG_REF(Signal
* signal
){
3071 DBUG_ENTER("Suma::execCREATE_TRIG_REF");
3072 ndbassert(signal
->getNoOfSections() == 0);
3073 CreateTrigRef
* const ref
= (CreateTrigRef
*)signal
->getDataPtr();
3074 const Uint32 triggerId
= ref
->getTriggerId();
3075 Uint32 type
= (triggerId
>> 16) & 0x3;
3076 Uint32 tableId
= ref
->getTableId();
3078 DBUG_PRINT("enter", ("type: %u tableId: %u[i=%u==%u]",
3079 type
, tableId
,ref
->getConnectionPtr(),triggerId
& 0xFFFF));
3082 c_tables
.getPtr(tabPtr
, ref
->getConnectionPtr());
3083 ndbrequire(tabPtr
.p
->m_tableId
== tableId
);
3084 ndbrequire(tabPtr
.p
->m_state
== Table::DEFINING
);
3086 tabPtr
.p
->m_error
= ref
->getErrorCode();
3088 ndbrequire(type
< 3);
3092 completeAllSubscribers(signal
, tabPtr
);
3093 completeInitTable(signal
,tabPtr
);
3101 Suma::Table::dropTrigger(Signal
* signal
,Suma
& suma
)
3104 DBUG_ENTER("Suma::dropTrigger");
3106 m_hasOutstandingTriggerReq
[0] =
3107 m_hasOutstandingTriggerReq
[1] =
3108 m_hasOutstandingTriggerReq
[2] = 1;
3109 for(Uint32 j
= 0; j
<3; j
++){
3111 suma
.suma_ndbrequire(m_triggerIds
[j
] != ILLEGAL_TRIGGER_ID
);
3112 if(m_hasTriggerDefined
[j
] == 1) {
3115 DropTrigReq
* const req
= (DropTrigReq
*)signal
->getDataPtrSend();
3116 req
->setConnectionPtr(m_ptrI
);
3117 req
->setUserRef(SUMA_REF
); // Sending to myself
3118 req
->setRequestType(DropTrigReq::RT_USER
);
3119 req
->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE
);
3120 req
->setTriggerActionTime(TriggerActionTime::TA_DETACHED
);
3121 req
->setIndexId(RNIL
);
3123 req
->setTableId(m_tableId
);
3124 req
->setTriggerId(m_triggerIds
[j
]);
3125 req
->setTriggerEvent((TriggerEvent::Value
)j
);
3127 DBUG_PRINT("info",("DROPPING trigger %u = %u %u %u on table %u[%u]",
3129 TriggerType::SUBSCRIPTION_BEFORE
,
3130 TriggerActionTime::TA_DETACHED
,
3133 suma
.sendSignal(DBTUP_REF
, GSN_DROP_TRIG_REQ
,
3134 signal
, DropTrigReq::SignalLength
, JBB
);
3137 suma
.suma_ndbrequire(m_hasTriggerDefined
[j
] > 1);
3138 runDropTrigger(signal
,m_triggerIds
[j
],suma
);
3145 Suma::execDROP_TRIG_REF(Signal
* signal
){
3147 DBUG_ENTER("Suma::execDROP_TRIG_REF");
3148 ndbassert(signal
->getNoOfSections() == 0);
3149 DropTrigRef
* const ref
= (DropTrigRef
*)signal
->getDataPtr();
3150 if (ref
->getErrorCode() != DropTrigRef::TriggerNotFound
)
3155 c_tables
.getPtr(tabPtr
, ref
->getConnectionPtr());
3156 ndbrequire(ref
->getTableId() == tabPtr
.p
->m_tableId
);
3158 tabPtr
.p
->runDropTrigger(signal
, ref
->getTriggerId(), *this);
3163 Suma::execDROP_TRIG_CONF(Signal
* signal
){
3165 DBUG_ENTER("Suma::execDROP_TRIG_CONF");
3166 ndbassert(signal
->getNoOfSections() == 0);
3168 DropTrigConf
* const conf
= (DropTrigConf
*)signal
->getDataPtr();
3170 c_tables
.getPtr(tabPtr
, conf
->getConnectionPtr());
3171 ndbrequire(conf
->getTableId() == tabPtr
.p
->m_tableId
);
3173 tabPtr
.p
->runDropTrigger(signal
, conf
->getTriggerId(),*this);
3178 Suma::Table::runDropTrigger(Signal
* signal
,
3183 Uint32 type
= (triggerId
>> 16) & 0x3;
3185 suma
.suma_ndbrequire(type
< 3);
3186 suma
.suma_ndbrequire(m_triggerIds
[type
] == triggerId
);
3187 suma
.suma_ndbrequire(m_hasTriggerDefined
[type
] > 0);
3188 suma
.suma_ndbrequire(m_hasOutstandingTriggerReq
[type
] == 1);
3189 m_hasTriggerDefined
[type
]--;
3190 m_hasOutstandingTriggerReq
[type
] = 0;
3191 if (m_hasTriggerDefined
[type
] == 0)
3194 m_triggerIds
[type
] = ILLEGAL_TRIGGER_ID
;
3196 if( m_hasOutstandingTriggerReq
[0] ||
3197 m_hasOutstandingTriggerReq
[1] ||
3198 m_hasOutstandingTriggerReq
[2])
3206 ndbout_c("trigger completed");
3211 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
3212 m_tableId
, n_subscribers
));
3215 suma
.sendSubStopComplete(signal
, m_drop_subbPtr
);
3216 m_drop_subbPtr
.p
= 0;
3219 void Suma::suma_ndbrequire(bool v
) { ndbrequire(v
); }
3222 Suma::Table::checkRelease(Suma
&suma
)
3225 DBUG_ENTER("Suma::Table::checkRelease");
3226 if (n_subscribers
== 0)
3229 suma
.suma_ndbrequire(m_hasTriggerDefined
[0] == 0);
3230 suma
.suma_ndbrequire(m_hasTriggerDefined
[1] == 0);
3231 suma
.suma_ndbrequire(m_hasTriggerDefined
[2] == 0);
3232 if (!c_subscribers
.isEmpty())
3234 LocalDLList
<Subscriber
>
3235 subscribers(suma
.c_subscriberPool
,c_subscribers
);
3236 SubscriberPtr subbPtr
;
3237 for (subscribers
.first(subbPtr
);!subbPtr
.isNull();
3238 subscribers
.next(subbPtr
))
3241 DBUG_PRINT("info",("subscriber: %u", subbPtr
.i
));
3243 suma
.suma_ndbrequire(false);
3245 if (!c_syncRecords
.isEmpty())
3247 LocalDLList
<SyncRecord
>
3248 syncRecords(suma
.c_syncPool
,c_syncRecords
);
3249 Ptr
<SyncRecord
> syncPtr
;
3250 for (syncRecords
.first(syncPtr
);!syncPtr
.isNull();
3251 syncRecords
.next(syncPtr
))
3254 DBUG_PRINT("info",("syncRecord: %u", syncPtr
.i
));
3256 suma
.suma_ndbrequire(false);
3259 suma
.c_tables
.remove(m_ptrI
);
3260 suma
.c_tablePool
.release(m_ptrI
);
3261 DBUG_PRINT("info",("c_tablePool size: %d free: %d",
3262 suma
.c_tablePool
.getSize(),
3263 suma
.c_tablePool
.getNoOfFree()));
3267 DBUG_PRINT("info",("n_subscribers: %d", n_subscribers
));
3272 /**********************************************************
3273 * Scan data interface
3275 * Assumption: one execTRANSID_AI contains all attr info
3279 #define SUMA_BUF_SZ1 MAX_KEY_SIZE_IN_WORDS + MAX_TUPLE_SIZE_IN_WORDS
3280 #define SUMA_BUF_SZ MAX_ATTRIBUTES_IN_TABLE + SUMA_BUF_SZ1
3282 static Uint32 f_bufferLock
= 0;
3283 static Uint32 f_buffer
[SUMA_BUF_SZ
];
3284 static Uint32 f_trigBufferSize
= 0;
3285 static Uint32 b_bufferLock
= 0;
3286 static Uint32 b_buffer
[SUMA_BUF_SZ
];
3287 static Uint32 b_trigBufferSize
= 0;
3290 Suma::execTRANSID_AI(Signal
* signal
)
3293 DBUG_ENTER("Suma::execTRANSID_AI");
3295 CRASH_INSERTION(13015);
3296 TransIdAI
* const data
= (TransIdAI
*)signal
->getDataPtr();
3297 const Uint32 opPtrI
= data
->connectPtr
;
3298 const Uint32 length
= signal
->length() - 3;
3300 if(f_bufferLock
== 0){
3301 f_bufferLock
= opPtrI
;
3303 ndbrequire(f_bufferLock
== opPtrI
);
3306 Ptr
<SyncRecord
> syncPtr
;
3307 c_syncPool
.getPtr(syncPtr
, (opPtrI
>> 16));
3310 Uint32
* dst
= f_buffer
+ MAX_ATTRIBUTES_IN_TABLE
;
3311 Uint32
* headers
= f_buffer
;
3312 const Uint32
* src
= &data
->attrData
[0];
3313 const Uint32
* const end
= &src
[length
];
3315 const Uint32 attribs
= syncPtr
.p
->m_currentNoOfAttributes
;
3316 for(Uint32 i
= 0; i
<attribs
; i
++){
3317 Uint32 tmp
= * src
++;
3319 Uint32 len
= AttributeHeader::getDataSize(tmp
);
3321 memcpy(dst
, src
, 4 * len
);
3327 ndbrequire(src
== end
);
3330 * Send data to subscriber
3332 LinearSectionPtr ptr
[3];
3333 ptr
[0].p
= f_buffer
;
3334 ptr
[0].sz
= attribs
;
3336 ptr
[1].p
= f_buffer
+ MAX_ATTRIBUTES_IN_TABLE
;
3339 SubscriptionPtr subPtr
;
3340 c_subscriptions
.getPtr(subPtr
, syncPtr
.p
->m_subscriptionPtrI
);
3345 SubTableData
* sdata
= (SubTableData
*)signal
->getDataPtrSend();
3346 Uint32 ref
= subPtr
.p
->m_senderRef
;
3347 sdata
->tableId
= syncPtr
.p
->m_currentTableId
;
3348 sdata
->senderData
= subPtr
.p
->m_senderData
;
3349 sdata
->requestInfo
= 0;
3350 SubTableData::setOperation(sdata
->requestInfo
,
3351 NdbDictionary::Event::_TE_SCAN
); // Scan
3352 sdata
->gci
= 0; // Undefined
3354 ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs
, sum
);
3359 SubTableData::SignalLength
, JBB
,
3364 * Reset f_bufferLock
3371 /**********************************************************
3373 * Trigger data interface
3378 Suma::execTRIG_ATTRINFO(Signal
* signal
)
3381 DBUG_ENTER("Suma::execTRIG_ATTRINFO");
3383 CRASH_INSERTION(13016);
3384 TrigAttrInfo
* const trg
= (TrigAttrInfo
*)signal
->getDataPtr();
3385 const Uint32 trigId
= trg
->getTriggerId();
3387 const Uint32 dataLen
= signal
->length() - TrigAttrInfo::StaticLength
;
3389 if(trg
->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES
){
3392 ndbrequire(b_bufferLock
== trigId
);
3394 memcpy(b_buffer
+ b_trigBufferSize
, trg
->getData(), 4 * dataLen
);
3395 b_trigBufferSize
+= dataLen
;
3397 // printf("before values %u %u %u\n",trigId, dataLen, b_trigBufferSize);
3401 if(f_bufferLock
== 0){
3402 f_bufferLock
= trigId
;
3403 f_trigBufferSize
= 0;
3404 b_bufferLock
= trigId
;
3405 b_trigBufferSize
= 0;
3407 ndbrequire(f_bufferLock
== trigId
);
3410 memcpy(f_buffer
+ f_trigBufferSize
, trg
->getData(), 4 * dataLen
);
3411 f_trigBufferSize
+= dataLen
;
3418 #ifdef NODEFAIL_DEBUG2
3419 static int theCounts
[64] = {0};
3423 Suma::get_responsible_node(Uint32 bucket
) const
3425 // id will contain id to responsible suma or
3426 // RNIL if we don't have nodegroup info yet
3430 const Bucket
* ptr
= c_buckets
+ bucket
;
3431 for(Uint32 i
= 0; i
<MAX_REPLICAS
; i
++)
3433 node
= ptr
->m_nodes
[i
];
3434 if(c_alive_nodes
.get(node
))
3441 #ifdef NODEFAIL_DEBUG2
3445 ndbout_c("Suma:responsible n=%u, D=%u, id = %u, count=%u",
3446 n
,D
, id
, theCounts
[node
]);
3453 Suma::get_responsible_node(Uint32 bucket
, const NdbNodeBitmask
& mask
) const
3457 const Bucket
* ptr
= c_buckets
+ bucket
;
3458 for(Uint32 i
= 0; i
<MAX_REPLICAS
; i
++)
3460 node
= ptr
->m_nodes
[i
];
3471 Suma::check_switchover(Uint32 bucket
, Uint32 gci
)
3473 const Uint32 send_mask
= (Bucket::BUCKET_STARTING
| Bucket::BUCKET_TAKEOVER
);
3474 bool send
= c_buckets
[bucket
].m_state
& send_mask
;
3475 ndbassert(m_switchover_buckets
.get(bucket
));
3476 if(unlikely(gci
>= c_buckets
[bucket
].m_switchover_gci
))
3485 reformat(Signal
* signal
, LinearSectionPtr ptr
[3],
3486 Uint32
* src_1
, Uint32 sz_1
,
3487 Uint32
* src_2
, Uint32 sz_2
)
3489 Uint32 noOfAttrs
= 0, dataLen
= 0;
3490 Uint32
* headers
= signal
->theData
+ 25;
3491 Uint32
* dst
= signal
->theData
+ 25 + MAX_ATTRIBUTES_IN_TABLE
;
3498 Uint32 tmp
= * src_1
++;
3500 Uint32 len
= AttributeHeader::getDataSize(tmp
);
3501 memcpy(dst
, src_1
, 4 * len
);
3511 ptr
[0].sz
= noOfAttrs
;
3512 ptr
[1].sz
= dataLen
;
3517 return sz_2
> 0 ? 3 : 2;
3521 Suma::execFIRE_TRIG_ORD(Signal
* signal
)
3524 DBUG_ENTER("Suma::execFIRE_TRIG_ORD");
3525 ndbassert(signal
->getNoOfSections() == 0);
3527 CRASH_INSERTION(13016);
3528 FireTrigOrd
* const trg
= (FireTrigOrd
*)signal
->getDataPtr();
3529 const Uint32 trigId
= trg
->getTriggerId();
3530 const Uint32 hashValue
= trg
->getHashValue();
3531 const Uint32 gci
= trg
->getGCI();
3532 const Uint32 event
= trg
->getTriggerEvent();
3533 const Uint32 any_value
= trg
->getAnyValue();
3535 tabPtr
.i
= trigId
& 0xFFFF;
3537 DBUG_PRINT("enter",("tabPtr.i=%u", tabPtr
.i
));
3538 ndbrequire(f_bufferLock
== trigId
);
3540 * Reset f_bufferLock
3545 ndbrequire((tabPtr
.p
= c_tablePool
.getPtr(tabPtr
.i
)) != 0);
3546 Uint32 tableId
= tabPtr
.p
->m_tableId
;
3548 Uint32 bucket
= hashValue
% c_no_of_buckets
;
3549 m_max_seen_gci
= (gci
> m_max_seen_gci
? gci
: m_max_seen_gci
);
3550 if(m_active_buckets
.get(bucket
) ||
3551 (m_switchover_buckets
.get(bucket
) && (check_switchover(bucket
, gci
))))
3553 m_max_sent_gci
= (gci
> m_max_sent_gci
? gci
: m_max_sent_gci
);
3554 Uint32 sz
= trg
->getNoOfPrimaryKeyWords()+trg
->getNoOfAfterValueWords();
3555 ndbrequire(sz
== f_trigBufferSize
);
3557 LinearSectionPtr ptr
[3];
3558 const Uint32 nptr
= reformat(signal
, ptr
,
3559 f_buffer
, sz
, b_buffer
, b_trigBufferSize
);
3561 for(Uint32 i
=0; i
< nptr
; i
++)
3564 * Signal to subscriber(s)
3566 ndbrequire((tabPtr
.p
= c_tablePool
.getPtr(tabPtr
.i
)) != 0);
3568 SubTableData
* data
= (SubTableData
*)signal
->getDataPtrSend();//trg;
3570 data
->tableId
= tableId
;
3571 data
->requestInfo
= 0;
3572 SubTableData::setOperation(data
->requestInfo
, event
);
3574 data
->anyValue
= any_value
;
3575 data
->totalLen
= ptrLen
;
3578 LocalDLList
<Subscriber
> list(c_subscriberPool
,tabPtr
.p
->c_subscribers
);
3579 SubscriberPtr subbPtr
;
3580 for(list
.first(subbPtr
); !subbPtr
.isNull(); list
.next(subbPtr
))
3582 DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d",
3583 refToNode(subbPtr
.p
->m_senderRef
)));
3584 data
->senderData
= subbPtr
.p
->m_senderData
;
3585 sendSignal(subbPtr
.p
->m_senderRef
, GSN_SUB_TABLE_DATA
, signal
,
3586 SubTableData::SignalLength
, JBB
, ptr
, nptr
);
3592 const uint buffer_header_sz
= 4;
3594 Uint32 sz
= f_trigBufferSize
+ b_trigBufferSize
+ buffer_header_sz
;
3595 if((dst
= get_buffer_ptr(signal
, bucket
, gci
, sz
)))
3598 * dst
++ = tabPtr
.p
->m_schemaVersion
;
3599 * dst
++ = (event
<< 16) | f_trigBufferSize
;
3600 * dst
++ = any_value
;
3601 memcpy(dst
, f_buffer
, f_trigBufferSize
<< 2);
3602 dst
+= f_trigBufferSize
;
3603 memcpy(dst
, b_buffer
, b_trigBufferSize
<< 2);
3611 Suma::execSUB_GCP_COMPLETE_REP(Signal
* signal
)
3614 ndbassert(signal
->getNoOfSections() == 0);
3616 SubGcpCompleteRep
* rep
= (SubGcpCompleteRep
*)signal
->getDataPtrSend();
3617 Uint32 gci
= m_last_complete_gci
= rep
->gci
;
3618 m_max_seen_gci
= (gci
> m_max_seen_gci
? gci
: m_max_seen_gci
);
3623 if(!m_switchover_buckets
.isclear())
3625 NdbNodeBitmask takeover_nodes
;
3626 NdbNodeBitmask handover_nodes
;
3627 Uint32 i
= m_switchover_buckets
.find(0);
3628 for(; i
!= Bucket_mask::NotFound
; i
= m_switchover_buckets
.find(i
+ 1))
3630 if(c_buckets
[i
].m_switchover_gci
== gci
)
3632 Uint32 state
= c_buckets
[i
].m_state
;
3633 m_switchover_buckets
.clear(i
);
3634 printf("switchover complete bucket %d state: %x", i
, state
);
3635 if(state
& Bucket::BUCKET_STARTING
)
3640 m_active_buckets
.set(i
);
3641 c_buckets
[i
].m_state
&= ~(Uint32
)Bucket::BUCKET_STARTING
;
3642 ndbout_c("starting");
3643 m_gcp_complete_rep_count
= 1;
3645 else if(state
& Bucket::BUCKET_TAKEOVER
)
3650 Bucket
* bucket
= c_buckets
+ i
;
3651 Page_pos pos
= bucket
->m_buffer_head
;
3652 ndbrequire(pos
.m_max_gci
< gci
);
3654 Buffer_page
* page
= (Buffer_page
*)
3655 m_tup
->c_page_pool
.getPtr(pos
.m_page_id
);
3656 ndbout_c("takeover %d", pos
.m_page_id
);
3657 page
->m_max_gci
= pos
.m_max_gci
;
3658 page
->m_words_used
= pos
.m_page_pos
;
3659 page
->m_next_page
= RNIL
;
3660 memset(&bucket
->m_buffer_head
, 0, sizeof(bucket
->m_buffer_head
));
3661 bucket
->m_buffer_head
.m_page_id
= RNIL
;
3662 bucket
->m_buffer_head
.m_page_pos
= Buffer_page::DATA_WORDS
+ 1;
3664 m_active_buckets
.set(i
);
3665 c_buckets
[i
].m_state
&= ~(Uint32
)Bucket::BUCKET_TAKEOVER
;
3666 takeover_nodes
.set(c_buckets
[i
].m_switchover_node
);
3673 ndbrequire(state
& Bucket::BUCKET_HANDOVER
);
3674 c_buckets
[i
].m_state
&= ~(Uint32
)Bucket::BUCKET_HANDOVER
;
3675 handover_nodes
.set(c_buckets
[i
].m_switchover_node
);
3676 ndbout_c("handover");
3680 ndbassert(handover_nodes
.count() == 0 ||
3681 m_gcp_complete_rep_count
> handover_nodes
.count());
3682 m_gcp_complete_rep_count
-= handover_nodes
.count();
3683 m_gcp_complete_rep_count
+= takeover_nodes
.count();
3685 if(getNodeState().startLevel
== NodeState::SL_STARTING
&&
3686 m_switchover_buckets
.isclear() &&
3687 c_startup
.m_handover_nodes
.isclear())
3689 sendSTTORRY(signal
);
3693 if(ERROR_INSERTED(13010))
3695 CLEAR_ERROR_INSERT_VALUE
;
3696 ndbout_c("Don't send GCP_COMPLETE_REP(%d)", gci
);
3701 * Signal to subscribers
3704 rep
->senderRef
= reference();
3705 rep
->gcp_complete_rep_count
= m_gcp_complete_rep_count
;
3707 if(m_gcp_complete_rep_count
&& !c_subscriber_nodes
.isclear())
3709 CRASH_INSERTION(13033);
3711 NodeReceiverGroup
rg(API_CLUSTERMGR
, c_subscriber_nodes
);
3712 sendSignal(rg
, GSN_SUB_GCP_COMPLETE_REP
, signal
,
3713 SubGcpCompleteRep::SignalLength
, JBB
);
3715 Ptr
<Gcp_record
> gcp
;
3716 if(c_gcp_list
.seize(gcp
))
3719 gcp
.p
->m_subscribers
= c_subscriber_nodes
;
3724 * Add GCP COMPLETE REP to buffer
3726 for(Uint32 i
= 0; i
<c_no_of_buckets
; i
++)
3728 if(m_active_buckets
.get(i
))
3731 if (!c_subscriber_nodes
.isclear())
3734 get_buffer_ptr(signal
, i
, gci
, 0);
3738 if(gci
== m_out_of_buffer_gci
)
3740 infoEvent("Reenable event buffer");
3741 m_out_of_buffer_gci
= 0;
3746 Suma::execCREATE_TAB_CONF(Signal
*signal
)
3749 DBUG_ENTER("Suma::execCREATE_TAB_CONF");
3752 CreateTabConf
* const conf
= (CreateTabConf
*)signal
->getDataPtr();
3753 Uint32 tableId
= conf
->senderData
;
3756 initTable(signal
,tableId
,tabPtr
);
3762 Suma::execDROP_TAB_CONF(Signal
*signal
)
3765 DBUG_ENTER("Suma::execDROP_TAB_CONF");
3766 ndbassert(signal
->getNoOfSections() == 0);
3768 DropTabConf
* const conf
= (DropTabConf
*)signal
->getDataPtr();
3769 Uint32 senderRef
= conf
->senderRef
;
3770 Uint32 tableId
= conf
->tableId
;
3773 if (!c_tables
.find(tabPtr
, tableId
) ||
3774 tabPtr
.p
->m_state
== Table::DROPPED
||
3775 tabPtr
.p
->m_state
== Table::ALTERED
)
3780 DBUG_PRINT("info",("drop table id: %d[i=%u]", tableId
, tabPtr
.i
));
3782 tabPtr
.p
->m_state
= Table::DROPPED
;
3783 for (int j
= 0; j
< 3; j
++)
3785 if (!tabPtr
.p
->m_hasOutstandingTriggerReq
[j
])
3787 tabPtr
.p
->m_hasTriggerDefined
[j
] = 0;
3788 tabPtr
.p
->m_hasOutstandingTriggerReq
[j
] = 0;
3789 tabPtr
.p
->m_triggerIds
[j
] = ILLEGAL_TRIGGER_ID
;
3792 tabPtr
.p
->m_hasTriggerDefined
[j
] = 1;
3798 // dict coordinator sends info to API
3800 SubTableData
* data
= (SubTableData
*)signal
->getDataPtrSend();
3801 data
->gci
= m_last_complete_gci
+1;
3802 data
->tableId
= tableId
;
3803 data
->requestInfo
= 0;
3804 SubTableData::setOperation(data
->requestInfo
,NdbDictionary::Event::_TE_DROP
);
3805 SubTableData::setReqNodeId(data
->requestInfo
, refToNode(senderRef
));
3808 LocalDLList
<Subscriber
> subbs(c_subscriberPool
,tabPtr
.p
->c_subscribers
);
3809 SubscriberPtr subbPtr
;
3810 for(subbs
.first(subbPtr
);!subbPtr
.isNull();subbs
.next(subbPtr
))
3814 * get subscription ptr for this subscriber
3816 SubscriptionPtr subPtr
;
3817 c_subscriptions
.getPtr(subPtr
, subbPtr
.p
->m_subPtrI
);
3818 if(subPtr
.p
->m_subscriptionType
!= SubCreateReq::TableEvent
) {
3821 //continue in for-loop if the table is not part of
3822 //the subscription. Otherwise, send data to subscriber.
3824 data
->senderData
= subbPtr
.p
->m_senderData
;
3825 sendSignal(subbPtr
.p
->m_senderRef
, GSN_SUB_TABLE_DATA
, signal
,
3826 SubTableData::SignalLength
, JBB
);
3827 DBUG_PRINT("info",("sent to subscriber %d", subbPtr
.i
));
3833 static Uint32 b_dti_buf
[MAX_WORDS_META_FILE
];
3836 Suma::execALTER_TAB_REQ(Signal
*signal
)
3839 DBUG_ENTER("Suma::execALTER_TAB_REQ");
3840 ndbassert(signal
->getNoOfSections() == 1);
3842 AlterTabReq
* const req
= (AlterTabReq
*)signal
->getDataPtr();
3843 Uint32 senderRef
= req
->senderRef
;
3844 Uint32 tableId
= req
->tableId
;
3845 Uint32 changeMask
= req
->changeMask
;
3847 if (!c_tables
.find(tabPtr
, tableId
) ||
3848 tabPtr
.p
->m_state
== Table::DROPPED
||
3849 tabPtr
.p
->m_state
== Table::ALTERED
)
3854 DBUG_PRINT("info",("alter table id: %d[i=%u]", tableId
, tabPtr
.i
));
3855 Table::State old_state
= tabPtr
.p
->m_state
;
3856 tabPtr
.p
->m_state
= Table::ALTERED
;
3857 // triggers must be removed, waiting for sub stop req for that
3863 // dict coordinator sends info to API
3865 // Copy DICT_TAB_INFO to local buffer
3866 SegmentedSectionPtr tabInfoPtr
;
3867 signal
->getSection(tabInfoPtr
, AlterTabReq::DICT_TAB_INFO
);
3869 ndbout_c("DICT_TAB_INFO in SUMA, tabInfoPtr.sz = %d", tabInfoPtr
.sz
);
3870 SimplePropertiesSectionReader
reader(tabInfoPtr
, getSectionSegmentPool());
3871 reader
.printAll(ndbout
);
3873 copy(b_dti_buf
, tabInfoPtr
);
3874 LinearSectionPtr ptr
[3];
3875 ptr
[0].p
= b_dti_buf
;
3876 ptr
[0].sz
= tabInfoPtr
.sz
;
3878 releaseSections(signal
);
3880 SubTableData
* data
= (SubTableData
*)signal
->getDataPtrSend();
3881 data
->gci
= m_last_complete_gci
+1;
3882 data
->tableId
= tableId
;
3883 data
->requestInfo
= 0;
3884 SubTableData::setOperation(data
->requestInfo
,
3885 NdbDictionary::Event::_TE_ALTER
);
3886 SubTableData::setReqNodeId(data
->requestInfo
, refToNode(senderRef
));
3888 data
->changeMask
= changeMask
;
3889 data
->totalLen
= tabInfoPtr
.sz
;
3891 LocalDLList
<Subscriber
> subbs(c_subscriberPool
,tabPtr
.p
->c_subscribers
);
3892 SubscriberPtr subbPtr
;
3893 for(subbs
.first(subbPtr
);!subbPtr
.isNull();subbs
.next(subbPtr
))
3897 * get subscription ptr for this subscriber
3899 SubscriptionPtr subPtr
;
3900 c_subscriptions
.getPtr(subPtr
, subbPtr
.p
->m_subPtrI
);
3901 if(subPtr
.p
->m_subscriptionType
!= SubCreateReq::TableEvent
) {
3904 //continue in for-loop if the table is not part of
3905 //the subscription. Otherwise, send data to subscriber.
3908 data
->senderData
= subbPtr
.p
->m_senderData
;
3909 Callback c
= { 0, 0 };
3910 sendFragmentedSignal(subbPtr
.p
->m_senderRef
, GSN_SUB_TABLE_DATA
, signal
,
3911 SubTableData::SignalLength
, JBB
, ptr
, 1, c
);
3912 DBUG_PRINT("info",("sent to subscriber %d", subbPtr
.i
));
3915 if (AlterTableReq::getFrmFlag(changeMask
))
3917 // Frm changes only are handled on-line
3918 tabPtr
.p
->m_state
= old_state
;
3924 Suma::execSUB_GCP_COMPLETE_ACK(Signal
* signal
)
3927 ndbassert(signal
->getNoOfSections() == 0);
3929 SubGcpCompleteAck
* const ack
= (SubGcpCompleteAck
*)signal
->getDataPtr();
3930 Uint32 gci
= ack
->rep
.gci
;
3931 Uint32 senderRef
= ack
->rep
.senderRef
;
3932 m_max_seen_gci
= (gci
> m_max_seen_gci
? gci
: m_max_seen_gci
);
3934 if (refToBlock(senderRef
) == SUMA
) {
3936 // Ack from other SUMA
3937 Uint32 nodeId
= refToNode(senderRef
);
3938 for(Uint32 i
= 0; i
<c_no_of_buckets
; i
++)
3940 if(m_active_buckets
.get(i
) ||
3941 (m_switchover_buckets
.get(i
) && (check_switchover(i
, gci
))) ||
3942 (!m_switchover_buckets
.get(i
) && get_responsible_node(i
) == nodeId
))
3944 release_gci(signal
, i
, gci
);
3950 // Ack from User and not an ack from other SUMA, redistribute in nodegroup
3952 Uint32 nodeId
= refToNode(senderRef
);
3955 Ptr
<Gcp_record
> gcp
;
3956 for(c_gcp_list
.first(gcp
); !gcp
.isNull(); c_gcp_list
.next(gcp
))
3958 if(gcp
.p
->m_gci
== gci
)
3960 gcp
.p
->m_subscribers
.clear(nodeId
);
3961 if(!gcp
.p
->m_subscribers
.isclear())
3972 ndbout_c("ACK wo/ gcp record (gci: %d)", gci
);
3976 c_gcp_list
.release(gcp
);
3979 CRASH_INSERTION(13011);
3980 if(ERROR_INSERTED(13012))
3982 CLEAR_ERROR_INSERT_VALUE
;
3983 ndbout_c("Don't redistribute SUB_GCP_COMPLETE_ACK");
3987 ack
->rep
.senderRef
= reference();
3988 NodeReceiverGroup
rg(SUMA
, c_nodes_in_nodegroup_mask
);
3989 sendSignal(rg
, GSN_SUB_GCP_COMPLETE_ACK
, signal
,
3990 SubGcpCompleteAck::SignalLength
, JBB
);
3993 /**************************************************************
3995 * Removing subscription
4000 Suma::execSUB_REMOVE_REQ(Signal
* signal
)
4003 DBUG_ENTER("Suma::execSUB_REMOVE_REQ");
4004 ndbassert(signal
->getNoOfSections() == 0);
4006 CRASH_INSERTION(13021);
4008 const SubRemoveReq req
= *(SubRemoveReq
*)signal
->getDataPtr();
4009 SubscriptionPtr subPtr
;
4011 key
.m_subscriptionId
= req
.subscriptionId
;
4012 key
.m_subscriptionKey
= req
.subscriptionKey
;
4014 DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
4015 key
.m_subscriptionId
, key
.m_subscriptionKey
));
4017 if(!c_subscriptions
.find(subPtr
, key
))
4020 DBUG_PRINT("info",("Not found"));
4021 sendSubRemoveRef(signal
, req
, 1407);
4024 if (subPtr
.p
->m_state
== Subscription::LOCKED
)
4027 * we are currently setting up triggers etc. for this event
4030 sendSubRemoveRef(signal
, req
, 1413);
4033 if (subPtr
.p
->m_state
== Subscription::DROPPED
)
4039 sendSubRemoveRef(signal
, req
, 1419);
4043 ndbrequire(subPtr
.p
->m_state
== Subscription::DEFINED
);
4044 DBUG_PRINT("info",("n_subscribers: %u", subPtr
.p
->n_subscribers
));
4046 if (subPtr
.p
->n_subscribers
== 0)
4048 // no subscribers on the subscription
4051 completeSubRemove(subPtr
);
4055 // subscribers left on the subscription
4056 // mark it to be removed once all subscribers
4059 subPtr
.p
->m_state
= Subscription::DROPPED
;
4062 SubRemoveConf
* const conf
= (SubRemoveConf
*)signal
->getDataPtrSend();
4063 conf
->senderRef
= reference();
4064 conf
->senderData
= req
.senderData
;
4065 conf
->subscriptionId
= req
.subscriptionId
;
4066 conf
->subscriptionKey
= req
.subscriptionKey
;
4068 sendSignal(req
.senderRef
, GSN_SUB_REMOVE_CONF
, signal
,
4069 SubRemoveConf::SignalLength
, JBB
);
4075 Suma::completeSubRemove(SubscriptionPtr subPtr
)
4077 DBUG_ENTER("Suma::completeSubRemove");
4078 //Uint32 subscriptionId = subPtr.p->m_subscriptionId;
4079 //Uint32 subscriptionKey = subPtr.p->m_subscriptionKey;
4081 c_subscriptions
.release(subPtr
);
4082 DBUG_PRINT("info",("c_subscriptionPool size: %d free: %d",
4083 c_subscriptionPool
.getSize(),
4084 c_subscriptionPool
.getNoOfFree()));
4087 * I was the last subscription to be remove so clear c_tables
4090 ndbout_c("c_subscriptionPool.getSize() %d c_subscriptionPool.getNoOfFree()%d",
4091 c_subscriptionPool
.getSize(),c_subscriptionPool
.getNoOfFree());
4094 if(c_subscriptionPool
.getSize() == c_subscriptionPool
.getNoOfFree()) {
4097 ndbout_c("SUB_REMOVE_REQ:Clearing c_tables");
4100 KeyTable
<Table
>::Iterator it
;
4101 for(c_tables
.first(it
); !it
.isNull(); )
4103 // ndbrequire(false);
4105 DBUG_PRINT("error",("trailing table id: %d[i=%d] n_subscribers: %d m_state: %d",
4106 it
.curr
.p
->m_tableId
,
4108 it
.curr
.p
->n_subscribers
,
4109 it
.curr
.p
->m_state
));
4111 LocalDLList
<Subscriber
> subbs(c_subscriberPool
,it
.curr
.p
->c_subscribers
);
4112 SubscriberPtr subbPtr
;
4113 for(subbs
.first(subbPtr
);!subbPtr
.isNull();subbs
.next(subbPtr
))
4115 DBUG_PRINT("error",("subscriber %d, m_subPtrI: %d", subbPtr
.i
, subbPtr
.p
->m_subPtrI
));
4118 it
.curr
.p
->release(* this);
4119 TablePtr tabPtr
= it
.curr
;
4121 c_tables
.remove(tabPtr
);
4122 c_tablePool
.release(tabPtr
);
4123 DBUG_PRINT("info",("c_tablePool size: %d free: %d",
4124 c_tablePool
.getSize(),
4125 c_tablePool
.getNoOfFree()));
4128 DBUG_ASSERT(count
== 0);
4134 Suma::sendSubRemoveRef(Signal
* signal
, const SubRemoveReq
& req
,
4138 DBUG_ENTER("Suma::sendSubRemoveRef");
4139 SubRemoveRef
* ref
= (SubRemoveRef
*)signal
->getDataPtrSend();
4140 ref
->senderRef
= reference();
4141 ref
->senderData
= req
.senderData
;
4142 ref
->subscriptionId
= req
.subscriptionId
;
4143 ref
->subscriptionKey
= req
.subscriptionKey
;
4144 ref
->errorCode
= errCode
;
4145 releaseSections(signal
);
4146 sendSignal(signal
->getSendersBlockRef(), GSN_SUB_REMOVE_REF
,
4147 signal
, SubRemoveRef::SignalLength
, JBB
);
4152 Suma::Table::release(Suma
& suma
){
4155 LocalDataBuffer
<15> attrBuf(suma
.c_dataBufferPool
, m_attributes
);
4158 LocalDataBuffer
<15> fragBuf(suma
.c_dataBufferPool
, m_fragments
);
4161 m_state
= UNDEFINED
;
4163 if (n_subscribers
!= 0)
4169 Suma::SyncRecord::release(){
4171 m_tableList
.release();
4173 LocalDataBuffer
<15> attrBuf(suma
.c_dataBufferPool
, m_attributeList
);
4178 /**************************************************************
4180 * Restarting remote node functions, master functionality
4181 * (slave does nothing special)
4182 * - triggered on INCL_NODEREQ calling startNode
4183 * - included node will issue START_ME when it's ready to start
4189 Suma::execSUMA_START_ME_REQ(Signal
* signal
) {
4191 DBUG_ENTER("Suma::execSUMA_START_ME");
4192 ndbassert(signal
->getNoOfSections() == 0);
4193 Restart
.runSUMA_START_ME_REQ(signal
, signal
->getSendersBlockRef());
4198 Suma::execSUB_CREATE_REF(Signal
* signal
) {
4200 DBUG_ENTER("Suma::execSUB_CREATE_REF");
4201 ndbassert(signal
->getNoOfSections() == 0);
4202 SubCreateRef
*const ref
= (SubCreateRef
*)signal
->getDataPtr();
4203 Uint32 error
= ref
->errorCode
;
4207 * This will happen if an api node connects during while other node
4208 * is restarting, and in this case the subscription will already
4209 * have been created.
4210 * ToDo: more complete handling of api nodes joining during
4213 Uint32 senderRef
= signal
->getSendersBlockRef();
4214 BlockReference cntrRef
= calcNdbCntrBlockRef(refToNode(senderRef
));
4215 // for some reason we did not manage to create a subscription
4216 // on the starting node
4217 SystemError
* const sysErr
= (SystemError
*)&signal
->theData
[0];
4218 sysErr
->errorCode
= SystemError::CopySubscriptionRef
;
4219 sysErr
->errorRef
= reference();
4220 sysErr
->data1
= error
;
4222 sendSignal(cntrRef
, GSN_SYSTEM_ERROR
, signal
,
4223 SystemError::SignalLength
, JBB
);
4224 Restart
.resetRestart(signal
);
4227 // SubCreateConf has same signaldata as SubCreateRef
4228 Restart
.runSUB_CREATE_CONF(signal
);
4233 Suma::execSUB_CREATE_CONF(Signal
* signal
)
4236 DBUG_ENTER("Suma::execSUB_CREATE_CONF");
4237 ndbassert(signal
->getNoOfSections() == 0);
4238 Restart
.runSUB_CREATE_CONF(signal
);
4243 Suma::execSUB_START_CONF(Signal
* signal
)
4246 DBUG_ENTER("Suma::execSUB_START_CONF");
4247 ndbassert(signal
->getNoOfSections() == 0);
4248 Restart
.runSUB_START_CONF(signal
);
4253 Suma::execSUB_START_REF(Signal
* signal
) {
4255 DBUG_ENTER("Suma::execSUB_START_REF");
4256 ndbassert(signal
->getNoOfSections() == 0);
4257 SubStartRef
*const ref
= (SubStartRef
*)signal
->getDataPtr();
4258 Uint32 error
= ref
->errorCode
;
4260 Uint32 senderRef
= signal
->getSendersBlockRef();
4261 BlockReference cntrRef
= calcNdbCntrBlockRef(refToNode(senderRef
));
4262 // for some reason we did not manage to start a subscriber
4263 // on the starting node
4264 SystemError
* const sysErr
= (SystemError
*)&signal
->theData
[0];
4265 sysErr
->errorCode
= SystemError::CopySubscriberRef
;
4266 sysErr
->errorRef
= reference();
4267 sysErr
->data1
= error
;
4269 sendSignal(cntrRef
, GSN_SYSTEM_ERROR
, signal
,
4270 SystemError::SignalLength
, JBB
);
4271 Restart
.resetRestart(signal
);
4276 Suma::Restart::Restart(Suma
& s
) : suma(s
)
4282 Suma::Restart::runSUMA_START_ME_REQ(Signal
* signal
, Uint32 sumaRef
)
4285 DBUG_ENTER("Suma::Restart::runSUMA_START_ME");
4289 SumaStartMeRef
* ref
= (SumaStartMeRef
*)signal
->getDataPtrSend();
4290 ref
->errorCode
= SumaStartMeRef::Busy
;
4291 suma
.sendSignal(sumaRef
, GSN_SUMA_START_ME_REF
, signal
,
4292 SumaStartMeRef::SignalLength
, JBB
);
4296 nodeId
= refToNode(sumaRef
);
4297 startNode(signal
, sumaRef
);
4303 Suma::Restart::startNode(Signal
* signal
, Uint32 sumaRef
)
4306 DBUG_ENTER("Suma::Restart::startNode");
4308 // right now we can only handle restarting one node
4309 // at a time in a node group
4311 createSubscription(signal
, sumaRef
);
4316 Suma::Restart::createSubscription(Signal
* signal
, Uint32 sumaRef
)
4319 DBUG_ENTER("Suma::Restart::createSubscription");
4320 suma
.c_subscriptions
.first(c_subIt
);
4321 nextSubscription(signal
, sumaRef
);
4326 Suma::Restart::nextSubscription(Signal
* signal
, Uint32 sumaRef
)
4329 DBUG_ENTER("Suma::Restart::nextSubscription");
4331 if (c_subIt
.isNull())
4334 completeSubscription(signal
, sumaRef
);
4337 SubscriptionPtr subPtr
;
4338 subPtr
.i
= c_subIt
.curr
.i
;
4339 subPtr
.p
= suma
.c_subscriptions
.getPtr(subPtr
.i
);
4341 suma
.c_subscriptions
.next(c_subIt
);
4343 SubCreateReq
* req
= (SubCreateReq
*)signal
->getDataPtrSend();
4345 req
->senderRef
= suma
.reference();
4346 req
->senderData
= subPtr
.i
;
4347 req
->subscriptionId
= subPtr
.p
->m_subscriptionId
;
4348 req
->subscriptionKey
= subPtr
.p
->m_subscriptionKey
;
4349 req
->subscriptionType
= subPtr
.p
->m_subscriptionType
|
4350 SubCreateReq::RestartFlag
;
4352 switch (subPtr
.p
->m_subscriptionType
) {
4353 case SubCreateReq::TableEvent
:
4355 req
->tableId
= subPtr
.p
->m_tableId
;
4356 req
->state
= subPtr
.p
->m_state
;
4357 suma
.sendSignal(sumaRef
, GSN_SUB_CREATE_REQ
, signal
,
4358 SubCreateReq::SignalLength2
, JBB
);
4360 case SubCreateReq::SingleTableScan
:
4362 nextSubscription(signal
, sumaRef
);
4364 case SubCreateReq::SelectiveTableSnapshot
:
4365 case SubCreateReq::DatabaseSnapshot
:
4372 Suma::Restart::runSUB_CREATE_CONF(Signal
* signal
)
4375 DBUG_ENTER("Suma::Restart::runSUB_CREATE_CONF");
4377 const Uint32 senderRef
= signal
->senderBlockRef();
4378 Uint32 sumaRef
= signal
->getSendersBlockRef();
4380 SubCreateConf
* const conf
= (SubCreateConf
*)signal
->getDataPtr();
4382 SubscriptionPtr subPtr
;
4383 suma
.c_subscriptions
.getPtr(subPtr
,conf
->senderData
);
4385 switch(subPtr
.p
->m_subscriptionType
) {
4386 case SubCreateReq::TableEvent
:
4390 nextSubscription(signal
, sumaRef
);
4393 SubCreateReq
* req
= (SubCreateReq
*)signal
->getDataPtrSend();
4395 req
->senderRef
= suma
.reference();
4396 req
->senderData
= subPtr
.i
;
4397 req
->subscriptionId
= subPtr
.p
->m_subscriptionId
;
4398 req
->subscriptionKey
= subPtr
.p
->m_subscriptionKey
;
4399 req
->subscriptionType
= subPtr
.p
->m_subscriptionType
|
4400 SubCreateReq::RestartFlag
|
4401 SubCreateReq::AddTableFlag
;
4405 suma
.sendSignal(senderRef
, GSN_SUB_CREATE_REQ
, signal
,
4406 SubCreateReq::SignalLength
, JBB
);
4409 case SubCreateReq::SingleTableScan
:
4410 case SubCreateReq::SelectiveTableSnapshot
:
4411 case SubCreateReq::DatabaseSnapshot
:
4418 Suma::Restart::completeSubscription(Signal
* signal
, Uint32 sumaRef
)
4421 DBUG_ENTER("Suma::Restart::completeSubscription");
4422 startSubscriber(signal
, sumaRef
);
4427 Suma::Restart::startSubscriber(Signal
* signal
, Uint32 sumaRef
)
4430 DBUG_ENTER("Suma::Restart::startSubscriber");
4431 suma
.c_tables
.first(c_tabIt
);
4432 if (c_tabIt
.isNull())
4434 completeSubscriber(signal
, sumaRef
);
4437 SubscriberPtr subbPtr
;
4439 LocalDLList
<Subscriber
>
4440 subbs(suma
.c_subscriberPool
,c_tabIt
.curr
.p
->c_subscribers
);
4441 subbs
.first(subbPtr
);
4443 nextSubscriber(signal
, sumaRef
, subbPtr
);
4448 Suma::Restart::nextSubscriber(Signal
* signal
, Uint32 sumaRef
,
4449 SubscriberPtr subbPtr
)
4452 DBUG_ENTER("Suma::Restart::nextSubscriber");
4453 while (subbPtr
.isNull())
4456 DBUG_PRINT("info",("prev tableId %u",c_tabIt
.curr
.p
->m_tableId
));
4457 suma
.c_tables
.next(c_tabIt
);
4458 if (c_tabIt
.isNull())
4460 completeSubscriber(signal
, sumaRef
);
4463 DBUG_PRINT("info",("next tableId %u",c_tabIt
.curr
.p
->m_tableId
));
4465 LocalDLList
<Subscriber
>
4466 subbs(suma
.c_subscriberPool
,c_tabIt
.curr
.p
->c_subscribers
);
4467 subbs
.first(subbPtr
);
4471 * get subscription ptr for this subscriber
4474 SubscriptionPtr subPtr
;
4475 suma
.c_subscriptions
.getPtr(subPtr
, subbPtr
.p
->m_subPtrI
);
4476 switch (subPtr
.p
->m_subscriptionType
) {
4477 case SubCreateReq::TableEvent
:
4479 sendSubStartReq(subPtr
, subbPtr
, signal
, sumaRef
);
4481 case SubCreateReq::SelectiveTableSnapshot
:
4482 case SubCreateReq::DatabaseSnapshot
:
4483 case SubCreateReq::SingleTableScan
:
4490 Suma::Restart::sendSubStartReq(SubscriptionPtr subPtr
, SubscriberPtr subbPtr
,
4491 Signal
* signal
, Uint32 sumaRef
)
4494 DBUG_ENTER("Suma::Restart::sendSubStartReq");
4495 SubStartReq
* req
= (SubStartReq
*)signal
->getDataPtrSend();
4497 req
->senderRef
= suma
.reference();
4498 req
->senderData
= subbPtr
.i
;
4499 req
->subscriptionId
= subPtr
.p
->m_subscriptionId
;
4500 req
->subscriptionKey
= subPtr
.p
->m_subscriptionKey
;
4501 req
->part
= SubscriptionData::TableData
;
4502 req
->subscriberData
= subbPtr
.p
->m_senderData
;
4503 req
->subscriberRef
= subbPtr
.p
->m_senderRef
;
4505 // restarting suma will not respond to this until startphase 5
4506 // since it is not until then data copying has been completed
4507 DBUG_PRINT("info",("Restarting subscriber: %u on key: [%u,%u] %u",
4509 subPtr
.p
->m_subscriptionId
,
4510 subPtr
.p
->m_subscriptionKey
,
4511 subPtr
.p
->m_tableId
));
4513 suma
.sendSignal(sumaRef
, GSN_SUB_START_REQ
,
4514 signal
, SubStartReq::SignalLength2
, JBB
);
4519 Suma::Restart::runSUB_START_CONF(Signal
* signal
)
4522 DBUG_ENTER("Suma::Restart::runSUB_START_CONF");
4524 SubStartConf
* const conf
= (SubStartConf
*)signal
->getDataPtr();
4527 SubscriptionPtr subPtr
;
4528 key
.m_subscriptionId
= conf
->subscriptionId
;
4529 key
.m_subscriptionKey
= conf
->subscriptionKey
;
4530 ndbrequire(suma
.c_subscriptions
.find(subPtr
, key
));
4533 ndbrequire(suma
.c_tables
.find(tabPtr
, subPtr
.p
->m_tableId
));
4535 SubscriberPtr subbPtr
;
4537 LocalDLList
<Subscriber
>
4538 subbs(suma
.c_subscriberPool
,tabPtr
.p
->c_subscribers
);
4539 subbs
.getPtr(subbPtr
, conf
->senderData
);
4540 DBUG_PRINT("info",("Restarted subscriber: %u on key: [%u,%u] table: %u",
4541 subbPtr
.i
,key
.m_subscriptionId
,key
.m_subscriptionKey
,
4542 subPtr
.p
->m_tableId
));
4543 subbs
.next(subbPtr
);
4546 Uint32 sumaRef
= signal
->getSendersBlockRef();
4547 nextSubscriber(signal
, sumaRef
, subbPtr
);
4553 Suma::Restart::completeSubscriber(Signal
* signal
, Uint32 sumaRef
)
4555 DBUG_ENTER("Suma::Restart::completeSubscriber");
4556 completeRestartingNode(signal
, sumaRef
);
4561 Suma::Restart::completeRestartingNode(Signal
* signal
, Uint32 sumaRef
)
4564 DBUG_ENTER("Suma::Restart::completeRestartingNode");
4565 //SumaStartMeConf *conf= (SumaStartMeConf*)signal->getDataPtrSend();
4566 suma
.sendSignal(sumaRef
, GSN_SUMA_START_ME_CONF
, signal
,
4567 SumaStartMeConf::SignalLength
, JBB
);
4568 resetRestart(signal
);
4573 Suma::Restart::resetRestart(Signal
* signal
)
4576 DBUG_ENTER("Suma::Restart::resetRestart");
4581 // only run on restarting suma
4584 Suma::execSUMA_HANDOVER_REQ(Signal
* signal
)
4587 DBUG_ENTER("Suma::execSUMA_HANDOVER_REQ");
4588 // Uint32 sumaRef = signal->getSendersBlockRef();
4589 SumaHandoverReq
const * req
= (SumaHandoverReq
*)signal
->getDataPtr();
4591 Uint32 gci
= req
->gci
;
4592 Uint32 nodeId
= req
->nodeId
;
4593 Uint32 new_gci
= m_last_complete_gci
+ MAX_CONCURRENT_GCP
+ 1;
4595 Uint32 start_gci
= (gci
> new_gci
? gci
: new_gci
);
4596 // mark all active buckets really belonging to restarting SUMA
4599 for( Uint32 i
= 0; i
< c_no_of_buckets
; i
++)
4601 if(get_responsible_node(i
) == nodeId
)
4603 if (m_active_buckets
.get(i
))
4605 // I'm running this bucket but it should really be the restarted node
4607 m_active_buckets
.clear(i
);
4608 m_switchover_buckets
.set(i
);
4609 c_buckets
[i
].m_switchover_gci
= start_gci
;
4610 c_buckets
[i
].m_state
|= Bucket::BUCKET_HANDOVER
;
4611 c_buckets
[i
].m_switchover_node
= nodeId
;
4612 ndbout_c("prepare to handover bucket: %d", i
);
4614 else if(m_switchover_buckets
.get(i
))
4616 ndbout_c("dont handover bucket: %d %d", i
, nodeId
);
4621 SumaHandoverConf
* conf
= (SumaHandoverConf
*)signal
->getDataPtrSend();
4622 tmp
.copyto(BUCKET_MASK_SIZE
, conf
->theBucketMask
);
4623 conf
->gci
= start_gci
;
4624 conf
->nodeId
= getOwnNodeId();
4625 sendSignal(calcSumaBlockRef(nodeId
), GSN_SUMA_HANDOVER_CONF
, signal
,
4626 SumaHandoverConf::SignalLength
, JBB
);
4631 // only run on all but restarting suma
4633 Suma::execSUMA_HANDOVER_REF(Signal
* signal
)
4639 Suma::execSUMA_HANDOVER_CONF(Signal
* signal
) {
4641 DBUG_ENTER("Suma::execSUMA_HANDOVER_CONF");
4643 SumaHandoverConf
const * conf
= (SumaHandoverConf
*)signal
->getDataPtr();
4645 Uint32 gci
= conf
->gci
;
4646 Uint32 nodeId
= conf
->nodeId
;
4648 tmp
.assign(BUCKET_MASK_SIZE
, conf
->theBucketMask
);
4649 #ifdef HANDOVER_DEBUG
4650 ndbout_c("Suma::execSUMA_HANDOVER_CONF, gci = %u", gci
);
4653 for( Uint32 i
= 0; i
< c_no_of_buckets
; i
++)
4657 ndbrequire(get_responsible_node(i
) == getOwnNodeId());
4658 // We should run this bucket, but _nodeId_ is
4659 c_buckets
[i
].m_switchover_gci
= gci
;
4660 c_buckets
[i
].m_state
|= Bucket::BUCKET_STARTING
;
4666 infoEvent("Suma: handover from node %d gci: %d buckets: %s (%d)",
4667 nodeId
, gci
, buf
, c_no_of_buckets
);
4668 m_switchover_buckets
.bitOR(tmp
);
4669 c_startup
.m_handover_nodes
.clear(nodeId
);
4676 operator<<(NdbOut
& out
, const Suma::Page_pos
& pos
)
4678 out
<< "[ Page_pos:"
4679 << " m_page_id: " << pos
.m_page_id
4680 << " m_page_pos: " << pos
.m_page_pos
4681 << " m_max_gci: " << pos
.m_max_gci
4688 Suma::get_buffer_ptr(Signal
* signal
, Uint32 buck
, Uint32 gci
, Uint32 sz
)
4691 Bucket
* bucket
= c_buckets
+buck
;
4692 Page_pos pos
= bucket
->m_buffer_head
;
4694 Buffer_page
* page
= 0;
4697 if (likely(pos
.m_page_id
!= RNIL
))
4699 page
= (Buffer_page
*)m_tup
->c_page_pool
.getPtr(pos
.m_page_id
);
4700 ptr
= page
->m_data
+ pos
.m_page_pos
;
4703 const bool same_gci
= (gci
== pos
.m_last_gci
) && (!ERROR_INSERTED(13022));
4705 pos
.m_page_pos
+= sz
;
4706 pos
.m_last_gci
= gci
;
4707 Uint32 max
= pos
.m_max_gci
> gci
? pos
.m_max_gci
: gci
;
4709 if(likely(same_gci
&& pos
.m_page_pos
<= Buffer_page::DATA_WORDS
))
4711 pos
.m_max_gci
= max
;
4712 bucket
->m_buffer_head
= pos
;
4713 * ptr
++ = (0x8000 << 16) | sz
; // Same gci
4716 else if(pos
.m_page_pos
+ 1 <= Buffer_page::DATA_WORDS
)
4719 pos
.m_max_gci
= max
;
4720 pos
.m_page_pos
+= 1;
4721 bucket
->m_buffer_head
= pos
;
4730 * 1) save header on last page
4734 if(unlikely((next
= seize_page()) == RNIL
))
4739 out_of_buffer(signal
);
4743 if(likely(pos
.m_page_id
!= RNIL
))
4745 page
->m_max_gci
= pos
.m_max_gci
;
4746 page
->m_words_used
= pos
.m_page_pos
- sz
;
4747 page
->m_next_page
= next
;
4751 bucket
->m_buffer_tail
= next
;
4754 memset(&pos
, 0, sizeof(pos
));
4755 pos
.m_page_id
= next
;
4756 pos
.m_page_pos
= sz
;
4757 pos
.m_last_gci
= gci
;
4759 page
= (Buffer_page
*)m_tup
->c_page_pool
.getPtr(pos
.m_page_id
);
4760 page
->m_next_page
= RNIL
;
4767 Suma::out_of_buffer(Signal
* signal
)
4769 if(m_out_of_buffer_gci
)
4774 m_out_of_buffer_gci
= m_last_complete_gci
- 1;
4775 infoEvent("Out of event buffer: nodefailure will cause event failures");
4777 out_of_buffer_release(signal
, 0);
4781 Suma::out_of_buffer_release(Signal
* signal
, Uint32 buck
)
4783 Bucket
* bucket
= c_buckets
+buck
;
4784 Uint32 tail
= bucket
->m_buffer_tail
;
4788 Buffer_page
* page
= (Buffer_page
*)m_tup
->c_page_pool
.getPtr(tail
);
4789 bucket
->m_buffer_tail
= page
->m_next_page
;
4790 free_page(tail
, page
);
4791 signal
->theData
[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE
;
4792 signal
->theData
[1] = buck
;
4793 sendSignal(SUMA_REF
, GSN_CONTINUEB
, signal
, 2, JBB
);
4800 bucket
->m_buffer_head
.m_page_id
= RNIL
;
4801 bucket
->m_buffer_head
.m_page_pos
= Buffer_page::DATA_WORDS
+ 1;
4804 if(buck
!= c_no_of_buckets
)
4806 signal
->theData
[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE
;
4807 signal
->theData
[1] = buck
;
4808 sendSignal(SUMA_REF
, GSN_CONTINUEB
, signal
, 2, JBB
);
4813 * Finished will all release
4814 * prepare for inclusion
4816 m_out_of_buffer_gci
= m_max_seen_gci
> m_last_complete_gci
4817 ? m_max_seen_gci
+ 1 : m_last_complete_gci
+ 1;
4823 if(unlikely(m_out_of_buffer_gci
))
4828 Ptr
<Page_chunk
> ptr
;
4829 Uint32 ref
= m_first_free_page
;
4830 if(likely(ref
!= RNIL
))
4832 m_first_free_page
= ((Buffer_page
*)m_tup
->c_page_pool
.getPtr(ref
))->m_next_page
;
4833 Uint32 chunk
= ((Buffer_page
*)m_tup
->c_page_pool
.getPtr(ref
))->m_page_chunk_ptr_i
;
4834 c_page_chunk_pool
.getPtr(ptr
, chunk
);
4835 ndbassert(ptr
.p
->m_free
);
4840 if(!c_page_chunk_pool
.seize(ptr
))
4844 m_tup
->allocConsPages(16, count
, ref
);
4848 ndbout_c("alloc_chunk(%d %d) - ", ref
, count
);
4850 m_first_free_page
= ptr
.p
->m_page_id
= ref
;
4851 ptr
.p
->m_size
= count
;
4852 ptr
.p
->m_free
= count
;
4856 for(Uint32 i
= 0; i
<count
; i
++)
4858 page
= (Buffer_page
*)m_tup
->c_page_pool
.getPtr(ref
);
4859 page
->m_page_state
= SUMA_SEQUENCE
;
4860 page
->m_page_chunk_ptr_i
= ptr
.i
;
4861 page
->m_next_page
= ++ref
;
4863 page
->m_next_page
= RNIL
;
4869 Suma::free_page(Uint32 page_id
, Buffer_page
* page
)
4871 Ptr
<Page_chunk
> ptr
;
4872 ndbrequire(page
->m_page_state
== SUMA_SEQUENCE
);
4874 Uint32 chunk
= page
->m_page_chunk_ptr_i
;
4876 c_page_chunk_pool
.getPtr(ptr
, chunk
);
4879 page
->m_next_page
= m_first_free_page
;
4880 ndbrequire(ptr
.p
->m_free
<= ptr
.p
->m_size
);
4882 m_first_free_page
= page_id
;
4886 Suma::release_gci(Signal
* signal
, Uint32 buck
, Uint32 gci
)
4888 Bucket
* bucket
= c_buckets
+buck
;
4889 Uint32 tail
= bucket
->m_buffer_tail
;
4890 Page_pos head
= bucket
->m_buffer_head
;
4891 Uint32 max_acked
= bucket
->m_max_acked_gci
;
4893 const Uint32 mask
= Bucket::BUCKET_TAKEOVER
| Bucket::BUCKET_RESEND
;
4894 if(unlikely(bucket
->m_state
& mask
))
4897 ndbout_c("release_gci(%d, %d) -> node failure -> abort", buck
, gci
);
4901 bucket
->m_max_acked_gci
= (max_acked
> gci
? max_acked
: gci
);
4902 if(unlikely(tail
== RNIL
))
4907 if(tail
== head
.m_page_id
)
4909 if(gci
>= head
.m_max_gci
)
4912 if (ERROR_INSERTED(13034))
4915 SET_ERROR_INSERT_VALUE(13035);
4918 if (ERROR_INSERTED(13035))
4920 CLEAR_ERROR_INSERT_VALUE
;
4921 NodeReceiverGroup
rg(CMVMI
, c_nodes_in_nodegroup_mask
);
4922 rg
.m_nodes
.clear(getOwnNodeId());
4923 signal
->theData
[0] = 9999;
4924 sendSignal(rg
, GSN_NDB_TAMPER
, signal
, 1, JBA
);
4927 head
.m_page_pos
= 0;
4928 head
.m_max_gci
= gci
;
4929 head
.m_last_gci
= 0;
4930 bucket
->m_buffer_head
= head
;
4937 Buffer_page
* page
= (Buffer_page
*)m_tup
->c_page_pool
.getPtr(tail
);
4938 Uint32 max_gci
= page
->m_max_gci
;
4939 Uint32 next_page
= page
->m_next_page
;
4946 free_page(tail
, page
);
4948 bucket
->m_buffer_tail
= next_page
;
4949 signal
->theData
[0] = SumaContinueB::RELEASE_GCI
;
4950 signal
->theData
[1] = buck
;
4951 signal
->theData
[2] = gci
;
4952 sendSignal(SUMA_REF
, GSN_CONTINUEB
, signal
, 3, JBB
);
4957 //ndbout_c("do nothing...");
4962 static Uint32 g_cnt
= 0;
4965 Suma::start_resend(Signal
* signal
, Uint32 buck
)
4967 printf("start_resend(%d, ", buck
);
4969 if(m_out_of_buffer_gci
)
4971 progError(__LINE__
, NDBD_EXIT_SYSTEM_ERROR
,
4972 "Nodefailure while out of event buffer");
4977 * Resend from m_max_acked_gci + 1 until max_gci + 1
4979 Bucket
* bucket
= c_buckets
+ buck
;
4980 Page_pos pos
= bucket
->m_buffer_head
;
4982 if(pos
.m_page_id
== RNIL
)
4985 m_active_buckets
.set(buck
);
4986 m_gcp_complete_rep_count
++;
4987 ndbout_c("empty bucket(RNIL) -> active");
4991 Uint32 min
= bucket
->m_max_acked_gci
+ 1;
4992 Uint32 max
= pos
.m_max_gci
;
4994 ndbrequire(max
<= m_max_seen_gci
);
4998 ndbrequire(pos
.m_page_id
== bucket
->m_buffer_tail
);
4999 m_active_buckets
.set(buck
);
5000 m_gcp_complete_rep_count
++;
5001 ndbout_c("empty bucket -> active");
5006 bucket
->m_state
|= (Bucket::BUCKET_TAKEOVER
| Bucket::BUCKET_RESEND
);
5007 bucket
->m_switchover_node
= get_responsible_node(buck
);
5008 bucket
->m_switchover_gci
= max
+ 1;
5010 m_switchover_buckets
.set(buck
);
5012 signal
->theData
[1] = buck
;
5013 signal
->theData
[2] = min
;
5014 signal
->theData
[3] = 0;
5015 signal
->theData
[4] = 0;
5016 sendSignal(reference(), GSN_CONTINUEB
, signal
, 5, JBB
);
5018 ndbout_c("min: %d - max: %d) page: %d", min
, max
, bucket
->m_buffer_tail
);
5019 ndbrequire(max
>= min
);
5023 Suma::resend_bucket(Signal
* signal
, Uint32 buck
, Uint32 min_gci
,
5024 Uint32 pos
, Uint32 last_gci
)
5026 Bucket
* bucket
= c_buckets
+buck
;
5027 Uint32 tail
= bucket
->m_buffer_tail
;
5029 Buffer_page
* page
= (Buffer_page
*)m_tup
->c_page_pool
.getPtr(tail
);
5030 Uint32 max_gci
= page
->m_max_gci
;
5031 Uint32 next_page
= page
->m_next_page
;
5032 Uint32
*ptr
= page
->m_data
+ pos
;
5033 Uint32
*end
= page
->m_data
+ page
->m_words_used
;
5036 ndbrequire(tail
!= RNIL
);
5038 if(tail
== bucket
->m_buffer_head
.m_page_id
)
5040 max_gci
= bucket
->m_buffer_head
.m_max_gci
;
5041 end
= page
->m_data
+ bucket
->m_buffer_head
.m_page_pos
;
5050 else if(pos
== 0 && min_gci
> max_gci
)
5052 free_page(tail
, page
);
5053 tail
= bucket
->m_buffer_tail
= next_page
;
5054 ndbout_c("pos==0 && min_gci(%d) > max_gci(%d) resend switching page to %d", min_gci
, max_gci
, tail
);
5059 for(Uint32 i
= 0; i
<page
->m_words_used
; i
++)
5061 printf("%.8x ", page
->m_data
[i
]);
5062 if(((i
+ 1) % 8) == 0)
5071 Uint32 tmp
= * src
++;
5072 Uint32 sz
= tmp
& 0xFFFF;
5076 if(! (tmp
& (0x8000 << 16)))
5079 last_gci
= * src
++;
5083 ndbrequire(ptr
- sz
> page
->m_data
);
5086 if(last_gci
< min_gci
)
5092 sz
--; // remove *len* part of sz
5096 SubGcpCompleteRep
* rep
= (SubGcpCompleteRep
*)signal
->getDataPtrSend();
5097 rep
->gci
= last_gci
;
5098 rep
->senderRef
= reference();
5099 rep
->gcp_complete_rep_count
= 1;
5102 c_subscriber_nodes
.getText(buf
);
5103 ndbout_c("resending GCI: %d rows: %d -> %s", last_gci
, g_cnt
, buf
);
5106 NodeReceiverGroup
rg(API_CLUSTERMGR
, c_subscriber_nodes
);
5107 sendSignal(rg
, GSN_SUB_GCP_COMPLETE_REP
, signal
,
5108 SubGcpCompleteRep::SignalLength
, JBB
);
5112 const uint buffer_header_sz
= 4;
5114 Uint32 table
= * src
++ ;
5115 Uint32 schemaVersion
= * src
++;
5116 Uint32 event
= * src
>> 16;
5117 Uint32 sz_1
= (* src
++) & 0xFFFF;
5118 Uint32 any_value
= * src
++;
5120 ndbassert(sz
- buffer_header_sz
>= sz_1
);
5122 LinearSectionPtr ptr
[3];
5123 const Uint32 nptr
= reformat(signal
, ptr
,
5125 src
+ sz_1
, sz
- buffer_header_sz
- sz_1
);
5127 for(Uint32 i
=0; i
< nptr
; i
++)
5131 * Signal to subscriber(s)
5134 if (c_tables
.find(tabPtr
, table
) &&
5135 tabPtr
.p
->m_schemaVersion
== schemaVersion
)
5137 SubTableData
* data
= (SubTableData
*)signal
->getDataPtrSend();//trg;
5138 data
->gci
= last_gci
;
5139 data
->tableId
= table
;
5140 data
->requestInfo
= 0;
5141 SubTableData::setOperation(data
->requestInfo
, event
);
5143 data
->anyValue
= any_value
;
5144 data
->totalLen
= ptrLen
;
5147 LocalDLList
<Subscriber
>
5148 list(c_subscriberPool
,tabPtr
.p
->c_subscribers
);
5149 SubscriberPtr subbPtr
;
5150 for(list
.first(subbPtr
); !subbPtr
.isNull(); list
.next(subbPtr
))
5152 DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d",
5153 refToNode(subbPtr
.p
->m_senderRef
)));
5154 data
->senderData
= subbPtr
.p
->m_senderData
;
5155 sendSignal(subbPtr
.p
->m_senderRef
, GSN_SUB_TABLE_DATA
, signal
,
5156 SubTableData::SignalLength
, JBB
, ptr
, nptr
);
5165 if(ptr
== end
&& (tail
!= bucket
->m_buffer_head
.m_page_id
))
5170 free_page(tail
, page
);
5171 tail
= bucket
->m_buffer_tail
= next_page
;
5174 ndbout_c("ptr == end -> resend switching page to %d", tail
);
5178 pos
= (ptr
- page
->m_data
);
5184 bucket
->m_state
&= ~(Uint32
)Bucket::BUCKET_RESEND
;
5185 ndbassert(! (bucket
->m_state
& Bucket::BUCKET_TAKEOVER
));
5186 ndbout_c("resend done...");
5190 signal
->theData
[0] = SumaContinueB::RESEND_BUCKET
;
5191 signal
->theData
[1] = buck
;
5192 signal
->theData
[2] = min_gci
;
5193 signal
->theData
[3] = pos
;
5194 signal
->theData
[4] = last_gci
;
5196 sendSignal(SUMA_REF
, GSN_CONTINUEB
, signal
, 5, JBB
);
5198 sendSignalWithDelay(SUMA_REF
, GSN_CONTINUEB
, signal
, 10, 5);
5201 template void append(DataBuffer
<11>&,SegmentedSectionPtr
,SectionSegmentPool
&);