Log the queue size before and after journal rotation, to see if rotation is killing us.
[lwes-journaller.git] / src / xport_udp.c
blobf1bbfb7673f250c033ad5be93d37dfd0c134e7d1
1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
21 #include "config.h"
23 #include "xport.h"
24 #include "xport_udp.h"
26 #include "perror.h"
27 #include "opt.h"
29 #include <arpa/inet.h>
30 #include <netinet/in.h>
31 #include <stdlib.h>
32 #include <sys/socket.h>
33 #include <sys/types.h>
34 #include <unistd.h>
36 struct priv {
37 int fd;
39 struct in_addr address;
40 short port; /* In network byte order. */
41 char* iface;
43 struct sockaddr_in ip_addr;
44 struct ip_mreq mreq;
46 int join_group;
48 int bind;
49 int joined_multi_group;
53 static void destructor (struct xport* this_xport)
55 struct priv* ppriv = (struct priv*)this_xport->priv;
57 free (ppriv->iface);
58 free (ppriv);
60 this_xport->vtbl = 0;
61 this_xport->priv = 0;
65 static int xopen (struct xport* this_xport, int flags)
67 char optval = arg_ttl ; // Configured TTL
68 struct priv* ppriv = (struct priv*)this_xport->priv;
69 (void)flags; /* appease -Wall -Werror */
71 memset(&ppriv->ip_addr, 0, sizeof(ppriv->ip_addr));
73 ppriv->ip_addr.sin_family = AF_INET;
74 ppriv->ip_addr.sin_addr = ppriv->address;
75 ppriv->ip_addr.sin_port = ppriv->port;
77 ppriv->mreq.imr_multiaddr = ppriv->ip_addr.sin_addr;
79 if ( ppriv->iface && ppriv->iface[0] )
81 ppriv->mreq.imr_interface.s_addr = inet_addr(ppriv->iface);
83 else
85 ppriv->mreq.imr_interface.s_addr = htonl(INADDR_ANY);
88 if ( (ppriv->fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 )
90 PERROR("socket");
91 return -1;
93 setsockopt(ppriv->fd, SOL_IP, IP_MULTICAST_TTL, &optval, 1) ;
95 /* If the interface is specified, then we set the interface. */
96 if ( ppriv->iface && ppriv->iface[0] )
98 struct in_addr addr;
100 addr.s_addr = inet_addr(ppriv->iface);
101 if ( setsockopt(ppriv->fd, IPPROTO_IP, IP_MULTICAST_IF,
102 &addr, sizeof(addr)) < 0 )
104 PERROR("can't set interface");
105 return -1;
109 return 0;
112 static int xclose (struct xport* this_xport)
114 struct priv* ppriv = (struct priv*)this_xport->priv;
116 if ( -1 == ppriv->fd )
118 LOG_ER("Close transport called with xport already closed.\n");
119 return -1;
122 if ( ppriv->joined_multi_group )
124 /* remove ourselves from the multicast channel */
125 if ( setsockopt(ppriv->fd, IPPROTO_IP, IP_DROP_MEMBERSHIP,
126 &ppriv->mreq, sizeof(ppriv->mreq)) < 0 )
128 PERROR("can't drop multicast group");
132 close(ppriv->fd);
133 ppriv->fd = -1;
135 return 0;
138 static int xread (struct xport* this_xport, void* buf, size_t count,
139 unsigned long* addr, short* port)
141 struct priv* ppriv = (struct priv*)this_xport->priv;
142 int recvfrom_ret;
144 struct sockaddr_in sender_ip_addr;
145 socklen_t sender_ip_socket_size;
146 struct sockaddr_in ip_addr;
148 /* Set the size for use below: */
149 sender_ip_socket_size = (socklen_t)sizeof(sender_ip_addr);
151 /* This should be done only with multi-cast addresses. */
152 if ( ! ppriv->bind )
154 int on = 1;
156 LOG_PROG("About to join multi-cast group.\n");
158 /* Set address for reuse. */
159 if ( setsockopt(ppriv->fd, SOL_SOCKET, SO_REUSEADDR,
160 &on, sizeof(on)) < 0 )
162 PERROR("setsockopt -- can't reuse address\n");
163 return -1;
166 /* Set the receive buffer size. */
167 if ( arg_sockbuffer )
169 if ( setsockopt(ppriv->fd, SOL_SOCKET, SO_RCVBUF,
170 (char*)&arg_sockbuffer, sizeof(arg_sockbuffer)) < 0 )
172 PERROR("Setsockopt:SO_RCVBUF");
174 /* We'll try to continue with the default buffer size. */
177 /* Bind the socket to the port. */
178 memset(&ip_addr, 0, sizeof(ip_addr));
179 ip_addr.sin_family = AF_INET;
180 ip_addr.sin_addr.s_addr = htonl(INADDR_ANY);
181 ip_addr.sin_port = ppriv->port;
183 if ( bind(ppriv->fd, (struct sockaddr*)&ip_addr, sizeof(ip_addr) ) < 0 )
185 PERROR("can't bind to local address\n");
186 return -1;
188 ppriv->bind = 1;
190 if ( ppriv->join_group && !ppriv->joined_multi_group )
192 /* add the multicast channel given */
193 if ( setsockopt(ppriv->fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
194 &ppriv->mreq, sizeof(ppriv->mreq)) < 0 )
196 PERROR("setsockopt can't add to multicast group\n");
197 return -1;
199 ppriv->joined_multi_group = 1;
203 LOG_PROG("about to call recvfrom().\n");
205 if ( (recvfrom_ret = recvfrom(ppriv->fd, buf, count, 0,
206 (struct sockaddr*)&sender_ip_addr,
207 &sender_ip_socket_size)) < 0 )
209 switch ( errno )
211 default:
212 PERROR("recvfrom");
214 case EINTR: /* Quiet return on interrupt. */
215 break;
219 *addr = sender_ip_addr.sin_addr.s_addr;
220 *port = ntohs(sender_ip_addr.sin_port);
222 return recvfrom_ret;
226 static int xwrite (struct xport* this_xport, const void* buf, size_t count)
228 struct priv* ppriv = (struct priv*)this_xport->priv;
230 LOG_PROG("about to call sendto().\n");
232 return sendto (ppriv->fd, buf, count, 0,
233 (struct sockaddr*)&ppriv->ip_addr, sizeof(ppriv->ip_addr));
237 int xport_udp_ctor (struct xport* this_xport,
238 const char* address,
239 const char* iface,
240 short port,
241 int join)
243 static struct xport_vtbl vtbl = {
244 destructor,
246 xopen, xclose,
247 xread, xwrite
250 struct priv* ppriv;
252 this_xport->vtbl = 0;
253 this_xport->priv = 0;
255 ppriv = (struct priv*)malloc(sizeof(*ppriv));
256 if ( 0 == ppriv )
258 LOG_ER("Failed to allocate %d bytes for xport data.\n", sizeof(*ppriv));
259 return -1;
261 memset(ppriv, 0, sizeof(*ppriv));
263 ppriv->fd = -1;
264 if ( 0 == inet_aton(address, &ppriv->address) )
266 LOG_ER("invalid address \"%s\"\n", address);
267 free(ppriv);
268 return -1;
271 ppriv->port = htons(port);
273 ppriv->iface = strdup(iface);
274 if ( 0 == ppriv->iface )
276 LOG_ER("strdup failed attempting to allocate %d bytes\n", strlen(iface));
277 free(ppriv);
278 return -1;
281 ppriv->join_group = join;
283 this_xport->vtbl = &vtbl;
284 this_xport->priv = ppriv;
286 return 0;