Call post push hook even if push wasn't successful
[TortoiseGit.git] / src / AsyncFramework / JobScheduler.h
blob95d82b60f6f9b1e65d549668d4382a54f9e2ad27
1 /***************************************************************************
2 * Copyright (C) 2009-2010 by Stefan Fuhrmann *
3 * stefanfuhrmann@alice-dsl.de *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
21 #pragma once
23 #include "CriticalSection.h"
24 #include "WaitableEvent.h"
26 namespace async
29 // forward declarations
31 class IJob;
32 class CThread;
34 /**
35 * Central job execution management class. It encapsulates
36 * a private worker thread pool and a \ref queue of \ref IJob
37 * instance to be executed.
39 * There may be many instances of this class. One should use
40 * a separate job scheduler for every different kind of resource
41 * such as disk, network or CPU. By that, I/O-heavy jobs can
42 * be executed mainly independently of CPU load, for instance.
44 * Jobs are guaranteed to be picked from the queue in strict
45 * order but will retire in an arbitrary order depending on
46 * the time it takes to execute them. The \ref Schedule method
47 * will decide whether more threads should be activated --
48 * depending on the \ref aggressiveThreadStart property and
49 * the number of entries in the \ref queue.
51 * The worker threads use \ref AssignJob to get the next job
52 * from the queue. If there is no such job (queue is empty),
53 * the thread gets suspended. Have one or more threads been
54 * allocated from the shared pool, the suspended thread will
55 * be moved to that shared pool.
57 * Upon construction, a fixed number (may be 0) of private
58 * worker threads gets created. On top of that, additional
59 * threads may be allocated from a singleton shared thread pool.
60 * This number is specified by the second constructor parameter.
62 * The instance returned by \ref GetDefault() is created with
63 * (0, INT_MAX, false), thus is suitable for short-running
64 * CPU-heavy jobs. Under high load, it will utilize all CPU
65 * resources but not exceed them, i.e. the number of shared
66 * worker threads.
68 * Please note that the job queue must be empty upon destruction.
69 * You may call \ref WaitForEmptyQueue() to achieve this.
70 * If you use multiple job schedulers, you may also use that
71 * method to wait efficiently for a whole group of jobs
72 * to be finished.
74 * Another related method is \ref WaitForSomeJobs. It will
75 * efficiently wait for at least one job to be finished unless
76 * the job queue is almost empty (so you don't deadlock waiting
77 * for yourself to finish). Call it if some *previous* job must
78 * still provide some information that you are waiting for but
79 * you don't know the particular job instance. Usually you will
80 * need to call this method in a polling loop.
83 class CJobScheduler
85 private:
87 /// access sync
89 mutable CCriticalSection mutex;
91 /// jobs
93 typedef std::pair<IJob*, bool> TJob;
95 /**
96 * Very low-overhead job queue class.
97 * All method calls are amortized O(1) with an
98 * average execution time of <10 ticks.
100 * Methods have been named and modeled similar
101 * to std::queue<>.
103 * The data is stored in a single memory buffer.
104 * Valid data is [\ref first, \ref last) and is
105 * not moved around unless there is an overflow.
106 * Overflow condition is \ref last reaching \ref end,
107 * i.e. there is no space to add another entry.
109 * In that case, all content will either be shifted
110 * to the beginning of the buffer (if less than 50%
111 * are currently being used) or the buffer itself
112 * gets doubled in size.
115 class CQueue
117 private:
119 /// buffer start
121 TJob* data;
123 /// oldest entry, i.e. first to be extracted.
125 TJob* first;
127 /// first entry behind the most recently added one.
129 TJob* last;
131 /// first entry behind the allocated buffer
133 TJob* end;
135 /// if set, \ref pop removes entries at the front (\ref first).
136 /// Otherwise, it will remove them from the back (\ref last).
138 bool fifo;
140 /// size management
142 void Grow (size_t newSize);
143 void AutoGrow();
145 public:
147 /// construction / destruction
149 CQueue()
150 : data (NULL)
151 , first (NULL)
152 , last (NULL)
153 , end (NULL)
154 , fifo (true)
156 Grow (1024);
158 ~CQueue()
160 delete[] data;
163 /// queue I/O
165 void push (const TJob& job)
167 *last = job;
168 if (++last == end)
169 AutoGrow();
171 const TJob& pop()
173 return fifo ? *(first++) : *--last;
176 /// I/O order
178 bool get_fifo() const
180 return fifo;
182 void set_fifo (bool newValue)
184 fifo = newValue;
187 /// size info
189 bool empty() const
191 return first == last;
193 size_t size() const
195 return last - first;
197 size_t capacity() const
199 return end - data;
203 /// queue of jobs not yet assigned to tasks.
205 CQueue queue;
207 /// per-thread info
209 struct SThreadInfo
211 /// Job scheduler currently owning this job.
212 /// \a NULL, if this is a shared thread currently
213 /// not assigned to any scheduler.
215 CJobScheduler* owner;
217 /// the actual thread management object
219 CThread* thread;
223 * Global pool of shared threads. This is a singleton class.
225 * Job schedulers will borrow from this pool via \ref TryAlloc
226 * and release (not necessarily the same) threads via
227 * \ref Release as as they are no longer used.
229 * The number of threads handed out to job scheudulers
230 * (\ref allocCount) plus the number of threads still in the
231 * \ref pool will not exceed \ref maxCount. Surplus threads
232 * may appear if \ref maxCount has been reduced through
233 * \ref SetThreadCount. Such threads will be removed
234 * automatically in \ref Release.
236 * To maximize thread usage, job schedulers may register
237 * as "starving". Those will be notified in a FCFS pattern
238 * as soon as a thread gets returned to the pool. Hence,
239 * it's their chance to allocate additional threads to
240 * execute their jobs quicker.
243 class CThreadPool
245 private:
247 /// list of idle shared threads
249 std::vector<SThreadInfo*> pool;
251 /// number of shared threads currently to job schedulers
252 /// (hence, not in \ref pool at the moment).
254 size_t allocCount;
256 /// maximum number of entries in \ref pool
258 mutable size_t maxCount;
260 /// number of threads that may still be created (lazily)
262 size_t yetToCreate;
264 /// access sync. object
266 CCriticalSection mutex;
268 /// schedulers that would like to have more threads left
270 std::vector<CJobScheduler*> starving;
272 /// no public construction
274 CThreadPool();
276 /// remove one entry from \ref starving container.
277 /// Return NULL, if container was empty
279 CJobScheduler* SelectStarving();
281 /// no copy support
283 CThreadPool(const CThreadPool& rhs);
284 CThreadPool& operator=(const CThreadPool& rhs);
286 public:
288 /// release threads
290 ~CThreadPool();
292 /// Meyer's singleton
294 static CThreadPool& GetInstance();
296 /// pool interface
298 SThreadInfo* TryAlloc();
299 void Release (SThreadInfo* thread);
301 /// set max. number of concurrent threads
303 void SetThreadCount (size_t count);
305 /// get maximum number of shared threads
307 size_t GetThreadCount() const;
309 /// manage starving schedulers
310 /// (must be notified as soon as there is an idle thread)
312 /// entry will be auto-removed upon notification
314 void AddStarving (CJobScheduler* scheduler);
316 /// must be called before destroying the \ref scheduler.
317 /// No-op, if not in \ref starved list.
318 /// \returns true if it was found.
320 bool RemoveStarving (CJobScheduler* scheduler);
324 * Per-scheduler thread pool. It comprises of two sub-pools:
325 * \ref running contains all threads that currenty execute
326 * jobs whereas \ref suspended holds currently unused threads.
328 * If the latter is depleted, up to \ref maxFromShared
329 * threads may be allocated from the shared thread pool
330 * (\ref CThreadPool). As soon as threads are no longer
331 * 'running', return them to the shared thread pool until
332 * \ref fromShared is 0.
334 * Should the global pool be exhausted while we would like
335 * to allocate more threads, register as "starved".
337 * Scheduler-private threads will be created lazily, i.e.
338 * if \ref suspended is empty and \ref yetToCreate is not 0
339 * yet, the latter will be decremented and a new thread is
340 * being started instead of allocating a shared one.
343 struct SThreads
345 std::vector<SThreadInfo*> running;
346 std::vector<SThreadInfo*> suspended;
348 /// for speed, cache size() value from the vectors
350 size_t runningCount;
351 size_t suspendedCount;
353 /// number of private threads that may still be created.
354 /// (used to start threads lazily).
356 size_t yetToCreate;
358 /// how many of the \ref running threads have been allocated
359 /// from \ref CThreadPool. Must be 0, if \ref suspended is
360 /// not empty.
362 size_t fromShared;
364 /// how many of the \ref running threads have been allocated
365 /// from \ref CThreadPool.
367 size_t maxFromShared;
369 /// suspendedCount + maxFromShared - fromShared
370 size_t unusedCount;
372 /// if set, we registered at shared pool as "starved"
373 volatile bool starved;
375 /// if > 0, queued jobs won't get assigned execution threads
376 volatile long stopCount;
379 SThreads threads;
381 /// this will be signalled when the last job is finished
383 CWaitableEvent emptyEvent;
385 /// number of threads in \ref WaitForSomeJobs
387 volatile LONG waitingThreads;
389 /// this will be signalled when a thread gets suspended
391 CWaitableEvent threadIsIdle;
393 /// if this is set, worker threads will be resumed
394 /// unconditionally whenever a new job gets added.
395 /// Also, worker threads will be destroyed
396 /// as soon as the job queue runs low (i.e. threads
397 /// won't be kept around in suspended state).
399 bool aggressiveThreading;
401 /// worker thread function
403 static bool ThreadFunc (void* arg);
405 /// job execution helper
407 TJob AssignJob (SThreadInfo* info);
409 /// try to get a thread from the shared pool.
410 /// register as "starving" if that failed
412 bool AllocateSharedThread();
414 /// try to resume or create a private thread.
415 /// If that fails, try to allocate a shared thread.
417 bool AllocateThread();
419 /// Unregister from "starving" list, if we are registered.
420 /// This is non-trival as it may race with CThreadPool::Release
421 /// trying to notify this instance.
423 void StopStarvation();
425 /// notification that a new thread may be available
426 /// (to be called by CThreadPool only)
428 void ThreadAvailable();
430 friend class CThreadPool;
432 public:
434 /// Create threads
436 CJobScheduler ( size_t threadCount
437 , size_t sharedThreads
438 , bool aggressiveThreading = false
439 , bool fifo = true);
441 /// End threads. Job queue must have run empty before calling this.
443 ~CJobScheduler(void);
445 /// This one will be used for jobs that have not been
446 /// assigned to other job schedulers explicitly.
448 static CJobScheduler* GetDefault();
450 /// Add a new job to the queue. Wake further suspended
451 /// threads as the queue gets larger.
452 /// If \ref transferOwnership is set, the \ref job object
453 /// will be deleted automatically after execution.
455 void Schedule (IJob* job, bool transferOwnership);
457 /// wait for all current and follow-up jobs to terminate
459 void WaitForEmptyQueue();
461 /// wait until either all current and follow-up jobs terminated
462 /// or the specified timeout has passed. Returns false in case
463 /// of a timeout.
464 /// Please note that in cases of high contention, internal
465 /// retries may cause the timeout to elapse more than once.
467 bool WaitForEmptyQueueOrTimeout(DWORD milliSeconds);
469 /// Wait for some jobs to be finished.
470 /// This function may return immediately if there are
471 /// too many threads waiting in this function.
473 void WaitForSomeJobs();
475 /// Returns the number of jobs waiting for execution.
477 size_t GetQueueDepth() const;
479 /// Returns the number of threads that currently execute
480 /// jobs for this scheduler
482 size_t GetRunningThreadCount() const;
484 /// remove waiting entries from the queue until their
485 /// number drops to or below the given watermark.
487 std::vector<IJob*> RemoveJobFromQueue (size_t watermark, bool oldest = true);
489 /// increment the internal stop counter. Until @ref Resume() was called
490 /// just as often, no further jobs will be sent to the processing threads.
491 /// Returns the new value of the stop counter;
493 long Stop();
495 /// decrement the internal stop counter. If it reaches 0, jobs will
496 /// be sent to processing threads again.
497 /// Returns the new value of the stop counter;
499 long Resume();
501 /// access properties of the \ref CThreadPool instances.
503 static void SetSharedThreadCount (size_t count);
504 static size_t GetSharedThreadCount();
506 /// set number of shared threads to the number of
507 /// HW threads (#CPUs x #Core/CPU x #SMT/Core)
509 static void UseAllCPUs();
511 /// \ref returns the number of HW threads.
513 static size_t GetHWThreadCount();