// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/

#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1

#include <Thrift.h>
#include <server/TServer.h>
#include <transport/TTransportUtils.h>
#include <concurrency/ThreadManager.h>
#include <stack>
#include <event.h>
namespace facebook { namespace thrift { namespace server {

using facebook::thrift::transport::TMemoryBuffer;
using facebook::thrift::protocol::TProtocol;
using facebook::thrift::concurrency::Runnable;
using facebook::thrift::concurrency::ThreadManager;

// Forward declaration of class
class TConnection;
/**
 * This is a non-blocking server in C++ for high performance that operates a
 * single IO thread. It assumes that all incoming requests are framed with a
 * 4 byte length indicator and writes out responses using the same framing.
 *
 * It does not use the TServerTransport framework, but rather has socket
 * operations hardcoded for use with select.
 *
 * @author Mark Slee <mcslee@facebook.com>
 */
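/*
 * Illustrative usage (a minimal sketch, not part of this header):
 * MyServiceHandler/MyServiceProcessor are placeholder names for a generated
 * service handler and processor, and TBinaryProtocolFactory is assumed to be
 * available from the Thrift protocol headers.
 *
 *   boost::shared_ptr<MyServiceHandler> handler(new MyServiceHandler());
 *   boost::shared_ptr<TProcessor> processor(new MyServiceProcessor(handler));
 *   boost::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
 *
 *   TNonblockingServer server(processor, protocolFactory, 9090);
 *   server.serve();
 */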
class TNonblockingServer : public TServer {
 private:

  // Listen backlog
  static const int LISTEN_BACKLOG = 1024;

  // Server socket file descriptor
  int serverSocket_;

  // Port server runs on
  int port_;

  // Whether to frame responses
  bool frameResponses_;

  // For processing via thread pool, may be NULL
  boost::shared_ptr<ThreadManager> threadManager_;

  // Is thread pool processing?
  bool threadPoolProcessing_;

  // The event base for libevent
  event_base* eventBase_;

  // Event struct, for use with eventBase_
  struct event serverEvent_;
  /**
   * This is a stack of all the objects that have been created but that
   * are NOT currently in use. When we close a connection, we place it on this
   * stack so that the object can be reused later, rather than freeing the
   * memory and reallocating a new object later.
   */
  std::stack<TConnection*> connectionStack_;
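  /*
   * A sketch of the intended reuse pattern (the real logic lives in
   * createConnection()/returnConnection(), declared below and defined in
   * TNonblockingServer.cpp):
   *
   *   TConnection* result;
   *   if (connectionStack_.empty()) {
   *     result = new TConnection(fd, flags, this);  // nothing to reuse, allocate
   *   } else {
   *     result = connectionStack_.top();            // reuse a parked object
   *     connectionStack_.pop();
   *     result->init(fd, flags, this);
   *   }
   *   // ...and when a connection closes it is pushed back onto this stack
   *   // instead of being deleted.
   */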
  // Handles an event on the server (listening) socket, i.e. a new connection
  void handleEvent(int fd, short which);
 public:
  TNonblockingServer(boost::shared_ptr<TProcessor> processor,
                     int port) :
    TServer(processor),
    serverSocket_(-1),
    port_(port),
    frameResponses_(true),
    threadPoolProcessing_(false),
    eventBase_(NULL) {}
  TNonblockingServer(boost::shared_ptr<TProcessor> processor,
                     boost::shared_ptr<TProtocolFactory> protocolFactory,
                     int port,
                     boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
    TServer(processor),
    serverSocket_(-1),
    port_(port),
    frameResponses_(true),
    threadManager_(threadManager),
    eventBase_(NULL) {
    setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
    setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
    setInputProtocolFactory(protocolFactory);
    setOutputProtocolFactory(protocolFactory);
    setThreadManager(threadManager);
  }
  TNonblockingServer(boost::shared_ptr<TProcessor> processor,
                     boost::shared_ptr<TTransportFactory> inputTransportFactory,
                     boost::shared_ptr<TTransportFactory> outputTransportFactory,
                     boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
                     boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
                     int port,
                     boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
    TServer(processor),
    serverSocket_(-1),
    port_(port),
    frameResponses_(true),
    threadManager_(threadManager),
    eventBase_(NULL) {
    setInputTransportFactory(inputTransportFactory);
    setOutputTransportFactory(outputTransportFactory);
    setInputProtocolFactory(inputProtocolFactory);
    setOutputProtocolFactory(outputProtocolFactory);
    setThreadManager(threadManager);
  }

  ~TNonblockingServer() {}
  void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
    threadManager_ = threadManager;
    threadPoolProcessing_ = (threadManager != NULL);
  }

  bool isThreadPoolProcessing() const {
    return threadPoolProcessing_;
  }

  void addTask(boost::shared_ptr<Runnable> task) {
    threadManager_->add(task);
  }

  void setFrameResponses(bool frameResponses) {
    frameResponses_ = frameResponses;
  }

  bool getFrameResponses() const {
    return frameResponses_;
  }

  event_base* getEventBase() const {
    return eventBase_;
  }
  // Create a new connection, reusing an object from connectionStack_ if possible
  TConnection* createConnection(int socket, short flags);

  // Return a closed connection to connectionStack_ for reuse
  void returnConnection(TConnection* connection);

  // C-callable wrapper that dispatches libevent callbacks to handleEvent()
  static void eventHandler(int fd, short which, void* v) {
    ((TNonblockingServer*)v)->handleEvent(fd, which);
  }

  // Create the listening socket and bind it to the server port
  void listenSocket();

  // Set up the given socket fd as the listening socket
  void listenSocket(int fd);

  // Register the server's events onto the given libevent base
  void registerEvents(event_base* base);

  // Run the server event loop
  void serve();
};
/**
 * Two states for sockets, recv and send mode
 */
enum TSocketState {
  SOCKET_RECV,
  SOCKET_SEND
};
/**
 * States for the nonblocking server's request cycle:
 * 1) initialize
 * 2) read 4 byte frame size
 * 3) read frame of data
 * 4) send back data (if any)
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_FRAME_SIZE,
  APP_SEND_RESULT
};
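/*
 * For reference, the wire framing assumed by this server (a sketch based on
 * the class comment above; TFramedTransport on the client side produces this
 * layout): a 4-byte length prefix in network byte order followed by that many
 * bytes of serialized payload.
 *
 *   int32_t frameSize = htonl(payloadLength);
 *   send(fd, &frameSize, sizeof(frameSize), 0);  // 4 byte length indicator
 *   send(fd, payload, payloadLength, 0);         // serialized request bytes
 *
 * A request then loosely walks the states above: APP_READ_FRAME_SIZE,
 * APP_READ_REQUEST, APP_WAIT_TASK when a thread pool is configured, and
 * finally the send states.
 */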
/**
 * Represents a connection that is handled via libevent. This connection
 * essentially encapsulates a socket that has some associated libevent state.
 */
class TConnection {
 private:
  class Task;

  // Server handle
  TNonblockingServer* server_;

  // Socket handle
  int socket_;

  // Libevent object
  struct event event_;

  // Libevent flags
  short eventFlags_;

  // Socket mode
  TSocketState socketState_;

  // Application state
  TAppState appState_;

  // How much data we need to read
  uint32_t readWant_;

  // Where in the read buffer are we
  uint32_t readBufferPos_;

  // Read buffer
  uint8_t* readBuffer_;

  // Read buffer size
  uint32_t readBufferSize_;

  // Write buffer
  uint8_t* writeBuffer_;

  // Write buffer size
  uint32_t writeBufferSize_;

  // How far through writing are we?
  uint32_t writeBufferPos_;

  // Frame size
  int32_t frameSize_;

  // Task handle
  int taskHandle_;

  // Task event
  struct event taskEvent_;

  // Transport to read from
  boost::shared_ptr<TMemoryBuffer> inputTransport_;

  // Transport that processor writes to
  boost::shared_ptr<TMemoryBuffer> outputTransport_;

  // Extra transports generated by the transport factory (e.g. BufferedRouterTransport)
  boost::shared_ptr<TTransport> factoryInputTransport_;
  boost::shared_ptr<TTransport> factoryOutputTransport_;

  // Protocol decoder
  boost::shared_ptr<TProtocol> inputProtocol_;

  // Protocol encoder
  boost::shared_ptr<TProtocol> outputProtocol_;
262 // Go into read mode
263 void setRead() {
264 setFlags(EV_READ | EV_PERSIST);
267 // Go into write mode
268 void setWrite() {
269 setFlags(EV_WRITE | EV_PERSIST);
272 // Set socket idle
273 void setIdle() {
274 setFlags(0);
277 // Set event flags
278 void setFlags(short eventFlags);
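  /*
   * Roughly what setFlags() does with libevent 1.x (a sketch; the actual
   * implementation lives in TNonblockingServer.cpp):
   *
   *   event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
   *   event_base_set(server_->getEventBase(), &event_);
   *   event_add(&event_, 0);
   *
   * EV_PERSIST keeps the event registered across callbacks, so readiness
   * notifications keep firing until the flags are changed again.
   */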
  // Libevent handlers
  void workSocket();

  // Close this client and reset
  void close();

 public:
  // Constructor
  TConnection(int socket, short eventFlags, TNonblockingServer *s) {
    readBuffer_ = (uint8_t*)malloc(1024);
    if (readBuffer_ == NULL) {
      throw facebook::thrift::TException("Out of memory.");
    }
    readBufferSize_ = 1024;

    // Allocate input and output transports
    // these only need to be allocated once per TConnection (they don't need to be
    // reallocated on init() call)
    inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());

    init(socket, eventFlags, s);
  }
  // Initialize
  void init(int socket, short eventFlags, TNonblockingServer *s);

  // Transition into a new state
  void transition();

  // Handler wrapper
  static void eventHandler(int fd, short which, void* v) {
    assert(fd == ((TConnection*)v)->socket_);
    ((TConnection*)v)->workSocket();
  }
  // Handler wrapper for task block
  static void taskHandler(int fd, short which, void* v) {
    assert(fd == ((TConnection*)v)->taskHandle_);
    if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
      GlobalOutput("TConnection::taskHandler close handle failed, resource leak");
    }
    ((TConnection*)v)->transition();
  }
};

}}} // facebook::thrift::server
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_