mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / storage / ndb / src / common / transporter / TransporterRegistry.cpp
blob0f871d08735aa4719c83171f02d77b94e6c36a4c
1 /* Copyright (c) 2003-2007 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16 #include <ndb_global.h>
17 #include <my_pthread.h>
19 #include <TransporterRegistry.hpp>
20 #include "TransporterInternalDefinitions.hpp"
22 #include "Transporter.hpp"
23 #include <SocketAuthenticator.hpp>
25 #ifdef NDB_TCP_TRANSPORTER
26 #include "TCP_Transporter.hpp"
27 #endif
29 #ifdef NDB_SCI_TRANSPORTER
30 #include "SCI_Transporter.hpp"
31 #endif
33 #ifdef NDB_SHM_TRANSPORTER
34 #include "SHM_Transporter.hpp"
35 extern int g_ndb_shm_signum;
36 #endif
38 #include "TransporterCallback.hpp"
39 #include "NdbOut.hpp"
40 #include <NdbSleep.h>
41 #include <NdbTick.h>
42 #include <InputStream.hpp>
43 #include <OutputStream.hpp>
45 #include <mgmapi/mgmapi.h>
46 #include <mgmapi_internal.h>
47 #include <mgmapi/mgmapi_debug.h>
49 #include <EventLogger.hpp>
50 extern EventLogger g_eventLogger;
52 struct in_addr
53 TransporterRegistry::get_connect_address(NodeId node_id) const
55 return theTransporters[node_id]->m_connect_address;
58 SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
60 DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
61 if (m_auth && !m_auth->server_authenticate(sockfd)){
62 NDB_CLOSE_SOCKET(sockfd);
63 DBUG_RETURN(0);
66 if (!m_transporter_registry->connect_server(sockfd))
68 NDB_CLOSE_SOCKET(sockfd);
69 DBUG_RETURN(0);
72 DBUG_RETURN(0);
75 TransporterRegistry::TransporterRegistry(void * callback,
76 unsigned _maxTransporters,
77 unsigned sizeOfLongSignalMemory) :
78 m_mgm_handle(0),
79 m_transp_count(0)
81 DBUG_ENTER("TransporterRegistry::TransporterRegistry");
83 nodeIdSpecified = false;
84 maxTransporters = _maxTransporters;
85 sendCounter = 1;
87 callbackObj=callback;
89 theTCPTransporters = new TCP_Transporter * [maxTransporters];
90 theSCITransporters = new SCI_Transporter * [maxTransporters];
91 theSHMTransporters = new SHM_Transporter * [maxTransporters];
92 theTransporterTypes = new TransporterType [maxTransporters];
93 theTransporters = new Transporter * [maxTransporters];
94 performStates = new PerformState [maxTransporters];
95 ioStates = new IOState [maxTransporters];
97 // Initialize member variables
98 nTransporters = 0;
99 nTCPTransporters = 0;
100 nSCITransporters = 0;
101 nSHMTransporters = 0;
103 // Initialize the transporter arrays
104 for (unsigned i=0; i<maxTransporters; i++) {
105 theTCPTransporters[i] = NULL;
106 theSCITransporters[i] = NULL;
107 theSHMTransporters[i] = NULL;
108 theTransporters[i] = NULL;
109 performStates[i] = DISCONNECTED;
110 ioStates[i] = NoHalt;
113 DBUG_VOID_RETURN;
116 void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
118 DBUG_ENTER("TransporterRegistry::set_mgm_handle");
119 if (m_mgm_handle)
120 ndb_mgm_destroy_handle(&m_mgm_handle);
121 m_mgm_handle= h;
122 ndb_mgm_set_timeout(m_mgm_handle, 5000);
123 #ifndef DBUG_OFF
124 if (h)
126 char buf[256];
127 DBUG_PRINT("info",("handle set with connectstring: %s",
128 ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
130 else
132 DBUG_PRINT("info",("handle set to NULL"));
134 #endif
135 DBUG_VOID_RETURN;
138 TransporterRegistry::~TransporterRegistry()
140 DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
142 removeAll();
144 delete[] theTCPTransporters;
145 delete[] theSCITransporters;
146 delete[] theSHMTransporters;
147 delete[] theTransporterTypes;
148 delete[] theTransporters;
149 delete[] performStates;
150 delete[] ioStates;
152 if (m_mgm_handle)
153 ndb_mgm_destroy_handle(&m_mgm_handle);
155 DBUG_VOID_RETURN;
158 void
159 TransporterRegistry::removeAll(){
160 for(unsigned i = 0; i<maxTransporters; i++){
161 if(theTransporters[i] != NULL)
162 removeTransporter(theTransporters[i]->getRemoteNodeId());
166 void
167 TransporterRegistry::disconnectAll(){
168 for(unsigned i = 0; i<maxTransporters; i++){
169 if(theTransporters[i] != NULL)
170 theTransporters[i]->doDisconnect();
174 bool
175 TransporterRegistry::init(NodeId nodeId) {
176 DBUG_ENTER("TransporterRegistry::init");
177 nodeIdSpecified = true;
178 localNodeId = nodeId;
180 DEBUG("TransporterRegistry started node: " << localNodeId);
182 DBUG_RETURN(true);
185 bool
186 TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
188 DBUG_ENTER("TransporterRegistry::connect_server");
190 // read node id from client
191 // read transporter type
192 int nodeId, remote_transporter_type= -1;
193 SocketInputStream s_input(sockfd);
194 char buf[256];
195 if (s_input.gets(buf, 256) == 0) {
196 DBUG_PRINT("error", ("Could not get node id from client"));
197 DBUG_RETURN(false);
199 int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
200 switch (r) {
201 case 2:
202 break;
203 case 1:
204 // we're running version prior to 4.1.9
205 // ok, but with no checks on transporter configuration compatability
206 break;
207 default:
208 DBUG_PRINT("error", ("Error in node id from client"));
209 DBUG_RETURN(false);
212 DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
213 nodeId,remote_transporter_type));
215 //check that nodeid is valid and that there is an allocated transporter
216 if ( nodeId < 0 || nodeId >= (int)maxTransporters) {
217 DBUG_PRINT("error", ("Node id out of range from client"));
218 DBUG_RETURN(false);
220 if (theTransporters[nodeId] == 0) {
221 DBUG_PRINT("error", ("No transporter for this node id from client"));
222 DBUG_RETURN(false);
225 //check that the transporter should be connected
226 if (performStates[nodeId] != TransporterRegistry::CONNECTING) {
227 DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
228 DBUG_RETURN(false);
231 Transporter *t= theTransporters[nodeId];
233 // send info about own id (just as response to acknowledge connection)
234 // send info on own transporter type
235 SocketOutputStream s_output(sockfd);
236 s_output.println("%d %d", t->getLocalNodeId(), t->m_type);
238 if (remote_transporter_type != -1)
240 if (remote_transporter_type != t->m_type)
242 DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
243 t->m_type, remote_transporter_type));
244 g_eventLogger.error("Incompatible configuration: Transporter type "
245 "mismatch with node %d", nodeId);
247 // wait for socket close for 1 second to let message arrive at client
249 fd_set a_set;
250 FD_ZERO(&a_set);
251 FD_SET(sockfd, &a_set);
252 struct timeval timeout;
253 timeout.tv_sec = 1; timeout.tv_usec = 0;
254 select(sockfd+1, &a_set, 0, 0, &timeout);
256 DBUG_RETURN(false);
259 else if (t->m_type == tt_SHM_TRANSPORTER)
261 g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
264 // setup transporter (transporter responsible for closing sockfd)
265 t->connect_server(sockfd);
267 DBUG_RETURN(true);
270 bool
271 TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
272 #ifdef NDB_TCP_TRANSPORTER
274 if(!nodeIdSpecified){
275 init(config->localNodeId);
278 if(config->localNodeId != localNodeId)
279 return false;
281 if(theTransporters[config->remoteNodeId] != NULL)
282 return false;
284 TCP_Transporter * t = new TCP_Transporter(*this,
285 config->tcp.sendBufferSize,
286 config->tcp.maxReceiveSize,
287 config->localHostName,
288 config->remoteHostName,
289 config->s_port,
290 config->isMgmConnection,
291 localNodeId,
292 config->remoteNodeId,
293 config->serverNodeId,
294 config->checksum,
295 config->signalId);
296 if (t == NULL)
297 return false;
298 else if (!t->initTransporter()) {
299 delete t;
300 return false;
303 // Put the transporter in the transporter arrays
304 theTCPTransporters[nTCPTransporters] = t;
305 theTransporters[t->getRemoteNodeId()] = t;
306 theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
307 performStates[t->getRemoteNodeId()] = DISCONNECTED;
308 nTransporters++;
309 nTCPTransporters++;
311 return true;
312 #else
313 return false;
314 #endif
317 bool
318 TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
319 #ifdef NDB_SCI_TRANSPORTER
321 if(!SCI_Transporter::initSCI())
322 abort();
324 if(!nodeIdSpecified){
325 init(config->localNodeId);
328 if(config->localNodeId != localNodeId)
329 return false;
331 if(theTransporters[config->remoteNodeId] != NULL)
332 return false;
334 SCI_Transporter * t = new SCI_Transporter(*this,
335 config->localHostName,
336 config->remoteHostName,
337 config->s_port,
338 config->isMgmConnection,
339 config->sci.sendLimit,
340 config->sci.bufferSize,
341 config->sci.nLocalAdapters,
342 config->sci.remoteSciNodeId0,
343 config->sci.remoteSciNodeId1,
344 localNodeId,
345 config->remoteNodeId,
346 config->serverNodeId,
347 config->checksum,
348 config->signalId);
350 if (t == NULL)
351 return false;
352 else if (!t->initTransporter()) {
353 delete t;
354 return false;
356 // Put the transporter in the transporter arrays
357 theSCITransporters[nSCITransporters] = t;
358 theTransporters[t->getRemoteNodeId()] = t;
359 theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
360 performStates[t->getRemoteNodeId()] = DISCONNECTED;
361 nTransporters++;
362 nSCITransporters++;
364 return true;
365 #else
366 return false;
367 #endif
370 bool
371 TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
372 DBUG_ENTER("TransporterRegistry::createTransporter SHM");
373 #ifdef NDB_SHM_TRANSPORTER
374 if(!nodeIdSpecified){
375 init(config->localNodeId);
378 if(config->localNodeId != localNodeId)
379 return false;
381 if (!g_ndb_shm_signum) {
382 g_ndb_shm_signum= config->shm.signum;
383 DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
385 * Make sure to block g_ndb_shm_signum
386 * TransporterRegistry::init is run from "main" thread
388 NdbThread_set_shm_sigmask(TRUE);
391 if(config->shm.signum != g_ndb_shm_signum)
392 return false;
394 if(theTransporters[config->remoteNodeId] != NULL)
395 return false;
397 SHM_Transporter * t = new SHM_Transporter(*this,
398 config->localHostName,
399 config->remoteHostName,
400 config->s_port,
401 config->isMgmConnection,
402 localNodeId,
403 config->remoteNodeId,
404 config->serverNodeId,
405 config->checksum,
406 config->signalId,
407 config->shm.shmKey,
408 config->shm.shmSize
410 if (t == NULL)
411 return false;
412 else if (!t->initTransporter()) {
413 delete t;
414 return false;
416 // Put the transporter in the transporter arrays
417 theSHMTransporters[nSHMTransporters] = t;
418 theTransporters[t->getRemoteNodeId()] = t;
419 theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
420 performStates[t->getRemoteNodeId()] = DISCONNECTED;
422 nTransporters++;
423 nSHMTransporters++;
425 DBUG_RETURN(true);
426 #else
427 DBUG_RETURN(false);
428 #endif
432 void
433 TransporterRegistry::removeTransporter(NodeId nodeId) {
435 DEBUG("Removing transporter from " << localNodeId
436 << " to " << nodeId);
438 if(theTransporters[nodeId] == NULL)
439 return;
441 theTransporters[nodeId]->doDisconnect();
443 const TransporterType type = theTransporterTypes[nodeId];
445 int ind = 0;
446 switch(type){
447 case tt_TCP_TRANSPORTER:
448 #ifdef NDB_TCP_TRANSPORTER
449 for(; ind < nTCPTransporters; ind++)
450 if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
451 break;
452 ind++;
453 for(; ind<nTCPTransporters; ind++)
454 theTCPTransporters[ind-1] = theTCPTransporters[ind];
455 nTCPTransporters --;
456 #endif
457 break;
458 case tt_SCI_TRANSPORTER:
459 #ifdef NDB_SCI_TRANSPORTER
460 for(; ind < nSCITransporters; ind++)
461 if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
462 break;
463 ind++;
464 for(; ind<nSCITransporters; ind++)
465 theSCITransporters[ind-1] = theSCITransporters[ind];
466 nSCITransporters --;
467 #endif
468 break;
469 case tt_SHM_TRANSPORTER:
470 #ifdef NDB_SHM_TRANSPORTER
471 for(; ind < nSHMTransporters; ind++)
472 if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
473 break;
474 ind++;
475 for(; ind<nSHMTransporters; ind++)
476 theSHMTransporters[ind-1] = theSHMTransporters[ind];
477 nSHMTransporters --;
478 #endif
479 break;
482 nTransporters--;
484 // Delete the transporter and remove it from theTransporters array
485 delete theTransporters[nodeId];
486 theTransporters[nodeId] = NULL;
489 Uint32
490 TransporterRegistry::get_free_buffer(Uint32 node) const
492 Transporter *t;
493 if(likely((t = theTransporters[node]) != 0))
495 return t->get_free_buffer();
497 return 0;
501 SendStatus
502 TransporterRegistry::prepareSend(const SignalHeader * const signalHeader,
503 Uint8 prio,
504 const Uint32 * const signalData,
505 NodeId nodeId,
506 const LinearSectionPtr ptr[3]){
509 Transporter *t = theTransporters[nodeId];
510 if(t != NULL &&
511 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
512 ((signalHeader->theReceiversBlockNumber == 252) ||
513 (signalHeader->theReceiversBlockNumber == 4002)))) {
515 if(t->isConnected()){
516 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
517 if(lenBytes <= MAX_MESSAGE_SIZE){
518 Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
519 if(insertPtr != 0){
520 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
521 t->updateWritePtr(lenBytes, prio);
522 return SEND_OK;
525 int sleepTime = 2;
528 * @note: on linux/i386 the granularity is 10ms
529 * so sleepTime = 2 generates a 10 ms sleep.
531 for(int i = 0; i<50; i++){
532 if((nSHMTransporters+nSCITransporters) == 0)
533 NdbSleep_MilliSleep(sleepTime);
534 insertPtr = t->getWritePtr(lenBytes, prio);
535 if(insertPtr != 0){
536 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
537 t->updateWritePtr(lenBytes, prio);
538 break;
542 if(insertPtr != 0){
544 * Send buffer full, but resend works
546 reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
547 return SEND_OK;
550 WARNING("Signal to " << nodeId << " lost(buffer)");
551 reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
552 return SEND_BUFFER_FULL;
553 } else {
554 return SEND_MESSAGE_TOO_BIG;
556 } else {
557 DEBUG("Signal to " << nodeId << " lost(disconnect) ");
558 return SEND_DISCONNECTED;
560 } else {
561 DEBUG("Discarding message to block: "
562 << signalHeader->theReceiversBlockNumber
563 << " node: " << nodeId);
565 if(t == NULL)
566 return SEND_UNKNOWN_NODE;
568 return SEND_BLOCKED;
572 SendStatus
573 TransporterRegistry::prepareSend(const SignalHeader * const signalHeader,
574 Uint8 prio,
575 const Uint32 * const signalData,
576 NodeId nodeId,
577 class SectionSegmentPool & thePool,
578 const SegmentedSectionPtr ptr[3]){
581 Transporter *t = theTransporters[nodeId];
582 if(t != NULL &&
583 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
584 ((signalHeader->theReceiversBlockNumber == 252)||
585 (signalHeader->theReceiversBlockNumber == 4002)))) {
587 if(t->isConnected()){
588 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
589 if(lenBytes <= MAX_MESSAGE_SIZE){
590 Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
591 if(insertPtr != 0){
592 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
593 t->updateWritePtr(lenBytes, prio);
594 return SEND_OK;
599 * @note: on linux/i386 the granularity is 10ms
600 * so sleepTime = 2 generates a 10 ms sleep.
602 int sleepTime = 2;
603 for(int i = 0; i<50; i++){
604 if((nSHMTransporters+nSCITransporters) == 0)
605 NdbSleep_MilliSleep(sleepTime);
606 insertPtr = t->getWritePtr(lenBytes, prio);
607 if(insertPtr != 0){
608 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
609 t->updateWritePtr(lenBytes, prio);
610 break;
614 if(insertPtr != 0){
616 * Send buffer full, but resend works
618 reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
619 return SEND_OK;
622 WARNING("Signal to " << nodeId << " lost(buffer)");
623 reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
624 return SEND_BUFFER_FULL;
625 } else {
626 return SEND_MESSAGE_TOO_BIG;
628 } else {
629 DEBUG("Signal to " << nodeId << " lost(disconnect) ");
630 return SEND_DISCONNECTED;
632 } else {
633 DEBUG("Discarding message to block: "
634 << signalHeader->theReceiversBlockNumber
635 << " node: " << nodeId);
637 if(t == NULL)
638 return SEND_UNKNOWN_NODE;
640 return SEND_BLOCKED;
644 void
645 TransporterRegistry::external_IO(Uint32 timeOutMillis) {
646 //-----------------------------------------------------------
647 // Most of the time we will send the buffers here and then wait
648 // for new signals. Thus we start by sending without timeout
649 // followed by the receive part where we expect to sleep for
650 // a while.
651 //-----------------------------------------------------------
652 if(pollReceive(timeOutMillis)){
653 performReceive();
655 performSend();
658 Uint32
659 TransporterRegistry::pollReceive(Uint32 timeOutMillis){
660 Uint32 retVal = 0;
662 if((nSCITransporters) > 0)
664 timeOutMillis=0;
667 #ifdef NDB_SHM_TRANSPORTER
668 if(nSHMTransporters > 0)
670 Uint32 res = poll_SHM(0);
671 if(res)
673 retVal |= res;
674 timeOutMillis = 0;
677 #endif
679 #ifdef NDB_TCP_TRANSPORTER
680 if(nTCPTransporters > 0 || retVal == 0)
682 retVal |= poll_TCP(timeOutMillis);
684 else
685 tcpReadSelectReply = 0;
686 #endif
687 #ifdef NDB_SCI_TRANSPORTER
688 if(nSCITransporters > 0)
689 retVal |= poll_SCI(timeOutMillis);
690 #endif
691 #ifdef NDB_SHM_TRANSPORTER
692 if(nSHMTransporters > 0 && retVal == 0)
694 int res = poll_SHM(0);
695 retVal |= res;
697 #endif
698 return retVal;
#ifdef NDB_SCI_TRANSPORTER
/**
 * Return 1 as soon as a connected SCI transporter has data to read,
 * otherwise 0.  timeOutMillis is not used.
 */
