r1467@opsdev009 (orig r77715): mcslee | 2008-01-14 18:59:12 -0800
[amiethrift.git] / lib / java / src / server / TThreadPoolServer.java
blobd1b5e0907581a6ab3d116877793753797b4fbb4a
1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
3 //
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
7 package com.facebook.thrift.server;
9 import com.facebook.thrift.TException;
10 import com.facebook.thrift.TProcessor;
11 import com.facebook.thrift.TProcessorFactory;
12 import com.facebook.thrift.protocol.TProtocol;
13 import com.facebook.thrift.protocol.TProtocolFactory;
14 import com.facebook.thrift.protocol.TBinaryProtocol;
15 import com.facebook.thrift.transport.TServerTransport;
16 import com.facebook.thrift.transport.TTransport;
17 import com.facebook.thrift.transport.TTransportException;
18 import com.facebook.thrift.transport.TTransportFactory;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.SynchronousQueue;
23 import java.util.concurrent.ThreadPoolExecutor;
24 import java.util.concurrent.TimeUnit;
27 /**
28 * Server which uses Java's built in ThreadPool management to spawn off
29 * a worker pool that
31 * @author Mark Slee <mcslee@facebook.com>
33 public class TThreadPoolServer extends TServer {
35 // Executor service for handling client connections
36 private ExecutorService executorService_;
38 // Flag for stopping the server
39 private volatile boolean stopped_;
41 // Server options
42 private Options options_;
44 // Customizable server options
45 public static class Options {
46 public int minWorkerThreads = 5;
47 public int maxWorkerThreads = Integer.MAX_VALUE;
48 public int stopTimeoutVal = 60;
49 public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
52 public TThreadPoolServer(TProcessor processor,
53 TServerTransport serverTransport) {
54 this(processor, serverTransport,
55 new TTransportFactory(), new TTransportFactory(),
56 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory());
59 public TThreadPoolServer(TProcessorFactory processorFactory,
60 TServerTransport serverTransport) {
61 this(processorFactory, serverTransport,
62 new TTransportFactory(), new TTransportFactory(),
63 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory());
66 public TThreadPoolServer(TProcessor processor,
67 TServerTransport serverTransport,
68 TProtocolFactory protocolFactory) {
69 this(processor, serverTransport,
70 new TTransportFactory(), new TTransportFactory(),
71 protocolFactory, protocolFactory);
74 public TThreadPoolServer(TProcessor processor,
75 TServerTransport serverTransport,
76 TTransportFactory transportFactory,
77 TProtocolFactory protocolFactory) {
78 this(processor, serverTransport,
79 transportFactory, transportFactory,
80 protocolFactory, protocolFactory);
83 public TThreadPoolServer(TProcessorFactory processorFactory,
84 TServerTransport serverTransport,
85 TTransportFactory transportFactory,
86 TProtocolFactory protocolFactory) {
87 this(processorFactory, serverTransport,
88 transportFactory, transportFactory,
89 protocolFactory, protocolFactory);
92 public TThreadPoolServer(TProcessor processor,
93 TServerTransport serverTransport,
94 TTransportFactory inputTransportFactory,
95 TTransportFactory outputTransportFactory,
96 TProtocolFactory inputProtocolFactory,
97 TProtocolFactory outputProtocolFactory) {
98 this(new TProcessorFactory(processor), serverTransport,
99 inputTransportFactory, outputTransportFactory,
100 inputProtocolFactory, outputProtocolFactory);
103 public TThreadPoolServer(TProcessorFactory processorFactory,
104 TServerTransport serverTransport,
105 TTransportFactory inputTransportFactory,
106 TTransportFactory outputTransportFactory,
107 TProtocolFactory inputProtocolFactory,
108 TProtocolFactory outputProtocolFactory) {
109 super(processorFactory, serverTransport,
110 inputTransportFactory, outputTransportFactory,
111 inputProtocolFactory, outputProtocolFactory);
112 options_ = new Options();
113 executorService_ = Executors.newCachedThreadPool();
116 public TThreadPoolServer(TProcessor processor,
117 TServerTransport serverTransport,
118 TTransportFactory inputTransportFactory,
119 TTransportFactory outputTransportFactory,
120 TProtocolFactory inputProtocolFactory,
121 TProtocolFactory outputProtocolFactory,
122 Options options) {
123 this(new TProcessorFactory(processor), serverTransport,
124 inputTransportFactory, outputTransportFactory,
125 inputProtocolFactory, outputProtocolFactory,
126 options);
129 public TThreadPoolServer(TProcessorFactory processorFactory,
130 TServerTransport serverTransport,
131 TTransportFactory inputTransportFactory,
132 TTransportFactory outputTransportFactory,
133 TProtocolFactory inputProtocolFactory,
134 TProtocolFactory outputProtocolFactory,
135 Options options) {
136 super(processorFactory, serverTransport,
137 inputTransportFactory, outputTransportFactory,
138 inputProtocolFactory, outputProtocolFactory);
140 executorService_ = null;
142 SynchronousQueue<Runnable> executorQueue =
143 new SynchronousQueue<Runnable>();
145 executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
146 options.maxWorkerThreads,
148 TimeUnit.SECONDS,
149 executorQueue);
151 options_ = options;
155 public void serve() {
156 try {
157 serverTransport_.listen();
158 } catch (TTransportException ttx) {
159 ttx.printStackTrace();
160 return;
163 stopped_ = false;
164 while (!stopped_) {
165 int failureCount = 0;
166 try {
167 TTransport client = serverTransport_.accept();
168 WorkerProcess wp = new WorkerProcess(client);
169 executorService_.execute(wp);
170 } catch (TTransportException ttx) {
171 if (!stopped_) {
172 ++failureCount;
173 ttx.printStackTrace();
178 executorService_.shutdown();
179 try {
180 executorService_.awaitTermination(options_.stopTimeoutVal,
181 options_.stopTimeoutUnit);
182 } catch (InterruptedException ix) {
183 // Ignore and more on
187 public void stop() {
188 stopped_ = true;
189 serverTransport_.interrupt();
192 private class WorkerProcess implements Runnable {
195 * Client that this services.
197 private TTransport client_;
200 * Default constructor.
202 * @param client Transport to process
204 private WorkerProcess(TTransport client) {
205 client_ = client;
209 * Loops on processing a client forever
211 public void run() {
212 TProcessor processor = null;
213 TTransport inputTransport = null;
214 TTransport outputTransport = null;
215 TProtocol inputProtocol = null;
216 TProtocol outputProtocol = null;
217 try {
218 processor = processorFactory_.getProcessor(client_);
219 inputTransport = inputTransportFactory_.getTransport(client_);
220 outputTransport = outputTransportFactory_.getTransport(client_);
221 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
222 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
223 while (processor.process(inputProtocol, outputProtocol)) {}
224 } catch (TTransportException ttx) {
225 // Assume the client died and continue silently
226 } catch (TException tx) {
227 tx.printStackTrace();
228 } catch (Exception x) {
229 x.printStackTrace();
232 if (inputTransport != null) {
233 inputTransport.close();
236 if (outputTransport != null) {
237 outputTransport.close();