[7297] Fixed profession spells sorting in trainer spell list at client.
[getmangos.git] / dep / src / zthread / PoolExecutor.cxx
blobcf84e145453b441f673a97632ee4ed53488d21c6
1 /*
2 * Copyright (c) 2005, Eric Crahen
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is furnished
9 * to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in all
12 * copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 #include "ThreadImpl.h"
24 #include "zthread/PoolExecutor.h"
25 #include "zthread/MonitoredQueue.h"
26 #include "zthread/FastMutex.h"
27 #include "ThreadImpl.h"
28 #include "ThreadQueue.h"
30 #include <algorithm>
31 #include <deque>
32 #include <utility>
34 using namespace ZThread;
36 namespace ZThread {
38 namespace {
/** Bookkeeping for groups of submitted tasks and the threads waiting for those groups to complete. */
42 class WaiterQueue {
44 typedef std::deque<ThreadImpl*> ThreadList;
46 typedef struct group_t {
47 size_t id;
48 size_t count;
49 ThreadList waiters;
50 group_t(size_t n) : id(n), count(0) {}
51 } Group;
53 typedef std::deque<Group> GroupList;
55 //! Predicate to find a specific group
56 struct by_id : public std::unary_function<bool, Group> {
57 size_t id;
58 by_id(size_t n) : id(n) {}
59 bool operator()(const Group& grp) {
60 return grp.id == id;
64 //! Functor to count groups
65 struct counter : public std::unary_function<void, Group> {
66 size_t count;
67 counter() : count(0) {}
68 void operator()(const Group& grp) { count += grp.count; }
69 operator size_t() { return count; }
72 FastMutex _lock;
73 GroupList _list;
74 size_t _id;
75 size_t _generation;
77 public:
79 WaiterQueue() : _id(0), _generation(0) {
80 // At least one empty-group exists
81 _list.push_back( Group(_id++) );
84 /**
85 * Insert the current thread into the current waiter list
87 * @pre At least one empty group exists
88 * @post At least one empty group exists
90 bool wait(unsigned long timeout) {
92 ThreadImpl* current = ThreadImpl::current();
93 Monitor& m = current->getMonitor();
95 Monitor::STATE state;
97 Guard<FastMutex> g1(_lock);
99 // At least one empty-group exists
100 assert(!_list.empty());
102 // Return w/o waiting if there are no executing tasks
103 if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1)
104 return true;
106 // Update the waiter list for the active group
107 _list.back().waiters.push_back(current);
108 size_t n = _list.back().id;
110 m.acquire();
114 Guard<FastMutex, UnlockedScope> g2(g1);
115 state = timeout == 0 ? m.wait() : m.wait(timeout);
119 m.release();
121 // If awoke due to a reason other than the last task in the group 'n' completing,
122 // then then find the group 'current' is waiting in
123 GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
124 if(i != _list.end()) {
126 // Remove 'current' from that list if it is still a member
127 ThreadList::iterator j = std::find(i->waiters.begin(), i->waiters.end(), current);
128 if(j != i->waiters.end())
129 i->waiters.erase(j);
133 // At least one empty-group exists
134 assert(!_list.empty());
136 switch(state) {
137 case Monitor::SIGNALED:
138 break;
139 case Monitor::TIMEDOUT:
140 return false;
141 case Monitor::INTERRUPTED:
142 throw Interrupted_Exception();
143 default:
144 throw Synchronization_Exception();
147 return true;
152 * Increment the active group count
154 * @pre at least 1 empty group exists
155 * @post at least 1 non-empty group exists
157 std::pair<size_t, size_t> increment() {
159 Guard<FastMutex> g(_lock);
161 // At least one empty-group exists
162 assert(!_list.empty());
164 GroupList::iterator i = --_list.end();
165 size_t n = i->id;
167 if(i == _list.end()) {
169 // A group should never have been removed until
170 // the final task in that group completed
171 assert(0);
175 i->count++;
177 // When the active group is being incremented, insert a new active group
178 // to replace it if there were waiting threads
179 if(i == --_list.end() && !i->waiters.empty())
180 _list.push_back(Group(_id++));
182 // At least 1 non-empty group exists
183 assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
185 return std::make_pair(n, _generation);
191 * Decrease the count for the group with the given id.
193 * @param n group id
195 * @pre At least 1 non-empty group exists
196 * @post At least 1 empty group exists
198 void decrement(size_t n) {
200 Guard<FastMutex> g1(_lock);
202 // At least 1 non-empty group exists
203 assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
205 // Find the requested group
206 GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
207 if(i == _list.end()) {
209 // A group should never have been removed until
210 // the final task in that group completed
211 assert(0);
215 // Decrease the count for tasks in this group,
216 if(--i->count == 0 && i == _list.begin()) {
218 do {
220 // When the first group completes, wake all waiters for every
221 // group, starting from the first until a group that is not
222 // complete is reached
225 // Don't remove the empty active group
226 if(i == --_list.end() && i->waiters.empty())
227 break;
230 if( awaken(*i) ) {
232 // If all waiters were awakened, remove the group
233 i = _list.erase(i);
235 } else {
239 // Otherwise, unlock and yield allowing the waiter
240 // lists to be updated if other threads are busy
241 Guard<FastLock, UnlockedScope> g2(g1);
242 ThreadImpl::yield();
246 i = _list.begin();
250 } while(i != _list.end() && i->count == 0);
252 // Ensure that an active group exists
253 if(_list.empty())
254 _list.push_back( Group(++_id) );
258 // At least one group exists
259 assert(!_list.empty());
265 size_t generation(bool next = false) {
267 Guard<FastMutex> g(_lock);
268 return next ? _generation++ : _generation;
272 private:
275 * Awaken all the waiters remaining in the given group
277 * @return
278 * - true if all the waiting threads were successfully awakened.
279 * - false if there were one or more threads that could not be awakened.
281 bool awaken(Group& grp) {
283 // Go through the waiter list in the given group;
284 for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) {
286 ThreadImpl* impl = *i;
287 Monitor& m = impl->getMonitor();
289 // Try the monitor lock, if it cant be locked skip to the next waiter
290 if(m.tryAcquire()) {
292 // Notify the monitor & remove from the waiter list so time isn't
293 // wasted checking it again.
294 i = grp.waiters.erase(i);
296 // Try to wake the waiter, it doesn't matter if this is successful
297 // or not (only fails when the monitor is already going to stop waiting).
298 m.notify();
299 m.release();
301 } else ++i;
305 return grp.waiters.empty();
312 * @class GroupedRunnable
314 * Wrap a task with group and generation information.
316 * - 'group' allows tasks to be grouped together so that lists of waiting
317 * threads can be managed.
319 * - 'generation' allows tasks to be interrupted
321 class GroupedRunnable : public Runnable {
323 Task _task;
324 WaiterQueue& _queue;
326 size_t _group;
327 size_t _generation;
329 public:
331 GroupedRunnable(const Task& task, WaiterQueue& queue)
332 : _task(task), _queue(queue) {
334 std::pair<size_t, size_t> pr( _queue.increment() );
336 _group = pr.first;
337 _generation = pr.second;
341 size_t group() const {
342 return _group;
345 size_t generation() const {
346 return _generation;
349 void run() {
351 try {
353 _task->run();
355 } catch(...) {
359 _queue.decrement( group() );
365 typedef CountedPtr<GroupedRunnable, size_t> ExecutorTask;
370 class ExecutorImpl {
372 typedef MonitoredQueue<ExecutorTask, FastMutex> TaskQueue;
373 typedef std::deque<ThreadImpl*> ThreadList;
375 TaskQueue _taskQueue;
376 WaiterQueue _waitingQueue;
378 ThreadList _threads;
379 volatile size_t _size;
382 public:
384 ExecutorImpl() : _size(0) {}
387 void registerThread() {
389 Guard<TaskQueue> g(_taskQueue);
391 ThreadImpl* impl = ThreadImpl::current();
392 _threads.push_back(impl);
394 // current cancel if too many threads are being created
395 if(_threads.size() > _size)
396 impl->cancel();
400 void unregisterThread() {
402 Guard<TaskQueue> g(_taskQueue);
403 std::remove(_threads.begin(), _threads.end(), ThreadImpl::current());
407 void execute(const Task& task) {
409 // Wrap the task with a grouped task
410 GroupedRunnable* runnable = new GroupedRunnable(task, _waitingQueue);
412 try {
414 _taskQueue.add( ExecutorTask(runnable) );
416 } catch(...) {
418 // Incase the queue is canceled between the time the WaiterQueue is
419 // updated and the task is added to the TaskQueue
420 _waitingQueue.decrement( runnable->group() );
421 throw;
427 void interrupt() {
429 // Bump the generation number
430 _waitingQueue.generation(true);
432 Guard<TaskQueue> g(_taskQueue);
434 // Interrupt all threads currently running, thier tasks would be
435 // from an older generation
436 for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i)
437 (*i)->interrupt();
441 //! Adjust the number of desired workers and return the number of Threads needed
442 size_t workers(size_t n) {
444 Guard<TaskQueue> g(_taskQueue);
446 size_t m = (_size < n) ? (n - _size) : 0;
447 _size = n;
449 return m;
453 size_t workers() {
455 Guard<TaskQueue> g(_taskQueue);
456 return _size;
460 ExecutorTask next() {
462 ExecutorTask task;
464 // Draw the task from the queue
465 for(;;) {
467 try {
469 task = _taskQueue.next();
470 break;
472 } catch(Interrupted_Exception&) {
474 // Ignore interruption here, it can only come from
475 // another thread interrupt()ing the executor. The
476 // thread was interrupted in the hopes it was busy
477 // with a task
483 // Interrupt the thread running the tasks when the generation
484 // does not match the current generation
485 if( task->generation() != _waitingQueue.generation() )
486 ThreadImpl::current()->interrupt();
488 // Otherwise, clear the interrupted status for the thread and
489 // give it a clean slate to start with
490 else
491 ThreadImpl::current()->isInterrupted();
493 return task;
497 bool isCanceled() {
498 return _taskQueue.isCanceled();
501 void cancel() {
502 _taskQueue.cancel();
505 bool wait(unsigned long timeout) {
506 return _waitingQueue.wait(timeout);
511 //! Executor job
512 class Worker : public Runnable {
514 CountedPtr< ExecutorImpl > _impl;
516 public:
518 //! Create a Worker that draws upon the given Queue
519 Worker(const CountedPtr< ExecutorImpl >& impl)
520 : _impl(impl) { }
522 //! Run until Thread or Queue are canceled
523 void run() {
525 _impl->registerThread();
527 // Run until the Queue is canceled
528 while(!Thread::canceled()) {
530 // Draw tasks from the queue
531 ExecutorTask task( _impl->next() );
532 task->run();
536 _impl->unregisterThread();
540 }; /* Worker */
543 //! Helper
544 class Shutdown : public Runnable {
546 CountedPtr< ExecutorImpl > _impl;
548 public:
550 Shutdown(const CountedPtr< ExecutorImpl >& impl)
551 : _impl(impl) { }
553 void run() {
554 _impl->cancel();
557 }; /* Shutdown */
561 PoolExecutor::PoolExecutor(size_t n)
562 : _impl( new ExecutorImpl() ), _shutdown( new Shutdown(_impl) ) {
564 size(n);
566 // Request cancelation when main() exits
567 ThreadQueue::instance()->insertShutdownTask(_shutdown);
571 PoolExecutor::~PoolExecutor() {
573 try {
576 * If the shutdown task for this executor has not already been
577 * selected to run, then run it locally
579 if(ThreadQueue::instance()->removeShutdownTask(_shutdown))
580 _shutdown->run();
582 } catch(...) { }
586 void PoolExecutor::interrupt() {
587 _impl->interrupt();
590 void PoolExecutor::size(size_t n) {
592 if(n < 1)
593 throw InvalidOp_Exception();
595 for(size_t m = _impl->workers(n); m > 0; --m)
596 Thread t(new Worker(_impl));
600 size_t PoolExecutor::size() {
601 return _impl->workers();
605 void PoolExecutor::execute(const Task& task) {
607 // Enqueue the task, the Queue will reject it with a
608 // Cancelation_Exception if the Executor has been canceled
609 _impl->execute(task);
613 void PoolExecutor::cancel() {
614 _impl->cancel();
617 bool PoolExecutor::isCanceled() {
618 return _impl->isCanceled();
621 void PoolExecutor::wait() {
622 _impl->wait(0);
625 bool PoolExecutor::wait(unsigned long timeout) {
626 return _impl->wait(timeout == 0 ? 1 : timeout);