1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
7 #include "ThreadManager.h"
11 #include <boost/shared_ptr.hpp>
19 #endif //defined(DEBUG)
21 namespace facebook
{ namespace thrift
{ namespace concurrency
{
23 using boost::shared_ptr
;
24 using boost::dynamic_pointer_cast
;
29 * This class manages a pool of threads. It uses a ThreadFactory to create
30 * threads. It never actually creates or destroys worker threads; rather,
31 * it maintains statistics on the number of idle threads, the number of active
32 * threads, the task backlog, and average wait and service times.
// ThreadManager::Impl -- concrete ThreadManager. The members visible here hold
// the pool bookkeeping (target worker count, pending-task cap, lifecycle state,
// thread factory, task queue, worker sets, thread-id map); the visible
// accessors construct a Synchronized on monitor_ before touching them
// (presumably a scoped lock -- confirm against Monitor.h).
// NOTE(review): the embedded line numbers skip, i.e. some original lines are
// absent from this listing; comments below describe only what is visible.
37 class ThreadManager::Impl
: public ThreadManager
{
// Tail of the constructor's initializer list: the pending-task cap starts at 0
// (add() only enforces the cap when it is > 0) and the state starts as
// UNINITIALIZED.
44 pendingTaskCountMax_(0),
45 state_(ThreadManager::UNINITIALIZED
) {}
// stop() and join() both delegate to stopImpl(); the flag selects the STOPPING
// (false) or JOINING (true) transition there.
51 void stop() { stopImpl(false); }
53 void join() { stopImpl(true); }
// State accessor (body not visible in this listing).
55 const ThreadManager::STATE
state() const {
// Getter: returns threadFactory_ after constructing a Synchronized on monitor_.
59 shared_ptr
<ThreadFactory
> threadFactory() const {
60 Synchronized
s(monitor_
);
61 return threadFactory_
;
// Setter: replaces threadFactory_ after constructing a Synchronized on monitor_.
64 void threadFactory(shared_ptr
<ThreadFactory
> value
) {
65 Synchronized
s(monitor_
);
66 threadFactory_
= value
;
// Grow / shrink the pool by `value` workers; defined out of line below.
69 void addWorker(size_t value
);
71 void removeWorker(size_t value
);
// Counters. Only totalTaskCount() has its return expression visible here.
73 size_t idleWorkerCount() const {
77 size_t workerCount() const {
78 Synchronized
s(monitor_
);
82 size_t pendingTaskCount() const {
83 Synchronized
s(monitor_
);
// Queued tasks plus workers that are not idle (workerCount_ - idleCount_).
87 size_t totalTaskCount() const {
88 Synchronized
s(monitor_
);
89 return tasks_
.size() + workerCount_
- idleCount_
;
// Getter / setter for the pending-task cap consulted by add().
92 size_t pendingTaskCountMax() const {
93 Synchronized
s(monitor_
);
94 return pendingTaskCountMax_
;
97 void pendingTaskCountMax(const size_t value
) {
98 Synchronized
s(monitor_
);
99 pendingTaskCountMax_
= value
;
// Task submission / removal and the shared stop/join implementation; all three
// are defined out of line below.
104 void add(shared_ptr
<Runnable
> value
, int64_t timeout
);
106 void remove(shared_ptr
<Runnable
> task
);
109 void stopImpl(bool join
);
// Target number of workers. addWorker()/removeWorker() adjust it and then wait
// until workerCount_ equals it.
112 size_t workerMaxCount_
;
// Cap on queued tasks; 0 means add() performs no cap check.
114 size_t pendingTaskCountMax_
;
116 ThreadManager::STATE state_
;
117 shared_ptr
<ThreadFactory
> threadFactory_
;
120 friend class ThreadManager::Task
;
// Queue of pending tasks: add() pushes, Worker takes front() and pops.
121 std::queue
<shared_ptr
<Task
> > tasks_
;
// Monitor that addWorker()/removeWorker() wait on and that workers notify.
123 Monitor workerMonitor_
;
125 friend class ThreadManager::Worker
;
// Threads created by addWorker().
126 std::set
<shared_ptr
<Thread
> > workers_
;
// Workers insert their own thread here; removeWorker() erases the ids from
// idMap_ and then clears this set.
127 std::set
<shared_ptr
<Thread
> > deadWorkers_
;
// Thread id -> thread, filled by addWorker(); canSleep() checks whether the
// calling thread's id is present.
128 std::map
<const Thread::id_t
, shared_ptr
<Thread
> > idMap_
;
// ThreadManager::Task -- Runnable wrapper around a user-supplied Runnable
// (runnable_) that also carries a state_ field. Worker is a friend and reads
// and writes state_ directly.
// NOTE(review): most of the class body is absent from this listing.
131 class ThreadManager::Task
: public Runnable
{
// Constructor taking the Runnable to wrap (initializer list not visible).
141 Task(shared_ptr
<Runnable
> runnable
) :
// Guard on the EXECUTING state; presumably inside run(), so that only tasks a
// worker has marked EXECUTING are run -- confirm against the full source.
148 if (state_
== EXECUTING
) {
// The wrapped Runnable.
155 shared_ptr
<Runnable
> runnable_
;
156 friend class ThreadManager::Worker
;
// ThreadManager::Worker -- Runnable executed by each pool thread. It keeps a raw
// pointer to the owning Impl (manager_) and reads and updates the manager's
// counters, task queue and monitors directly (Impl declares it a friend).
// NOTE(review): many original lines (braces, returns, the run() signature) are
// absent from this listing; comments describe only the visible statements.
160 class ThreadManager::Worker
: public Runnable
{
// Constructor: stores the manager pointer (not visible here) and starts in the
// UNINITIALIZED worker state.
170 Worker(ThreadManager::Impl
* manager
) :
172 state_(UNINITIALIZED
),
// Visible condition: true while workerCount_ <= workerMaxCount_, or while the
// manager is JOINING and the task queue is not empty. Presumably this
// expression is the return value (the `return` line is not shown).
178 bool isActive() const {
180 (manager_
->workerCount_
<= manager_
->workerMaxCount_
) ||
181 (manager_
->state_
== JOINING
&& !manager_
->tasks_
.empty());
188 * As long as worker thread is running, pull tasks off the task queue and
// Set when the manager should be notified on workerMonitor_ that workerCount_
// has reached workerMaxCount_.
193 bool notifyManager
= false;
196 * Increment worker semaphore and notify manager if worker count reached
199 * Note: We have to release the monitor and acquire the workerMonitor
200 * since that is what the manager blocks on for worker add/remove
// Startup bookkeeping with a Synchronized on the manager's monitor_: `active`
// is computed from the counts before workerCount_ is incremented, and
// notifyManager records whether the count now equals the target.
203 Synchronized
s(manager_
->monitor_
);
204 active
= manager_
->workerCount_
< manager_
->workerMaxCount_
;
206 manager_
->workerCount_
++;
207 notifyManager
= manager_
->workerCount_
== manager_
->workerMaxCount_
;
// Notify the manager with a Synchronized on workerMonitor_, then reset the flag.
212 Synchronized
s(manager_
->workerMonitor_
);
213 manager_
->workerMonitor_
.notify();
214 notifyManager
= false;
// Task dequeued for the current iteration.
218 shared_ptr
<ThreadManager::Task
> task
;
221 * While holding manager monitor block for non-empty task queue (Also
222 * check that the thread hasn't been requested to stop). Once the queue
223 * is non-empty, dequeue a task, release monitor, and execute. If the
224 * worker max count has been decremented such that we exceed it, mark
225 * ourself inactive, decrement the worker count and notify the manager
226 * (technically we're notifying the next blocked thread but eventually
227 * the manager will see it.
230 Synchronized
s(manager_
->monitor_
);
// Idle wait: while still active and the queue is empty, count this worker as
// idle for the duration of monitor_.wait().
233 while (active
&& manager_
->tasks_
.empty()) {
234 manager_
->idleCount_
++;
236 manager_
->monitor_
.wait();
239 manager_
->idleCount_
--;
// Dequeue the front task; a task still WAITING is promoted to EXECUTING.
243 if (!manager_
->tasks_
.empty()) {
244 task
= manager_
->tasks_
.front();
245 manager_
->tasks_
.pop();
246 if (task
->state_
== ThreadManager::Task::WAITING
) {
247 task
->state_
= ThreadManager::Task::EXECUTING
;
// NOTE(review): the notification below goes to workerMonitor_, whereas the
// visible wait in ThreadManager::Impl::add() is monitor_.wait(timeout).
// Confirm that a producer blocked in add() is actually woken by this.
250 /* If we have a pending task max and we just dropped below it, wakeup any
251 thread that might be blocked on add. */
252 if (manager_
->pendingTaskCountMax_
!= 0 &&
253 manager_
->tasks_
.size() == manager_
->pendingTaskCountMax_
- 1) {
254 manager_
->workerMonitor_
.notify();
// Worker retiring: decrement the live count and flag a manager notification if
// the count now equals the target.
259 manager_
->workerCount_
--;
260 notifyManager
= (manager_
->workerCount_
== manager_
->workerMaxCount_
);
// Only a task marked EXECUTING is acted on here (the body is not visible).
265 if (task
->state_
== ThreadManager::Task::EXECUTING
) {
269 // XXX need to log this
// With a Synchronized on workerMonitor_: register this worker's thread in
// deadWorkers_ and notify workerMonitor_.
276 Synchronized
s(manager_
->workerMonitor_
);
277 manager_
->deadWorkers_
.insert(this->thread());
279 manager_
->workerMonitor_
.notify();
// Owning manager; raw, non-owning pointer passed to the constructor.
287 ThreadManager::Impl
* manager_
;
288 friend class ThreadManager::Impl
;
// Adds `value` workers to the pool.
// Visible steps:
//   1. build `value` Worker runnables and ask threadFactory_ for a thread for
//      each, collecting the threads in newThreads;
//   2. with a Synchronized on monitor_, raise workerMaxCount_ by `value` and
//      add the new threads to workers_;
//   3. for each new thread, mark its Worker STARTING and record the thread in
//      idMap_ under its id (the line that starts the thread is not visible);
//   4. with a Synchronized on workerMonitor_, wait until workerCount_ equals
//      workerMaxCount_.
294 void ThreadManager::Impl::addWorker(size_t value
) {
295 std::set
<shared_ptr
<Thread
> > newThreads
;
296 for (size_t ix
= 0; ix
< value
; ix
++) {
297 class ThreadManager::Worker
;
298 shared_ptr
<ThreadManager::Worker
> worker
= shared_ptr
<ThreadManager::Worker
>(new ThreadManager::Worker(this));
299 newThreads
.insert(threadFactory_
->newThread(worker
));
303 Synchronized
s(monitor_
);
304 workerMaxCount_
+= value
;
305 workers_
.insert(newThreads
.begin(), newThreads
.end());
// Recover the Worker from each thread's runnable() to set its state, then
// register the thread by id.
308 for (std::set
<shared_ptr
<Thread
> >::iterator ix
= newThreads
.begin(); ix
!= newThreads
.end(); ix
++) {
309 shared_ptr
<ThreadManager::Worker
> worker
= dynamic_pointer_cast
<ThreadManager::Worker
, Runnable
>((*ix
)->runnable());
310 worker
->state_
= ThreadManager::Worker::STARTING
;
312 idMap_
.insert(std::pair
<const Thread::id_t
, shared_ptr
<Thread
> >((*ix
)->getId(), *ix
));
// Block until every new worker has incremented workerCount_ up to the target.
316 Synchronized
s(workerMonitor_
);
317 while (workerCount_
!= workerMaxCount_
) {
318 workerMonitor_
.wait();
// Starts the manager.
// Visible behavior: checks for the STOPPED state first (the branch body is not
// shown). Then, with a Synchronized on monitor_, an UNINITIALIZED manager
// throws InvalidArgumentException if no thread factory is set; otherwise it
// moves to STARTED and calls monitor_.notifyAll().
// The trailing loop on STARTING has no visible body.
323 void ThreadManager::Impl::start() {
325 if (state_
== ThreadManager::STOPPED
) {
330 Synchronized
s(monitor_
);
331 if (state_
== ThreadManager::UNINITIALIZED
) {
332 if (threadFactory_
== NULL
) {
333 throw InvalidArgumentException();
335 state_
= ThreadManager::STARTED
;
336 monitor_
.notifyAll();
339 while (state_
== STARTING
) {
// Shared implementation of stop() (join == false) and join() (join == true).
// Visible behavior: checks for STOPPED first (the branch body is not shown).
// With a Synchronized on monitor_, and unless the manager is already STOPPING,
// JOINING or STOPPED, it sets the state to JOINING when `join` is true and to
// STOPPING otherwise. It then calls removeWorker(workerCount_) to retire every
// current worker, and finally sets the state to STOPPED with a Synchronized on
// monitor_.
345 void ThreadManager::Impl::stopImpl(bool join
) {
347 if (state_
== ThreadManager::STOPPED
) {
352 Synchronized
s(monitor_
);
353 if (state_
!= ThreadManager::STOPPING
&&
354 state_
!= ThreadManager::JOINING
&&
355 state_
!= ThreadManager::STOPPED
) {
357 state_
= join
? ThreadManager::JOINING
: ThreadManager::STOPPING
;
362 removeWorker(workerCount_
);
366 // should be able to block here for transition to STOPPED since we're no
370 Synchronized
s(monitor_
);
371 state_
= ThreadManager::STOPPED
;
// Removes `value` workers from the pool.
// Throws InvalidArgumentException when `value` exceeds workerMaxCount_.
// Visible steps:
//   1. with a Synchronized on monitor_, lower workerMaxCount_ by `value`;
//   2. wake waiting workers: a loop bounded by idleCount_ when fewer than
//      `value` workers are idle (loop body not shown), and monitor_.notifyAll()
//      -- presumably the else branch; the `else` line is not visible;
//   3. with a Synchronized on workerMonitor_, wait until workerCount_ equals
//      the new workerMaxCount_;
//   4. erase each dead worker's id from idMap_ and clear deadWorkers_.
// NOTE(review): removedThreads is not used in any visible line.
376 void ThreadManager::Impl::removeWorker(size_t value
) {
377 std::set
<shared_ptr
<Thread
> > removedThreads
;
379 Synchronized
s(monitor_
);
380 if (value
> workerMaxCount_
) {
381 throw InvalidArgumentException();
384 workerMaxCount_
-= value
;
386 if (idleCount_
< value
) {
387 for (size_t ix
= 0; ix
< idleCount_
; ix
++) {
391 monitor_
.notifyAll();
// Wait for the retiring workers to decrement workerCount_ down to the target.
396 Synchronized
s(workerMonitor_
);
398 while (workerCount_
!= workerMaxCount_
) {
399 workerMonitor_
.wait();
// Forget the threads that registered themselves in deadWorkers_.
402 for (std::set
<shared_ptr
<Thread
> >::iterator ix
= deadWorkers_
.begin(); ix
!= deadWorkers_
.end(); ix
++) {
404 idMap_
.erase((*ix
)->getId());
407 deadWorkers_
.clear();
// Returns true when the calling thread's id (obtained from
// threadFactory_->getCurrentThreadId()) is NOT present in idMap_, i.e. the
// caller is not one of the threads registered by addWorker(). add() uses this
// to decide whether it may block the caller.
411 bool ThreadManager::Impl::canSleep() {
412 const Thread::id_t id
= threadFactory_
->getCurrentThreadId();
413 return idMap_
.find(id
) == idMap_
.end();
// Queues `value` for execution, wrapped in a ThreadManager::Task.
// Runs with a Synchronized on monitor_.
// Throws IllegalStateException unless the manager is STARTED.
// When a pending-task cap is set (pendingTaskCountMax_ > 0) and the queue is at
// or above it: if canSleep() is true and timeout >= 0, the caller waits on
// monitor_ (passing `timeout`) until the queue drops below the cap;
// TooManyPendingTasksException is thrown otherwise -- presumably from the else
// branch; the `else` line is not visible in this listing.
// NOTE(review): the wait below is on monitor_, while the worker's "dropped
// below the cap" notification is sent to workerMonitor_. Confirm the wake-up
// path for a blocked producer.
416 void ThreadManager::Impl::add(shared_ptr
<Runnable
> value
, int64_t timeout
) {
417 Synchronized
s(monitor_
);
419 if (state_
!= ThreadManager::STARTED
) {
420 throw IllegalStateException();
423 if (pendingTaskCountMax_
> 0 && (tasks_
.size() >= pendingTaskCountMax_
)) {
424 if (canSleep() && timeout
>= 0) {
425 while (pendingTaskCountMax_
> 0 && tasks_
.size() >= pendingTaskCountMax_
) {
426 monitor_
.wait(timeout
);
429 throw TooManyPendingTasksException();
// Enqueue the wrapped task.
433 tasks_
.push(shared_ptr
<ThreadManager::Task
>(new ThreadManager::Task(value
)));
435 // If idle thread is available notify it, otherwise all worker threads are
436 // running and will get around to this task in time.
437 if (idleCount_
> 0) {
// Removes a previously queued task.
// Visible behavior: with a Synchronized on monitor_, throws
// IllegalStateException unless the manager is STARTED. No further logic is
// visible in this listing.
442 void ThreadManager::Impl::remove(shared_ptr
<Runnable
> task
) {
443 Synchronized
s(monitor_
);
444 if (state_
!= ThreadManager::STARTED
) {
445 throw IllegalStateException();
// SimpleThreadManager -- ThreadManager::Impl preconfigured with a fixed worker
// count and pending-task cap, both captured at construction in const members.
// NOTE(review): parts of the class (including the signature of the member that
// contains the three calls below) are absent from this listing.
449 class SimpleThreadManager
: public ThreadManager::Impl
{
// Defaults: 4 workers and a cap of 0 (add() performs no cap check for 0).
452 SimpleThreadManager(size_t workerCount
=4, size_t pendingTaskCountMax
=0) :
453 workerCount_(workerCount
),
454 pendingTaskCountMax_(pendingTaskCountMax
),
// Apply the stored configuration to the base Impl: set the cap, start the
// manager, then add the configured number of workers. Presumably this is the
// body of a start() override -- confirm against the full source.
459 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_
);
460 ThreadManager::Impl::start();
461 addWorker(workerCount_
);
// Construction-time configuration. pendingTaskCountMax_ has the same name as a
// member declared in ThreadManager::Impl, which is why the base setter is
// called explicitly above.
465 const size_t workerCount_
;
466 const size_t pendingTaskCountMax_
;
// Factory: returns a newly allocated ThreadManager::Impl owned by a
// shared_ptr<ThreadManager>.
472 shared_ptr
<ThreadManager
> ThreadManager::newThreadManager() {
473 return shared_ptr
<ThreadManager
>(new ThreadManager::Impl());
// Factory: returns a newly allocated SimpleThreadManager owned by a
// shared_ptr<ThreadManager>.
// `count` is forwarded as the worker count and `pendingTaskCountMax` as the
// pending-task cap.
476 shared_ptr
<ThreadManager
> ThreadManager::newSimpleThreadManager(size_t count
, size_t pendingTaskCountMax
) {
477 return shared_ptr
<ThreadManager
>(new SimpleThreadManager(count
, pendingTaskCountMax
));
480 }}} // facebook::thrift::concurrency