Expand jit documentation a bit
[hiphop-php.git] / hphp / util / job-queue.h
blob13e96f9814e5a651e363a62451464a3370a89077
1 /*
2 +----------------------------------------------------------------------+
3 | HipHop for PHP |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-2014 Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #ifndef incl_HPHP_UTIL_JOB_QUEUE_H_
18 #define incl_HPHP_UTIL_JOB_QUEUE_H_
20 #include <time.h>
21 #include <vector>
22 #include <set>
23 #include "hphp/util/alloc.h"
24 #include <boost/range/adaptors.hpp>
25 #include "hphp/util/async-func.h"
26 #include "hphp/util/atomic.h"
27 #include "hphp/util/compatibility.h"
28 #include "hphp/util/exception.h"
29 #include "hphp/util/lock.h"
30 #include "hphp/util/logger.h"
31 #include "hphp/util/synchronizable-multi.h"
32 #include "hphp/util/timer.h"
34 namespace HPHP {
35 ///////////////////////////////////////////////////////////////////////////////
37 /**
38 * A queue-based multi-threaded job processing facility. Internally, we have a
39 * queue of jobs and a list of workers, each of which runs in its own thread.
40 * Job queue can take new jobs on the fly and workers will continue to pull
41 * jobs off the queue and work on them.
43 * To use it, simply define your own job and worker class like this,
45 * class MyJob {
46 * public:
47 * // storing job data
48 * };
50 * class MyWorker : public JobQueueWorker<MyJob*> {
51 * public:
52 * virtual void doJob(MyJob *job) {
53 * // process the job
54 * delete job; // if it was new-ed
55 * }
56 * };
58 * Now, use JobQueueDispatcher to start the whole thing,
60 * JobQueueDispatcher<MyWorker> dispatcher(40, true, 0, false, nullptr); // 40 threads
61 * dispatcher.start();
62 * ...
63 * dispatcher.enqueue(new MyJob(...));
64 * ...
65 * dispatcher.stop();
67 * Note this class is different from JobListDispatcher that uses a vector to
68 * store prepared jobs. With JobQueueDispatcher, job queue is normally empty
69 * initially and new jobs are pushed into the queue over time. Also, workers
70 * can be stopped individually.
72 * Job process ordering
73 * ====================
74 * By default, requests are processed in FIFO order.
76 * In addition, we support an option where the request processing order can flip
77 * between FIFO or LIFO based on the length of the queue. This can be enabled by
78 * setting the 'lifoSwitchThreshold' parameter. If the job queue is configured
79 * to be in FIFO mode, and the current queue length exceeds
80 * lifoSwitchThreshold, then the workers will begin work on requests in LIFO
81 * order until the queue size is below the threshold in which case we resume in
82 * FIFO order. Setting the queue to be in LIFO mode initially will have the
83 * opposite behavior. This is useful when we are in a loaded situation and we
84 * want to prioritize the newest requests.
86 * You can configure a LIFO ordered queue by setting lifoSwitchThreshold to 0.
89 ///////////////////////////////////////////////////////////////////////////////
namespace detail {
/**
 * Default DropCachePolicy: its dropCache() hook does nothing.
 */
