4 Copyright (C) Andrew Tridgell 2006
5 Copyright (C) Ronnie Sahlberg 2008
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "../include/ctdb_private.h"
29 stop any connection (established or pending) to a node
/*
 * Tear down the connection state held for one node.
 *
 * Steps visible in this listing:
 *   - fetch the TCP transport state (struct ctdb_tcp_node) hanging off
 *     node->private_data;
 *   - detach the outgoing queue from its descriptor by setting it to -1;
 *   - free the pending connect timer and connect fd event, then reset both
 *     pointers to NULL so they are not reused;
 *   - test whether tnode->fd is still a valid descriptor.
 *
 * NOTE(review): this listing is an extraction with fused line numbers and
 * dropped lines. Original lines 42-48 are absent, so the body of the final
 * if and the end of the function are not shown. Presumably the descriptor
 * is closed and reset there - confirm against the full file.
 */
31 void ctdb_tcp_stop_connection(struct ctdb_node
*node
)
33 struct ctdb_tcp_node
*tnode
= talloc_get_type(
34 node
->private_data
, struct ctdb_tcp_node
);
/* stop the queue from using the descriptor before anything is freed */
36 ctdb_queue_set_fd(tnode
->out_queue
, -1);
/* drop the in-flight connect attempt: its timer and its fd event */
37 talloc_free(tnode
->connect_te
);
38 talloc_free(tnode
->connect_fde
);
39 tnode
->connect_fde
= NULL
;
40 tnode
->connect_te
= NULL
;
41 if (tnode
->fd
!= -1) {
/* NOTE(review): body of this branch is missing from the listing */
49 called when a complete packet has come in - should not happen on this socket
50 unless the other side closes the connection with RST or FIN
/*
 * Queue callback for the outgoing socket of a node.
 *
 * Parameters:
 *   data, cnt     - packet buffer and length handed over by the queue code;
 *                   cnt is not used in the visible lines.
 *   private_data  - the struct ctdb_node this socket belongs to.
 *
 * Steps visible in this listing:
 *   - report the node as dead through ctdb->upcalls->node_dead();
 *   - tear the connection down with ctdb_tcp_stop_connection();
 *   - schedule ctdb_tcp_node_connect() via a timed event created with
 *     timeval_current_ofs(3, 0) and remember it in tnode->connect_te.
 *
 * NOTE(review): original lines 58 and 60 are absent from this listing, so a
 * condition guarding the node_dead upcall (for example on data) may exist
 * but cannot be seen here - confirm against the full file.
 */
52 void ctdb_tcp_tnode_cb(uint8_t *data
, size_t cnt
, void *private_data
)
54 struct ctdb_node
*node
= talloc_get_type(private_data
, struct ctdb_node
);
55 struct ctdb_tcp_node
*tnode
= talloc_get_type(
56 node
->private_data
, struct ctdb_tcp_node
);
/* tell the upper layer that this node is gone */
59 node
->ctdb
->upcalls
->node_dead(node
);
/* drop the old socket and arrange for a reconnect attempt */
62 ctdb_tcp_stop_connection(node
);
63 tnode
->connect_te
= event_add_timed(node
->ctdb
->ev
, tnode
,
64 timeval_current_ofs(3, 0),
65 ctdb_tcp_node_connect
, node
);
69 called when socket becomes writeable on connect
/*
 * fd event handler for a socket with a non-blocking connect in progress.
 * It is the handler that ctdb_tcp_node_connect() passes to event_add_fd(),
 * with the struct ctdb_node as private_data.
 *
 * Steps visible in this listing:
 *   1. Free the connect timer and clear tnode->connect_te.
 *   2. Read the pending socket error with getsockopt(SO_ERROR). On the
 *      failure branch the connection is torn down and
 *      ctdb_tcp_node_connect() is scheduled again one second later.
 *   3. Otherwise free the connect fd event, enable TCP_NODELAY and
 *      SO_KEEPALIVE (a failure of either is logged at DEBUG_WARNING), and
 *      hand the descriptor to the outgoing queue with ctdb_queue_set_fd().
 *
 * ev, fde and flags are not used in the visible lines.
 *
 * NOTE(review): this listing drops original lines 75, 79, 81, 87, 92-94,
 * 100-101, 104-106 and 110-113. The declarations of error and one, the
 * second half of the getsockopt condition, the end of the failure branch
 * and whatever follows the final comment are therefore not shown.
 */
71 static void ctdb_node_connect_write(struct event_context
*ev
, struct fd_event
*fde
,
72 uint16_t flags
, void *private_data
)
74 struct ctdb_node
*node
= talloc_get_type(private_data
,
76 struct ctdb_tcp_node
*tnode
= talloc_get_type(node
->private_data
,
77 struct ctdb_tcp_node
);
78 struct ctdb_context
*ctdb
= node
->ctdb
;
80 socklen_t len
= sizeof(error
);
/* this handler ran, so the connect timer is no longer wanted */
83 talloc_free(tnode
->connect_te
);
84 tnode
->connect_te
= NULL
;
/* ask the kernel how the connect attempt ended */
86 if (getsockopt(tnode
->fd
, SOL_SOCKET
, SO_ERROR
, &error
, &len
) != 0 ||
/* failure branch: drop the socket and retry in one second */
88 ctdb_tcp_stop_connection(node
);
89 tnode
->connect_te
= event_add_timed(ctdb
->ev
, tnode
,
90 timeval_current_ofs(1, 0),
91 ctdb_tcp_node_connect
, node
);
/* the connect event is finished with; the queue takes over below */
95 talloc_free(tnode
->connect_fde
);
96 tnode
->connect_fde
= NULL
;
98 if (setsockopt(tnode
->fd
,IPPROTO_TCP
,TCP_NODELAY
,(char *)&one
,sizeof(one
)) == -1) {
99 DEBUG(DEBUG_WARNING
, ("Failed to set TCP_NODELAY on fd - %s\n",
102 if (setsockopt(tnode
->fd
,SOL_SOCKET
,SO_KEEPALIVE
,(char *)&one
,sizeof(one
)) == -1) {
103 DEBUG(DEBUG_WARNING
, ("Failed to set KEEPALIVE on fd - %s\n",
107 ctdb_queue_set_fd(tnode
->out_queue
, tnode
->fd
);
109 /* the queue subsystem now owns this fd */
/*
 * Convert a textual address into a ctdb_sock_addr using parse_ip().
 *
 * Parameters:
 *   ctdb     - context; not used in the visible lines.
 *   address  - string form of the address to parse.
 *   addr     - output socket address filled in by parse_ip().
 *
 * When parse_ip() returns 0 a DEBUG_CRIT message naming the unparsable
 * address is logged.
 *
 * NOTE(review): the return statements (original lines 119-122) are absent
 * from this listing. Every caller in this file treats a non-zero result as
 * failure, so presumably 0 means success - confirm against the full file.
 */
114 static int ctdb_tcp_get_address(struct ctdb_context
*ctdb
,
115 const char *address
, ctdb_sock_addr
*addr
)
117 if (parse_ip(address
, NULL
, 0, addr
) == 0) {
118 DEBUG(DEBUG_CRIT
, (__location__
" Unparsable address : %s.\n", address
));
125 called when we should try and establish a tcp connection to a node
/*
 * Timed event handler that starts a TCP connection attempt to one node.
 * private_data is the struct ctdb_node to connect to; ev, te and t are not
 * used in the visible lines.
 *
 * Steps visible in this listing:
 *   1. Tear down any existing connection state for the node.
 *   2. Parse node->address.address into sock_out and store the node port,
 *      in network byte order, in the field matching the address family.
 *      An unknown family is logged at DEBUG_ERR.
 *   3. Create a TCP stream socket in tnode->fd and mark it non-blocking
 *      and close-on-exec.
 *   4. Parse our own address (ctdb->address.address) into sock_in, choose
 *      the sockaddr sizes per family and bind the socket to sock_in, so the
 *      peer sees the same address we listen on.
 *   5. Call connect(). If it fails with anything other than EINPROGRESS,
 *      tear down and schedule another attempt in one second.
 *   6. Otherwise register ctdb_node_connect_write() for read and write
 *      readiness on the socket, and arm a one second timer that calls this
 *      function again.
 *
 * NOTE(review): this listing drops many original lines: case labels, the
 * bodies of the error branches, the declarations of sockin_size and
 * sockout_size, and the delimiters closing two block comments. What each
 * error path does beyond the visible log message cannot be seen here.
 */
127 void ctdb_tcp_node_connect(struct event_context
*ev
, struct timed_event
*te
,
128 struct timeval t
, void *private_data
)
130 struct ctdb_node
*node
= talloc_get_type(private_data
,
132 struct ctdb_tcp_node
*tnode
= talloc_get_type(node
->private_data
,
133 struct ctdb_tcp_node
);
134 struct ctdb_context
*ctdb
= node
->ctdb
;
135 ctdb_sock_addr sock_in
;
138 ctdb_sock_addr sock_out
;
/* always start from a clean slate: no old fd, timer or fd event */
140 ctdb_tcp_stop_connection(node
);
142 ZERO_STRUCT(sock_out
);
143 #ifdef HAVE_SOCK_SIN_LEN
144 sock_out
.ip
.sin_len
= sizeof(sock_out
);
/* resolve the remote node address */
146 if (ctdb_tcp_get_address(ctdb
, node
->address
.address
, &sock_out
) != 0) {
/* the port lives in a different field for each address family */
149 switch (sock_out
.sa
.sa_family
) {
151 sock_out
.ip
.sin_port
= htons(node
->address
.port
);
154 sock_out
.ip6
.sin6_port
= htons(node
->address
.port
);
157 DEBUG(DEBUG_ERR
, (__location__
" unknown family %u\n",
158 sock_out
.sa
.sa_family
));
162 tnode
->fd
= socket(sock_out
.sa
.sa_family
, SOCK_STREAM
, IPPROTO_TCP
);
163 if (tnode
->fd
== -1) {
164 DEBUG(DEBUG_ERR
, (__location__
"Failed to create socket\n"));
167 set_nonblocking(tnode
->fd
);
168 set_close_on_exec(tnode
->fd
);
170 DEBUG(DEBUG_DEBUG
, (__location__
" Created TCP SOCKET FD:%d\n", tnode
->fd
));
172 /* Bind our side of the socketpair to the same address we use to listen
173 * on incoming CTDB traffic.
174 * We must specify this address to make sure that the address we expose to
175 * the remote side is actually routable in case CTDB traffic will run on
176 * a dedicated non-routeable network.
178 ZERO_STRUCT(sock_in
);
179 if (ctdb_tcp_get_address(ctdb
, ctdb
->address
.address
, &sock_in
) != 0) {
180 DEBUG(DEBUG_ERR
, (__location__
" Failed to find our address. Failing bind.\n"));
185 /* AIX libs check to see if the socket address and length
186 arguments are consistent with each other on calls like
187 connect(). Can not get by with just sizeof(sock_in),
188 need sizeof(sock_in.ip).
190 switch (sock_in
.sa
.sa_family
) {
192 sockin_size
= sizeof(sock_in
.ip
);
193 sockout_size
= sizeof(sock_out
.ip
);
196 sockin_size
= sizeof(sock_in
.ip6
);
197 sockout_size
= sizeof(sock_out
.ip6
);
200 DEBUG(DEBUG_ERR
, (__location__
" unknown family %u\n",
201 sock_in
.sa
.sa_family
));
205 #ifdef HAVE_SOCK_SIN_LEN
206 sock_in
.ip
.sin_len
= sockin_size
;
207 sock_out
.ip
.sin_len
= sockout_size
;
209 if (bind(tnode
->fd
, (struct sockaddr
*)&sock_in
, sockin_size
) == -1) {
210 DEBUG(DEBUG_ERR
, (__location__
"Failed to bind socket %s(%d)\n",
211 strerror(errno
), errno
));
216 if (connect(tnode
->fd
, (struct sockaddr
*)&sock_out
, sockout_size
) != 0 &&
217 errno
!= EINPROGRESS
) {
218 ctdb_tcp_stop_connection(node
);
219 tnode
->connect_te
= event_add_timed(ctdb
->ev
, tnode
,
220 timeval_current_ofs(1, 0),
221 ctdb_tcp_node_connect
, node
);
225 /* non-blocking connect - wait for write event */
226 tnode
->connect_fde
= event_add_fd(node
->ctdb
->ev
, tnode
, tnode
->fd
,
227 EVENT_FD_WRITE
|EVENT_FD_READ
,
228 ctdb_node_connect_write
, node
);
230 /* don't give it long to connect - retry in one second. This ensures
231 that we find a node is up quickly (tcp normally backs off a syn reply
232 delay by quite a lot) */
233 tnode
->connect_te
= event_add_timed(ctdb
->ev
, tnode
, timeval_current_ofs(1, 0),
234 ctdb_tcp_node_connect
, node
);
238 called when we get contacted by another node
239 currently makes no attempt to check if the connection is really from a ctdb node, beyond mapping the peer address to a node id with ctdb_ip_to_nodeid()
/*
 * fd event handler for the listening socket; private_data is the
 * struct ctdb_context. ev, fde and flags are not used in the visible lines.
 *
 * Steps visible in this listing:
 *   1. accept() a connection on ctcp->listen_fd; return silently when
 *      accept() yields -1.
 *   2. Turn the peer address into a string and map it to a node id with
 *      ctdb_ip_to_nodeid(). A connection from an unknown node is logged at
 *      DEBUG_ERR as refused.
 *   3. Allocate a zeroed struct ctdb_incoming with ctcp as talloc parent.
 *   4. Mark the accepted socket non-blocking and close-on-exec and enable
 *      SO_KEEPALIVE (a failure is logged at DEBUG_WARNING).
 *   5. Create a queue on the socket with ctdb_tcp_read_cb as the packet
 *      callback, named after the peer address.
 *
 * NOTE(review): this listing drops the declarations of addr, len, fd,
 * nodeid and one, the test on nodeid, the cleanup of the refused
 * connection, and original lines 269-271 that follow the allocation. So it
 * cannot be seen here whether the talloc_zero() result is checked or where
 * in->fd is assigned - confirm against the full file.
 */
242 static void ctdb_listen_event(struct event_context
*ev
, struct fd_event
*fde
,
243 uint16_t flags
, void *private_data
)
245 struct ctdb_context
*ctdb
= talloc_get_type(private_data
, struct ctdb_context
);
246 struct ctdb_tcp
*ctcp
= talloc_get_type(ctdb
->private_data
, struct ctdb_tcp
);
250 struct ctdb_incoming
*in
;
252 const char *incoming_node
;
254 memset(&addr
, 0, sizeof(addr
));
256 fd
= accept(ctcp
->listen_fd
, (struct sockaddr
*)&addr
, &len
);
/* nothing to do if no connection could be accepted */
257 if (fd
== -1) return;
/* identify the peer by its address */
259 incoming_node
= ctdb_addr_to_str(&addr
);
260 nodeid
= ctdb_ip_to_nodeid(ctdb
, incoming_node
);
263 DEBUG(DEBUG_ERR
, ("Refused connection from unknown node %s\n", incoming_node
));
/* per-connection state, owned by the transport context */
268 in
= talloc_zero(ctcp
, struct ctdb_incoming
);
272 set_nonblocking(in
->fd
);
273 set_close_on_exec(in
->fd
);
275 DEBUG(DEBUG_DEBUG
, (__location__
" Created SOCKET FD:%d to incoming ctdb connection\n", fd
));
277 if (setsockopt(in
->fd
,SOL_SOCKET
,SO_KEEPALIVE
,(char *)&one
,sizeof(one
)) == -1) {
278 DEBUG(DEBUG_WARNING
, ("Failed to set KEEPALIVE on fd - %s\n",
/* from here on incoming packets are delivered to ctdb_tcp_read_cb */
282 in
->queue
= ctdb_queue_setup(ctdb
, in
, in
->fd
, CTDB_TCP_ALIGNMENT
,
283 ctdb_tcp_read_cb
, in
, "ctdbd-%s", incoming_node
);
288 automatically find which address to listen on
/*
 * Choose the address to listen on by trying to bind to each configured
 * node address in turn. ctdb_tcp_listen() calls this when no address has
 * been set in ctdb->address.address.
 *
 * Steps visible in this listing:
 *   1. Log at DEBUG_CRIT when ctdb->num_nodes is 0.
 *   2. Open the lock file named by lock_path (created if missing, mode
 *      0666) and take a blocking write lock on it with fcntl(F_SETLKW), so
 *      that bind() and listen() act as one atomic step.
 *   3. For every node not flagged NODE_FLAGS_DELETED: parse its address,
 *      set the port and sockaddr size per family, create a TCP socket, mark
 *      it close-on-exec, enable SO_REUSEADDR and try to bind(). A bind
 *      failure with EADDRNOTAVAIL is logged at DEBUG_DEBUG; the same
 *      message is also logged at DEBUG_ERR, presumably for other errors.
 *   4. If the loop ran off the end (i == ctdb->num_nodes), log at
 *      DEBUG_CRIT that no address could be bound.
 *   5. Otherwise record the chosen address, port, name and pnn in ctdb,
 *      call listen() with a backlog of 10, and register
 *      ctdb_listen_event() for read readiness with auto-close enabled.
 *   6. The trailing lines close ctcp->listen_fd when it is not -1 and
 *      reset it to -1; presumably this is the shared failure path.
 *
 * NOTE(review): this listing drops many original lines: the declarations
 * of i, lock_fd, lock, sock, sock_size and one, the remaining struct flock
 * fields, the loop exits, the unlock and close of the lock file, the
 * failure label and every return statement. Return values therefore
 * cannot be confirmed from here.
 * NOTE(review): original lines 408-410 are consecutive, so the result of
 * event_add_fd() reaches tevent_fd_set_auto_close() without a NULL check.
 */
290 static int ctdb_tcp_listen_automatic(struct ctdb_context
*ctdb
)
292 struct ctdb_tcp
*ctcp
= talloc_get_type(ctdb
->private_data
,
296 const char *lock_path
= VARDIR
"/run/ctdb/.socket_lock";
300 struct tevent_fd
*fde
;
302 /* If there are no nodes, then it won't be possible to find
303 * the first one. Log a failure and short circuit the whole
306 if (ctdb
->num_nodes
== 0) {
307 DEBUG(DEBUG_CRIT
,("No nodes available to attempt bind to - is the nodes file empty?\n"));
311 /* in order to ensure that we don't get two nodes with the
312 same address, we must make the bind() and listen() calls
313 atomic. The SO_REUSEADDR setsockopt only prevents double
314 binds if the first socket is in LISTEN state */
315 lock_fd
= open(lock_path
, O_RDWR
|O_CREAT
, 0666);
317 DEBUG(DEBUG_CRIT
,("Unable to open %s\n", lock_path
));
321 lock
.l_type
= F_WRLCK
;
322 lock
.l_whence
= SEEK_SET
;
/* F_SETLKW blocks until the write lock is granted */
327 if (fcntl(lock_fd
, F_SETLKW
, &lock
) != 0) {
328 DEBUG(DEBUG_CRIT
,("Unable to lock %s\n", lock_path
));
/* try each configured node address until one can be bound */
333 for (i
=0; i
< ctdb
->num_nodes
; i
++) {
334 if (ctdb
->nodes
[i
]->flags
& NODE_FLAGS_DELETED
) {
338 if (ctdb_tcp_get_address(ctdb
,
339 ctdb
->nodes
[i
]->address
.address
,
344 switch (sock
.sa
.sa_family
) {
346 sock
.ip
.sin_port
= htons(ctdb
->nodes
[i
]->address
.port
);
347 sock_size
= sizeof(sock
.ip
);
350 sock
.ip6
.sin6_port
= htons(ctdb
->nodes
[i
]->address
.port
);
351 sock_size
= sizeof(sock
.ip6
);
354 DEBUG(DEBUG_ERR
, (__location__
" unknown family %u\n",
358 #ifdef HAVE_SOCK_SIN_LEN
359 sock
.ip
.sin_len
= sock_size
;
362 ctcp
->listen_fd
= socket(sock
.sa
.sa_family
, SOCK_STREAM
, IPPROTO_TCP
);
363 if (ctcp
->listen_fd
== -1) {
364 ctdb_set_error(ctdb
, "socket failed\n");
368 set_close_on_exec(ctcp
->listen_fd
);
370 if (setsockopt(ctcp
->listen_fd
,SOL_SOCKET
,SO_REUSEADDR
,
371 (char *)&one
,sizeof(one
)) == -1) {
372 DEBUG(DEBUG_WARNING
, ("Failed to set REUSEADDR on fd - %s\n",
/* a successful bind means this node address is ours */
376 if (bind(ctcp
->listen_fd
, (struct sockaddr
* )&sock
, sock_size
) == 0) {
/* EADDRNOTAVAIL is logged quietly at DEBUG_DEBUG */
380 if (errno
== EADDRNOTAVAIL
) {
381 DEBUG(DEBUG_DEBUG
,(__location__
" Failed to bind() to socket. %s(%d)\n",
382 strerror(errno
), errno
));
384 DEBUG(DEBUG_ERR
,(__location__
" Failed to bind() to socket. %s(%d)\n",
385 strerror(errno
), errno
));
/* the loop finished without a successful bind */
389 if (i
== ctdb
->num_nodes
) {
390 DEBUG(DEBUG_CRIT
,("Unable to bind to any of the node addresses - giving up\n"));
/* remember which node entry we turned out to be */
393 ctdb
->address
.address
= talloc_strdup(ctdb
, ctdb
->nodes
[i
]->address
.address
);
394 ctdb
->address
.port
= ctdb
->nodes
[i
]->address
.port
;
395 ctdb
->name
= talloc_asprintf(ctdb
, "%s:%u",
396 ctdb
->address
.address
,
398 ctdb
->pnn
= ctdb
->nodes
[i
]->pnn
;
399 DEBUG(DEBUG_INFO
,("ctdb chose network address %s:%u pnn %u\n",
400 ctdb
->address
.address
,
404 if (listen(ctcp
->listen_fd
, 10) == -1) {
/* incoming connections are handled by ctdb_listen_event */
408 fde
= event_add_fd(ctdb
->ev
, ctcp
, ctcp
->listen_fd
, EVENT_FD_READ
,
409 ctdb_listen_event
, ctdb
);
410 tevent_fd_set_auto_close(fde
);
/* cleanup of the listening socket; the label above it is not shown */
418 if (ctcp
->listen_fd
!= -1) {
419 close(ctcp
->listen_fd
);
420 ctcp
->listen_fd
= -1;
427 listen on our own address
/*
 * Start listening for incoming ctdb connections on our own address.
 *
 * Steps visible in this listing:
 *   1. When ctdb->address.address is not set, delegate to
 *      ctdb_tcp_listen_automatic() and return its result.
 *   2. Otherwise parse the configured address, set the port (network byte
 *      order) and the sockaddr size for the address family, create a TCP
 *      socket, mark it close-on-exec and enable SO_REUSEADDR (a failure is
 *      logged at DEBUG_WARNING).
 *   3. bind() the socket; a failure is logged at DEBUG_ERR with the errno
 *      text and value.
 *   4. listen() with a backlog of 10, then register ctdb_listen_event()
 *      for read readiness with auto-close enabled.
 *   5. The trailing lines close ctcp->listen_fd when it is not -1 and
 *      reset it to -1; presumably this is the shared failure path.
 *
 * NOTE(review): this listing drops the declarations of sock, sock_size and
 * one, the case labels, the jumps to the failure path, the failure label
 * and every return statement other than the delegation in step 1. Return
 * values on the explicit-address path cannot be confirmed from here.
 * NOTE(review): original lines 490-492 are consecutive, so the result of
 * event_add_fd() reaches tevent_fd_set_auto_close() without a NULL check.
 */
429 int ctdb_tcp_listen(struct ctdb_context
*ctdb
)
431 struct ctdb_tcp
*ctcp
= talloc_get_type(ctdb
->private_data
,
436 struct tevent_fd
*fde
;
438 /* we can either auto-bind to the first available address, or we can
439 use a specified address */
440 if (!ctdb
->address
.address
) {
441 return ctdb_tcp_listen_automatic(ctdb
);
/* explicit address configured: parse it */
445 if (ctdb_tcp_get_address(ctdb
, ctdb
->address
.address
,
450 switch (sock
.sa
.sa_family
) {
452 sock
.ip
.sin_port
= htons(ctdb
->address
.port
);
453 sock_size
= sizeof(sock
.ip
);
456 sock
.ip6
.sin6_port
= htons(ctdb
->address
.port
);
457 sock_size
= sizeof(sock
.ip6
);
460 DEBUG(DEBUG_ERR
, (__location__
" unknown family %u\n",
464 #ifdef HAVE_SOCK_SIN_LEN
465 sock
.ip
.sin_len
= sock_size
;
468 ctcp
->listen_fd
= socket(sock
.sa
.sa_family
, SOCK_STREAM
, IPPROTO_TCP
);
469 if (ctcp
->listen_fd
== -1) {
470 ctdb_set_error(ctdb
, "socket failed\n");
474 set_close_on_exec(ctcp
->listen_fd
);
476 if (setsockopt(ctcp
->listen_fd
,SOL_SOCKET
,SO_REUSEADDR
,(char *)&one
,sizeof(one
)) == -1) {
477 DEBUG(DEBUG_WARNING
, ("Failed to set REUSEADDR on fd - %s\n",
481 if (bind(ctcp
->listen_fd
, (struct sockaddr
* )&sock
, sock_size
) != 0) {
482 DEBUG(DEBUG_ERR
,(__location__
" Failed to bind() to socket. %s(%d)\n", strerror(errno
), errno
));
486 if (listen(ctcp
->listen_fd
, 10) == -1) {
/* incoming connections are handled by ctdb_listen_event */
490 fde
= event_add_fd(ctdb
->ev
, ctcp
, ctcp
->listen_fd
, EVENT_FD_READ
,
491 ctdb_listen_event
, ctdb
);
492 tevent_fd_set_auto_close(fde
);
/* cleanup of the listening socket; the label above it is not shown */
497 if (ctcp
->listen_fd
!= -1) {
498 close(ctcp
->listen_fd
);
500 ctcp
->listen_fd
= -1;