2 #include <Agelena/Logger.h>
3 #include <boost/bind.hpp>
4 #include "Exceptions/Socket.h"
6 #if defined(_WIN32) && ! defined(__CYGWIN__)
9 # define getLastError__ WSAGetLastError
10 # define socklen_t int
12 # include <sys/types.h>
13 # include <sys/socket.h>
14 # include <netinet/in.h>
16 # define getLastError__() errno
17 # define closesocket ::close
21 #include "Handlers/UDPDataHandler.h"
22 #include "Private/ConnectionHandler.h"
/* SOCKET_CALL(statement, check_on_error, message, function)
 * Evaluates `statement`, appends `check_on_error` (e.g. `< 0`) to it to
 * form the failure test and, when that test is true, throws
 * Exceptions::SocketError built from `message`, `function` and the
 * platform's last error code (getLastError__()).
 * NOTE(review): the macro continues past the last line shown here. */
24 #define SOCKET_CALL(statement, check_on_error, message, function) \
25 if ((statement) check_on_error) \
26 throw Exceptions::SocketError(message, function, getLastError__()); \
/* Constructs a UDP socket.
 *
 * Allocates an IPv4 datagram socket (AF_INET / SOCK_DGRAM / IPPROTO_UDP)
 * into fd_. The statements that follow enable SO_REUSEADDR and bind the
 * socket to address:port; a separate "no need to bind" branch exists,
 * so binding is conditional.
 * NOTE(review): the condition selecting between the two branches and the
 * declaration of `on` are not part of the lines shown here.
 *
 * \param address local IPv4 address; its u_.u32_ member is copied into
 *                sin_addr.s_addr unchanged (presumably already in network
 *                byte order - confirm against Details::Address)
 * \param port    local port in host byte order (converted with htons)
 * \throws Exceptions::SocketError if socket(), setsockopt() or bind()
 *         returns a value < 0 */
32 UDPSocket::UDPSocket(const Details::Address
& address
, unsigned short port
)
/* NOTE(review): on the _WIN32 branch socket() reports failure as
 * INVALID_SOCKET; whether the `< 0` test catches that depends on the
 * type of fd_, which is declared elsewhere - confirm. */
37 SOCKET_CALL(fd_
= socket(AF_INET
, SOCK_DGRAM
, IPPROTO_UDP
), < 0, "socket allocation failed", "UDPSocket constructor");
/* The option value is cast to const char * because that is the
 * parameter type setsockopt() takes on Windows. */
41 SOCKET_CALL(setsockopt(fd_
, SOL_SOCKET
, SO_REUSEADDR
, (const char *)&on
, sizeof(on
)), < 0, "setsockopt failed", "UDPSocket constructor");
/* Zero the whole structure before filling in the fields we use. */
42 sockaddr_in in_address
;
43 memset(&in_address
, 0, sizeof(in_address
));
44 in_address
.sin_addr
.s_addr
= address
.u_
.u32_
;
45 in_address
.sin_family
= AF_INET
;
46 in_address
.sin_port
= htons(port
);
47 SOCKET_CALL(bind(fd_
, (const sockaddr
*)(&in_address
), sizeof(sockaddr_in
)), < 0, "bind failed", "UDPSocket constructor");
50 { /* no need to bind */ }
53 UDPSocket::~UDPSocket()
/* Sends the contents of `data` to to:port with a single sendto() call.
 *
 * \param to   destination IPv4 address; to.u_.u32_ is copied into
 *             sin_addr.s_addr unchanged
 * \param port destination port in host byte order (converted with htons)
 * \param data bytes to send
 * \throws Exceptions::SocketError from checkStatus() when the socket is
 *         not usable, and with "Send failed" when the send fails
 * NOTE(review): the test guarding the "Send failed" throw and the return
 * statement are not part of the lines shown here. */
