1 /* Copyright (c) 2003-2007 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16 #include <ndb_global.h>
17 #include <my_pthread.h>
18 #include <ndb_limits.h>
19 #include <util/version.h>
21 #include "TransporterFacade.hpp"
22 #include "ClusterMgr.hpp"
23 #include <IPCConfig.hpp>
24 #include "NdbApiSignal.hpp"
31 #include <signaldata/NodeFailRep.hpp>
32 #include <signaldata/NFCompleteRep.hpp>
33 #include <signaldata/ApiRegSignalData.hpp>
36 #include <mgmapi_configuration.hpp>
37 #include <mgmapi_config_parameters.h>
// File-scope switch, initialised to 0. ClusterMgr::reportNodeFailed tests
// `!global_flag_skip_invalidate_cache` before locking and invalidating the
// facade's global dictionary cache, so a non-zero value skips that step.
39 int global_flag_skip_invalidate_cache
= 0;
42 // Just a C wrapper for threadMain
// Thread entry trampoline handed to NdbThread_Create in
// ClusterMgr::startThread: casts the opaque argument back to ClusterMgr*
// and runs its threadMain().
// NOTE(review): return type, linkage and the return statement are not
// visible in this listing.
45 runClusterMgr_C(void * me
)
47 ((ClusterMgr
*) me
)->threadMain();
// Forward declaration only; the definition and any call site are not part
// of this listing.
53 void ndbSetOwnVersion();
// Constructs the cluster manager for the given TransporterFacade.
// Visible effects: creates the thread mutex and the heartbeat condition and
// puts the interval limit, connected-node counter, thread handle and cluster
// state into their initial values.
// NOTE(review): the member-initializer list that follows the ':' is not
// visible in this listing.
55 ClusterMgr::ClusterMgr(TransporterFacade
& _facade
):
59 DBUG_ENTER("ClusterMgr::ClusterMgr");
// Taken by startThread() and doStop() around thread creation/teardown.
61 clusterMgrThreadMutex
= NdbMutex_Create();
// Waited on in forceHB(); woken from execAPI_REGCONF / execAPI_REGREF.
62 waitForHBCond
= NdbCondition_Create();
// threadMain() sends a heartbeat once a node's hbCounter reaches this value
// or the node's own hbFrequency, whichever comes first.
64 m_max_api_reg_req_interval
= 0xFFFFFFFF; // all 32 bits set, i.e. the largest 32-bit unsigned value
66 noOfConnectedNodes
= 0;
// No manager thread exists until startThread() is called.
67 theClusterMgrThread
= 0;
69 m_cluster_state
= CS_waiting_for_clean_cache
;
// Destructor: releases the condition and mutex created by the constructor.
// NOTE(review): any call that stops the manager thread first is not visible
// in this listing.
73 ClusterMgr::~ClusterMgr()
75 DBUG_ENTER("ClusterMgr::~ClusterMgr");
77 NdbCondition_Destroy(waitForHBCond
);
78 NdbMutex_Destroy(clusterMgrThreadMutex
);
// Populates theNodes[] from the management configuration.
// For every entry of `iter` it reads the node id, marks that slot as defined
// and records the node type (DB / API / MGM) taken from the section type.
// NOTE(review): the declarations of `tmp` and `type`, the statements taken
// when iter.get() returns non-zero, and the switch/case labels selecting
// between the three type assignments are not visible in this listing.
83 ClusterMgr::init(ndb_mgm_configuration_iterator
& iter
){
84 for(iter
.first(); iter
.valid(); iter
.next()){
// Node id of the current configuration section.
86 if(iter
.get(CFG_NODE_ID
, &tmp
))
89 theNodes
[tmp
].defined
= true;
// Diagnostic output. `config` is neither a parameter nor declared in the
// visible code, so this is presumably compiled out — confirm.
91 ndbout
<< "--------------------------------------" << endl
;
92 ndbout
<< "--------------------------------------" << endl
;
93 ndbout_c("ClusterMgr: Node %d defined as %s", tmp
, config
.getNodeType(tmp
));
// Section type decides which NodeInfo type the slot gets.
97 if(iter
.get(CFG_TYPE_OF_SECTION
, &type
))
102 theNodes
[tmp
].m_info
.m_type
= NodeInfo::DB
;
105 theNodes
[tmp
].m_info
.m_type
= NodeInfo::API
;
108 theNodes
[tmp
].m_info
.m_type
= NodeInfo::MGM
;
// Diagnostic for a section type that is none of the above.
113 ndbout_c("ClusterMgr: Unknown node type: %d", type
);
// Creates the cluster manager thread while holding clusterMgrThreadMutex.
// The thread runs runClusterMgr_C at NDB_THREAD_PRIO_LOW.
// NOTE(review): the remaining NdbThread_Create arguments (thread argument,
// stack size, name) are not visible in this listing.
120 ClusterMgr::startThread() {
121 NdbMutex_Lock(clusterMgrThreadMutex
);
125 theClusterMgrThread
= NdbThread_Create(runClusterMgr_C
,
129 NDB_THREAD_PRIO_LOW
);
130 NdbMutex_Unlock(clusterMgrThreadMutex
);
// Stops the manager thread: under clusterMgrThreadMutex, waits for the
// thread to finish and destroys its handle if one exists.
// NOTE(review): the first Unlock belongs to a branch whose condition is not
// visible here (presumably an "already stopped" early exit); the statement
// that asks the thread to stop and the declaration of `status` are also
// missing from this listing.
134 ClusterMgr::doStop( ){
135 DBUG_ENTER("ClusterMgr::doStop");
136 NdbMutex_Lock(clusterMgrThreadMutex
);
138 NdbMutex_Unlock(clusterMgrThreadMutex
);
// Join and release the thread only if startThread() created one.
143 if (theClusterMgrThread
) {
144 NdbThread_WaitFor(theClusterMgrThread
, &status
);
145 NdbThread_Destroy(&theClusterMgrThread
);
147 NdbMutex_Unlock(clusterMgrThreadMutex
);
// Forces an immediate heartbeat round: sends API_REGREQ to the nodes in
// waitForHBFromNodes and waits up to 1000 ms on waitForHBCond for replies.
// The whole function runs under the facade mutex.
// NOTE(review): the guard around the first wait, the code that fills
// `ndb_nodes`, the loop header pieces of the send loop and the declaration
// of `buf` are not visible in this listing.
152 ClusterMgr::forceHB()
154 theFacade
.lock_mutex();
// Wait up to 1000 ms on the heartbeat condition and leave; the condition
// selecting this path is not shown.
158 NdbCondition_WaitTimeout(waitForHBCond
, theFacade
.theMutexPtr
, 1000);
159 theFacade
.unlock_mutex();
165 NodeBitmask ndb_nodes
;
// Rebuild the set of nodes a reply is expected from.
167 waitForHBFromNodes
.clear();
168 for(Uint32 i
= 0; i
< MAX_NODES
; i
++)
170 if(!theNodes
[i
].defined
)
172 if(theNodes
[i
].m_info
.m_type
== NodeInfo::DB
)
// Accumulate every node that this DB node reports as connected.
175 const ClusterMgr::Node
&node
= getNodeInfo(i
);
176 waitForHBFromNodes
.bitOR(node
.m_state
.m_connected_nodes
);
// Restrict the accumulated set to the nodes in ndb_nodes.
179 waitForHBFromNodes
.bitAND(ndb_nodes
);
183 ndbout
<< "Waiting for HB from " << waitForHBFromNodes
.getText(buf
) << endl
;
// Build one API_REGREQ addressed to QMGR; it is sent unchanged to each node.
185 NdbApiSignal
signal(numberToRef(API_CLUSTERMGR
, theFacade
.ownId()));
187 signal
.theVerId_signalNumber
= GSN_API_REGREQ
;
188 signal
.theReceiversBlockNumber
= QMGR
;
190 signal
.theLength
= ApiRegReq::SignalLength
;
192 ApiRegReq
* req
= CAST_PTR(ApiRegReq
, signal
.getDataPtrSend());
193 req
->ref
= numberToRef(API_CLUSTERMGR
, theFacade
.ownId());
194 req
->version
= NDB_VERSION
;
// Loop condition: continue while find(i) yields a set bit in the mask.
198 (int) NodeBitmask::NotFound
!= (nodeId
= waitForHBFromNodes
.find(i
));
202 ndbout
<< "FORCE HB to " << nodeId
<< endl
;
204 theFacade
.sendSignalUnCond(&signal
, nodeId
);
207 /* Wait for nodes to reply - if any heartbeats were sent */
208 if (!waitForHBFromNodes
.isclear())
209 NdbCondition_WaitTimeout(waitForHBCond
, theFacade
.theMutexPtr
, 1000);
213 ndbout
<< "Still waiting for HB from " << waitForHBFromNodes
.getText(buf
) << endl
;
215 theFacade
.unlock_mutex();
// Body of the cluster manager thread. Builds one API_REGREQ signal, then
// repeatedly (a) advances the cluster state once the dictionary cache has
// been inspected, (b) under the facade mutex walks the node table, connecting
// unconnected nodes and sending heartbeats that are due, and (c) sleeps
// 100 ms and measures the time slept.
// NOTE(review): the outer loop and its stop condition, the `continue`
// statements after the per-node checks, the test on `sz`, the declaration of
// `before` and the action for a node that missed four heartbeats are not
// visible in this listing.
219 ClusterMgr::threadMain( ){
220 NdbApiSignal
signal(numberToRef(API_CLUSTERMGR
, theFacade
.ownId()));
222 signal
.theVerId_signalNumber
= GSN_API_REGREQ
;
223 signal
.theReceiversBlockNumber
= QMGR
;
225 signal
.theLength
= ApiRegReq::SignalLength
;
227 ApiRegReq
* req
= CAST_PTR(ApiRegReq
, signal
.getDataPtrSend());
228 req
->ref
= numberToRef(API_CLUSTERMGR
, theFacade
.ownId());
229 req
->version
= NDB_VERSION
;
// The first pass counts as if 100 ms had already been slept.
232 Uint32 timeSlept
= 100;
233 Uint64 now
= NdbTick_CurrentMillisecond();
237 * Start of Secure area for use of Transporter
239 if (m_cluster_state
== CS_waiting_for_clean_cache
)
// Read the global dictionary cache size under its own lock.
241 theFacade
.m_globalDictCache
.lock();
242 unsigned sz
= theFacade
.m_globalDictCache
.get_size();
243 theFacade
.m_globalDictCache
.unlock();
246 m_cluster_state
= CS_waiting_for_first_connect
;
249 theFacade
.lock_mutex();
250 for (int i
= 1; i
< MAX_NDB_NODES
; i
++){
252 * Send register request (heartbeat) to all available nodes
253 * at specified timing intervals
255 const NodeId nodeId
= i
;
256 Node
& theNode
= theNodes
[nodeId
];
258 if (!theNode
.defined
)
// A defined but unconnected node gets a connect attempt.
261 if (theNode
.connected
== false){
262 theFacade
.doConnect(nodeId
);
266 if (!theNode
.compatible
){
// Account for the time that passed since the previous pass.
270 theNode
.hbCounter
+= timeSlept
;
271 if (theNode
.hbCounter
>= m_max_api_reg_req_interval
||
272 theNode
.hbCounter
>= theNode
.hbFrequency
) {
274 * It is now time to send a new Heartbeat
// The missed-heartbeat count and the counter reset apply only when the
// node's own hbFrequency was reached, not when only the global maximum
// interval was.
276 if (theNode
.hbCounter
>= theNode
.hbFrequency
) {
277 theNode
.m_info
.m_heartbeat_cnt
++;
278 theNode
.hbCounter
= 0;
282 ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId
);
284 theFacade
.sendSignalUnCond(&signal
, nodeId
);
// Four heartbeats without a reset (execAPI_REGCONF zeroes the count) on a
// node that has a non-zero hbFrequency.
287 if (theNode
.m_info
.m_heartbeat_cnt
== 4 && theNode
.hbFrequency
> 0){
293 * End of secure area. Let other threads in
295 theFacade
.unlock_mutex();
298 // Sleep for 100 ms between each Registration Heartbeat
300 NdbSleep_MilliSleep(100);
// Measure the real elapsed time rather than assuming exactly 100 ms.
301 now
= NdbTick_CurrentMillisecond();
302 timeSlept
= (now
- before
);
// Debug dump of per-node bookkeeping for `nodeId` to ndbout.
// NOTE(review): theNodeList / theNodeState / theNodeCount / theNodeStopDelay
// / theNodeSendDelay are not referenced anywhere else in this listing, so
// this function is presumably compiled out — confirm before relying on it.
308 ClusterMgr::showState(NodeId nodeId
){
309 ndbout
<< "-- ClusterMgr - NodeId = " << nodeId
<< endl
;
310 ndbout
<< "theNodeList = " << theNodeList
[nodeId
] << endl
;
311 ndbout
<< "theNodeState = " << theNodeState
[nodeId
] << endl
;
312 ndbout
<< "theNodeCount = " << theNodeCount
[nodeId
] << endl
;
313 ndbout
<< "theNodeStopDelay = " << theNodeStopDelay
[nodeId
] << endl
;
314 ndbout
<< "theNodeSendDelay = " << theNodeSendDelay
[nodeId
] << endl
;
// Default state of a node table entry: start level SL_NOTHING, considered
// compatible with node-failure handling complete, but not connected, not
// defined, not alive and without a received API_REGCONF; no connected peers.
318 ClusterMgr::Node::Node()
319 : m_state(NodeState::SL_NOTHING
) {
320 compatible
= nfCompleteRep
= true;
321 connected
= defined
= m_alive
= m_api_reg_conf
= false;
322 m_state
.m_connected_nodes
.clear();
325 /******************************************************************************
326 * API_REGREQ and friends
327 ******************************************************************************/
// Handles an incoming API_REGREQ: records the sender's version, updates the
// sender's compatibility flag when the version changed, and answers with an
// API_REGCONF carrying our own version and the heartbeat frequency stored
// for that node.
// `theData` is interpreted as an ApiRegReq.
330 ClusterMgr::execAPI_REGREQ(const Uint32
* theData
){
331 const ApiRegReq
* const apiRegReq
= (ApiRegReq
*)&theData
[0];
332 const NodeId nodeId
= refToNode(apiRegReq
->ref
);
335 ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId
);
// Range and state checks are asserts only, so they vanish when assert() is
// compiled out.
338 assert(nodeId
> 0 && nodeId
< MAX_NODES
);
340 Node
& node
= theNodes
[nodeId
];
341 assert(node
.defined
== true);
342 assert(node
.connected
== true);
// Re-evaluate compatibility only when the reported version changed.
344 if(node
.m_info
.m_version
!= apiRegReq
->version
){
345 node
.m_info
.m_version
= apiRegReq
->version
;
// NOTE(review): major and minor are compared independently, so a peer with
// a higher major but a lower minor number is also marked incompatible —
// confirm this is intended.
347 if (getMajor(node
.m_info
.m_version
) < getMajor(NDB_VERSION
) ||
348 getMinor(node
.m_info
.m_version
) < getMinor(NDB_VERSION
)) {
349 node
.compatible
= false;
351 node
.compatible
= true;
// Reply: API_REGCONF sent back to the requester's API_CLUSTERMGR block.
355 NdbApiSignal
signal(numberToRef(API_CLUSTERMGR
, theFacade
.ownId()));
356 signal
.theVerId_signalNumber
= GSN_API_REGCONF
;
357 signal
.theReceiversBlockNumber
= API_CLUSTERMGR
;
359 signal
.theLength
= ApiRegConf::SignalLength
;
361 ApiRegConf
* const conf
= CAST_PTR(ApiRegConf
, signal
.getDataPtrSend());
362 conf
->qmgrRef
= numberToRef(API_CLUSTERMGR
, theFacade
.ownId());
363 conf
->version
= NDB_VERSION
;
364 conf
->apiHeartbeatFrequency
= node
.hbFrequency
;
365 theFacade
.sendSignalUnCond(&signal
, nodeId
);
// Handles an API_REGCONF heartbeat reply: records the sender's version and
// compatibility, stores its node state, updates its alive flag, resets the
// missed-heartbeat count, removes it from the forceHB() wait set and derives
// the heartbeat interval to use for that node.
// `theData` is interpreted as an ApiRegConf.
369 ClusterMgr::execAPI_REGCONF(const Uint32
* theData
){
370 const ApiRegConf
* const apiRegConf
= (ApiRegConf
*)&theData
[0];
371 const NodeId nodeId
= refToNode(apiRegConf
->qmgrRef
);
374 ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId
);
377 assert(nodeId
> 0 && nodeId
< MAX_NODES
);
379 Node
& node
= theNodes
[nodeId
];
380 assert(node
.defined
== true);
381 assert(node
.connected
== true);
// Compatibility is recomputed only when the reported version changed; the
// check used depends on whether this process itself is an MGM node.
383 if(node
.m_info
.m_version
!= apiRegConf
->version
){
384 node
.m_info
.m_version
= apiRegConf
->version
;
385 if(theNodes
[theFacade
.ownId()].m_info
.m_type
== NodeInfo::MGM
)
386 node
.compatible
= ndbCompatible_mgmt_ndb(NDB_VERSION
,
387 node
.m_info
.m_version
);
389 node
.compatible
= ndbCompatible_api_ndb(NDB_VERSION
,
390 node
.m_info
.m_version
);
393 node
.m_api_reg_conf
= true;
395 node
.m_state
= apiRegConf
->nodeState
;
// Alive means: compatible and either fully started or in single user mode.
396 if (node
.compatible
&& (node
.m_state
.startLevel
== NodeState::SL_STARTED
||
397 node
.m_state
.getSingleUserMode())){
398 set_node_alive(node
, true);
400 set_node_alive(node
, false);
// A reply arrived, so the missed-heartbeat count starts over.
402 node
.m_info
.m_heartbeat_cnt
= 0;
// This node no longer blocks forceHB(); wake all waiters once none remain.
407 waitForHBFromNodes
.clear(nodeId
);
409 if(waitForHBFromNodes
.isclear())
412 NdbCondition_Broadcast(waitForHBCond
);
// NOTE(review): for apiHeartbeatFrequency < 5 the subtraction goes below
// zero (or wraps, depending on the field types) — confirm the sender never
// reports such a value.
415 node
.hbFrequency
= (apiRegConf
->apiHeartbeatFrequency
* 10) - 50;
// Handles an API_REGREF (registration refused): marks the sender as
// incompatible and not alive, resets its state to SL_NOTHING, records the
// version it reported, and removes it from the forceHB() wait set.
// `theData` is interpreted as an ApiRegRef.
// NOTE(review): the bodies/breaks of the errorCode switch beyond the lines
// shown are not visible in this listing.
419 ClusterMgr::execAPI_REGREF(const Uint32
* theData
){
421 ApiRegRef
* ref
= (ApiRegRef
*)theData
;
423 const NodeId nodeId
= refToNode(ref
->ref
);
425 assert(nodeId
> 0 && nodeId
< MAX_NODES
);
427 Node
& node
= theNodes
[nodeId
];
428 assert(node
.connected
== true);
429 assert(node
.defined
== true);
431 node
.compatible
= false;
432 set_node_alive(node
, false);
433 node
.m_state
= NodeState::SL_NOTHING
;
434 node
.m_info
.m_version
= ref
->version
;
436 switch(ref
->errorCode
){
437 case ApiRegRef::WrongType
:
438 ndbout_c("Node %d reports that this node should be a NDB node", nodeId
);
440 case ApiRegRef::UnsupportedVersion
:
// A refusal also counts as an answer for forceHB().
445 waitForHBFromNodes
.clear(nodeId
);
446 if(waitForHBFromNodes
.isclear())
// Broadcast rather than Signal: forceHB() has two wait sites on this
// condition, so more than one thread may be blocked, and execAPI_REGCONF
// already wakes all of them in the same situation.
447 NdbCondition_Broadcast(waitForHBCond
);
// Handles NODE_FAILREP: walks node ids 1..MAX_NODES-1 and selects those
// whose bit is set in the report's node bitmask.
// NOTE(review): the action taken for each selected node is not visible in
// this listing.
451 ClusterMgr::execNODE_FAILREP(const Uint32
* theData
){
452 NodeFailRep
* const nodeFail
= (NodeFailRep
*)&theData
[0];
453 for(int i
= 1; i
<MAX_NODES
; i
++){
454 if(NodeBitmask::get(nodeFail
->theNodes
, i
)){
// Handles NF_COMPLETEREP: tells the facade that failure handling for the
// reported node is complete and records that in the node table.
// `theData` is interpreted as an NFCompleteRep. Also called directly from
// reportNodeFailed() with a locally built report.
461 ClusterMgr::execNF_COMPLETEREP(const Uint32
* theData
){
462 NFCompleteRep
* const nfComp
= (NFCompleteRep
*)theData
;
464 const NodeId nodeId
= nfComp
->failedNodeId
;
465 assert(nodeId
> 0 && nodeId
< MAX_NODES
);
467 theFacade
.ReportNodeFailureComplete(nodeId
);
468 theNodes
[nodeId
].nfCompleteRep
= true;
// Called when a transporter connection to `nodeId` is established: bumps the
// connected-node count, resets the node's heartbeat bookkeeping and version
// information, and reports the node as alive to the facade.
472 ClusterMgr::reportConnected(NodeId nodeId
){
473 DBUG_ENTER("ClusterMgr::reportConnected");
474 DBUG_PRINT("info", ("nodeId: %u", nodeId
));
476 * Ensure that we are sending heartbeat every 100 ms
477 * until we have got the first reply from NDB providing
478 * us with the real time-out period to use.
480 assert(nodeId
> 0 && nodeId
< MAX_NODES
);
482 noOfConnectedNodes
++;
484 Node
& theNode
= theNodes
[nodeId
];
485 theNode
.connected
= true;
486 theNode
.m_info
.m_heartbeat_cnt
= 0;
487 theNode
.hbCounter
= 0;
490 * make sure the node itself is marked connected even
491 * if first API_REGCONF has not arrived
493 theNode
.m_state
.m_connected_nodes
.set(nodeId
);
// hbFrequency 0 makes threadMain()'s `hbCounter >= hbFrequency` test true on
// every pass until execAPI_REGCONF stores the real interval.
494 theNode
.hbFrequency
= 0;
495 theNode
.m_info
.m_version
= 0;
496 theNode
.compatible
= true;
497 theNode
.nfCompleteRep
= true;
498 theNode
.m_state
.startLevel
= NodeState::SL_NOTHING
;
500 theFacade
.ReportNodeAlive(nodeId
);
// Called when the connection to `nodeId` is lost: decrements the
// connected-node count, clears the node's connection flags and peer set,
// then runs the node-failure path with disconnect = true.
505 ClusterMgr::reportDisconnected(NodeId nodeId
){
506 assert(nodeId
> 0 && nodeId
< MAX_NODES
);
507 assert(noOfConnectedNodes
> 0);
509 noOfConnectedNodes
--;
510 theNodes
[nodeId
].connected
= false;
511 theNodes
[nodeId
].m_api_reg_conf
= false;
512 theNodes
[nodeId
].m_state
.m_connected_nodes
.clear();
514 reportNodeFailed(nodeId
, true);
// Marks `nodeId` as failed: clears its alive flag, bumps its connect count,
// disconnects it if still connected, and reports it dead to the facade when
// `disconnect` is true or the node had progressed beyond SL_NOTHING. When no
// node is alive any more it invalidates the global dictionary cache (unless
// global_flag_skip_invalidate_cache is set), returns the cluster state to
// CS_waiting_for_clean_cache and completes outstanding node-failure handling
// locally.
// NOTE(review): the declaration of `rep` and the exact nesting of the
// trailing statements are not visible in this listing.
518 ClusterMgr::reportNodeFailed(NodeId nodeId
, bool disconnect
){
520 Node
& theNode
= theNodes
[nodeId
];
522 set_node_alive(theNode
, false);
523 theNode
.m_info
.m_connectCount
++;
525 if(theNode
.connected
)
527 theFacade
.doDisconnect(nodeId
);
// Capture whether the node had left SL_NOTHING before resetting it.
530 const bool report
= (theNode
.m_state
.startLevel
!= NodeState::SL_NOTHING
);
531 theNode
.m_state
.startLevel
= NodeState::SL_NOTHING
;
533 if(disconnect
|| report
)
535 theFacade
.ReportNodeDead(nodeId
);
// Failure handling for this node is now pending.
538 theNode
.nfCompleteRep
= false;
539 if(noOfAliveNodes
== 0)
541 if (!global_flag_skip_invalidate_cache
)
543 theFacade
.m_globalDictCache
.lock();
544 theFacade
.m_globalDictCache
.invalidate_all();
545 theFacade
.m_globalDictCache
.unlock();
547 m_cluster_state
= CS_waiting_for_clean_cache
;
// Feed a locally built NF_COMPLETEREP to execNF_COMPLETEREP for every
// defined node whose failure handling is still pending.
550 for(Uint32 i
= 1; i
<MAX_NODES
; i
++){
551 if(theNodes
[i
].defined
&& theNodes
[i
].nfCompleteRep
== false){
552 rep
.failedNodeId
= i
;
553 execNF_COMPLETEREP((Uint32
*)&rep
);
559 /******************************************************************************
561 ******************************************************************************/
// Constructs the arbitrator manager for the given TransporterFacade.
// Visible effects: creates the thread mutex and the input condition/mutex
// pair, marks the one-slot input buffer as empty, sets the state to
// StateInit and zero-fills every stored ArbitSignal.
// NOTE(review): the member-initializer list and the initialisation of the
// remaining scalar members are not visible in this listing.
562 ArbitMgr::ArbitMgr(TransporterFacade
& _fac
)
565 DBUG_ENTER("ArbitMgr::ArbitMgr");
// Serialises doStart()/doStop() around thread creation and teardown.
567 theThreadMutex
= NdbMutex_Create();
// Condition + mutex protecting theInputBuffer / theInputFull.
568 theInputCond
= NdbCondition_Create();
569 theInputMutex
= NdbMutex_Create();
576 theInputFull
= false;
// Zero the input slot itself. The previous code zeroed theInputFull again
// (already false on the line above) and left theInputBuffer as the only
// ArbitSignal member that was never cleared.
577 memset(&theInputBuffer
, 0, sizeof(theInputBuffer
));
578 theState
= StateInit
;
580 memset(&theStartReq
, 0, sizeof(theStartReq
));
581 memset(&theChooseReq1
, 0, sizeof(theChooseReq1
));
582 memset(&theChooseReq2
, 0, sizeof(theChooseReq2
));
583 memset(&theStopOrd
, 0, sizeof(theStopOrd
));
// Destructor: releases the mutexes and the condition created by the
// constructor.
588 ArbitMgr::~ArbitMgr()
590 DBUG_ENTER("ArbitMgr::~ArbitMgr");
591 NdbMutex_Destroy(theThreadMutex
);
592 NdbCondition_Destroy(theInputCond
);
593 NdbMutex_Destroy(theInputMutex
);
597 // Start arbitrator thread. This is kernel request.
598 // First stop any previous thread since it is a left-over
599 // which was never used and which now has wrong ticket.
// Starts the arbitrator thread for a start request whose payload is
// `theData`. Under theThreadMutex: if a thread already exists it is sent a
// STOPORD with code StopRestart, joined and destroyed, and the manager is
// reset; then the STARTREQ is queued and a new thread is created with a
// 32768-byte stack, name "ndb_arbitmgr" and NDB_THREAD_PRIO_HIGH.
// NOTE(review): the declarations of `aSignal` and `value` are not visible in
// this listing.
601 ArbitMgr::doStart(const Uint32
* theData
)
604 NdbMutex_Lock(theThreadMutex
);
605 if (theThread
!= NULL
) {
606 aSignal
.init(GSN_ARBIT_STOPORD
, NULL
);
607 aSignal
.data
.code
= StopRestart
;
608 sendSignalToThread(aSignal
);
610 NdbThread_WaitFor(theThread
, &value
);
611 NdbThread_Destroy(&theThread
);
612 theState
= StateInit
;
613 theInputFull
= false;
// The start request is queued before the thread exists; the new thread's
// threadMain() picks it up from theInputBuffer as its first action.
615 aSignal
.init(GSN_ARBIT_STARTREQ
, theData
);
616 sendSignalToThread(aSignal
);
617 theThread
= NdbThread_Create(
618 runArbitMgr_C
, (void**)this, 32768, "ndb_arbitmgr",
619 NDB_THREAD_PRIO_HIGH
);
620 NdbMutex_Unlock(theThreadMutex
);
623 // The "choose me" signal from a candidate.
// Wraps `theData` in an ARBIT_CHOOSEREQ signal and queues it for the
// arbitrator thread.
// NOTE(review): the declaration of `aSignal` is not visible in this listing.
625 ArbitMgr::doChoose(const Uint32
* theData
)
628 aSignal
.init(GSN_ARBIT_CHOOSEREQ
, theData
);
629 sendSignalToThread(aSignal
);
632 // Stop arbitrator thread via stop signal from the kernel
633 // or when exiting API program.
// Stops the arbitrator thread. Under theThreadMutex: if a thread exists it
// is sent a STOPORD built from `theData`, joined and destroyed, and the
// state returns to StateInit.
// NOTE(review): the condition choosing between the StopExit and StopRequest
// codes and the declarations of `aSignal` and `value` are not visible in
// this listing.
635 ArbitMgr::doStop(const Uint32
* theData
)
637 DBUG_ENTER("ArbitMgr::doStop");
639 NdbMutex_Lock(theThreadMutex
);
640 if (theThread
!= NULL
) {
641 aSignal
.init(GSN_ARBIT_STOPORD
, theData
);
643 aSignal
.data
.code
= StopExit
;
645 aSignal
.data
.code
= StopRequest
;
647 sendSignalToThread(aSignal
);
649 NdbThread_WaitFor(theThread
, &value
);
650 NdbThread_Destroy(&theThread
);
651 theState
= StateInit
;
653 NdbMutex_Unlock(theThreadMutex
);
// Thread entry trampoline handed to NdbThread_Create in ArbitMgr::doStart:
// casts the opaque argument back to ArbitMgr* and runs its threadMain().
// NOTE(review): return type, linkage and the return statement are not
// visible in this listing.
661 runArbitMgr_C(void* me
)
663 ((ArbitMgr
*) me
)->threadMain();
// Hands one signal to the arbitrator thread through the single-slot input
// buffer: timestamps it, waits (in 1000 ms slices) under theInputMutex until
// the slot is free, copies the signal in and wakes the consumer.
// NOTE(review): the statement that marks the slot as full after the copy,
// the guard around the trace output and the declaration of `buf` are not
// visible in this listing.
668 ArbitMgr::sendSignalToThread(ArbitSignal
& aSignal
)
// Trace of the signal being queued.
672 ndbout
<< "arbit recv: ";
673 ndbout
<< " gsn=" << aSignal
.gsn
;
674 ndbout
<< " send=" << aSignal
.data
.sender
;
675 ndbout
<< " code=" << aSignal
.data
.code
;
676 ndbout
<< " node=" << aSignal
.data
.node
;
677 ndbout
<< " ticket=" << aSignal
.data
.ticket
.getText(buf
, sizeof(buf
));
678 ndbout
<< " mask=" << aSignal
.data
.mask
.getText(buf
, sizeof(buf
));
681 aSignal
.setTimestamp(); // signal arrival time
682 NdbMutex_Lock(theInputMutex
);
// Block until the consumer has emptied the slot.
683 while (theInputFull
) {
684 NdbCondition_WaitTimeout(theInputCond
, theInputMutex
, 1000);
686 theInputBuffer
= aSignal
;
688 NdbCondition_Signal(theInputCond
);
689 NdbMutex_Unlock(theInputMutex
);
// Body of the arbitrator thread. Takes the signal already sitting in
// theInputBuffer as the start request, then repeatedly waits for the input
// slot to fill (theInputTimeout per wait), copies the signal out, frees the
// slot and dispatches on the signal number.
// NOTE(review): the declaration of `aSignal`, the outer loop and its exit,
// the timeout handling inside the wait loop and the bodies of the
// STOPORD/default cases are not visible in this listing.
693 ArbitMgr::threadMain()
696 aSignal
= theInputBuffer
;
697 threadStart(aSignal
);
700 NdbMutex_Lock(theInputMutex
);
701 while (! theInputFull
) {
702 NdbCondition_WaitTimeout(theInputCond
, theInputMutex
, theInputTimeout
);
// Copy out under the mutex, then release the slot and wake a producer that
// may be blocked in sendSignalToThread().
705 aSignal
= theInputBuffer
;
706 theInputFull
= false;
707 NdbCondition_Signal(theInputCond
);
708 NdbMutex_Unlock(theInputMutex
);
709 switch (aSignal
.gsn
) {
710 case GSN_ARBIT_CHOOSEREQ
:
711 threadChoose(aSignal
);
713 case GSN_ARBIT_STOPORD
:
721 // handle events in the thread
// Thread-side handling of the start request: remembers it in theStartReq,
// confirms it to QMGR with code ArbitCode::ApiStart, moves to StateStarted
// and sets the input wait timeout to 1000.
724 ArbitMgr::threadStart(ArbitSignal
& aSignal
)
726 theStartReq
= aSignal
;
727 sendStartConf(theStartReq
, ArbitCode::ApiStart
);
728 theState
= StateStarted
;
729 theInputTimeout
= 1000;
// Thread-side handling of an ARBIT_CHOOSEREQ, by current state. In every
// listed state a request whose data does not match theStartReq is refused
// with ErrTicket.
//   StateStarted - first request: stored in theChooseReq1; either answered
//                  at once with WinChoose (state -> StateFinished) or held
//                  (state -> StateChoose1).
//   StateChoose1 - second request: stored in theChooseReq2, state ->
//                  StateChoose2.
//   StateChoose2 - third request: all three are refused with ErrToomany,
//                  state -> StateFinished.
//   otherwise    - refused with ErrState.
// NOTE(review): the switch header, the condition choosing between the
// immediate answer and StateChoose1, the new theInputTimeout on the
// StateChoose1 path and all break/return statements are not visible in this
// listing.
733 ArbitMgr::threadChoose(ArbitSignal
& aSignal
)
736 case StateStarted
: // first REQ
737 if (! theStartReq
.data
.match(aSignal
.data
)) {
738 sendChooseRef(aSignal
, ArbitCode::ErrTicket
);
741 theChooseReq1
= aSignal
;
743 sendChooseConf(aSignal
, ArbitCode::WinChoose
);
744 theState
= StateFinished
;
745 theInputTimeout
= 1000;
748 theState
= StateChoose1
;
751 case StateChoose1
: // second REQ within Delay
752 if (! theStartReq
.data
.match(aSignal
.data
)) {
753 sendChooseRef(aSignal
, ArbitCode::ErrTicket
);
756 theChooseReq2
= aSignal
;
757 theState
= StateChoose2
;
760 case StateChoose2
: // too many REQs - refuse all
761 if (! theStartReq
.data
.match(aSignal
.data
)) {
762 sendChooseRef(aSignal
, ArbitCode::ErrTicket
);
765 sendChooseRef(theChooseReq1
, ArbitCode::ErrToomany
);
766 sendChooseRef(theChooseReq2
, ArbitCode::ErrToomany
);
767 sendChooseRef(aSignal
, ArbitCode::ErrToomany
);
768 theState
= StateFinished
;
769 theInputTimeout
= 1000;
772 sendChooseRef(aSignal
, ArbitCode::ErrState
);
// Thread-side handling of an input wait that timed out. The first visible
// group confirms theChooseReq1 as winner; the second confirms theChooseReq1
// as winner and theChooseReq2 as loser. Both finish with StateFinished and a
// 1000 input timeout.
// NOTE(review): the switch on the state, the case labels separating the two
// groups and the statement guarded by the getTimediff() < theDelay test
// (presumably "not due yet, keep waiting") are not visible in this listing.
778 ArbitMgr::threadTimeout()
784 if (theChooseReq1
.getTimediff() < theDelay
)
786 sendChooseConf(theChooseReq1
, ArbitCode::WinChoose
);
787 theState
= StateFinished
;
788 theInputTimeout
= 1000;
791 sendChooseConf(theChooseReq1
, ArbitCode::WinChoose
);
792 sendChooseConf(theChooseReq2
, ArbitCode::LoseChoose
);
793 theState
= StateFinished
;
794 theInputTimeout
= 1000;
// Thread-side handling of a stop order, dispatched on the stop code carried
// in the signal. Visible actions: a stop report for theStartReq with code 0,
// and answering choose requests that are still pending (the first as winner,
// the second as loser).
// NOTE(review): the case labels of the outer switch, the inner switch on the
// state and all break statements are not visible in this listing.
802 ArbitMgr::threadStop(ArbitSignal
& aSignal
)
804 switch (aSignal
.data
.code
) {
808 sendStopRep(theStartReq
, 0);
810 case StateChoose1
: // just in time
811 sendChooseConf(theChooseReq1
, ArbitCode::WinChoose
);
814 sendChooseConf(theChooseReq1
, ArbitCode::WinChoose
);
815 sendChooseConf(theChooseReq2
, ArbitCode::LoseChoose
);
// Sends ARBIT_STARTCONF to QMGR: copies `aSignal` (so the caller's signal is
// left untouched), overrides the signal number and result `code`, and
// forwards the copy.
833 ArbitMgr::sendStartConf(ArbitSignal
& aSignal
, Uint32 code
)
835 ArbitSignal copySignal
= aSignal
;
836 copySignal
.gsn
= GSN_ARBIT_STARTCONF
;
837 copySignal
.data
.code
= code
;
838 sendSignalToQmgr(copySignal
);
// Sends ARBIT_CHOOSECONF to QMGR: copies `aSignal` (so the caller's signal
// is left untouched), overrides the signal number and result `code`, and
// forwards the copy.
842 ArbitMgr::sendChooseConf(ArbitSignal
& aSignal
, Uint32 code
)
844 ArbitSignal copySignal
= aSignal
;
845 copySignal
.gsn
= GSN_ARBIT_CHOOSECONF
;
846 copySignal
.data
.code
= code
;
847 sendSignalToQmgr(copySignal
);
// Sends ARBIT_CHOOSEREF to QMGR: copies `aSignal` (so the caller's signal is
// left untouched), overrides the signal number and error `code`, and
// forwards the copy.
851 ArbitMgr::sendChooseRef(ArbitSignal
& aSignal
, Uint32 code
)
853 ArbitSignal copySignal
= aSignal
;
854 copySignal
.gsn
= GSN_ARBIT_CHOOSEREF
;
855 copySignal
.data
.code
= code
;
856 sendSignalToQmgr(copySignal
);
// Sends ARBIT_STOPREP to QMGR: copies `aSignal` (so the caller's signal is
// left untouched), overrides the signal number and `code`, and forwards the
// copy.
860 ArbitMgr::sendStopRep(ArbitSignal
& aSignal
, Uint32 code
)
862 ArbitSignal copySignal
= aSignal
;
863 copySignal
.gsn
= GSN_ARBIT_STOPREP
;
864 copySignal
.data
.code
= code
;
865 sendSignalToQmgr(copySignal
);
869 * Send signal to QMGR. The input includes signal number and
870 * signal data. The signal data is normally a copy of a received
871 * signal so it contains expected arbitrator node id and ticket.
872 * The sender in signal data is the QMGR node id.
// Builds an NdbApiSignal from `aSignal` and sends it to the QMGR block on
// the node named by aSignal.data.sender. The outgoing `sender` field is
// replaced by this API node's own reference; code, node, ticket and mask are
// copied over. The send itself is done under the facade mutex.
// NOTE(review): the guard around the trace output and the declaration of
// `buf` are not visible in this listing.
875 ArbitMgr::sendSignalToQmgr(ArbitSignal
& aSignal
)
877 NdbApiSignal
signal(numberToRef(API_CLUSTERMGR
, theFacade
.ownId()));
879 signal
.theVerId_signalNumber
= aSignal
.gsn
;
880 signal
.theReceiversBlockNumber
= QMGR
;
882 signal
.theLength
= ArbitSignalData::SignalLength
;
884 ArbitSignalData
* sd
= CAST_PTR(ArbitSignalData
, signal
.getDataPtrSend());
886 sd
->sender
= numberToRef(API_CLUSTERMGR
, theFacade
.ownId());
887 sd
->code
= aSignal
.data
.code
;
888 sd
->node
= aSignal
.data
.node
;
889 sd
->ticket
= aSignal
.data
.ticket
;
890 sd
->mask
= aSignal
.data
.mask
;
// Trace of the signal being sent; "recv" prints the destination node.
894 ndbout
<< "arbit send: ";
895 ndbout
<< " gsn=" << aSignal
.gsn
;
896 ndbout
<< " recv=" << aSignal
.data
.sender
;
897 ndbout
<< " code=" << aSignal
.data
.code
;
898 ndbout
<< " node=" << aSignal
.data
.node
;
899 ndbout
<< " ticket=" << aSignal
.data
.ticket
.getText(buf
, sizeof(buf
));
900 ndbout
<< " mask=" << aSignal
.data
.mask
.getText(buf
, sizeof(buf
));
904 theFacade
.lock_mutex();
905 theFacade
.sendSignalUnCond(&signal
, aSignal
.data
.sender
);
906 theFacade
.unlock_mutex();