1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
7 #include "TNonblockingServer.h"
10 #include <sys/socket.h>
11 #include <netinet/in.h>
12 #include <netinet/tcp.h>
18 namespace facebook
{ namespace thrift
{ namespace server
{
20 using namespace facebook::thrift::protocol
;
21 using namespace facebook::thrift::transport
;
/**
 * Runnable handed to the thread manager when the server processes requests
 * on a thread pool.  It drives the processor loop for one connection and
 * then signals completion back to the libevent thread by writing one byte
 * to taskHandle_ (one end of a socketpair created in transition()).
 *
 * NOTE(review): several lines of this class are elided in this excerpt
 * (access specifiers, the run() signature, some members and braces); the
 * comments below describe only the code that is visible.
 */
class TConnection::Task : public Runnable {
  // Construct with the processor and the input/output protocols the task
  // drives.  (The taskHandle parameter and part of the initializer list
  // are elided in this view.)
  Task(boost::shared_ptr<TProcessor> processor,
       boost::shared_ptr<TProtocol> input,
       boost::shared_ptr<TProtocol> output,
  processor_(processor),
  taskHandle_(taskHandle) {}

  // Keep servicing requests on this connection until the transport has no
  // more buffered data (peek() false) or process() reports completion.
  while (processor_->process(input_, output_)) {
    if (!input_->getTransport()->peek()) {
  } catch (TTransportException& ttx) {
    // Client disconnects surface as transport exceptions; log and fall out.
    cerr << "TThreadedServer client died: " << ttx.what() << endl;
  } catch (TException& x) {
    cerr << "TThreadedServer exception: " << x.what() << endl;
  // Catch-all so no exception escapes the worker thread.
  cerr << "TThreadedServer uncaught exception." << endl;

  // Signal completion back to the libevent thread via a socketpair
  if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
    GlobalOutput("TNonblockingServer::Task: send");
  // Close our end of the socketpair; failure here would leak the fd.
  if (-1 == ::close(taskHandle_)) {
    GlobalOutput("TNonblockingServer::Task: close, possible resource leak");

  // Collaborators shared with the connection for the duration of the task.
  boost::shared_ptr<TProcessor> processor_;
  boost::shared_ptr<TProtocol> input_;
  boost::shared_ptr<TProtocol> output_;
/**
 * (Re)initializes this connection for a freshly accepted socket.  Used both
 * for brand-new objects and for pooled TConnections recycled by
 * TNonblockingServer::createConnection().
 *
 * @param socket      the accepted client fd
 * @param eventFlags  libevent flags to register for this connection
 * @param s           owning server; supplies transport/protocol factories
 *
 * NOTE(review): some lines of this function are elided in this excerpt.
 */
void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
  // Every connection starts out waiting to receive a frame.
  socketState_ = SOCKET_RECV;

  // Set flags, which also registers the event

  // get input/transports: wrap the raw memory buffers with the server's
  // configured transport factories...
  factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
  factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);

  // ...and build the protocols on top of those factory transports.
  inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
  outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
/**
 * Services a readiness notification on this connection's socket: in
 * SOCKET_RECV it pulls bytes toward readWant_; in SOCKET_SEND it pushes out
 * the write buffer.  Both recv() and send() are non-blocking, so partial
 * progress and EAGAIN/EWOULDBLOCK are normal.
 *
 * NOTE(review): a number of lines (closing braces, state-transition calls)
 * are elided in this excerpt.
 */
void TConnection::workSocket() {
  int flags=0, got=0, left=0, sent=0;

  switch (socketState_) {
    // It is an error to be in this state if we already have all the data
    assert(readBufferPos_ < readWant_);

    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      while (readWant_ > readBufferSize_) {
        readBufferSize_ *= 2;
      // Grow in place; the buffer is recycled across requests.
      readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_);
      if (readBuffer_ == NULL) {
        GlobalOutput("TConnection::workSocket() realloc");

    // Read from the socket, asking only for the bytes still outstanding.
    fetch = readWant_ - readBufferPos_;
    got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);

    // Move along in the buffer
    readBufferPos_ += got;

    // Check that we did not overdo it
    assert(readBufferPos_ <= readWant_);

    // We are done reading, move onto the next state
    if (readBufferPos_ == readWant_) {
    } else if (got == -1) {
      // Blocking errors are okay, just move on
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
      // A peer reset is routine; anything else is worth logging.
      if (errno != ECONNRESET) {
        GlobalOutput("TConnection::workSocket() recv -1");

    // Whenever we get down here it means a remote disconnect

    // Should never have position past size
    assert(writeBufferPos_ <= writeBufferSize_);

    // If there is no data to send, then let us move on
    if (writeBufferPos_ == writeBufferSize_) {
      GlobalOutput("WARNING: Send state with no data to send\n");

    // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
    // check for the EPIPE return condition and close the socket in that case
    flags |= MSG_NOSIGNAL;
#endif // ifdef MSG_NOSIGNAL

    // Write out as much of the remaining buffer as the kernel will take.
    left = writeBufferSize_ - writeBufferPos_;
    sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);

    // Blocking errors are okay, just move on
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
      // EPIPE just means the peer went away; anything else gets logged.
      if (errno != EPIPE) {
        GlobalOutput("TConnection::workSocket() send -1");

    // Advance past what was accepted; send() may be partial.
    writeBufferPos_ += sent;

    // Sanity check
    assert(writeBufferPos_ <= writeBufferSize_);

    // Buffer fully flushed; (elided) code moves to the next app state.
    if (writeBufferPos_ == writeBufferSize_) {

    // Unknown socket state: log and bail.
    fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_);
/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 *
 * State flow (visible portions): APP_READ_FRAME_SIZE -> APP_READ_REQUEST ->
 * (dispatch, possibly via thread pool / APP_WAIT_TASK) ->
 * [APP_SEND_FRAME_SIZE ->] APP_SEND_RESULT -> back to APP_READ_FRAME_SIZE.
 *
 * NOTE(review): many lines (case braces, some statements, local
 * declarations of sv/sz) are elided in this excerpt.
 */
void TConnection::transition() {
  // Switch upon the state that we are currently in and move to a new state
  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
    outputTransport_->resetBuffer();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // One end of the pair goes to the Task, the other is watched here so
      // the worker thread can wake the libevent loop when it finishes.
      if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
        GlobalOutput("TConnection::socketpair() failed");
        // Now we will fall through to the APP_WAIT_TASK block with no response

      // Create task and dispatch to the thread manager
      boost::shared_ptr<Runnable> task =
        boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),

      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      // Create an event to be notified when the task finishes
      event_set(&taskEvent_,
                TConnection::taskHandler,

      // Attach to the base
      event_base_set(server_->getEventBase(), &taskEvent_);

      // Add the event and start up the server
      if (-1 == event_add(&taskEvent_, 0)) {
        GlobalOutput("TNonblockingServer::serve(): coult not event_add");

      server_->addTask(task);

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to

    // Invoke the processor inline on the libevent thread.
    server_->getProcessor()->process(inputProtocol_, outputProtocol_);
    } catch (TTransportException &ttx) {
      fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
    } catch (TException &x) {
      fprintf(stderr, "TException: Server::process() %s\n", x.what());
    fprintf(stderr, "Server::process() unknown exception\n");

    // Intentionally fall through here, the call to process has written into

  // We have now finished processing a task and the result has been written
  // into the outputTransport_, so we grab its contents and place them into
  // the writeBuffer_ for actual writing by the libevent thread

  // Get the result of the operation
  outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

  // If the function call generated return data, then move into the send
  // state and get going
  if (writeBufferSize_ > 0) {
    // Move into write state
    socketState_ = SOCKET_SEND;

    if (server_->getFrameResponses()) {
      // Put the frame size into the write buffer; the real payload is
      // re-fetched in APP_SEND_FRAME_SIZE after the 4-byte size is sent.
      appState_ = APP_SEND_FRAME_SIZE;
      frameSize_ = (int32_t)htonl(writeBufferSize_);
      writeBuffer_ = (uint8_t*)&frameSize_;
      writeBufferSize_ = 4;
    // Go straight into sending the result, do not frame it
    appState_ = APP_SEND_RESULT;

    // Socket into write mode

    // Try to work the socket immediately

  // In this case, the request was asynchronous and we should fall through
  // right back into the read frame header state

  case APP_SEND_FRAME_SIZE:
    // Refetch the result of the operation since we put the frame size into
    // writeBuffer_ while the frame header was being sent.
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // Now in send result state
    appState_ = APP_SEND_RESULT;

    // Go to work on the socket right away, probably still writeable

  case APP_SEND_RESULT:
    // N.B.: We also intentionally fall through here into the INIT state!

    // Clear write buffer variables
    writeBufferSize_ = 0;

    // Set up read buffer for getting 4 bytes

    // Into read4 state we go
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_FRAME_SIZE;

    // Register read event

    // Try to work the socket right away

  case APP_READ_FRAME_SIZE:
    // We just read the request length, deserialize it
    sz = *(int32_t*)readBuffer_;
    sz = (int32_t)ntohl(sz);

    // (Elided guard:) negative sizes indicate an unframed peer.
    fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);

    // Reset the read buffer so workSocket() will pull exactly sz bytes.
    readWant_ = (uint32_t)sz;

    // Move into read request state
    appState_ = APP_READ_REQUEST;

    // Work the socket right away

  // Unknown application state: log it.
  fprintf(stderr, "Totally Fucked. Application State %d\n", appState_);
/**
 * Registers (or re-registers) this connection's libevent event with the
 * given flags.  No-op when the flags are unchanged; otherwise any
 * previously registered event is deleted before the new one is added.
 *
 * @param eventFlags EV_READ/EV_WRITE (possibly with EV_PERSIST), or 0 to
 *                   leave the connection idle
 *
 * NOTE(review): some lines (closing braces, early returns) are elided in
 * this excerpt.
 */
void TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {

  // Delete a previously existing event
  if (eventFlags_ != 0) {
    if (event_del(&event_) == -1) {
      GlobalOutput("TConnection::setFlags event_del");

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags

  /*
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del(). The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed. However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del(). You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   */
  event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
  event_base_set(server_->getEventBase(), &event_);

  // Arm the event so libevent starts monitoring with the new flags.
  if (event_add(&event_, 0) == -1) {
    GlobalOutput("TConnection::setFlags(): could not event_add");
/**
 * Closes a connection: unregisters its libevent event, closes the
 * factory-produced transports, and hands the object back to the owning
 * server's pool for reuse.
 *
 * NOTE(review): some lines (e.g. raw-socket teardown) are elided in this
 * excerpt.
 */
void TConnection::close() {
  // Delete the registered libevent
  if (event_del(&event_) == -1) {
    GlobalOutput("TConnection::close() event_del");

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
475 * Creates a new connection either by reusing an object off the stack or
476 * by allocating a new one entirely
478 TConnection
* TNonblockingServer::createConnection(int socket
, short flags
) {
480 if (connectionStack_
.empty()) {
481 return new TConnection(socket
, flags
, this);
483 TConnection
* result
= connectionStack_
.top();
484 connectionStack_
.pop();
485 result
->init(socket
, flags
, this);
491 * Returns a connection to the stack
493 void TNonblockingServer::returnConnection(TConnection
* connection
) {
494 connectionStack_
.push(connection
);
/**
 * libevent callback for the listening socket: accepts as many pending
 * client connections as possible, puts each socket into non-blocking mode,
 * and kicks the new TConnection into its initial state.
 *
 * @param fd     fd reported ready by libevent; must equal serverSocket_
 * @param which  libevent event flags (unused in the visible code)
 *
 * NOTE(review): some lines (closing braces, locals clientSocket/flags/
 * addrLen declarations) are elided in this excerpt.
 */
void TNonblockingServer::handleEvent(int fd, short which) {
  // Sanity check: libevent must hand back the same fd we registered.
  assert(fd == serverSocket_);

  // Server socket accepted a new connection
  struct sockaddr addr;
  addrLen = sizeof(addr);

  // Going to accept a new client socket

  // Accept as many new clients as possible, even though libevent signaled only
  // one, this helps us to avoid having to go back into the libevent engine so
  while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
    // Explicitly set this socket to NONBLOCK mode
    if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
        fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
      GlobalOutput("thriftServerEventHandler: set O_NONBLOCK");

    // Create a new TConnection for this client socket.
    TConnection* clientConnection =
      createConnection(clientSocket, EV_READ | EV_PERSIST);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == NULL) {
      fprintf(stderr, "thriftServerEventHandler: failed TConnection factory");

    // Put this client connection into the proper state
    clientConnection->transition();

  // Done looping accept, now we have to make sure the error is due to
  // blocking. Any other error is a problem
  if (errno != EAGAIN && errno != EWOULDBLOCK) {
    GlobalOutput("thriftServerEventHandler: accept()");
/**
 * Creates a socket to listen on and binds it to the local port.
 * Uses getaddrinfo() with a wildcard address; prefers an IPv6 result when
 * one is offered, since IPv4 addresses can be mapped into it.
 *
 * @throws TException on socket() or bind() failure
 *
 * NOTE(review): some lines (error-handling braces, loop break, the call
 * into listenSocket(int)) are elided in this excerpt.
 */
void TNonblockingServer::listenSocket() {
  struct addrinfo hints, *res, *res0;

  // Decimal port string for getaddrinfo.
  // NOTE(review): sizeof("65536") + 1 = 7 bytes; sprintf is safe only if
  // port_ is within [0, 65535] — verify, or prefer snprintf.
  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;                  // IPv4 or IPv6
  hints.ai_socktype = SOCK_STREAM;              // TCP
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;  // wildcard addr for bind()
  sprintf(port, "%d", port_);

  // Resolve a local address for the configured port.
  error = getaddrinfo(NULL, port, &hints, &res0);
  GlobalOutput("TNonblockingServer::serve() getaddrinfo");

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  throw TException("TNonblockingServer::serve() socket() -1");

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    throw TException("TNonblockingServer::serve() bind");

  // Done with the addr info

  // Set up this file descriptor for listening
/**
 * Takes a socket created by listenSocket() and sets various options on it
 * to prepare for use in the server.
 *
 * @param s the bound socket to configure and listen() on
 * @throws TException if non-blocking mode or listen() fails
 *
 * NOTE(review): some lines (braces, the assignment to serverSocket_) are
 * elided in this excerpt.
 */
void TNonblockingServer::listenSocket(int s) {
  // Set socket to nonblocking mode
  if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
      fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
    throw TException("TNonblockingServer::serve() O_NONBLOCK");

  // Zero-timeout linger structure used to disable lingering below.
  struct linger ling = {0, 0};

  // Keepalive to ensure full result flushing
  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));

  // Turn linger off to avoid hung sockets
  setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));

  // Set TCP nodelay if available, MAC OS X Hack
  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));

  if (listen(s, LISTEN_BACKLOG) == -1) {
    throw TException("TNonblockingServer::serve() listen");

  // Cool, this socket is good to go, set it as the serverSocket_
/**
 * Register the core libevent events onto the proper base.
 * Requires that listenSocket() has already produced serverSocket_.
 *
 * @param base the event base to attach the server's accept event to
 * @throws TException if event_add fails
 *
 * NOTE(review): some lines (the eventBase_ assignment, parts of the
 * event_set argument list) are elided in this excerpt.
 */
void TNonblockingServer::registerEvents(event_base* base) {
  assert(serverSocket_ != -1);

  // Print some libevent stats
  "libevent %s method %s\n",

  // Register the server event: invoke eventHandler whenever the listening
  // socket is readable, persistently.
  event_set(&serverEvent_,
            EV_READ | EV_PERSIST,
            TNonblockingServer::eventHandler,
  event_base_set(eventBase_, &serverEvent_);

  // Add the event and start up the server
  if (-1 == event_add(&serverEvent_, 0)) {
    throw TException("TNonblockingServer::serve(): coult not event_add");
/**
 * Main workhorse function, starts up the server listening on a port and
 * loops over the libevent handler.  event_base_loop() does not return
 * under normal operation.
 *
 * NOTE(review): some lines are elided in this excerpt.
 */
void TNonblockingServer::serve() {
  // Initialize libevent core
  registerEvents(static_cast<event_base*>(event_init()));

  // Run the preServe event hook, if the application installed one.
  if (eventHandler_ != NULL) {
    eventHandler_->preServe();

  // Run libevent engine, never returns, invokes calls to eventHandler
  event_base_loop(eventBase_, 0);
687 }}} // facebook::thrift::server