Fix typos
[TortoiseGit.git] / src / AsyncFramework / JobScheduler.cpp
blobd84ada381156517de618308f82a4a77d50078d72
1 /***************************************************************************
2 * Copyright (C) 2009-2011 by Stefan Fuhrmann *
3 * stefanfuhrmann@alice-dsl.de *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
21 #include "stdafx.h"
22 #include "JobScheduler.h"
23 #include "IJob.h"
24 #include "Thread.h"
26 namespace async
29 // queue size management
31 void CJobScheduler::CQueue::Grow (size_t newSize)
33 TJob* newData = new TJob[newSize];
35 size_t count = size();
36 if (first)
37 memmove (newData, first, count * sizeof (TJob[1]));
38 delete[] data;
40 data = newData;
41 end = newData + newSize;
42 first = newData;
43 last = newData + count;
46 void CJobScheduler::CQueue::AutoGrow()
48 size_t count = size();
50 if (count * 2 <= capacity())
52 memmove (data, first, count * sizeof (TJob[1]));
53 first = data;
54 last = data + count;
56 else
58 Grow (capacity() * 2);
62 // remove one entry from \ref starving container.
63 // Return NULL, if container was empty
65 CJobScheduler* CJobScheduler::CThreadPool::SelectStarving()
67 CCriticalSectionLock lock (mutex);
68 if (starving.empty() || pool.empty())
69 return NULL;
71 std::vector<CJobScheduler*>::iterator begin = starving.begin();
73 CJobScheduler* scheduler = *begin;
74 starving.erase (begin);
76 return scheduler;
79 // create empty thread pool
81 CJobScheduler::CThreadPool::CThreadPool()
85 // release threads
87 CJobScheduler::CThreadPool::~CThreadPool()
89 SetThreadCount(0);
92 // Meyer's singleton
94 CJobScheduler::CThreadPool& CJobScheduler::CThreadPool::GetInstance()
96 static CThreadPool instance;
97 return instance;
100 // pool interface
102 CJobScheduler::SThreadInfo* CJobScheduler::CThreadPool::TryAlloc()
104 CCriticalSectionLock lock (mutex);
105 if (pool.empty())
107 if (yetToCreate == 0)
108 return NULL;
110 // lazy thread creation
112 SThreadInfo* info = new SThreadInfo;
113 info->owner = NULL;
114 info->thread = new CThread (&ThreadFunc, info, true);
116 --yetToCreate;
117 ++allocCount;
119 return info;
122 CJobScheduler::SThreadInfo* thread = pool.back();
123 pool.pop_back();
124 ++allocCount;
126 return thread;
129 void CJobScheduler::CThreadPool::Release (SThreadInfo* thread)
132 // put back into pool, unless its capacity has been exceeded
134 CCriticalSectionLock lock (mutex);
135 if (pool.size() + --allocCount < maxCount)
137 pool.push_back (thread);
139 // shortcut
141 if (starving.empty())
142 return;
144 else
146 // pool overflow -> terminate thread
148 delete thread->thread;
149 delete thread;
153 // notify starved schedulers that there may be some threads to alloc
155 for ( CJobScheduler* scheduler = SelectStarving()
156 ; scheduler != NULL
157 ; scheduler = SelectStarving())
159 scheduler->ThreadAvailable();
163 // set max. number of concurrent threads
165 void CJobScheduler::CThreadPool::SetThreadCount (size_t count)
167 CCriticalSectionLock lock (mutex);
169 maxCount = count;
170 if (pool.size() + allocCount < maxCount)
171 yetToCreate = maxCount - pool.size() + allocCount;
173 while ((pool.size() + allocCount > maxCount) && !pool.empty())
175 SThreadInfo* info = pool.back();
176 pool.pop_back();
178 delete info->thread;
179 delete info;
183 size_t CJobScheduler::CThreadPool::GetThreadCount() const
185 return maxCount;
188 // manage starving schedulers
190 void CJobScheduler::CThreadPool::AddStarving (CJobScheduler* scheduler)
192 CCriticalSectionLock lock (mutex);
193 starving.push_back (scheduler);
196 bool CJobScheduler::CThreadPool::RemoveStarving (CJobScheduler* scheduler)
198 CCriticalSectionLock lock (mutex);
200 auto begin = starving.begin();
201 auto end = starving.end();
203 auto newEnd = std::remove_copy (begin, end, begin, scheduler);
204 if (newEnd == end)
205 return false;
207 starving.erase (newEnd, end);
208 return true;
211 // job execution helpers
213 CJobScheduler::TJob CJobScheduler::AssignJob (SThreadInfo* info)
215 CCriticalSectionLock lock (mutex);
217 // wake up threads that waited for some work to finish
219 if (waitingThreads > 0)
220 threadIsIdle.Set();
222 // suspend this thread if there is no work left
224 if (queue.empty() || (threads.stopCount != 0))
226 // suspend
228 info->thread->Suspend();
230 // remove from "running" list and put it back either to
231 // "suspended" pool or global shared pool
233 bool terminateThread = false;
234 for (size_t i = 0, count = threads.running.size(); i < count; ++i)
235 if (threads.running[i] == info)
237 threads.running[i] = threads.running[count-1];
238 threads.running.pop_back();
240 // still in debt of shared pool?
242 if (threads.fromShared > 0)
244 // if we are actually idle, we aren't starving anymore
246 StopStarvation();
248 // put back to global pool
250 info->owner = NULL;
251 CThreadPool::GetInstance().Release (info);
253 --threads.fromShared;
255 else if (aggressiveThreading)
257 // don't keep private idle threads
259 delete info;
260 ++threads.yetToCreate;
261 terminateThread = true;
263 else
266 // add to local pool
268 threads.suspended.push_back (info);
269 ++threads.suspendedCount;
272 // signal empty queue, if necessary
274 ++threads.unusedCount;
275 if (--threads.runningCount == 0)
276 emptyEvent.Set();
278 break;
281 return TJob(static_cast<IJob*>(nullptr), terminateThread);
284 // extract one job
286 return queue.pop();
289 // try to get a thread from the shared pool.
290 // register as "starving" if that failed
292 bool CJobScheduler::AllocateSharedThread()
294 if (!threads.starved)
296 SThreadInfo* info = CThreadPool::GetInstance().TryAlloc();
297 if (info != NULL)
299 ++threads.fromShared;
300 --threads.unusedCount;
301 ++threads.runningCount;
303 threads.running.push_back (info);
304 info->owner = this;
305 info->thread->Resume();
307 return true;
309 else
311 threads.starved = true;
312 CThreadPool::GetInstance().AddStarving (this);
316 return false;
319 bool CJobScheduler::AllocateThread()
321 if (threads.suspendedCount > 0)
323 // recycle suspended, private thread
325 SThreadInfo* info = threads.suspended.back();
326 threads.suspended.pop_back();
328 --threads.suspendedCount;
329 --threads.unusedCount;
330 ++threads.runningCount;
332 threads.running.push_back (info);
333 info->thread->Resume();
335 else if (threads.yetToCreate > 0)
337 // time to start a new private thread
339 --threads.yetToCreate;
341 --threads.unusedCount;
342 ++threads.runningCount;
344 SThreadInfo* info = new SThreadInfo;
345 info->owner = this;
346 info->thread = new CThread (&ThreadFunc, info, true);
347 threads.running.push_back (info);
349 info->thread->Resume();
351 else
353 // try to allocate a shared thread
355 if (threads.fromShared < threads.maxFromShared)
356 return AllocateSharedThread();
358 return false;
361 return true;
364 // unregister from "starving" list.
365 // This may race with CThreadPool::Release -> must loop here.
367 void CJobScheduler::StopStarvation()
369 while (threads.starved)
370 if (CThreadPool::GetInstance().RemoveStarving (this))
372 threads.starved = false;
373 break;
377 // worker thread function
379 bool CJobScheduler::ThreadFunc (void* arg)
381 SThreadInfo* info = reinterpret_cast<SThreadInfo*>(arg);
383 TJob job = info->owner->AssignJob (info);
384 if (job.first != NULL)
386 // run the job
388 job.first->Execute();
390 // it is no longer referenced by the scheduler
392 job.first->OnUnSchedule (info->owner);
394 // is it our responsibility to clean up this job?
396 if (job.second)
397 delete job.first;
399 // continue
401 return false;
403 else
405 // maybe, auto-delete thread object
407 return job.second;
411 // Create & remove threads
413 CJobScheduler::CJobScheduler
414 ( size_t threadCount
415 , size_t sharedThreads
416 , bool aggressiveThreading
417 , bool fifo)
418 : aggressiveThreading(aggressiveThreading)
420 threads.maxFromShared = sharedThreads;
422 threads.unusedCount = threadCount + sharedThreads;
423 threads.yetToCreate = threadCount;
425 queue.set_fifo (fifo);
427 // auto-initialize shared threads
429 if (GetSharedThreadCount() == 0)
430 UseAllCPUs();
433 CJobScheduler::~CJobScheduler()
435 StopStarvation();
436 WaitForEmptyQueue();
438 assert (threads.running.empty());
439 assert (threads.fromShared == 0);
441 for (size_t i = 0, count = threads.suspended.size(); i < count; ++i)
443 SThreadInfo* info = threads.suspended[i];
444 delete info->thread;
445 delete info;
449 // Meyer's singleton
451 CJobScheduler* CJobScheduler::GetDefault()
453 static CJobScheduler instance (0, SIZE_MAX);
454 return &instance;
457 // job management:
458 // add new job
460 void CJobScheduler::Schedule (IJob* job, bool transferOwnership)
462 TJob toAdd (job, transferOwnership);
463 job->OnSchedule (this);
465 CCriticalSectionLock lock (mutex);
467 if (queue.empty())
468 emptyEvent.Reset();
470 queue.push (toAdd);
471 if (threads.stopCount != 0)
472 return;
474 bool addThread = aggressiveThreading
475 || ( (queue.size() > 2 * threads.runningCount)
476 && (threads.unusedCount > 0));
478 if (addThread)
479 AllocateThread();
482 // notification that a new thread may be available
484 void CJobScheduler::ThreadAvailable()
486 CCriticalSectionLock lock (mutex);
487 threads.starved = false;
489 while ( (threads.stopCount != 0)
490 && (queue.size() > 2 * threads.runningCount)
491 && (threads.suspendedCount == 0)
492 && (threads.yetToCreate == 0)
493 && (threads.fromShared < threads.maxFromShared))
495 if (!AllocateSharedThread())
496 return;
500 // wait for all current and follow-up jobs to terminate
502 void CJobScheduler::WaitForEmptyQueue()
504 WaitForEmptyQueueOrTimeout(INFINITE);
507 bool CJobScheduler::WaitForEmptyQueueOrTimeout(DWORD milliSeconds)
509 for (;;)
512 CCriticalSectionLock lock (mutex);
514 // if the scheduler has been stopped, we need to remove
515 // the waiting jobs manually
517 if (threads.stopCount != 0)
518 while (!queue.empty())
520 const TJob& job = queue.pop();
522 job.first->OnUnSchedule(this);
523 if (job.second)
524 delete job.first;
527 // empty queue and no jobs still being processed?
529 if ((threads.runningCount == 0) && queue.empty())
530 return true;
532 // we will be woken up as soon as both containers are empty
534 emptyEvent.Reset();
537 if (!emptyEvent.WaitForEndOrTimeout(milliSeconds))
538 return false;
540 //return true;
543 // wait for some jobs to be finished.
545 void CJobScheduler::WaitForSomeJobs()
548 CCriticalSectionLock lock (mutex);
549 if ( static_cast<size_t>(InterlockedIncrement (&waitingThreads))
550 < threads.runningCount)
552 // there are enough running job threads left
553 // -> wait for one of them to run idle *or*
554 // for too many of them to enter this method
556 threadIsIdle.Reset();
558 else
560 // number of waiting jobs nor equal to or greater than
561 // the number of running job threads
562 // -> wake them all
564 threadIsIdle.Set();
565 InterlockedDecrement (&waitingThreads);
567 return;
571 threadIsIdle.WaitFor();
572 InterlockedDecrement (&waitingThreads);
575 // Returns the number of jobs waiting for execution.
577 size_t CJobScheduler::GetQueueDepth() const
579 CCriticalSectionLock lock (mutex);
580 return queue.size();
583 // Returns the number of threads that currently execute
584 // jobs for this scheduler
586 size_t CJobScheduler::GetRunningThreadCount() const
588 CCriticalSectionLock lock (mutex);
589 return threads.runningCount;
592 // remove waiting entries from the queue until their
593 // number drops to or below the given watermark.
595 std::vector<IJob*> CJobScheduler::RemoveJobFromQueue
596 ( size_t watermark
597 , bool oldest)
599 std::vector<IJob*> removed;
602 CCriticalSectionLock lock (mutex);
603 if (queue.size() > watermark)
605 size_t toRemove = queue.size() - watermark;
606 removed.reserve (toRemove);
608 // temporarily change the queue extraction strategy
609 // such that we remove jobs from the requested end
610 // (fifo -> oldest are at front, otherwise they are
611 // at the back)
613 bool fifo = queue.get_fifo();
614 queue.set_fifo (oldest == fifo);
616 // remove 'em
618 for (size_t i = 0; i < toRemove; ++i)
620 IJob* job = queue.pop().first;
621 job->OnUnSchedule (this);
623 removed.push_back (job);
626 // restore job execution order
628 queue.set_fifo (fifo);
632 return removed;
635 long CJobScheduler::Stop()
637 CCriticalSectionLock lock (mutex);
638 return ++threads.stopCount;
641 long CJobScheduler::Resume()
643 CCriticalSectionLock lock (mutex);
644 assert (threads.stopCount > 0);
646 if (--threads.stopCount == 0)
648 while ( ( (queue.size() > threads.runningCount)
649 && aggressiveThreading)
650 || ( (queue.size() > 4 * threads.runningCount)
651 && (threads.unusedCount > 0)))
653 if (!AllocateThread())
654 break;
658 return threads.stopCount;
663 // set max. number of concurrent threads
665 void CJobScheduler::SetSharedThreadCount (size_t count)
667 CThreadPool::GetInstance().SetThreadCount(count);
670 size_t CJobScheduler::GetSharedThreadCount()
672 return CThreadPool::GetInstance().GetThreadCount();
675 void CJobScheduler::UseAllCPUs()
677 SetSharedThreadCount (GetHWThreadCount());
680 size_t CJobScheduler::GetHWThreadCount()
682 #ifdef _WIN32
683 SYSTEM_INFO si;
684 GetSystemInfo(&si);
686 size_t sysNumProcs = si.dwNumberOfProcessors;
687 #else
688 size_t sysNumProcs = sysconf (_SC_NPROCESSORS_CONF);
689 #endif
691 return sysNumProcs;