1 /* Copyright (c) 2003-2007 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16 #include <ndb_global.h>
17 #include <my_pthread.h>
19 #include <TransporterRegistry.hpp>
20 #include "TransporterInternalDefinitions.hpp"
22 #include "Transporter.hpp"
23 #include <SocketAuthenticator.hpp>
25 #ifdef NDB_TCP_TRANSPORTER
26 #include "TCP_Transporter.hpp"
29 #ifdef NDB_SCI_TRANSPORTER
30 #include "SCI_Transporter.hpp"
33 #ifdef NDB_SHM_TRANSPORTER
34 #include "SHM_Transporter.hpp"
35 extern int g_ndb_shm_signum
;
38 #include "TransporterCallback.hpp"
42 #include <InputStream.hpp>
43 #include <OutputStream.hpp>
45 #include <mgmapi/mgmapi.h>
46 #include <mgmapi_internal.h>
47 #include <mgmapi/mgmapi_debug.h>
49 #include <EventLogger.hpp>
50 extern EventLogger g_eventLogger
;
// Returns the m_connect_address member of the transporter stored in
// theTransporters[node_id].
// The visible code dereferences the slot without a range or NULL check, so
// node_id presumably must refer to an already created transporter -- verify
// against callers.
53 TransporterRegistry::get_connect_address(NodeId node_id
) const
55 return theTransporters
[node_id
]->m_connect_address
;
// Handles a newly accepted transporter connection on sockfd.
// Visible steps:
//  - if an authenticator (m_auth) is configured and server_authenticate()
//    fails, the socket is closed;
//  - if m_transporter_registry->connect_server() rejects the socket, the
//    socket is closed.
// NOTE(review): the return statements are not visible in this listing, so the
// returned Session* value cannot be documented from here.
58 SocketServer::Session
* TransporterService::newSession(NDB_SOCKET_TYPE sockfd
)
60 DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
61 if (m_auth
&& !m_auth
->server_authenticate(sockfd
)){
62 NDB_CLOSE_SOCKET(sockfd
);
// Hand the socket to the registry for the node-id handshake.
66 if (!m_transporter_registry
->connect_server(sockfd
))
68 NDB_CLOSE_SOCKET(sockfd
);
// Constructor. Allocates the per-node bookkeeping arrays, each with
// _maxTransporters entries, and puts every slot into its initial state
// (no transporter, DISCONNECTED, NoHalt).
// Parameters:
//   callback               - opaque pointer; its use is in the member
//                            initializer list, which is not visible here.
//   _maxTransporters       - number of slots in every per-node array.
//   sizeOfLongSignalMemory - not referenced in the visible lines.
// NOTE(review): the member initializer list and some counter resets are not
// visible in this listing.
75 TransporterRegistry::TransporterRegistry(void * callback
,
76 unsigned _maxTransporters
,
77 unsigned sizeOfLongSignalMemory
) :
81 DBUG_ENTER("TransporterRegistry::TransporterRegistry");
83 nodeIdSpecified
= false;
84 maxTransporters
= _maxTransporters
;
// One typed array per transporter kind plus the arrays indexed by node id.
89 theTCPTransporters
= new TCP_Transporter
* [maxTransporters
];
90 theSCITransporters
= new SCI_Transporter
* [maxTransporters
];
91 theSHMTransporters
= new SHM_Transporter
* [maxTransporters
];
92 theTransporterTypes
= new TransporterType
[maxTransporters
];
93 theTransporters
= new Transporter
* [maxTransporters
];
94 performStates
= new PerformState
[maxTransporters
];
95 ioStates
= new IOState
[maxTransporters
];
97 // Initialize member variables
100 nSCITransporters
= 0;
101 nSHMTransporters
= 0;
103 // Initialize the transporter arrays
// Note: theTransporterTypes is allocated above but is not assigned in this
// loop; entries are written when a transporter is created.
104 for (unsigned i
=0; i
<maxTransporters
; i
++) {
105 theTCPTransporters
[i
] = NULL
;
106 theSCITransporters
[i
] = NULL
;
107 theSHMTransporters
[i
] = NULL
;
108 theTransporters
[i
] = NULL
;
109 performStates
[i
] = DISCONNECTED
;
110 ioStates
[i
] = NoHalt
;
// Installs the management-server handle h used by the registry.
// Visible effects: the previous m_mgm_handle is destroyed with
// ndb_mgm_destroy_handle(), a timeout of 5000 is set on m_mgm_handle
// (presumably milliseconds -- confirm in the MGM API docs) and the connect
// string of h is written to the debug trace.
// NOTE(review): the assignment of h to m_mgm_handle, the declaration of buf
// and the guarding conditions are not visible in this listing.
116 void TransporterRegistry::set_mgm_handle(NdbMgmHandle h
)
118 DBUG_ENTER("TransporterRegistry::set_mgm_handle");
120 ndb_mgm_destroy_handle(&m_mgm_handle
);
122 ndb_mgm_set_timeout(m_mgm_handle
, 5000);
127 DBUG_PRINT("info",("handle set with connectstring: %s",
128 ndb_mgm_get_connectstring(h
,buf
, sizeof(buf
))));
132 DBUG_PRINT("info",("handle set to NULL"));
// Destructor. Releases the arrays allocated by the constructor with delete[]
// and destroys the management handle.
// NOTE(review): only the array deletions shown below are visible; whether the
// transporters themselves are removed first, and whether ioStates is freed,
// cannot be confirmed from this listing.
138 TransporterRegistry::~TransporterRegistry()
140 DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
144 delete[] theTCPTransporters
;
145 delete[] theSCITransporters
;
146 delete[] theSHMTransporters
;
147 delete[] theTransporterTypes
;
148 delete[] theTransporters
;
149 delete[] performStates
;
153 ndb_mgm_destroy_handle(&m_mgm_handle
);
// Removes every registered transporter: walks all maxTransporters slots and
// calls removeTransporter() with the remote node id of each non-NULL entry.
159 TransporterRegistry::removeAll(){
160 for(unsigned i
= 0; i
<maxTransporters
; i
++){
161 if(theTransporters
[i
] != NULL
)
162 removeTransporter(theTransporters
[i
]->getRemoteNodeId());
// Calls doDisconnect() on every non-NULL transporter slot. The transporters
// stay registered; only their connections are torn down.
167 TransporterRegistry::disconnectAll(){
168 for(unsigned i
= 0; i
<maxTransporters
; i
++){
169 if(theTransporters
[i
] != NULL
)
170 theTransporters
[i
]->doDisconnect();
// Records nodeId as the local node id and marks the node id as specified
// (nodeIdSpecified = true), which the create*Transporter() functions check.
// NOTE(review): the return statement is not visible in this listing.
175 TransporterRegistry::init(NodeId nodeId
) {
176 DBUG_ENTER("TransporterRegistry::init");
177 nodeIdSpecified
= true;
178 localNodeId
= nodeId
;
180 DEBUG("TransporterRegistry started node: " << localNodeId
);
// Server-side handshake for an incoming transporter connection on sockfd.
// Visible protocol:
//  1. read one text line from the client and parse it with "%d %d" as
//     <node id> <transporter type>;
//  2. check that the node id is within [0, maxTransporters), that a
//     transporter exists for it and that its perform state is CONNECTING;
//  3. reply with "<local node id> <local transporter type>";
//  4. if the client sent a transporter type and it differs from ours, log an
//     error and wait up to one second on the socket;
//  5. otherwise hand the socket to the transporter's connect_server().
// NOTE(review): the return statements, the declaration of buf and the handling
// of the sscanf() result are not visible in this listing.
186 TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd
)
188 DBUG_ENTER("TransporterRegistry::connect_server");
190 // read node id from client
191 // read transporter type
// remote_transporter_type stays -1 when the client sends only a node id.
192 int nodeId
, remote_transporter_type
= -1;
193 SocketInputStream
s_input(sockfd
);
195 if (s_input
.gets(buf
, 256) == 0) {
196 DBUG_PRINT("error", ("Could not get node id from client"));
199 int r
= sscanf(buf
, "%d %d", &nodeId
, &remote_transporter_type
);
204 // we're running version prior to 4.1.9
205 // ok, but with no checks on transporter configuration compatibility
208 DBUG_PRINT("error", ("Error in node id from client"));
212 DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
213 nodeId
,remote_transporter_type
));
215 //check that nodeid is valid and that there is an allocated transporter
216 if ( nodeId
< 0 || nodeId
>= (int)maxTransporters
) {
217 DBUG_PRINT("error", ("Node id out of range from client"));
220 if (theTransporters
[nodeId
] == 0) {
221 DBUG_PRINT("error", ("No transporter for this node id from client"));
225 //check that the transporter should be connected
226 if (performStates
[nodeId
] != TransporterRegistry::CONNECTING
) {
227 DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
231 Transporter
*t
= theTransporters
[nodeId
];
233 // send info about own id (just as response to acknowledge connection)
234 // send info on own transporter type
235 SocketOutputStream
s_output(sockfd
);
236 s_output
.println("%d %d", t
->getLocalNodeId(), t
->m_type
);
// Only compare transporter types when the client actually sent one.
238 if (remote_transporter_type
!= -1)
240 if (remote_transporter_type
!= t
->m_type
)
242 DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
243 t
->m_type
, remote_transporter_type
));
244 g_eventLogger
.error("Incompatible configuration: Transporter type "
245 "mismatch with node %d", nodeId
);
247 // wait for socket close for 1 second to let message arrive at client
251 FD_SET(sockfd
, &a_set
);
252 struct timeval timeout
;
253 timeout
.tv_sec
= 1; timeout
.tv_usec
= 0;
254 select(sockfd
+1, &a_set
, 0, 0, &timeout
);
// No type was received from the client: warn when the local side is SHM.
259 else if (t
->m_type
== tt_SHM_TRANSPORTER
)
261 g_eventLogger
.warning("Unable to verify transporter compatability with node %d", nodeId
);
264 // setup transporter (transporter responsible for closing sockfd)
265 t
->connect_server(sockfd
);
// Creates and registers a TCP transporter described by config.
// Visible steps (only compiled when NDB_TCP_TRANSPORTER is defined):
//  - call init(config->localNodeId) if no local node id was specified yet;
//  - compare config->localNodeId with localNodeId;
//  - check whether a transporter already exists for config->remoteNodeId;
//  - construct a TCP_Transporter from the config fields and call
//    initTransporter() on it;
//  - store it in theTCPTransporters and theTransporters, record its type and
//    set its perform state to DISCONNECTED.
// NOTE(review): the outcome of each failed check, some constructor arguments,
// the counter increments and the return values are not visible here.
271 TransporterRegistry::createTCPTransporter(TransporterConfiguration
*config
) {
272 #ifdef NDB_TCP_TRANSPORTER
274 if(!nodeIdSpecified
){
275 init(config
->localNodeId
);
278 if(config
->localNodeId
!= localNodeId
)
281 if(theTransporters
[config
->remoteNodeId
] != NULL
)
284 TCP_Transporter
* t
= new TCP_Transporter(*this,
285 config
->tcp
.sendBufferSize
,
286 config
->tcp
.maxReceiveSize
,
287 config
->localHostName
,
288 config
->remoteHostName
,
290 config
->isMgmConnection
,
292 config
->remoteNodeId
,
293 config
->serverNodeId
,
298 else if (!t
->initTransporter()) {
303 // Put the transporter in the transporter arrays
304 theTCPTransporters
[nTCPTransporters
] = t
;
305 theTransporters
[t
->getRemoteNodeId()] = t
;
306 theTransporterTypes
[t
->getRemoteNodeId()] = tt_TCP_TRANSPORTER
;
307 performStates
[t
->getRemoteNodeId()] = DISCONNECTED
;
// Creates and registers an SCI transporter described by config.
// Visible steps (only compiled when NDB_SCI_TRANSPORTER is defined):
//  - call SCI_Transporter::initSCI() and test its result;
//  - call init(config->localNodeId) if no local node id was specified yet;
//  - compare config->localNodeId with localNodeId;
//  - check whether a transporter already exists for config->remoteNodeId;
//  - construct an SCI_Transporter from the config fields and call
//    initTransporter() on it;
//  - store it in theSCITransporters and theTransporters, record its type and
//    set its perform state to DISCONNECTED.
// NOTE(review): the outcome of each failed check, some constructor arguments,
// the counter increments and the return values are not visible here.
318 TransporterRegistry::createSCITransporter(TransporterConfiguration
*config
) {
319 #ifdef NDB_SCI_TRANSPORTER
321 if(!SCI_Transporter::initSCI())
324 if(!nodeIdSpecified
){
325 init(config
->localNodeId
);
328 if(config
->localNodeId
!= localNodeId
)
331 if(theTransporters
[config
->remoteNodeId
] != NULL
)
334 SCI_Transporter
* t
= new SCI_Transporter(*this,
335 config
->localHostName
,
336 config
->remoteHostName
,
338 config
->isMgmConnection
,
339 config
->sci
.sendLimit
,
340 config
->sci
.bufferSize
,
341 config
->sci
.nLocalAdapters
,
342 config
->sci
.remoteSciNodeId0
,
343 config
->sci
.remoteSciNodeId1
,
345 config
->remoteNodeId
,
346 config
->serverNodeId
,
352 else if (!t
->initTransporter()) {
356 // Put the transporter in the transporter arrays
357 theSCITransporters
[nSCITransporters
] = t
;
358 theTransporters
[t
->getRemoteNodeId()] = t
;
359 theTransporterTypes
[t
->getRemoteNodeId()] = tt_SCI_TRANSPORTER
;
360 performStates
[t
->getRemoteNodeId()] = DISCONNECTED
;
// Creates and registers a shared-memory (SHM) transporter described by config.
// Visible steps (only compiled when NDB_SHM_TRANSPORTER is defined):
//  - call init(config->localNodeId) if no local node id was specified yet;
//  - compare config->localNodeId with localNodeId;
//  - on first use (g_ndb_shm_signum == 0) adopt config->shm.signum as the
//    process-wide SHM signal number and call NdbThread_set_shm_sigmask(TRUE);
//  - compare config->shm.signum with g_ndb_shm_signum on later calls;
//  - check whether a transporter already exists for config->remoteNodeId;
//  - construct an SHM_Transporter, call initTransporter() and store it in
//    theSHMTransporters / theTransporters with state DISCONNECTED.
// NOTE(review): the outcome of each failed check, some constructor arguments,
// the counter increments and the return values are not visible here.
371 TransporterRegistry::createSHMTransporter(TransporterConfiguration
*config
) {
372 DBUG_ENTER("TransporterRegistry::createTransporter SHM");
373 #ifdef NDB_SHM_TRANSPORTER
374 if(!nodeIdSpecified
){
375 init(config
->localNodeId
);
378 if(config
->localNodeId
!= localNodeId
)
381 if (!g_ndb_shm_signum
) {
382 g_ndb_shm_signum
= config
->shm
.signum
;
383 DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum
));
385 * Make sure to block g_ndb_shm_signum
386 * TransporterRegistry::init is run from "main" thread
388 NdbThread_set_shm_sigmask(TRUE
);
391 if(config
->shm
.signum
!= g_ndb_shm_signum
)
394 if(theTransporters
[config
->remoteNodeId
] != NULL
)
397 SHM_Transporter
* t
= new SHM_Transporter(*this,
398 config
->localHostName
,
399 config
->remoteHostName
,
401 config
->isMgmConnection
,
403 config
->remoteNodeId
,
404 config
->serverNodeId
,
412 else if (!t
->initTransporter()) {
416 // Put the transporter in the transporter arrays
417 theSHMTransporters
[nSHMTransporters
] = t
;
418 theTransporters
[t
->getRemoteNodeId()] = t
;
419 theTransporterTypes
[t
->getRemoteNodeId()] = tt_SHM_TRANSPORTER
;
420 performStates
[t
->getRemoteNodeId()] = DISCONNECTED
;
// Disconnects, unregisters and deletes the transporter for nodeId.
// Visible steps:
//  - test whether theTransporters[nodeId] is NULL;
//  - call doDisconnect() on the transporter;
//  - depending on theTransporterTypes[nodeId], find the entry in the matching
//    typed array (TCP / SCI / SHM) and shift the following entries down by
//    one position to close the gap;
//  - delete the transporter and set theTransporters[nodeId] to NULL.
// NOTE(review): the switch statement, the declaration and stepping of ind
// between the two loops, the counter decrements and the break statements are
// not visible in this listing.
433 TransporterRegistry::removeTransporter(NodeId nodeId
) {
435 DEBUG("Removing transporter from " << localNodeId
436 << " to " << nodeId
);
438 if(theTransporters
[nodeId
] == NULL
)
441 theTransporters
[nodeId
]->doDisconnect();
443 const TransporterType type
= theTransporterTypes
[nodeId
];
447 case tt_TCP_TRANSPORTER
:
448 #ifdef NDB_TCP_TRANSPORTER
// First loop locates the entry, second loop compacts the array.
449 for(; ind
< nTCPTransporters
; ind
++)
450 if(theTCPTransporters
[ind
]->getRemoteNodeId() == nodeId
)
453 for(; ind
<nTCPTransporters
; ind
++)
454 theTCPTransporters
[ind
-1] = theTCPTransporters
[ind
];
458 case tt_SCI_TRANSPORTER
:
459 #ifdef NDB_SCI_TRANSPORTER
460 for(; ind
< nSCITransporters
; ind
++)
461 if(theSCITransporters
[ind
]->getRemoteNodeId() == nodeId
)
464 for(; ind
<nSCITransporters
; ind
++)
465 theSCITransporters
[ind
-1] = theSCITransporters
[ind
];
469 case tt_SHM_TRANSPORTER
:
470 #ifdef NDB_SHM_TRANSPORTER
471 for(; ind
< nSHMTransporters
; ind
++)
472 if(theSHMTransporters
[ind
]->getRemoteNodeId() == nodeId
)
475 for(; ind
<nSHMTransporters
; ind
++)
476 theSHMTransporters
[ind
-1] = theSHMTransporters
[ind
];
484 // Delete the transporter and remove it from theTransporters array
485 delete theTransporters
[nodeId
];
486 theTransporters
[nodeId
] = NULL
;
// Returns the result of get_free_buffer() on the transporter registered for
// node, when that slot is non-NULL (the expected case, marked likely()).
// NOTE(review): the value returned when no transporter exists is not visible
// in this listing.
490 TransporterRegistry::get_free_buffer(Uint32 node
) const
493 if(likely((t
= theTransporters
[node
]) != 0))
495 return t
->get_free_buffer();
// Packs one signal with linear sections (ptr[3]) into the send buffer of the
// transporter for nodeId.
// Visible flow:
//  - sending is allowed when the node's IO state is neither HaltOutput nor
//    HaltIO, or when the receiving block number is 252 or 4002 (presumably
//    blocks that must stay reachable while output is halted -- confirm against
//    the block number definitions);
//  - the transporter must be connected and the packed length must not exceed
//    MAX_MESSAGE_SIZE;
//  - a write pointer is requested; when one is obtained the signal is packed
//    and the write pointer advanced;
//  - otherwise up to 50 retries are made, sleeping between attempts only when
//    no SHM or SCI transporters exist.
// Visible result codes: SEND_BUFFER_FULL, SEND_MESSAGE_TOO_BIG,
// SEND_DISCONNECTED and SEND_UNKNOWN_NODE.
// NOTE(review): the remaining parameters (nodeId, prio), the success return
// and several conditions are not visible in this listing.
502 TransporterRegistry::prepareSend(const SignalHeader
* const signalHeader
,
504 const Uint32
* const signalData
,
506 const LinearSectionPtr ptr
[3]){
509 Transporter
*t
= theTransporters
[nodeId
];
511 (((ioStates
[nodeId
] != HaltOutput
) && (ioStates
[nodeId
] != HaltIO
)) ||
512 ((signalHeader
->theReceiversBlockNumber
== 252) ||
513 (signalHeader
->theReceiversBlockNumber
== 4002)))) {
515 if(t
->isConnected()){
516 Uint32 lenBytes
= t
->m_packer
.getMessageLength(signalHeader
, ptr
);
517 if(lenBytes
<= MAX_MESSAGE_SIZE
){
518 Uint32
* insertPtr
= t
->getWritePtr(lenBytes
, prio
);
520 t
->m_packer
.pack(insertPtr
, prio
, signalHeader
, signalData
, ptr
);
521 t
->updateWritePtr(lenBytes
, prio
);
528 * @note: on linux/i386 the granularity is 10ms
529 * so sleepTime = 2 generates a 10 ms sleep.
// Retry loop used when no write pointer could be obtained above.
531 for(int i
= 0; i
<50; i
++){
532 if((nSHMTransporters
+nSCITransporters
) == 0)
533 NdbSleep_MilliSleep(sleepTime
);
534 insertPtr
= t
->getWritePtr(lenBytes
, prio
);
536 t
->m_packer
.pack(insertPtr
, prio
, signalHeader
, signalData
, ptr
);
537 t
->updateWritePtr(lenBytes
, prio
);
544 * Send buffer full, but resend works
546 reportError(callbackObj
, nodeId
, TE_SEND_BUFFER_FULL
);
550 WARNING("Signal to " << nodeId
<< " lost(buffer)");
551 reportError(callbackObj
, nodeId
, TE_SIGNAL_LOST_SEND_BUFFER_FULL
);
552 return SEND_BUFFER_FULL
;
554 return SEND_MESSAGE_TOO_BIG
;
557 DEBUG("Signal to " << nodeId
<< " lost(disconnect) ");
558 return SEND_DISCONNECTED
;
561 DEBUG("Discarding message to block: "
562 << signalHeader
->theReceiversBlockNumber
563 << " node: " << nodeId
);
566 return SEND_UNKNOWN_NODE
;
// Packs one signal with segmented sections (ptr[3], resolved through thePool)
// into the send buffer of the transporter for nodeId.
// The visible flow matches the LinearSectionPtr overload, except that
// thePool is passed to the packer:
//  - sending is allowed when the node's IO state is neither HaltOutput nor
//    HaltIO, or when the receiving block number is 252 or 4002 (presumably
//    blocks that must stay reachable while output is halted -- confirm);
//  - the transporter must be connected and the packed length must not exceed
//    MAX_MESSAGE_SIZE;
//  - a write pointer is requested, with up to 50 retries that sleep only when
//    no SHM or SCI transporters exist.
// Visible result codes: SEND_BUFFER_FULL, SEND_MESSAGE_TOO_BIG,
// SEND_DISCONNECTED and SEND_UNKNOWN_NODE.
// NOTE(review): the remaining parameters (nodeId, prio), the success return
// and several conditions are not visible in this listing.
573 TransporterRegistry::prepareSend(const SignalHeader
* const signalHeader
,
575 const Uint32
* const signalData
,
577 class SectionSegmentPool
& thePool
,
578 const SegmentedSectionPtr ptr
[3]){
581 Transporter
*t
= theTransporters
[nodeId
];
583 (((ioStates
[nodeId
] != HaltOutput
) && (ioStates
[nodeId
] != HaltIO
)) ||
584 ((signalHeader
->theReceiversBlockNumber
== 252)||
585 (signalHeader
->theReceiversBlockNumber
== 4002)))) {
587 if(t
->isConnected()){
588 Uint32 lenBytes
= t
->m_packer
.getMessageLength(signalHeader
, ptr
);
589 if(lenBytes
<= MAX_MESSAGE_SIZE
){
590 Uint32
* insertPtr
= t
->getWritePtr(lenBytes
, prio
);
592 t
->m_packer
.pack(insertPtr
, prio
, signalHeader
, signalData
, thePool
, ptr
);
593 t
->updateWritePtr(lenBytes
, prio
);
599 * @note: on linux/i386 the granularity is 10ms
600 * so sleepTime = 2 generates a 10 ms sleep.
// Retry loop used when no write pointer could be obtained above.
603 for(int i
= 0; i
<50; i
++){
604 if((nSHMTransporters
+nSCITransporters
) == 0)
605 NdbSleep_MilliSleep(sleepTime
);
606 insertPtr
= t
->getWritePtr(lenBytes
, prio
);
608 t
->m_packer
.pack(insertPtr
, prio
, signalHeader
, signalData
, thePool
, ptr
);
609 t
->updateWritePtr(lenBytes
, prio
);
616 * Send buffer full, but resend works
618 reportError(callbackObj
, nodeId
, TE_SEND_BUFFER_FULL
);
622 WARNING("Signal to " << nodeId
<< " lost(buffer)");
623 reportError(callbackObj
, nodeId
, TE_SIGNAL_LOST_SEND_BUFFER_FULL
);
624 return SEND_BUFFER_FULL
;
626 return SEND_MESSAGE_TOO_BIG
;
629 DEBUG("Signal to " << nodeId
<< " lost(disconnect) ");
630 return SEND_DISCONNECTED
;
633 DEBUG("Discarding message to block: "
634 << signalHeader
->theReceiversBlockNumber
635 << " node: " << nodeId
);
638 return SEND_UNKNOWN_NODE
;
// Combined send/receive step: polls for incoming data with pollReceive(),
// waiting at most timeOutMillis.
// NOTE(review): the send call mentioned by the comment below and the body of
// the if statement are not visible in this listing.
645 TransporterRegistry::external_IO(Uint32 timeOutMillis
) {
646 //-----------------------------------------------------------
647 // Most of the time we will send the buffers here and then wait
648 // for new signals. Thus we start by sending without timeout
649 // followed by the receive part where we expect to sleep for
651 //-----------------------------------------------------------
652 if(pollReceive(timeOutMillis
)){
// Polls all transporter kinds for readable data and accumulates the result in
// retVal.
// Visible order:
//  - a check on nSCITransporters > 0 (its effect is not visible);
//  - if SHM transporters exist, a first non-blocking poll_SHM(0);
//  - poll_TCP(timeOutMillis) when TCP transporters exist or nothing was found
//    so far; tcpReadSelectReply is reset to 0 on a path that is not fully
//    visible;
//  - poll_SCI(timeOutMillis) when SCI transporters exist;
//  - a second poll_SHM(0) when SHM transporters exist and nothing was found.
// NOTE(review): the declaration of retVal, how the poll_SHM results are used
// and the return statement are not visible in this listing.
659 TransporterRegistry::pollReceive(Uint32 timeOutMillis
){
662 if((nSCITransporters
) > 0)
667 #ifdef NDB_SHM_TRANSPORTER
668 if(nSHMTransporters
> 0)
670 Uint32 res
= poll_SHM(0);
679 #ifdef NDB_TCP_TRANSPORTER
680 if(nTCPTransporters
> 0 || retVal
== 0)
682 retVal
|= poll_TCP(timeOutMillis
);
685 tcpReadSelectReply
= 0;
687 #ifdef NDB_SCI_TRANSPORTER
688 if(nSCITransporters
> 0)
689 retVal
|= poll_SCI(timeOutMillis
);
691 #ifdef NDB_SHM_TRANSPORTER
692 if(nSHMTransporters
> 0 && retVal
== 0)
694 int res
= poll_SHM(0);
702 #ifdef NDB_SCI_TRANSPORTER
// Scans the SCI transporters and tests each connected one with
// hasDataToRead().
// timeOutMillis is not referenced in the visible lines.
// NOTE(review): the action taken when data is found and the return value are
// not visible in this listing.
704 TransporterRegistry::poll_SCI(Uint32 timeOutMillis
)
706 for (int i
=0; i
<nSCITransporters
; i
++) {
707 SCI_Transporter
* t
= theSCITransporters
[i
];
708 if (t
->isConnected()) {
709 if(t
->hasDataToRead())
718 #ifdef NDB_SHM_TRANSPORTER
719 static int g_shm_counter
= 0;
// Scans the SHM transporters for readable data, repeating the scan up to 100
// times (outer loop over j).
// timeOutMillis is not referenced in the visible lines.
// NOTE(review): the action taken when data is found and the return value are
// not visible in this listing.
721 TransporterRegistry::poll_SHM(Uint32 timeOutMillis
)
723 for(int j
=0; j
< 100; j
++)
725 for (int i
=0; i
<nSHMTransporters
; i
++) {
726 SHM_Transporter
* t
= theSHMTransporters
[i
];
727 if (t
->isConnected()) {
728 if(t
->hasDataToRead()) {
738 #ifdef NDB_TCP_TRANSPORTER
// Waits with select() for readable data on the sockets of all connected TCP
// transporters.
// Visible flow:
//  - clear tcpReadset and add the socket of every transporter that is
//    connected both in the registry (is_connected) and in the transporter
//    itself, tracking the highest socket value;
//  - OR together hasReceiveData() of those transporters into hasdata; when
//    any transporter already has buffered data the timeout is forced to 0 so
//    select() does not block;
//  - call select() and store the result in tcpReadSelectReply;
//  - on SOCKET_ERROR sleep for timeOutMillis.
// Returns tcpReadSelectReply || hasdata.
// NOTE(review): the comment "The highest socket value plus one" suggests an
// increment of maxSocketValue before select(); that line is not visible here.
740 TransporterRegistry::poll_TCP(Uint32 timeOutMillis
)
742 bool hasdata
= false;
// The leading "false &&" makes this condition always false.
743 if (false && nTCPTransporters
== 0)
745 tcpReadSelectReply
= 0;
749 NDB_SOCKET_TYPE maxSocketValue
= -1;
751 // Needed for TCP/IP connections
752 // The read- and writeset are used by select
754 FD_ZERO(&tcpReadset
);
756 // Prepare for sending and receiving
757 for (int i
= 0; i
< nTCPTransporters
; i
++) {
758 TCP_Transporter
* t
= theTCPTransporters
[i
];
760 // If the transporter is connected
761 NodeId nodeId
= t
->getRemoteNodeId();
762 if (is_connected(nodeId
) && t
->isConnected()) {
764 const NDB_SOCKET_TYPE socket
= t
->getSocket();
765 // Find the highest socket value. It will be used by select
766 if (socket
> maxSocketValue
)
767 maxSocketValue
= socket
;
769 // Put the connected transporters in the socket read-set
770 FD_SET(socket
, &tcpReadset
);
772 hasdata
|= t
->hasReceiveData();
// Do not block in select() when data is already buffered.
775 timeOutMillis
= hasdata
? 0 : timeOutMillis
;
777 struct timeval timeout
;
778 timeout
.tv_sec
= timeOutMillis
/ 1000;
779 timeout
.tv_usec
= (timeOutMillis
% 1000) * 1000;
781 // The highest socket value plus one
784 tcpReadSelectReply
= select(maxSocketValue
, &tcpReadset
, 0, 0, &timeout
);
// The leading "false &&" disables this EINTR log message.
785 if(false && tcpReadSelectReply
== -1 && errno
== EINTR
)
786 g_eventLogger
.info("woke-up by signal");
789 if(tcpReadSelectReply
== SOCKET_ERROR
)
791 NdbSleep_MilliSleep(timeOutMillis
);
795 return tcpReadSelectReply
|| hasdata
;
// Receives and unpacks pending data from every connected transporter.
// Visible flow per transporter kind:
//  - TCP: for transporters connected in the registry, test the socket in
//    tcpReadset, and when hasReceiveData() is true fetch the buffered data,
//    notify transporter_recv_from(), unpack() it and advance the receive
//    pointer by the number of bytes consumed;
//  - SCI and SHM: for transporters that are connected (isConnected() and
//    checkConnected()), fetch the read / end-of-data pointers, notify
//    transporter_recv_from(), unpack() and store the new read pointer.
// unpack() receives ioStates[nodeId] in every case.
// NOTE(review): the action taken when FD_ISSET() is true and the declaration
// of ptr are not visible in this listing.
801 TransporterRegistry::performReceive()
803 #ifdef NDB_TCP_TRANSPORTER
804 for (int i
=0; i
<nTCPTransporters
; i
++)
807 TCP_Transporter
*t
= theTCPTransporters
[i
];
808 const NodeId nodeId
= t
->getRemoteNodeId();
809 const NDB_SOCKET_TYPE socket
= t
->getSocket();
810 if(is_connected(nodeId
)){
813 if (FD_ISSET(socket
, &tcpReadset
))
818 if (t
->hasReceiveData())
821 Uint32 sz
= t
->getReceiveData(&ptr
);
822 transporter_recv_from(callbackObj
, nodeId
);
823 Uint32 szUsed
= unpack(ptr
, sz
, nodeId
, ioStates
[nodeId
]);
824 t
->updateReceiveDataPtr(szUsed
);
831 #ifdef NDB_SCI_TRANSPORTER
833 //do prepareReceive on the SCI transporters (prepareReceive(t,,,,))
834 for (int i
=0; i
<nSCITransporters
; i
++)
837 SCI_Transporter
*t
= theSCITransporters
[i
];
838 const NodeId nodeId
= t
->getRemoteNodeId();
839 if(is_connected(nodeId
))
841 if(t
->isConnected() && t
->checkConnected())
843 Uint32
* readPtr
, * eodPtr
;
844 t
->getReceivePtr(&readPtr
, &eodPtr
);
845 transporter_recv_from(callbackObj
, nodeId
);
846 Uint32
*newPtr
= unpack(readPtr
, eodPtr
, nodeId
, ioStates
[nodeId
]);
847 t
->updateReceivePtr(newPtr
);
852 #ifdef NDB_SHM_TRANSPORTER
853 for (int i
=0; i
<nSHMTransporters
; i
++)
856 SHM_Transporter
*t
= theSHMTransporters
[i
];
857 const NodeId nodeId
= t
->getRemoteNodeId();
858 if(is_connected(nodeId
)){
859 if(t
->isConnected() && t
->checkConnected())
861 Uint32
* readPtr
, * eodPtr
;
862 t
->getReceivePtr(&readPtr
, &eodPtr
);
863 transporter_recv_from(callbackObj
, nodeId
);
864 Uint32
*newPtr
= unpack(readPtr
, eodPtr
, nodeId
, ioStates
[nodeId
]);
865 t
->updateReceivePtr(newPtr
);
// Sends buffered data on every connected transporter.
// Visible flow:
//  - TCP: the transporters are visited in two passes, first from index
//    m_transp_count to the end and then from 0 up to m_transp_count, so the
//    starting position is given by m_transp_count; m_transp_count is reset to
//    0 when it equals nTCPTransporters. A transporter qualifies when it is
//    non-NULL, has data to send and is connected both locally and in the
//    registry;
//  - SCI: transporters connected in the registry that are connected and have
//    data to send;
//  - SHM: transporters connected in the registry.
// NOTE(review): the actual send calls, the declaration of i and the update of
// m_transp_count are not visible in this listing.
873 TransporterRegistry::performSend()
878 #ifdef NDB_TCP_TRANSPORTER
879 for (i
= m_transp_count
; i
< nTCPTransporters
; i
++)
881 TCP_Transporter
*t
= theTCPTransporters
[i
];
882 if (t
&& t
->hasDataToSend() && t
->isConnected() &&
883 is_connected(t
->getRemoteNodeId()))
888 for (i
= 0; i
< m_transp_count
&& i
< nTCPTransporters
; i
++)
890 TCP_Transporter
*t
= theTCPTransporters
[i
];
891 if (t
&& t
->hasDataToSend() && t
->isConnected() &&
892 is_connected(t
->getRemoteNodeId()))
898 if (m_transp_count
== nTCPTransporters
) m_transp_count
= 0;
900 #ifdef NDB_SCI_TRANSPORTER
901 //scroll through the SCI transporters,
902 // get each transporter, check if connected, send data
903 for (i
=0; i
<nSCITransporters
; i
++) {
904 SCI_Transporter
*t
= theSCITransporters
[i
];
905 const NodeId nodeId
= t
->getRemoteNodeId();
907 if(is_connected(nodeId
))
909 if(t
->isConnected() && t
->hasDataToSend()) {
916 #ifdef NDB_SHM_TRANSPORTER
917 for (i
=0; i
<nSHMTransporters
; i
++)
919 SHM_Transporter
*t
= theSHMTransporters
[i
];
920 const NodeId nodeId
= t
->getRemoteNodeId();
921 if(is_connected(nodeId
))
// Increments sendCounter on every call and compares its previous value with
// sendLimit.
// NOTE(review): the body executed when the limit is reached and the return
// values are not visible in this listing.
933 TransporterRegistry::forceSendCheck(int sendLimit
){
934 int tSendCounter
= sendCounter
;
935 sendCounter
= tSendCounter
+ 1;
936 if (tSendCounter
>= sendLimit
) {
942 }//TransporterRegistry::forceSendCheck()
944 #ifdef DEBUG_TRANSPORTER
// Debug helper (compiled only with DEBUG_TRANSPORTER): prints the number of
// transporters to ndbout, followed by the remote node id, perform state and
// IO state of every non-NULL transporter slot.
946 TransporterRegistry::printState(){
947 ndbout
<< "-- TransporterRegistry -- " << endl
<< endl
948 << "Transporters = " << nTransporters
<< endl
;
949 for(int i
= 0; i
<maxTransporters
; i
++)
950 if(theTransporters
[i
] != NULL
){
951 const NodeId remoteNodeId
= theTransporters
[i
]->getRemoteNodeId();
952 ndbout
<< "Transporter: " << remoteNodeId
953 << " PerformState: " << performStates
[remoteNodeId
]
954 << " IOState: " << ioStates
[remoteNodeId
] << endl
;
// Returns the IO state stored for nodeId. The index is not range-checked in
// the visible code.
960 TransporterRegistry::ioState(NodeId nodeId
) {
961 return ioStates
[nodeId
];
// Stores state as the IO state of nodeId and writes a DEBUG trace line. The
// index is not range-checked in the visible code.
965 TransporterRegistry::setIOState(NodeId nodeId
, IOState state
) {
966 DEBUG("TransporterRegistry::setIOState("
967 << nodeId
<< ", " << state
<< ")");
968 ioStates
[nodeId
] = state
;
// C-style thread entry point: casts me to TransporterRegistry* and runs its
// start_clients_thread() loop. Passed to NdbThread_Create() by
// start_clients().
// NOTE(review): the return statement is not visible in this listing.
972 run_start_clients_C(void * me
)
974 ((TransporterRegistry
*) me
)->start_clients_thread();
978 // Run by kernel thread
// Requests a connection to node_id by setting its perform state to
// CONNECTING.
// NOTE(review): a switch on the current state precedes the assignment in the
// original file; its cases are not visible in this listing.
980 TransporterRegistry::do_connect(NodeId node_id
)
982 PerformState
&curr_state
= performStates
[node_id
];
993 DBUG_ENTER("TransporterRegistry::do_connect");
994 DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id
));
995 curr_state
= CONNECTING
;
// Requests a disconnect from node_id by setting its perform state to
// DISCONNECTING.
// NOTE(review): a switch on the current state precedes the assignment in the
// original file; its cases are not visible in this listing.
999 TransporterRegistry::do_disconnect(NodeId node_id
)
1001 PerformState
&curr_state
= performStates
[node_id
];
1012 DBUG_ENTER("TransporterRegistry::do_disconnect");
1013 DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id
));
1014 curr_state
= DISCONNECTING
;
// Marks node_id as CONNECTED and notifies the owner through
// reportConnect(callbackObj, node_id).
1019 TransporterRegistry::report_connect(NodeId node_id
)
1021 DBUG_ENTER("TransporterRegistry::report_connect");
1022 DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id
));
1023 performStates
[node_id
] = CONNECTED
;
1024 reportConnect(callbackObj
, node_id
);
// Marks node_id as DISCONNECTED and notifies the owner through
// reportDisconnect(callbackObj, node_id, errnum).
1029 TransporterRegistry::report_disconnect(NodeId node_id
, int errnum
)
1031 DBUG_ENTER("TransporterRegistry::report_disconnect");
1032 DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id
));
1033 performStates
[node_id
] = DISCONNECTED
;
1034 reportDisconnect(callbackObj
, node_id
, errnum
);
// Reconciles the recorded perform state of each transporter with its actual
// connection status: calls report_connect() when a transporter has become
// connected and report_disconnect(nodeId, 0) when it is no longer connected.
// The loop runs until n reaches nTransporters while i walks the slots.
// NOTE(review): the case labels, the skipping of NULL slots and the increment
// of n are not visible in this listing.
1039 TransporterRegistry::update_connections()
1041 for (int i
= 0, n
= 0; n
< nTransporters
; i
++){
1042 Transporter
* t
= theTransporters
[i
];
1047 const NodeId nodeId
= t
->getRemoteNodeId();
1048 switch(performStates
[nodeId
]){
1053 if(t
->isConnected())
1054 report_connect(nodeId
);
1057 if(!t
->isConnected())
1058 report_disconnect(nodeId
, 0);
1064 // run as own thread
// Background loop that keeps trying to establish client-side connections
// while m_run_start_clients_thread is true.
// Visible flow of each iteration:
//  - sleep 100 ms; every 50th iteration call ndb_mgm_check_connection() on
//    the management handle;
//  - walk the transporters and switch on their perform state;
//  - for a transporter that is not connected and is not the server side, try
//    connect_client() when a port is known (get_s_port() non-zero);
//  - if still unconnected and the port is dynamic (get_s_port() <= 0),
//    connect to the management server if needed and ask it for
//    CFG_CONNECTION_SERVER_PORT, storing the answer with set_s_port();
//  - log and disconnect from the management server when the lookup fails.
// NOTE(review): case labels, several conditions, local declarations
// (server_port, res) and the handling of the other perform states are not
// visible in this listing.
1066 TransporterRegistry::start_clients_thread()
1068 int persist_mgm_count
= 0;
1069 DBUG_ENTER("TransporterRegistry::start_clients_thread");
1070 while (m_run_start_clients_thread
) {
1071 NdbSleep_MilliSleep(100);
1072 persist_mgm_count
++;
1073 if(persist_mgm_count
==50)
1075 ndb_mgm_check_connection(m_mgm_handle
);
1076 persist_mgm_count
= 0;
1078 for (int i
= 0, n
= 0; n
< nTransporters
&& m_run_start_clients_thread
; i
++){
1079 Transporter
* t
= theTransporters
[i
];
1084 const NodeId nodeId
= t
->getRemoteNodeId();
1085 switch(performStates
[nodeId
]){
1087 if(!t
->isConnected() && !t
->isServer
) {
1088 bool connected
= false;
1090 * First, we try to connect (if we have a port number).
1092 if (t
->get_s_port())
1093 connected
= t
->connect_client();
1096 * If dynamic, get the port for connecting from the management server
1098 if( !connected
&& t
->get_s_port() <= 0) { // Port is dynamic
1100 struct ndb_mgm_reply mgm_reply
;
1102 if(!ndb_mgm_is_connected(m_mgm_handle
))
1103 ndb_mgm_connect(m_mgm_handle
, 0, 0, 0);
1105 if(ndb_mgm_is_connected(m_mgm_handle
))
1108 ndb_mgm_get_connection_int_parameter(m_mgm_handle
,
1109 t
->getRemoteNodeId(),
1110 t
->getLocalNodeId(),
1111 CFG_CONNECTION_SERVER_PORT
,
1114 DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
1115 server_port
,t
->getRemoteNodeId(),
1116 t
->getLocalNodeId(),res
));
1120 * Server_port == 0 just means that that a mgmt server
1121 * has not received a new port yet. Keep the old.
1124 t
->set_s_port(server_port
);
1126 else if(ndb_mgm_is_connected(m_mgm_handle
))
1128 g_eventLogger
.info("Failed to get dynamic port to connect to: %d", res
);
1129 ndb_mgm_disconnect(m_mgm_handle
);
1133 g_eventLogger
.info("Management server closed connection early. "
1134 "It is probably being shut down (or has problems). "
1135 "We will retry the connection. %d %s %s line: %d",
1136 ndb_mgm_get_latest_error(m_mgm_handle
),
1137 ndb_mgm_get_latest_error_desc(m_mgm_handle
),
1138 ndb_mgm_get_latest_error_msg(m_mgm_handle
),
1139 ndb_mgm_get_latest_error_line(m_mgm_handle
)
1144 * We will not be able to get a new port unless
1145 * the m_mgm_handle is connected. Note that not
1146 * being connected is an ok state, just continue
1147 * until it is able to connect. Continue using the
1148 * old port until we can connect again and get a
1155 if(t
->isConnected())
// Starts the background client-connect thread: sets the run flag and creates
// a low-priority thread named "ndb_start_clients" whose entry point is
// run_start_clients_C. If thread creation fails (handle is 0) the run flag is
// cleared again.
// NOTE(review): some NdbThread_Create() arguments and the return values are
// not visible in this listing.
1167 TransporterRegistry::start_clients()
1169 m_run_start_clients_thread
= true;
1170 m_start_clients_thread
= NdbThread_Create(run_start_clients_C
,
1173 "ndb_start_clients",
1174 NDB_THREAD_PRIO_LOW
);
1175 if (m_start_clients_thread
== 0) {
1176 m_run_start_clients_thread
= false;
// Stops the background client-connect thread, if one was started: clears the
// run flag so the thread loop exits, waits for the thread to finish and
// destroys the thread handle.
// NOTE(review): the declaration of status and the return value are not
// visible in this listing.
1183 TransporterRegistry::stop_clients()
1185 if (m_start_clients_thread
) {
1186 m_run_start_clients_thread
= false;
1188 NdbThread_WaitFor(m_start_clients_thread
, &status
);
1189 NdbThread_Destroy(&m_start_clients_thread
);
// Registers a (interface, service port) pair on which start_service() will
// later listen, unless an equivalent entry already exists.
// Visible flow:
//  - an empty interf string is detected (presumably normalised to NULL on a
//    line not visible here -- confirm);
//  - existing entries are scanned; entries with a different port, or with
//    port 0, are not considered a match;
//  - a match requires both interface names to be equal strings or both to be
//    NULL; on a match the function returns without inserting;
//  - otherwise a new Transporter_interface with remoteNodeId, s_port and
//    interf is appended to m_transporter_interface.
// Note: the interf pointer is stored as-is, not copied, so its storage must
// outlive the registry entry.
// NOTE(review): the remaining parameters of the signature are not visible in
// this listing.
1195 TransporterRegistry::add_transporter_interface(NodeId remoteNodeId
,
1199 DBUG_ENTER("TransporterRegistry::add_transporter_interface");
1200 DBUG_PRINT("enter",("interface=%s, s_port= %d", interf
, s_port
));
1201 if (interf
&& strlen(interf
) == 0)
1204 for (unsigned i
= 0; i
< m_transporter_interface
.size(); i
++)
1206 Transporter_interface
&tmp
= m_transporter_interface
[i
];
1207 if (s_port
!= tmp
.m_s_service_port
|| tmp
.m_s_service_port
==0)
1209 if (interf
!= 0 && tmp
.m_interface
!= 0 &&
1210 strcmp(interf
, tmp
.m_interface
) == 0)
1212 DBUG_VOID_RETURN
; // found match, no need to insert
1214 if (interf
== 0 && tmp
.m_interface
== 0)
1216 DBUG_VOID_RETURN
; // found match, no need to insert
1219 Transporter_interface t
;
1220 t
.m_remote_nodeId
= remoteNodeId
;
1221 t
.m_s_service_port
= s_port
;
1222 t
.m_interface
= interf
;
1223 m_transporter_interface
.push_back(t
);
1224 DBUG_PRINT("exit",("interface and port added"));
// Sets up one listening TransporterService on socket_server for every entry
// in m_transporter_interface.
// Visible flow:
//  - log an error when interfaces exist but no local node id was specified;
//  - per entry, derive the port from m_s_service_port; a negative value marks
//    a dynamic port and is negated to obtain the port to try;
//  - create a TransporterService with a SocketAuthSimple authenticator and
//    call socket_server.setup();
//  - when setup fails, a second setup() attempt is made only for dynamic
//    ports; if that fails too (or the port was fixed) an error is logged and
//    the service object is deleted;
//  - on success the chosen port is written back to m_s_service_port, negated
//    when the port is dynamic, and the service is linked to this registry.
// NOTE(review): the authenticator credentials are hard-coded string literals.
// NOTE(review): the port reset before the retry and the return values are not
// visible in this listing.
1229 TransporterRegistry::start_service(SocketServer
& socket_server
)
1231 DBUG_ENTER("TransporterRegistry::start_service");
1232 if (m_transporter_interface
.size() > 0 && !nodeIdSpecified
)
1234 g_eventLogger
.error("TransporterRegistry::startReceiving: localNodeId not specified");
1238 for (unsigned i
= 0; i
< m_transporter_interface
.size(); i
++)
1240 Transporter_interface
&t
= m_transporter_interface
[i
];
1242 unsigned short port
= (unsigned short)t
.m_s_service_port
;
1243 if(t
.m_s_service_port
<0)
1244 port
= -t
.m_s_service_port
; // is a dynamic port
1245 TransporterService
*transporter_service
=
1246 new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
1247 if(!socket_server
.setup(transporter_service
,
1248 &port
, t
.m_interface
))
1250 DBUG_PRINT("info", ("Trying new port"));
1252 if(t
.m_s_service_port
>0
1253 || !socket_server
.setup(transporter_service
,
1254 &port
, t
.m_interface
))
1257 * If it wasn't a dynamically allocated port, or
1258 * our attempts at getting a new dynamic port failed
1260 g_eventLogger
.error("Unable to setup transporter service port: %s:%d!\n"
1261 "Please check if the port is already used,\n"
1262 "(perhaps the node is already running)",
1263 t
.m_interface
? t
.m_interface
: "*", t
.m_s_service_port
);
1264 delete transporter_service
;
1268 t
.m_s_service_port
= (t
.m_s_service_port
<=0)?-port
:port
; // -`ve if dynamic
1269 DBUG_PRINT("info", ("t.m_s_service_port = %d",t
.m_s_service_port
));
1270 transporter_service
->setTransporterRegistry(this);
1275 #ifdef NDB_SHM_TRANSPORTER
// Signal handler installed by TransporterRegistry::startReceiving() for the
// SHM transporter signal (g_ndb_shm_signum).
// NOTE(review): the body of this handler is not visible in this listing.
1278 shm_sig_handler(int signo
)
// Prepares the calling thread for receiving.
// With NDB_SHM_TRANSPORTER defined, the visible code records the process id
// in m_shm_own_pid and, when an SHM signal number is configured, calls
// NdbThread_set_shm_sigmask(FALSE) and installs shm_sig_handler for that
// signal with sigaction(), retrying while the call fails with EINTR.
// A failure to install the handler is logged with errno and its message.
// NOTE(review): the sa_flags setup, the declaration of ret and the failure
// test are not visible in this listing.
1285 TransporterRegistry::startReceiving()
1287 DBUG_ENTER("TransporterRegistry::startReceiving");
1289 #ifdef NDB_SHM_TRANSPORTER
1290 m_shm_own_pid
= getpid();
1291 if (g_ndb_shm_signum
)
1293 DBUG_PRINT("info",("Install signal handler for signum %d",
1295 struct sigaction sa
;
1296 NdbThread_set_shm_sigmask(FALSE
);
1297 sigemptyset(&sa
.sa_mask
);
1298 sa
.sa_handler
= shm_sig_handler
;
1301 while((ret
= sigaction(g_ndb_shm_signum
, &sa
, 0)) == -1 && errno
== EINTR
);
1304 DBUG_PRINT("error",("Install failed"));
1305 g_eventLogger
.error("Failed to install signal handler for"
1306 " SHM transporter, signum %d, errno: %d (%s)",
1307 g_ndb_shm_signum
, errno
, strerror(errno
));
1310 #endif // NDB_SHM_TRANSPORTER
// Counterpart of startReceiving(). According to the original comment below it
// disconnects all transporters from the receive thread.
// NOTE(review): the statements of the body are not visible in this listing.
1315 TransporterRegistry::stopReceiving(){
1317 * Disconnect all transporters, this includes detach from remote node
1318 * and since that must be done from the same process that called attach
1319 * it's done here in the receive thread
// Hook called before sending starts.
// NOTE(review): no body statements are visible in this listing.
1325 TransporterRegistry::startSending(){
// Hook called when sending stops.
// NOTE(review): no body statements are visible in this listing.
1329 TransporterRegistry::stopSending(){
// Stream operator that dumps the fields of a SignalHeader to out, one field
// per line: length, signal number, receiver block number, sender block
// reference, sender signal id, signal id and trace (printed as int).
// NOTE(review): the return statement is not visible in this listing.
1332 NdbOut
& operator <<(NdbOut
& out
, SignalHeader
& sh
){
1333 out
<< "-- Signal Header --" << endl
;
1334 out
<< "theLength: " << sh
.theLength
<< endl
;
1335 out
<< "gsn: " << sh
.theVerId_signalNumber
<< endl
;
1336 out
<< "recBlockNo: " << sh
.theReceiversBlockNumber
<< endl
;
1337 out
<< "sendBlockRef: " << sh
.theSendersBlockRef
<< endl
;
1338 out
<< "sendersSig: " << sh
.theSendersSignalId
<< endl
;
1339 out
<< "theSignalId: " << sh
.theSignalId
<< endl
;
1340 out
<< "trace: " << (int)sh
.theTrace
<< endl
;
// Returns the transporter stored for nodeId, which is NULL when the slot is
// unused. The index is not range-checked in the visible code.
1345 TransporterRegistry::get_transporter(NodeId nodeId
) {
1346 return theTransporters
[nodeId
];
// Turns an already connected management handle into the transporter
// connection to the management server.
// Visible flow: look up the management server's node id from *h, fetch the
// transporter registered for that node id, convert the handle into a socket
// with connect_ndb_mgmd(h) and pass it to the transporter's connect_client().
// Returns the result of that connect_client() call on the success path.
// NOTE(review): the two error log calls below belong to failure checks (and
// their returns) that are not visible in this listing.
1349 bool TransporterRegistry::connect_client(NdbMgmHandle
*h
)
1351 DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
1353 Uint32 mgm_nodeid
= ndb_mgm_get_mgmd_nodeid(*h
);
1357 g_eventLogger
.error("%s: %d", __FILE__
, __LINE__
);
1360 Transporter
* t
= theTransporters
[mgm_nodeid
];
1363 g_eventLogger
.error("%s: %d", __FILE__
, __LINE__
);
1366 DBUG_RETURN(t
->connect_client(connect_ndb_mgmd(h
)));
// Visible flow of connect_ndb_mgmd(NdbMgmHandle*):
//  - return NDB_INVALID_SOCKET when h or *h is NULL;
//  - for every registered interface with a negative (dynamic) service port,
//    report that port to the management server with
//    ndb_mgm_set_connection_int_parameter(); on failure log the error,
//    destroy the handle and return NDB_INVALID_SOCKET;
//  - convert the handle into a transporter socket with
//    ndb_mgm_convert_to_transporter().
// NOTE(review): the original comment says convert_to_transporter disposes of
// the handle, yet the failure path after it still reads *h and destroys h --
// confirm the handle is still valid there.
// NOTE(review): the final return statement is not visible in this listing.
1370 * Given a connected NdbMgmHandle, turns it into a transporter
1371 * and returns the socket.
1373 NDB_SOCKET_TYPE
TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle
*h
)
1375 struct ndb_mgm_reply mgm_reply
;
1377 if ( h
==NULL
|| *h
== NULL
)
1379 g_eventLogger
.error("%s: %d", __FILE__
, __LINE__
);
1380 return NDB_INVALID_SOCKET
;
1383 for(unsigned int i
=0;i
< m_transporter_interface
.size();i
++)
1384 if (m_transporter_interface
[i
].m_s_service_port
< 0
1385 && ndb_mgm_set_connection_int_parameter(*h
,
1387 m_transporter_interface
[i
].m_remote_nodeId
,
1388 CFG_CONNECTION_SERVER_PORT
,
1389 m_transporter_interface
[i
].m_s_service_port
,
1392 g_eventLogger
.error("Error: %s: %d",
1393 ndb_mgm_get_latest_error_desc(*h
),
1394 ndb_mgm_get_latest_error(*h
));
1395 g_eventLogger
.error("%s: %d", __FILE__
, __LINE__
);
1396 ndb_mgm_destroy_handle(h
);
1397 return NDB_INVALID_SOCKET
;
1401 * convert_to_transporter also disposes of the handle (i.e. we don't leak
1404 NDB_SOCKET_TYPE sockfd
= ndb_mgm_convert_to_transporter(h
);
1405 if ( sockfd
== NDB_INVALID_SOCKET
)
1407 g_eventLogger
.error("Error: %s: %d",
1408 ndb_mgm_get_latest_error_desc(*h
),
1409 ndb_mgm_get_latest_error(*h
));
1410 g_eventLogger
.error("%s: %d", __FILE__
, __LINE__
);
1411 ndb_mgm_destroy_handle(h
);
// Visible flow of connect_ndb_mgmd(SocketClient*):
//  - create a fresh management handle;
//  - build the connect string "<server name>:<port>" from sc and set it on
//    the handle;
//  - connect; on failure destroy the handle and return NDB_INVALID_SOCKET;
//  - delegate to connect_ndb_mgmd(&h) to obtain the transporter socket.
// NOTE(review): the check guarding the first NDB_INVALID_SOCKET return and
// the declaration of cs are not visible in this listing.
1417 * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
1418 * and returns the socket.
1420 NDB_SOCKET_TYPE
TransporterRegistry::connect_ndb_mgmd(SocketClient
*sc
)
1422 NdbMgmHandle h
= ndb_mgm_create_handle();
1426 return NDB_INVALID_SOCKET
;
1434 cs
.assfmt("%s:%u",sc
->get_server_name(),sc
->get_port());
1435 ndb_mgm_set_connectstring(h
, cs
.c_str());
1438 if(ndb_mgm_connect(h
, 0, 0, 0)<0)
1440 ndb_mgm_destroy_handle(&h
);
1441 return NDB_INVALID_SOCKET
;
1444 return connect_ndb_mgmd(&h
);
1447 template class Vector
<TransporterRegistry::Transporter_interface
>;