2 +----------------------------------------------------------------------+
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-2013 Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #ifndef incl_HPHP_UTIL_JOB_QUEUE_H_
18 #define incl_HPHP_UTIL_JOB_QUEUE_H_
23 #include "hphp/util/alloc.h"
24 #include <boost/range/adaptors.hpp>
25 #include "hphp/util/async_func.h"
26 #include "hphp/util/atomic.h"
27 #include "hphp/util/compatibility.h"
28 #include "hphp/util/exception.h"
29 #include "hphp/util/lock.h"
30 #include "hphp/util/logger.h"
31 #include "hphp/util/synchronizable_multi.h"
32 #include "hphp/util/timer.h"
35 ///////////////////////////////////////////////////////////////////////////////
38 * A queue-based multi-threaded job processing facility. Internally, we have a
39 * queue of jobs and a list of workers, each of which runs in its own thread.
40 * Job queue can take new jobs on the fly and workers will continue to pull
41 * jobs off the queue and work on them.
43 * To use it, simply define your own job and worker class like this,
50 * class MyWorker : public JobQueueWorker<MyJob*> {
52 * virtual void doJob(MyJob *job) {
54 * delete job; // if it was new-ed
58 * Now, use JobQueueDispatcher to start the whole thing,
60 * JobQueueDispatcher<MyJob*, MyWorker> dispatcher(40, NULL); // 40 threads
63 * dispatcher.enqueue(new MyJob(...));
67 * Note this class is different from JobListDispatcher that uses a vector to
68 * store prepared jobs. With JobQueueDispatcher, job queue is normally empty
69 * initially and new jobs are pushed into the queue over time. Also, workers
70 * can be stopped individually.
72 * Job process ordering
73 * ====================
74 * By default, requests are processed in FIFO order.
76 * In addition, we support an option where the request processing order can flip
77 * between FIFO or LIFO based on the length of the queue. This can be enabled by
78 * setting the 'lifoSwitchThreshold' parameter. If the job queue is configured
79 * to be in FIFO mode, and the current queue length exceeds
80 * lifoSwitchThreshold, then the workers will begin work on requests in LIFO
81 * order until the queue size is below the threshold in which case we resume in
82 * FIFO order. Setting the queue to be in LIFO mode initially will have the
83 * opposite behavior. This is useful when we are in a loaded situation and we
84 * want to prioritize the newest requests.
86 * You can configure a LIFO ordered queue by setting lifoSwitchThreshold to 0.
89 ///////////////////////////////////////////////////////////////////////////////
/**
 * Default cache-drop policy: a deliberate no-op.
 *
 * JobQueue calls DropCachePolicy::dropCache() when an idle worker decides to
 * release memory; plugging in this policy means no extra action is taken
 * beyond the queue's own cache/stack flushing.
 */
struct NoDropCachePolicy {
  static void dropCache() {
    // intentionally empty
  }
};
96 * A job queue that's suitable for multiple threads to work on.
98 template<typename TJob
,
99 bool waitable
= false,
100 class DropCachePolicy
= detail::NoDropCachePolicy
>
// Multi-threaded job queue.  Jobs are stored per priority level in deques of
// (job, enqueue-time) pairs; worker threads block on the inherited
// SynchronizableMulti condition machinery until work arrives.
101 class JobQueue
: public SynchronizableMulti
{
103 // trivial class for signaling queue stop
// Constructor.
//   threadCount         - number of worker threads served by this queue
//   threadRoundRobin    - when true, the SynchronizableMulti base is built
//                         with a single condition slot instead of one per
//                         thread
//   dropCacheTimeout    - idle wait timeout after which a worker flushes its
//                         thread-local caches (<= 0 disables the flush path)
//   dropStack           - also flush the thread stack when flushing caches
//   lifoSwitchThreshold - queue length at which dequeue order flips to LIFO
//   maxJobQueuingMs     - job expiry in milliseconds (-1 = never expire)
//   numPriorities       - number of priority levels (sizes m_jobQueues)
110 JobQueue(int threadCount
, bool threadRoundRobin
, int dropCacheTimeout
,
111 bool dropStack
, int lifoSwitchThreshold
=INT_MAX
,
112 int maxJobQueuingMs
=-1, int numPriorities
=1)
113 : SynchronizableMulti(threadRoundRobin
? 1 : threadCount
),
114 m_jobCount(0), m_stopped(false), m_workerCount(0),
115 m_dropCacheTimeout(dropCacheTimeout
), m_dropStack(dropStack
),
116 m_lifoSwitchThreshold(lifoSwitchThreshold
),
117 m_maxJobQueuingMs(maxJobQueuingMs
),
// one deque per priority level
119 m_jobQueues
.resize(numPriorities
);
123 * Put a job into the queue and notify a worker to pick it up.
125 void enqueue(TJob job
, int priority
=0) {
126 assert(priority
>= 0);
127 assert(priority
< m_jobQueues
.size());
// record the enqueue time so dequeue paths can later detect expired jobs
128 timespec enqueueTime
;
129 Timer::GetMonotonicTime(enqueueTime
);
131 m_jobQueues
[priority
].emplace_back(job
, enqueueTime
);
137 * Grab a job from the queue for processing. Since the job was not created
138 * by this queue class, it's up to a worker class on whether to deallocate
139 * the job object correctly.
// The designated reaper worker (see setJobReaperId) is routed to the
// expired-only dequeue path; every other worker takes the normal path with
// the current monotonic time used for expiry checks.
141 TJob
dequeueMaybeExpired(int id
, bool inc
, bool* expired
) {
142 if (id
== m_jobReaperId
.load()) {
144 return dequeueOnlyExpiredImpl(id
, inc
);
147 Timer::GetMonotonicTime(now
);
148 return dequeueMaybeExpiredImpl(id
, inc
, now
, expired
);
152 * Purely for making sure no new jobs are queued when we are stopping.
157 notifyAll(); // so all waiting threads can find out queue is stopped
// Hook overridden by the waitable specialization; no-op here.
161 void signalEmpty() {}
164 * Keeps track of how many active workers are working on the queue.
166 void incActiveWorker() {
167 atomic_inc(m_workerCount
);
169 int decActiveWorker() {
170 return atomic_dec(m_workerCount
);
172 int getActiveWorker() {
173 return m_workerCount
;
177 * Keep track of how many jobs are queued, but not yet been serviced.
179 int getQueuedJobs() {
184 * One worker can be designated as the job reaper. The job reaper's job is to
185 * check if the oldest job on the queue has expired and if so, terminate that
186 * job without processing it. When the job reaper work calls
187 * dequeueMaybeExpired(), it'll only return the oldest job and only if it's
188 * expired. Otherwise dequeueMaybeExpired() will block until a job expires.
190 void setJobReaperId(int id
) {
// reaping only makes sense when a job-expiry limit is configured
191 assert(m_maxJobQueuingMs
> 0);
192 m_jobReaperId
.store(id
);
195 int getJobReaperId() const {
196 return m_jobReaperId
.load();
200 friend class JobQueue_Expiration_Test
;
// Worker-side dequeue.  Blocks while the queue is empty; if a wait times out
// after m_dropCacheTimeout and the queue is still empty, the worker flushes
// its thread-local caches (and optionally its stack) exactly once — the
// 'flushed' flag prevents repeats.  Once work exists, priorities are scanned
// from highest to lowest and the first non-empty deque is served from the
// front (FIFO) or, when m_jobCount has reached m_lifoSwitchThreshold, from
// the back (LIFO).  Jobs queued longer than m_maxJobQueuingMs take the
// expired path instead.
201 TJob
dequeueMaybeExpiredImpl(int id
, bool inc
, const timespec
& now
,
205 bool flushed
= false;
206 while (m_jobCount
== 0) {
210 if (m_dropCacheTimeout
<= 0 || flushed
) {
212 } else if (!wait(id
, true, m_dropCacheTimeout
)) {
213 // since we timed out, maybe we can turn idle without holding memory
214 if (m_jobCount
== 0) {
// release the queue lock while flushing; flushing can be slow
215 ScopedUnlock
unlock(this);
216 Util::flush_thread_caches();
217 if (m_dropStack
&& Util::s_stackLimit
) {
218 Util::flush_thread_stack();
// extra user-supplied cleanup hook
220 DropCachePolicy::dropCache();
225 if (inc
) incActiveWorker();
228 // look across all our queues from highest priority to lowest.
229 for (auto& jobs
: boost::adaptors::reverse(m_jobQueues
)) {
234 // peek at the beginning of the queue to see if the request has already
236 if (m_maxJobQueuingMs
> 0 &&
237 gettime_diff_us(jobs
.front().second
, now
) >
238 m_maxJobQueuingMs
* 1000) {
240 TJob job
= jobs
.front().first
;
// LIFO mode: serve the newest request when the queue is long
246 if (m_jobCount
>= m_lifoSwitchThreshold
) {
247 TJob job
= jobs
.back().first
;
// FIFO mode: serve the oldest request
251 TJob job
= jobs
.front().first
;
256 return TJob(); // make compiler happy.
// Reaper-side dequeue.  Returns only jobs whose queueing time exceeds
// m_maxJobQueuingMs.  When the oldest job has not expired yet, computes how
// long until it will and sleeps at most that long; a notify-triggered wakeup
// re-notifies another waiter, since this thread only expires jobs and never
// processes them.
259 TJob
dequeueOnlyExpiredImpl(int id
, bool inc
) {
261 assert(m_maxJobQueuingMs
> 0);
// upper bound on how long to sleep before re-checking for expiry
263 long waitTimeUs
= m_maxJobQueuingMs
* 1000;
265 for (auto& jobs
: boost::adaptors::reverse(m_jobQueues
)) {
268 Timer::GetMonotonicTime(now
);
269 int64_t queuedTimeUs
= gettime_diff_us(jobs
.front().second
, now
);
270 if (queuedTimeUs
> m_maxJobQueuingMs
* 1000) {
271 if (inc
) incActiveWorker();
274 TJob job
= jobs
.front().first
;
278 // oldest job hasn't expired yet. wake us up when it will.
279 long waitTimeForQueue
= m_maxJobQueuingMs
* 1000 - queuedTimeUs
;
280 waitTimeUs
= ((waitTimeUs
< waitTimeForQueue
) ?
285 if (wait(id
, false, waitTimeUs
/ 1000000, waitTimeUs
% 1000000)) {
286 // We got woken up by somebody calling notify (as opposed to timeout),
287 // then some work might be on the queue. We only expire things here,
288 // so let's notify somebody else as well.
// Per-priority storage: index = priority; iterated in reverse so higher
// indices are served first.  Each element pairs a job with its enqueue
// timestamp.
296 std::vector
<std::deque
<std::pair
<TJob
, timespec
>>> m_jobQueues
;
299 const int m_dropCacheTimeout
;
300 const bool m_dropStack
;
301 const int m_lifoSwitchThreshold
;
302 const int m_maxJobQueuingMs
;
// id of the worker designated to reap expired jobs (see setJobReaperId)
303 std::atomic
<int> m_jobReaperId
;
// Waitable specialization: layers a pthread condition variable on top of the
// base queue so callers can block until the queue has fully drained (no
// queued jobs and no active workers).
306 template<class TJob
, class Policy
>
307 struct JobQueue
<TJob
,true,Policy
> : JobQueue
<TJob
,false,Policy
> {
// Forwards all construction parameters to the non-waitable base, then
// initializes the drain condition variable.
308 JobQueue(int threadCount
, bool threadRoundRobin
, int dropCacheTimeout
,
309 bool dropStack
, int lifoSwitchThreshold
=INT_MAX
,
310 int maxJobQueuingMs
=-1, int numPriorities
=1) :
311 JobQueue
<TJob
,false,Policy
>(threadCount
,
318 pthread_cond_init(&m_cond
, nullptr);
// destructor tears the condition variable down again
321 pthread_cond_destroy(&m_cond
);
// Block (under the queue's mutex) until no worker is active and nothing is
// queued; presumably woken from signalEmpty() below — the enclosing method
// signature is not visible here, TODO confirm.
325 while (this->getActiveWorker() || this->getQueuedJobs()) {
326 pthread_cond_wait(&m_cond
, &this->getMutex().getRaw());
// non-blocking drain check: true when nothing is active or queued
331 return !(this->getActiveWorker() || this->getQueuedJobs());
// wake one thread blocked in the drain wait above
334 pthread_cond_signal(&m_cond
);
337 pthread_cond_t m_cond
;
340 ///////////////////////////////////////////////////////////////////////////////
343 * Base class for a customized worker.
345 * DropCachePolicy is an extra callback for specific actions to take
346 * when we decide to drop stack/caches.
// Worker base class.  countActive: this worker maintains the queue's
// active-worker count while processing; waitable/Policy are forwarded into
// the queue type it binds to.
348 template<typename TJob
,
349 bool countActive
= false,
350 bool waitable
= false,
351 class Policy
= detail::NoDropCachePolicy
>
352 class JobQueueWorker
{
// convenience typedefs so subclasses and the dispatcher can name our types
354 typedef TJob JobType
;
355 typedef JobQueue
<TJob
,waitable
,Policy
> QueueType
;
356 typedef Policy DropCachePolicy
;
// compile-time mirrors of the template flags, read by JobQueueDispatcher
358 static const bool Waitable
= waitable
;
359 static const bool CountActive
= countActive
;
361 * Default constructor.
364 : m_func(nullptr), m_opaque(nullptr), m_stopped(false), m_queue(nullptr) {
367 virtual ~JobQueueWorker() {
371 * Two-phase object creation for easier derivation and for JobQueueDispatcher
372 * to easily create a vector of workers.
// presumably stores id/queue/func/opaque into the members above — body not
// fully visible here, TODO confirm
374 void create(int id
, QueueType
* queue
, void *func
, void *opaque
) {
383 * The only functions a subclass needs to implement.
385 virtual void doJob(TJob job
) = 0;
// called instead of doJob() for requests expired off the queue; default
// implementation only logs
386 virtual void abortJob(TJob job
) {
387 Logger::Warning("Job dropped by JobQueueDispatcher because of timeout.");
389 virtual void onThreadEnter() {}
390 virtual void onThreadExit() {}
393 * Start this worker thread.
// main loop: repeatedly dequeue (possibly expired) jobs until the queue
// throws StopSignal; when the last active worker goes idle on a waitable
// queue with no jobs left, wake any drain-waiters via signalEmpty()
400 bool expired
= false;
401 TJob job
= m_queue
->dequeueMaybeExpired(m_id
, countActive
, &expired
);
408 if (!m_queue
->decActiveWorker() && waitable
) {
410 if (!m_queue
->getActiveWorker() &&
411 !m_queue
->getQueuedJobs()) {
412 m_queue
->signalEmpty();
416 } catch (const typename
QueueType::StopSignal
&) {
417 m_stopped
= true; // queue is empty and stopped, so we are done
424 * Stop this worker thread.
440 ///////////////////////////////////////////////////////////////////////////////
443 * Driver class to push through the whole thing.
// Drives a JobQueue plus a pool of TWorker threads, each wrapped in an
// AsyncFunc runner.  Owns the workers and their runners via raw pointers
// held in m_workers/m_funcs.
445 template<class TJob
, class TWorker
>
446 class JobQueueDispatcher
{
// Constructor.  Parameters mirror JobQueue's; 'opaque' is passed through to
// each worker's create().  A reaper thread is requested whenever a positive
// job-expiry limit (maxJobQueuingMs) is configured.
451 JobQueueDispatcher(int threadCount
, bool threadRoundRobin
,
452 int dropCacheTimeout
, bool dropStack
, void *opaque
,
453 int lifoSwitchThreshold
= INT_MAX
,
454 int maxJobQueuingMs
= -1, int numPriorities
= 1)
455 : m_stopped(true), m_id(0), m_opaque(opaque
),
456 m_maxThreadCount(threadCount
),
457 m_queue(threadCount
, threadRoundRobin
, dropCacheTimeout
, dropStack
,
458 lifoSwitchThreshold
, maxJobQueuingMs
, numPriorities
),
459 m_startReaperThread(maxJobQueuingMs
> 0) {
460 assert(threadCount
>= 1);
461 if (!TWorker::CountActive
) {
462 // If TWorker does not support counting the number of
463 // active workers, just start all of the workers eagerly
464 for (int i
= 0; i
< threadCount
; i
++) {
465 addWorkerImpl(false);
// tears down every AsyncFunc runner and worker the dispatcher owns
470 ~JobQueueDispatcher() {
473 std::set
<AsyncFunc
<TWorker
>*>::iterator iter
= m_funcs
.begin();
474 iter
!= m_funcs
.end(); ++iter
) {
478 std::set
<TWorker
*>::iterator iter
= m_workers
.begin();
479 iter
!= m_workers
.end(); ++iter
) {
484 int getActiveWorker() {
485 return m_queue
.getActiveWorker();
488 int getQueuedJobs() {
489 return m_queue
.getQueuedJobs();
// Desired pool size: active + queued jobs, clamped to m_maxThreadCount.
// Without per-worker activity counting the pool is always kept at the max.
492 int getTargetNumWorkers() {
493 if (TWorker::CountActive
) {
494 int target
= getActiveWorker() + getQueuedJobs();
495 return (target
> m_maxThreadCount
) ? m_maxThreadCount
: target
;
497 return m_maxThreadCount
;
502 * Creates worker threads and start running them. This is non-blocking.
506 // Spin up more worker threads if appropriate
507 int target
= getTargetNumWorkers();
508 for (int n
= m_workers
.size(); n
< target
; ++n
) {
509 addWorkerImpl(false);
// start every registered AsyncFunc runner
512 std::set
<AsyncFunc
<TWorker
>*>::iterator iter
= m_funcs
.begin();
513 iter
!= m_funcs
.end(); ++iter
) {
518 if (m_startReaperThread
) {
519 // If we have set a max timeout for requests on the queue, start a reaper
520 // thread just for expiring off old requests so we guarantee requests are
521 // taken off the queue as soon as possible when they expire even if all
522 // other worker threads are stalled.
523 m_queue
.setJobReaperId(addWorkerImpl(true));
// Enqueue a job, then check whether the pool should grow toward the current
// target size.
530 void enqueue(TJob job
, int priority
= 0) {
531 m_queue
.enqueue(job
, priority
);
532 // Spin up another worker thread if appropriate
533 int target
= getTargetNumWorkers();
534 int n
= m_workers
.size();
541 * Add a worker thread on the fly.
551 * Add N new worker threads.
553 void addWorkers(int n
) {
// no-op once the dispatcher has been stopped
555 if (m_stopped
) return;
556 for (int i
= 0; i
< n
; ++i
) {
// append all current workers to the caller-supplied vector
561 void getWorkers(std::vector
<TWorker
*> &workers
) {
563 workers
.insert(workers
.end(), m_workers
.begin(), m_workers
.end());
// Block until the queue drains; optionally stop() the dispatcher afterwards.
566 void waitEmpty(bool stop
= true) {
567 if (m_stopped
) return;
569 if (stop
) this->stop();
// non-blocking drain check; a stopped dispatcher counts as empty
573 if (m_stopped
) return true;
574 return m_queue
.pollEmpty();
578 * Stop all workers after all jobs are processed. No new jobs should be
579 * enqueued at this moment, or this call may block for longer time.
582 if (m_stopped
) return;
// join runners one at a time, remembering whether any threw
586 bool exceptioned
= false;
590 AsyncFunc
<TWorker
> *func
= nullptr;
593 if (!m_funcs
.empty()) {
594 func
= *m_funcs
.begin();
598 if (func
== nullptr) {
603 } catch (Exception
&e
) {
623 int m_maxThreadCount
;
626 typename
TWorker::DropCachePolicy
> m_queue
;
629 std::set
<TWorker
*> m_workers
;
630 std::set
<AsyncFunc
<TWorker
> *> m_funcs
;
631 const bool m_startReaperThread
;
633 // return the id for the worker.
// Heap-allocates a TWorker and an AsyncFunc runner bound to
// TWorker::start, registers both in the owning sets, and wires the worker
// to the queue/opaque context via create().
634 int addWorkerImpl(bool start
) {
635 TWorker
*worker
= new TWorker();
636 AsyncFunc
<TWorker
> *func
= new AsyncFunc
<TWorker
>(worker
, &TWorker::start
);
637 m_workers
.insert(worker
);
638 m_funcs
.insert(func
);
640 worker
->create(id
, &m_queue
, func
, m_opaque
);
649 ///////////////////////////////////////////////////////////////////////////////