Update rebase documentation
[TortoiseGit.git] / src / AsyncFramework / JobScheduler.cpp
blob37220d12a4854b392c814ab67dd421107b766983
1 /***************************************************************************
2 * Copyright (C) 2009-2011 by Stefan Fuhrmann *
3 * stefanfuhrmann@alice-dsl.de *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
21 #include "stdafx.h"
22 #include "JobScheduler.h"
23 #include "IJob.h"
24 #include "Thread.h"
26 namespace async
29 // queue size management
31 void CJobScheduler::CQueue::Grow (size_t newSize)
33 TJob* newData = new TJob[newSize];
35 size_t count = size();
36 if (first)
37 memmove (newData, first, count * sizeof (TJob[1]));
38 delete[] data;
40 data = newData;
41 end = newData + newSize;
42 first = newData;
43 last = newData + count;
46 void CJobScheduler::CQueue::AutoGrow()
48 size_t count = size();
50 if (count * 2 <= capacity())
52 memmove (data, first, count * sizeof (TJob[1]));
53 first = data;
54 last = data + count;
56 else
58 Grow (capacity() * 2);
62 // remove one entry from \ref starving container.
63 // Return NULL, if container was empty
65 CJobScheduler* CJobScheduler::CThreadPool::SelectStarving()
67 CCriticalSectionLock lock (mutex);
68 if (starving.empty() || pool.empty())
69 return NULL;
71 std::vector<CJobScheduler*>::iterator begin = starving.begin();
73 CJobScheduler* scheduler = *begin;
74 starving.erase (begin);
76 return scheduler;
79 // create empty thread pool
81 CJobScheduler::CThreadPool::CThreadPool()
82 : yetToCreate (0)
83 , allocCount (0)
84 , maxCount (0)
88 // release threads
90 CJobScheduler::CThreadPool::~CThreadPool()
92 SetThreadCount(0);
95 // Meyer's singleton
97 CJobScheduler::CThreadPool& CJobScheduler::CThreadPool::GetInstance()
99 static CThreadPool instance;
100 return instance;
103 // pool interface
105 CJobScheduler::SThreadInfo* CJobScheduler::CThreadPool::TryAlloc()
107 CCriticalSectionLock lock (mutex);
108 if (pool.empty())
110 if (yetToCreate == 0)
111 return NULL;
113 // lazy thread creation
115 SThreadInfo* info = new SThreadInfo;
116 info->owner = NULL;
117 info->thread = new CThread (&ThreadFunc, info, true);
119 --yetToCreate;
120 ++allocCount;
122 return info;
125 CJobScheduler::SThreadInfo* thread = pool.back();
126 pool.pop_back();
127 ++allocCount;
129 return thread;
132 void CJobScheduler::CThreadPool::Release (SThreadInfo* thread)
135 // put back into pool, unless its capacity has been exceeded
137 CCriticalSectionLock lock (mutex);
138 if (pool.size() + --allocCount < maxCount)
140 pool.push_back (thread);
142 // shortcut
144 if (starving.empty())
145 return;
147 else
149 // pool overflow -> terminate thread
151 delete thread->thread;
152 delete thread;
156 // notify starved schedulers that there may be some threads to alloc
158 for ( CJobScheduler* scheduler = SelectStarving()
159 ; scheduler != NULL
160 ; scheduler = SelectStarving())
162 scheduler->ThreadAvailable();
166 // set max. number of concurrent threads
168 void CJobScheduler::CThreadPool::SetThreadCount (size_t count)
170 CCriticalSectionLock lock (mutex);
172 maxCount = count;
173 if (pool.size() + allocCount < maxCount)
174 yetToCreate = maxCount - pool.size() + allocCount;
176 while ((pool.size() + allocCount > maxCount) && !pool.empty())
178 SThreadInfo* info = pool.back();
179 pool.pop_back();
181 delete info->thread;
182 delete info;
186 size_t CJobScheduler::CThreadPool::GetThreadCount() const
188 return maxCount;
191 // manage starving schedulers
193 void CJobScheduler::CThreadPool::AddStarving (CJobScheduler* scheduler)
195 CCriticalSectionLock lock (mutex);
196 starving.push_back (scheduler);
199 bool CJobScheduler::CThreadPool::RemoveStarving (CJobScheduler* scheduler)
201 CCriticalSectionLock lock (mutex);
203 typedef std::vector<CJobScheduler*>::iterator TI;
204 TI begin = starving.begin();
205 TI end = starving.end();
207 TI newEnd = std::remove_copy (begin, end, begin, scheduler);
208 if (newEnd == end)
209 return false;
211 starving.erase (newEnd, end);
212 return true;
215 // job execution helpers
217 CJobScheduler::TJob CJobScheduler::AssignJob (SThreadInfo* info)
219 CCriticalSectionLock lock (mutex);
221 // wake up threads that waited for some work to finish
223 if (waitingThreads > 0)
224 threadIsIdle.Set();
226 // suspend this thread if there is no work left
228 if (queue.empty() || (threads.stopCount != 0))
230 // suspend
232 info->thread->Suspend();
234 // remove from "running" list and put it back either to
235 // "suspended" pool or global shared pool
237 bool terminateThread = false;
238 for (size_t i = 0, count = threads.running.size(); i < count; ++i)
239 if (threads.running[i] == info)
241 threads.running[i] = threads.running[count-1];
242 threads.running.pop_back();
244 // still in debt of shared pool?
246 if (threads.fromShared > 0)
248 // if we are actually idle, we aren't starving anymore
250 StopStarvation();
252 // put back to global pool
254 info->owner = NULL;
255 CThreadPool::GetInstance().Release (info);
257 --threads.fromShared;
259 else if (aggressiveThreading)
261 // don't keep private idle threads
263 delete info;
264 ++threads.yetToCreate;
265 terminateThread = true;
267 else
270 // add to local pool
272 threads.suspended.push_back (info);
273 ++threads.suspendedCount;
276 // signal empty queue, if necessary
278 ++threads.unusedCount;
279 if (--threads.runningCount == 0)
280 emptyEvent.Set();
282 break;
285 return TJob ((IJob*)NULL, terminateThread);
288 // extract one job
290 return queue.pop();
293 // try to get a thread from the shared pool.
294 // register as "starving" if that failed
296 bool CJobScheduler::AllocateSharedThread()
298 if (!threads.starved)
300 SThreadInfo* info = CThreadPool::GetInstance().TryAlloc();
301 if (info != NULL)
303 ++threads.fromShared;
304 --threads.unusedCount;
305 ++threads.runningCount;
307 threads.running.push_back (info);
308 info->owner = this;
309 info->thread->Resume();
311 return true;
313 else
315 threads.starved = true;
316 CThreadPool::GetInstance().AddStarving (this);
320 return false;
323 bool CJobScheduler::AllocateThread()
325 if (threads.suspendedCount > 0)
327 // recycle suspended, private thread
329 SThreadInfo* info = threads.suspended.back();
330 threads.suspended.pop_back();
332 --threads.suspendedCount;
333 --threads.unusedCount;
334 ++threads.runningCount;
336 threads.running.push_back (info);
337 info->thread->Resume();
339 else if (threads.yetToCreate > 0)
341 // time to start a new private thread
343 --threads.yetToCreate;
345 --threads.unusedCount;
346 ++threads.runningCount;
348 SThreadInfo* info = new SThreadInfo;
349 info->owner = this;
350 info->thread = new CThread (&ThreadFunc, info, true);
351 threads.running.push_back (info);
353 info->thread->Resume();
355 else
357 // try to allocate a shared thread
359 if (threads.fromShared < threads.maxFromShared)
360 return AllocateSharedThread();
362 return false;
365 return true;
368 // unregister from "starving" list.
369 // This may race with CThreadPool::Release -> must loop here.
371 void CJobScheduler::StopStarvation()
373 while (threads.starved)
374 if (CThreadPool::GetInstance().RemoveStarving (this))
376 threads.starved = false;
377 break;
381 // worker thread function
383 bool CJobScheduler::ThreadFunc (void* arg)
385 SThreadInfo* info = reinterpret_cast<SThreadInfo*>(arg);
387 TJob job = info->owner->AssignJob (info);
388 if (job.first != NULL)
390 // run the job
392 job.first->Execute();
394 // it is no longer referenced by the scheduler
396 job.first->OnUnSchedule (info->owner);
398 // is it our responsibility to clean up this job?
400 if (job.second)
401 delete job.first;
403 // continue
405 return false;
407 else
409 // maybe, auto-delete thread object
411 return job.second;
415 // Create & remove threads
417 CJobScheduler::CJobScheduler
418 ( size_t threadCount
419 , size_t sharedThreads
420 , bool aggressiveThreading
421 , bool fifo)
422 : waitingThreads (0)
423 , aggressiveThreading (aggressiveThreading)
425 threads.runningCount = 0;
426 threads.suspendedCount = 0;
428 threads.fromShared = 0;
429 threads.maxFromShared = sharedThreads;
431 threads.unusedCount = threadCount + sharedThreads;
432 threads.yetToCreate = threadCount;
434 threads.starved = false;
435 threads.stopCount = 0;
437 queue.set_fifo (fifo);
439 // auto-initialize shared threads
441 if (GetSharedThreadCount() == 0)
442 UseAllCPUs();
445 CJobScheduler::~CJobScheduler(void)
447 StopStarvation();
448 WaitForEmptyQueue();
450 assert (threads.running.empty());
451 assert (threads.fromShared == 0);
453 for (size_t i = 0, count = threads.suspended.size(); i < count; ++i)
455 SThreadInfo* info = threads.suspended[i];
456 delete info->thread;
457 delete info;
461 // Meyer's singleton
463 CJobScheduler* CJobScheduler::GetDefault()
465 static CJobScheduler instance (0, SIZE_MAX);
466 return &instance;
469 // job management:
470 // add new job
472 void CJobScheduler::Schedule (IJob* job, bool transferOwnership)
474 TJob toAdd (job, transferOwnership);
475 job->OnSchedule (this);
477 CCriticalSectionLock lock (mutex);
479 if (queue.empty())
480 emptyEvent.Reset();
482 queue.push (toAdd);
483 if (threads.stopCount != 0)
484 return;
486 bool addThread = aggressiveThreading
487 || ( (queue.size() > 2 * threads.runningCount)
488 && (threads.unusedCount > 0));
490 if (addThread)
491 AllocateThread();
494 // notification that a new thread may be available
496 void CJobScheduler::ThreadAvailable()
498 CCriticalSectionLock lock (mutex);
499 threads.starved = false;
501 while ( (threads.stopCount != 0)
502 && (queue.size() > 2 * threads.runningCount)
503 && (threads.suspendedCount == 0)
504 && (threads.yetToCreate == 0)
505 && (threads.fromShared < threads.maxFromShared))
507 if (!AllocateSharedThread())
508 return;
512 // wait for all current and follow-up jobs to terminate
514 void CJobScheduler::WaitForEmptyQueue()
516 WaitForEmptyQueueOrTimeout(INFINITE);
519 bool CJobScheduler::WaitForEmptyQueueOrTimeout(DWORD milliSeconds)
521 for (;;)
524 CCriticalSectionLock lock (mutex);
526 // if the scheduler has been stopped, we need to remove
527 // the waiting jobs manually
529 if (threads.stopCount != 0)
530 while (!queue.empty())
532 const TJob& job = queue.pop();
534 job.first->OnUnSchedule(this);
535 if (job.second)
536 delete job.first;
539 // empty queue and no jobs still being processed?
541 if ((threads.runningCount == 0) && queue.empty())
542 return true;
544 // we will be woken up as soon as both containers are empty
546 emptyEvent.Reset();
549 if (!emptyEvent.WaitForEndOrTimeout(milliSeconds))
550 return false;
552 //return true;
555 // wait for some jobs to be finished.
557 void CJobScheduler::WaitForSomeJobs()
560 CCriticalSectionLock lock (mutex);
561 if ( static_cast<size_t>(InterlockedIncrement (&waitingThreads))
562 < threads.runningCount)
564 // there are enough running job threads left
565 // -> wait for one of them to run idle *or*
566 // for too many of them to enter this method
568 threadIsIdle.Reset();
570 else
572 // number of waiting jobs nor equal to or greater than
573 // the number of running job threads
574 // -> wake them all
576 threadIsIdle.Set();
577 InterlockedDecrement (&waitingThreads);
579 return;
583 threadIsIdle.WaitFor();
584 InterlockedDecrement (&waitingThreads);
587 // Returns the number of jobs waiting for execution.
589 size_t CJobScheduler::GetQueueDepth() const
591 CCriticalSectionLock lock (mutex);
592 return queue.size();
595 // Returns the number of threads that currently execute
596 // jobs for this scheduler
598 size_t CJobScheduler::GetRunningThreadCount() const
600 CCriticalSectionLock lock (mutex);
601 return threads.runningCount;
604 // remove waiting entries from the queue until their
605 // number drops to or below the given watermark.
607 std::vector<IJob*> CJobScheduler::RemoveJobFromQueue
608 ( size_t watermark
609 , bool oldest)
611 std::vector<IJob*> removed;
614 CCriticalSectionLock lock (mutex);
615 if (queue.size() > watermark)
617 size_t toRemove = queue.size() - watermark;
618 removed.reserve (toRemove);
620 // temporarily change the queue extraction strategy
621 // such that we remove jobs from the requested end
622 // (fifo -> oldest are at front, otherwise they are
623 // at the back)
625 bool fifo = queue.get_fifo();
626 queue.set_fifo (oldest == fifo);
628 // remove 'em
630 for (size_t i = 0; i < toRemove; ++i)
632 IJob* job = queue.pop().first;
633 job->OnUnSchedule (this);
635 removed.push_back (job);
638 // restore job execution order
640 queue.set_fifo (fifo);
644 return removed;
647 long CJobScheduler::Stop()
649 CCriticalSectionLock lock (mutex);
650 return ++threads.stopCount;
653 long CJobScheduler::Resume()
655 CCriticalSectionLock lock (mutex);
656 assert (threads.stopCount > 0);
658 if (--threads.stopCount == 0)
660 while ( ( (queue.size() > threads.runningCount)
661 && aggressiveThreading)
662 || ( (queue.size() > 4 * threads.runningCount)
663 && (threads.unusedCount > 0)))
665 if (!AllocateThread())
666 break;
670 return threads.stopCount;
675 // set max. number of concurrent threads
677 void CJobScheduler::SetSharedThreadCount (size_t count)
679 CThreadPool::GetInstance().SetThreadCount(count);
682 size_t CJobScheduler::GetSharedThreadCount()
684 return CThreadPool::GetInstance().GetThreadCount();
687 void CJobScheduler::UseAllCPUs()
689 SetSharedThreadCount (GetHWThreadCount());
692 size_t CJobScheduler::GetHWThreadCount()
694 #ifdef _WIN32
695 SYSTEM_INFO si;
696 GetSystemInfo(&si);
698 size_t sysNumProcs = si.dwNumberOfProcessors;
699 #else
700 size_t sysNumProcs = sysconf (_SC_NPROCESSORS_CONF);
701 #endif
703 return sysNumProcs;