r1355@opsdev009 (orig r71477): mcslee | 2007-11-27 00:42:19 -0800
[amiethrift.git] / lib / cpp / src / transport / TServerSocket.cpp
blob69d35bfc8c39027fc0d8047b3e4095dd3d3468a1
1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
3 //
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
7 #include <sys/socket.h>
8 #include <sys/select.h>
9 #include <sys/types.h>
10 #include <netinet/in.h>
11 #include <netinet/tcp.h>
12 #include <netdb.h>
13 #include <fcntl.h>
14 #include <errno.h>
16 #include "TSocket.h"
17 #include "TServerSocket.h"
18 #include <boost/shared_ptr.hpp>
20 namespace facebook { namespace thrift { namespace transport {
22 using namespace std;
23 using boost::shared_ptr;
25 TServerSocket::TServerSocket(int port) :
26 port_(port),
27 serverSocket_(-1),
28 acceptBacklog_(1024),
29 sendTimeout_(0),
30 recvTimeout_(0),
31 retryLimit_(0),
32 retryDelay_(0),
33 intSock1_(-1),
34 intSock2_(-1) {}
36 TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) :
37 port_(port),
38 serverSocket_(-1),
39 acceptBacklog_(1024),
40 sendTimeout_(sendTimeout),
41 recvTimeout_(recvTimeout),
42 retryLimit_(0),
43 retryDelay_(0),
44 intSock1_(-1),
45 intSock2_(-1) {}
47 TServerSocket::~TServerSocket() {
48 close();
51 void TServerSocket::setSendTimeout(int sendTimeout) {
52 sendTimeout_ = sendTimeout;
55 void TServerSocket::setRecvTimeout(int recvTimeout) {
56 recvTimeout_ = recvTimeout;
59 void TServerSocket::setRetryLimit(int retryLimit) {
60 retryLimit_ = retryLimit;
63 void TServerSocket::setRetryDelay(int retryDelay) {
64 retryDelay_ = retryDelay;
67 void TServerSocket::listen() {
68 int sv[2];
69 if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
70 GlobalOutput("TServerSocket::init()");
71 intSock1_ = -1;
72 intSock2_ = -1;
73 } else {
74 intSock1_ = sv[1];
75 intSock2_ = sv[0];
78 struct addrinfo hints, *res, *res0;
79 int error;
80 char port[sizeof("65536") + 1];
81 memset(&hints, 0, sizeof(hints));
82 hints.ai_family = PF_UNSPEC;
83 hints.ai_socktype = SOCK_STREAM;
84 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
85 sprintf(port, "%d", port_);
87 // Wildcard address
88 error = getaddrinfo(NULL, port, &hints, &res0);
89 if (error) {
90 fprintf(stderr, "getaddrinfo %d: %s\n", error, gai_strerror(error));
91 close();
92 throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for server socket.");
95 // Pick the ipv6 address first since ipv4 addresses can be mapped
96 // into ipv6 space.
97 for (res = res0; res; res = res->ai_next) {
98 if (res->ai_family == AF_INET6 || res->ai_next == NULL)
99 break;
102 serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
103 if (serverSocket_ == -1) {
104 GlobalOutput("TServerSocket::listen() socket");
105 close();
106 throw TTransportException(TTransportException::NOT_OPEN, "Could not create server socket.");
109 // Set reusaddress to prevent 2MSL delay on accept
110 int one = 1;
111 if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR,
112 &one, sizeof(one))) {
113 GlobalOutput("TServerSocket::listen() SO_REUSEADDR");
114 close();
115 throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_REUSEADDR");
118 // Defer accept
119 #ifdef TCP_DEFER_ACCEPT
120 if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT,
121 &one, sizeof(one))) {
122 GlobalOutput("TServerSocket::listen() TCP_DEFER_ACCEPT");
123 close();
124 throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT");
126 #endif // #ifdef TCP_DEFER_ACCEPT
128 // Turn linger off, don't want to block on calls to close
129 struct linger ling = {0, 0};
130 if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
131 &ling, sizeof(ling))) {
132 close();
133 GlobalOutput("TServerSocket::listen() SO_LINGER");
134 throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER");
137 // TCP Nodelay, speed over bandwidth
138 if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
139 &one, sizeof(one))) {
140 close();
141 GlobalOutput("setsockopt TCP_NODELAY");
142 throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY");
145 // Set NONBLOCK on the accept socket
146 int flags = fcntl(serverSocket_, F_GETFL, 0);
147 if (flags == -1) {
148 throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed");
151 if (-1 == fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK)) {
152 throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed");
155 // prepare the port information
156 // we may want to try to bind more than once, since SO_REUSEADDR doesn't
157 // always seem to work. The client can configure the retry variables.
158 int retries = 0;
159 do {
160 if (0 == bind(serverSocket_, res->ai_addr, res->ai_addrlen)) {
161 break;
164 // use short circuit evaluation here to only sleep if we need to
165 } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
167 // free addrinfo
168 freeaddrinfo(res0);
170 // throw an error if we failed to bind properly
171 if (retries > retryLimit_) {
172 char errbuf[1024];
173 sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
174 GlobalOutput(errbuf);
175 close();
176 throw TTransportException(TTransportException::NOT_OPEN, "Could not bind");
179 // Call listen
180 if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
181 GlobalOutput("TServerSocket::listen() LISTEN");
182 close();
183 throw TTransportException(TTransportException::NOT_OPEN, "Could not listen");
186 // The socket is now listening!
189 shared_ptr<TTransport> TServerSocket::acceptImpl() {
190 if (serverSocket_ < 0) {
191 throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening");
194 fd_set fds;
196 int maxEintrs = 5;
197 int numEintrs = 0;
199 while (true) {
200 FD_ZERO(&fds);
201 FD_SET(serverSocket_, &fds);
202 if (intSock2_ >= 0) {
203 FD_SET(intSock2_, &fds);
205 int ret = select(serverSocket_+1, &fds, NULL, NULL, NULL);
207 if (ret < 0) {
208 // error cases
209 if (errno == EINTR && (numEintrs++ < maxEintrs)) {
210 // EINTR needs to be handled manually and we can tolerate
211 // a certain number
212 continue;
214 GlobalOutput("TServerSocket::acceptImpl() select -1");
215 throw TTransportException(TTransportException::UNKNOWN);
216 } else if (ret > 0) {
217 // Check for an interrupt signal
218 if (intSock2_ >= 0 && FD_ISSET(intSock2_, &fds)) {
219 int8_t buf;
220 if (-1 == recv(intSock2_, &buf, sizeof(int8_t), 0)) {
221 GlobalOutput("TServerSocket::acceptImpl() interrupt receive");
223 throw TTransportException(TTransportException::INTERRUPTED);
225 // Check for the actual server socket being ready
226 if (FD_ISSET(serverSocket_, &fds)) {
227 break;
229 } else {
230 GlobalOutput("TServerSocket::acceptImpl() select 0");
231 throw TTransportException(TTransportException::UNKNOWN);
235 struct sockaddr_storage clientAddress;
236 int size = sizeof(clientAddress);
237 int clientSocket = ::accept(serverSocket_,
238 (struct sockaddr *) &clientAddress,
239 (socklen_t *) &size);
241 if (clientSocket < 0) {
242 int errno_copy = errno;
243 GlobalOutput("TServerSocket::accept()");
244 throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
247 // Make sure client socket is blocking
248 int flags = fcntl(clientSocket, F_GETFL, 0);
249 if (flags == -1) {
250 int errno_copy = errno;
251 GlobalOutput("TServerSocket::select() fcntl GETFL");
252 throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_GETFL)", errno_copy);
254 if (-1 == fcntl(clientSocket, F_SETFL, flags & ~O_NONBLOCK)) {
255 int errno_copy = errno;
256 GlobalOutput("TServerSocket::select() fcntl SETFL");
257 throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_SETFL)", errno_copy);
260 shared_ptr<TSocket> client(new TSocket(clientSocket));
261 if (sendTimeout_ > 0) {
262 client->setSendTimeout(sendTimeout_);
264 if (recvTimeout_ > 0) {
265 client->setRecvTimeout(recvTimeout_);
268 return client;
271 void TServerSocket::interrupt() {
272 if (intSock1_ >= 0) {
273 int8_t byte = 0;
274 if (-1 == send(intSock1_, &byte, sizeof(int8_t), 0)) {
275 GlobalOutput("TServerSocket::interrupt()");
280 void TServerSocket::close() {
281 if (serverSocket_ >= 0) {
282 shutdown(serverSocket_, SHUT_RDWR);
283 ::close(serverSocket_);
285 if (intSock1_ >= 0) {
286 ::close(intSock1_);
288 if (intSock2_ >= 0) {
289 ::close(intSock2_);
291 serverSocket_ = -1;
292 intSock1_ = -1;
293 intSock2_ = -1;
296 }}} // facebook::thrift::transport