59 std::size_t UDPSocket::send(const Details::Address
& to
, unsigned short port
, const std::vector
< char > & data
)
61 checkStatus("UDPSocket::send");
62 sockaddr_in remote_address
;
63 memset(&remote_address
, 0, sizeof(remote_address
));
64 remote_address
.sin_addr
.s_addr
= to
.u_
.u32_
;
65 remote_address
.sin_family
= AF_INET
;
66 remote_address
.sin_port
= htons(port
);
/* NOTE(review): data[0] on an empty vector is undefined behaviour;
 * nothing visible here rejects an empty `data` - confirm callers never
 * pass one, or guard against it. */
68 const char * curr(&data
[0]);
69 const char * end(curr
+ data
.size());
/* std::distance(curr, end) is data.size(): the whole vector is handed to
 * sendto() in one call. */
70 ssize_t
sent(::sendto(fd_
, curr
, std::distance(curr
, end
), 0, (const sockaddr
*)(&remote_address
), sizeof(remote_address
)));
74 throw Exceptions::SocketError("Send failed", "UDPSocket::send", getLastError__());
/* Receives a datagram, or hands out the one previously stored by peek().
 *
 * While the stored peek data (element 3 of peek_buffer_) is empty the
 * socket itself is read with recvfrom(); the last three statements of
 * this function instead copy the stored datagram into `buffer`, clear
 * the stored data and return the stored sender information (presumably
 * the branch taken when a peeked datagram is pending).
 *
 * \param buffer  receives the data. If it is empty on entry it is resized
 *                to 4096 bytes for the read and then shrunk to the number
 *                of bytes received; otherwise it is used with its current
 *                size and is not resized
 * \param timeout time-out handed to select() through tv_usec
 * \return (sender address, sender port in host byte order, byte count)
 * \throws Exceptions::SocketError from checkStatus(), and with
 *         "Select call for recv failed" / "Recv failed"
 * NOTE(review): the declarations of read_set, write_set, exc_set and
 * time_out, the case labels of the switch and the tests guarding the
 * throws are not part of the lines shown here. */
82 boost::tuple
< Details::Address
, unsigned short, std::size_t > UDPSocket::recv(std::vector
< char > & buffer
, unsigned long timeout
/* = ~0*/)
84 checkStatus("UDPSocket::recv");
/* The lock is a recursive mutex: peek() calls recv() while already
 * holding it. */
85 boost::recursive_mutex::scoped_lock
sentinel(peek_buffer_lock_
);
86 if (boost::tuples::get
<3>(peek_buffer_
).empty())
/* Remember whether we sized the buffer ourselves, so that we only shrink
 * it afterwards in that case. */
89 bool we_own_the_buffer_size(false);
90 if (buffer
.size() == 0)
92 buffer
.resize(4096, 0);
93 we_own_the_buffer_size
= true;
96 { /* keep the buffer as is */ }
97 sockaddr_in remote_address
;
98 memset(&remote_address
, 0, sizeof(remote_address
));
99 socklen_t
remote_address_size(sizeof(remote_address
));
100 bool timed_out(false);
/* select() takes the highest descriptor number plus one. */
103 int highest_fd
= fd_
+ 1;
110 FD_SET(fd_
, &read_set
);
/* NOTE(review): timeout is stored in tv_usec unchanged, i.e. it is
 * treated as microseconds. POSIX lets select() fail with EINVAL for an
 * invalid time-out, so values of one second or more should probably be
 * split between tv_sec and tv_usec - confirm how tv_sec is set. */
113 time_out
.tv_usec
= timeout
;
115 switch (select(highest_fd
, &read_set
, &write_set
, &exc_set
, &time_out
))
125 throw Exceptions::SocketError("Select call for recv failed", "UDPSocket::recv", getLastError__());
129 { /* no time-out period */ }
/* After a time-out recvfrom() is not called at all and the byte count
 * is reported as 0. */
