2 * Copyright (c) 2005, Eric Crahen
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is furnished
9 * to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in all
12 * copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 #include "ThreadImpl.h"
24 #include "zthread/PoolExecutor.h"
25 #include "zthread/MonitoredQueue.h"
26 #include "zthread/FastMutex.h"
27 #include "ThreadImpl.h"
28 #include "ThreadQueue.h"
34 using namespace ZThread
;
44 typedef std::deque
<ThreadImpl
*> ThreadList
;
46 typedef struct group_t
{
50 group_t(size_t n
) : id(n
), count(0) {}
53 typedef std::deque
<Group
> GroupList
;
55 //! Predicate to find a specific group
56 struct by_id
: public std::unary_function
<bool, Group
> {
58 by_id(size_t n
) : id(n
) {}
59 bool operator()(const Group
& grp
) {
64 //! Functor to count groups
65 struct counter
: public std::unary_function
<void, Group
> {
67 counter() : count(0) {}
68 void operator()(const Group
& grp
) { count
+= grp
.count
; }
69 operator size_t() { return count
; }
79 WaiterQueue() : _id(0), _generation(0) {
80 // At least one empty-group exists
81 _list
.push_back( Group(_id
++) );
85 * Insert the current thread into the current waiter list
87 * @pre At least one empty group exists
88 * @post At least one empty group exists
90 bool wait(unsigned long timeout
) {
92 ThreadImpl
* current
= ThreadImpl::current();
93 Monitor
& m
= current
->getMonitor();
97 Guard
<FastMutex
> g1(_lock
);
99 // At least one empty-group exists
100 assert(!_list
.empty());
102 // Return w/o waiting if there are no executing tasks
103 if((size_t)std::for_each(_list
.begin(), _list
.end(), counter()) < 1)
106 // Update the waiter list for the active group
107 _list
.back().waiters
.push_back(current
);
108 size_t n
= _list
.back().id
;
114 Guard
<FastMutex
, UnlockedScope
> g2(g1
);
115 state
= timeout
== 0 ? m
.wait() : m
.wait(timeout
);
// If awoke due to a reason other than the last task in the group 'n' completing,
// then find the group 'current' is waiting in
123 GroupList::iterator i
= std::find_if(_list
.begin(), _list
.end(), by_id(n
));
124 if(i
!= _list
.end()) {
126 // Remove 'current' from that list if it is still a member
127 ThreadList::iterator j
= std::find(i
->waiters
.begin(), i
->waiters
.end(), current
);
128 if(j
!= i
->waiters
.end())
133 // At least one empty-group exists
134 assert(!_list
.empty());
137 case Monitor::SIGNALED
:
139 case Monitor::TIMEDOUT
:
141 case Monitor::INTERRUPTED
:
142 throw Interrupted_Exception();
144 throw Synchronization_Exception();
152 * Increment the active group count
154 * @pre at least 1 empty group exists
155 * @post at least 1 non-empty group exists
157 std::pair
<size_t, size_t> increment() {
159 Guard
<FastMutex
> g(_lock
);
161 // At least one empty-group exists
162 assert(!_list
.empty());
164 GroupList::iterator i
= --_list
.end();
167 if(i
== _list
.end()) {
169 // A group should never have been removed until
170 // the final task in that group completed
177 // When the active group is being incremented, insert a new active group
178 // to replace it if there were waiting threads
179 if(i
== --_list
.end() && !i
->waiters
.empty())
180 _list
.push_back(Group(_id
++));
182 // At least 1 non-empty group exists
183 assert((size_t)std::for_each(_list
.begin(), _list
.end(), counter()) > 0);
185 return std::make_pair(n
, _generation
);
191 * Decrease the count for the group with the given id.
195 * @pre At least 1 non-empty group exists
196 * @post At least 1 empty group exists
198 void decrement(size_t n
) {
200 Guard
<FastMutex
> g1(_lock
);
202 // At least 1 non-empty group exists
203 assert((size_t)std::for_each(_list
.begin(), _list
.end(), counter()) > 0);
205 // Find the requested group
206 GroupList::iterator i
= std::find_if(_list
.begin(), _list
.end(), by_id(n
));
207 if(i
== _list
.end()) {
209 // A group should never have been removed until
210 // the final task in that group completed
215 // Decrease the count for tasks in this group,
216 if(--i
->count
== 0 && i
== _list
.begin()) {
220 // When the first group completes, wake all waiters for every
221 // group, starting from the first until a group that is not
222 // complete is reached
225 // Don't remove the empty active group
226 if(i == --_list.end() && i->waiters.empty())
232 // If all waiters were awakened, remove the group
239 // Otherwise, unlock and yield allowing the waiter
240 // lists to be updated if other threads are busy
241 Guard
<FastLock
, UnlockedScope
> g2(g1
);
250 } while(i
!= _list
.end() && i
->count
== 0);
252 // Ensure that an active group exists
254 _list
.push_back( Group(++_id
) );
258 // At least one group exists
259 assert(!_list
.empty());
265 size_t generation(bool next
= false) {
267 Guard
<FastMutex
> g(_lock
);
268 return next
? _generation
++ : _generation
;
275 * Awaken all the waiters remaining in the given group
278 * - true if all the waiting threads were successfully awakened.
279 * - false if there were one or more threads that could not be awakened.
281 bool awaken(Group
& grp
) {
283 // Go through the waiter list in the given group;
284 for(ThreadList::iterator i
= grp
.waiters
.begin(); i
!= grp
.waiters
.end();) {
286 ThreadImpl
* impl
= *i
;
287 Monitor
& m
= impl
->getMonitor();
// Try the monitor lock; if it can't be locked, skip to the next waiter
292 // Notify the monitor & remove from the waiter list so time isn't
293 // wasted checking it again.
294 i
= grp
.waiters
.erase(i
);
296 // Try to wake the waiter, it doesn't matter if this is successful
297 // or not (only fails when the monitor is already going to stop waiting).
305 return grp
.waiters
.empty();
312 * @class GroupedRunnable
314 * Wrap a task with group and generation information.
316 * - 'group' allows tasks to be grouped together so that lists of waiting
317 * threads can be managed.
319 * - 'generation' allows tasks to be interrupted
321 class GroupedRunnable
: public Runnable
{
331 GroupedRunnable(const Task
& task
, WaiterQueue
& queue
)
332 : _task(task
), _queue(queue
) {
334 std::pair
<size_t, size_t> pr( _queue
.increment() );
337 _generation
= pr
.second
;
341 size_t group() const {
345 size_t generation() const {
359 _queue
.decrement( group() );
365 typedef CountedPtr
<GroupedRunnable
, size_t> ExecutorTask
;
372 typedef MonitoredQueue
<ExecutorTask
, FastMutex
> TaskQueue
;
373 typedef std::deque
<ThreadImpl
*> ThreadList
;
375 TaskQueue _taskQueue
;
376 WaiterQueue _waitingQueue
;
379 volatile size_t _size
;
384 ExecutorImpl() : _size(0) {}
387 void registerThread() {
389 Guard
<TaskQueue
> g(_taskQueue
);
391 ThreadImpl
* impl
= ThreadImpl::current();
392 _threads
.push_back(impl
);
// Cancel the current thread if too many threads are being created
395 if(_threads
.size() > _size
)
400 void unregisterThread() {
402 Guard
<TaskQueue
> g(_taskQueue
);
403 std::remove(_threads
.begin(), _threads
.end(), ThreadImpl::current());
407 void execute(const Task
& task
) {
409 // Wrap the task with a grouped task
410 GroupedRunnable
* runnable
= new GroupedRunnable(task
, _waitingQueue
);
414 _taskQueue
.add( ExecutorTask(runnable
) );
// In case the queue is canceled between the time the WaiterQueue is
// updated and the task is added to the TaskQueue
420 _waitingQueue
.decrement( runnable
->group() );
429 // Bump the generation number
430 _waitingQueue
.generation(true);
432 Guard
<TaskQueue
> g(_taskQueue
);
// Interrupt all threads currently running, their tasks would be
// from an older generation
436 for(ThreadList::iterator i
= _threads
.begin(); i
!= _threads
.end(); ++i
)
441 //! Adjust the number of desired workers and return the number of Threads needed
442 size_t workers(size_t n
) {
444 Guard
<TaskQueue
> g(_taskQueue
);
446 size_t m
= (_size
< n
) ? (n
- _size
) : 0;
455 Guard
<TaskQueue
> g(_taskQueue
);
460 ExecutorTask
next() {
464 // Draw the task from the queue
469 task
= _taskQueue
.next();
472 } catch(Interrupted_Exception
&) {
474 // Ignore interruption here, it can only come from
475 // another thread interrupt()ing the executor. The
476 // thread was interrupted in the hopes it was busy
483 // Interrupt the thread running the tasks when the generation
484 // does not match the current generation
485 if( task
->generation() != _waitingQueue
.generation() )
486 ThreadImpl::current()->interrupt();
488 // Otherwise, clear the interrupted status for the thread and
489 // give it a clean slate to start with
491 ThreadImpl::current()->isInterrupted();
498 return _taskQueue
.isCanceled();
505 bool wait(unsigned long timeout
) {
506 return _waitingQueue
.wait(timeout
);
512 class Worker
: public Runnable
{
514 CountedPtr
< ExecutorImpl
> _impl
;
518 //! Create a Worker that draws upon the given Queue
519 Worker(const CountedPtr
< ExecutorImpl
>& impl
)
522 //! Run until Thread or Queue are canceled
525 _impl
->registerThread();
527 // Run until the Queue is canceled
528 while(!Thread::canceled()) {
530 // Draw tasks from the queue
531 ExecutorTask
task( _impl
->next() );
536 _impl
->unregisterThread();
544 class Shutdown
: public Runnable
{
546 CountedPtr
< ExecutorImpl
> _impl
;
550 Shutdown(const CountedPtr
< ExecutorImpl
>& impl
)
561 PoolExecutor::PoolExecutor(size_t n
)
562 : _impl( new ExecutorImpl() ), _shutdown( new Shutdown(_impl
) ) {
566 // Request cancelation when main() exits
567 ThreadQueue::instance()->insertShutdownTask(_shutdown
);
571 PoolExecutor::~PoolExecutor() {
576 * If the shutdown task for this executor has not already been
577 * selected to run, then run it locally
579 if(ThreadQueue::instance()->removeShutdownTask(_shutdown
))
586 void PoolExecutor::interrupt() {
590 void PoolExecutor::size(size_t n
) {
593 throw InvalidOp_Exception();
595 for(size_t m
= _impl
->workers(n
); m
> 0; --m
)
596 Thread
t(new Worker(_impl
));
600 size_t PoolExecutor::size() {
601 return _impl
->workers();
605 void PoolExecutor::execute(const Task
& task
) {
607 // Enqueue the task, the Queue will reject it with a
608 // Cancelation_Exception if the Executor has been canceled
609 _impl
->execute(task
);
613 void PoolExecutor::cancel() {
617 bool PoolExecutor::isCanceled() {
618 return _impl
->isCanceled();
621 void PoolExecutor::wait() {
625 bool PoolExecutor::wait(unsigned long timeout
) {
626 return _impl
->wait(timeout
== 0 ? 1 : timeout
);