1 /* Copyright (c) 2003-2006 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16 #include <ndb_global.h>
17 #include <ndb_opt_defaults.h>
18 #include <IPCConfig.hpp>
22 #include <TransporterDefinitions.hpp>
23 #include <TransporterRegistry.hpp>
24 #include <Properties.hpp>
26 #include <mgmapi_configuration.hpp>
27 #include <mgmapi_config_parameters.h>
29 #if defined DEBUG_TRANSPORTER
30 #define DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
35 IPCConfig::IPCConfig(Properties
* p
)
37 theNoOfRemoteNodes
= 0;
40 props
= new Properties(* p
);
46 IPCConfig::~IPCConfig()
57 if(props
== 0) return -1;
58 if(!props
->get("LocalNodeId", &nodeId
)) {
59 DEBUG( "Did not find local node id." );
64 Uint32 noOfConnections
;
65 if(!props
->get("NoOfConnections", &noOfConnections
)) {
66 DEBUG( "Did not find noOfConnections." );
70 for(Uint32 i
= 0; i
<noOfConnections
; i
++){
71 const Properties
* tmp
;
74 if(!props
->get("Connection", i
, &tmp
)) {
75 DEBUG( "Did not find Connection." );
78 if(!tmp
->get("NodeId1", &node1
)) {
79 DEBUG( "Did not find NodeId1." );
82 if(!tmp
->get("NodeId2", &node2
)) {
83 DEBUG( "Did not find NodeId2." );
87 if(node1
== the_ownId
&& node2
!= the_ownId
)
88 if(!addRemoteNodeId(node2
)) {
89 DEBUG( "addRemoteNodeId(node2) failed." );
93 if(node1
!= the_ownId
&& node2
== the_ownId
)
94 if(!addRemoteNodeId(node1
)) {
95 DEBUG( "addRemoteNodeId(node2) failed." );
103 IPCConfig::addRemoteNodeId(NodeId nodeId
){
104 for(int i
= 0; i
<theNoOfRemoteNodes
; i
++)
105 if(theRemoteNodeIds
[i
] == nodeId
)
107 theRemoteNodeIds
[theNoOfRemoteNodes
] = nodeId
;
108 theNoOfRemoteNodes
++;
114 * and get next higher node id
115 * Returns false if none found
118 IPCConfig::getNextRemoteNodeId(NodeId
& nodeId
) const {
119 NodeId returnNode
= MAX_NODES
+ 1;
120 for(int i
= 0; i
<theNoOfRemoteNodes
; i
++)
121 if(theRemoteNodeIds
[i
] > nodeId
){
122 if(theRemoteNodeIds
[i
] < returnNode
){
123 returnNode
= theRemoteNodeIds
[i
];
126 if(returnNode
== (MAX_NODES
+ 1))
134 IPCConfig::getREPHBFrequency(NodeId id
) const {
135 const Properties
* tmp
;
139 * Todo: Fix correct heartbeat
141 if (!props
->get("Node", id
, &tmp
) ||
142 !tmp
->get("HeartbeatIntervalRepRep", &out
)) {
143 DEBUG("Illegal Node or HeartbeatIntervalRepRep in config.");
151 IPCConfig::getNodeType(NodeId id
) const {
153 const Properties
* tmp
;
155 if (!props
->get("Node", id
, &tmp
) || !tmp
->get("Type", &out
)) {
156 DEBUG("Illegal Node or NodeType in config.");
165 IPCConfig::configureTransporters(Uint32 nodeId
,
166 const class ndb_mgm_configuration
& config
,
167 class TransporterRegistry
& tr
){
168 TransporterConfiguration conf
;
170 DBUG_ENTER("IPCConfig::configureTransporters");
173 * Iterate over all MGM's an construct a connectstring
174 * create mgm_handle and give it to the Transporter Registry
177 const char *separator
= "";
178 BaseString connect_string
;
179 ndb_mgm_configuration_iterator
iter(config
, CFG_SECTION_NODE
);
180 for(iter
.first(); iter
.valid(); iter
.next())
183 if(iter
.get(CFG_TYPE_OF_SECTION
, &type
)) continue;
184 if(type
!= NODE_TYPE_MGM
) continue;
185 const char* hostname
;
187 if(iter
.get(CFG_NODE_HOST
, &hostname
)) continue;
188 if( strlen(hostname
) == 0 ) continue;
189 if(iter
.get(CFG_MGM_PORT
, &port
)) continue;
190 connect_string
.appfmt("%s%s:%u",separator
,hostname
,port
);
193 NdbMgmHandle h
= ndb_mgm_create_handle();
194 if ( h
&& connect_string
.length() > 0 )
196 ndb_mgm_set_connectstring(h
,connect_string
.c_str());
197 tr
.set_mgm_handle(h
);
201 Uint32 noOfTransportersCreated
= 0;
202 ndb_mgm_configuration_iterator
iter(config
, CFG_SECTION_CONNECTION
);
204 for(iter
.first(); iter
.valid(); iter
.next()){
206 Uint32 nodeId1
, nodeId2
, remoteNodeId
;
207 const char * remoteHostName
= 0, * localHostName
= 0;
208 if(iter
.get(CFG_CONNECTION_NODE_1
, &nodeId1
)) continue;
209 if(iter
.get(CFG_CONNECTION_NODE_2
, &nodeId2
)) continue;
211 if(nodeId1
!= nodeId
&& nodeId2
!= nodeId
) continue;
212 remoteNodeId
= (nodeId
== nodeId1
? nodeId2
: nodeId1
);
215 const char * host1
= 0, * host2
= 0;
216 iter
.get(CFG_CONNECTION_HOSTNAME_1
, &host1
);
217 iter
.get(CFG_CONNECTION_HOSTNAME_2
, &host2
);
218 localHostName
= (nodeId
== nodeId1
? host1
: host2
);
219 remoteHostName
= (nodeId
== nodeId1
? host2
: host1
);
222 Uint32 sendSignalId
= 1;
224 if(iter
.get(CFG_CONNECTION_SEND_SIGNAL_ID
, &sendSignalId
)) continue;
225 if(iter
.get(CFG_CONNECTION_CHECKSUM
, &checksum
)) continue;
228 if(iter
.get(CFG_TYPE_OF_SECTION
, &type
)) continue;
230 Uint32 server_port
= 0;
231 if(iter
.get(CFG_CONNECTION_SERVER_PORT
, &server_port
)) break;
233 Uint32 nodeIdServer
= 0;
234 if(iter
.get(CFG_CONNECTION_NODE_ID_SERVER
, &nodeIdServer
)) break;
237 We check the node type.
239 Uint32 node1type
, node2type
;
240 ndb_mgm_configuration_iterator
node1iter(config
, CFG_SECTION_NODE
);
241 ndb_mgm_configuration_iterator
node2iter(config
, CFG_SECTION_NODE
);
242 node1iter
.find(CFG_NODE_ID
,nodeId1
);
243 node2iter
.find(CFG_NODE_ID
,nodeId2
);
244 node1iter
.get(CFG_TYPE_OF_SECTION
,&node1type
);
245 node2iter
.get(CFG_TYPE_OF_SECTION
,&node2type
);
247 if(node1type
==NODE_TYPE_MGM
|| node2type
==NODE_TYPE_MGM
)
248 conf
.isMgmConnection
= true;
250 conf
.isMgmConnection
= false;
252 if (nodeId
== nodeIdServer
&& !conf
.isMgmConnection
) {
253 tr
.add_transporter_interface(remoteNodeId
, localHostName
, server_port
);
256 DBUG_PRINT("info", ("Transporter between this node %d and node %d using port %d, signalId %d, checksum %d",
257 nodeId
, remoteNodeId
, server_port
, sendSignalId
, checksum
));
259 This may be a dynamic port. It depends on when we're getting
260 our configuration. If we've been restarted, we'll be getting
261 a configuration with our old dynamic port in it, hence the number
262 here is negative (and we try the old port number first).
264 On a first-run, server_port will be zero (with dynamic ports)
266 If we're not using dynamic ports, we don't do anything.
269 conf
.localNodeId
= nodeId
;
270 conf
.remoteNodeId
= remoteNodeId
;
271 conf
.checksum
= checksum
;
272 conf
.signalId
= sendSignalId
;
273 conf
.s_port
= server_port
;
274 conf
.localHostName
= localHostName
;
275 conf
.remoteHostName
= remoteHostName
;
276 conf
.serverNodeId
= nodeIdServer
;
279 case CONNECTION_TYPE_SHM
:
280 if(iter
.get(CFG_SHM_KEY
, &conf
.shm
.shmKey
)) break;
281 if(iter
.get(CFG_SHM_BUFFER_MEM
, &conf
.shm
.shmSize
)) break;
284 if(iter
.get(CFG_SHM_SIGNUM
, &tmp
)) break;
285 conf
.shm
.signum
= tmp
;
287 if(!tr
.createSHMTransporter(&conf
)){
288 DBUG_PRINT("error", ("Failed to create SHM Transporter from %d to %d",
289 conf
.localNodeId
, conf
.remoteNodeId
));
290 ndbout
<< "Failed to create SHM Transporter from: "
291 << conf
.localNodeId
<< " to: " << conf
.remoteNodeId
<< endl
;
293 noOfTransportersCreated
++;
295 DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, "
296 "buf size = %d", conf
.shm
.shmKey
, conf
.shm
.shmSize
));
300 case CONNECTION_TYPE_SCI
:
301 if(iter
.get(CFG_SCI_SEND_LIMIT
, &conf
.sci
.sendLimit
)) break;
302 if(iter
.get(CFG_SCI_BUFFER_MEM
, &conf
.sci
.bufferSize
)) break;
303 if (nodeId
== nodeId1
) {
304 if(iter
.get(CFG_SCI_HOST2_ID_0
, &conf
.sci
.remoteSciNodeId0
)) break;
305 if(iter
.get(CFG_SCI_HOST2_ID_1
, &conf
.sci
.remoteSciNodeId1
)) break;
307 if(iter
.get(CFG_SCI_HOST1_ID_0
, &conf
.sci
.remoteSciNodeId0
)) break;
308 if(iter
.get(CFG_SCI_HOST1_ID_1
, &conf
.sci
.remoteSciNodeId1
)) break;
310 if (conf
.sci
.remoteSciNodeId1
== 0) {
311 conf
.sci
.nLocalAdapters
= 1;
313 conf
.sci
.nLocalAdapters
= 2;
315 if(!tr
.createSCITransporter(&conf
)){
316 DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d",
317 conf
.localNodeId
, conf
.remoteNodeId
));
318 ndbout
<< "Failed to create SCI Transporter from: "
319 << conf
.localNodeId
<< " to: " << conf
.remoteNodeId
<< endl
;
321 DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, "
322 "remote SCI node id %d",
323 conf
.sci
.nLocalAdapters
, conf
.sci
.remoteSciNodeId0
));
324 DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, "
325 "buf size = %d", conf
.localHostName
,
326 conf
.remoteHostName
, conf
.sci
.sendLimit
,
327 conf
.sci
.bufferSize
));
328 if (conf
.sci
.nLocalAdapters
> 1) {
329 DBUG_PRINT("info", ("Fault-tolerant with 2 Remote Adapters, "
330 "second remote SCI node id = %d",
331 conf
.sci
.remoteSciNodeId1
));
333 noOfTransportersCreated
++;
338 case CONNECTION_TYPE_TCP
:
339 if(iter
.get(CFG_TCP_SEND_BUFFER_SIZE
, &conf
.tcp
.sendBufferSize
)) break;
340 if(iter
.get(CFG_TCP_RECEIVE_BUFFER_SIZE
, &conf
.tcp
.maxReceiveSize
)) break;
343 if (!iter
.get(CFG_TCP_PROXY
, &proxy
)) {
344 if (strlen(proxy
) > 0 && nodeId2
== nodeId
) {
345 // TODO handle host:port
346 conf
.s_port
= atoi(proxy
);
350 if(!tr
.createTCPTransporter(&conf
)){
351 ndbout
<< "Failed to create TCP Transporter from: "
352 << nodeId
<< " to: " << remoteNodeId
<< endl
;
354 noOfTransportersCreated
++;
356 DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, "
357 "maxReceiveSize = %d", conf
.tcp
.sendBufferSize
,
358 conf
.tcp
.maxReceiveSize
));
361 ndbout
<< "Unknown transporter type from: " << nodeId
<<
362 " to: " << remoteNodeId
<< endl
;
367 DBUG_RETURN(noOfTransportersCreated
);