r1431@opsdev009 (orig r75739): aditya | 2007-12-25 14:58:50 -0800
[amiethrift.git] / lib / cpp / src / concurrency / ThreadManager.cpp
blob604602e04aa12242839f24ea57973ec4d49f6b63
1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
3 //
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
7 #include "ThreadManager.h"
8 #include "Exception.h"
9 #include "Monitor.h"
11 #include <boost/shared_ptr.hpp>
13 #include <assert.h>
14 #include <queue>
15 #include <set>
17 #if defined(DEBUG)
18 #include <iostream>
19 #endif //defined(DEBUG)
21 namespace facebook { namespace thrift { namespace concurrency {
23 using boost::shared_ptr;
24 using boost::dynamic_pointer_cast;
26 /**
27 * ThreadManager class
29 * This class manages a pool of threads. It uses a ThreadFactory to create
30 * threads. It never actually creates or destroys worker threads, rather
31 * it maintains statistics on number of idle threads, number of active threads,
32 * task backlog, and average wait and service times.
34 * @author marc
35 * @version $Id:$
37 class ThreadManager::Impl : public ThreadManager {
39 public:
40 Impl() :
41 workerCount_(0),
42 workerMaxCount_(0),
43 idleCount_(0),
44 pendingTaskCountMax_(0),
45 state_(ThreadManager::UNINITIALIZED) {}
47 ~Impl() { stop(); }
49 void start();
51 void stop() { stopImpl(false); }
53 void join() { stopImpl(true); }
55 const ThreadManager::STATE state() const {
56 return state_;
59 shared_ptr<ThreadFactory> threadFactory() const {
60 Synchronized s(monitor_);
61 return threadFactory_;
64 void threadFactory(shared_ptr<ThreadFactory> value) {
65 Synchronized s(monitor_);
66 threadFactory_ = value;
69 void addWorker(size_t value);
71 void removeWorker(size_t value);
73 size_t idleWorkerCount() const {
74 return idleCount_;
77 size_t workerCount() const {
78 Synchronized s(monitor_);
79 return workerCount_;
82 size_t pendingTaskCount() const {
83 Synchronized s(monitor_);
84 return tasks_.size();
87 size_t totalTaskCount() const {
88 Synchronized s(monitor_);
89 return tasks_.size() + workerCount_ - idleCount_;
92 size_t pendingTaskCountMax() const {
93 Synchronized s(monitor_);
94 return pendingTaskCountMax_;
97 void pendingTaskCountMax(const size_t value) {
98 Synchronized s(monitor_);
99 pendingTaskCountMax_ = value;
102 bool canSleep();
104 void add(shared_ptr<Runnable> value, int64_t timeout);
106 void remove(shared_ptr<Runnable> task);
108 private:
109 void stopImpl(bool join);
111 size_t workerCount_;
112 size_t workerMaxCount_;
113 size_t idleCount_;
114 size_t pendingTaskCountMax_;
116 ThreadManager::STATE state_;
117 shared_ptr<ThreadFactory> threadFactory_;
120 friend class ThreadManager::Task;
121 std::queue<shared_ptr<Task> > tasks_;
122 Monitor monitor_;
123 Monitor workerMonitor_;
125 friend class ThreadManager::Worker;
126 std::set<shared_ptr<Thread> > workers_;
127 std::set<shared_ptr<Thread> > deadWorkers_;
128 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
131 class ThreadManager::Task : public Runnable {
133 public:
134 enum STATE {
135 WAITING,
136 EXECUTING,
137 CANCELLED,
138 COMPLETE
141 Task(shared_ptr<Runnable> runnable) :
142 runnable_(runnable),
143 state_(WAITING) {}
145 ~Task() {}
147 void run() {
148 if (state_ == EXECUTING) {
149 runnable_->run();
150 state_ = COMPLETE;
154 private:
155 shared_ptr<Runnable> runnable_;
156 friend class ThreadManager::Worker;
157 STATE state_;
160 class ThreadManager::Worker: public Runnable {
161 enum STATE {
162 UNINITIALIZED,
163 STARTING,
164 STARTED,
165 STOPPING,
166 STOPPED
169 public:
170 Worker(ThreadManager::Impl* manager) :
171 manager_(manager),
172 state_(UNINITIALIZED),
173 idle_(false) {}
175 ~Worker() {}
177 private:
178 bool isActive() const {
179 return
180 (manager_->workerCount_ <= manager_->workerMaxCount_) ||
181 (manager_->state_ == JOINING && !manager_->tasks_.empty());
184 public:
186 * Worker entry point
188 * As long as worker thread is running, pull tasks off the task queue and
189 * execute.
191 void run() {
192 bool active = false;
193 bool notifyManager = false;
196 * Increment worker semaphore and notify manager if worker count reached
197 * desired max
199 * Note: We have to release the monitor and acquire the workerMonitor
200 * since that is what the manager blocks on for worker add/remove
203 Synchronized s(manager_->monitor_);
204 active = manager_->workerCount_ < manager_->workerMaxCount_;
205 if (active) {
206 manager_->workerCount_++;
207 notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
211 if (notifyManager) {
212 Synchronized s(manager_->workerMonitor_);
213 manager_->workerMonitor_.notify();
214 notifyManager = false;
217 while (active) {
218 shared_ptr<ThreadManager::Task> task;
221 * While holding manager monitor block for non-empty task queue (Also
222 * check that the thread hasn't been requested to stop). Once the queue
223 * is non-empty, dequeue a task, release monitor, and execute. If the
224 * worker max count has been decremented such that we exceed it, mark
225 * ourself inactive, decrement the worker count and notify the manager
226 * (technically we're notifying the next blocked thread but eventually
227 * the manager will see it.
230 Synchronized s(manager_->monitor_);
231 active = isActive();
233 while (active && manager_->tasks_.empty()) {
234 manager_->idleCount_++;
235 idle_ = true;
236 manager_->monitor_.wait();
237 active = isActive();
238 idle_ = false;
239 manager_->idleCount_--;
242 if (active) {
243 if (!manager_->tasks_.empty()) {
244 task = manager_->tasks_.front();
245 manager_->tasks_.pop();
246 if (task->state_ == ThreadManager::Task::WAITING) {
247 task->state_ = ThreadManager::Task::EXECUTING;
250 /* If we have a pending task max and we just dropped below it, wakeup any
251 thread that might be blocked on add. */
252 if (manager_->pendingTaskCountMax_ != 0 &&
253 manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
254 manager_->workerMonitor_.notify();
257 } else {
258 idle_ = true;
259 manager_->workerCount_--;
260 notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
264 if (task != NULL) {
265 if (task->state_ == ThreadManager::Task::EXECUTING) {
266 try {
267 task->run();
268 } catch(...) {
269 // XXX need to log this
276 Synchronized s(manager_->workerMonitor_);
277 manager_->deadWorkers_.insert(this->thread());
278 if (notifyManager) {
279 manager_->workerMonitor_.notify();
283 return;
286 private:
287 ThreadManager::Impl* manager_;
288 friend class ThreadManager::Impl;
289 STATE state_;
290 bool idle_;
294 void ThreadManager::Impl::addWorker(size_t value) {
295 std::set<shared_ptr<Thread> > newThreads;
296 for (size_t ix = 0; ix < value; ix++) {
297 class ThreadManager::Worker;
298 shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
299 newThreads.insert(threadFactory_->newThread(worker));
303 Synchronized s(monitor_);
304 workerMaxCount_ += value;
305 workers_.insert(newThreads.begin(), newThreads.end());
308 for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
309 shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
310 worker->state_ = ThreadManager::Worker::STARTING;
311 (*ix)->start();
312 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
316 Synchronized s(workerMonitor_);
317 while (workerCount_ != workerMaxCount_) {
318 workerMonitor_.wait();
323 void ThreadManager::Impl::start() {
325 if (state_ == ThreadManager::STOPPED) {
326 return;
330 Synchronized s(monitor_);
331 if (state_ == ThreadManager::UNINITIALIZED) {
332 if (threadFactory_ == NULL) {
333 throw InvalidArgumentException();
335 state_ = ThreadManager::STARTED;
336 monitor_.notifyAll();
339 while (state_ == STARTING) {
340 monitor_.wait();
345 void ThreadManager::Impl::stopImpl(bool join) {
346 bool doStop = false;
347 if (state_ == ThreadManager::STOPPED) {
348 return;
352 Synchronized s(monitor_);
353 if (state_ != ThreadManager::STOPPING &&
354 state_ != ThreadManager::JOINING &&
355 state_ != ThreadManager::STOPPED) {
356 doStop = true;
357 state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
361 if (doStop) {
362 removeWorker(workerCount_);
365 // XXX
366 // should be able to block here for transition to STOPPED since we're no
367 // using shared_ptrs
370 Synchronized s(monitor_);
371 state_ = ThreadManager::STOPPED;
376 void ThreadManager::Impl::removeWorker(size_t value) {
377 std::set<shared_ptr<Thread> > removedThreads;
379 Synchronized s(monitor_);
380 if (value > workerMaxCount_) {
381 throw InvalidArgumentException();
384 workerMaxCount_ -= value;
386 if (idleCount_ < value) {
387 for (size_t ix = 0; ix < idleCount_; ix++) {
388 monitor_.notify();
390 } else {
391 monitor_.notifyAll();
396 Synchronized s(workerMonitor_);
398 while (workerCount_ != workerMaxCount_) {
399 workerMonitor_.wait();
402 for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
403 workers_.erase(*ix);
404 idMap_.erase((*ix)->getId());
407 deadWorkers_.clear();
411 bool ThreadManager::Impl::canSleep() {
412 const Thread::id_t id = threadFactory_->getCurrentThreadId();
413 return idMap_.find(id) == idMap_.end();
416 void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
417 Synchronized s(monitor_);
419 if (state_ != ThreadManager::STARTED) {
420 throw IllegalStateException();
423 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
424 if (canSleep() && timeout >= 0) {
425 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
426 monitor_.wait(timeout);
428 } else {
429 throw TooManyPendingTasksException();
433 tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
435 // If idle thread is available notify it, otherwise all worker threads are
436 // running and will get around to this task in time.
437 if (idleCount_ > 0) {
438 monitor_.notify();
442 void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
443 Synchronized s(monitor_);
444 if (state_ != ThreadManager::STARTED) {
445 throw IllegalStateException();
449 class SimpleThreadManager : public ThreadManager::Impl {
451 public:
452 SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
453 workerCount_(workerCount),
454 pendingTaskCountMax_(pendingTaskCountMax),
455 firstTime_(true) {
458 void start() {
459 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
460 ThreadManager::Impl::start();
461 addWorker(workerCount_);
464 private:
465 const size_t workerCount_;
466 const size_t pendingTaskCountMax_;
467 bool firstTime_;
468 Monitor monitor_;
472 shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
473 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
476 shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
477 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
480 }}} // facebook::thrift::concurrency