r1361@opsdev009 (orig r71612): mcslee | 2007-11-27 17:51:43 -0800
[amiethrift.git] / lib / cpp / src / server / TNonblockingServer.cpp
blobde32db5460f3c443c3c3d926bd814fb02ad45ed9
1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
3 //
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
#include "TNonblockingServer.h"

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <iostream>
18 namespace facebook { namespace thrift { namespace server {
20 using namespace facebook::thrift::protocol;
21 using namespace facebook::thrift::transport;
22 using namespace std;
24 class TConnection::Task: public Runnable {
25 public:
26 Task(boost::shared_ptr<TProcessor> processor,
27 boost::shared_ptr<TProtocol> input,
28 boost::shared_ptr<TProtocol> output,
29 int taskHandle) :
30 processor_(processor),
31 input_(input),
32 output_(output),
33 taskHandle_(taskHandle) {}
35 void run() {
36 try {
37 while (processor_->process(input_, output_)) {
38 if (!input_->getTransport()->peek()) {
39 break;
42 } catch (TTransportException& ttx) {
43 cerr << "TThreadedServer client died: " << ttx.what() << endl;
44 } catch (TException& x) {
45 cerr << "TThreadedServer exception: " << x.what() << endl;
46 } catch (...) {
47 cerr << "TThreadedServer uncaught exception." << endl;
50 // Signal completion back to the libevent thread via a socketpair
51 int8_t b = 0;
52 if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
53 GlobalOutput("TNonblockingServer::Task: send");
55 if (-1 == ::close(taskHandle_)) {
56 GlobalOutput("TNonblockingServer::Task: close, possible resource leak");
60 private:
61 boost::shared_ptr<TProcessor> processor_;
62 boost::shared_ptr<TProtocol> input_;
63 boost::shared_ptr<TProtocol> output_;
64 int taskHandle_;
67 void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
68 socket_ = socket;
69 server_ = s;
70 appState_ = APP_INIT;
71 eventFlags_ = 0;
73 readBufferPos_ = 0;
74 readWant_ = 0;
76 writeBuffer_ = NULL;
77 writeBufferSize_ = 0;
78 writeBufferPos_ = 0;
80 socketState_ = SOCKET_RECV;
81 appState_ = APP_INIT;
83 taskHandle_ = -1;
85 // Set flags, which also registers the event
86 setFlags(eventFlags);
88 // get input/transports
89 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
90 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
92 // Create protocol
93 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
94 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
97 void TConnection::workSocket() {
98 int flags=0, got=0, left=0, sent=0;
99 uint32_t fetch = 0;
101 switch (socketState_) {
102 case SOCKET_RECV:
103 // It is an error to be in this state if we already have all the data
104 assert(readBufferPos_ < readWant_);
106 // Double the buffer size until it is big enough
107 if (readWant_ > readBufferSize_) {
108 while (readWant_ > readBufferSize_) {
109 readBufferSize_ *= 2;
111 readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_);
112 if (readBuffer_ == NULL) {
113 GlobalOutput("TConnection::workSocket() realloc");
114 close();
115 return;
119 // Read from the socket
120 fetch = readWant_ - readBufferPos_;
121 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
123 if (got > 0) {
124 // Move along in the buffer
125 readBufferPos_ += got;
127 // Check that we did not overdo it
128 assert(readBufferPos_ <= readWant_);
130 // We are done reading, move onto the next state
131 if (readBufferPos_ == readWant_) {
132 transition();
134 return;
135 } else if (got == -1) {
136 // Blocking errors are okay, just move on
137 if (errno == EAGAIN || errno == EWOULDBLOCK) {
138 return;
141 if (errno != ECONNRESET) {
142 GlobalOutput("TConnection::workSocket() recv -1");
146 // Whenever we get down here it means a remote disconnect
147 close();
149 return;
151 case SOCKET_SEND:
152 // Should never have position past size
153 assert(writeBufferPos_ <= writeBufferSize_);
155 // If there is no data to send, then let us move on
156 if (writeBufferPos_ == writeBufferSize_) {
157 GlobalOutput("WARNING: Send state with no data to send\n");
158 transition();
159 return;
162 flags = 0;
163 #ifdef MSG_NOSIGNAL
164 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
165 // check for the EPIPE return condition and close the socket in that case
166 flags |= MSG_NOSIGNAL;
167 #endif // ifdef MSG_NOSIGNAL
169 left = writeBufferSize_ - writeBufferPos_;
170 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
172 if (sent <= 0) {
173 // Blocking errors are okay, just move on
174 if (errno == EAGAIN || errno == EWOULDBLOCK) {
175 return;
177 if (errno != EPIPE) {
178 GlobalOutput("TConnection::workSocket() send -1");
180 close();
181 return;
184 writeBufferPos_ += sent;
186 // Did we overdo it?
187 assert(writeBufferPos_ <= writeBufferSize_);
189 // We are done!
190 if (writeBufferPos_ == writeBufferSize_) {
191 transition();
194 return;
196 default:
197 fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_);
198 assert(0);
203 * This is called when the application transitions from one state into
204 * another. This means that it has finished writing the data that it needed
205 * to, or finished receiving the data that it needed to.
207 void TConnection::transition() {
209 int sz = 0;
211 // Switch upon the state that we are currently in and move to a new state
212 switch (appState_) {
214 case APP_READ_REQUEST:
215 // We are done reading the request, package the read buffer into transport
216 // and get back some data from the dispatch function
217 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
218 outputTransport_->resetBuffer();
220 if (server_->isThreadPoolProcessing()) {
221 // We are setting up a Task to do this work and we will wait on it
222 int sv[2];
223 if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
224 GlobalOutput("TConnection::socketpair() failed");
225 // Now we will fall through to the APP_WAIT_TASK block with no response
226 } else {
227 // Create task and dispatch to the thread manager
228 boost::shared_ptr<Runnable> task =
229 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
230 inputProtocol_,
231 outputProtocol_,
232 sv[1]));
233 // The application is now waiting on the task to finish
234 appState_ = APP_WAIT_TASK;
236 // Create an event to be notified when the task finishes
237 event_set(&taskEvent_,
238 taskHandle_ = sv[0],
239 EV_READ,
240 TConnection::taskHandler,
241 this);
243 // Attach to the base
244 event_base_set(server_->getEventBase(), &taskEvent_);
246 // Add the event and start up the server
247 if (-1 == event_add(&taskEvent_, 0)) {
248 GlobalOutput("TNonblockingServer::serve(): coult not event_add");
249 return;
251 server_->addTask(task);
253 // Set this connection idle so that libevent doesn't process more
254 // data on it while we're still waiting for the threadmanager to
255 // finish this task
256 setIdle();
257 return;
259 } else {
260 try {
261 // Invoke the processor
262 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
263 } catch (TTransportException &ttx) {
264 fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
265 close();
266 return;
267 } catch (TException &x) {
268 fprintf(stderr, "TException: Server::process() %s\n", x.what());
269 close();
270 return;
271 } catch (...) {
272 fprintf(stderr, "Server::process() unknown exception\n");
273 close();
274 return;
278 // Intentionally fall through here, the call to process has written into
279 // the writeBuffer_
281 case APP_WAIT_TASK:
282 // We have now finished processing a task and the result has been written
283 // into the outputTransport_, so we grab its contents and place them into
284 // the writeBuffer_ for actual writing by the libevent thread
286 // Get the result of the operation
287 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
289 // If the function call generated return data, then move into the send
290 // state and get going
291 if (writeBufferSize_ > 0) {
293 // Move into write state
294 writeBufferPos_ = 0;
295 socketState_ = SOCKET_SEND;
297 if (server_->getFrameResponses()) {
298 // Put the frame size into the write buffer
299 appState_ = APP_SEND_FRAME_SIZE;
300 frameSize_ = (int32_t)htonl(writeBufferSize_);
301 writeBuffer_ = (uint8_t*)&frameSize_;
302 writeBufferSize_ = 4;
303 } else {
304 // Go straight into sending the result, do not frame it
305 appState_ = APP_SEND_RESULT;
308 // Socket into write mode
309 setWrite();
311 // Try to work the socket immediately
312 // workSocket();
314 return;
317 // In this case, the request was asynchronous and we should fall through
318 // right back into the read frame header state
319 goto LABEL_APP_INIT;
321 case APP_SEND_FRAME_SIZE:
323 // Refetch the result of the operation since we put the frame size into
324 // writeBuffer_
325 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
326 writeBufferPos_ = 0;
328 // Now in send result state
329 appState_ = APP_SEND_RESULT;
331 // Go to work on the socket right away, probably still writeable
332 // workSocket();
334 return;
336 case APP_SEND_RESULT:
338 // N.B.: We also intentionally fall through here into the INIT state!
340 LABEL_APP_INIT:
341 case APP_INIT:
343 // Clear write buffer variables
344 writeBuffer_ = NULL;
345 writeBufferPos_ = 0;
346 writeBufferSize_ = 0;
348 // Set up read buffer for getting 4 bytes
349 readBufferPos_ = 0;
350 readWant_ = 4;
352 // Into read4 state we go
353 socketState_ = SOCKET_RECV;
354 appState_ = APP_READ_FRAME_SIZE;
356 // Register read event
357 setRead();
359 // Try to work the socket right away
360 // workSocket();
362 return;
364 case APP_READ_FRAME_SIZE:
365 // We just read the request length, deserialize it
366 sz = *(int32_t*)readBuffer_;
367 sz = (int32_t)ntohl(sz);
369 if (sz <= 0) {
370 fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
371 close();
372 return;
375 // Reset the read buffer
376 readWant_ = (uint32_t)sz;
377 readBufferPos_= 0;
379 // Move into read request state
380 appState_ = APP_READ_REQUEST;
382 // Work the socket right away
383 // workSocket();
385 return;
387 default:
388 fprintf(stderr, "Totally Fucked. Application State %d\n", appState_);
389 assert(0);
393 void TConnection::setFlags(short eventFlags) {
394 // Catch the do nothing case
395 if (eventFlags_ == eventFlags) {
396 return;
399 // Delete a previously existing event
400 if (eventFlags_ != 0) {
401 if (event_del(&event_) == -1) {
402 GlobalOutput("TConnection::setFlags event_del");
403 return;
407 // Update in memory structure
408 eventFlags_ = eventFlags;
410 // Do not call event_set if there are no flags
411 if (!eventFlags_) {
412 return;
416 * event_set:
418 * Prepares the event structure &event to be used in future calls to
419 * event_add() and event_del(). The event will be prepared to call the
420 * eventHandler using the 'sock' file descriptor to monitor events.
422 * The events can be either EV_READ, EV_WRITE, or both, indicating
423 * that an application can read or write from the file respectively without
424 * blocking.
426 * The eventHandler will be called with the file descriptor that triggered
427 * the event and the type of event which will be one of: EV_TIMEOUT,
428 * EV_SIGNAL, EV_READ, EV_WRITE.
430 * The additional flag EV_PERSIST makes an event_add() persistent until
431 * event_del() has been called.
433 * Once initialized, the &event struct can be used repeatedly with
434 * event_add() and event_del() and does not need to be reinitialized unless
435 * the eventHandler and/or the argument to it are to be changed. However,
436 * when an ev structure has been added to libevent using event_add() the
437 * structure must persist until the event occurs (assuming EV_PERSIST
438 * is not set) or is removed using event_del(). You may not reuse the same
439 * ev structure for multiple monitored descriptors; each descriptor needs
440 * its own ev.
442 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
443 event_base_set(server_->getEventBase(), &event_);
445 // Add the event
446 if (event_add(&event_, 0) == -1) {
447 GlobalOutput("TConnection::setFlags(): could not event_add");
452 * Closes a connection
454 void TConnection::close() {
455 // Delete the registered libevent
456 if (event_del(&event_) == -1) {
457 GlobalOutput("TConnection::close() event_del");
460 // Close the socket
461 if (socket_ > 0) {
462 ::close(socket_);
464 socket_ = 0;
466 // close any factory produced transports
467 factoryInputTransport_->close();
468 factoryOutputTransport_->close();
470 // Give this object back to the server that owns it
471 server_->returnConnection(this);
475 * Creates a new connection either by reusing an object off the stack or
476 * by allocating a new one entirely
478 TConnection* TNonblockingServer::createConnection(int socket, short flags) {
479 // Check the stack
480 if (connectionStack_.empty()) {
481 return new TConnection(socket, flags, this);
482 } else {
483 TConnection* result = connectionStack_.top();
484 connectionStack_.pop();
485 result->init(socket, flags, this);
486 return result;
491 * Returns a connection to the stack
493 void TNonblockingServer::returnConnection(TConnection* connection) {
494 connectionStack_.push(connection);
498 * Server socket had something happen
500 void TNonblockingServer::handleEvent(int fd, short which) {
501 // Make sure that libevent didn't fuck up the socket handles
502 assert(fd == serverSocket_);
504 // Server socket accepted a new connection
505 socklen_t addrLen;
506 struct sockaddr addr;
507 addrLen = sizeof(addr);
509 // Going to accept a new client socket
510 int clientSocket;
512 // Accept as many new clients as possible, even though libevent signaled only
513 // one, this helps us to avoid having to go back into the libevent engine so
514 // many times
515 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
517 // Explicitly set this socket to NONBLOCK mode
518 int flags;
519 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
520 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
521 GlobalOutput("thriftServerEventHandler: set O_NONBLOCK");
522 close(clientSocket);
523 return;
526 // Create a new TConnection for this client socket.
527 TConnection* clientConnection =
528 createConnection(clientSocket, EV_READ | EV_PERSIST);
530 // Fail fast if we could not create a TConnection object
531 if (clientConnection == NULL) {
532 fprintf(stderr, "thriftServerEventHandler: failed TConnection factory");
533 close(clientSocket);
534 return;
537 // Put this client connection into the proper state
538 clientConnection->transition();
541 // Done looping accept, now we have to make sure the error is due to
542 // blocking. Any other error is a problem
543 if (errno != EAGAIN && errno != EWOULDBLOCK) {
544 GlobalOutput("thriftServerEventHandler: accept()");
/**
 * Creates a socket to listen on and binds it to the local port.
 * On getaddrinfo failure it logs and returns; socket/bind failures throw
 * TException.  On success, delegates option setup to listenSocket(int).
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  // Large enough for any decimal 16-bit port plus the terminating NUL
  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  // snprintf bounds the write (sprintf would overflow if port_ ever held
  // a value wider than expected)
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    GlobalOutput("TNonblockingServer::serve() getaddrinfo");
    return;
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
603 * Takes a socket created by listenSocket() and sets various options on it
604 * to prepare for use in the server.
606 void TNonblockingServer::listenSocket(int s) {
607 // Set socket to nonblocking mode
608 int flags;
609 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
610 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
611 close(s);
612 throw TException("TNonblockingServer::serve() O_NONBLOCK");
615 int one = 1;
616 struct linger ling = {0, 0};
618 // Keepalive to ensure full result flushing
619 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
621 // Turn linger off to avoid hung sockets
622 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
624 // Set TCP nodelay if available, MAC OS X Hack
625 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
626 #ifndef TCP_NOPUSH
627 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
628 #endif
630 if (listen(s, LISTEN_BACKLOG) == -1) {
631 close(s);
632 throw TException("TNonblockingServer::serve() listen");
635 // Cool, this socket is good to go, set it as the serverSocket_
636 serverSocket_ = s;
640 * Register the core libevent events onto the proper base.
642 void TNonblockingServer::registerEvents(event_base* base) {
643 assert(serverSocket_ != -1);
644 assert(!eventBase_);
645 eventBase_ = base;
647 // Print some libevent stats
648 fprintf(stderr,
649 "libevent %s method %s\n",
650 event_get_version(),
651 event_get_method());
653 // Register the server event
654 event_set(&serverEvent_,
655 serverSocket_,
656 EV_READ | EV_PERSIST,
657 TNonblockingServer::eventHandler,
658 this);
659 event_base_set(eventBase_, &serverEvent_);
661 // Add the event and start up the server
662 if (-1 == event_add(&serverEvent_, 0)) {
663 throw TException("TNonblockingServer::serve(): coult not event_add");
668 * Main workhorse function, starts up the server listening on a port and
669 * loops over the libevent handler.
671 void TNonblockingServer::serve() {
672 // Init socket
673 listenSocket();
675 // Initialize libevent core
676 registerEvents(static_cast<event_base*>(event_init()));
678 // Run the preServe event
679 if (eventHandler_ != NULL) {
680 eventHandler_->preServe();
683 // Run libevent engine, never returns, invokes calls to eventHandler
684 event_base_loop(eventBase_, 0);
687 }}} // facebook::thrift::server