mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / storage / ndb / src / ndbapi / ClusterMgr.cpp
blob7948f272e58f0e0d2a6068634ccdd3565346a307
1 /* Copyright (c) 2003-2007 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16 #include <ndb_global.h>
17 #include <my_pthread.h>
18 #include <ndb_limits.h>
19 #include <util/version.h>
21 #include "TransporterFacade.hpp"
22 #include "ClusterMgr.hpp"
23 #include <IPCConfig.hpp>
24 #include "NdbApiSignal.hpp"
25 #include "API.hpp"
26 #include <NdbSleep.h>
27 #include <NdbOut.hpp>
28 #include <NdbTick.h>
31 #include <signaldata/NodeFailRep.hpp>
32 #include <signaldata/NFCompleteRep.hpp>
33 #include <signaldata/ApiRegSignalData.hpp>
35 #include <mgmapi.h>
36 #include <mgmapi_configuration.hpp>
37 #include <mgmapi_config_parameters.h>
39 int global_flag_skip_invalidate_cache = 0;
40 //#define DEBUG_REG
42 // Just a C wrapper for threadMain
43 extern "C"
44 void*
45 runClusterMgr_C(void * me)
47 ((ClusterMgr*) me)->threadMain();
49 return NULL;
52 extern "C" {
53 void ndbSetOwnVersion();
55 ClusterMgr::ClusterMgr(TransporterFacade & _facade):
56 theStop(0),
57 theFacade(_facade)
59 DBUG_ENTER("ClusterMgr::ClusterMgr");
60 ndbSetOwnVersion();
61 clusterMgrThreadMutex = NdbMutex_Create();
62 waitForHBCond= NdbCondition_Create();
63 waitingForHB= false;
64 m_max_api_reg_req_interval= 0xFFFFFFFF; // MAX_INT
65 noOfAliveNodes= 0;
66 noOfConnectedNodes= 0;
67 theClusterMgrThread= 0;
68 m_connect_count = 0;
69 m_cluster_state = CS_waiting_for_clean_cache;
70 DBUG_VOID_RETURN;
73 ClusterMgr::~ClusterMgr()
75 DBUG_ENTER("ClusterMgr::~ClusterMgr");
76 doStop();
77 NdbCondition_Destroy(waitForHBCond);
78 NdbMutex_Destroy(clusterMgrThreadMutex);
79 DBUG_VOID_RETURN;
82 void
83 ClusterMgr::init(ndb_mgm_configuration_iterator & iter){
84 for(iter.first(); iter.valid(); iter.next()){
85 Uint32 tmp = 0;
86 if(iter.get(CFG_NODE_ID, &tmp))
87 continue;
89 theNodes[tmp].defined = true;
90 #if 0
91 ndbout << "--------------------------------------" << endl;
92 ndbout << "--------------------------------------" << endl;
93 ndbout_c("ClusterMgr: Node %d defined as %s", tmp, config.getNodeType(tmp));
94 #endif
96 unsigned type;
97 if(iter.get(CFG_TYPE_OF_SECTION, &type))
98 continue;
100 switch(type){
101 case NODE_TYPE_DB:
102 theNodes[tmp].m_info.m_type = NodeInfo::DB;
103 break;
104 case NODE_TYPE_API:
105 theNodes[tmp].m_info.m_type = NodeInfo::API;
106 break;
107 case NODE_TYPE_MGM:
108 theNodes[tmp].m_info.m_type = NodeInfo::MGM;
109 break;
110 default:
111 type = type;
112 #if 0
113 ndbout_c("ClusterMgr: Unknown node type: %d", type);
114 #endif
119 void
120 ClusterMgr::startThread() {
121 NdbMutex_Lock(clusterMgrThreadMutex);
123 theStop = 0;
125 theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
126 (void**)this,
127 32768,
128 "ndb_clustermgr",
129 NDB_THREAD_PRIO_LOW);
130 NdbMutex_Unlock(clusterMgrThreadMutex);
133 void
134 ClusterMgr::doStop( ){
135 DBUG_ENTER("ClusterMgr::doStop");
136 NdbMutex_Lock(clusterMgrThreadMutex);
137 if(theStop){
138 NdbMutex_Unlock(clusterMgrThreadMutex);
139 DBUG_VOID_RETURN;
141 void *status;
142 theStop = 1;
143 if (theClusterMgrThread) {
144 NdbThread_WaitFor(theClusterMgrThread, &status);
145 NdbThread_Destroy(&theClusterMgrThread);
147 NdbMutex_Unlock(clusterMgrThreadMutex);
148 DBUG_VOID_RETURN;
151 void
152 ClusterMgr::forceHB()
154 theFacade.lock_mutex();
156 if(waitingForHB)
158 NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
159 theFacade.unlock_mutex();
160 return;
163 waitingForHB= true;
165 NodeBitmask ndb_nodes;
166 ndb_nodes.clear();
167 waitForHBFromNodes.clear();
168 for(Uint32 i = 0; i < MAX_NODES; i++)
170 if(!theNodes[i].defined)
171 continue;
172 if(theNodes[i].m_info.m_type == NodeInfo::DB)
174 ndb_nodes.set(i);
175 const ClusterMgr::Node &node= getNodeInfo(i);
176 waitForHBFromNodes.bitOR(node.m_state.m_connected_nodes);
179 waitForHBFromNodes.bitAND(ndb_nodes);
181 #ifdef DEBUG_REG
182 char buf[128];
183 ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
184 #endif
185 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
187 signal.theVerId_signalNumber = GSN_API_REGREQ;
188 signal.theReceiversBlockNumber = QMGR;
189 signal.theTrace = 0;
190 signal.theLength = ApiRegReq::SignalLength;
192 ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
193 req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
194 req->version = NDB_VERSION;
196 int nodeId= 0;
197 for(int i=0;
198 (int) NodeBitmask::NotFound != (nodeId= waitForHBFromNodes.find(i));
199 i= nodeId+1)
201 #ifdef DEBUG_REG
202 ndbout << "FORCE HB to " << nodeId << endl;
203 #endif
204 theFacade.sendSignalUnCond(&signal, nodeId);
207 /* Wait for nodes to reply - if any heartbeats was sent */
208 if (!waitForHBFromNodes.isclear())
209 NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
211 waitingForHB= false;
212 #ifdef DEBUG_REG
213 ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
214 #endif
215 theFacade.unlock_mutex();
218 void
219 ClusterMgr::threadMain( ){
220 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
222 signal.theVerId_signalNumber = GSN_API_REGREQ;
223 signal.theReceiversBlockNumber = QMGR;
224 signal.theTrace = 0;
225 signal.theLength = ApiRegReq::SignalLength;
227 ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
228 req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
229 req->version = NDB_VERSION;
232 Uint32 timeSlept = 100;
233 Uint64 now = NdbTick_CurrentMillisecond();
235 while(!theStop){
237 * Start of Secure area for use of Transporter
239 if (m_cluster_state == CS_waiting_for_clean_cache)
241 theFacade.m_globalDictCache.lock();
242 unsigned sz= theFacade.m_globalDictCache.get_size();
243 theFacade.m_globalDictCache.unlock();
244 if (sz)
245 goto next;
246 m_cluster_state = CS_waiting_for_first_connect;
249 theFacade.lock_mutex();
250 for (int i = 1; i < MAX_NDB_NODES; i++){
252 * Send register request (heartbeat) to all available nodes
253 * at specified timing intervals
255 const NodeId nodeId = i;
256 Node & theNode = theNodes[nodeId];
258 if (!theNode.defined)
259 continue;
261 if (theNode.connected == false){
262 theFacade.doConnect(nodeId);
263 continue;
266 if (!theNode.compatible){
267 continue;
270 theNode.hbCounter += timeSlept;
271 if (theNode.hbCounter >= m_max_api_reg_req_interval ||
272 theNode.hbCounter >= theNode.hbFrequency) {
274 * It is now time to send a new Heartbeat
276 if (theNode.hbCounter >= theNode.hbFrequency) {
277 theNode.m_info.m_heartbeat_cnt++;
278 theNode.hbCounter = 0;
281 #ifdef DEBUG_REG
282 ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
283 #endif
284 theFacade.sendSignalUnCond(&signal, nodeId);
285 }//if
287 if (theNode.m_info.m_heartbeat_cnt == 4 && theNode.hbFrequency > 0){
288 reportNodeFailed(i);
289 }//if
293 * End of secure area. Let other threads in
295 theFacade.unlock_mutex();
297 next:
298 // Sleep for 100 ms between each Registration Heartbeat
299 Uint64 before = now;
300 NdbSleep_MilliSleep(100);
301 now = NdbTick_CurrentMillisecond();
302 timeSlept = (now - before);
306 #if 0
307 void
308 ClusterMgr::showState(NodeId nodeId){
309 ndbout << "-- ClusterMgr - NodeId = " << nodeId << endl;
310 ndbout << "theNodeList = " << theNodeList[nodeId] << endl;
311 ndbout << "theNodeState = " << theNodeState[nodeId] << endl;
312 ndbout << "theNodeCount = " << theNodeCount[nodeId] << endl;
313 ndbout << "theNodeStopDelay = " << theNodeStopDelay[nodeId] << endl;
314 ndbout << "theNodeSendDelay = " << theNodeSendDelay[nodeId] << endl;
316 #endif
318 ClusterMgr::Node::Node()
319 : m_state(NodeState::SL_NOTHING) {
320 compatible = nfCompleteRep = true;
321 connected = defined = m_alive = m_api_reg_conf = false;
322 m_state.m_connected_nodes.clear();
325 /******************************************************************************
326 * API_REGREQ and friends
327 ******************************************************************************/
329 void
330 ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
331 const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
332 const NodeId nodeId = refToNode(apiRegReq->ref);
334 #ifdef DEBUG_REG
335 ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
336 #endif
338 assert(nodeId > 0 && nodeId < MAX_NODES);
340 Node & node = theNodes[nodeId];
341 assert(node.defined == true);
342 assert(node.connected == true);
344 if(node.m_info.m_version != apiRegReq->version){
345 node.m_info.m_version = apiRegReq->version;
347 if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) ||
348 getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) {
349 node.compatible = false;
350 } else {
351 node.compatible = true;
355 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
356 signal.theVerId_signalNumber = GSN_API_REGCONF;
357 signal.theReceiversBlockNumber = API_CLUSTERMGR;
358 signal.theTrace = 0;
359 signal.theLength = ApiRegConf::SignalLength;
361 ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend());
362 conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
363 conf->version = NDB_VERSION;
364 conf->apiHeartbeatFrequency = node.hbFrequency;
365 theFacade.sendSignalUnCond(&signal, nodeId);
368 void
369 ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
370 const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
371 const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
373 #ifdef DEBUG_REG
374 ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
375 #endif
377 assert(nodeId > 0 && nodeId < MAX_NODES);
379 Node & node = theNodes[nodeId];
380 assert(node.defined == true);
381 assert(node.connected == true);
383 if(node.m_info.m_version != apiRegConf->version){
384 node.m_info.m_version = apiRegConf->version;
385 if(theNodes[theFacade.ownId()].m_info.m_type == NodeInfo::MGM)
386 node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION,
387 node.m_info.m_version);
388 else
389 node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
390 node.m_info.m_version);
393 node.m_api_reg_conf = true;
395 node.m_state = apiRegConf->nodeState;
396 if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED ||
397 node.m_state.getSingleUserMode())){
398 set_node_alive(node, true);
399 } else {
400 set_node_alive(node, false);
401 }//if
402 node.m_info.m_heartbeat_cnt = 0;
403 node.hbCounter = 0;
405 if(waitingForHB)
407 waitForHBFromNodes.clear(nodeId);
409 if(waitForHBFromNodes.isclear())
411 waitingForHB= false;
412 NdbCondition_Broadcast(waitForHBCond);
415 node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
418 void
419 ClusterMgr::execAPI_REGREF(const Uint32 * theData){
421 ApiRegRef * ref = (ApiRegRef*)theData;
423 const NodeId nodeId = refToNode(ref->ref);
425 assert(nodeId > 0 && nodeId < MAX_NODES);
427 Node & node = theNodes[nodeId];
428 assert(node.connected == true);
429 assert(node.defined == true);
431 node.compatible = false;
432 set_node_alive(node, false);
433 node.m_state = NodeState::SL_NOTHING;
434 node.m_info.m_version = ref->version;
436 switch(ref->errorCode){
437 case ApiRegRef::WrongType:
438 ndbout_c("Node %d reports that this node should be a NDB node", nodeId);
439 abort();
440 case ApiRegRef::UnsupportedVersion:
441 default:
442 break;
445 waitForHBFromNodes.clear(nodeId);
446 if(waitForHBFromNodes.isclear())
447 NdbCondition_Signal(waitForHBCond);
450 void
451 ClusterMgr::execNODE_FAILREP(const Uint32 * theData){
452 NodeFailRep * const nodeFail = (NodeFailRep *)&theData[0];
453 for(int i = 1; i<MAX_NODES; i++){
454 if(NodeBitmask::get(nodeFail->theNodes, i)){
455 reportNodeFailed(i);
460 void
461 ClusterMgr::execNF_COMPLETEREP(const Uint32 * theData){
462 NFCompleteRep * const nfComp = (NFCompleteRep *)theData;
464 const NodeId nodeId = nfComp->failedNodeId;
465 assert(nodeId > 0 && nodeId < MAX_NODES);
467 theFacade.ReportNodeFailureComplete(nodeId);
468 theNodes[nodeId].nfCompleteRep = true;
471 void
472 ClusterMgr::reportConnected(NodeId nodeId){
473 DBUG_ENTER("ClusterMgr::reportConnected");
474 DBUG_PRINT("info", ("nodeId: %u", nodeId));
476 * Ensure that we are sending heartbeat every 100 ms
477 * until we have got the first reply from NDB providing
478 * us with the real time-out period to use.
480 assert(nodeId > 0 && nodeId < MAX_NODES);
482 noOfConnectedNodes++;
484 Node & theNode = theNodes[nodeId];
485 theNode.connected = true;
486 theNode.m_info.m_heartbeat_cnt = 0;
487 theNode.hbCounter = 0;
490 * make sure the node itself is marked connected even
491 * if first API_REGCONF has not arrived
493 theNode.m_state.m_connected_nodes.set(nodeId);
494 theNode.hbFrequency = 0;
495 theNode.m_info.m_version = 0;
496 theNode.compatible = true;
497 theNode.nfCompleteRep = true;
498 theNode.m_state.startLevel = NodeState::SL_NOTHING;
500 theFacade.ReportNodeAlive(nodeId);
501 DBUG_VOID_RETURN;
504 void
505 ClusterMgr::reportDisconnected(NodeId nodeId){
506 assert(nodeId > 0 && nodeId < MAX_NODES);
507 assert(noOfConnectedNodes > 0);
509 noOfConnectedNodes--;
510 theNodes[nodeId].connected = false;
511 theNodes[nodeId].m_api_reg_conf = false;
512 theNodes[nodeId].m_state.m_connected_nodes.clear();
514 reportNodeFailed(nodeId, true);
517 void
518 ClusterMgr::reportNodeFailed(NodeId nodeId, bool disconnect){
520 Node & theNode = theNodes[nodeId];
522 set_node_alive(theNode, false);
523 theNode.m_info.m_connectCount ++;
525 if(theNode.connected)
527 theFacade.doDisconnect(nodeId);
530 const bool report = (theNode.m_state.startLevel != NodeState::SL_NOTHING);
531 theNode.m_state.startLevel = NodeState::SL_NOTHING;
533 if(disconnect || report)
535 theFacade.ReportNodeDead(nodeId);
538 theNode.nfCompleteRep = false;
539 if(noOfAliveNodes == 0)
541 if (!global_flag_skip_invalidate_cache)
543 theFacade.m_globalDictCache.lock();
544 theFacade.m_globalDictCache.invalidate_all();
545 theFacade.m_globalDictCache.unlock();
546 m_connect_count ++;
547 m_cluster_state = CS_waiting_for_clean_cache;
549 NFCompleteRep rep;
550 for(Uint32 i = 1; i<MAX_NODES; i++){
551 if(theNodes[i].defined && theNodes[i].nfCompleteRep == false){
552 rep.failedNodeId = i;
553 execNF_COMPLETEREP((Uint32*)&rep);
559 /******************************************************************************
560 * Arbitrator
561 ******************************************************************************/
562 ArbitMgr::ArbitMgr(TransporterFacade & _fac)
563 : theFacade(_fac)
565 DBUG_ENTER("ArbitMgr::ArbitMgr");
567 theThreadMutex = NdbMutex_Create();
568 theInputCond = NdbCondition_Create();
569 theInputMutex = NdbMutex_Create();
571 theRank = 0;
572 theDelay = 0;
573 theThread = 0;
575 theInputTimeout = 0;
576 theInputFull = false;
577 memset(&theInputFull, 0, sizeof(theInputFull));
578 theState = StateInit;
580 memset(&theStartReq, 0, sizeof(theStartReq));
581 memset(&theChooseReq1, 0, sizeof(theChooseReq1));
582 memset(&theChooseReq2, 0, sizeof(theChooseReq2));
583 memset(&theStopOrd, 0, sizeof(theStopOrd));
585 DBUG_VOID_RETURN;
588 ArbitMgr::~ArbitMgr()
590 DBUG_ENTER("ArbitMgr::~ArbitMgr");
591 NdbMutex_Destroy(theThreadMutex);
592 NdbCondition_Destroy(theInputCond);
593 NdbMutex_Destroy(theInputMutex);
594 DBUG_VOID_RETURN;
597 // Start arbitrator thread. This is kernel request.
598 // First stop any previous thread since it is a left-over
599 // which was never used and which now has wrong ticket.
600 void
601 ArbitMgr::doStart(const Uint32* theData)
603 ArbitSignal aSignal;
604 NdbMutex_Lock(theThreadMutex);
605 if (theThread != NULL) {
606 aSignal.init(GSN_ARBIT_STOPORD, NULL);
607 aSignal.data.code = StopRestart;
608 sendSignalToThread(aSignal);
609 void* value;
610 NdbThread_WaitFor(theThread, &value);
611 NdbThread_Destroy(&theThread);
612 theState = StateInit;
613 theInputFull = false;
615 aSignal.init(GSN_ARBIT_STARTREQ, theData);
616 sendSignalToThread(aSignal);
617 theThread = NdbThread_Create(
618 runArbitMgr_C, (void**)this, 32768, "ndb_arbitmgr",
619 NDB_THREAD_PRIO_HIGH);
620 NdbMutex_Unlock(theThreadMutex);
623 // The "choose me" signal from a candidate.
624 void
625 ArbitMgr::doChoose(const Uint32* theData)
627 ArbitSignal aSignal;
628 aSignal.init(GSN_ARBIT_CHOOSEREQ, theData);
629 sendSignalToThread(aSignal);
632 // Stop arbitrator thread via stop signal from the kernel
633 // or when exiting API program.
634 void
635 ArbitMgr::doStop(const Uint32* theData)
637 DBUG_ENTER("ArbitMgr::doStop");
638 ArbitSignal aSignal;
639 NdbMutex_Lock(theThreadMutex);
640 if (theThread != NULL) {
641 aSignal.init(GSN_ARBIT_STOPORD, theData);
642 if (theData == 0) {
643 aSignal.data.code = StopExit;
644 } else {
645 aSignal.data.code = StopRequest;
647 sendSignalToThread(aSignal);
648 void* value;
649 NdbThread_WaitFor(theThread, &value);
650 NdbThread_Destroy(&theThread);
651 theState = StateInit;
653 NdbMutex_Unlock(theThreadMutex);
654 DBUG_VOID_RETURN;
657 // private methods
659 extern "C"
660 void*
661 runArbitMgr_C(void* me)
663 ((ArbitMgr*) me)->threadMain();
664 return NULL;
667 void
668 ArbitMgr::sendSignalToThread(ArbitSignal& aSignal)
670 #ifdef DEBUG_ARBIT
671 char buf[17] = "";
672 ndbout << "arbit recv: ";
673 ndbout << " gsn=" << aSignal.gsn;
674 ndbout << " send=" << aSignal.data.sender;
675 ndbout << " code=" << aSignal.data.code;
676 ndbout << " node=" << aSignal.data.node;
677 ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
678 ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
679 ndbout << endl;
680 #endif
681 aSignal.setTimestamp(); // signal arrival time
682 NdbMutex_Lock(theInputMutex);
683 while (theInputFull) {
684 NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000);
686 theInputBuffer = aSignal;
687 theInputFull = true;
688 NdbCondition_Signal(theInputCond);
689 NdbMutex_Unlock(theInputMutex);
692 void
693 ArbitMgr::threadMain()
695 ArbitSignal aSignal;
696 aSignal = theInputBuffer;
697 threadStart(aSignal);
698 bool stop = false;
699 while (! stop) {
700 NdbMutex_Lock(theInputMutex);
701 while (! theInputFull) {
702 NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout);
703 threadTimeout();
705 aSignal = theInputBuffer;
706 theInputFull = false;
707 NdbCondition_Signal(theInputCond);
708 NdbMutex_Unlock(theInputMutex);
709 switch (aSignal.gsn) {
710 case GSN_ARBIT_CHOOSEREQ:
711 threadChoose(aSignal);
712 break;
713 case GSN_ARBIT_STOPORD:
714 stop = true;
715 break;
718 threadStop(aSignal);
721 // handle events in the thread
723 void
724 ArbitMgr::threadStart(ArbitSignal& aSignal)
726 theStartReq = aSignal;
727 sendStartConf(theStartReq, ArbitCode::ApiStart);
728 theState = StateStarted;
729 theInputTimeout = 1000;
732 void
733 ArbitMgr::threadChoose(ArbitSignal& aSignal)
735 switch (theState) {
736 case StateStarted: // first REQ
737 if (! theStartReq.data.match(aSignal.data)) {
738 sendChooseRef(aSignal, ArbitCode::ErrTicket);
739 break;
741 theChooseReq1 = aSignal;
742 if (theDelay == 0) {
743 sendChooseConf(aSignal, ArbitCode::WinChoose);
744 theState = StateFinished;
745 theInputTimeout = 1000;
746 break;
748 theState = StateChoose1;
749 theInputTimeout = 1;
750 return;
751 case StateChoose1: // second REQ within Delay
752 if (! theStartReq.data.match(aSignal.data)) {
753 sendChooseRef(aSignal, ArbitCode::ErrTicket);
754 break;
756 theChooseReq2 = aSignal;
757 theState = StateChoose2;
758 theInputTimeout = 1;
759 return;
760 case StateChoose2: // too many REQs - refuse all
761 if (! theStartReq.data.match(aSignal.data)) {
762 sendChooseRef(aSignal, ArbitCode::ErrTicket);
763 break;
765 sendChooseRef(theChooseReq1, ArbitCode::ErrToomany);
766 sendChooseRef(theChooseReq2, ArbitCode::ErrToomany);
767 sendChooseRef(aSignal, ArbitCode::ErrToomany);
768 theState = StateFinished;
769 theInputTimeout = 1000;
770 return;
771 default:
772 sendChooseRef(aSignal, ArbitCode::ErrState);
773 break;
777 void
778 ArbitMgr::threadTimeout()
780 switch (theState) {
781 case StateStarted:
782 break;
783 case StateChoose1:
784 if (theChooseReq1.getTimediff() < theDelay)
785 break;
786 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
787 theState = StateFinished;
788 theInputTimeout = 1000;
789 break;
790 case StateChoose2:
791 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
792 sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
793 theState = StateFinished;
794 theInputTimeout = 1000;
795 break;
796 default:
797 break;
801 void
802 ArbitMgr::threadStop(ArbitSignal& aSignal)
804 switch (aSignal.data.code) {
805 case StopExit:
806 switch (theState) {
807 case StateStarted:
808 sendStopRep(theStartReq, 0);
809 break;
810 case StateChoose1: // just in time
811 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
812 break;
813 case StateChoose2:
814 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
815 sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
816 break;
817 case StateInit:
818 case StateFinished:
819 //??
820 break;
822 break;
823 case StopRequest:
824 break;
825 case StopRestart:
826 break;
830 // output routines
832 void
833 ArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32 code)
835 ArbitSignal copySignal = aSignal;
836 copySignal.gsn = GSN_ARBIT_STARTCONF;
837 copySignal.data.code = code;
838 sendSignalToQmgr(copySignal);
841 void
842 ArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32 code)
844 ArbitSignal copySignal = aSignal;
845 copySignal.gsn = GSN_ARBIT_CHOOSECONF;
846 copySignal.data.code = code;
847 sendSignalToQmgr(copySignal);
850 void
851 ArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32 code)
853 ArbitSignal copySignal = aSignal;
854 copySignal.gsn = GSN_ARBIT_CHOOSEREF;
855 copySignal.data.code = code;
856 sendSignalToQmgr(copySignal);
859 void
860 ArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32 code)
862 ArbitSignal copySignal = aSignal;
863 copySignal.gsn = GSN_ARBIT_STOPREP;
864 copySignal.data.code = code;
865 sendSignalToQmgr(copySignal);
869 * Send signal to QMGR. The input includes signal number and
870 * signal data. The signal data is normally a copy of a received
871 * signal so it contains expected arbitrator node id and ticket.
872 * The sender in signal data is the QMGR node id.
874 void
875 ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
877 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
879 signal.theVerId_signalNumber = aSignal.gsn;
880 signal.theReceiversBlockNumber = QMGR;
881 signal.theTrace = 0;
882 signal.theLength = ArbitSignalData::SignalLength;
884 ArbitSignalData* sd = CAST_PTR(ArbitSignalData, signal.getDataPtrSend());
886 sd->sender = numberToRef(API_CLUSTERMGR, theFacade.ownId());
887 sd->code = aSignal.data.code;
888 sd->node = aSignal.data.node;
889 sd->ticket = aSignal.data.ticket;
890 sd->mask = aSignal.data.mask;
892 #ifdef DEBUG_ARBIT
893 char buf[17] = "";
894 ndbout << "arbit send: ";
895 ndbout << " gsn=" << aSignal.gsn;
896 ndbout << " recv=" << aSignal.data.sender;
897 ndbout << " code=" << aSignal.data.code;
898 ndbout << " node=" << aSignal.data.node;
899 ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
900 ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
901 ndbout << endl;
902 #endif
904 theFacade.lock_mutex();
905 theFacade.sendSignalUnCond(&signal, aSignal.data.sender);
906 theFacade.unlock_mutex();