Uint32
TransporterRegistry::poll_SCI(Uint32 timeOutMillis)
{
  for (int idx = 0; idx < nSCITransporters; idx++) {
    SCI_Transporter * const t = theSCITransporters[idx];
    if (t->isConnected() && t->hasDataToRead())
      return 1;
  }
  return 0;
}
#endif
#ifdef NDB_SHM_TRANSPORTER
// Incremented from shm_sig_handler().  An object written by a signal
// handler must be volatile sig_atomic_t for the access to be well defined.
static volatile sig_atomic_t g_shm_counter = 0;

/**
 * Scan the connected SHM transporters up to 100 times and return 1 as
 * soon as one of them has data to read, otherwise 0.
 * timeOutMillis is not used.
 */
Uint32
TransporterRegistry::poll_SHM(Uint32 timeOutMillis)
{
  for(int j=0; j < 100; j++)
  {
    for (int i=0; i<nSHMTransporters; i++) {
      SHM_Transporter * t = theSHMTransporters[i];
      if (t->isConnected()) {
        if(t->hasDataToRead()) {
          return 1;
        }
      }
    }
  }
  return 0;
}
#endif
#ifdef NDB_TCP_TRANSPORTER
/**
 * Build the read set from all connected TCP transporters and select() on
 * it for at most timeOutMillis.  The timeout is forced to 0 when any
 * transporter already holds received data.
 *
 * The select() result is stored in tcpReadSelectReply and the read set in
 * tcpReadset for performReceive() to consume.
 *
 * @return non-zero when select() returned non-zero or buffered data exists
 */
