Refactored: Dropped unused code
[TortoiseGit.git] / src / AsyncFramework / JobScheduler.cpp
blob4cf52e20667a58c5e3d5a7b7a581f6df9016291e
1 /***************************************************************************
2 * Copyright (C) 2009-2011 by Stefan Fuhrmann *
3 * stefanfuhrmann@alice-dsl.de *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
21 #include "stdafx.h"
22 #include "JobScheduler.h"
23 #include "IJob.h"
24 #include "Thread.h"
26 namespace async
29 // queue size management
31 void CJobScheduler::CQueue::Grow (size_t newSize)
33 TJob* newData = new TJob[newSize];
35 size_t count = size();
36 memmove (newData, first, count * sizeof (TJob[1]));
37 delete[] data;
39 data = newData;
40 end = newData + newSize;
41 first = newData;
42 last = newData + count;
45 void CJobScheduler::CQueue::AutoGrow()
47 size_t count = size();
49 if (count * 2 <= capacity())
51 memmove (data, first, count * sizeof (TJob[1]));
52 first = data;
53 last = data + count;
55 else
57 Grow (capacity() * 2);
61 // remove one entry from \ref starving container.
62 // Return NULL, if container was empty
64 CJobScheduler* CJobScheduler::CThreadPool::SelectStarving()
66 CCriticalSectionLock lock (mutex);
67 if (starving.empty() || pool.empty())
68 return NULL;
70 std::vector<CJobScheduler*>::iterator begin = starving.begin();
72 CJobScheduler* scheduler = *begin;
73 starving.erase (begin);
75 return scheduler;
78 // create empty thread pool
80 CJobScheduler::CThreadPool::CThreadPool()
81 : yetToCreate (0)
82 , allocCount (0)
83 , maxCount (0)
87 // release threads
89 CJobScheduler::CThreadPool::~CThreadPool()
91 SetThreadCount(0);
94 // Meyer's singleton
96 CJobScheduler::CThreadPool& CJobScheduler::CThreadPool::GetInstance()
98 static CThreadPool instance;
99 return instance;
102 // pool interface
104 CJobScheduler::SThreadInfo* CJobScheduler::CThreadPool::TryAlloc()
106 CCriticalSectionLock lock (mutex);
107 if (pool.empty())
109 if (yetToCreate == 0)
110 return NULL;
112 // lazy thread creation
114 SThreadInfo* info = new SThreadInfo;
115 info->owner = NULL;
116 info->thread = new CThread (&ThreadFunc, info, true);
118 --yetToCreate;
119 ++allocCount;
121 return info;
124 CJobScheduler::SThreadInfo* thread = pool.back();
125 pool.pop_back();
126 ++allocCount;
128 return thread;
131 void CJobScheduler::CThreadPool::Release (SThreadInfo* thread)
134 // put back into pool, unless its capacity has been exceeded
136 CCriticalSectionLock lock (mutex);
137 if (pool.size() + --allocCount < maxCount)
139 pool.push_back (thread);
141 // shortcut
143 if (starving.empty())
144 return;
146 else
148 // pool overflow -> terminate thread
150 delete thread->thread;
151 delete thread;
155 // notify starved schedulers that there may be some threads to alloc
157 for ( CJobScheduler* scheduler = SelectStarving()
158 ; scheduler != NULL
159 ; scheduler = SelectStarving())
161 scheduler->ThreadAvailable();
165 // set max. number of concurrent threads
167 void CJobScheduler::CThreadPool::SetThreadCount (size_t count)
169 CCriticalSectionLock lock (mutex);
171 maxCount = count;
172 if (pool.size() + allocCount < maxCount)
173 yetToCreate = maxCount - pool.size() + allocCount;
175 while ((pool.size() + allocCount > maxCount) && !pool.empty())
177 SThreadInfo* info = pool.back();
178 pool.pop_back();
180 delete info->thread;
181 delete info;
185 size_t CJobScheduler::CThreadPool::GetThreadCount()
187 return maxCount;
190 // manage starving schedulers
192 void CJobScheduler::CThreadPool::AddStarving (CJobScheduler* scheduler)
194 CCriticalSectionLock lock (mutex);
195 starving.push_back (scheduler);
198 bool CJobScheduler::CThreadPool::RemoveStarving (CJobScheduler* scheduler)
200 CCriticalSectionLock lock (mutex);
202 typedef std::vector<CJobScheduler*>::iterator TI;
203 TI begin = starving.begin();
204 TI end = starving.end();
206 TI newEnd = std::remove_copy (begin, end, begin, scheduler);
207 if (newEnd == end)
208 return false;
210 starving.erase (newEnd, end);
211 return true;
214 // job execution helpers
216 CJobScheduler::TJob CJobScheduler::AssignJob (SThreadInfo* info)
218 CCriticalSectionLock lock (mutex);
220 // wake up threads that waited for some work to finish
222 if (waitingThreads > 0)
223 threadIsIdle.Set();
225 // suspend this thread if there is no work left
227 if (queue.empty() || (threads.stopCount != 0))
229 // suspend
231 info->thread->Suspend();
233 // remove from "running" list and put it back either to
234 // "suspended" pool or global shared pool
236 bool terminateThread = false;
237 for (size_t i = 0, count = threads.running.size(); i < count; ++i)
238 if (threads.running[i] == info)
240 threads.running[i] = threads.running[count-1];
241 threads.running.pop_back();
243 // still in debt of shared pool?
245 if (threads.fromShared > 0)
247 // if we are actually idle, we aren't starving anymore
249 StopStarvation();
251 // put back to global pool
253 info->owner = NULL;
254 CThreadPool::GetInstance().Release (info);
256 --threads.fromShared;
258 else if (aggressiveThreading)
260 // don't keep private idle threads
262 delete info;
263 ++threads.yetToCreate;
264 terminateThread = true;
266 else
269 // add to local pool
271 threads.suspended.push_back (info);
272 ++threads.suspendedCount;
275 // signal empty queue, if necessary
277 ++threads.unusedCount;
278 if (--threads.runningCount == 0)
279 emptyEvent.Set();
281 break;
284 return TJob ((IJob*)NULL, terminateThread);
287 // extract one job
289 return queue.pop();
292 // try to get a thread from the shared pool.
293 // register as "starving" if that failed
295 bool CJobScheduler::AllocateSharedThread()
297 if (!threads.starved)
299 SThreadInfo* info = CThreadPool::GetInstance().TryAlloc();
300 if (info != NULL)
302 ++threads.fromShared;
303 --threads.unusedCount;
304 ++threads.runningCount;
306 threads.running.push_back (info);
307 info->owner = this;
308 info->thread->Resume();
310 return true;
312 else
314 threads.starved = true;
315 CThreadPool::GetInstance().AddStarving (this);
319 return false;
322 bool CJobScheduler::AllocateThread()
324 if (threads.suspendedCount > 0)
326 // recycle suspended, private thread
328 SThreadInfo* info = threads.suspended.back();
329 threads.suspended.pop_back();
331 --threads.suspendedCount;
332 --threads.unusedCount;
333 ++threads.runningCount;
335 threads.running.push_back (info);
336 info->thread->Resume();
338 else if (threads.yetToCreate > 0)
340 // time to start a new private thread
342 --threads.yetToCreate;
344 --threads.unusedCount;
345 ++threads.runningCount;
347 SThreadInfo* info = new SThreadInfo;
348 info->owner = this;
349 info->thread = new CThread (&ThreadFunc, info, true);
350 threads.running.push_back (info);
352 info->thread->Resume();
354 else
356 // try to allocate a shared thread
358 if (threads.fromShared < threads.maxFromShared)
359 return AllocateSharedThread();
361 return false;
364 return true;
367 // unregister from "starving" list.
368 // This may race with CThreadPool::Release -> must loop here.
370 void CJobScheduler::StopStarvation()
372 while (threads.starved)
373 if (CThreadPool::GetInstance().RemoveStarving (this))
375 threads.starved = false;
376 break;
380 // worker thread function
382 bool CJobScheduler::ThreadFunc (void* arg)
384 SThreadInfo* info = reinterpret_cast<SThreadInfo*>(arg);
386 TJob job = info->owner->AssignJob (info);
387 if (job.first != NULL)
389 // run the job
391 job.first->Execute();
393 // it is no longer referenced by the scheduler
395 job.first->OnUnSchedule (info->owner);
397 // is it our responsibility to clean up this job?
399 if (job.second)
400 delete job.first;
402 // continue
404 return false;
406 else
408 // maybe, auto-delete thread object
410 return job.second;
414 // Create & remove threads
416 CJobScheduler::CJobScheduler
417 ( size_t threadCount
418 , size_t sharedThreads
419 , bool aggressiveThreading
420 , bool fifo)
421 : waitingThreads (0)
422 , aggressiveThreading (aggressiveThreading)
424 threads.runningCount = 0;
425 threads.suspendedCount = 0;
427 threads.fromShared = 0;
428 threads.maxFromShared = sharedThreads;
430 threads.unusedCount = threadCount + sharedThreads;
431 threads.yetToCreate = threadCount;
433 threads.starved = false;
434 threads.stopCount = 0;
436 queue.set_fifo (fifo);
438 // auto-initialize shared threads
440 if (GetSharedThreadCount() == 0)
441 UseAllCPUs();
444 CJobScheduler::~CJobScheduler(void)
446 StopStarvation();
447 WaitForEmptyQueue();
449 assert (threads.running.empty());
450 assert (threads.fromShared == 0);
452 for (size_t i = 0, count = threads.suspended.size(); i < count; ++i)
454 SThreadInfo* info = threads.suspended[i];
455 delete info->thread;
456 delete info;
460 // Meyer's singleton
462 CJobScheduler* CJobScheduler::GetDefault()
464 static CJobScheduler instance (0, SIZE_MAX);
465 return &instance;
468 // job management:
469 // add new job
471 void CJobScheduler::Schedule (IJob* job, bool transferOwnership)
473 TJob toAdd (job, transferOwnership);
474 job->OnSchedule (this);
476 CCriticalSectionLock lock (mutex);
478 if (queue.empty())
479 emptyEvent.Reset();
481 queue.push (toAdd);
482 if (threads.stopCount != 0)
483 return;
485 bool addThread = aggressiveThreading
486 || ( (queue.size() > 2 * threads.runningCount)
487 && (threads.unusedCount > 0));
489 if (addThread)
490 AllocateThread();
493 // notification that a new thread may be available
495 void CJobScheduler::ThreadAvailable()
497 CCriticalSectionLock lock (mutex);
498 threads.starved = false;
500 while ( (threads.stopCount != 0)
501 && (queue.size() > 2 * threads.runningCount)
502 && (threads.suspendedCount == 0)
503 && (threads.yetToCreate == 0)
504 && (threads.fromShared < threads.maxFromShared))
506 if (!AllocateSharedThread())
507 return;
511 // wait for all current and follow-up jobs to terminate
513 void CJobScheduler::WaitForEmptyQueue()
515 WaitForEmptyQueueOrTimeout(INFINITE);
518 bool CJobScheduler::WaitForEmptyQueueOrTimeout(DWORD milliSeconds)
520 while (true)
523 CCriticalSectionLock lock (mutex);
525 // if the scheduler has been stopped, we need to remove
526 // the waiting jobs manually
528 if (threads.stopCount != 0)
529 while (!queue.empty())
531 const TJob& job = queue.pop();
533 job.first->OnUnSchedule(this);
534 if (job.second)
535 delete job.first;
538 // empty queue and no jobs still being processed?
540 if ((threads.runningCount == 0) && queue.empty())
541 return true;
543 // we will be woken up as soon as both containers are empty
545 emptyEvent.Reset();
548 if (!emptyEvent.WaitForEndOrTimeout(milliSeconds))
549 return false;
551 return true;
554 // wait for some jobs to be finished.
556 void CJobScheduler::WaitForSomeJobs()
559 CCriticalSectionLock lock (mutex);
560 if ( static_cast<size_t>(InterlockedIncrement (&waitingThreads))
561 < threads.runningCount)
563 // there are enough running job threads left
564 // -> wait for one of them to run idle *or*
565 // for too many of them to enter this method
567 threadIsIdle.Reset();
569 else
571 // number of waiting jobs nor equal to or greater than
572 // the number of running job threads
573 // -> wake them all
575 threadIsIdle.Set();
576 InterlockedDecrement (&waitingThreads);
578 return;
582 threadIsIdle.WaitFor();
583 InterlockedDecrement (&waitingThreads);
586 // Returns the number of jobs waiting for execution.
588 size_t CJobScheduler::GetQueueDepth() const
590 CCriticalSectionLock lock (mutex);
591 return queue.size();
594 // Returns the number of threads that currently execute
595 // jobs for this scheduler
597 size_t CJobScheduler::GetRunningThreadCount() const
599 CCriticalSectionLock lock (mutex);
600 return threads.runningCount;
603 // remove waiting entries from the queue until their
604 // number drops to or below the given watermark.
606 std::vector<IJob*> CJobScheduler::RemoveJobFromQueue
607 ( size_t watermark
608 , bool oldest)
610 std::vector<IJob*> removed;
613 CCriticalSectionLock lock (mutex);
614 if (queue.size() > watermark)
616 size_t toRemove = queue.size() - watermark;
617 removed.reserve (toRemove);
619 // temporarily change the queue extraction strategy
620 // such that we remove jobs from the requested end
621 // (fifo -> oldest are at front, otherwise they are
622 // at the back)
624 bool fifo = queue.get_fifo();
625 queue.set_fifo (oldest == fifo);
627 // remove 'em
629 for (size_t i = 0; i < toRemove; ++i)
631 IJob* job = queue.pop().first;
632 job->OnUnSchedule (this);
634 removed.push_back (job);
637 // restore job execution order
639 queue.set_fifo (fifo);
643 return removed;
646 long CJobScheduler::Stop()
648 CCriticalSectionLock lock (mutex);
649 return ++threads.stopCount;
652 long CJobScheduler::Resume()
654 CCriticalSectionLock lock (mutex);
655 assert (threads.stopCount > 0);
657 if (--threads.stopCount == 0)
659 while ( ( (queue.size() > threads.runningCount)
660 && aggressiveThreading)
661 || ( (queue.size() > 4 * threads.runningCount)
662 && (threads.unusedCount > 0)))
664 if (!AllocateThread())
665 break;
669 return threads.stopCount;
674 // set max. number of concurrent threads
676 void CJobScheduler::SetSharedThreadCount (size_t count)
678 CThreadPool::GetInstance().SetThreadCount(count);
681 size_t CJobScheduler::GetSharedThreadCount()
683 return CThreadPool::GetInstance().GetThreadCount();
686 void CJobScheduler::UseAllCPUs()
688 SetSharedThreadCount (GetHWThreadCount());
691 size_t CJobScheduler::GetHWThreadCount()
693 #ifdef _WIN32
694 SYSTEM_INFO si;
695 GetSystemInfo(&si);
697 size_t sysNumProcs = si.dwNumberOfProcessors;
698 #else
699 size_t sysNumProcs = sysconf (_SC_NPROCESSORS_CONF);
700 #endif
702 return sysNumProcs;