130 ssize_t
received(timed_out
? 0 : recvfrom(fd_
, &buffer
[0], buffer
.size(), 0, (sockaddr
*)(&remote_address
), &remote_address_size
));
134 throw Exceptions::SocketError("Recv failed", "UDPSocket::recv", getLastError__());
137 { /* all is well */ }
138 assert(remote_address_size
== sizeof(remote_address
));
139 if (we_own_the_buffer_size
)
140 buffer
.resize(received
);
142 { /* we don't own the buffer size, so let the caller take care of it */ }
143 return boost::make_tuple(Details::Address(remote_address
.sin_addr
.s_addr
), ntohs(remote_address
.sin_port
), received
);
/* Hand out the datagram stored by peek(): copy it to the caller, then
 * empty the stored data so the next call reads from the socket again. */
147 buffer
= boost::tuples::get
<3>(peek_buffer_
);
148 boost::tuples::get
<3>(peek_buffer_
).clear();
149 return boost::make_tuple(boost::tuples::get
<0>(peek_buffer_
), boost::tuples::get
<1>(peek_buffer_
), boost::tuples::get
<2>(peek_buffer_
));
/* Returns the next datagram without consuming it.
 *
 * If no datagram is stored yet (element 3 of peek_buffer_ is empty), one
 * is read with recv() straight into peek_buffer_: the data goes into
 * element 3 and the sender address, sender port and byte count into
 * elements 0 to 2. The stored data is then copied into `buffer`; it is
 * left in peek_buffer_, so a later peek() or recv() returns it again.
 *
 * \param buffer  overwritten with a copy of the stored datagram
 * \param timeout forwarded to recv() when a read is needed
 * \return (sender address, sender port, byte count) as stored in
 *         peek_buffer_
 * \throws whatever recv() throws */
153 boost::tuple
< Details::Address
, unsigned short, std::size_t > UDPSocket::peek(std::vector
< char > & buffer
, unsigned long timeout
/* = ~0*/)
/* recv() takes the same lock again; that is fine because it is a
 * recursive mutex. */
155 boost::recursive_mutex::scoped_lock
sentinel(peek_buffer_lock_
);
156 if (boost::tuples::get
<3>(peek_buffer_
).empty())
/* The stored data vector is empty here, so recv() sizes it itself and
 * shrinks it to the number of bytes received. */
158 boost::tie(boost::tuples::get
<0>(peek_buffer_
), boost::tuples::get
<1>(peek_buffer_
), boost::tuples::get
<2>(peek_buffer_
)) = recv(boost::tuples::get
<3>(peek_buffer_
), timeout
);
161 { /* already have a peeked buffer */ }
163 buffer
= boost::tuples::get
<3>(peek_buffer_
);
164 return boost::make_tuple(boost::tuples::get
<0>(peek_buffer_
), boost::tuples::get
<1>(peek_buffer_
), boost::tuples::get
<2>(peek_buffer_
));
/* Discards the data stored by peek().
 *
 * Only the data vector (element 3 of peek_buffer_) is emptied, under
 * peek_buffer_lock_; the stored address, port and size are left as they
 * are. recv() and peek() test that vector for emptiness to decide
 * whether a datagram is pending. */
167 void UDPSocket::clearPeekBuffer()
169 boost::recursive_mutex::scoped_lock
sentinel(peek_buffer_lock_
);
170 boost::tuples::get
<3>(peek_buffer_
).clear();
/* Checks the socket with select(), with fd_ placed in the read set.
 *
 * \throws Exceptions::SocketError from checkStatus() when the socket is
 *         not usable, and with "Select call for poll failed"
 * NOTE(review): the declarations of the descriptor sets and of
 * `timeout`, the case labels of the switch and the return statements are
 * not part of the lines shown here, so the meaning of the returned bool
 * cannot be confirmed from this view. */
