2 +----------------------------------------------------------------------+
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #ifndef incl_HPHP_UTIL_JOB_QUEUE_H_
18 #define incl_HPHP_UTIL_JOB_QUEUE_H_
25 #include <boost/range/adaptors.hpp>
26 #include <folly/Memory.h>
28 #include "hphp/util/alloc.h"
29 #include "hphp/util/async-func.h"
30 #include "hphp/util/atomic.h"
31 #include "hphp/util/compatibility.h"
32 #include "hphp/util/exception.h"
33 #include "hphp/util/health-monitor-types.h"
34 #include "hphp/util/lock.h"
35 #include "hphp/util/logger.h"
36 #include "hphp/util/numa.h"
37 #include "hphp/util/synchronizable-multi.h"
38 #include "hphp/util/timer.h"
41 ///////////////////////////////////////////////////////////////////////////////
44 * A queue-based multi-threaded job processing facility. Internally, we have a
45 * queue of jobs and a list of workers, each of which runs in its own thread.
46 * Job queue can take new jobs on the fly and workers will continue to pull
47 * jobs off the queue and work on them.
49 * To use it, simply define your own job and worker classes like this:
55 * struct MyWorker : JobQueueWorker<MyJob*> {
56 * virtual void doJob(MyJob *job) {
58 * delete job; // if it was new-ed
62 * Now, use JobQueueDispatcher to start the whole thing,
64 * JobQueueDispatcher<MyWorker> dispatcher(40, 0, false, nullptr); // 40 threads
67 * dispatcher.enqueue(new MyJob(...));
71 * Note this class is different from JobListDispatcher that uses a vector to
72 * store prepared jobs. With JobQueueDispatcher, job queue is normally empty
73 * initially and new jobs are pushed into the queue over time. Also, workers
74 * can be stopped individually.
76 * Job process ordering
77 * ====================
78 * By default, requests are processed in FIFO order.
80 * In addition, we support an option where the request processing order can flip
81 * between FIFO and LIFO based on the length of the queue. This can be enabled by
82 * setting the 'lifoSwitchThreshold' parameter. If the job queue is configured
83 * to be in FIFO mode, and the current queue length reaches or exceeds
84 * lifoSwitchThreshold, then the workers will begin work on requests in LIFO
85 * order until the queue size drops below the threshold, at which point we resume in
86 * FIFO order. Setting the queue to be in LIFO mode initially will have the
87 * opposite behavior. This is useful when we are in a loaded situation and we
88 * want to prioritize the newest requests.
90 * You can configure a LIFO ordered queue by setting lifoSwitchThreshold to 0.
93 ///////////////////////////////////////////////////////////////////////////////
struct NoDropCachePolicy {
  // Default DropCachePolicy: does nothing extra when an idle worker decides
  // to drop its caches/stack.
  static void dropCache() {}
};
// Strategy interface consulted by JobQueue::releaseQueuedJobs() to decide how
// many held-back jobs to let through once the host leaves BackOff.
99 struct IQueuedJobsReleaser
{
100 virtual ~IQueuedJobsReleaser() { }
// Number of jobs to release per releaseQueuedJobs() call. The caller checks
// for a value <= 0 before its release loop.
101 virtual int32_t numOfJobsToRelease() = 0;
// IQueuedJobsReleaser that always reports the same fixed release count given
// at construction. JobQueue builds one from its queuedJobsReleaseRate
// constructor argument.
104 struct SimpleReleaser
: IQueuedJobsReleaser
{
105 explicit SimpleReleaser(int32_t rate
)
106 : m_queuedJobsReleaseRate(rate
){}
107 int32_t numOfJobsToRelease() override
{
108 return m_queuedJobsReleaseRate
;
// NOTE(review): stored as `int` while the constructor parameter and the
// return type are int32_t. The `= 3` default is always overwritten by the
// only visible constructor.
111 int m_queuedJobsReleaseRate
= 3;
115 * A job queue that's suitable for multiple threads to work on.
// Template parameters:
//   TJob            job type stored in the queue and returned by dequeue.
//   waitable        true selects the specialization further below, which adds
//                   a condition variable for waiting until the queue drains.
//   DropCachePolicy provides a static dropCache() hook, called when an idle
//                   worker flushes its caches.
117 template<typename TJob
,
118 bool waitable
= false,
119 class DropCachePolicy
= detail::NoDropCachePolicy
>
120 struct JobQueue
: SynchronizableMulti
{
121 // trivial class for signaling queue stop
122 struct StopSignal
{};
// Constructor.
//   maxThreadCount        worker slots. One extra SynchronizableMulti slot is
//                         reserved for the job reaper, whose id equals
//                         maxThreadCount.
//   dropCacheTimeout      timed-wait length before an idle worker flushes its
//                         caches; <= 0 means plain untimed waits (no flush).
//   dropStack             also flush the thread stack on that idle path.
//   lifoSwitchThreshold   once the job count reaches this, dequeue serves
//                         each priority queue LIFO instead of FIFO.
//   maxJobQueuingMs       jobs queued longer than this are treated as
//                         expired; <= 0 disables expiration.
//   numPriorities         number of priority levels (one deque each).
//   queuedJobsReleaseRate fixed rate handed to the default SimpleReleaser.
//   healthStatus          optional health observer; nullptr means the
//                         health check never blocks dequeuing.
// NOTE(review): numPriorities is not validated here, but dequeue reads
// m_jobQueues[m_jobQueues.size() - 1], so it must be >= 1.
128 JobQueue(int maxThreadCount
, int dropCacheTimeout
,
129 bool dropStack
, int lifoSwitchThreshold
=INT_MAX
,
130 int maxJobQueuingMs
= -1, int numPriorities
= 1,
131 int queuedJobsReleaseRate
= 3,
132 IHostHealthObserver
* healthStatus
= nullptr)
133 : SynchronizableMulti(maxThreadCount
+ 1), // reaper added
134 m_jobCount(0), m_stopped(false), m_workerCount(0),
135 m_dropCacheTimeout(dropCacheTimeout
), m_dropStack(dropStack
),
136 m_lifoSwitchThreshold(lifoSwitchThreshold
),
137 m_maxJobQueuingMs(maxJobQueuingMs
),
138 m_jobReaperId(maxThreadCount
), m_healthStatus(healthStatus
),
139 m_queuedJobsReleaser(
140 std::make_shared
<SimpleReleaser
>(queuedJobsReleaseRate
)) {
141 assert(maxThreadCount
> 0);
142 m_jobQueues
.resize(numPriorities
);
146 * Put a job into the queue and notify a worker to pick it up.
// priority indexes m_jobQueues. Dequeue scans those queues from the back, so
// a higher index means higher priority. The job is stored together with its
// monotonic enqueue time, which the expiration checks compare against.
148 void enqueue(TJob job
, int priority
=0) {
149 assert(priority
>= 0);
150 assert(priority
< m_jobQueues
.size());
151 timespec enqueueTime
;
152 Timer::GetMonotonicTime(enqueueTime
);
154 m_jobQueues
[priority
].emplace_back(job
, enqueueTime
);
160 * Grab a job from the queue for processing. Since the job was not created
161 * by this queue class, it's up to the worker class whether to deallocate
162 * the job object correctly.
// Parameters:
//   id       caller's worker id. The reaper (id == m_jobReaperId) is routed
//            to dequeueOnlyExpiredImpl(); everyone else uses
//            dequeueMaybeExpiredImpl().
//   q        forwarded to wait(); workers pass their NUMA node.
//   inc      when true, the caller is counted as an active worker
//            (incActiveWorker) once it is about to take a job.
//   expired  out-param forwarded to dequeueMaybeExpiredImpl. It presumably
//            reports whether the returned job has outlived maxJobQueuingMs
//            (TODO confirm; the assignments are not visible here).
//   highpri  forwarded to dequeueMaybeExpiredImpl.
164 TJob
dequeueMaybeExpired(int id
, int q
, bool inc
, bool* expired
,
165 bool highpri
= false) {
166 if (id
== m_jobReaperId
) {
168 return dequeueOnlyExpiredImpl(id
, q
, inc
);
// Timestamp used for the expiration check in the normal dequeue path.
171 Timer::GetMonotonicTime(now
);
172 return dequeueMaybeExpiredImpl(id
, q
, inc
, now
, expired
, highpri
);
176 * Purely for making sure no new jobs are queued when we are stopping.
// Wakes every waiter so threads blocked in dequeue can observe the stopped
// state (presumably surfacing it as StopSignal, which workers catch).
181 notifyAll(); // so all waiting threads can find out queue is stopped
// No-op in the non-waitable queue. The waitable specialization presumably
// shadows it to signal its condition variable (TODO confirm).
185 void signalEmpty() {}
188 * Keeps track of how many active workers are working on the queue.
190 void incActiveWorker() {
// Returns the count after decrementing (m_workerCount is a std::atomic<int>).
// The worker loop treats a zero result as a hint that the queue may be idle.
193 int decActiveWorker() {
194 return --m_workerCount
;
196 int getActiveWorker() {
197 return m_workerCount
;
201 * Keep track of how many jobs are queued but have not yet been serviced.
203 int getQueuedJobs() {
// Called by the dispatcher when health leaves BackOff (notifyNewStatus).
// Asks the releaser for a count, special-cases a non-positive count, and then
// loops over at most min(toRelease, m_jobCount) jobs.
207 int releaseQueuedJobs() {
208 int toRelease
= m_queuedJobsReleaser
->numOfJobsToRelease();
209 if (toRelease
<= 0) {
215 for (iter
= 0; iter
< toRelease
&& iter
< m_jobCount
; iter
++) {
222 friend class JobQueue_Expiration_Test
;
// Blocks until a job is available and the health observer (if any) is not in
// BackOff, then returns a job. Queues are scanned from highest priority to
// lowest. In a non-empty queue:
//   - if expiration is enabled and the front job has waited longer than
//     m_maxJobQueuingMs, the front job is taken (presumably flagged through
//     *expired);
//   - otherwise the back job is taken when m_jobCount >= m_lifoSwitchThreshold
//     (LIFO), else the front job (FIFO).
// `now` is the caller's timestamp for the expiration comparison.
223 TJob
dequeueMaybeExpiredImpl(int id
, int q
, bool inc
, const timespec
& now
,
224 bool* expired
, bool highPri
= false) {
// Set once this thread has flushed caches, so later waits are plain waits.
227 bool flushed
= false;
// A null observer never blocks dequeuing.
228 bool ableToDeque
= m_healthStatus
== nullptr ||
229 m_healthStatus
->getHealthLevel() != HealthLevel::BackOff
;
231 while (m_jobCount
== 0 || !ableToDeque
) {
232 uint32_t kNumPriority
= m_jobQueues
.size();
233 if (m_jobQueues
[kNumPriority
- 1].size() > 0) {
234 // we do not block HealthMon requests (with the highest priority)
// Waiting: a high-priority wait (presumably the highPri case), a plain
// low-priority wait, or a timed middle-priority wait whose timeout leads to the
// idle cache flush below.
242 wait(id
, q
, Priority::High
);
244 if (m_dropCacheTimeout
<= 0 || flushed
) {
245 wait(id
, q
, Priority::Low
);
246 } else if (!wait(id
, q
, Priority::Middle
, m_dropCacheTimeout
)) {
247 // since we timed out, maybe we can turn idle without holding memory
248 if (m_jobCount
== 0) {
// ScopedUnlock presumably releases the queue lock for the duration of the
// flush, so other threads can keep enqueuing.
249 ScopedUnlock
unlock(this);
250 #ifdef USE_JEMALLOC_EXTENT_HOOKS
251 thread_huge_tcache_flush();
253 flush_thread_caches();
254 if (m_dropStack
&& s_stackLimit
) {
255 flush_thread_stack();
257 DropCachePolicy::dropCache();
// Re-evaluate health after waking.
// NOTE(review): unlike the initial check above, this dereferences
// m_healthStatus without a null test. Presumably it is guarded by a condition
// in lines not visible here (TODO confirm).
263 ableToDeque
= m_healthStatus
->getHealthLevel() != HealthLevel::BackOff
;
// The caller is about to take a job: count it as active if requested.
266 if (inc
) incActiveWorker();
269 // look across all our queues from highest priority to lowest.
270 for (auto& jobs
: boost::adaptors::reverse(m_jobQueues
)) {
275 // peek at the beginning of the queue to see if the request has already
277 if (m_maxJobQueuingMs
> 0 &&
278 gettime_diff_us(jobs
.front().second
, now
) >
279 m_maxJobQueuingMs
* 1000) {
281 TJob job
= jobs
.front().first
;
// Not expired: LIFO once the backlog reaches the threshold, FIFO otherwise.
286 if (m_jobCount
>= m_lifoSwitchThreshold
) {
287 TJob job
= jobs
.back().first
;
291 TJob job
= jobs
.front().first
;
296 return TJob(); // make compiler happy.
300 * One worker can be designated as the job reaper. The id of the job reaper
301 * equals m_maxThreadCount of the dispatcher. The job reaper checks if the
302 * oldest job on the queue has expired and, if so, terminates that job without
303 * processing it. When the job reaper calls dequeueMaybeExpired(), it goes to
304 * dequeueOnlyExpiredImpl(), which only returns the oldest job and only if
305 * it's expired. Otherwise dequeueMaybeExpired() will block until a job
// Reaper-only dequeue. Scans queues from highest priority to lowest and
// returns a front job only if it has been queued longer than
// m_maxJobQueuingMs. Otherwise it sleeps until the earliest front job would
// expire, or until notified.
308 TJob
dequeueOnlyExpiredImpl(int id
, int q
, bool inc
) {
309 assert(id
== m_jobReaperId
);
310 assert(m_maxJobQueuingMs
> 0);
// Start with a full expiration period as the sleep bound; it is reduced below
// using the time remaining for each non-empty queue's front job.
313 long waitTimeUs
= m_maxJobQueuingMs
* 1000;
315 for (auto& jobs
: boost::adaptors::reverse(m_jobQueues
)) {
318 Timer::GetMonotonicTime(now
);
319 int64_t queuedTimeUs
= gettime_diff_us(jobs
.front().second
, now
);
320 if (queuedTimeUs
> m_maxJobQueuingMs
* 1000) {
321 if (inc
) incActiveWorker();
324 TJob job
= jobs
.front().first
;
328 // oldest job hasn't expired yet. wake us up when it will.
329 long waitTimeForQueue
= m_maxJobQueuingMs
* 1000 - queuedTimeUs
;
// Presumably keeps the smaller of the two (the continuation of this
// conditional is not visible).
330 waitTimeUs
= ((waitTimeUs
< waitTimeForQueue
) ?
// The timeout is split into whole seconds plus a remainder that is still in
// microseconds.
// NOTE(review): confirm the unit of wait()'s final argument. If it expects
// nanoseconds, the sub-second part here is 1000x too short.
335 if (wait(id
, q
, Priority::Low
,
336 waitTimeUs
/ 1000000, waitTimeUs
% 1000000)) {
337 // We got woken up by somebody calling notify (as opposed to timeout),
338 // then some work might be on the queue. We only expire things here,
339 // so let's notify somebody else as well.
// One deque per priority level (higher index = higher priority). Each entry
// pairs a job with its monotonic enqueue time.
347 std::vector
<std::deque
<std::pair
<TJob
, timespec
>>> m_jobQueues
;
// Number of workers currently counted as active.
349 std::atomic
<int> m_workerCount
;
350 const int m_dropCacheTimeout
;
351 const bool m_dropStack
;
352 const int m_lifoSwitchThreshold
;
353 const int m_maxJobQueuingMs
;
354 const int m_jobReaperId
; // equals max worker thread count
// Not owned; may be nullptr, in which case dequeue never backs off.
355 IHostHealthObserver
* m_healthStatus
; // the dispatcher responsible for this
// Decides how many jobs releaseQueuedJobs() lets through per call.
357 std::shared_ptr
<IQueuedJobsReleaser
> m_queuedJobsReleaser
;
// Waitable variant. It adds a pthread condition variable so a caller can block
// until the queue is drained (no active workers and no queued jobs). It takes
// the same constructor parameters as the primary template.
360 template<class TJob
, class Policy
>
361 struct JobQueue
<TJob
,true,Policy
> : JobQueue
<TJob
,false,Policy
> {
362 JobQueue(int threadCount
, int dropCacheTimeout
,
363 bool dropStack
, int lifoSwitchThreshold
=INT_MAX
,
364 int maxJobQueuingMs
= -1, int numPriorities
= 1,
365 int queuedJobsReleaseRate
= 3,
366 IHostHealthObserver
* healthStatus
= nullptr) :
367 JobQueue
<TJob
,false,Policy
>(threadCount
,
373 queuedJobsReleaseRate
,
// NOTE(review): the return value of pthread_cond_init is ignored.
375 pthread_cond_init(&m_cond
, nullptr);
377 ~JobQueue() override
{
378 pthread_cond_destroy(&m_cond
);
// Blocks until drained. pthread_cond_wait requires the caller to hold this
// mutex; presumably the enclosing function (header not visible) takes the
// queue lock first.
382 while (this->getActiveWorker() || this->getQueuedJobs()) {
383 pthread_cond_wait(&m_cond
, &this->getMutex().getRaw());
// Non-blocking check: true when nothing is active and nothing is queued.
388 return !(this->getActiveWorker() || this->getQueuedJobs());
// Wakes a thread blocked in the drain wait above.
// NOTE(review): pthread_cond_signal wakes at least one waiter, not all. If
// several threads can wait for the drain, pthread_cond_broadcast may be needed.
391 pthread_cond_signal(&m_cond
);
394 pthread_cond_t m_cond
;
397 ///////////////////////////////////////////////////////////////////////////////
400 * Base class for a customized worker.
402 * DropCachePolicy is an extra callback for specific actions to take
403 * when we decide to drop stack/caches.
// Template parameters:
//   TJob        job type passed to doJob()/abortJob().
//   TContext    per-dispatcher context value handed to create().
//   countActive when true, the worker asks the queue to count it as active
//               while it holds a job, and the dispatcher grows the pool
//               lazily instead of creating every worker up front.
//   waitable    selects the waitable JobQueue specialization.
//   Policy      DropCachePolicy hook used on the idle cache-drop path.
405 template<typename TJob
,
406 typename TContext
= void*,
407 bool countActive
= false,
408 bool waitable
= false,
409 class Policy
= detail::NoDropCachePolicy
>
410 struct JobQueueWorker
{
411 typedef TJob JobType
;
412 typedef TContext ContextType
;
413 typedef JobQueue
<TJob
, waitable
, Policy
> QueueType
;
414 typedef Policy DropCachePolicy
;
416 static const bool Waitable
= waitable
;
417 static const bool CountActive
= countActive
;
419 * Default constructor.
// NOTE(review): m_func and m_stopped also have in-class default initializers,
// so listing them here is redundant.
422 : m_func(nullptr), m_context(), m_stopped(false), m_queue(nullptr) {
425 virtual ~JobQueueWorker() {
429 * Two-phase object creation for easier derivation and for JobQueueDispatcher
430 * to easily create a vector of workers.
// id: worker id (the dispatcher gives the reaper its max thread count).
// queue: the dispatcher's queue (not owned by the worker).
// func: the AsyncFunc running this worker, passed type-erased as void*.
// context: the dispatcher's context value.
432 void create(int id
, QueueType
* queue
, void *func
, ContextType context
) {
441 * The only functions a subclass needs to implement.
443 virtual void doJob(TJob job
) = 0;
// Presumably invoked for jobs that expired while queued. The default
// implementation only logs a warning.
444 virtual void abortJob(TJob
/*job*/) {
445 Logger::Warning("Job dropped by JobQueueDispatcher because of timeout.");
// Optional hooks; presumably called by start() around the worker loop.
447 virtual void onThreadEnter() {}
448 virtual void onThreadExit() {}
451 * Start this worker thread.
// Worker loop body. It dequeues from the queue, passing this thread's NUMA
// node as the wait group and countActive as `inc`. In waitable mode it signals
// the queue once nothing is active or queued. The loop ends when the queue
// throws StopSignal.
456 // bool highPri = (s_firstSlab.first != nullptr);
// NOTE(review): highPri is hard-coded to false; the alternative above is
// commented out.
457 bool highPri
= false;
460 bool expired
= false;
461 TJob job
= m_queue
->dequeueMaybeExpired(m_id
, s_numaNode
, countActive
,
// decActiveWorker() returns the new count. When it reaches zero in waitable
// mode, re-check both counts before signaling the drain waiters.
469 if (!m_queue
->decActiveWorker() && waitable
) {
471 if (!m_queue
->getActiveWorker() &&
472 !m_queue
->getQueuedJobs()) {
473 m_queue
->signalEmpty();
477 } catch (const typename
QueueType::StopSignal
&) {
478 m_stopped
= true; // queue is empty and stopped, so we are done
485 * Stop this worker thread.
// AsyncFunc driving this worker, stored type-erased (presumably set by
// create()).
493 void* m_func
{nullptr};
494 ContextType m_context
;
// Set to true when the worker loop catches the queue's StopSignal.
495 bool m_stopped
{false};
501 ///////////////////////////////////////////////////////////////////////////////
504 * Driver class to push through the whole thing.
// Owns the JobQueue, the workers and their AsyncFunc threads, and an optional
// reaper thread (used when maxJobQueuingMs > 0). It also acts as the queue's
// IHostHealthObserver: the queue is constructed with `this`.
506 template<class TWorker
>
507 struct JobQueueDispatcher
: IHostHealthObserver
{
// Constructor. The dispatcher starts in the stopped state (m_stopped(true)).
// Workers created here are added with start == false.
//   initThreadCount     initial cap on workers; < 0 or > maxThreadCount means
//                       "use maxThreadCount".
//   queueToWorkerRatio  queued jobs per additional worker when sizing the
//                       pool lazily (CountActive workers only); must be >= 1.
// The other parameters are forwarded to the JobQueue.
// NOTE(review): the initializer list uses `hugeCount`, whose parameter
// declaration is not visible in this view.
511 JobQueueDispatcher(int maxThreadCount
,
512 int dropCacheTimeout
, bool dropStack
,
513 typename
TWorker::ContextType context
,
514 int lifoSwitchThreshold
= INT_MAX
,
515 int maxJobQueuingMs
= -1, int numPriorities
= 1,
516 int queuedJobsReleaseRate
= 3,
518 int initThreadCount
= -1,
519 int queueToWorkerRatio
= 1) // A worker per 1 queued job.
520 : m_stopped(true), m_healthStatus(HealthLevel::Bold
), m_id(0),
521 m_context(context
), m_maxThreadCount(maxThreadCount
),
522 m_currThreadCountLimit(initThreadCount
),
523 m_hugeThreadCount(hugeCount
),
524 m_startReaperThread(maxJobQueuingMs
> 0),
525 m_queueToWorkerRatio(queueToWorkerRatio
),
526 m_queue(maxThreadCount
, dropCacheTimeout
, dropStack
,
527 lifoSwitchThreshold
, maxJobQueuingMs
, numPriorities
,
528 queuedJobsReleaseRate
, this) {
529 assert(maxThreadCount
>= 1);
530 if (initThreadCount
< 0 || initThreadCount
> maxThreadCount
) {
531 m_currThreadCountLimit
= maxThreadCount
;
533 if (!TWorker::CountActive
) {
534 // If TWorker does not support counting the number of
535 // active workers, just start all of the workers eagerly
536 for (int i
= 0; i
< m_maxThreadCount
; i
++) {
537 addWorkerImpl(false);
542 int32_t dispatcher_id
= 0;
// Deletes the AsyncFuncs and workers, which are held as raw owning pointers.
// NOTE(review): std::unique_ptr ownership would make this explicit.
544 ~JobQueueDispatcher() override
{
546 for (auto func
: m_funcs
) delete func
;
547 for (auto worker
: m_workers
) delete worker
;
550 int getActiveWorker() {
551 return m_queue
.getActiveWorker();
554 int getQueuedJobs() {
555 return m_queue
.getQueuedJobs();
// Desired worker count.
// - With CountActive: active workers plus one per m_queueToWorkerRatio queued
//   jobs, capped at m_currThreadCountLimit. The division rounds up or down
//   depending on a condition not visible here.
// - Without CountActive: m_currThreadCountLimit.
558 int getTargetNumWorkers() {
559 if (TWorker::CountActive
) {
560 int target
= getActiveWorker();
561 const auto queued
= getQueuedJobs();
562 const auto r
= m_queueToWorkerRatio
;
563 always_assert(r
>= 1);
565 target
+= (queued
+ r
- 1) / r
; // Round up.
567 target
+= queued
/ r
; // Round down.
569 if (target
> m_currThreadCountLimit
) return m_currThreadCountLimit
;
572 return m_currThreadCountLimit
;
577 * Creates worker threads and starts running them. This is non-blocking.
// start(): sizes the queue's wait groups by NUMA node count, tops the pool up
// to getTargetNumWorkers(), then walks m_funcs (presumably launching each
// one) and starts the reaper if configured.
581 m_queue
.setNumGroups(num_numa_nodes());
582 // Spin up more worker threads if appropriate
583 int target
= getTargetNumWorkers();
584 for (int n
= m_workers
.size(); n
< target
; ++n
) {
585 addWorkerImpl(false);
// NOTE(review): this iterates m_funcs (AsyncFunc pointers), so `func` would be
// a clearer name than `worker`.
587 for (auto worker
: m_funcs
) {
592 if (m_startReaperThread
) {
// enqueue(): pushes the job at the given priority, then compares the pool
// size with getTargetNumWorkers() to decide whether to grow it.
600 void enqueue(typename
TWorker::JobType job
, int priority
= 0) {
601 m_queue
.enqueue(job
, priority
);
602 // Spin up another worker thread if appropriate
603 int target
= getTargetNumWorkers();
604 int n
= m_workers
.size();
611 * Add a worker thread on the fly.
621 * Increase the limit on number of workers by n, without exceeding the initial
// Raises m_currThreadCountLimit by n, clamped so it never exceeds
// m_maxThreadCount, and adds workers to match. Does nothing once the
// dispatcher is stopped.
// NOTE(review): a negative n is not rejected and would lower the limit.
624 void addWorkers(int n
) {
626 if (m_stopped
) return;
627 int limit
= m_maxThreadCount
- m_currThreadCountLimit
;
629 if (n
> limit
) n
= limit
;
630 m_currThreadCountLimit
+= n
;
631 if (!TWorker::CountActive
) {
632 for (int i
= 0; i
< n
; ++i
) {
// Lazy growth toward the target (presumably the CountActive branch).
// NOTE(review): compares a size_t with an int.
636 while (m_workers
.size() < getTargetNumWorkers()) {
// Appends non-owning pointers to all current workers to `workers`.
642 void getWorkers(std::vector
<TWorker
*> &workers
) {
644 workers
.insert(workers
.end(), m_workers
.begin(), m_workers
.end());
// waitEmpty(): returns at once if already stopped, and calls stop() when
// `stop` is true. The code that waits for the queue to drain is not visible
// here.
647 void waitEmpty(bool stop
= true) {
648 if (m_stopped
) return;
650 if (stop
) this->stop();
// Non-blocking emptiness check: a stopped dispatcher counts as empty;
// otherwise the queue is asked.
654 if (m_stopped
) return true;
655 return m_queue
.pollEmpty();
659 * Stop all workers after all jobs are processed. No new jobs should be
660 * enqueued at this moment, or this call may block for a longer time.
// Takes AsyncFuncs one at a time until none remain: regular worker funcs
// first, then the reaper, whose ownership is released from m_reaperFunc. The
// per-func wait and the body of the Exception handler are in lines not
// visible here.
663 // TODO(t5572120): If stop has already been called when the destructor
664 // runs, we'd bail out here and potentially start destroying AsyncFuncs
665 // that are still running.
666 if (m_stopped
) return;
// Presumably records whether any worker reported an Exception.
670 bool exceptioned
= false;
674 AsyncFunc
<TWorker
> *func
= nullptr;
677 if (!m_funcs
.empty()) {
678 func
= *m_funcs
.begin();
680 } else if (m_reaperFunc
) {
681 func
= m_reaperFunc
.release();
684 if (func
== nullptr) {
689 } catch (Exception
&e
) {
// IHostHealthObserver callback. Any status other than BackOff releases jobs
// held back by the queue. The new status is then recorded and returned by
// getHealthLevel(), which the queue polls when deciding whether it may
// dequeue.
705 void notifyNewStatus(HealthLevel newStatus
) override
{
706 bool curStopDequeue
= (newStatus
== HealthLevel::BackOff
);
707 if (!curStopDequeue
) {
708 // release blocked requests in queue if any
709 m_queue
.releaseQueuedJobs();
712 m_healthStatus
= newStatus
;
715 HealthLevel
getHealthLevel() override
{
716 return m_healthStatus
;
719 void setHugeThreadCount(int count
) {
720 m_hugeThreadCount
= count
;
// Last status reported through notifyNewStatus(); starts as Bold.
725 HealthLevel m_healthStatus
;
727 typename
TWorker::ContextType m_context
;
728 const int m_maxThreadCount
; // not including the possible reaper
729 int m_currThreadCountLimit
; // initial limit can be lower than max
730 int m_hugeThreadCount
{0};
731 const bool m_startReaperThread
;
732 int m_queueToWorkerRatio
{1};
// The job queue. Its middle template argument is on a line not visible here
// (presumably TWorker::Waitable).
733 JobQueue
<typename
TWorker::JobType
,
735 typename
TWorker::DropCachePolicy
> m_queue
;
// Workers and their AsyncFunc threads. These are raw pointers owned by the
// dispatcher and deleted in its destructor.
738 std::set
<TWorker
*> m_workers
;
739 std::set
<AsyncFunc
<TWorker
> *> m_funcs
;
// Optional reaper worker and its thread.
740 std::unique_ptr
<TWorker
> m_reaper
;
741 std::unique_ptr
<AsyncFunc
<TWorker
>> m_reaperFunc
;
// Creates and starts the reaper worker. Its id is m_maxThreadCount, the same
// value the queue stores as m_jobReaperId, so the reaper's dequeues only ever
// return expired jobs. Returns that id.
744 m_reaper
= std::make_unique
<TWorker
>();
745 m_reaperFunc
= std::make_unique
<AsyncFunc
<TWorker
>>(m_reaper
.get(),
747 m_reaper
->create(m_maxThreadCount
, &m_queue
, m_reaperFunc
.get(), m_context
);
748 m_reaperFunc
->start();
749 return m_maxThreadCount
;
752 // Cannot be called concurrently (callers should hold m_mutex, or
753 // otherwise ensure that no other threads are calling this).
// Creates one worker and an AsyncFunc bound to TWorker::start, unless the pool
// is already at m_maxThreadCount. `start` presumably controls whether the
// thread is launched right away (that code is not visible here).
754 void addWorkerImpl(bool start
) {
755 if (m_workers
.size() >= m_maxThreadCount
) {
756 // another thread raced with us to add a worker.
757 assert(m_workers
.size() == m_maxThreadCount
);
// NOTE(review): raw `new`. If the second allocation or either insert throws,
// the objects created so far leak; they are only freed in the destructor.
760 TWorker
*worker
= new TWorker();
761 AsyncFunc
<TWorker
> *func
=
762 new AsyncFunc
<TWorker
>(worker
, &TWorker::start
);
763 m_workers
.insert(worker
);
764 m_funcs
.insert(func
);
// `id` comes from a line not visible here (presumably derived from m_id).
766 worker
->create(id
, &m_queue
, func
, m_context
);
775 ///////////////////////////////////////////////////////////////////////////////