struct NoDropCachePolicy { static void dropCache() {} };
}
95 /**
96 * A job queue that's suitable for multiple threads to work on.
98 template<typename TJob,
99 bool waitable = false,
100 class DropCachePolicy = detail::NoDropCachePolicy>
101 class JobQueue : public SynchronizableMulti {
102 public:
103 // trivial class for signaling queue stop
104 class StopSignal {};
106 public:
108 * Constructor.
110 JobQueue(int threadCount, bool threadRoundRobin, int dropCacheTimeout,
111 bool dropStack, int lifoSwitchThreshold=INT_MAX,
112 int maxJobQueuingMs=-1, int numPriorities=1, int groups = 1)
113 : SynchronizableMulti(threadRoundRobin ? 1 : threadCount, groups),
114 m_jobCount(0), m_stopped(false), m_workerCount(0),
115 m_dropCacheTimeout(dropCacheTimeout), m_dropStack(dropStack),
116 m_lifoSwitchThreshold(lifoSwitchThreshold),
117 m_maxJobQueuingMs(maxJobQueuingMs),
118 m_jobReaperId(-1) {
119 m_jobQueues.resize(numPriorities);
123 * Put a job into the queue and notify a worker to pick it up.
125 void enqueue(TJob job, int priority=0) {
126 assert(priority >= 0);
127 assert(priority < m_jobQueues.size());
128 timespec enqueueTime;
129 Timer::GetMonotonicTime(enqueueTime);
130 Lock lock(this);
131 m_jobQueues[priority].emplace_back(job, enqueueTime);
132 ++m_jobCount;
133 notify();
137 * Grab a job from the queue for processing. Since the job was not created
138 * by this queue class, it's up to a worker class on whether to deallocate
139 * the job object correctly.
141 TJob dequeueMaybeExpired(int id, int q, bool inc, bool* expired) {
142 if (id == m_jobReaperId.load()) {
143 *expired = true;
144 return dequeueOnlyExpiredImpl(id, q, inc);
146 timespec now;
147 Timer::GetMonotonicTime(now);
148 return dequeueMaybeExpiredImpl(id, q, inc, now, expired);
152 * Purely for making sure no new jobs are queued when we are stopping.
154 void stop() {
155 Lock lock(this);
156 m_stopped = true;
157 notifyAll(); // so all waiting threads can find out queue is stopped
160 void waitEmpty() {}
161 void signalEmpty() {}
164 * Keeps track of how many active workers are working on the queue.
166 void incActiveWorker() {
167 ++m_workerCount;
169 int decActiveWorker() {
170 return --m_workerCount;
172 int getActiveWorker() {
173 return m_workerCount;
177 * Keep track of how many jobs are queued, but not yet been serviced.
179 int getQueuedJobs() {
180 return m_jobCount;
184 * One worker can be designated as the job reaper. The job reaper's job is to
185 * check if the oldest job on the queue has expired and if so, terminate that
186 * job without processing it. When the job reaper work calls
187 * dequeueMaybeExpired(), it'll only return the oldest job and only if it's
188 * expired. Otherwise dequeueMaybeExpired() will block until a job expires.
190 void setJobReaperId(int id) {
191 assert(m_maxJobQueuingMs > 0);
192 m_jobReaperId.store(id);
195 int getJobReaperId() const {
196 return m_jobReaperId.load();
199 private:
200 friend class JobQueue_Expiration_Test;
201 TJob dequeueMaybeExpiredImpl(int id, int q, bool inc, const timespec& now,
202 bool* expired) {
203 *expired = false;
204 Lock lock(this);
205 bool flushed = false;
206 while (m_jobCount == 0) {
207 if (m_stopped) {
208 throw StopSignal();
210 if (m_dropCacheTimeout <= 0 || flushed) {
211 wait(id, q, false);
212 } else if (!wait(id, q, true, m_dropCacheTimeout)) {
213 // since we timed out, maybe we can turn idle without holding memory
214 if (m_jobCount == 0) {
215 ScopedUnlock unlock(this);
216 flush_thread_caches();
217 if (m_dropStack && s_stackLimit) {
218 flush_thread_stack();
220 DropCachePolicy::dropCache();
221 flushed = true;
225 if (inc) incActiveWorker();
226 --m_jobCount;
228 // look across all our queues from highest priority to lowest.
229 for (auto& jobs : boost::adaptors::reverse(m_jobQueues)) {
230 if (jobs.empty()) {
231 continue;
234 // peek at the beginning of the queue to see if the request has already
235 // timed out.
236 if (m_maxJobQueuingMs > 0 &&
237 gettime_diff_us(jobs.front().second, now) >
238 m_maxJobQueuingMs * 1000) {
239 *expired = true;
240 TJob job = jobs.front().first;
241 jobs.pop_front();
242 return job;
246 if (m_jobCount >= m_lifoSwitchThreshold) {
247 TJob job = jobs.back().first;
248 jobs.pop_back();
249 return job;
251 TJob job = jobs.front().first;
252 jobs.pop_front();
253 return job;
255 assert(false);
256 return TJob(); // make compiler happy.
259 TJob dequeueOnlyExpiredImpl(int id, int q, bool inc) {
260 Lock lock(this);
261 assert(m_maxJobQueuingMs > 0);
262 while(!m_stopped) {
263 long waitTimeUs = m_maxJobQueuingMs * 1000;
265 for (auto& jobs : boost::adaptors::reverse(m_jobQueues)) {
266 if (!jobs.empty()) {
267 timespec now;
268 Timer::GetMonotonicTime(now);
269 int64_t queuedTimeUs = gettime_diff_us(jobs.front().second, now);
270 if (queuedTimeUs > m_maxJobQueuingMs * 1000) {
271 if (inc) incActiveWorker();
272 --m_jobCount;
274 TJob job = jobs.front().first;
275 jobs.pop_front();
276 return job;
278 // oldest job hasn't expired yet. wake us up when it will.
279 long waitTimeForQueue = m_maxJobQueuingMs * 1000 - queuedTimeUs;
280 waitTimeUs = ((waitTimeUs < waitTimeForQueue) ?
281 waitTimeUs :
282 waitTimeForQueue);
285 if (wait(id, q, false, waitTimeUs / 1000000, waitTimeUs % 1000000)) {
286 // We got woken up by somebody calling notify (as opposed to timeout),
287 // then some work might be on the queue. We only expire things here,
288 // so let's notify somebody else as well.
289 notify();
292 throw StopSignal();
295 int m_jobCount;
296 std::vector<std::deque<std::pair<TJob, timespec>>> m_jobQueues;
297 bool m_stopped;
298 std::atomic<int> m_workerCount;
299 const int m_dropCacheTimeout;
300 const bool m_dropStack;
301 const int m_lifoSwitchThreshold;
302 const int m_maxJobQueuingMs;
303 std::atomic<int> m_jobReaperId;
306 template<class TJob, class Policy>
307 struct JobQueue<TJob,true,Policy> : JobQueue<TJob,false,Policy> {
308 JobQueue(int threadCount, bool threadRoundRobin, int dropCacheTimeout,
309 bool dropStack, int lifoSwitchThreshold=INT_MAX,
310 int maxJobQueuingMs=-1, int numPriorities=1, int groups = 1) :
311 JobQueue<TJob,false,Policy>(threadCount,
312 threadRoundRobin,
313 dropCacheTimeout,
314 dropStack,
315 lifoSwitchThreshold,
316 maxJobQueuingMs,
317 numPriorities,
318 groups) {
319 pthread_cond_init(&m_cond, nullptr);
321 ~JobQueue() {
322 pthread_cond_destroy(&m_cond);
324 void waitEmpty() {
325 Lock lock(this);
326 while (this->getActiveWorker() || this->getQueuedJobs()) {
327 pthread_cond_wait(&m_cond, &this->getMutex().getRaw());
330 bool pollEmpty() {
331 Lock lock(this);
332 return !(this->getActiveWorker() || this->getQueuedJobs());
334 void signalEmpty() {
335 pthread_cond_signal(&m_cond);
337 private:
338 pthread_cond_t m_cond;
341 ///////////////////////////////////////////////////////////////////////////////
344 * Base class for a customized worker.
346 * DropCachePolicy is an extra callback for specific actions to take
347 * when we decide to drop stack/caches.
349 template<typename TJob,
350 typename TContext = void*,
351 bool countActive = false,
352 bool waitable = false,
353 class Policy = detail::NoDropCachePolicy>
354 class JobQueueWorker {
355 public:
356 typedef TJob JobType;
357 typedef TContext ContextType;
358 typedef JobQueue<TJob, waitable, Policy> QueueType;
359 typedef Policy DropCachePolicy;
361 static const bool Waitable = waitable;
362 static const bool CountActive = countActive;
364 * Default constructor.
366 JobQueueWorker()
367 : m_func(nullptr), m_context(), m_stopped(false), m_queue(nullptr) {
370 virtual ~JobQueueWorker() {
374 * Two-phase object creation for easier derivation and for JobQueueDispatcher
375 * to easily create a vector of workers.
377 void create(int id, QueueType* queue, void *func, ContextType context) {
378 assert(queue);
379 m_id = id;
380 m_queue = queue;
381 m_func = func;
382 m_context = context;
386 * The only functions a subclass needs to implement.
388 virtual void doJob(TJob job) = 0;
389 virtual void abortJob(TJob job) {
390 Logger::Warning("Job dropped by JobQueueDispatcher because of timeout.");
392 virtual void onThreadEnter() {}
393 virtual void onThreadExit() {}
396 * Start this worker thread.
398 void start() {
399 assert(m_queue);
400 onThreadEnter();
401 while (!m_stopped) {
402 try {
403 bool expired = false;
404 TJob job = m_queue->dequeueMaybeExpired(m_id, s_numaNode,
405 countActive, &expired);
406 if (expired) {
407 abortJob(job);
408 } else {
409 doJob(job);
411 if (countActive) {
412 if (!m_queue->decActiveWorker() && waitable) {
413 Lock lock(m_queue);
414 if (!m_queue->getActiveWorker() &&
415 !m_queue->getQueuedJobs()) {
416 m_queue->signalEmpty();
420 } catch (const typename QueueType::StopSignal&) {
421 m_stopped = true; // queue is empty and stopped, so we are done
424 onThreadExit();
428 * Stop this worker thread.
430 void stop() {
431 m_stopped = true;
434 protected:
435 int m_id;
436 void *m_func;
437 ContextType m_context;
438 bool m_stopped;
440 private:
441 QueueType* m_queue;
444 ///////////////////////////////////////////////////////////////////////////////
447 * Driver class to push through the whole thing.
449 template<class TWorker>
450 class JobQueueDispatcher {
451 public:
453 * Constructor.
455 JobQueueDispatcher(int threadCount, bool threadRoundRobin,
456 int dropCacheTimeout, bool dropStack,
457 typename TWorker::ContextType context,
458 int lifoSwitchThreshold = INT_MAX,
459 int maxJobQueuingMs = -1, int numPriorities = 1,
460 int groups = 1)
461 : m_stopped(true), m_id(0), m_context(context),
462 m_maxThreadCount(threadCount),
463 m_queue(threadCount, threadRoundRobin, dropCacheTimeout, dropStack,
464 lifoSwitchThreshold, maxJobQueuingMs, numPriorities, groups),
465 m_startReaperThread(maxJobQueuingMs > 0) {
466 assert(threadCount >= 1);
467 if (!TWorker::CountActive) {
468 // If TWorker does not support counting the number of
469 // active workers, just start all of the workers eagerly
470 for (int i = 0; i < threadCount; i++) {
471 addWorkerImpl(false);
476 ~JobQueueDispatcher() {
477 stop();
478 for (typename
479 std::set<AsyncFunc<TWorker>*>::iterator iter = m_funcs.begin();
480 iter != m_funcs.end(); ++iter) {
481 delete *iter;
483 for (typename
484 std::set<TWorker*>::iterator iter = m_workers.begin();
485 iter != m_workers.end(); ++iter) {
486 delete *iter;
490 int getActiveWorker() {
491 return m_queue.getActiveWorker();
494 int getQueuedJobs() {
495 return m_queue.getQueuedJobs();
498 int getTargetNumWorkers() {
499 if (TWorker::CountActive) {
500 int target = getActiveWorker() + getQueuedJobs();
501 return (target > m_maxThreadCount) ? m_maxThreadCount : target;
502 } else {
503 return m_maxThreadCount;
508 * Creates worker threads and start running them. This is non-blocking.
510 void start() {
511 Lock lock(m_mutex);
512 // Spin up more worker threads if appropriate
513 int target = getTargetNumWorkers();
514 for (int n = m_workers.size(); n < target; ++n) {
515 addWorkerImpl(false);
517 for (typename
518 std::set<AsyncFunc<TWorker>*>::iterator iter = m_funcs.begin();
519 iter != m_funcs.end(); ++iter) {
520 (*iter)->start();
522 m_stopped = false;
524 if (m_startReaperThread) {
525 // If we have set a max timeout for requests on the queue, start a reaper
526 // thread just for expiring off old requests so we guarantee requests are
527 // taken off the queue as soon as possible when they expire even if all
528 // other worker threads are stalled.
529 m_queue.setJobReaperId(addWorkerImpl(true));
534 * Enqueue a new job.
536 void enqueue(typename TWorker::JobType job, int priority = 0) {
537 m_queue.enqueue(job, priority);
538 // Spin up another worker thread if appropriate
539 int target = getTargetNumWorkers();
540 int n = m_workers.size();
541 if (n < target) {
542 addWorker();
547 * Add a worker thread on the fly.
549 void addWorker() {
550 Lock lock(m_mutex);
551 if (!m_stopped) {
552 addWorkerImpl(true);
557 * Add N new worker threads.
559 void addWorkers(int n) {
560 Lock lock(m_mutex);
561 if (m_stopped) return;
562 m_maxThreadCount += n;
563 if (!TWorker::CountActive) {
564 for (int i = 0; i < n; ++i) {
565 addWorkerImpl(true);
567 } else {
568 while (m_workers.size() < getTargetNumWorkers()) {
569 addWorkerImpl(true);
574 void getWorkers(std::vector<TWorker*> &workers) {
575 Lock lock(m_mutex);
576 workers.insert(workers.end(), m_workers.begin(), m_workers.end());
579 void waitEmpty(bool stop = true) {
580 if (m_stopped) return;
581 m_queue.waitEmpty();
582 if (stop) this->stop();
585 bool pollEmpty() {
586 if (m_stopped) return true;
587 return m_queue.pollEmpty();
591 * Stop all workers after all jobs are processed. No new jobs should be
592 * enqueued at this moment, or this call may block for longer time.
594 void stop() {
595 if (m_stopped) return;
596 m_stopped = true;
598 m_queue.stop();
599 bool exceptioned = false;
600 Exception exception;
602 while (true) {
603 AsyncFunc<TWorker> *func = nullptr;
605 Lock lock(m_mutex);
606 if (!m_funcs.empty()) {
607 func = *m_funcs.begin();
608 m_funcs.erase(func);
611 if (func == nullptr) {
612 break;
614 try {
615 func->waitForEnd();
616 } catch (Exception &e) {
617 exceptioned = true;
618 exception = e;
620 delete func;
622 if (exceptioned) {
623 throw exception;
627 void run() {
628 start();
629 stop();
632 private:
633 bool m_stopped;
634 int m_id;
635 typename TWorker::ContextType m_context;
636 int m_maxThreadCount;
637 JobQueue<typename TWorker::JobType,
638 TWorker::Waitable,
639 typename TWorker::DropCachePolicy> m_queue;
641 Mutex m_mutex;
642 std::set<TWorker*> m_workers;
643 std::set<AsyncFunc<TWorker> *> m_funcs;
644 const bool m_startReaperThread;
646 // return the id for the worker.
647 int addWorkerImpl(bool start) {
648 TWorker *worker = new TWorker();
649 AsyncFunc<TWorker> *func = new AsyncFunc<TWorker>(worker, &TWorker::start);
650 m_workers.insert(worker);
651 m_funcs.insert(func);
652 int id = m_id++;
653 worker->create(id, &m_queue, func, m_context);
655 if (start) {
656 func->start();
658 return id;
662 ///////////////////////////////////////////////////////////////////////////////
665 #endif