1
/***************************************************************************
2 * Copyright (C) 2009-2011 by Stefan Fuhrmann *
3 * stefanfuhrmann@alice-dsl.de *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
22 #include "JobScheduler.h"
29 // queue size management
// Grow the queue's backing buffer to newSize slots: allocate a new TJob
// array, move the `count` live entries [first, first+count) to its front,
// then fix up the end/last pointers.
// NOTE(review): this copy is missing lines (original numbering jumps
// 37 -> 41); the assignments to data/first and the release of the old
// buffer are not visible here — confirm against the upstream file.
31 void CJobScheduler::CQueue::Grow (size_t newSize
)
33 TJob
* newData
= new TJob
[newSize
];
35 size_t count
= size();
// relocate the currently queued entries to the start of the new array
37 memmove (newData
, first
, count
* sizeof (TJob
[1]));
// end = one past the last allocated slot; last = one past the last entry
41 end
= newData
+ newSize
;
43 last
= newData
+ count
;
// Make room for further entries: if the live entries fill at most half
// the capacity, compact them to the front of the existing buffer;
// otherwise double the capacity via Grow().
// NOTE(review): lines are missing between the branches in this copy
// (original numbering jumps 52 -> 58); pointer fix-ups after the
// memmove are not visible.
46 void CJobScheduler::CQueue::AutoGrow()
48 size_t count
= size();
50 if (count
* 2 <= capacity())
// <= half full: slide entries back to the buffer start instead of growing
52 memmove (data
, first
, count
* sizeof (TJob
[1]));
58 Grow (capacity() * 2);
62 // remove one entry from \ref starving container.
63 // Return NULL, if container was empty
// Pops the front scheduler from `starving` under the pool mutex.
// NOTE(review): the early-return branch after the empty() check (and the
// final return of `scheduler`) are not visible in this copy.
65 CJobScheduler
* CJobScheduler::CThreadPool::SelectStarving()
67 CCriticalSectionLock
lock (mutex
);
// nothing to hand out if nobody is starving or no threads are pooled
68 if (starving
.empty() || pool
.empty())
71 std::vector
<CJobScheduler
*>::iterator begin
= starving
.begin();
73 CJobScheduler
* scheduler
= *begin
;
74 starving
.erase (begin
);
79 // create empty thread pool
// Default-constructs the singleton pool; member initialization is not
// visible in this copy (body lines missing).
81 CJobScheduler::CThreadPool::CThreadPool()
// Destructor of the shared pool singleton. Its body (presumably thread
// cleanup) is not visible in this copy.
87 CJobScheduler::CThreadPool::~CThreadPool()
// Meyers-singleton accessor for the process-wide shared thread pool.
// NOTE(review): the `return instance;` line is not visible in this copy.
94 CJobScheduler::CThreadPool
& CJobScheduler::CThreadPool::GetInstance()
96 static CThreadPool instance
;
// Try to hand out a worker thread from the shared pool.
// Creates threads lazily the first time they are needed; otherwise
// recycles one from `pool`. (CThread is created suspended: third
// ctor argument `true`.)
// NOTE(review): several lines are missing in this copy (107 -> 110,
// 114 -> 122), including the branch structure, bookkeeping of
// yetToCreate/allocCount, pool.pop_back() and the return statements.
102 CJobScheduler::SThreadInfo
* CJobScheduler::CThreadPool::TryAlloc()
104 CCriticalSectionLock
lock (mutex
);
107 if (yetToCreate
== 0)
110 // lazy thread creation
112 SThreadInfo
* info
= new SThreadInfo
;
114 info
->thread
= new CThread (&ThreadFunc
, info
, true);
// recycle path: take the most recently released thread
122 CJobScheduler::SThreadInfo
* thread
= pool
.back();
// Return a worker thread to the shared pool. If the pool is already at
// maxCount, the thread object is destroyed instead. Afterwards, any
// starving schedulers are notified that a thread may now be available.
129 void CJobScheduler::CThreadPool::Release (SThreadInfo
* thread
)
132 // put back into pool, unless its capacity has been exceeded
134 CCriticalSectionLock
lock (mutex
);
// --allocCount: this thread is no longer allocated to a scheduler
135 if (pool
.size() + --allocCount
< maxCount
)
137 pool
.push_back (thread
);
// nobody waiting for a thread -> nothing more to do
141 if (starving
.empty())
146 // pool overflow -> terminate thread
148 delete thread
->thread
;
// NOTE(review): `delete thread;` (the SThreadInfo itself) is not visible
// in this copy — confirm it is freed in the missing lines.
153 // notify starved schedulers that there may be some threads to alloc
// drain the starving list one scheduler at a time; SelectStarving()
// returns NULL once the list (or the pool) is empty
155 for ( CJobScheduler
* scheduler
= SelectStarving()
157 ; scheduler
= SelectStarving())
159 scheduler
->ThreadAvailable();
163 // set max. number of concurrent threads
// Adjust the pool's thread budget: schedule more lazy creations if the
// budget grew, and destroy pooled (idle) threads while the budget is
// exceeded.
// NOTE(review): the assignment `maxCount = count;` is not visible in this
// copy, and `maxCount - pool.size() + allocCount` looks suspicious —
// one would expect `maxCount - (pool.size() + allocCount)`; confirm
// against the upstream file.
165 void CJobScheduler::CThreadPool::SetThreadCount (size_t count
)
167 CCriticalSectionLock
lock (mutex
);
// grow: remember how many threads still need lazy creation
170 if (pool
.size() + allocCount
< maxCount
)
171 yetToCreate
= maxCount
- pool
.size() + allocCount
;
// shrink: drop idle pooled threads until within budget
173 while ((pool
.size() + allocCount
> maxCount
) && !pool
.empty())
175 SThreadInfo
* info
= pool
.back();
// Query the pool's configured thread budget; the return expression
// (presumably maxCount) is not visible in this copy.
183 size_t CJobScheduler::CThreadPool::GetThreadCount() const
188 // manage starving schedulers
// Register a scheduler that could not obtain a shared thread; it will be
// notified from Release() when a thread is returned to the pool.
190 void CJobScheduler::CThreadPool::AddStarving (CJobScheduler
* scheduler
)
192 CCriticalSectionLock
lock (mutex
);
193 starving
.push_back (scheduler
);
// Remove all occurrences of `scheduler` from the starving list.
// Uses remove_copy to keep every element except `scheduler`, then erases
// the tail. NOTE(review): the `return` expression (presumably whether
// anything was removed, i.e. newEnd != end) is not visible in this copy.
196 bool CJobScheduler::CThreadPool::RemoveStarving (CJobScheduler
* scheduler
)
198 CCriticalSectionLock
lock (mutex
);
200 auto begin
= starving
.begin();
201 auto end
= starving
.end();
// copy-over-self: keeps all entries != scheduler, compacting in place
203 auto newEnd
= std::remove_copy (begin
, end
, begin
, scheduler
);
207 starving
.erase (newEnd
, end
);
211 // job execution helpers
// Called by a worker thread to fetch its next job. If the queue is empty
// (or the scheduler is stopped), the thread is taken off the "running"
// list and either returned to the shared pool, marked for termination,
// or parked in the private "suspended" list; in that case a TJob with a
// null IJob* is returned and `terminateThread` tells the worker whether
// to exit. NOTE(review): many lines are missing in this copy (219 -> 222,
// 244 -> 248, 261 -> 268, 275 -> 281), including the wake-up of waiting
// threads, the empty-queue event signalling, and the non-empty-queue
// path that actually pops a job.
213 CJobScheduler::TJob
CJobScheduler::AssignJob (SThreadInfo
* info
)
215 CCriticalSectionLock
lock (mutex
);
217 // wake up threads that waited for some work to finish
219 if (waitingThreads
> 0)
222 // suspend this thread if there is no work left
224 if (queue
.empty() || (threads
.stopCount
!= 0))
// CThread::Suspend while still listed as running; resumed later by
// AllocateThread()/AllocateSharedThread()
228 info
->thread
->Suspend();
230 // remove from "running" list and put it back either to
231 // "suspended" pool or global shared pool
233 bool terminateThread
= false;
// unordered removal: overwrite with last element, then pop_back
234 for (size_t i
= 0, count
= threads
.running
.size(); i
< count
; ++i
)
235 if (threads
.running
[i
] == info
)
237 threads
.running
[i
] = threads
.running
[count
-1];
238 threads
.running
.pop_back();
240 // still in debt of shared pool?
242 if (threads
.fromShared
> 0)
244 // if we are actually idle, we aren't starving anymore
248 // put back to global pool
251 CThreadPool::GetInstance().Release (info
);
253 --threads
.fromShared
;
255 else if (aggressiveThreading
)
257 // don't keep private idle threads
// account for a future lazy re-creation, then let this one die
260 ++threads
.yetToCreate
;
261 terminateThread
= true;
// default: keep the private thread around, suspended
268 threads
.suspended
.push_back (info
);
269 ++threads
.suspendedCount
;
272 // signal empty queue, if necessary
274 ++threads
.unusedCount
;
275 if (--threads
.runningCount
== 0)
// no job available: null job + terminate flag for the caller
281 return TJob(static_cast<IJob
*>(nullptr), terminateThread
);
289 // try to get a thread from the shared pool.
290 // register as "starving" if that failed
// On success: account the borrowed thread (fromShared/unusedCount/
// runningCount), add it to "running" and resume it. On failure: mark
// this scheduler starved and register with the pool for a callback.
// NOTE(review): the branch structure, the NULL check on TryAlloc's
// result and the return statements are not visible in this copy
// (numbering jumps 296 -> 299, 305 -> 311).
292 bool CJobScheduler::AllocateSharedThread()
294 if (!threads
.starved
)
296 SThreadInfo
* info
= CThreadPool::GetInstance().TryAlloc();
// bookkeeping: one more shared thread in use, one fewer idle slot
299 ++threads
.fromShared
;
300 --threads
.unusedCount
;
301 ++threads
.runningCount
;
303 threads
.running
.push_back (info
);
305 info
->thread
->Resume();
// TryAlloc failed (or we were already starved): wait for Release()
// to call back via ThreadAvailable()
311 threads
.starved
= true;
312 CThreadPool::GetInstance().AddStarving (this);
// Bring one more worker thread online, preferring (1) a suspended
// private thread, then (2) a lazily-created private thread, then
// (3) a thread borrowed from the shared pool.
// NOTE(review): the return statements of branches (1) and (2) and the
// final fall-through return are not visible in this copy (numbering
// jumps 333 -> 335, 349 -> 353, 356 -> 364).
319 bool CJobScheduler::AllocateThread()
321 if (threads
.suspendedCount
> 0)
323 // recycle suspended, private thread
325 SThreadInfo
* info
= threads
.suspended
.back();
326 threads
.suspended
.pop_back();
328 --threads
.suspendedCount
;
329 --threads
.unusedCount
;
330 ++threads
.runningCount
;
332 threads
.running
.push_back (info
);
333 info
->thread
->Resume();
335 else if (threads
.yetToCreate
> 0)
337 // time to start a new private thread
339 --threads
.yetToCreate
;
341 --threads
.unusedCount
;
342 ++threads
.runningCount
;
// CThread is constructed suspended (third arg `true`), so the explicit
// Resume() below actually starts it
344 SThreadInfo
* info
= new SThreadInfo
;
346 info
->thread
= new CThread (&ThreadFunc
, info
, true);
347 threads
.running
.push_back (info
);
349 info
->thread
->Resume();
353 // try to allocate a shared thread
355 if (threads
.fromShared
< threads
.maxFromShared
)
356 return AllocateSharedThread();
364 // unregister from "starving" list.
365 // This may race with CThreadPool::Release -> must loop here.
// Keep retrying until we either removed ourselves from the pool's
// starving list (RemoveStarving returned true) or another thread
// cleared our starved flag.
367 void CJobScheduler::StopStarvation()
369 while (threads
.starved
)
370 if (CThreadPool::GetInstance().RemoveStarving (this))
372 threads
.starved
= false;
377 // worker thread function
// Static entry point passed to CThread: fetches a job from the owning
// scheduler, executes it, and un-schedules it afterwards.
// NOTE(review): this copy is missing the loop/return structure, the
// job-ownership cleanup (after "responsibility to clean up") and the
// thread auto-delete code (numbering jumps 394 -> 405).
379 bool CJobScheduler::ThreadFunc (void* arg
)
// arg is the SThreadInfo* handed to the CThread ctor
381 SThreadInfo
* info
= reinterpret_cast<SThreadInfo
*>(arg
);
383 TJob job
= info
->owner
->AssignJob (info
);
// AssignJob returns a null IJob* when there is no work / on shutdown
384 if (job
.first
!= NULL
)
388 job
.first
->Execute();
390 // it is no longer referenced by the scheduler
392 job
.first
->OnUnSchedule (info
->owner
);
394 // is it our responsibility to clean up this job?
405 // maybe, auto-delete thread object
411 // Create & remove threads
// Constructor: records the private/shared thread budgets, sets the
// queue's FIFO mode and — if no shared threads exist yet — triggers
// their auto-initialization.
// NOTE(review): the first parameter (threadCount) and the `fifo`
// parameter lines are missing from this copy (numbering starts at 415),
// as is the body of the GetSharedThreadCount()==0 branch.
413 CJobScheduler::CJobScheduler
415 , size_t sharedThreads
416 , bool aggressiveThreading
418 : aggressiveThreading(aggressiveThreading
)
420 threads
.maxFromShared
= sharedThreads
;
// unusedCount = total threads this scheduler could still put to work
422 threads
.unusedCount
= threadCount
+ sharedThreads
;
423 threads
.yetToCreate
= threadCount
;
425 queue
.set_fifo (fifo
);
427 // auto-initialize shared threads
429 if (GetSharedThreadCount() == 0)
// Destructor: asserts that no jobs are running and no shared threads are
// still borrowed, then tears down the suspended private threads.
// NOTE(review): the per-thread cleanup inside the loop (presumably
// deleting info->thread and info) is not visible in this copy.
433 CJobScheduler::~CJobScheduler()
// callers must have drained the scheduler before destroying it
438 assert (threads
.running
.empty());
439 assert (threads
.fromShared
== 0);
441 for (size_t i
= 0, count
= threads
.suspended
.size(); i
< count
; ++i
)
443 SThreadInfo
* info
= threads
.suspended
[i
];
// Process-wide default scheduler: no private threads (0), effectively
// unlimited use of the shared pool (SIZE_MAX).
// NOTE(review): the `return &instance;` line is not visible in this copy.
451 CJobScheduler
* CJobScheduler::GetDefault()
453 static CJobScheduler
instance (0, SIZE_MAX
);
// Enqueue a job. Notifies the job (OnSchedule) before taking the lock,
// then — unless the scheduler is stopped — decides whether a new worker
// thread should be started: always under aggressive threading, otherwise
// only when the queue clearly outruns the running workers and unused
// thread capacity remains.
// NOTE(review): the actual queue.push of `toAdd`, the stopped-branch
// body and the use of `addThread` (presumably AllocateThread()) are not
// visible in this copy (numbering jumps 463 -> 465, 476 -> 482).
460 void CJobScheduler::Schedule (IJob
* job
, bool transferOwnership
)
// TJob pairs the job with an ownership flag (who deletes it later)
462 TJob
toAdd (job
, transferOwnership
);
463 job
->OnSchedule (this);
465 CCriticalSectionLock
lock (mutex
);
471 if (threads
.stopCount
!= 0)
// heuristic: spawn a thread if aggressive, or if the backlog is more
// than twice the number of running workers and capacity is left
474 bool addThread
= aggressiveThreading
475 || ( (queue
.size() > 2 * threads
.runningCount
)
476 && (threads
.unusedCount
> 0));
482 // notification that a new thread may be available
// Callback from CThreadPool::Release: clear the starved flag and try to
// grab shared threads while the backlog justifies it.
// NOTE(review): the loop guard `threads.stopCount != 0` looks inverted —
// spawning workers only while the scheduler IS stopped seems wrong;
// upstream likely reads `== 0`. Confirm before relying on this copy.
484 void CJobScheduler::ThreadAvailable()
486 CCriticalSectionLock
lock (mutex
);
487 threads
.starved
= false;
// keep borrowing shared threads while the queue outruns the workers
// and no private thread (suspended or yet-to-create) could serve
489 while ( (threads
.stopCount
!= 0)
490 && (queue
.size() > 2 * threads
.runningCount
)
491 && (threads
.suspendedCount
== 0)
492 && (threads
.yetToCreate
== 0)
493 && (threads
.fromShared
< threads
.maxFromShared
))
495 if (!AllocateSharedThread())
500 // wait for all current and follow-up jobs to terminate
// Convenience wrapper: block without a timeout.
502 void CJobScheduler::WaitForEmptyQueue()
504 WaitForEmptyQueueOrTimeout(INFINITE
);
// Wait until the queue is empty and no job is running, or the timeout
// expires. If the scheduler is stopped, drains the queue manually so
// waiting jobs are un-scheduled rather than left dangling.
// NOTE(review): the success return, the lock release around the event
// wait, and the handling of owned jobs in the drain loop are not
// visible in this copy (numbering jumps 522 -> 527, 532 -> 537).
507 bool CJobScheduler::WaitForEmptyQueueOrTimeout(DWORD milliSeconds
)
512 CCriticalSectionLock
lock (mutex
);
514 // if the scheduler has been stopped, we need to remove
515 // the waiting jobs manually
517 if (threads
.stopCount
!= 0)
518 while (!queue
.empty())
520 const TJob
& job
= queue
.pop();
522 job
.first
->OnUnSchedule(this);
527 // empty queue and no jobs still being processed?
529 if ((threads
.runningCount
== 0) && queue
.empty())
532 // we will be woken up as soon as both containers are empty
537 if (!emptyEvent
.WaitForEndOrTimeout(milliSeconds
))
543 // wait for some jobs to be finished.
// Block the calling thread until at least one worker runs idle — unless
// so many callers are already waiting that no running worker remains,
// in which case back out immediately to avoid a deadlock.
// waitingThreads is maintained with Interlocked* because workers read
// it in AssignJob without this mutex.
// NOTE(review): the branch structure and the unlock around the WaitFor()
// are not visible in this copy (numbering jumps 556 -> 560, 565 -> 571).
545 void CJobScheduler::WaitForSomeJobs()
548 CCriticalSectionLock
lock (mutex
);
549 if ( static_cast<size_t>(InterlockedIncrement (&waitingThreads
))
550 < threads
.runningCount
)
552 // there are enough running job threads left
553 // -> wait for one of them to run idle *or*
554 // for too many of them to enter this method
556 threadIsIdle
.Reset();
560 // number of waiting jobs now equal to or greater than
561 // the number of running job threads
// too many waiters: undo our registration and return
565 InterlockedDecrement (&waitingThreads
);
571 threadIsIdle
.WaitFor();
572 InterlockedDecrement (&waitingThreads
);
575 // Returns the number of jobs waiting for execution.
// NOTE(review): the `return queue.size();` line is not visible in this
// copy — only the lock acquisition survived extraction.
577 size_t CJobScheduler::GetQueueDepth() const
579 CCriticalSectionLock
lock (mutex
);
583 // Returns the number of threads that currently execute
584 // jobs for this scheduler
// Snapshot under the scheduler mutex; the value may change immediately
// after return.
586 size_t CJobScheduler::GetRunningThreadCount() const
588 CCriticalSectionLock
lock (mutex
);
589 return threads
.runningCount
;
592 // remove waiting entries from the queue until their
593 // number drops to or below the given watermark.
// Pops the surplus jobs (from the end selected by `oldest`) into the
// result vector, un-scheduling each one. The queue's fifo flag is
// flipped temporarily so pop() takes entries from the requested end,
// then restored.
// NOTE(review): the parameter list (watermark, oldest) and the final
// `return removed;` are not visible in this copy (numbering jumps
// 595 -> 599, 628 -> 635).
595 std::vector
<IJob
*> CJobScheduler::RemoveJobFromQueue
599 std::vector
<IJob
*> removed
;
602 CCriticalSectionLock
lock (mutex
);
603 if (queue
.size() > watermark
)
605 size_t toRemove
= queue
.size() - watermark
;
606 removed
.reserve (toRemove
);
608 // temporarily change the queue extraction strategy
609 // such that we remove jobs from the requested end
610 // (fifo -> oldest are at front, otherwise they are
// NOTE(review): the remainder of the comment above ("... at the back)")
// was cut off in this copy.
613 bool fifo
= queue
.get_fifo();
614 queue
.set_fifo (oldest
== fifo
);
618 for (size_t i
= 0; i
< toRemove
; ++i
)
620 IJob
* job
= queue
.pop().first
;
// the scheduler no longer references the job
621 job
->OnUnSchedule (this);
623 removed
.push_back (job
);
626 // restore job execution order
628 queue
.set_fifo (fifo
);
// Suspend job execution. Stop/Resume nest: each Stop() increments
// stopCount and returns the new nesting depth.
635 long CJobScheduler::Stop()
637 CCriticalSectionLock
lock (mutex
);
638 return ++threads
.stopCount
;
// Undo one Stop(). When the last nested stop is released (stopCount
// reaches 0), spin up worker threads while the backlog justifies it:
// any backlog under aggressive threading, otherwise only a queue more
// than 4x the running workers with unused capacity left.
// Returns the remaining nesting depth.
641 long CJobScheduler::Resume()
643 CCriticalSectionLock
lock (mutex
);
// Resume() without a matching Stop() is a caller bug
644 assert (threads
.stopCount
> 0);
646 if (--threads
.stopCount
== 0)
648 while ( ( (queue
.size() > threads
.runningCount
)
649 && aggressiveThreading
)
650 || ( (queue
.size() > 4 * threads
.runningCount
)
651 && (threads
.unusedCount
> 0)))
// stop once no further thread can be allocated
653 if (!AllocateThread())
658 return threads
.stopCount
;
663 // set max. number of concurrent threads
// Static facade: forwards to the shared pool singleton.
665 void CJobScheduler::SetSharedThreadCount (size_t count
)
667 CThreadPool::GetInstance().SetThreadCount(count
);
// Static facade: current thread budget of the shared pool.
670 size_t CJobScheduler::GetSharedThreadCount()
672 return CThreadPool::GetInstance().GetThreadCount();
// Size the shared pool to one thread per hardware thread.
675 void CJobScheduler::UseAllCPUs()
677 SetSharedThreadCount (GetHWThreadCount());
680 size_t CJobScheduler::GetHWThreadCount()
686 size_t sysNumProcs
= si
.dwNumberOfProcessors
;
688 size_t sysNumProcs
= sysconf (_SC_NPROCESSORS_CONF
);