Fix semdiff syntactic output
[hiphop-php.git] / hphp / util / job-queue.h
blob6dbcf511882c2644de8ce5615894be62e1a6e701
1 /*
2 +----------------------------------------------------------------------+
3 | HipHop for PHP |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #ifndef incl_HPHP_UTIL_JOB_QUEUE_H_
18 #define incl_HPHP_UTIL_JOB_QUEUE_H_
20 #include <memory>
21 #include <set>
22 #include <time.h>
23 #include <vector>
25 #include <boost/range/adaptors.hpp>
26 #include <folly/Memory.h>
28 #include "hphp/util/alloc.h"
29 #include "hphp/util/async-func.h"
30 #include "hphp/util/atomic.h"
31 #include "hphp/util/compatibility.h"
32 #include "hphp/util/exception.h"
33 #include "hphp/util/health-monitor-types.h"
34 #include "hphp/util/lock.h"
35 #include "hphp/util/logger.h"
36 #include "hphp/util/numa.h"
37 #include "hphp/util/synchronizable-multi.h"
38 #include "hphp/util/timer.h"
40 namespace HPHP {
41 ///////////////////////////////////////////////////////////////////////////////
43 /**
44 * A queue-based multi-threaded job processing facility. Internally, we have a
45 * queue of jobs and a list of workers, each of which runs in its own thread.
46 * Job queue can take new jobs on the fly and workers will continue to pull
47 * jobs off the queue and work on it.
49 * To use it, simply define your own job and worker class like this,
51 * struct MyJob {
52 * // storing job data
53 * };
55 * struct MyWorker : JobQueueWorker<MyJob*> {
56 * virtual void doJob(MyJob *job) {
57 * // process the job
58 * delete job; // if it was new-ed
59 * }
60 * };
62 * Now, use JobQueueDispatcher to start the whole thing,
64 * JobQueueDispatcher<MyWorker> dispatcher(40, 0, false, nullptr); // 40 threads
65 * dispatcher.start();
66 * ...
67 * dispatcher.enqueue(new MyJob(...));
68 * ...
69 * dispatcher.stop();
71 * Note this class is different from JobListDispatcher that uses a vector to
72 * store prepared jobs. With JobQueueDispatcher, job queue is normally empty
73 * initially and new jobs are pushed into the queue over time. Also, workers
74 * can be stopped individually.
76 * Job process ordering
77 * ====================
78 * By default, requests are processed in FIFO order.
80 * In addition, we support an option where the request processing order can flip
81 * between FIFO or LIFO based on the length of the queue. This can be enabled by
82 * setting the 'lifoSwitchThreshold' parameter. If the job queue is configured
83 * to be in FIFO mode, and the current queue length exceeds
84 * lifoSwitchThreshold, then the workers will begin work on requests in LIFO
85 * order until the queue size is below the threshold in which case we resume in
86 * FIFO order. Setting the queue to be in LIFO mode initially will have the
87 * opposite behavior. This is useful when we are in a loaded situation and we
88 * want to prioritize the newest requests.
90 * You can configure a LIFO ordered queue by setting lifoSwitchThreshold to 0.
93 ///////////////////////////////////////////////////////////////////////////////
namespace detail {

/**
 * Default drop-cache policy: dropCache() is a no-op, so a queue using this
 * policy performs no extra work at the point where it calls
 * DropCachePolicy::dropCache().
 */
struct NoDropCachePolicy {
  static void dropCache() {}
};

} // namespace detail
/**
 * Interface for an object that reports how many queued jobs should be
 * released in one go. Implementations override numOfJobsToRelease().
 */
struct IQueuedJobsReleaser {
  virtual ~IQueuedJobsReleaser() = default;

