1 /***************************************************************************
2 * Copyright (C) 2009-2011 by Stefan Fuhrmann *
3 * stefanfuhrmann@alice-dsl.de *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
22 #include "JobScheduler.h"
29 // queue size management
// Reallocate the queue's job buffer to newSize entries and move the
// currently pending jobs to the front of the new allocation.
// NOTE(review): this span looks extraction-damaged — braces and the
// statements that release the old buffer and update data/first are not
// visible here; confirm against the original file before relying on it.
31 void CJobScheduler::CQueue::Grow (size_t newSize
)
// allocate the larger replacement buffer
33 TJob
* newData
= new TJob
[newSize
];
// move the pending jobs (starting at 'first') to the new buffer's start
35 size_t count
= size();
36 memmove (newData
, first
, count
* sizeof (TJob
[1]));
// re-anchor the end-of-buffer and end-of-data pointers in the new block
40 end
= newData
+ newSize
;
42 last
= newData
+ count
;
// Ensure there is room to append further jobs: if at most half the
// capacity is in use, compact the pending entries to the buffer start;
// otherwise double the capacity.
// NOTE(review): braces / else-path markers are missing from this
// extracted span — verify control flow against the original file.
45 void CJobScheduler::CQueue::AutoGrow()
47 size_t count
= size();
49 if (count
* 2 <= capacity())
// enough headroom: slide the entries back to the start of the buffer
51 memmove (data
, first
, count
* sizeof (TJob
[1]));
// otherwise, reallocate at twice the current capacity
57 Grow (capacity() * 2);
61 // remove one entry from \ref starving container.
62 // Return NULL, if container was empty
// (the early-out also triggers when the shared thread pool itself is
// empty — no point waking a scheduler when no thread can be handed out)
64 CJobScheduler
* CJobScheduler::CThreadPool::SelectStarving()
// serialize access to the starving / pool containers
66 CCriticalSectionLock
lock (mutex
);
67 if (starving
.empty() || pool
.empty())
// pop the scheduler that has waited longest (front of the vector)
70 std::vector
<CJobScheduler
*>::iterator begin
= starving
.begin();
72 CJobScheduler
* scheduler
= *begin
;
73 starving
.erase (begin
);
78 // create empty thread pool
// NOTE(review): the constructor / destructor bodies are not visible in
// this extracted span — only the signatures remain.
80 CJobScheduler::CThreadPool::CThreadPool()
89 CJobScheduler::CThreadPool::~CThreadPool()
// Singleton accessor: the one process-wide shared thread pool,
// lazily constructed on first use (function-local static).
96 CJobScheduler::CThreadPool
& CJobScheduler::CThreadPool::GetInstance()
98 static CThreadPool instance
;
// Try to hand out a worker thread from the shared pool.
// NOTE(review): several statements (return paths, counter bookkeeping,
// pool.pop_back) are missing from this extracted span — hedge all
// claims against the original file.
104 CJobScheduler::SThreadInfo
* CJobScheduler::CThreadPool::TryAlloc()
106 CCriticalSectionLock
lock (mutex
);
109 if (yetToCreate
== 0)
112 // lazy thread creation
114 SThreadInfo
* info
= new SThreadInfo
;
// third CThread ctor argument is 'true' — presumably "create
// suspended"; confirm against the CThread declaration
116 info
->thread
= new CThread (&ThreadFunc
, info
, true);
// otherwise: reuse an already-created thread parked in the pool
124 CJobScheduler::SThreadInfo
* thread
= pool
.back();
// Return a worker thread to the shared pool (or destroy it if the
// pool is over capacity), then wake any starving schedulers.
131 void CJobScheduler::CThreadPool::Release (SThreadInfo
* thread
)
134 // put back into pool, unless its capacity has been exceeded
136 CCriticalSectionLock
lock (mutex
);
// note: --allocCount is evaluated as a side effect of this test
137 if (pool
.size() + --allocCount
< maxCount
)
139 pool
.push_back (thread
);
// fast path: nobody is starving -> nothing further to do
143 if (starving
.empty())
148 // pool overflow -> terminate thread
150 delete thread
->thread
;
155 // notify starved schedulers that there may be some threads to alloc
// drain SelectStarving() one scheduler at a time until it returns NULL
157 for ( CJobScheduler
* scheduler
= SelectStarving()
159 ; scheduler
= SelectStarving())
161 scheduler
->ThreadAvailable();
165 // set max. number of concurrent threads
167 void CJobScheduler::CThreadPool::SetThreadCount (size_t count
)
169 CCriticalSectionLock
lock (mutex
);
// if the pool may now hold more threads, allow more lazy creations
172 if (pool
.size() + allocCount
< maxCount
)
// NOTE(review): suspected precedence bug — this computes
// (maxCount - pool.size()) + allocCount; the intended value is
// presumably maxCount - (pool.size() + allocCount). Confirm against
// the original file before changing.
173 yetToCreate
= maxCount
- pool
.size() + allocCount
;
// shrink: discard parked threads until we are within the new limit
175 while ((pool
.size() + allocCount
> maxCount
) && !pool
.empty())
177 SThreadInfo
* info
= pool
.back();
// Report the pool's thread count.
// NOTE(review): the function body is not visible in this extracted span.
185 size_t CJobScheduler::CThreadPool::GetThreadCount()
190 // manage starving schedulers
// Register a scheduler that could not get a shared thread; it will be
// notified later (see Release / SelectStarving).
192 void CJobScheduler::CThreadPool::AddStarving (CJobScheduler
* scheduler
)
194 CCriticalSectionLock
lock (mutex
);
195 starving
.push_back (scheduler
);
// Remove all occurrences of 'scheduler' from the starving list.
// NOTE(review): the return statement is not visible in this extracted
// span — presumably it reports whether anything was removed.
198 bool CJobScheduler::CThreadPool::RemoveStarving (CJobScheduler
* scheduler
)
200 CCriticalSectionLock
lock (mutex
);
202 typedef std::vector
<CJobScheduler
*>::iterator TI
;
203 TI begin
= starving
.begin();
204 TI end
= starving
.end();
// remove_copy with destination == source start compacts every element
// that is NOT 'scheduler' to the front (same effect as std::remove)
206 TI newEnd
= std::remove_copy (begin
, end
, begin
, scheduler
);
210 starving
.erase (newEnd
, end
);
214 // job execution helpers
// Hand the next queued job to the calling worker thread, or park the
// thread when there is no work (or the scheduler is stopped).
// Returns TJob(NULL, terminateThread) in the idle case.
// NOTE(review): this span is extraction-damaged — braces and several
// statements (the dequeue of the next job, the waiting-thread wakeup,
// the empty-queue signalling) are missing; verify against the original.
216 CJobScheduler::TJob
CJobScheduler::AssignJob (SThreadInfo
* info
)
218 CCriticalSectionLock
lock (mutex
);
220 // wake up threads that waited for some work to finish
222 if (waitingThreads
> 0)
225 // suspend this thread if there is no work left
227 if (queue
.empty() || (threads
.stopCount
!= 0))
231 info
->thread
->Suspend();
233 // remove from "running" list and put it back either to
234 // "suspended" pool or global shared pool
236 bool terminateThread
= false;
// swap-and-pop removal: overwrite the found slot with the last element
237 for (size_t i
= 0, count
= threads
.running
.size(); i
< count
; ++i
)
238 if (threads
.running
[i
] == info
)
240 threads
.running
[i
] = threads
.running
[count
-1];
241 threads
.running
.pop_back();
243 // still in debt of shared pool?
245 if (threads
.fromShared
> 0)
247 // if we are actually idle, we aren't starving anymore
251 // put back to global pool
254 CThreadPool::GetInstance().Release (info
);
256 --threads
.fromShared
;
258 else if (aggressiveThreading
)
260 // don't keep private idle threads
// allow re-creation later instead of keeping the thread around
263 ++threads
.yetToCreate
;
264 terminateThread
= true;
// default: park the private thread in the suspended list
271 threads
.suspended
.push_back (info
);
272 ++threads
.suspendedCount
;
275 // signal empty queue, if necessary
277 ++threads
.unusedCount
;
278 if (--threads
.runningCount
== 0)
// NULL job tells the worker there is no work; second member tells it
// whether to terminate itself
284 return TJob ((IJob
*)NULL
, terminateThread
);
292 // try to get a thread from the shared pool.
293 // register as "starving" if that failed
// NOTE(review): the success test on TryAlloc's result and the return
// statements are missing from this extracted span.
295 bool CJobScheduler::AllocateSharedThread()
// only try if we are not already known to be starving
297 if (!threads
.starved
)
299 SThreadInfo
* info
= CThreadPool::GetInstance().TryAlloc();
// bookkeeping: one more borrowed, running thread
302 ++threads
.fromShared
;
303 --threads
.unusedCount
;
304 ++threads
.runningCount
;
306 threads
.running
.push_back (info
);
308 info
->thread
->Resume();
// allocation failed: mark ourselves starving and queue for callback
314 threads
.starved
= true;
315 CThreadPool::GetInstance().AddStarving (this);
322 bool CJobScheduler::AllocateThread()
324 if (threads
.suspendedCount
> 0)
326 // recycle suspended, private thread
328 SThreadInfo
* info
= threads
.suspended
.back();
329 threads
.suspended
.pop_back();
331 --threads
.suspendedCount
;
332 --threads
.unusedCount
;
333 ++threads
.runningCount
;
335 threads
.running
.push_back (info
);
336 info
->thread
->Resume();
338 else if (threads
.yetToCreate
> 0)
340 // time to start a new private thread
342 --threads
.yetToCreate
;
344 --threads
.unusedCount
;
345 ++threads
.runningCount
;
347 SThreadInfo
* info
= new SThreadInfo
;
349 info
->thread
= new CThread (&ThreadFunc
, info
, true);
350 threads
.running
.push_back (info
);
352 info
->thread
->Resume();
356 // try to allocate a shared thread
358 if (threads
.fromShared
< threads
.maxFromShared
)
359 return AllocateSharedThread();
367 // unregister from "starving" list.
368 // This may race with CThreadPool::Release -> must loop here.
370 void CJobScheduler::StopStarvation()
// keep retrying: Release() may re-select us between iterations
372 while (threads
.starved
)
373 if (CThreadPool::GetInstance().RemoveStarving (this))
375 threads
.starved
= false;
380 // worker thread function
// Static thread entry point: fetch a job from the owning scheduler,
// execute it, and notify the job it is off the schedule.
// NOTE(review): the tail of this function (job cleanup, thread
// auto-delete, return value) is missing from this extracted span.
382 bool CJobScheduler::ThreadFunc (void* arg
)
// arg is the SThreadInfo set up when the thread was created
384 SThreadInfo
* info
= reinterpret_cast<SThreadInfo
*>(arg
);
386 TJob job
= info
->owner
->AssignJob (info
);
// a NULL job means "no work" (see AssignJob)
387 if (job
.first
!= NULL
)
391 job
.first
->Execute();
393 // it is no longer referenced by the scheduler
395 job
.first
->OnUnSchedule (info
->owner
);
397 // is it our responsibility to clean up this job?
408 // maybe, auto-delete thread object
414 // Create & remove threads
// Constructor: record the thread budget (private + shared) and the
// queue ordering. No threads are created here — creation is lazy.
// NOTE(review): the parameter list is partially missing from this
// extracted span (threadCount / fifo appear in the body but their
// declarations are not visible); confirm against the header.
416 CJobScheduler::CJobScheduler
418 , size_t sharedThreads
419 , bool aggressiveThreading
422 , aggressiveThreading (aggressiveThreading
)
424 threads
.runningCount
= 0;
425 threads
.suspendedCount
= 0;
427 threads
.fromShared
= 0;
428 threads
.maxFromShared
= sharedThreads
;
// total threads this scheduler may still put to work
430 threads
.unusedCount
= threadCount
+ sharedThreads
;
431 threads
.yetToCreate
= threadCount
;
433 threads
.starved
= false;
434 threads
.stopCount
= 0;
436 queue
.set_fifo (fifo
);
438 // auto-initialize shared threads
440 if (GetSharedThreadCount() == 0)
// Destructor: by now no thread may still be running for this scheduler
// and all borrowed shared threads must have been returned.
// NOTE(review): the loop body that disposes each suspended thread is
// missing from this extracted span.
444 CJobScheduler::~CJobScheduler(void)
449 assert (threads
.running
.empty());
450 assert (threads
.fromShared
== 0);
// tear down the parked private threads
452 for (size_t i
= 0, count
= threads
.suspended
.size(); i
< count
; ++i
)
454 SThreadInfo
* info
= threads
.suspended
[i
];
// Singleton-style default scheduler: no private threads (0) and an
// effectively unlimited shared-thread quota (SIZE_MAX).
462 CJobScheduler
* CJobScheduler::GetDefault()
464 static CJobScheduler
instance (0, SIZE_MAX
);
// Add a job to the queue and, when useful, spin up another worker.
// transferOwnership: whether the scheduler becomes responsible for
// deleting the job after execution (stored in the TJob pair).
// NOTE(review): the enqueue itself and the use of 'addThread' are
// missing from this extracted span.
471 void CJobScheduler::Schedule (IJob
* job
, bool transferOwnership
)
473 TJob
toAdd (job
, transferOwnership
);
// notify the job before it becomes visible to worker threads
474 job
->OnSchedule (this);
476 CCriticalSectionLock
lock (mutex
);
// scheduler currently stopped?
482 if (threads
.stopCount
!= 0)
// heuristics: always add a thread when aggressive, otherwise only when
// the backlog exceeds twice the running threads and spare capacity exists
485 bool addThread
= aggressiveThreading
486 || ( (queue
.size() > 2 * threads
.runningCount
)
487 && (threads
.unusedCount
> 0));
493 // notification that a new thread may be available
// Called by the shared pool when this (previously starving) scheduler
// may try to allocate again.
495 void CJobScheduler::ThreadAvailable()
497 CCriticalSectionLock
lock (mutex
);
498 threads
.starved
= false;
// NOTE(review): the first condition looks inverted — allocating worker
// threads only while stopCount != 0 (i.e. while stopped) is the
// opposite of what Stop/Resume suggest; expected '== 0'. Confirm
// against the original file before changing.
500 while ( (threads
.stopCount
!= 0)
501 && (queue
.size() > 2 * threads
.runningCount
)
502 && (threads
.suspendedCount
== 0)
503 && (threads
.yetToCreate
== 0)
504 && (threads
.fromShared
< threads
.maxFromShared
))
506 if (!AllocateSharedThread())
511 // wait for all current and follow-up jobs to terminate
// Convenience wrapper: block without a timeout.
513 void CJobScheduler::WaitForEmptyQueue()
515 WaitForEmptyQueueOrTimeout(INFINITE
);
// Wait until the queue is empty and no job is being executed, or the
// timeout expires. Returns presumably true on success / false on
// timeout — the return statements are missing from this extracted span.
518 bool CJobScheduler::WaitForEmptyQueueOrTimeout(DWORD milliSeconds
)
523 CCriticalSectionLock
lock (mutex
);
525 // if the scheduler has been stopped, we need to remove
526 // the waiting jobs manually
528 if (threads
.stopCount
!= 0)
529 while (!queue
.empty())
531 const TJob
& job
= queue
.pop();
// un-schedule notification mirrors the OnSchedule call in Schedule()
533 job
.first
->OnUnSchedule(this);
538 // empty queue and no jobs still being processed?
540 if ((threads
.runningCount
== 0) && queue
.empty())
543 // we will be woken up as soon as both containers are empty
548 if (!emptyEvent
.WaitForEndOrTimeout(milliSeconds
))
554 // wait for some jobs to be finished.
// Block the caller until a worker thread runs idle — unless too many
// callers are already waiting relative to the running thread count.
// NOTE(review): braces and the else-branch structure are missing from
// this extracted span.
556 void CJobScheduler::WaitForSomeJobs()
559 CCriticalSectionLock
lock (mutex
);
// atomically count this caller among the waiters
560 if ( static_cast<size_t>(InterlockedIncrement (&waitingThreads
))
561 < threads
.runningCount
)
563 // there are enough running job threads left
564 // -> wait for one of them to run idle *or*
565 // for too many of them to enter this method
567 threadIsIdle
.Reset();
571 // number of waiting jobs now equal to or greater than
572 // the number of running job threads
576 InterlockedDecrement (&waitingThreads
);
582 threadIsIdle
.WaitFor();
583 InterlockedDecrement (&waitingThreads
);
586 // Returns the number of jobs waiting for execution.
// NOTE(review): the return statement is missing from this extracted
// span — presumably 'return queue.size();'.
588 size_t CJobScheduler::GetQueueDepth() const
590 CCriticalSectionLock
lock (mutex
);
594 // Returns the number of threads that currently execute
595 // jobs for this scheduler
597 size_t CJobScheduler::GetRunningThreadCount() const
// take the lock so the counter read is consistent with the containers
599 CCriticalSectionLock
lock (mutex
);
600 return threads
.runningCount
;
603 // remove waiting entries from the queue until their
604 // number drops to or below the given watermark.
// Returns the removed jobs (each one gets OnUnSchedule first).
// NOTE(review): the parameter list (watermark, oldest) and the return
// statement are missing from this extracted span; confirm the
// signature against the header.
606 std::vector
<IJob
*> CJobScheduler::RemoveJobFromQueue
610 std::vector
<IJob
*> removed
;
613 CCriticalSectionLock
lock (mutex
);
614 if (queue
.size() > watermark
)
616 size_t toRemove
= queue
.size() - watermark
;
// avoid reallocations while collecting the victims
617 removed
.reserve (toRemove
);
619 // temporarily change the queue extraction strategy
620 // such that we remove jobs from the requested end
621 // (fifo -> oldest are at front, otherwise they are
624 bool fifo
= queue
.get_fifo();
// 'oldest == fifo' selects the end of the queue holding the jobs
// the caller asked to discard
625 queue
.set_fifo (oldest
== fifo
);
629 for (size_t i
= 0; i
< toRemove
; ++i
)
631 IJob
* job
= queue
.pop().first
;
632 job
->OnUnSchedule (this);
634 removed
.push_back (job
);
637 // restore job execution order
639 queue
.set_fifo (fifo
);
// Suspend job dispatch. Stop/Resume nest: each Stop() increments the
// stop counter and returns the new depth.
646 long CJobScheduler::Stop()
648 CCriticalSectionLock
lock (mutex
);
649 return ++threads
.stopCount
;
// Undo one Stop(). When the stop counter reaches zero, restart enough
// worker threads to keep up with the queued backlog.
// Returns the remaining stop depth.
652 long CJobScheduler::Resume()
654 CCriticalSectionLock
lock (mutex
);
// Resume() without matching Stop() is a programming error
655 assert (threads
.stopCount
> 0);
657 if (--threads
.stopCount
== 0)
// aggressive mode: one thread per backlog surplus; otherwise only when
// the backlog exceeds 4x the running threads and capacity is unused
659 while ( ( (queue
.size() > threads
.runningCount
)
660 && aggressiveThreading
)
661 || ( (queue
.size() > 4 * threads
.runningCount
)
662 && (threads
.unusedCount
> 0)))
664 if (!AllocateThread())
669 return threads
.stopCount
;
674 // set max. number of concurrent threads
// Thin forwarder to the shared pool singleton.
676 void CJobScheduler::SetSharedThreadCount (size_t count
)
678 CThreadPool::GetInstance().SetThreadCount(count
);
// Thin forwarder: current size limit of the shared thread pool.
681 size_t CJobScheduler::GetSharedThreadCount()
683 return CThreadPool::GetInstance().GetThreadCount();
// Size the shared pool to one thread per hardware thread.
686 void CJobScheduler::UseAllCPUs()
688 SetSharedThreadCount (GetHWThreadCount());
// Number of hardware threads reported by the OS.
// NOTE(review): the platform #ifdef lines and the surrounding setup
// (the SYSTEM_INFO 'si' / GetSystemInfo call on Windows) are missing
// from this extracted span — the two sysNumProcs definitions are the
// Windows and POSIX branches respectively.
691 size_t CJobScheduler::GetHWThreadCount()
697 size_t sysNumProcs
= si
.dwNumberOfProcessors
;
699 size_t sysNumProcs
= sysconf (_SC_NPROCESSORS_CONF
);