Optional Two-phase heap tracing
[hiphop-php.git] / hphp / util / job-queue.h
blobffc9326d4d5e1f2f1c25c28c7fd2264bb3b5dd90
1 /*
2 +----------------------------------------------------------------------+
3 | HipHop for PHP |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #ifndef incl_HPHP_UTIL_JOB_QUEUE_H_
18 #define incl_HPHP_UTIL_JOB_QUEUE_H_
20 #include <memory>
21 #include <set>
22 #include <time.h>
23 #include <vector>
25 #include <boost/range/adaptors.hpp>
26 #include <folly/Memory.h>
28 #include "hphp/util/alloc.h"
29 #include "hphp/util/async-func.h"
30 #include "hphp/util/atomic.h"
31 #include "hphp/util/compatibility.h"
32 #include "hphp/util/exception.h"
33 #include "hphp/util/health-monitor-types.h"
34 #include "hphp/util/lock.h"
35 #include "hphp/util/logger.h"
36 #include "hphp/util/numa.h"
37 #include "hphp/util/synchronizable-multi.h"
38 #include "hphp/util/timer.h"
40 namespace HPHP {
41 ///////////////////////////////////////////////////////////////////////////////
43 /**
44 * A queue-based multi-threaded job processing facility. Internally, we have a
45 * queue of jobs and a list of workers, each of which runs in its own thread.
 * Job queue can take new jobs on the fly and workers will continue to pull
 * jobs off the queue and work on them.
49 * To use it, simply define your own job and worker class like this,
51 * struct MyJob {
52 * // storing job data
53 * };
55 * struct MyWorker : JobQueueWorker<MyJob*> {
56 * virtual void doJob(MyJob *job) {
57 * // process the job
58 * delete job; // if it was new-ed
59 * }
60 * };
62 * Now, use JobQueueDispatcher to start the whole thing,
 * JobQueueDispatcher<MyWorker> dispatcher(40, 0, false, nullptr); // 40 threads
65 * dispatcher.start();
66 * ...
67 * dispatcher.enqueue(new MyJob(...));
68 * ...
69 * dispatcher.stop();
71 * Note this class is different from JobListDispatcher that uses a vector to
72 * store prepared jobs. With JobQueueDispatcher, job queue is normally empty
73 * initially and new jobs are pushed into the queue over time. Also, workers
74 * can be stopped individually.
76 * Job process ordering
77 * ====================
78 * By default, requests are processed in FIFO order.
 * In addition, we support an option where the request processing order can flip
 * between FIFO and LIFO based on the length of the queue. This can be enabled by
82 * setting the 'lifoSwitchThreshold' parameter. If the job queue is configured
83 * to be in FIFO mode, and the current queue length exceeds
84 * lifoSwitchThreshold, then the workers will begin work on requests in LIFO
85 * order until the queue size is below the threshold in which case we resume in
86 * FIFO order. Setting the queue to be in LIFO mode initially will have the
87 * opposite behavior. This is useful when we are in a loaded situation and we
88 * want to prioritize the newest requests.
90 * You can configure a LIFO ordered queue by setting lifoSwitchThreshold to 0.
93 ///////////////////////////////////////////////////////////////////////////////
namespace detail {

/**
 * Default DropCachePolicy: there is nothing extra to release when a worker
 * drops its caches, so dropCache() does nothing.
 */
struct NoDropCachePolicy {
  static void dropCache() {
    // Intentionally empty.
  }
};

} // namespace detail
/**
 * Strategy interface that decides how many blocked workers
 * JobQueue::releaseQueuedJobs() should wake up per call.
 */
struct IQueuedJobsReleaser {
  virtual ~IQueuedJobsReleaser() = default;

  /// @return number of queued jobs to release; a value <= 0 releases nothing.
  virtual int32_t numOfJobsToRelease() = 0;
};

/**
 * Releaser that always reports the same fixed rate given at construction.
 */
struct SimpleReleaser : IQueuedJobsReleaser {
  explicit SimpleReleaser(int32_t rate) : m_rate(rate) {}

  int32_t numOfJobsToRelease() override { return m_rate; }

