4 Copyright (C) Andrew Tridgell 2006
5 Copyright (C) Ronnie Sahlberg 2008
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "system/network.h"
23 #include "system/filesys.h"
28 #include "lib/util/debug.h"
29 #include "lib/util/time.h"
30 #include "lib/util/blocking.h"
32 #include "ctdb_private.h"
34 #include "common/system.h"
35 #include "common/common.h"
36 #include "common/logging.h"
41 stop any outgoing connection (established or pending) to a node
43 void ctdb_tcp_stop_outgoing(struct ctdb_node
*node
)
45 struct ctdb_tcp_node
*tnode
= talloc_get_type(
46 node
->transport_data
, struct ctdb_tcp_node
);
48 TALLOC_FREE(tnode
->out_queue
);
49 TALLOC_FREE(tnode
->connect_te
);
50 TALLOC_FREE(tnode
->connect_fde
);
51 if (tnode
->out_fd
!= -1) {
58 stop incoming connection to a node
60 void ctdb_tcp_stop_incoming(struct ctdb_node
*node
)
62 struct ctdb_tcp_node
*tnode
= talloc_get_type(
63 node
->transport_data
, struct ctdb_tcp_node
);
65 TALLOC_FREE(tnode
->in_queue
);
69 called when a complete packet has come in - should not happen on this socket
70 unless the other side closes the connection with RST or FIN
72 void ctdb_tcp_tnode_cb(uint8_t *data
, size_t cnt
, void *private_data
)
74 struct ctdb_node
*node
= talloc_get_type(private_data
, struct ctdb_node
);
76 node
->ctdb
->upcalls
->node_dead(node
);
82 called when socket becomes writeable on connect
84 static void ctdb_node_connect_write(struct tevent_context
*ev
,
85 struct tevent_fd
*fde
,
86 uint16_t flags
, void *private_data
)
88 struct ctdb_node
*node
= talloc_get_type(private_data
,
90 struct ctdb_tcp_node
*tnode
= talloc_get_type(node
->transport_data
,
91 struct ctdb_tcp_node
);
92 struct ctdb_context
*ctdb
= node
->ctdb
;
94 socklen_t len
= sizeof(error
);
98 TALLOC_FREE(tnode
->connect_te
);
100 ret
= getsockopt(tnode
->out_fd
, SOL_SOCKET
, SO_ERROR
, &error
, &len
);
101 if (ret
!= 0 || error
!= 0) {
102 ctdb_tcp_stop_outgoing(node
);
103 tnode
->connect_te
= tevent_add_timer(ctdb
->ev
, tnode
,
104 timeval_current_ofs(1, 0),
105 ctdb_tcp_node_connect
, node
);
109 TALLOC_FREE(tnode
->connect_fde
);
111 ret
= setsockopt(tnode
->out_fd
,
117 DBG_WARNING("Failed to set TCP_NODELAY on fd - %s\n",
120 ret
= setsockopt(tnode
->out_fd
,
122 SO_KEEPALIVE
,(char *)&one
,
125 DBG_WARNING("Failed to set KEEPALIVE on fd - %s\n",
129 tnode
->out_queue
= ctdb_queue_setup(node
->ctdb
,
137 if (tnode
->out_queue
== NULL
) {
138 DBG_ERR("Failed to set up outgoing queue\n");
139 ctdb_tcp_stop_outgoing(node
);
140 tnode
->connect_te
= tevent_add_timer(ctdb
->ev
,
142 timeval_current_ofs(1, 0),
143 ctdb_tcp_node_connect
,
148 /* the queue subsystem now owns this fd */
152 * Mark the node to which this connection has been established
153 * as connected, but only if the corresponding listening
154 * socket is also connected
156 if (tnode
->in_queue
!= NULL
) {
157 node
->ctdb
->upcalls
->node_connected(node
);
162 static void ctdb_tcp_node_connect_timeout(struct tevent_context
*ev
,
163 struct tevent_timer
*te
,
168 called when we should try to establish a tcp connection to a node
170 static void ctdb_tcp_start_outgoing(struct ctdb_node
*node
)
172 struct ctdb_tcp_node
*tnode
= talloc_get_type(node
->transport_data
,
173 struct ctdb_tcp_node
);
174 struct ctdb_context
*ctdb
= node
->ctdb
;
175 ctdb_sock_addr sock_in
;
178 ctdb_sock_addr sock_out
;
181 sock_out
= node
->address
;
183 tnode
->out_fd
= socket(sock_out
.sa
.sa_family
, SOCK_STREAM
, IPPROTO_TCP
);
184 if (tnode
->out_fd
== -1) {
185 DBG_ERR("Failed to create socket\n");
189 ret
= set_blocking(tnode
->out_fd
, false);
191 DBG_ERR("Failed to set socket non-blocking (%s)\n",
196 set_close_on_exec(tnode
->out_fd
);
198 DBG_DEBUG("Created TCP SOCKET FD:%d\n", tnode
->out_fd
);
200 /* Bind our side of the socketpair to the same address we use to listen
201 * on incoming CTDB traffic.
202 * We must specify this address to make sure that the address we expose to
203 * the remote side is actually routable in case CTDB traffic will run on
204 * a dedicated non-routable network.
206 sock_in
= *ctdb
->address
;
208 /* AIX libs check to see if the socket address and length
209 arguments are consistent with each other on calls like
210 connect(). Can not get by with just sizeof(sock_in),
211 need sizeof(sock_in.ip).
213 switch (sock_in
.sa
.sa_family
) {
215 sock_in
.ip
.sin_port
= 0 /* Any port */;
216 sockin_size
= sizeof(sock_in
.ip
);
217 sockout_size
= sizeof(sock_out
.ip
);
220 sock_in
.ip6
.sin6_port
= 0 /* Any port */;
221 sockin_size
= sizeof(sock_in
.ip6
);
222 sockout_size
= sizeof(sock_out
.ip6
);
225 DBG_ERR("Unknown address family %u\n", sock_in
.sa
.sa_family
);
226 /* Can't happen due to address parsing restrictions */
230 ret
= bind(tnode
->out_fd
, (struct sockaddr
*)&sock_in
, sockin_size
);
232 DBG_ERR("Failed to bind socket (%s)\n", strerror(errno
));
236 ret
= connect(tnode
->out_fd
,
237 (struct sockaddr
*)&sock_out
,
239 if (ret
!= 0 && errno
!= EINPROGRESS
) {
243 /* non-blocking connect - wait for write event */
244 tnode
->connect_fde
= tevent_add_fd(node
->ctdb
->ev
,
247 TEVENT_FD_WRITE
|TEVENT_FD_READ
,
248 ctdb_node_connect_write
,
251 /* don't give it long to connect - retry in one second. This ensures
252 that we find a node is up quickly (tcp normally backs off a syn reply
253 delay by quite a lot) */
254 tnode
->connect_te
= tevent_add_timer(ctdb
->ev
,
256 timeval_current_ofs(1, 0),
257 ctdb_tcp_node_connect_timeout
,
263 ctdb_tcp_stop_outgoing(node
);
264 tnode
->connect_te
= tevent_add_timer(ctdb
->ev
,
266 timeval_current_ofs(1, 0),
267 ctdb_tcp_node_connect
,
271 void ctdb_tcp_node_connect(struct tevent_context
*ev
,
272 struct tevent_timer
*te
,
276 struct ctdb_node
*node
= talloc_get_type_abort(private_data
,
279 ctdb_tcp_start_outgoing(node
);
282 static void ctdb_tcp_node_connect_timeout(struct tevent_context
*ev
,
283 struct tevent_timer
*te
,
287 struct ctdb_node
*node
= talloc_get_type_abort(private_data
,
290 ctdb_tcp_stop_outgoing(node
);
291 ctdb_tcp_start_outgoing(node
);
295 called when we get contacted by another node
296 currently makes no attempt to check if the connection is really from a ctdb
299 static void ctdb_listen_event(struct tevent_context
*ev
, struct tevent_fd
*fde
,
300 uint16_t flags
, void *private_data
)
302 struct ctdb_context
*ctdb
= talloc_get_type(private_data
, struct ctdb_context
);
303 struct ctdb_tcp
*ctcp
= talloc_get_type(ctdb
->transport_data
,
308 struct ctdb_node
*node
;
309 struct ctdb_tcp_node
*tnode
;
313 memset(&addr
, 0, sizeof(addr
));
315 fd
= accept(ctcp
->listen_fd
, (struct sockaddr
*)&addr
, &len
);
316 if (fd
== -1) return;
317 smb_set_close_on_exec(fd
);
319 node
= ctdb_ip_to_node(ctdb
, &addr
);
321 D_ERR("Refused connection from unknown node %s\n",
322 ctdb_addr_to_str(&addr
));
327 tnode
= talloc_get_type_abort(node
->transport_data
,
328 struct ctdb_tcp_node
);
330 /* This can't happen - see ctdb_tcp_initialise() */
331 DBG_ERR("INTERNAL ERROR setting up connection from node %s\n",
332 ctdb_addr_to_str(&addr
));
337 if (tnode
->in_queue
!= NULL
) {
338 DBG_ERR("Incoming queue active, rejecting connection from %s\n",
339 ctdb_addr_to_str(&addr
));
344 ret
= set_blocking(fd
, false);
346 DBG_ERR("Failed to set socket non-blocking (%s)\n",
352 set_close_on_exec(fd
);
354 DBG_DEBUG("Created SOCKET FD:%d to incoming ctdb connection\n", fd
);
362 DBG_WARNING("Failed to set KEEPALIVE on fd - %s\n",
366 tnode
->in_queue
= ctdb_queue_setup(ctdb
,
373 ctdb_addr_to_str(&addr
));
374 if (tnode
->in_queue
== NULL
) {
375 DBG_ERR("Failed to set up incoming queue\n");
381 * Mark the connecting node as connected, but only if the
382 * corresponding outbound connection is also up
384 if (tnode
->out_queue
!= NULL
) {
385 node
->ctdb
->upcalls
->node_connected(node
);
391 automatically find which address to listen on
393 static int ctdb_tcp_listen_automatic(struct ctdb_context
*ctdb
)
395 struct ctdb_tcp
*ctcp
= talloc_get_type(ctdb
->transport_data
,
400 const char *lock_path
= CTDB_RUNDIR
"/.socket_lock";
404 struct tevent_fd
*fde
;
406 /* If there are no nodes, then it won't be possible to find
407 * the first one. Log a failure and short circuit the whole
410 if (ctdb
->num_nodes
== 0) {
411 DEBUG(DEBUG_CRIT
,("No nodes available to attempt bind to - is the nodes file empty?\n"));
415 /* in order to ensure that we don't get two nodes with the
416 same address, we must make the bind() and listen() calls
417 atomic. The SO_REUSEADDR setsockopt only prevents double
418 binds if the first socket is in LISTEN state */
419 lock_fd
= open(lock_path
, O_RDWR
|O_CREAT
, 0666);
421 DEBUG(DEBUG_CRIT
,("Unable to open %s\n", lock_path
));
425 lock
.l_type
= F_WRLCK
;
426 lock
.l_whence
= SEEK_SET
;
431 if (fcntl(lock_fd
, F_SETLKW
, &lock
) != 0) {
432 DEBUG(DEBUG_CRIT
,("Unable to lock %s\n", lock_path
));
437 for (i
=0; i
< ctdb
->num_nodes
; i
++) {
438 if (ctdb
->nodes
[i
]->flags
& NODE_FLAGS_DELETED
) {
441 sock
= ctdb
->nodes
[i
]->address
;
443 switch (sock
.sa
.sa_family
) {
445 sock_size
= sizeof(sock
.ip
);
448 sock_size
= sizeof(sock
.ip6
);
451 DEBUG(DEBUG_ERR
, (__location__
" unknown family %u\n",
456 ctcp
->listen_fd
= socket(sock
.sa
.sa_family
, SOCK_STREAM
, IPPROTO_TCP
);
457 if (ctcp
->listen_fd
== -1) {
458 ctdb_set_error(ctdb
, "socket failed\n");
462 set_close_on_exec(ctcp
->listen_fd
);
464 if (setsockopt(ctcp
->listen_fd
,SOL_SOCKET
,SO_REUSEADDR
,
465 (char *)&one
,sizeof(one
)) == -1) {
466 DEBUG(DEBUG_WARNING
, ("Failed to set REUSEADDR on fd - %s\n",
470 if (bind(ctcp
->listen_fd
, (struct sockaddr
* )&sock
, sock_size
) == 0) {
474 if (errno
== EADDRNOTAVAIL
) {
475 DEBUG(DEBUG_DEBUG
,(__location__
" Failed to bind() to socket. %s(%d)\n",
476 strerror(errno
), errno
));
478 DEBUG(DEBUG_ERR
,(__location__
" Failed to bind() to socket. %s(%d)\n",
479 strerror(errno
), errno
));
482 close(ctcp
->listen_fd
);
483 ctcp
->listen_fd
= -1;
486 if (i
== ctdb
->num_nodes
) {
487 DEBUG(DEBUG_CRIT
,("Unable to bind to any of the node addresses - giving up\n"));
490 ctdb
->address
= talloc_memdup(ctdb
,
491 &ctdb
->nodes
[i
]->address
,
492 sizeof(ctdb_sock_addr
));
493 if (ctdb
->address
== NULL
) {
494 ctdb_set_error(ctdb
, "Out of memory at %s:%d",
499 ctdb
->name
= talloc_asprintf(ctdb
, "%s:%u",
500 ctdb_addr_to_str(ctdb
->address
),
501 ctdb_addr_to_port(ctdb
->address
));
502 if (ctdb
->name
== NULL
) {
503 ctdb_set_error(ctdb
, "Out of memory at %s:%d",
507 DEBUG(DEBUG_INFO
,("ctdb chose network address %s\n", ctdb
->name
));
509 if (listen(ctcp
->listen_fd
, 10) == -1) {
513 fde
= tevent_add_fd(ctdb
->ev
, ctcp
, ctcp
->listen_fd
, TEVENT_FD_READ
,
514 ctdb_listen_event
, ctdb
);
515 tevent_fd_set_auto_close(fde
);
523 if (ctcp
->listen_fd
!= -1) {
524 close(ctcp
->listen_fd
);
525 ctcp
->listen_fd
= -1;
532 listen on our own address
534 int ctdb_tcp_listen(struct ctdb_context
*ctdb
)
536 struct ctdb_tcp
*ctcp
= talloc_get_type(ctdb
->transport_data
,
541 struct tevent_fd
*fde
;
543 /* we can either auto-bind to the first available address, or we can
544 use a specified address */
545 if (!ctdb
->address
) {
546 return ctdb_tcp_listen_automatic(ctdb
);
549 sock
= *ctdb
->address
;
551 switch (sock
.sa
.sa_family
) {
553 sock_size
= sizeof(sock
.ip
);
556 sock_size
= sizeof(sock
.ip6
);
559 DEBUG(DEBUG_ERR
, (__location__
" unknown family %u\n",
564 ctcp
->listen_fd
= socket(sock
.sa
.sa_family
, SOCK_STREAM
, IPPROTO_TCP
);
565 if (ctcp
->listen_fd
== -1) {
566 ctdb_set_error(ctdb
, "socket failed\n");
570 set_close_on_exec(ctcp
->listen_fd
);
572 if (setsockopt(ctcp
->listen_fd
,SOL_SOCKET
,SO_REUSEADDR
,(char *)&one
,sizeof(one
)) == -1) {
573 DEBUG(DEBUG_WARNING
, ("Failed to set REUSEADDR on fd - %s\n",
577 if (bind(ctcp
->listen_fd
, (struct sockaddr
* )&sock
, sock_size
) != 0) {
578 DEBUG(DEBUG_ERR
,(__location__
" Failed to bind() to socket. %s(%d)\n", strerror(errno
), errno
));
582 if (listen(ctcp
->listen_fd
, 10) == -1) {
586 fde
= tevent_add_fd(ctdb
->ev
, ctcp
, ctcp
->listen_fd
, TEVENT_FD_READ
,
587 ctdb_listen_event
, ctdb
);
588 tevent_fd_set_auto_close(fde
);
593 if (ctcp
->listen_fd
!= -1) {
594 close(ctcp
->listen_fd
);
596 ctcp
->listen_fd
= -1;