// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#include "TServerSocket.h"
#include <boost/shared_ptr.hpp>
20 namespace facebook
{ namespace thrift
{ namespace transport
{
23 using boost::shared_ptr
;
// Constructor taking only the port number.
// NOTE(review): presumably stores `port` in port_, which listen() later passes
// to getaddrinfo() -- confirm. The member-initializer list and body that
// should follow the ':' are not visible in this excerpt.
25 TServerSocket::TServerSocket(int port
) :
// Constructor taking the port plus send and receive timeouts. The two timeout
// arguments initialize sendTimeout_ and recvTimeout_ respectively.
// NOTE(review): the remaining member initializers and the constructor body
// are not visible in this excerpt.
36 TServerSocket::TServerSocket(int port
, int sendTimeout
, int recvTimeout
) :
40 sendTimeout_(sendTimeout
),
41 recvTimeout_(recvTimeout
),
47 TServerSocket::~TServerSocket() {
51 void TServerSocket::setSendTimeout(int sendTimeout
) {
52 sendTimeout_
= sendTimeout
;
55 void TServerSocket::setRecvTimeout(int recvTimeout
) {
56 recvTimeout_
= recvTimeout
;
59 void TServerSocket::setRetryLimit(int retryLimit
) {
60 retryLimit_
= retryLimit
;
63 void TServerSocket::setRetryDelay(int retryDelay
) {
64 retryDelay_
= retryDelay
;
67 void TServerSocket::listen() {
69 if (-1 == socketpair(AF_LOCAL
, SOCK_STREAM
, 0, sv
)) {
70 GlobalOutput("TServerSocket::init()");
78 struct addrinfo hints
, *res
, *res0
;
80 char port
[sizeof("65536") + 1];
81 memset(&hints
, 0, sizeof(hints
));
82 hints
.ai_family
= PF_UNSPEC
;
83 hints
.ai_socktype
= SOCK_STREAM
;
84 hints
.ai_flags
= AI_PASSIVE
| AI_ADDRCONFIG
;
85 sprintf(port
, "%d", port_
);
88 error
= getaddrinfo(NULL
, port
, &hints
, &res0
);
90 fprintf(stderr
, "getaddrinfo %d: %s\n", error
, gai_strerror(error
));
92 throw TTransportException(TTransportException::NOT_OPEN
, "Could not resolve host for server socket.");
95 // Pick the ipv6 address first since ipv4 addresses can be mapped
97 for (res
= res0
; res
; res
= res
->ai_next
) {
98 if (res
->ai_family
== AF_INET6
|| res
->ai_next
== NULL
)
102 serverSocket_
= socket(res
->ai_family
, res
->ai_socktype
, res
->ai_protocol
);
103 if (serverSocket_
== -1) {
104 GlobalOutput("TServerSocket::listen() socket");
106 throw TTransportException(TTransportException::NOT_OPEN
, "Could not create server socket.");
109 // Set reusaddress to prevent 2MSL delay on accept
111 if (-1 == setsockopt(serverSocket_
, SOL_SOCKET
, SO_REUSEADDR
,
112 &one
, sizeof(one
))) {
113 GlobalOutput("TServerSocket::listen() SO_REUSEADDR");
115 throw TTransportException(TTransportException::NOT_OPEN
, "Could not set SO_REUSEADDR");
119 #ifdef TCP_DEFER_ACCEPT
120 if (-1 == setsockopt(serverSocket_
, SOL_SOCKET
, TCP_DEFER_ACCEPT
,
121 &one
, sizeof(one
))) {
122 GlobalOutput("TServerSocket::listen() TCP_DEFER_ACCEPT");
124 throw TTransportException(TTransportException::NOT_OPEN
, "Could not set TCP_DEFER_ACCEPT");
126 #endif // #ifdef TCP_DEFER_ACCEPT
128 // Turn linger off, don't want to block on calls to close
129 struct linger ling
= {0, 0};
130 if (-1 == setsockopt(serverSocket_
, SOL_SOCKET
, SO_LINGER
,
131 &ling
, sizeof(ling
))) {
133 GlobalOutput("TServerSocket::listen() SO_LINGER");
134 throw TTransportException(TTransportException::NOT_OPEN
, "Could not set SO_LINGER");
137 // TCP Nodelay, speed over bandwidth
138 if (-1 == setsockopt(serverSocket_
, IPPROTO_TCP
, TCP_NODELAY
,
139 &one
, sizeof(one
))) {
141 GlobalOutput("setsockopt TCP_NODELAY");
142 throw TTransportException(TTransportException::NOT_OPEN
, "Could not set TCP_NODELAY");
145 // Set NONBLOCK on the accept socket
146 int flags
= fcntl(serverSocket_
, F_GETFL
, 0);
148 throw TTransportException(TTransportException::NOT_OPEN
, "fcntl() failed");
151 if (-1 == fcntl(serverSocket_
, F_SETFL
, flags
| O_NONBLOCK
)) {
152 throw TTransportException(TTransportException::NOT_OPEN
, "fcntl() failed");
155 // prepare the port information
156 // we may want to try to bind more than once, since SO_REUSEADDR doesn't
157 // always seem to work. The client can configure the retry variables.
160 if (0 == bind(serverSocket_
, res
->ai_addr
, res
->ai_addrlen
)) {
164 // use short circuit evaluation here to only sleep if we need to
165 } while ((retries
++ < retryLimit_
) && (sleep(retryDelay_
) == 0));
170 // throw an error if we failed to bind properly
171 if (retries
> retryLimit_
) {
173 sprintf(errbuf
, "TServerSocket::listen() BIND %d", port_
);
174 GlobalOutput(errbuf
);
176 throw TTransportException(TTransportException::NOT_OPEN
, "Could not bind");
180 if (-1 == ::listen(serverSocket_
, acceptBacklog_
)) {
181 GlobalOutput("TServerSocket::listen() LISTEN");
183 throw TTransportException(TTransportException::NOT_OPEN
, "Could not listen");
186 // The socket is now listening!
189 shared_ptr
<TTransport
> TServerSocket::acceptImpl() {
190 if (serverSocket_
< 0) {
191 throw TTransportException(TTransportException::NOT_OPEN
, "TServerSocket not listening");
201 FD_SET(serverSocket_
, &fds
);
202 if (intSock2_
>= 0) {
203 FD_SET(intSock2_
, &fds
);
205 int ret
= select(serverSocket_
+1, &fds
, NULL
, NULL
, NULL
);
209 if (errno
== EINTR
&& (numEintrs
++ < maxEintrs
)) {
210 // EINTR needs to be handled manually and we can tolerate
214 GlobalOutput("TServerSocket::acceptImpl() select -1");
215 throw TTransportException(TTransportException::UNKNOWN
);
216 } else if (ret
> 0) {
217 // Check for an interrupt signal
218 if (intSock2_
>= 0 && FD_ISSET(intSock2_
, &fds
)) {
220 if (-1 == recv(intSock2_
, &buf
, sizeof(int8_t), 0)) {
221 GlobalOutput("TServerSocket::acceptImpl() interrupt receive");
223 throw TTransportException(TTransportException::INTERRUPTED
);
225 // Check for the actual server socket being ready
226 if (FD_ISSET(serverSocket_
, &fds
)) {
230 GlobalOutput("TServerSocket::acceptImpl() select 0");
231 throw TTransportException(TTransportException::UNKNOWN
);
235 struct sockaddr_storage clientAddress
;
236 int size
= sizeof(clientAddress
);
237 int clientSocket
= ::accept(serverSocket_
,
238 (struct sockaddr
*) &clientAddress
,
239 (socklen_t
*) &size
);
241 if (clientSocket
< 0) {
242 int errno_copy
= errno
;
243 GlobalOutput("TServerSocket::accept()");
244 throw TTransportException(TTransportException::UNKNOWN
, "accept()", errno_copy
);
247 // Make sure client socket is blocking
248 int flags
= fcntl(clientSocket
, F_GETFL
, 0);
250 int errno_copy
= errno
;
251 GlobalOutput("TServerSocket::select() fcntl GETFL");
252 throw TTransportException(TTransportException::UNKNOWN
, "fcntl(F_GETFL)", errno_copy
);
254 if (-1 == fcntl(clientSocket
, F_SETFL
, flags
& ~O_NONBLOCK
)) {
255 int errno_copy
= errno
;
256 GlobalOutput("TServerSocket::select() fcntl SETFL");
257 throw TTransportException(TTransportException::UNKNOWN
, "fcntl(F_SETFL)", errno_copy
);
260 shared_ptr
<TSocket
> client(new TSocket(clientSocket
));
261 if (sendTimeout_
> 0) {
262 client
->setSendTimeout(sendTimeout_
);
264 if (recvTimeout_
> 0) {
265 client
->setRecvTimeout(recvTimeout_
);
271 void TServerSocket::interrupt() {
272 if (intSock1_
>= 0) {
274 if (-1 == send(intSock1_
, &byte
, sizeof(int8_t), 0)) {
275 GlobalOutput("TServerSocket::interrupt()");
280 void TServerSocket::close() {
281 if (serverSocket_
>= 0) {
282 shutdown(serverSocket_
, SHUT_RDWR
);
283 ::close(serverSocket_
);
285 if (intSock1_
>= 0) {
288 if (intSock2_
>= 0) {
296 }}} // facebook::thrift::transport