2 +----------------------------------------------------------------------+
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #ifndef incl_HPHP_UTIL_JOB_QUEUE_H_
18 #define incl_HPHP_UTIL_JOB_QUEUE_H_
25 #include <boost/range/adaptors.hpp>
26 #include <folly/Memory.h>
28 #include "hphp/util/alloc.h"
29 #include "hphp/util/async-func.h"
30 #include "hphp/util/atomic.h"
31 #include "hphp/util/compatibility.h"
32 #include "hphp/util/exception.h"
33 #include "hphp/util/health-monitor-types.h"
34 #include "hphp/util/lock.h"
35 #include "hphp/util/logger.h"
36 #include "hphp/util/numa.h"
37 #include "hphp/util/synchronizable-multi.h"
38 #include "hphp/util/timer.h"
41 ///////////////////////////////////////////////////////////////////////////////
44 * A queue-based multi-threaded job processing facility. Internally, we have a
45 * queue of jobs and a list of workers, each of which runs in its own thread.
46 * Job queue can take new jobs on the fly and workers will continue to pull
47 * jobs off the queue and work on them.
49 * To use it, simply define your own job and worker class like this,
55 * struct MyWorker : JobQueueWorker<MyJob*> {
56 * virtual void doJob(MyJob *job) {
58 * delete job; // if it was new-ed
62 * Now, use JobQueueDispatcher to start the whole thing,
64 * JobQueueDispatcher<MyWorker> dispatcher(40, 0, false, nullptr); // 40 threads
67 * dispatcher.enqueue(new MyJob(...));
71 * Note this class is different from JobListDispatcher that uses a vector to
72 * store prepared jobs. With JobQueueDispatcher, job queue is normally empty
73 * initially and new jobs are pushed into the queue over time. Also, workers
74 * can be stopped individually.
76 * Job process ordering
77 * ====================
78 * By default, requests are processed in FIFO order.
80 * In addition, we support an option where the request processing order can flip
81 * between FIFO or LIFO based on the length of the queue. This can be enabled by
82 * setting the 'lifoSwitchThreshold' parameter. If the job queue is configured
83 * to be in FIFO mode, and the current queue length exceeds
84 * lifoSwitchThreshold, then the workers will begin work on requests in LIFO
85 * order until the queue size is below the threshold in which case we resume in
86 * FIFO order. Setting the queue to be in LIFO mode initially will have the
87 * opposite behavior. This is useful when we are in a loaded situation and we
88 * want to prioritize the newest requests.
90 * You can configure a LIFO ordered queue by setting lifoSwitchThreshold to 0.
93 ///////////////////////////////////////////////////////////////////////////////
// Default DropCachePolicy used by JobQueue and JobQueueWorker: dropCache() is
// an empty function, so no extra work is done when caches are dropped.
96 struct NoDropCachePolicy
{ static void dropCache() {} };
// Interface for objects that report how many queued jobs should be released.
99 struct IQueuedJobsReleaser
{
// Virtual so implementations can be destroyed through this interface.
100 virtual ~IQueuedJobsReleaser() { }
// Returns the number of jobs to release. JobQueue::releaseQueuedJobs() reads
// this value and has a dedicated branch for results <= 0.
101 virtual int32_t numOfJobsToRelease() = 0;
// IQueuedJobsReleaser implementation that always reports one fixed rate.
104 struct SimpleReleaser
: IQueuedJobsReleaser
{
// Stores `rate`; it is the value later returned by numOfJobsToRelease().
105 explicit SimpleReleaser(int32_t rate
)
106 : m_queuedJobsReleaseRate(rate
){}
// Returns the rate supplied at construction, unchanged.
107 int32_t numOfJobsToRelease() override
{
108 return m_queuedJobsReleaseRate
;
// Fixed release rate. The in-class initializer (3) is superseded by the
// constructor's member initializer.
111 int m_queuedJobsReleaseRate
= 3;
115 * A job queue that's suitable for multiple threads to work on.
// Template parameters:
//   TJob            - job value stored in the queue and handed to workers.
//   waitable        - when true, the <TJob,true,Policy> partial specialization
//                     is selected; it adds a pthread condition variable.
//   DropCachePolicy - its static dropCache() is called by
//                     dequeueMaybeExpiredImpl() after an idle worker has
//                     flushed its thread caches.
117 template<typename TJob
,
118 bool waitable
= false,
119 class DropCachePolicy
= detail::NoDropCachePolicy
>
120 struct JobQueue
: SynchronizableMulti
{
121 // trivial class for signaling queue stop
// JobQueueWorker catches it by const reference and marks itself stopped.
122 struct StopSignal
{};
// Constructs the queue.
//   maxThreadCount        - number of worker threads; must be > 0. The base
//                           SynchronizableMulti is sized for one more (reaper).
//   dropCacheTimeout      - when > 0, idle workers wait with this timeout and
//                           flush thread caches if it elapses.
//   dropStack             - together with s_stackLimit, gates
//                           flush_thread_stack() for idle workers.
//   lifoSwitchThreshold   - once m_jobCount >= this value, workers take the job
//                           at the back of a queue instead of the front.
//   maxJobQueuingMs       - when > 0, a front job queued longer than this is
//                           treated as expired.
//   numPriorities         - number of per-priority deques to create.
//   queuedJobsReleaseRate - fixed rate handed to a SimpleReleaser.
//   healthStatus          - optional observer; its BackOff level holds workers
//                           back from dequeuing.
128 JobQueue(int maxThreadCount
, int dropCacheTimeout
,
129 bool dropStack
, int lifoSwitchThreshold
=INT_MAX
,
130 int maxJobQueuingMs
= -1, int numPriorities
= 1,
131 int queuedJobsReleaseRate
= 3,
132 IHostHealthObserver
* healthStatus
= nullptr)
133 : SynchronizableMulti(maxThreadCount
+ 1), // reaper added
134 m_jobCount(0), m_stopped(false), m_workerCount(0),
135 m_dropCacheTimeout(dropCacheTimeout
), m_dropStack(dropStack
),
136 m_lifoSwitchThreshold(lifoSwitchThreshold
),
137 m_maxJobQueuingMs(maxJobQueuingMs
),
// The reaper's id is maxThreadCount, i.e. one past the last regular worker id
// if worker ids are 0-based (TODO confirm against id assignment).
138 m_jobReaperId(maxThreadCount
), m_healthStatus(healthStatus
),
139 m_queuedJobsReleaser(
140 std::make_shared
<SimpleReleaser
>(queuedJobsReleaseRate
)) {
141 assert(maxThreadCount
> 0);
// One deque per priority level; enqueue() indexes this vector by priority.
142 m_jobQueues
.resize(numPriorities
);
146 * Put a job into the queue and notify a worker to pick it up.
// Appends `job` to the deque for `priority`, paired with the monotonic time at
// which it was enqueued. `priority` must be in [0, m_jobQueues.size()); this is
// only checked by assert.
148 void enqueue(TJob job
, int priority
=0) {
149 assert(priority
>= 0);
150 assert(priority
< m_jobQueues
.size());
// The timestamp is later compared against m_maxJobQueuingMs to detect expiry.
151 timespec enqueueTime
;
152 Timer::GetMonotonicTime(enqueueTime
);
154 m_jobQueues
[priority
].emplace_back(job
, enqueueTime
);
160 * Grab a job from the queue for processing. Since the job was not created
161 * by this queue class, it's up to the worker class to deallocate
162 * the job object correctly.
// Entry point used by worker threads to obtain a job.
//   id      - caller's worker id; the reaper id is routed to the expire-only path.
//   q       - forwarded to wait() in the Impl functions (JobQueueWorker passes
//             s_numaNode here).
//   inc     - when true, the active-worker counter is incremented on dequeue.
//   expired - out-parameter forwarded to dequeueMaybeExpiredImpl().
//   highpri - forwarded to dequeueMaybeExpiredImpl(), where it selects the
//             wait priority.
164 TJob
dequeueMaybeExpired(int id
, int q
, bool inc
, bool* expired
,
165 bool highpri
= false) {
// The reaper only ever dequeues expired jobs.
166 if (id
== m_jobReaperId
) {
168 return dequeueOnlyExpiredImpl(id
, q
, inc
);
// Regular workers: sample the monotonic clock once and hand it to the Impl.
171 Timer::GetMonotonicTime(now
);
172 return dequeueMaybeExpiredImpl(id
, q
, inc
, now
, expired
, highpri
);
176 * Purely for making sure no new jobs are queued when we are stopping.
181 notifyAll(); // so all waiting threads can find out queue is stopped
// Intentionally empty in this (non-waitable) queue. JobQueueWorker calls it
// once it observes no active workers and no queued jobs.
185 void signalEmpty() {}
188 * Keeps track of how many active workers are working on the queue.
// Called by the dequeue paths when their `inc` argument is true.
190 void incActiveWorker() {
// Decrements the atomic worker counter and returns the new value.
193 int decActiveWorker() {
194 return --m_workerCount
;
// Returns the current value of the atomic worker counter.
196 int getActiveWorker() {
197 return m_workerCount
;
201 * Keep track of how many jobs are queued but have not yet been serviced.
203 int getQueuedJobs() {
// Asks m_queuedJobsReleaser how many jobs to release and then iterates up to
// that many times, never more than the number of queued jobs.
207 int releaseQueuedJobs() {
208 int toRelease
= m_queuedJobsReleaser
->numOfJobsToRelease();
// Non-positive counts are handled separately before the loop.
209 if (toRelease
<= 0) {
// Runs at most min(toRelease, m_jobCount) iterations.
215 for (iter
= 0; iter
< toRelease
&& iter
< m_jobCount
; iter
++) {
222 friend class JobQueue_Expiration_Test
;
// Worker-side dequeue.
//
// Phase 1 (waiting): loops while there are no jobs or the health observer
// reports BackOff. A non-empty highest-priority queue is checked first on every
// iteration. While waiting, an idle worker may flush its thread caches (and
// optionally its stack) after m_dropCacheTimeout elapses.
//
// Phase 2 (selection): scans the per-priority deques from highest to lowest.
// A front job queued longer than m_maxJobQueuingMs is picked first; otherwise
// the back job is picked when m_jobCount >= m_lifoSwitchThreshold (LIFO), else
// the front job (FIFO).
//
//   id, q   - forwarded to wait().
//   inc     - when true, incActiveWorker() is called before selection.
//   now     - caller-sampled monotonic time used for the expiry comparison.
//   expired - out-parameter; presumably set when an expired job is returned
//             (assignment not visible here - TODO confirm).
//   highPri - selects the higher of the two wait priorities in each branch.
223 TJob
dequeueMaybeExpiredImpl(int id
, int q
, bool inc
, const timespec
& now
,
224 bool* expired
, bool highPri
= false) {
227 bool flushed
= false;
// With no health observer installed, dequeuing is always allowed.
228 bool ableToDeque
= m_healthStatus
== nullptr ||
229 m_healthStatus
->getHealthLevel() != HealthLevel::BackOff
;
231 while (m_jobCount
== 0 || !ableToDeque
) {
// The last deque in m_jobQueues is the highest priority level.
232 uint32_t kNumPriority
= m_jobQueues
.size();
233 if (m_jobQueues
[kNumPriority
- 1].size() > 0) {
234 // we do not block HealthMon requests (with the highest priority)
243 // Flushed worker threads get lower priority. But a flushed worker
244 // with huge stack is still more preferable than a non-flushed worker
245 // without huge stack.
246 wait(id
, q
, highPri
? Priority::High
: Priority::Low
);
247 } else if (m_dropCacheTimeout
> 0) {
// Timed wait: a false return from wait() is treated as a timeout below.
248 if (!wait(id
, q
, (highPri
? Priority::Highest
: Priority::Normal
),
249 m_dropCacheTimeout
)) {
250 // since we timed out, maybe we can turn idle without holding memory
251 if (m_jobCount
== 0) {
// The queue lock is released while the flushes run.
252 ScopedUnlock
unlock(this);
253 flush_thread_caches();
254 if (m_dropStack
&& s_stackLimit
) {
255 flush_thread_stack();
257 DropCachePolicy::dropCache();
262 // m_dropCacheTimeout <= 0, a thread that starts waiting more recently
263 // should be given a task first (LIFO), same as unflushed threads.
264 wait(id
, q
, highPri
? Priority::Highest
: Priority::Normal
);
// Re-sample the health level after waking up.
268 ableToDeque
= m_healthStatus
->getHealthLevel() != HealthLevel::BackOff
;
271 if (inc
) incActiveWorker();
274 // look across all our queues from highest priority to lowest.
275 for (auto& jobs
: boost::adaptors::reverse(m_jobQueues
)) {
280 // peek at the beginning of the queue to see if the request has already
// gettime_diff_us() yields microseconds; the limit is converted from ms.
282 if (m_maxJobQueuingMs
> 0 &&
283 gettime_diff_us(jobs
.front().second
, now
) >
284 m_maxJobQueuingMs
* 1000) {
286 TJob job
= jobs
.front().first
;
// Queue length at or above the threshold: take the newest job (LIFO).
291 if (m_jobCount
>= m_lifoSwitchThreshold
) {
292 TJob job
= jobs
.back().first
;
// Otherwise take the oldest job (FIFO).
296 TJob job
= jobs
.front().first
;
301 return TJob(); // make compiler happy.
305 * One worker can be designated as the job reaper. The id of the job reaper
306 * equals m_maxThreadCount of the dispatcher. The job reaper checks if the
307 * oldest job on the queue has expired and if so, terminates that job without
308 * processing it. When the job reaper calls dequeueMaybeExpired(), it goes to
309 * dequeueOnlyExpiredImpl(), which only returns the oldest job and only if
310 * it's expired. Otherwise dequeueMaybeExpired() will block until a job
// Reaper-only dequeue: hands back a job only when the front job of some
// priority queue has been queued longer than m_maxJobQueuingMs; otherwise waits
// (with a timeout) and re-checks.
//   id  - must equal m_jobReaperId (asserted).
//   q   - forwarded to wait().
//   inc - when true, incActiveWorker() is called before an expired job is taken.
313 TJob
dequeueOnlyExpiredImpl(int id
, int q
, bool inc
) {
314 assert(id
== m_jobReaperId
);
// Expiry must be enabled for the reaper to make sense.
315 assert(m_maxJobQueuingMs
> 0);
// Start from the full queuing limit, in microseconds.
318 long waitTimeUs
= m_maxJobQueuingMs
* 1000;
// Scan from the highest priority queue down to the lowest.
320 for (auto& jobs
: boost::adaptors::reverse(m_jobQueues
)) {
323 Timer::GetMonotonicTime(now
);
324 int64_t queuedTimeUs
= gettime_diff_us(jobs
.front().second
, now
);
325 if (queuedTimeUs
> m_maxJobQueuingMs
* 1000) {
326 if (inc
) incActiveWorker();
329 TJob job
= jobs
.front().first
;
333 // oldest job hasn't expired yet. wake us up when it will.
// Remaining time until this queue's front job reaches the limit.
334 long waitTimeForQueue
= m_maxJobQueuingMs
* 1000 - queuedTimeUs
;
// Appears to keep the smaller of the two wait times (the rest of the
// expression is not visible here - TODO confirm).
335 waitTimeUs
= ((waitTimeUs
< waitTimeForQueue
) ?
// The timeout is passed as whole seconds plus the sub-second remainder.
340 if (wait(id
, q
, Priority::Low
,
341 waitTimeUs
/ 1000000, waitTimeUs
% 1000000)) {
342 // We got woken up by somebody calling notify (as opposed to timeout),
343 // then some work might be on the queue. We only expire things here,
344 // so let's notify somebody else as well.
// One deque per priority level (index == priority); each entry pairs a job
// with the monotonic time at which it was enqueued.
352 std::vector
<std::deque
<std::pair
<TJob
, timespec
>>> m_jobQueues
;
// Number of workers currently counted as active; see inc/decActiveWorker().
354 std::atomic
<int> m_workerCount
;
// Idle-wait timeout before a worker flushes its caches; <= 0 disables it.
355 const int m_dropCacheTimeout
;
// Whether an idle worker may also flush its stack (subject to s_stackLimit).
356 const bool m_dropStack
;
// Queue length at or above which dequeue switches from front to back.
357 const int m_lifoSwitchThreshold
;
// Maximum queuing time in milliseconds before a job is treated as expired;
// only honoured when > 0.
358 const int m_maxJobQueuingMs
;
359 const int m_jobReaperId
; // equals max worker thread count
360 IHostHealthObserver
* m_healthStatus
; // the dispatcher responsible for this
// Supplies the count used by releaseQueuedJobs().
362 std::shared_ptr
<IQueuedJobsReleaser
> m_queuedJobsReleaser
;
// Waitable variant of JobQueue: derives from the non-waitable queue and adds a
// pthread condition variable (m_cond) that is waited on until the queue has no
// active workers and no queued jobs.
365 template<class TJob
, class Policy
>
366 struct JobQueue
<TJob
,true,Policy
> : JobQueue
<TJob
,false,Policy
> {
// Same parameters as the non-waitable constructor; they are forwarded to the
// base class, after which m_cond is initialised with default attributes.
367 JobQueue(int threadCount
, int dropCacheTimeout
,
368 bool dropStack
, int lifoSwitchThreshold
=INT_MAX
,
369 int maxJobQueuingMs
= -1, int numPriorities
= 1,
370 int queuedJobsReleaseRate
= 3,
371 IHostHealthObserver
* healthStatus
= nullptr) :
372 JobQueue
<TJob
,false,Policy
>(threadCount
,
378 queuedJobsReleaseRate
,
380 pthread_cond_init(&m_cond
, nullptr);
// Releases the condition variable created in the constructor.
382 ~JobQueue() override
{
383 pthread_cond_destroy(&m_cond
);
// Blocks on m_cond, using the queue's own mutex, for as long as any worker is
// active or any job is still queued.
387 while (this->getActiveWorker() || this->getQueuedJobs()) {
388 pthread_cond_wait(&m_cond
, &this->getMutex().getRaw());
// Non-blocking form of the same condition: true when the queue is idle.
393 return !(this->getActiveWorker() || this->getQueuedJobs());
// Wakes one waiter blocked on m_cond.
396 pthread_cond_signal(&m_cond
);
// Condition variable used to wait for the queue to become empty.
399 pthread_cond_t m_cond
;
402 ///////////////////////////////////////////////////////////////////////////////
405 * Base class for a customized worker.
407 * DropCachePolicy is an extra callback for specific actions to take
408 * when we decide to drop stack/caches.
// Template parameters:
//   TJob        - job type handed to doJob()/abortJob().
//   TContext    - per-worker context type stored in m_context.
//   countActive - forwarded as the `inc` argument of dequeueMaybeExpired(), so
//                 the queue tracks this worker as active while it runs a job.
//   waitable    - selects the waitable JobQueue specialization and enables the
//                 signalEmpty() notification in the worker loop.
//   Policy      - DropCachePolicy passed through to the queue type.
410 template<typename TJob
,
411 typename TContext
= void*,
412 bool countActive
= false,
413 bool waitable
= false,
414 class Policy
= detail::NoDropCachePolicy
>
415 struct JobQueueWorker
{
// Aliases that JobQueueDispatcher reads through TWorker::...
416 typedef TJob JobType
;
417 typedef TContext ContextType
;
418 typedef JobQueue
<TJob
, waitable
, Policy
> QueueType
;
419 typedef Policy DropCachePolicy
;
// Compile-time flags mirrored from the template arguments.
421 static const bool Waitable
= waitable
;
422 static const bool CountActive
= countActive
;
424 * Default constructor.
427 : m_func(nullptr), m_context(), m_stopped(false), m_queue(nullptr) {
430 virtual ~JobQueueWorker() {
434 * Two-phase object creation for easier derivation and for JobQueueDispatcher
435 * to easily create a vector of workers.
// Second construction phase: JobQueueDispatcher calls this with the worker id,
// the shared queue, the owning AsyncFunc and the dispatcher's context.
437 void create(int id
, QueueType
* queue
, void *func
, ContextType context
) {
446 * The only functions a subclass needs to implement.
// doJob() is the one pure virtual; the others have default bodies.
448 virtual void doJob(TJob job
) = 0;
// Default handling for a job that is not run: logs a warning.
449 virtual void abortJob(TJob
/*job*/) {
450 Logger::Warning("Job dropped by JobQueueDispatcher because of timeout.");
// Optional hooks with empty default bodies.
452 virtual void onThreadEnter() {}
453 virtual void onThreadExit() {}
456 * Start this worker thread.
// High priority is requested when this thread has a first slab assigned.
461 bool highPri
= (s_firstSlab
.ptr
!= nullptr);
464 bool expired
= false;
// s_numaNode is passed as the queue's `q` argument.
465 TJob job
= m_queue
->dequeueMaybeExpired(m_id
, s_numaNode
, countActive
,
// When this was the last active worker of a waitable queue, re-check and
// signal that the queue is empty.
473 if (!m_queue
->decActiveWorker() && waitable
) {
475 if (!m_queue
->getActiveWorker() &&
476 !m_queue
->getQueuedJobs()) {
477 m_queue
->signalEmpty();
481 } catch (const typename
QueueType::StopSignal
&) {
482 m_stopped
= true; // queue is empty and stopped, so we are done
489 * Stop this worker thread.
// Opaque pointer supplied via create() (the dispatcher passes its AsyncFunc).
497 void* m_func
{nullptr};
498 ContextType m_context
;
// Set once the worker has observed the queue's StopSignal.
499 bool m_stopped
{false};
505 ///////////////////////////////////////////////////////////////////////////////
508 * Driver class to push through the whole thing.
// TWorker is a JobQueueWorker subclass; the dispatcher reads its JobType,
// ContextType, DropCachePolicy and CountActive members. The dispatcher is also
// the IHostHealthObserver handed to its own queue.
510 template<class TWorker
>
511 struct JobQueueDispatcher
: IHostHealthObserver
{
// Constructs a stopped dispatcher and its queue.
//   maxThreadCount      - upper bound on worker threads; must be >= 1.
//   dropCacheTimeout, dropStack, lifoSwitchThreshold, maxJobQueuingMs,
//   numPriorities, queuedJobsReleaseRate
//                       - forwarded unchanged to the JobQueue constructor.
//   context             - stored and passed to every worker's create().
//   initThreadCount     - initial limit on worker threads; values outside
//                         [0, maxThreadCount] fall back to maxThreadCount.
//   queueToWorkerRatio  - queued jobs per worker used by getTargetNumWorkers().
515 JobQueueDispatcher(int maxThreadCount
,
516 int dropCacheTimeout
, bool dropStack
,
517 typename
TWorker::ContextType context
,
518 int lifoSwitchThreshold
= INT_MAX
,
519 int maxJobQueuingMs
= -1, int numPriorities
= 1,
520 int queuedJobsReleaseRate
= 3,
522 int initThreadCount
= -1,
523 int queueToWorkerRatio
= 1) // A worker per 1 queued job.
524 : m_stopped(true), m_healthStatus(HealthLevel::Bold
), m_id(0),
525 m_context(context
), m_maxThreadCount(maxThreadCount
),
526 m_currThreadCountLimit(initThreadCount
),
527 m_hugeThreadCount(hugeCount
),
// A reaper thread is only wanted when job expiry is enabled.
528 m_startReaperThread(maxJobQueuingMs
> 0),
529 m_queueToWorkerRatio(queueToWorkerRatio
),
// `this` is passed as the queue's health observer.
530 m_queue(maxThreadCount
, dropCacheTimeout
, dropStack
,
531 lifoSwitchThreshold
, maxJobQueuingMs
, numPriorities
,
532 queuedJobsReleaseRate
, this) {
533 assert(maxThreadCount
>= 1);
534 if (initThreadCount
< 0 || initThreadCount
> maxThreadCount
) {
535 m_currThreadCountLimit
= maxThreadCount
;
537 if (!TWorker::CountActive
) {
538 // If TWorker does not support counting the number of
539 // active workers, just start all of the workers eagerly
// addWorkerImpl(false) is used here, i.e. with its `start` argument false.
540 for (int i
= 0; i
< m_maxThreadCount
; i
++) {
541 addWorkerImpl(false);
546 int32_t dispatcher_id
= 0;
// Frees the raw AsyncFunc and worker pointers owned through m_funcs and
// m_workers (both allocated with new in addWorkerImpl()).
548 ~JobQueueDispatcher() override
{
550 for (auto func
: m_funcs
) delete func
;
551 for (auto worker
: m_workers
) delete worker
;
// Number of workers the queue currently counts as active.
554 int getActiveWorker() {
555 return m_queue
.getActiveWorker();
// Number of jobs the queue reports as queued.
558 int getQueuedJobs() {
559 return m_queue
.getQueuedJobs();
// Computes how many worker threads should exist right now.
// When TWorker counts active workers: active workers plus queued jobs divided
// by m_queueToWorkerRatio (rounded up or down depending on a condition not
// visible here), capped at m_currThreadCountLimit.
// Otherwise: always m_currThreadCountLimit.
562 int getTargetNumWorkers() {
563 if (TWorker::CountActive
) {
564 int target
= getActiveWorker();
565 const auto queued
= getQueuedJobs();
566 const auto r
= m_queueToWorkerRatio
;
// Also guards the divisions below against a zero ratio.
567 always_assert(r
>= 1);
569 target
+= (queued
+ r
- 1) / r
; // Round up.
571 target
+= queued
/ r
; // Round down.
573 if (target
> m_currThreadCountLimit
) return m_currThreadCountLimit
;
576 return m_currThreadCountLimit
;
581 * Creates worker threads and start running them. This is non-blocking.
585 m_queue
.setNumGroups(num_numa_nodes());
586 // Spin up more worker threads if appropriate
587 int target
= getTargetNumWorkers();
588 for (int n
= m_workers
.size(); n
< target
; ++n
) {
589 addWorkerImpl(false);
591 for (auto worker
: m_funcs
) {
596 if (m_startReaperThread
) {
// Enqueues `job` at `priority` on the underlying queue, then compares the
// target worker count with the current number of workers.
604 void enqueue(typename
TWorker::JobType job
, int priority
= 0) {
605 m_queue
.enqueue(job
, priority
);
606 // Spin up another worker thread if appropriate
607 int target
= getTargetNumWorkers();
608 int n
= m_workers
.size();
615 * Add a worker thread on the fly.
625 * Increase the limit on number of workers by n, without exceeding the initial
// Raises m_currThreadCountLimit by up to `n`, never beyond m_maxThreadCount.
// Does nothing while the dispatcher is stopped.
628 void addWorkers(int n
) {
630 if (m_stopped
) return;
// Headroom left between the current limit and the hard maximum.
631 int limit
= m_maxThreadCount
- m_currThreadCountLimit
;
633 if (n
> limit
) n
= limit
;
634 m_currThreadCountLimit
+= n
;
// Without active-worker counting, iterate once per newly allowed worker.
635 if (!TWorker::CountActive
) {
636 for (int i
= 0; i
< n
; ++i
) {
// With counting, loop until the worker set reaches the computed target.
640 while (m_workers
.size() < getTargetNumWorkers()) {
// Appends every worker pointer in m_workers to `workers`; existing elements of
// `workers` are kept. Ownership stays with the dispatcher, whose destructor
// deletes them.
646 void getWorkers(std::vector
<TWorker
*> &workers
) {
648 workers
.insert(workers
.end(), m_workers
.begin(), m_workers
.end());
// Returns immediately if the dispatcher is already stopped. When `stop` is
// true (the default), stop() is called on the dispatcher.
651 void waitEmpty(bool stop
= true) {
652 if (m_stopped
) return;
654 if (stop
) this->stop();
658 if (m_stopped
) return true;
659 return m_queue
.pollEmpty();
663 * Stop all workers after all jobs are processed. No new jobs should be
664 * enqueued at this moment, or this call may block for longer time.
667 // TODO(t5572120): If stop has already been called when the destructor
668 // runs, we'd bail out here and potentially start destroying AsyncFuncs
669 // that are still running.
670 if (m_stopped
) return;
674 bool exceptioned
= false;
678 AsyncFunc
<TWorker
> *func
= nullptr;
681 if (!m_funcs
.empty()) {
682 func
= *m_funcs
.begin();
684 } else if (m_reaperFunc
) {
685 func
= m_reaperFunc
.release();
688 if (func
== nullptr) {
693 } catch (Exception
&e
) {
// IHostHealthObserver callback: for any level other than BackOff, asks the
// queue to release queued jobs; the new level is stored in m_healthStatus.
709 void notifyNewStatus(HealthLevel newStatus
) override
{
710 bool curStopDequeue
= (newStatus
== HealthLevel::BackOff
);
711 if (!curStopDequeue
) {
712 // release blocked requests in queue if any
713 m_queue
.releaseQueuedJobs();
716 m_healthStatus
= newStatus
;
// Returns the last stored health level; the queue's dequeue path compares it
// against HealthLevel::BackOff.
719 HealthLevel
getHealthLevel() override
{
720 return m_healthStatus
;
// Sets how many workers are created with the `huge` flag; addWorkerImpl()
// applies it to workers added while m_workers.size() is below this count.
723 void setHugeThreadCount(int count
) {
724 m_hugeThreadCount
= count
;
// Last health level stored by notifyNewStatus(); starts as HealthLevel::Bold.
729 HealthLevel m_healthStatus
;
// Context handed to every worker's create().
731 typename
TWorker::ContextType m_context
;
732 const int m_maxThreadCount
; // not including the possible reaper
733 int m_currThreadCountLimit
; // initial limit can be lower than max
// Number of leading workers created with the `huge` flag set.
734 int m_hugeThreadCount
{0};
// True when the queue was configured with maxJobQueuingMs > 0.
735 const bool m_startReaperThread
;
// Queued jobs per worker, used by getTargetNumWorkers(); must be >= 1.
736 int m_queueToWorkerRatio
{1};
// The job queue shared by all workers of this dispatcher.
737 JobQueue
<typename
TWorker::JobType
,
739 typename
TWorker::DropCachePolicy
> m_queue
;
// Raw owning pointers; both sets are deleted in the destructor.
742 std::set
<TWorker
*> m_workers
;
743 std::set
<AsyncFunc
<TWorker
> *> m_funcs
;
// Optional reaper worker and its thread wrapper.
744 std::unique_ptr
<TWorker
> m_reaper
;
745 std::unique_ptr
<AsyncFunc
<TWorker
>> m_reaperFunc
;
748 m_reaper
= std::make_unique
<TWorker
>();
749 m_reaperFunc
= std::make_unique
<AsyncFunc
<TWorker
>>(m_reaper
.get(),
751 m_reaper
->create(m_maxThreadCount
, &m_queue
, m_reaperFunc
.get(), m_context
);
752 m_reaperFunc
->start();
753 return m_maxThreadCount
;
756 // Cannot be called concurrently (callers should hold m_mutex, or
757 // otherwise ensure that no other threads are calling this).
// Allocates one worker plus the AsyncFunc that will run TWorker::start on it,
// registers both with the dispatcher, and completes the worker's two-phase
// construction via create(). How `start` is used is not visible in this block.
758 void addWorkerImpl(bool start
) {
// The worker set is already at capacity, so there is nothing to add.
759 if (m_workers
.size() >= m_maxThreadCount
) {
760 // another thread raced with us to add a worker.
761 assert(m_workers
.size() == m_maxThreadCount
);
764 TWorker
*worker
= new TWorker();
// Only the first m_hugeThreadCount workers get the `huge` flag.
765 bool huge
= m_workers
.size() < m_hugeThreadCount
;
766 AsyncFunc
<TWorker
> *func
=
767 new AsyncFunc
<TWorker
>(worker
, &TWorker::start
, huge
);
// Ownership of both raw pointers passes to the sets; the destructor deletes them.
768 m_workers
.insert(worker
);
769 m_funcs
.insert(func
);
771 worker
->create(id
, &m_queue
, func
, m_context
);
780 ///////////////////////////////////////////////////////////////////////////////