/***************************************************************************
 * Copyright (C) 2009-2010 by Stefan Fuhrmann *
 * stefanfuhrmann@alice-dsl.de *
 * This program is free software; you can redistribute it and/or modify *
 * it under the terms of the GNU General Public License as published by *
 * the Free Software Foundation; either version 2 of the License, or *
 * (at your option) any later version. *
 * This program is distributed in the hope that it will be useful, *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
 * GNU General Public License for more details. *
 * You should have received a copy of the GNU General Public License *
 * along with this program; if not, write to the *
 * Free Software Foundation, Inc., *
 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
 ***************************************************************************/
#include "CriticalSection.h"
#include "WaitableEvent.h"

// forward declarations
/**
 * Central job execution management class. It encapsulates
 * a private worker thread pool and a \ref queue of \ref IJob
 * instances to be executed.
 *
 * There may be many instances of this class. One should use
 * a separate job scheduler for every different kind of resource
 * such as disk, network or CPU. That way, I/O-heavy jobs can
 * be executed largely independently of CPU load, for instance.
 *
 * Jobs are guaranteed to be picked from the queue in strict
 * order but will retire in an arbitrary order depending on
 * the time it takes to execute them. The \ref Schedule method
 * will decide whether more threads should be activated,
 * depending on the \ref aggressiveThreading property and
 * the number of entries in the \ref queue.
 *
 * The worker threads use \ref AssignJob to get the next job
 * from the queue. If there is no such job (queue is empty),
 * the thread gets suspended. If one or more threads have been
 * allocated from the shared pool, the suspended thread will
 * be moved to that shared pool.
 *
 * Upon construction, a fixed number (may be 0) of private
 * worker threads gets created. On top of that, additional
 * threads may be allocated from a singleton shared thread pool.
 * The maximum number of such shared threads is specified by
 * the second constructor parameter.
 *
 * The instance returned by \ref GetDefault() is created with
 * (0, INT_MAX, false), thus is suitable for short-running
 * CPU-heavy jobs. Under high load, it will utilize all CPU
 * resources but not exceed them, i.e. the number of shared
 *
 * Please note that the job queue must be empty upon destruction.
 * You may call \ref WaitForEmptyQueue() to achieve this.
 * If you use multiple job schedulers, you may also use that
 * method to wait efficiently for a whole group of jobs
 *
 * Another related method is \ref WaitForSomeJobs. It will
 * efficiently wait for at least one job to be finished unless
 * the job queue is almost empty (so you don't deadlock waiting
 * for yourself to finish). Call it if some *previous* job must
 * still provide some information that you are waiting for but
 * you don't know the particular job instance. Usually you will
 * need to call this method in a polling loop.
 */
    mutable CCriticalSection mutex;

    typedef std::pair<IJob*, bool> TJob;
    /**
     * Very low-overhead job queue class.
     * All method calls are amortized O(1) with an
     * average execution time of <10 ticks.
     *
     * Methods have been named and modeled similar
     *
     * The data is stored in a single memory buffer.
     * Valid data is [\ref first, \ref last) and is
     * not moved around unless there is an overflow.
     * Overflow condition is \ref last reaching \ref end,
     * i.e. there is no space to add another entry.
     *
     * In that case, all content will either be shifted
     * to the beginning of the buffer (if less than 50%
     * are currently being used) or the buffer itself
     * gets doubled in size.
     */
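/*
Illustration only, not part of this header: a minimal sketch of the
shift-or-double overflow handling described above. It assumes a
std::vector-backed buffer and index cursors instead of raw pointers;
SketchQueue and HandleOverflow are hypothetical names, and the element
type must be default-constructible.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the queue described above; not the original class.
template <typename T>
class SketchQueue
{
    std::vector<T> buffer;   // single memory buffer
    std::size_t first = 0;   // index of the oldest entry
    std::size_t last = 0;    // index one past the most recently added entry
    bool fifo = true;        // pop from the front if set, else from the back

    // Called when 'last' has reached the end of the buffer.
    void HandleOverflow()
    {
        const std::size_t used = last - first;
        if (used * 2 < buffer.size())
        {
            // Less than half in use: shift the content to the front.
            std::move(buffer.begin() + first, buffer.begin() + last, buffer.begin());
        }
        else
        {
            // Otherwise double the buffer and compact while moving.
            std::vector<T> larger(std::max<std::size_t>(buffer.size() * 2, 4));
            std::move(buffer.begin() + first, buffer.begin() + last, larger.begin());
            buffer.swap(larger);
        }
        first = 0;
        last = used;
    }

public:
    void push(const T& value)
    {
        if (last == buffer.size())
            HandleOverflow();
        buffer[last++] = value;
    }

    // Precondition: !empty()
    T pop()
    {
        return fifo ? buffer[first++] : buffer[--last];
    }

    bool empty() const { return first == last; }
    std::size_t size() const { return last - first; }
    std::size_t capacity() const { return buffer.size(); }
    void set_fifo(bool newValue) { fifo = newValue; }
};
```

Shifting only when less than half of the buffer is in use keeps the
amortized cost per push constant: each shift moves fewer elements than
the number of pops that made room for it.
*/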
        /// oldest entry, i.e. first to be extracted.

        /// first entry behind the most recently added one.

        /// first entry behind the allocated buffer

        /// if set, \ref pop removes entries at the front (\ref first).
        /// Otherwise, it will remove them from the back (\ref last).

        void Grow (size_t newSize);

        /// construction / destruction

        void push (const TJob& job)

        return fifo ? *(first++) : *--last;

        bool get_fifo() const

        void set_fifo (bool newValue)

        return first == last;

        size_t capacity() const

    /// queue of jobs not yet assigned to tasks.
        /// Job scheduler currently owning this thread.
        /// \a NULL, if this is a shared thread currently
        /// not assigned to any scheduler.

        CJobScheduler* owner;

        /// the actual thread management object
    /**
     * Global pool of shared threads. This is a singleton class.
     *
     * Job schedulers will borrow from this pool via \ref TryAlloc
     * and release (not necessarily the same) threads via
     * \ref Release as soon as they are no longer used.
     *
     * The number of threads handed out to job schedulers
     * (\ref allocCount) plus the number of threads still in the
     * \ref pool will not exceed \ref maxCount. Surplus threads
     * may appear if \ref maxCount has been reduced through
     * \ref SetThreadCount. Such threads will be removed
     * automatically in \ref Release.
     *
     * To maximize thread usage, job schedulers may register
     * as "starving". Those will be notified in a FCFS pattern
     * as soon as a thread gets returned to the pool. This
     * gives them a chance to allocate additional threads to
     * execute their jobs quicker.
     */
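/*
Illustration only, not part of this header: a minimal sketch of the
allocation limit and the first-come-first-served "starving" notification
described above. It is a simplified model that counts thread slots rather
than owning real threads and uses callbacks where the original uses
scheduler pointers. SketchSharedPool and its constructor parameter are
hypothetical; only the method names mirror the ones documented here.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical model of the shared pool; not the original CThreadPool.
class SketchSharedPool
{
    std::mutex mutex;
    std::size_t maxCount;                        // upper limit of slots
    std::size_t allocCount = 0;                  // slots currently handed out
    std::deque<std::function<void()>> starving;  // waiting clients, oldest first

public:
    explicit SketchSharedPool(std::size_t limit) : maxCount(limit) {}

    // Hand out a slot if the limit permits; otherwise report failure.
    bool TryAlloc()
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (allocCount >= maxCount)
            return false;
        ++allocCount;
        return true;
    }

    // Register to be notified once when a slot has been returned.
    void AddStarving(std::function<void()> notify)
    {
        std::lock_guard<std::mutex> lock(mutex);
        starving.push_back(std::move(notify));
    }

    // Return a slot and notify the longest-waiting client, if any.
    // Precondition: a matching TryAlloc() succeeded before.
    void Release()
    {
        std::function<void()> notify;
        {
            std::lock_guard<std::mutex> lock(mutex);
            --allocCount;
            if (!starving.empty() && allocCount < maxCount)
            {
                notify = std::move(starving.front());
                starving.pop_front();
            }
        }
        // Call outside the lock so the client may call TryAlloc() itself.
        if (notify)
            notify();
    }
};
```

Removing the entry before notifying matches the "auto-removed upon
notification" behavior noted for AddStarving; a client that still fails
to allocate would have to register again.
*/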
        /// list of idle shared threads

        std::vector<SThreadInfo*> pool;

        /// number of shared threads currently allocated to job schedulers
        /// (hence, not in \ref pool at the moment).

        /// maximum number of entries in \ref pool

        mutable size_t maxCount;

        /// number of threads that may still be created (lazily)

        /// access sync. object

        CCriticalSection mutex;

        /// schedulers that would like to have more threads left

        std::vector<CJobScheduler*> starving;

        /// no public construction

        /// remove one entry from \ref starving container.
        /// Return NULL, if container was empty

        CJobScheduler* SelectStarving();

        CThreadPool(const CThreadPool& rhs);
        CThreadPool& operator=(const CThreadPool& rhs);

        /// Meyers singleton

        static CThreadPool& GetInstance();

        SThreadInfo* TryAlloc();
        void Release (SThreadInfo* thread);

        /// set max. number of concurrent threads

        void SetThreadCount (size_t count);

        /// get maximum number of shared threads

        size_t GetThreadCount() const;

        /// manage starving schedulers
        /// (must be notified as soon as there is an idle thread)

        /// entry will be auto-removed upon notification

        void AddStarving (CJobScheduler* scheduler);

        /// must be called before destroying the \ref scheduler.
        /// No-op, if not in \ref starving list.
        /// \returns true if it was found.

        bool RemoveStarving (CJobScheduler* scheduler);
    /**
     * Per-scheduler thread pool. It consists of two sub-pools:
     * \ref running contains all threads that currently execute
     * jobs whereas \ref suspended holds currently unused threads.
     *
     * If the latter is depleted, up to \ref maxFromShared
     * threads may be allocated from the shared thread pool
     * (\ref CThreadPool). As soon as threads are no longer
     * 'running', return them to the shared thread pool until
     * \ref fromShared is 0.
     *
     * Should the global pool be exhausted while we would like
     * to allocate more threads, register as "starved".
     *
     * Scheduler-private threads will be created lazily, i.e.
     * if \ref suspended is empty and \ref yetToCreate is not 0
     * yet, the latter will be decremented and a new thread is
     * started instead of allocating a shared one.
     */
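/*
Illustration only, not part of this header: a minimal sketch of the order
in which a thread source is chosen according to the description above
(resume a suspended thread, else lazily create a private one, else borrow
a shared one, else register as starved). The counters mirror the member
names used in this comment; SketchPoolState, ThreadSource, PickThread and
the sharedAvailable flag are hypothetical and replace the real thread
objects and the call into the shared pool.

```cpp
#include <cstddef>

enum class ThreadSource { Suspended, NewPrivate, Shared, None };

// Hypothetical counters mirroring the members described above.
struct SketchPoolState
{
    std::size_t suspendedCount = 0;  // idle threads ready to be resumed
    std::size_t yetToCreate = 0;     // private threads not created yet
    std::size_t fromShared = 0;      // threads borrowed from the shared pool
    std::size_t maxFromShared = 0;   // limit for fromShared
    bool starved = false;            // registered as "starved" at the shared pool
};

// sharedAvailable stands in for the result of asking the shared pool.
inline ThreadSource PickThread(SketchPoolState& state, bool sharedAvailable)
{
    if (state.suspendedCount > 0)
    {
        --state.suspendedCount;
        return ThreadSource::Suspended;
    }
    if (state.yetToCreate > 0)
    {
        --state.yetToCreate;
        return ThreadSource::NewPrivate;
    }
    if (state.fromShared < state.maxFromShared)
    {
        if (sharedAvailable)
        {
            ++state.fromShared;
            return ThreadSource::Shared;
        }
        // Shared pool exhausted: remember that we asked and got nothing.
        state.starved = true;
    }
    return ThreadSource::None;
}
```
*/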
        std::vector<SThreadInfo*> running;
        std::vector<SThreadInfo*> suspended;

        /// for speed, cache size() value from the vectors

        size_t suspendedCount;

        /// number of private threads that may still be created.
        /// (used to start threads lazily).

        /// how many of the \ref running threads have been allocated
        /// from \ref CThreadPool. Must be 0, if \ref suspended is

        /// maximum number of threads that may be allocated
        /// from \ref CThreadPool.

        size_t maxFromShared;

        /// suspendedCount + maxFromShared - fromShared

        /// if set, we registered at shared pool as "starved"
        volatile bool starved;

        /// if > 0, queued jobs won't get assigned execution threads
        volatile long stopCount;

    /// this will be signalled when the last job is finished

    CWaitableEvent emptyEvent;

    /// number of threads in \ref WaitForSomeJobs

    volatile LONG waitingThreads;

    /// this will be signalled when a thread gets suspended

    CWaitableEvent threadIsIdle;

    /// if this is set, worker threads will be resumed
    /// unconditionally whenever a new job gets added.
    /// Also, worker threads will be destroyed
    /// as soon as the job queue runs low (i.e. threads
    /// won't be kept around in suspended state).

    bool aggressiveThreading;

    /// worker thread function

    static bool ThreadFunc (void* arg);

    /// job execution helper

    TJob AssignJob (SThreadInfo* info);

    /// try to get a thread from the shared pool.
    /// register as "starving" if that failed

    bool AllocateSharedThread();

    /// try to resume or create a private thread.
    /// If that fails, try to allocate a shared thread.

    bool AllocateThread();

    /// Unregister from "starving" list, if we are registered.
    /// This is non-trivial as it may race with CThreadPool::Release
    /// trying to notify this instance.

    void StopStarvation();

    /// notification that a new thread may be available
    /// (to be called by CThreadPool only)

    void ThreadAvailable();

    friend class CThreadPool;
    CJobScheduler ( size_t threadCount
                  , size_t sharedThreads
                  , bool aggressiveThreading = false);

    /// End threads. Job queue must have run empty before calling this.

    ~CJobScheduler(void);

    /// This one will be used for jobs that have not been
    /// assigned to other job schedulers explicitly.

    static CJobScheduler* GetDefault();

    /// Add a new job to the queue. Wake further suspended
    /// threads as the queue gets larger.
    /// If \ref transferOwnership is set, the \ref job object
    /// will be deleted automatically after execution.

    void Schedule (IJob* job, bool transferOwnership);

    /// wait for all current and follow-up jobs to terminate

    void WaitForEmptyQueue();

    /// wait until either all current and follow-up jobs terminated
    /// or the specified timeout has passed. Returns false in case
    /// Please note that in cases of high contention, internal
    /// retries may cause the timeout to elapse more than once.

    bool WaitForEmptyQueueOrTimeout(DWORD milliSeconds);

    /// Wait for some jobs to be finished.
    /// This function may return immediately if there are
    /// too many threads waiting in this function.

    void WaitForSomeJobs();

    /// Returns the number of jobs waiting for execution.

    size_t GetQueueDepth() const;

    /// Returns the number of threads that currently execute
    /// jobs for this scheduler

    size_t GetRunningThreadCount() const;

    /// remove waiting entries from the queue until their
    /// number drops to or below the given watermark.

    std::vector<IJob*> RemoveJobFromQueue (size_t watermark, bool oldest = true);

    /// increment the internal stop counter. Until \ref Resume() was called
    /// just as often, no further jobs will be sent to the processing threads.
    /// Returns the new value of the stop counter.

    /// decrement the internal stop counter. If it reaches 0, jobs will
    /// be sent to processing threads again.
    /// Returns the new value of the stop counter.
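/*
Illustration only, not part of this header: a minimal sketch of the nested
stop counter semantics described in the two comments above, using
std::atomic. Resume() is the name referenced above; SketchStopCounter,
Stop() and MayDispatch() are assumed names for this sketch, and the
original implementation may use different primitives.

```cpp
#include <atomic>

// Hypothetical stand-in for the stop counter; not the original code.
class SketchStopCounter
{
    std::atomic<long> stopCount{0};

public:
    // Increment the counter and return its new value.
    long Stop() { return ++stopCount; }

    // Decrement the counter and return its new value.
    long Resume() { return --stopCount; }

    // Jobs may be handed to worker threads only while the counter is 0.
    bool MayDispatch() const { return stopCount.load() == 0; }
};
```

Because the calls nest, two independent callers can each pause and resume
dispatching without knowing about each other; dispatching continues only
after the last of them has called Resume().
*/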
    /// access properties of the \ref CThreadPool instance.

    static void SetSharedThreadCount (size_t count);
    static size_t GetSharedThreadCount();

    /// set number of shared threads to the number of
    /// HW threads (#CPUs x #Core/CPU x #SMT/Core)

    static void UseAllCPUs();

    /// \returns the number of HW threads.

    static size_t GetHWThreadCount();
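/*
Illustration only, not part of this header: one portable way to query the
number of hardware threads is std::thread::hardware_concurrency() from the
C++ standard library. This is an assumption for illustration; the actual
GetHWThreadCount() may rely on a platform API instead.
SketchHWThreadCount is a hypothetical name.

```cpp
#include <cstddef>
#include <thread>

// Returns the number of hardware threads, or 1 if it cannot be determined.
inline std::size_t SketchHWThreadCount()
{
    // hardware_concurrency() is only a hint and may return 0.
    const unsigned count = std::thread::hardware_concurrency();
    return count == 0 ? 1 : count;
}
```
*/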