Uint32
TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
{
  bool hasdata = false;

  // Disabled shortcut, kept as in the original: the select below is also
  // what provides the timed wait when there are no TCP transporters.
  if (false && nTCPTransporters == 0)
  {
    tcpReadSelectReply = 0;
    return 0;
  }

  NDB_SOCKET_TYPE maxSocketValue = -1;

  // Needed for TCP/IP connections
  // The read set is used by select
  FD_ZERO(&tcpReadset);

  // Prepare for receiving
  for (int idx = 0; idx < nTCPTransporters; idx++) {
    TCP_Transporter * const t = theTCPTransporters[idx];
    const NodeId nodeId = t->getRemoteNodeId();

    // Only transporters that are connected go into the read set
    if (is_connected(nodeId) && t->isConnected()) {
      const NDB_SOCKET_TYPE socket = t->getSocket();

      // Track the highest socket value. It will be used by select
      if (socket > maxSocketValue)
        maxSocketValue = socket;

      FD_SET(socket, &tcpReadset);
    }

    // Buffered data is noted for every transporter, connected or not
    hasdata |= t->hasReceiveData();
  }

  // Data already waiting: do not block in select
  if (hasdata)
    timeOutMillis = 0;

  struct timeval timeout;
  timeout.tv_sec  = timeOutMillis / 1000;
  timeout.tv_usec = (timeOutMillis % 1000) * 1000;

  // select wants the highest socket value plus one
  maxSocketValue++;

  tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);
  if(false && tcpReadSelectReply == -1 && errno == EINTR)
    g_eventLogger.info("woke-up by signal");