 private:
  int m_rate = 3;  // always overwritten by the constructor
};
115 * A job queue that's suitable for multiple threads to work on.
117 template<typename TJob,
118 bool waitable = false,
119 class DropCachePolicy = detail::NoDropCachePolicy>
120 struct JobQueue : SynchronizableMulti {
121 // trivial class for signaling queue stop
122 struct StopSignal {};
124 public:
126 * Constructor.
128 JobQueue(int maxThreadCount, int dropCacheTimeout,
129 bool dropStack, int lifoSwitchThreshold=INT_MAX,
130 int maxJobQueuingMs = -1, int numPriorities = 1,
131 int queuedJobsReleaseRate = 3,
132 IHostHealthObserver* healthStatus = nullptr)
133 : SynchronizableMulti(maxThreadCount + 1), // reaper added
134 m_jobCount(0), m_stopped(false), m_workerCount(0),
135 m_dropCacheTimeout(dropCacheTimeout), m_dropStack(dropStack),
136 m_lifoSwitchThreshold(lifoSwitchThreshold),
137 m_maxJobQueuingMs(maxJobQueuingMs),
138 m_jobReaperId(maxThreadCount), m_healthStatus(healthStatus),
139 m_queuedJobsReleaser(
140 std::make_shared<SimpleReleaser>(queuedJobsReleaseRate)) {
141 assert(maxThreadCount > 0);
142 m_jobQueues.resize(numPriorities);
146 * Put a job into the queue and notify a worker to pick it up.
148 void enqueue(TJob job, int priority=0) {
149 assert(priority >= 0);
150 assert(priority < m_jobQueues.size());
151 timespec enqueueTime;
152 Timer::GetMonotonicTime(enqueueTime);
153 Lock lock(this);
154 m_jobQueues[priority].emplace_back(job, enqueueTime);
155 ++m_jobCount;
156 notify();
160 * Grab a job from the queue for processing. Since the job was not created
161 * by this queue class, it's up to a worker class on whether to deallocate
162 * the job object correctly.
164 TJob dequeueMaybeExpired(int id, int q, bool inc, bool* expired,
165 bool highpri = false) {
166 if (id == m_jobReaperId) {
167 *expired = true;
168 return dequeueOnlyExpiredImpl(id, q, inc);
170 timespec now;
171 Timer::GetMonotonicTime(now);
172 return dequeueMaybeExpiredImpl(id, q, inc, now, expired, highpri);
176 * Purely for making sure no new jobs are queued when we are stopping.
178 void stop() {
179 Lock lock(this);
180 m_stopped = true;
181 notifyAll(); // so all waiting threads can find out queue is stopped
184 void waitEmpty() {}
185 void signalEmpty() {}
188 * Keeps track of how many active workers are working on the queue.
190 void incActiveWorker() {
191 ++m_workerCount;
193 int decActiveWorker() {
194 return --m_workerCount;
196 int getActiveWorker() {
197 return m_workerCount;
201 * Keep track of how many jobs are queued, but not yet been serviced.
203 int getQueuedJobs() {
204 return m_jobCount;
207 int releaseQueuedJobs() {
208 int toRelease = m_queuedJobsReleaser->numOfJobsToRelease();
209 if (toRelease <= 0) {
210 return 0;
213 Lock lock(this);
214 int iter;
215 for (iter = 0; iter < toRelease && iter < m_jobCount; iter++) {
216 notify();
218 return iter;
221 private:
222 friend class JobQueue_Expiration_Test;
223 TJob dequeueMaybeExpiredImpl(int id, int q, bool inc, const timespec& now,
224 bool* expired, bool highPri = false) {
225 *expired = false;
226 Lock lock(this);
227 bool flushed = false;
228 bool ableToDeque = m_healthStatus == nullptr ||
229 m_healthStatus->getHealthLevel() != HealthLevel::BackOff;
231 while (m_jobCount == 0 || !ableToDeque) {
232 uint32_t kNumPriority = m_jobQueues.size();
233 if (m_jobQueues[kNumPriority - 1].size() > 0) {
234 // we do not block HealthMon requests (with the highest priority)
235 break;
238 if (m_stopped) {
239 throw StopSignal();
242 if (flushed) {
243 // Flushed worker threads gets lower priority. But a flushed worker
244 // with huge stack is still more preferable than a non-flushed worker
245 // without huge stack.
246 wait(id, q, highPri ? Priority::High : Priority::Low);
247 } else if (m_dropCacheTimeout > 0) {
248 if (!wait(id, q, (highPri ? Priority::Highest : Priority::Normal),
249 m_dropCacheTimeout)) {
250 // since we timed out, maybe we can turn idle without holding memory
251 if (m_jobCount == 0) {
252 ScopedUnlock unlock(this);
253 flush_thread_caches();
254 if (m_dropStack && s_stackLimit) {
255 flush_thread_stack();
257 DropCachePolicy::dropCache();
258 flushed = true;
261 } else {
262 // m_dropCacheTimeout <= 0, a thread that starts waiting more recently
263 // should be given a task first (LIFO), same as unflushed threads.
264 wait(id, q, highPri ? Priority::Highest : Priority::Normal);
267 if (!ableToDeque) {
268 ableToDeque = m_healthStatus->getHealthLevel() != HealthLevel::BackOff;
271 if (inc) incActiveWorker();
272 --m_jobCount;
274 // look across all our queues from highest priority to lowest.
275 for (auto& jobs : boost::adaptors::reverse(m_jobQueues)) {
276 if (jobs.empty()) {
277 continue;
280 // peek at the beginning of the queue to see if the request has already
281 // timed out.
282 if (m_maxJobQueuingMs > 0 &&
283 gettime_diff_us(jobs.front().second, now) >
284 m_maxJobQueuingMs * 1000) {
285 *expired = true;
286 TJob job = jobs.front().first;
287 jobs.pop_front();
288 return job;
291 if (m_jobCount >= m_lifoSwitchThreshold) {
292 TJob job = jobs.back().first;
293 jobs.pop_back();
294 return job;
296 TJob job = jobs.front().first;
297 jobs.pop_front();
298 return job;
300 assert(false);
301 return TJob(); // make compiler happy.
305 * One worker can be designated as the job reaper. The id of the job reaper
306 * equals m_maxThreadCount of the dispatcher. The job reaper checks if the
307 * oldest job on the queue has expired and if so, terminate that job without
308 * processing it. When the job reaper calls dequeueMaybeExpired(), it goes to
309 * dequeueOnlyExpiredImpl(), which only returns the oldest job and only if
310 * it's expired. Otherwise dequeueMaybeExpired() will block until a job
311 * expires.
313 TJob dequeueOnlyExpiredImpl(int id, int q, bool inc) {
314 assert(id == m_jobReaperId);
315 assert(m_maxJobQueuingMs > 0);
316 Lock lock(this);
317 while(!m_stopped) {
318 long waitTimeUs = m_maxJobQueuingMs * 1000;
320 for (auto& jobs : boost::adaptors::reverse(m_jobQueues)) {
321 if (!jobs.empty()) {
322 timespec now;
323 Timer::GetMonotonicTime(now);
324 int64_t queuedTimeUs = gettime_diff_us(jobs.front().second, now);
325 if (queuedTimeUs > m_maxJobQueuingMs * 1000) {
326 if (inc) incActiveWorker();
327 --m_jobCount;
329 TJob job = jobs.front().first;
330 jobs.pop_front();
331 return job;
333 // oldest job hasn't expired yet. wake us up when it will.
334 long waitTimeForQueue = m_maxJobQueuingMs * 1000 - queuedTimeUs;
335 waitTimeUs = ((waitTimeUs < waitTimeForQueue) ?
336 waitTimeUs :
337 waitTimeForQueue);
340 if (wait(id, q, Priority::Low,
341 waitTimeUs / 1000000, waitTimeUs % 1000000)) {
342 // We got woken up by somebody calling notify (as opposed to timeout),
343 // then some work might be on the queue. We only expire things here,
344 // so let's notify somebody else as well.
345 notify();
348 throw StopSignal();
351 int m_jobCount;
352 std::vector<std::deque<std::pair<TJob, timespec>>> m_jobQueues;
353 bool m_stopped;
354 std::atomic<int> m_workerCount;
355 const int m_dropCacheTimeout;
356 const bool m_dropStack;
357 const int m_lifoSwitchThreshold;
358 const int m_maxJobQueuingMs;
359 const int m_jobReaperId; // equals max worker thread count
360 IHostHealthObserver* m_healthStatus; // the dispatcher responsible for this
361 // JobQueue
362 std::shared_ptr<IQueuedJobsReleaser> m_queuedJobsReleaser;
365 template<class TJob, class Policy>
366 struct JobQueue<TJob,true,Policy> : JobQueue<TJob,false,Policy> {
367 JobQueue(int threadCount, int dropCacheTimeout,
368 bool dropStack, int lifoSwitchThreshold=INT_MAX,
369 int maxJobQueuingMs = -1, int numPriorities = 1,
370 int queuedJobsReleaseRate = 3,
371 IHostHealthObserver* healthStatus = nullptr) :
372 JobQueue<TJob,false,Policy>(threadCount,
373 dropCacheTimeout,
374 dropStack,
375 lifoSwitchThreshold,
376 maxJobQueuingMs,
377 numPriorities,
378 queuedJobsReleaseRate,
379 healthStatus) {
380 pthread_cond_init(&m_cond, nullptr);
382 ~JobQueue() override {
383 pthread_cond_destroy(&m_cond);
385 void waitEmpty() {
386 Lock lock(this);
387 while (this->getActiveWorker() || this->getQueuedJobs()) {
388 pthread_cond_wait(&m_cond, &this->getMutex().getRaw());
391 bool pollEmpty() {
392 Lock lock(this);
393 return !(this->getActiveWorker() || this->getQueuedJobs());
395 void signalEmpty() {
396 pthread_cond_signal(&m_cond);
398 private:
399 pthread_cond_t m_cond;
402 ///////////////////////////////////////////////////////////////////////////////
405 * Base class for a customized worker.
407 * DropCachePolicy is an extra callback for specific actions to take
408 * when we decide to drop stack/caches.
410 template<typename TJob,
411 typename TContext = void*,
412 bool countActive = false,
413 bool waitable = false,
414 class Policy = detail::NoDropCachePolicy>
415 struct JobQueueWorker {
416 typedef TJob JobType;
417 typedef TContext ContextType;
418 typedef JobQueue<TJob, waitable, Policy> QueueType;
419 typedef Policy DropCachePolicy;
421 static const bool Waitable = waitable;
422 static const bool CountActive = countActive;
424 * Default constructor.
426 JobQueueWorker()
427 : m_func(nullptr), m_context(), m_stopped(false), m_queue(nullptr) {
430 virtual ~JobQueueWorker() {
434 * Two-phase object creation for easier derivation and for JobQueueDispatcher
435 * to easily create a vector of workers.
437 void create(int id, QueueType* queue, void *func, ContextType context) {
438 assert(queue);
439 m_id = id;
440 m_queue = queue;
441 m_func = func;
442 m_context = context;
446 * The only functions a subclass needs to implement.
448 virtual void doJob(TJob job) = 0;
449 virtual void abortJob(TJob /*job*/) {
450 Logger::Warning("Job dropped by JobQueueDispatcher because of timeout.");
452 virtual void onThreadEnter() {}
453 virtual void onThreadExit() {}
456 * Start this worker thread.
458 void start() {
459 assert(m_queue);
460 onThreadEnter();
461 bool highPri = (s_firstSlab.ptr != nullptr);
462 while (!m_stopped) {
463 try {
464 bool expired = false;
465 TJob job = m_queue->dequeueMaybeExpired(m_id, s_numaNode, countActive,
466 &expired, highPri);
467 if (expired) {
468 abortJob(job);
469 } else {
470 doJob(job);
472 if (countActive) {
473 if (!m_queue->decActiveWorker() && waitable) {
474 Lock lock(m_queue);
475 if (!m_queue->getActiveWorker() &&
476 !m_queue->getQueuedJobs()) {
477 m_queue->signalEmpty();
481 } catch (const typename QueueType::StopSignal&) {
482 m_stopped = true; // queue is empty and stopped, so we are done
485 onThreadExit();
489 * Stop this worker thread.
491 void stop() {
492 m_stopped = true;
495 protected:
496 int m_id{-1};
497 void* m_func{nullptr};
498 ContextType m_context;
499 bool m_stopped{false};
501 private:
502 QueueType* m_queue;
505 ///////////////////////////////////////////////////////////////////////////////
508 * Driver class to push through the whole thing.
510 template<class TWorker>
511 struct JobQueueDispatcher : IHostHealthObserver {
513 * Constructor.
515 JobQueueDispatcher(int maxThreadCount,
516 int dropCacheTimeout, bool dropStack,
517 typename TWorker::ContextType context,
518 int lifoSwitchThreshold = INT_MAX,
519 int maxJobQueuingMs = -1, int numPriorities = 1,
520 int queuedJobsReleaseRate = 3,
521 int hugeCount = 0,
522 int initThreadCount = -1,
523 int queueToWorkerRatio = 1) // A worker per 1 queued job.
524 : m_stopped(true), m_healthStatus(HealthLevel::Bold), m_id(0),
525 m_context(context), m_maxThreadCount(maxThreadCount),
526 m_currThreadCountLimit(initThreadCount),
527 m_hugeThreadCount(hugeCount),
528 m_startReaperThread(maxJobQueuingMs > 0),
529 m_queueToWorkerRatio(queueToWorkerRatio),
530 m_queue(maxThreadCount, dropCacheTimeout, dropStack,
531 lifoSwitchThreshold, maxJobQueuingMs, numPriorities,
532 queuedJobsReleaseRate, this) {
533 assert(maxThreadCount >= 1);
534 if (initThreadCount < 0 || initThreadCount > maxThreadCount) {
535 m_currThreadCountLimit = maxThreadCount;
537 if (!TWorker::CountActive) {
538 // If TWorker does not support counting the number of
539 // active workers, just start all of the workers eagerly
540 for (int i = 0; i < m_maxThreadCount; i++) {
541 addWorkerImpl(false);
546 int32_t dispatcher_id = 0;
548 ~JobQueueDispatcher() override {
549 stop();
550 for (auto func : m_funcs) delete func;
551 for (auto worker : m_workers) delete worker;
554 int getActiveWorker() {
555 return m_queue.getActiveWorker();
558 int getQueuedJobs() {
559 return m_queue.getQueuedJobs();
562 int getTargetNumWorkers() {
563 if (TWorker::CountActive) {
564 int target = getActiveWorker();
565 const auto queued = getQueuedJobs();
566 const auto r = m_queueToWorkerRatio;
567 always_assert(r >= 1);
568 if (target == 0) {
569 target += (queued + r - 1) / r; // Round up.
570 } else {
571 target += queued / r; // Round down.
573 if (target > m_currThreadCountLimit) return m_currThreadCountLimit;
574 return target;
575 } else {
576 return m_currThreadCountLimit;
581 * Creates worker threads and start running them. This is non-blocking.
583 void start() {
584 Lock lock(m_mutex);
585 m_queue.setNumGroups(num_numa_nodes());
586 // Spin up more worker threads if appropriate
587 int target = getTargetNumWorkers();
588 for (int n = m_workers.size(); n < target; ++n) {
589 addWorkerImpl(false);
591 for (auto worker : m_funcs) {
592 worker->start();
594 m_stopped = false;
596 if (m_startReaperThread) {
597 addReaper();
602 * Enqueue a new job.
604 void enqueue(typename TWorker::JobType job, int priority = 0) {
605 m_queue.enqueue(job, priority);
606 // Spin up another worker thread if appropriate
607 int target = getTargetNumWorkers();
608 int n = m_workers.size();
609 if (n < target) {
610 addWorker();
615 * Add a worker thread on the fly.
617 void addWorker() {
618 Lock lock(m_mutex);
619 if (!m_stopped) {
620 addWorkerImpl(true);
625 * Increase the limit on number of workers by n, without exceeding the initial
626 * upper bound.
628 void addWorkers(int n) {
629 Lock lock(m_mutex);
630 if (m_stopped) return;
631 int limit = m_maxThreadCount - m_currThreadCountLimit;
632 assert(limit >= 0);
633 if (n > limit) n = limit;
634 m_currThreadCountLimit += n;
635 if (!TWorker::CountActive) {
636 for (int i = 0; i < n; ++i) {
637 addWorkerImpl(true);
639 } else {
640 while (m_workers.size() < getTargetNumWorkers()) {
641 addWorkerImpl(true);
646 void getWorkers(std::vector<TWorker*> &workers) {
647 Lock lock(m_mutex);
648 workers.insert(workers.end(), m_workers.begin(), m_workers.end());
651 void waitEmpty(bool stop = true) {
652 if (m_stopped) return;
653 m_queue.waitEmpty();
654 if (stop) this->stop();
657 bool pollEmpty() {
658 if (m_stopped) return true;
659 return m_queue.pollEmpty();
663 * Stop all workers after all jobs are processed. No new jobs should be
664 * enqueued at this moment, or this call may block for longer time.
666 void stop() {
667 // TODO(t5572120): If stop has already been called when the destructor
668 // runs, we'd bail out here and potentially start destroying AsyncFuncs
669 // that are still running.
670 if (m_stopped) return;
671 m_stopped = true;
673 m_queue.stop();
674 bool exceptioned = false;
675 Exception exception;
677 while (true) {
678 AsyncFunc<TWorker> *func = nullptr;
680 Lock lock(m_mutex);
681 if (!m_funcs.empty()) {
682 func = *m_funcs.begin();
683 m_funcs.erase(func);
684 } else if (m_reaperFunc) {
685 func = m_reaperFunc.release();
688 if (func == nullptr) {
689 break;
691 try {
692 func->waitForEnd();
693 } catch (Exception &e) {
694 exceptioned = true;
695 exception = e;
697 delete func;
699 if (exceptioned) {
700 throw exception;
704 void run() {
705 start();
706 stop();
709 void notifyNewStatus(HealthLevel newStatus) override {
710 bool curStopDequeue = (newStatus == HealthLevel::BackOff);
711 if (!curStopDequeue) {
712 // release blocked requests in queue if any
713 m_queue.releaseQueuedJobs();
716 m_healthStatus = newStatus;
719 HealthLevel getHealthLevel() override {
720 return m_healthStatus;
723 void setHugeThreadCount(int count) {
724 m_hugeThreadCount = count;
727 private:
728 bool m_stopped;
729 HealthLevel m_healthStatus;
730 int m_id;
731 typename TWorker::ContextType m_context;
732 const int m_maxThreadCount; // not including the possible reaper
733 int m_currThreadCountLimit; // initial limit can be lower than max
734 int m_hugeThreadCount{0};
735 const bool m_startReaperThread;
736 int m_queueToWorkerRatio{1};
737 JobQueue<typename TWorker::JobType,
738 TWorker::Waitable,
739 typename TWorker::DropCachePolicy> m_queue;
741 Mutex m_mutex;
742 std::set<TWorker*> m_workers;
743 std::set<AsyncFunc<TWorker> *> m_funcs;
744 std::unique_ptr<TWorker> m_reaper;
745 std::unique_ptr<AsyncFunc<TWorker>> m_reaperFunc;
747 int addReaper() {
748 m_reaper = std::make_unique<TWorker>();
749 m_reaperFunc = std::make_unique<AsyncFunc<TWorker>>(m_reaper.get(),
750 &TWorker::start);
751 m_reaper->create(m_maxThreadCount, &m_queue, m_reaperFunc.get(), m_context);
752 m_reaperFunc->start();
753 return m_maxThreadCount;
756 // Cannot be called concurrently (callers should hold m_mutex, or
757 // otherwise ensure that no other threads are calling this).
758 void addWorkerImpl(bool start) {
759 if (m_workers.size() >= m_maxThreadCount) {
760 // another thread raced with us to add a worker.
761 assert(m_workers.size() == m_maxThreadCount);
762 return;
764 TWorker *worker = new TWorker();
765 bool huge = m_workers.size() < m_hugeThreadCount;
766 AsyncFunc<TWorker> *func =
767 new AsyncFunc<TWorker>(worker, &TWorker::start, huge);
768 m_workers.insert(worker);
769 m_funcs.insert(func);
770 int id = m_id++;
771 worker->create(id, &m_queue, func, m_context);
773 if (start) {
774 func->start();
780 ///////////////////////////////////////////////////////////////////////////////
783 #endif