173 bool UDPSocket::poll() const
175 checkStatus("UDPSocket::poll");
/* select() takes the highest descriptor number plus one. */
178 int highest_fd
= fd_
+ 1;
185 FD_SET(fd_
, &read_set
);
190 switch (select(highest_fd
, &read_set
, &write_set
, &exc_set
, &timeout
))
200 throw Exceptions::SocketError("Select call for poll failed", "UDPSocket::poll", getLastError__());
/* Closes the socket.
 * NOTE(review): only the no-op branch for an already closed socket is
 * part of the lines shown here; the statements that do the closing and
 * the test that selects between the branches are not. */
206 void UDPSocket::close()
214 { /* already closed */ }
/* Registers `handler` to be called when data is ready on this socket.
 *
 * Under fd_lock_, the address of `handler` is stored in data_handler_
 * (no copy is made) and fd_ is attached to the ConnectionHandler
 * singleton with a callback bound to onDataReady_() on this object.
 *
 * \param handler           called from onDataReady_(); only its address
 *                          is kept, so it must stay alive while it is
 *                          registered
 * \param on_error_callback passed on to ConnectionHandler::attach()
 * NOTE(review): the bound callback holds a plain `this`; presumably the
 * socket must be detached (see clearDataHandler()) before it is
 * destroyed - confirm. */
218 void UDPSocket::setDataHandler(Handlers::UDPDataHandler
& handler
, OnErrorCallback on_error_callback
/* = OnErrorCallback()*/)
220 boost::recursive_mutex::scoped_lock
sentinel(fd_lock_
);
221 data_handler_
= &handler
;
222 Private::ConnectionHandler::getInstance().attach(fd_
, boost::bind(&UDPSocket::onDataReady_
, this), on_error_callback
);
/* Removes this socket from the connection handler.
 *
 * Under fd_lock_, logs the descriptor at debug level and detaches fd_
 * from the ConnectionHandler singleton; a separate branch does nothing
 * when there is nothing to clear.
 * NOTE(review): the test selecting between the two branches, and any
 * reset of data_handler_, are not part of the lines shown here. */
225 void UDPSocket::clearDataHandler()
227 boost::recursive_mutex::scoped_lock
sentinel(fd_lock_
);
230 AGELENA_DEBUG_1("Detaching FD %1% from the connection handler", fd_
);
231 Private::ConnectionHandler::getInstance().detach(fd_
);
235 { /* nothing to clear */ }
/* Guard used at the start of the public operations (send, recv, poll).
 *
 * \param for_whom name of the calling function; passed to the exception
 *                 in the same position the other throw sites in this
 *                 file use for the function name
 * \throws Exceptions::SocketError ("Socket not usable", error code -1)
 *         when status_ is anything other than good__ */
238 void UDPSocket::checkStatus(const char * for_whom
) const
240 if (status_
!= good__
)
243 throw Exceptions::SocketError("Socket not usable", for_whom
, -1);
246 { /* all is well */ }
/* Callback bound in setDataHandler() and given to the connection
 * handler.
 *
 * Does its work only when a data handler is set and status_ is good__.
 * The handler is invoked with shared_from_this() when this object is
 * held by a shared_ptr, and with the plain `this` pointer in the second
 * call below (presumably the else branch of that test).
 * NOTE(review): _internal_weak_this looks like an implementation detail
 * of boost::enable_shared_from_this rather than documented API - confirm
 * it is available in the Boost versions this must build against. */
249 void UDPSocket::onDataReady_()
251 if (data_handler_
&& status_
== good__
)
253 /* If the internal weak pointer in enable_shared_from_this is
254 * NULL, we're not stored in a shared_ptr, which means we can't
255 * pass one to our users. This is possible, for example, if the
256 * client app uses RAII to handle the life-cycle of the UDP
257 * socket, rather than using a shared_ptr. */
258 if (_internal_weak_this
.lock())
259 (*data_handler_
)(shared_from_this());
261 (*data_handler_
)(this);