  /** @return the number of jobs to release. */
  virtual int32_t numOfJobsToRelease() = 0;
};
104 struct SimpleReleaser : IQueuedJobsReleaser {
105 explicit SimpleReleaser(int32_t rate)
106 : m_queuedJobsReleaseRate(rate){}
107 int32_t numOfJobsToRelease() override {
108 return m_queuedJobsReleaseRate;
110 private:
111 int m_queuedJobsReleaseRate = 3;
115 * A job queue that's suitable for multiple threads to work on.
117 template<typename TJob,
118 bool waitable = false,
119 class DropCachePolicy = detail::NoDropCachePolicy>
120 struct JobQueue : SynchronizableMulti {
121 // trivial class for signaling queue stop
122 struct StopSignal {};
124 public:
126 * Constructor.
128 JobQueue(int maxThreadCount, int dropCacheTimeout,
129 bool dropStack, int lifoSwitchThreshold=INT_MAX,
130 int maxJobQueuingMs = -1, int numPriorities = 1,
131 int queuedJobsReleaseRate = 3,
132 IHostHealthObserver* healthStatus = nullptr)
133 : SynchronizableMulti(maxThreadCount + 1), // reaper added
134 m_jobCount(0), m_stopped(false), m_workerCount(0),
135 m_dropCacheTimeout(dropCacheTimeout), m_dropStack(dropStack),
136 m_lifoSwitchThreshold(lifoSwitchThreshold),
137 m_maxJobQueuingMs(maxJobQueuingMs),
138 m_jobReaperId(maxThreadCount), m_healthStatus(healthStatus),
139 m_queuedJobsReleaser(
140 std::make_shared<SimpleReleaser>(queuedJobsReleaseRate)) {
141 assert(maxThreadCount > 0);
142 m_jobQueues.resize(numPriorities);
146 * Put a job into the queue and notify a worker to pick it up.
148 void enqueue(TJob job, int priority=0) {
149 assert(priority >= 0);
150 assert(priority < m_jobQueues.size());
151 timespec enqueueTime;
152 Timer::GetMonotonicTime(enqueueTime);
153 Lock lock(this);
154 m_jobQueues[priority].emplace_back(job, enqueueTime);
155 ++m_jobCount;
156 notify();
160 * Grab a job from the queue for processing. Since the job was not created
161 * by this queue class, it's up to a worker class on whether to deallocate
162 * the job object correctly.
164 TJob dequeueMaybeExpired(int id, int q, bool inc, bool* expired,
165 bool highpri = false) {
166 if (id == m_jobReaperId) {
167 *expired = true;
168 return dequeueOnlyExpiredImpl(id, q, inc);
170 timespec now;
171 Timer::GetMonotonicTime(now);
172 return dequeueMaybeExpiredImpl(id, q, inc, now, expired, highpri);
176 * Purely for making sure no new jobs are queued when we are stopping.
178 void stop() {
179 Lock lock(this);
180 m_stopped = true;
181 notifyAll(); // so all waiting threads can find out queue is stopped
184 void waitEmpty() {}
185 void signalEmpty() {}
188 * Keeps track of how many active workers are working on the queue.
190 void incActiveWorker() {
191 ++m_workerCount;
193 int decActiveWorker() {
194 return --m_workerCount;
196 int getActiveWorker() {
197 return m_workerCount;
201 * Keep track of how many jobs are queued, but not yet been serviced.
203 int getQueuedJobs() {
204 return m_jobCount;
207 int releaseQueuedJobs() {
208 int toRelease = m_queuedJobsReleaser->numOfJobsToRelease();
209 if (toRelease <= 0) {
210 return 0;
213 Lock lock(this);
214 int iter;
215 for (iter = 0; iter < toRelease && iter < m_jobCount; iter++) {
216 notify();
218 return iter;
221 private:
222 friend class JobQueue_Expiration_Test;
223 TJob dequeueMaybeExpiredImpl(int id, int q, bool inc, const timespec& now,
224 bool* expired, bool highPri = false) {
225 *expired = false;
226 Lock lock(this);
227 bool flushed = false;
228 bool ableToDeque = m_healthStatus == nullptr ||
229 m_healthStatus->getHealthLevel() != HealthLevel::BackOff;
231 while (m_jobCount == 0 || !ableToDeque) {
232 uint32_t kNumPriority = m_jobQueues.size();
233 if (m_jobQueues[kNumPriority - 1].size() > 0) {
234 // we do not block HealthMon requests (with the highest priority)
235 break;
238 if (m_stopped) {
239 throw StopSignal();
241 if (highPri) {
242 wait(id, q, Priority::High);
243 } else {
244 if (m_dropCacheTimeout <= 0 || flushed) {
245 wait(id, q, Priority::Low);
246 } else if (!wait(id, q, Priority::Middle, m_dropCacheTimeout)) {
247 // since we timed out, maybe we can turn idle without holding memory
248 if (m_jobCount == 0) {
249 ScopedUnlock unlock(this);
250 #ifdef USE_JEMALLOC_EXTENT_HOOKS
251 thread_huge_tcache_flush();
252 #endif
253 flush_thread_caches();
254 if (m_dropStack && s_stackLimit) {
255 flush_thread_stack();
257 DropCachePolicy::dropCache();
258 flushed = true;
262 if (!ableToDeque) {
263 ableToDeque = m_healthStatus->getHealthLevel() != HealthLevel::BackOff;
266 if (inc) incActiveWorker();
267 --m_jobCount;
269 // look across all our queues from highest priority to lowest.
270 for (auto& jobs : boost::adaptors::reverse(m_jobQueues)) {
271 if (jobs.empty()) {
272 continue;
275 // peek at the beginning of the queue to see if the request has already
276 // timed out.
277 if (m_maxJobQueuingMs > 0 &&
278 gettime_diff_us(jobs.front().second, now) >
279 m_maxJobQueuingMs * 1000) {
280 *expired = true;
281 TJob job = jobs.front().first;
282 jobs.pop_front();
283 return job;
286 if (m_jobCount >= m_lifoSwitchThreshold) {
287 TJob job = jobs.back().first;
288 jobs.pop_back();
289 return job;
291 TJob job = jobs.front().first;
292 jobs.pop_front();
293 return job;
295 assert(false);
296 return TJob(); // make compiler happy.
300 * One worker can be designated as the job reaper. The id of the job reaper
301 * equals m_maxThreadCount of the dispatcher. The job reaper checks if the
302 * oldest job on the queue has expired and if so, terminate that job without
303 * processing it. When the job reaper calls dequeueMaybeExpired(), it goes to
304 * dequeueOnlyExpiredImpl(), which only returns the oldest job and only if
305 * it's expired. Otherwise dequeueMaybeExpired() will block until a job
306 * expires.
308 TJob dequeueOnlyExpiredImpl(int id, int q, bool inc) {
309 assert(id == m_jobReaperId);
310 assert(m_maxJobQueuingMs > 0);
311 Lock lock(this);
312 while(!m_stopped) {
313 long waitTimeUs = m_maxJobQueuingMs * 1000;
315 for (auto& jobs : boost::adaptors::reverse(m_jobQueues)) {
316 if (!jobs.empty()) {
317 timespec now;
318 Timer::GetMonotonicTime(now);
319 int64_t queuedTimeUs = gettime_diff_us(jobs.front().second, now);
320 if (queuedTimeUs > m_maxJobQueuingMs * 1000) {
321 if (inc) incActiveWorker();
322 --m_jobCount;
324 TJob job = jobs.front().first;
325 jobs.pop_front();
326 return job;
328 // oldest job hasn't expired yet. wake us up when it will.
329 long waitTimeForQueue = m_maxJobQueuingMs * 1000 - queuedTimeUs;
330 waitTimeUs = ((waitTimeUs < waitTimeForQueue) ?
331 waitTimeUs :
332 waitTimeForQueue);
335 if (wait(id, q, Priority::Low,
336 waitTimeUs / 1000000, waitTimeUs % 1000000)) {
337 // We got woken up by somebody calling notify (as opposed to timeout),
338 // then some work might be on the queue. We only expire things here,
339 // so let's notify somebody else as well.
340 notify();
343 throw StopSignal();
346 int m_jobCount;
347 std::vector<std::deque<std::pair<TJob, timespec>>> m_jobQueues;
348 bool m_stopped;
349 std::atomic<int> m_workerCount;
350 const int m_dropCacheTimeout;
351 const bool m_dropStack;
352 const int m_lifoSwitchThreshold;
353 const int m_maxJobQueuingMs;
354 const int m_jobReaperId; // equals max worker thread count
355 IHostHealthObserver* m_healthStatus; // the dispatcher responsible for this
356 // JobQueue
357 std::shared_ptr<IQueuedJobsReleaser> m_queuedJobsReleaser;
360 template<class TJob, class Policy>
361 struct JobQueue<TJob,true,Policy> : JobQueue<TJob,false,Policy> {
362 JobQueue(int threadCount, int dropCacheTimeout,
363 bool dropStack, int lifoSwitchThreshold=INT_MAX,
364 int maxJobQueuingMs = -1, int numPriorities = 1,
365 int queuedJobsReleaseRate = 3,
366 IHostHealthObserver* healthStatus = nullptr) :
367 JobQueue<TJob,false,Policy>(threadCount,
368 dropCacheTimeout,
369 dropStack,
370 lifoSwitchThreshold,
371 maxJobQueuingMs,
372 numPriorities,
373 queuedJobsReleaseRate,
374 healthStatus) {
375 pthread_cond_init(&m_cond, nullptr);
377 ~JobQueue() override {
378 pthread_cond_destroy(&m_cond);
380 void waitEmpty() {
381 Lock lock(this);
382 while (this->getActiveWorker() || this->getQueuedJobs()) {
383 pthread_cond_wait(&m_cond, &this->getMutex().getRaw());
386 bool pollEmpty() {
387 Lock lock(this);
388 return !(this->getActiveWorker() || this->getQueuedJobs());
390 void signalEmpty() {
391 pthread_cond_signal(&m_cond);
393 private:
394 pthread_cond_t m_cond;
397 ///////////////////////////////////////////////////////////////////////////////
400 * Base class for a customized worker.
402 * DropCachePolicy is an extra callback for specific actions to take
403 * when we decide to drop stack/caches.
405 template<typename TJob,
406 typename TContext = void*,
407 bool countActive = false,
408 bool waitable = false,
409 class Policy = detail::NoDropCachePolicy>
410 struct JobQueueWorker {
411 typedef TJob JobType;
412 typedef TContext ContextType;
413 typedef JobQueue<TJob, waitable, Policy> QueueType;
414 typedef Policy DropCachePolicy;
416 static const bool Waitable = waitable;
417 static const bool CountActive = countActive;
419 * Default constructor.
421 JobQueueWorker()
422 : m_func(nullptr), m_context(), m_stopped(false), m_queue(nullptr) {
425 virtual ~JobQueueWorker() {
429 * Two-phase object creation for easier derivation and for JobQueueDispatcher
430 * to easily create a vector of workers.
432 void create(int id, QueueType* queue, void *func, ContextType context) {
433 assert(queue);
434 m_id = id;
435 m_queue = queue;
436 m_func = func;
437 m_context = context;
441 * The only functions a subclass needs to implement.
443 virtual void doJob(TJob job) = 0;
444 virtual void abortJob(TJob /*job*/) {
445 Logger::Warning("Job dropped by JobQueueDispatcher because of timeout.");
447 virtual void onThreadEnter() {}
448 virtual void onThreadExit() {}
451 * Start this worker thread.
453 void start() {
454 assert(m_queue);
455 onThreadEnter();
456 // bool highPri = (s_firstSlab.first != nullptr);
457 bool highPri = false;
458 while (!m_stopped) {
459 try {
460 bool expired = false;
461 TJob job = m_queue->dequeueMaybeExpired(m_id, s_numaNode, countActive,
462 &expired, highPri);
463 if (expired) {
464 abortJob(job);
465 } else {
466 doJob(job);
468 if (countActive) {
469 if (!m_queue->decActiveWorker() && waitable) {
470 Lock lock(m_queue);
471 if (!m_queue->getActiveWorker() &&
472 !m_queue->getQueuedJobs()) {
473 m_queue->signalEmpty();
477 } catch (const typename QueueType::StopSignal&) {
478 m_stopped = true; // queue is empty and stopped, so we are done
481 onThreadExit();
485 * Stop this worker thread.
487 void stop() {
488 m_stopped = true;
491 protected:
492 int m_id{-1};
493 void* m_func{nullptr};
494 ContextType m_context;
495 bool m_stopped{false};
497 private:
498 QueueType* m_queue;
501 ///////////////////////////////////////////////////////////////////////////////
504 * Driver class to push through the whole thing.
506 template<class TWorker>
507 struct JobQueueDispatcher : IHostHealthObserver {
509 * Constructor.
511 JobQueueDispatcher(int maxThreadCount,
512 int dropCacheTimeout, bool dropStack,
513 typename TWorker::ContextType context,
514 int lifoSwitchThreshold = INT_MAX,
515 int maxJobQueuingMs = -1, int numPriorities = 1,
516 int queuedJobsReleaseRate = 3,
517 int hugeCount = 0,
518 int initThreadCount = -1,
519 int queueToWorkerRatio = 1) // A worker per 1 queued job.
520 : m_stopped(true), m_healthStatus(HealthLevel::Bold), m_id(0),
521 m_context(context), m_maxThreadCount(maxThreadCount),
522 m_currThreadCountLimit(initThreadCount),
523 m_hugeThreadCount(hugeCount),
524 m_startReaperThread(maxJobQueuingMs > 0),
525 m_queueToWorkerRatio(queueToWorkerRatio),
526 m_queue(maxThreadCount, dropCacheTimeout, dropStack,
527 lifoSwitchThreshold, maxJobQueuingMs, numPriorities,
528 queuedJobsReleaseRate, this) {
529 assert(maxThreadCount >= 1);
530 if (initThreadCount < 0 || initThreadCount > maxThreadCount) {
531 m_currThreadCountLimit = maxThreadCount;
533 if (!TWorker::CountActive) {
534 // If TWorker does not support counting the number of
535 // active workers, just start all of the workers eagerly
536 for (int i = 0; i < m_maxThreadCount; i++) {
537 addWorkerImpl(false);
542 int32_t dispatcher_id = 0;
544 ~JobQueueDispatcher() override {
545 stop();
546 for (auto func : m_funcs) delete func;
547 for (auto worker : m_workers) delete worker;
550 int getActiveWorker() {
551 return m_queue.getActiveWorker();
554 int getQueuedJobs() {
555 return m_queue.getQueuedJobs();
558 int getTargetNumWorkers() {
559 if (TWorker::CountActive) {
560 int target = getActiveWorker();
561 const auto queued = getQueuedJobs();
562 const auto r = m_queueToWorkerRatio;
563 always_assert(r >= 1);
564 if (target == 0) {
565 target += (queued + r - 1) / r; // Round up.
566 } else {
567 target += queued / r; // Round down.
569 if (target > m_currThreadCountLimit) return m_currThreadCountLimit;
570 return target;
571 } else {
572 return m_currThreadCountLimit;
577 * Creates worker threads and start running them. This is non-blocking.
579 void start() {
580 Lock lock(m_mutex);
581 m_queue.setNumGroups(num_numa_nodes());
582 // Spin up more worker threads if appropriate
583 int target = getTargetNumWorkers();
584 for (int n = m_workers.size(); n < target; ++n) {
585 addWorkerImpl(false);
587 for (auto worker : m_funcs) {
588 worker->start();
590 m_stopped = false;
592 if (m_startReaperThread) {
593 addReaper();
598 * Enqueue a new job.
600 void enqueue(typename TWorker::JobType job, int priority = 0) {
601 m_queue.enqueue(job, priority);
602 // Spin up another worker thread if appropriate
603 int target = getTargetNumWorkers();
604 int n = m_workers.size();
605 if (n < target) {
606 addWorker();
611 * Add a worker thread on the fly.
613 void addWorker() {
614 Lock lock(m_mutex);
615 if (!m_stopped) {
616 addWorkerImpl(true);
621 * Increase the limit on number of workers by n, without exceeding the initial
622 * upper bound.
624 void addWorkers(int n) {
625 Lock lock(m_mutex);
626 if (m_stopped) return;
627 int limit = m_maxThreadCount - m_currThreadCountLimit;
628 assert(limit >= 0);
629 if (n > limit) n = limit;
630 m_currThreadCountLimit += n;
631 if (!TWorker::CountActive) {
632 for (int i = 0; i < n; ++i) {
633 addWorkerImpl(true);
635 } else {
636 while (m_workers.size() < getTargetNumWorkers()) {
637 addWorkerImpl(true);
642 void getWorkers(std::vector<TWorker*> &workers) {
643 Lock lock(m_mutex);
644 workers.insert(workers.end(), m_workers.begin(), m_workers.end());
647 void waitEmpty(bool stop = true) {
648 if (m_stopped) return;
649 m_queue.waitEmpty();
650 if (stop) this->stop();
653 bool pollEmpty() {
654 if (m_stopped) return true;
655 return m_queue.pollEmpty();
659 * Stop all workers after all jobs are processed. No new jobs should be
660 * enqueued at this moment, or this call may block for longer time.
662 void stop() {
663 // TODO(t5572120): If stop has already been called when the destructor
664 // runs, we'd bail out here and potentially start destroying AsyncFuncs
665 // that are still running.
666 if (m_stopped) return;
667 m_stopped = true;
669 m_queue.stop();
670 bool exceptioned = false;
671 Exception exception;
673 while (true) {
674 AsyncFunc<TWorker> *func = nullptr;
676 Lock lock(m_mutex);
677 if (!m_funcs.empty()) {
678 func = *m_funcs.begin();
679 m_funcs.erase(func);
680 } else if (m_reaperFunc) {
681 func = m_reaperFunc.release();
684 if (func == nullptr) {
685 break;
687 try {
688 func->waitForEnd();
689 } catch (Exception &e) {
690 exceptioned = true;
691 exception = e;
693 delete func;
695 if (exceptioned) {
696 throw exception;
700 void run() {
701 start();
702 stop();
705 void notifyNewStatus(HealthLevel newStatus) override {
706 bool curStopDequeue = (newStatus == HealthLevel::BackOff);
707 if (!curStopDequeue) {
708 // release blocked requests in queue if any
709 m_queue.releaseQueuedJobs();
712 m_healthStatus = newStatus;
715 HealthLevel getHealthLevel() override {
716 return m_healthStatus;
719 void setHugeThreadCount(int count) {
720 m_hugeThreadCount = count;
723 private:
724 bool m_stopped;
725 HealthLevel m_healthStatus;
726 int m_id;
727 typename TWorker::ContextType m_context;
728 const int m_maxThreadCount; // not including the possible reaper
729 int m_currThreadCountLimit; // initial limit can be lower than max
730 int m_hugeThreadCount{0};
731 const bool m_startReaperThread;
732 int m_queueToWorkerRatio{1};
733 JobQueue<typename TWorker::JobType,
734 TWorker::Waitable,
735 typename TWorker::DropCachePolicy> m_queue;
737 Mutex m_mutex;
738 std::set<TWorker*> m_workers;
739 std::set<AsyncFunc<TWorker> *> m_funcs;
740 std::unique_ptr<TWorker> m_reaper;
741 std::unique_ptr<AsyncFunc<TWorker>> m_reaperFunc;
743 int addReaper() {
744 m_reaper = std::make_unique<TWorker>();
745 m_reaperFunc = std::make_unique<AsyncFunc<TWorker>>(m_reaper.get(),
746 &TWorker::start);
747 m_reaper->create(m_maxThreadCount, &m_queue, m_reaperFunc.get(), m_context);
748 m_reaperFunc->start();
749 return m_maxThreadCount;
752 // Cannot be called concurrently (callers should hold m_mutex, or
753 // otherwise ensure that no other threads are calling this).
754 void addWorkerImpl(bool start) {
755 if (m_workers.size() >= m_maxThreadCount) {
756 // another thread raced with us to add a worker.
757 assert(m_workers.size() == m_maxThreadCount);
758 return;
760 TWorker *worker = new TWorker();
761 AsyncFunc<TWorker> *func =
762 new AsyncFunc<TWorker>(worker, &TWorker::start);
763 m_workers.insert(worker);
764 m_funcs.insert(func);
765 int id = m_id++;
766 worker->create(id, &m_queue, func, m_context);
768 if (start) {
769 func->start();
775 ///////////////////////////////////////////////////////////////////////////////
778 #endif