#ifdef NDB_WIN32
  if(tcpReadSelectReply == SOCKET_ERROR)
  {
    NdbSleep_MilliSleep(timeOutMillis);
  }
#endif

  return tcpReadSelectReply || hasdata;
}
#endif
800 void
801 TransporterRegistry::performReceive()
803 #ifdef NDB_TCP_TRANSPORTER
804 for (int i=0; i<nTCPTransporters; i++)
806 checkJobBuffer();
807 TCP_Transporter *t = theTCPTransporters[i];
808 const NodeId nodeId = t->getRemoteNodeId();
809 const NDB_SOCKET_TYPE socket = t->getSocket();
810 if(is_connected(nodeId)){
811 if(t->isConnected())
813 if (FD_ISSET(socket, &tcpReadset))
815 t->doReceive();
818 if (t->hasReceiveData())
820 Uint32 * ptr;
821 Uint32 sz = t->getReceiveData(&ptr);
822 transporter_recv_from(callbackObj, nodeId);
823 Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);
824 t->updateReceiveDataPtr(szUsed);
829 #endif
831 #ifdef NDB_SCI_TRANSPORTER
832 //performReceive
833 //do prepareReceive on the SCI transporters (prepareReceive(t,,,,))
834 for (int i=0; i<nSCITransporters; i++)
836 checkJobBuffer();
837 SCI_Transporter *t = theSCITransporters[i];
838 const NodeId nodeId = t->getRemoteNodeId();
839 if(is_connected(nodeId))
841 if(t->isConnected() && t->checkConnected())
843 Uint32 * readPtr, * eodPtr;
844 t->getReceivePtr(&readPtr, &eodPtr);
845 transporter_recv_from(callbackObj, nodeId);
846 Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
847 t->updateReceivePtr(newPtr);
851 #endif
852 #ifdef NDB_SHM_TRANSPORTER
853 for (int i=0; i<nSHMTransporters; i++)
855 checkJobBuffer();
856 SHM_Transporter *t = theSHMTransporters[i];
857 const NodeId nodeId = t->getRemoteNodeId();
858 if(is_connected(nodeId)){
859 if(t->isConnected() && t->checkConnected())
861 Uint32 * readPtr, * eodPtr;
862 t->getReceivePtr(&readPtr, &eodPtr);
863 transporter_recv_from(callbackObj, nodeId);
864 Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
865 t->updateReceivePtr(newPtr);
869 #endif
872 void
873 TransporterRegistry::performSend()
875 int i;
876 sendCounter = 1;
878 #ifdef NDB_TCP_TRANSPORTER
879 for (i = m_transp_count; i < nTCPTransporters; i++)
881 TCP_Transporter *t = theTCPTransporters[i];
882 if (t && t->hasDataToSend() && t->isConnected() &&
883 is_connected(t->getRemoteNodeId()))
885 t->doSend();
888 for (i = 0; i < m_transp_count && i < nTCPTransporters; i++)
890 TCP_Transporter *t = theTCPTransporters[i];
891 if (t && t->hasDataToSend() && t->isConnected() &&
892 is_connected(t->getRemoteNodeId()))
894 t->doSend();
897 m_transp_count++;
898 if (m_transp_count == nTCPTransporters) m_transp_count = 0;
899 #endif
900 #ifdef NDB_SCI_TRANSPORTER
901 //scroll through the SCI transporters,
902 // get each transporter, check if connected, send data
903 for (i=0; i<nSCITransporters; i++) {
904 SCI_Transporter *t = theSCITransporters[i];
905 const NodeId nodeId = t->getRemoteNodeId();
907 if(is_connected(nodeId))
909 if(t->isConnected() && t->hasDataToSend()) {
910 t->doSend();
911 } //if
912 } //if
914 #endif
916 #ifdef NDB_SHM_TRANSPORTER
917 for (i=0; i<nSHMTransporters; i++)
919 SHM_Transporter *t = theSHMTransporters[i];
920 const NodeId nodeId = t->getRemoteNodeId();
921 if(is_connected(nodeId))
923 if(t->isConnected())
925 t->doSend();
929 #endif
933 TransporterRegistry::forceSendCheck(int sendLimit){
934 int tSendCounter = sendCounter;
935 sendCounter = tSendCounter + 1;
936 if (tSendCounter >= sendLimit) {
937 performSend();
938 sendCounter = 1;
939 return 1;
940 }//if
941 return 0;
942 }//TransporterRegistry::forceSendCheck()
#ifdef DEBUG_TRANSPORTER
/**
 * Debug helper: print the number of transporters and, for each registered
 * one, its remote node id, perform state and I/O state to ndbout.
 */
void
TransporterRegistry::printState(){
  ndbout << "-- TransporterRegistry -- " << endl << endl
         << "Transporters = " << nTransporters << endl;

  for(int slot = 0; slot < maxTransporters; slot++){
    if(theTransporters[slot] == NULL)
      continue;

    const NodeId remoteNodeId = theTransporters[slot]->getRemoteNodeId();
    ndbout << "Transporter: " << remoteNodeId
           << " PerformState: " << performStates[remoteNodeId]
           << " IOState: " << ioStates[remoteNodeId] << endl;
  }
}
#endif
959 IOState
960 TransporterRegistry::ioState(NodeId nodeId) {
961 return ioStates[nodeId];
964 void
965 TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
966 DEBUG("TransporterRegistry::setIOState("
967 << nodeId << ", " << state << ")");
968 ioStates[nodeId] = state;
971 static void *
972 run_start_clients_C(void * me)
974 ((TransporterRegistry*) me)->start_clients_thread();
975 return 0;
978 // Run by kernel thread
979 void
980 TransporterRegistry::do_connect(NodeId node_id)
982 PerformState &curr_state = performStates[node_id];
983 switch(curr_state){
984 case DISCONNECTED:
985 break;
986 case CONNECTED:
987 return;
988 case CONNECTING:
989 return;
990 case DISCONNECTING:
991 break;
993 DBUG_ENTER("TransporterRegistry::do_connect");
994 DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));
995 curr_state= CONNECTING;
996 DBUG_VOID_RETURN;
998 void
999 TransporterRegistry::do_disconnect(NodeId node_id)
1001 PerformState &curr_state = performStates[node_id];
1002 switch(curr_state){
1003 case DISCONNECTED:
1004 return;
1005 case CONNECTED:
1006 break;
1007 case CONNECTING:
1008 break;
1009 case DISCONNECTING:
1010 return;
1012 DBUG_ENTER("TransporterRegistry::do_disconnect");
1013 DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));
1014 curr_state= DISCONNECTING;
1015 DBUG_VOID_RETURN;
1018 void
1019 TransporterRegistry::report_connect(NodeId node_id)
1021 DBUG_ENTER("TransporterRegistry::report_connect");
1022 DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
1023 performStates[node_id] = CONNECTED;
1024 reportConnect(callbackObj, node_id);
1025 DBUG_VOID_RETURN;
1028 void
1029 TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
1031 DBUG_ENTER("TransporterRegistry::report_disconnect");
1032 DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
1033 performStates[node_id] = DISCONNECTED;
1034 reportDisconnect(callbackObj, node_id, errnum);
1035 DBUG_VOID_RETURN;
1038 void
1039 TransporterRegistry::update_connections()
1041 for (int i= 0, n= 0; n < nTransporters; i++){
1042 Transporter * t = theTransporters[i];
1043 if (!t)
1044 continue;
1045 n++;
1047 const NodeId nodeId = t->getRemoteNodeId();
1048 switch(performStates[nodeId]){
1049 case CONNECTED:
1050 case DISCONNECTED:
1051 break;
1052 case CONNECTING:
1053 if(t->isConnected())
1054 report_connect(nodeId);
1055 break;
1056 case DISCONNECTING:
1057 if(!t->isConnected())
1058 report_disconnect(nodeId, 0);
1059 break;
1064 // run as own thread
1065 void
1066 TransporterRegistry::start_clients_thread()
1068 int persist_mgm_count= 0;
1069 DBUG_ENTER("TransporterRegistry::start_clients_thread");
1070 while (m_run_start_clients_thread) {
1071 NdbSleep_MilliSleep(100);
1072 persist_mgm_count++;
1073 if(persist_mgm_count==50)
1075 ndb_mgm_check_connection(m_mgm_handle);
1076 persist_mgm_count= 0;
1078 for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
1079 Transporter * t = theTransporters[i];
1080 if (!t)
1081 continue;
1082 n++;
1084 const NodeId nodeId = t->getRemoteNodeId();
1085 switch(performStates[nodeId]){
1086 case CONNECTING:
1087 if(!t->isConnected() && !t->isServer) {
1088 bool connected= false;
1090 * First, we try to connect (if we have a port number).
1092 if (t->get_s_port())
1093 connected= t->connect_client();
1096 * If dynamic, get the port for connecting from the management server
1098 if( !connected && t->get_s_port() <= 0) { // Port is dynamic
1099 int server_port= 0;
1100 struct ndb_mgm_reply mgm_reply;
1102 if(!ndb_mgm_is_connected(m_mgm_handle))
1103 ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
1105 if(ndb_mgm_is_connected(m_mgm_handle))
1107 int res=
1108 ndb_mgm_get_connection_int_parameter(m_mgm_handle,
1109 t->getRemoteNodeId(),
1110 t->getLocalNodeId(),
1111 CFG_CONNECTION_SERVER_PORT,
1112 &server_port,
1113 &mgm_reply);
1114 DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
1115 server_port,t->getRemoteNodeId(),
1116 t->getLocalNodeId(),res));
1117 if( res >= 0 )
1120 * Server_port == 0 just means that that a mgmt server
1121 * has not received a new port yet. Keep the old.
1123 if (server_port)
1124 t->set_s_port(server_port);
1126 else if(ndb_mgm_is_connected(m_mgm_handle))
1128 g_eventLogger.info("Failed to get dynamic port to connect to: %d", res);
1129 ndb_mgm_disconnect(m_mgm_handle);
1131 else
1133 g_eventLogger.info("Management server closed connection early. "
1134 "It is probably being shut down (or has problems). "
1135 "We will retry the connection. %d %s %s line: %d",
1136 ndb_mgm_get_latest_error(m_mgm_handle),
1137 ndb_mgm_get_latest_error_desc(m_mgm_handle),
1138 ndb_mgm_get_latest_error_msg(m_mgm_handle),
1139 ndb_mgm_get_latest_error_line(m_mgm_handle)
1143 /** else
1144 * We will not be able to get a new port unless
1145 * the m_mgm_handle is connected. Note that not
1146 * being connected is an ok state, just continue
1147 * until it is able to connect. Continue using the
1148 * old port until we can connect again and get a
1149 * new port.
1153 break;
1154 case DISCONNECTING:
1155 if(t->isConnected())
1156 t->doDisconnect();
1157 break;
1158 default:
1159 break;
1163 DBUG_VOID_RETURN;
1166 bool
1167 TransporterRegistry::start_clients()
1169 m_run_start_clients_thread= true;
1170 m_start_clients_thread= NdbThread_Create(run_start_clients_C,
1171 (void**)this,
1172 32768,
1173 "ndb_start_clients",
1174 NDB_THREAD_PRIO_LOW);
1175 if (m_start_clients_thread == 0) {
1176 m_run_start_clients_thread= false;
1177 return false;
1179 return true;
1182 bool
1183 TransporterRegistry::stop_clients()
1185 if (m_start_clients_thread) {
1186 m_run_start_clients_thread= false;
1187 void* status;
1188 NdbThread_WaitFor(m_start_clients_thread, &status);
1189 NdbThread_Destroy(&m_start_clients_thread);
1191 return true;
1194 void
1195 TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
1196 const char *interf,
1197 int s_port)
1199 DBUG_ENTER("TransporterRegistry::add_transporter_interface");
1200 DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));
1201 if (interf && strlen(interf) == 0)
1202 interf= 0;
1204 for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1206 Transporter_interface &tmp= m_transporter_interface[i];
1207 if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
1208 continue;
1209 if (interf != 0 && tmp.m_interface != 0 &&
1210 strcmp(interf, tmp.m_interface) == 0)
1212 DBUG_VOID_RETURN; // found match, no need to insert
1214 if (interf == 0 && tmp.m_interface == 0)
1216 DBUG_VOID_RETURN; // found match, no need to insert
1219 Transporter_interface t;
1220 t.m_remote_nodeId= remoteNodeId;
1221 t.m_s_service_port= s_port;
1222 t.m_interface= interf;
1223 m_transporter_interface.push_back(t);
1224 DBUG_PRINT("exit",("interface and port added"));
1225 DBUG_VOID_RETURN;
1228 bool
1229 TransporterRegistry::start_service(SocketServer& socket_server)
1231 DBUG_ENTER("TransporterRegistry::start_service");
1232 if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
1234 g_eventLogger.error("TransporterRegistry::startReceiving: localNodeId not specified");
1235 DBUG_RETURN(false);
1238 for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1240 Transporter_interface &t= m_transporter_interface[i];
1242 unsigned short port= (unsigned short)t.m_s_service_port;
1243 if(t.m_s_service_port<0)
1244 port= -t.m_s_service_port; // is a dynamic port
1245 TransporterService *transporter_service =
1246 new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
1247 if(!socket_server.setup(transporter_service,
1248 &port, t.m_interface))
1250 DBUG_PRINT("info", ("Trying new port"));
1251 port= 0;
1252 if(t.m_s_service_port>0
1253 || !socket_server.setup(transporter_service,
1254 &port, t.m_interface))
1257 * If it wasn't a dynamically allocated port, or
1258 * our attempts at getting a new dynamic port failed
1260 g_eventLogger.error("Unable to setup transporter service port: %s:%d!\n"
1261 "Please check if the port is already used,\n"
1262 "(perhaps the node is already running)",
1263 t.m_interface ? t.m_interface : "*", t.m_s_service_port);
1264 delete transporter_service;
1265 DBUG_RETURN(false);
1268 t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
1269 DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
1270 transporter_service->setTransporterRegistry(this);
1272 DBUG_RETURN(true);
#ifdef NDB_SHM_TRANSPORTER
/**
 * Signal handler used by the SHM transporter: it only bumps
 * g_shm_counter. The signal number itself is not inspected.
 */
static RETSIGTYPE
shm_sig_handler(int signo)
{
  ++g_shm_counter;
}
#endif
1284 void
1285 TransporterRegistry::startReceiving()
1287 DBUG_ENTER("TransporterRegistry::startReceiving");
1289 #ifdef NDB_SHM_TRANSPORTER
1290 m_shm_own_pid = getpid();
1291 if (g_ndb_shm_signum)
1293 DBUG_PRINT("info",("Install signal handler for signum %d",
1294 g_ndb_shm_signum));
1295 struct sigaction sa;
1296 NdbThread_set_shm_sigmask(FALSE);
1297 sigemptyset(&sa.sa_mask);
1298 sa.sa_handler = shm_sig_handler;
1299 sa.sa_flags = 0;
1300 int ret;
1301 while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR);
1302 if(ret != 0)
1304 DBUG_PRINT("error",("Install failed"));
1305 g_eventLogger.error("Failed to install signal handler for"
1306 " SHM transporter, signum %d, errno: %d (%s)",
1307 g_ndb_shm_signum, errno, strerror(errno));
1310 #endif // NDB_SHM_TRANSPORTER
1311 DBUG_VOID_RETURN;
1314 void
1315 TransporterRegistry::stopReceiving(){
1317 * Disconnect all transporters, this includes detach from remote node
1318 * and since that must be done from the same process that called attach
1319 * it's done here in the receive thread
1321 disconnectAll();
1324 void
1325 TransporterRegistry::startSending(){
1328 void
1329 TransporterRegistry::stopSending(){
1332 NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
1333 out << "-- Signal Header --" << endl;
1334 out << "theLength: " << sh.theLength << endl;
1335 out << "gsn: " << sh.theVerId_signalNumber << endl;
1336 out << "recBlockNo: " << sh.theReceiversBlockNumber << endl;
1337 out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
1338 out << "sendersSig: " << sh.theSendersSignalId << endl;
1339 out << "theSignalId: " << sh.theSignalId << endl;
1340 out << "trace: " << (int)sh.theTrace << endl;
1341 return out;
1344 Transporter*
1345 TransporterRegistry::get_transporter(NodeId nodeId) {
1346 return theTransporters[nodeId];
1349 bool TransporterRegistry::connect_client(NdbMgmHandle *h)
1351 DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
1353 Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
1355 if(!mgm_nodeid)
1357 g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1358 return false;
1360 Transporter * t = theTransporters[mgm_nodeid];
1361 if (!t)
1363 g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1364 return false;
1366 DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h)));
1370 * Given a connected NdbMgmHandle, turns it into a transporter
1371 * and returns the socket.
1373 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
1375 struct ndb_mgm_reply mgm_reply;
1377 if ( h==NULL || *h == NULL )
1379 g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1380 return NDB_INVALID_SOCKET;
1383 for(unsigned int i=0;i < m_transporter_interface.size();i++)
1384 if (m_transporter_interface[i].m_s_service_port < 0
1385 && ndb_mgm_set_connection_int_parameter(*h,
1386 get_localNodeId(),
1387 m_transporter_interface[i].m_remote_nodeId,
1388 CFG_CONNECTION_SERVER_PORT,
1389 m_transporter_interface[i].m_s_service_port,
1390 &mgm_reply) < 0)
1392 g_eventLogger.error("Error: %s: %d",
1393 ndb_mgm_get_latest_error_desc(*h),
1394 ndb_mgm_get_latest_error(*h));
1395 g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1396 ndb_mgm_destroy_handle(h);
1397 return NDB_INVALID_SOCKET;
1401 * convert_to_transporter also disposes of the handle (i.e. we don't leak
1402 * memory here.
1404 NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h);
1405 if ( sockfd == NDB_INVALID_SOCKET)
1407 g_eventLogger.error("Error: %s: %d",
1408 ndb_mgm_get_latest_error_desc(*h),
1409 ndb_mgm_get_latest_error(*h));
1410 g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1411 ndb_mgm_destroy_handle(h);
1413 return sockfd;
1417 * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
1418 * and returns the socket.
1420 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc)
1422 NdbMgmHandle h= ndb_mgm_create_handle();
1424 if ( h == NULL )
1426 return NDB_INVALID_SOCKET;
1430 * Set connectstring
1433 BaseString cs;
1434 cs.assfmt("%s:%u",sc->get_server_name(),sc->get_port());
1435 ndb_mgm_set_connectstring(h, cs.c_str());
1438 if(ndb_mgm_connect(h, 0, 0, 0)<0)
1440 ndb_mgm_destroy_handle(&h);
1441 return NDB_INVALID_SOCKET;
1444 return connect_ndb_mgmd(&h);
1447 template class Vector<TransporterRegistry::Transporter_interface>;