fix OSX build issues
[hiphop-php.git] / hphp / util / job_queue.h
blob35a49bc62b7d610ef48d584fd4a0511d7ca58c8f
/*
   +----------------------------------------------------------------------+
   | HipHop for PHP                                                       |
   +----------------------------------------------------------------------+
   | Copyright (c) 2010-2013 Facebook, Inc. (http://www.facebook.com)     |
   +----------------------------------------------------------------------+
   | This source file is subject to version 3.01 of the PHP license,      |
   | that is bundled with this package in the file LICENSE, and is        |
   | available through the world-wide-web at the following url:           |
   | http://www.php.net/license/3_01.txt                                  |
   | If you did not receive a copy of the PHP license and are unable to   |
   | obtain it through the world-wide-web, please send a note to          |
   | license@php.net so we can mail you a copy immediately.               |
   +----------------------------------------------------------------------+
*/
17 #ifndef incl_HPHP_UTIL_JOB_QUEUE_H_
18 #define incl_HPHP_UTIL_JOB_QUEUE_H_
#include <pthread.h>
#include <time.h>

#include <atomic>
#include <cassert>
#include <climits>
#include <cstdint>
#include <deque>
#include <set>
#include <utility>
#include <vector>

#include <boost/range/adaptors.hpp>

#include "hphp/util/alloc.h"
#include "hphp/util/async_func.h"
#include "hphp/util/atomic.h"
#include "hphp/util/compatibility.h"
#include "hphp/util/exception.h"
#include "hphp/util/lock.h"
#include "hphp/util/logger.h"
#include "hphp/util/synchronizable_multi.h"
#include "hphp/util/timer.h"
34 namespace HPHP {
35 ///////////////////////////////////////////////////////////////////////////////
/**
 * A queue-based multi-threaded job processing facility. Internally, we have a
 * queue of jobs and a list of workers, each of which runs in its own thread.
 * The job queue can take new jobs on the fly, and workers continuously pull
 * jobs off the queue and process them.
 *
 * To use it, simply define your own job and worker classes like this:
 *
 *   class MyJob {
 *   public:
 *     // storing job data
 *   };
 *
 *   class MyWorker : public JobQueueWorker<MyJob*> {
 *   public:
 *     virtual void doJob(MyJob *job) {
 *       // process the job
 *       delete job; // if it was new-ed
 *     }
 *   };
 *
 * Now, use JobQueueDispatcher to start the whole thing:
 *
 *   // 40 threads, no round-robin, no cache dropping, no stack dropping,
 *   // no opaque data.
 *   JobQueueDispatcher<MyJob*, MyWorker> dispatcher(40, false, 0, false,
 *                                                   nullptr);
 *   dispatcher.start();
 *   ...
 *   dispatcher.enqueue(new MyJob(...));
 *   ...
 *   dispatcher.stop();
 *
 * Note this class is different from JobListDispatcher, which uses a vector to
 * store prepared jobs. With JobQueueDispatcher, the job queue is normally
 * empty initially and new jobs are pushed into the queue over time. Also,
 * workers can be stopped individually.
 *
 * Job processing order
 * ====================
 * By default, jobs are processed in FIFO order.
 *
 * In addition, the processing order can flip between FIFO and LIFO based on
 * the length of the queue. This is enabled by setting the
 * 'lifoSwitchThreshold' parameter: when the number of queued jobs exceeds
 * lifoSwitchThreshold, workers take the newest job first (LIFO) until the
 * queue length no longer exceeds the threshold, at which point FIFO order
 * resumes. This is useful when we are in a loaded situation and want to
 * prioritize the newest requests.
 *
 * There is no separate "start in LIFO mode" setting: you can configure an
 * always-LIFO queue by setting lifoSwitchThreshold to 0.
 */
89 ///////////////////////////////////////////////////////////////////////////////
namespace detail {

/**
 * Default DropCachePolicy: performs no additional work when an idle worker
 * drops its thread caches.
 */
struct NoDropCachePolicy {
  static void dropCache() {
    // intentionally a no-op
  }
};

} // namespace detail
95 /**
96 * A job queue that's suitable for multiple threads to work on.
98 template<typename TJob,
99 bool waitable = false,
100 class DropCachePolicy = detail::NoDropCachePolicy>
101 class JobQueue : public SynchronizableMulti {
102 public:
103 // trivial class for signaling queue stop
104 class StopSignal {};
106 public:
108 * Constructor.
110 JobQueue(int threadCount, bool threadRoundRobin, int dropCacheTimeout,
111 bool dropStack, int lifoSwitchThreshold=INT_MAX,
112 int maxJobQueuingMs=-1, int numPriorities=1)
113 : SynchronizableMulti(threadRoundRobin ? 1 : threadCount),
114 m_jobCount(0), m_stopped(false), m_workerCount(0),
115 m_dropCacheTimeout(dropCacheTimeout), m_dropStack(dropStack),
116 m_lifoSwitchThreshold(lifoSwitchThreshold),
117 m_maxJobQueuingMs(maxJobQueuingMs),
118 m_jobReaperId(-1) {
119 m_jobQueues.resize(numPriorities);
123 * Put a job into the queue and notify a worker to pick it up.
125 void enqueue(TJob job, int priority=0) {
126 assert(priority >= 0);
127 assert(priority < m_jobQueues.size());
128 timespec enqueueTime;
129 Timer::GetMonotonicTime(enqueueTime);
130 Lock lock(this);
131 m_jobQueues[priority].emplace_back(job, enqueueTime);
132 ++m_jobCount;
133 notify();
137 * Grab a job from the queue for processing. Since the job was not created
138 * by this queue class, it's up to a worker class on whether to deallocate
139 * the job object correctly.
141 TJob dequeueMaybeExpired(int id, bool inc, bool* expired) {
142 if (id == m_jobReaperId.load()) {
143 *expired = true;
144 return dequeueOnlyExpiredImpl(id, inc);
146 timespec now;
147 Timer::GetMonotonicTime(now);
148 return dequeueMaybeExpiredImpl(id, inc, now, expired);
152 * Purely for making sure no new jobs are queued when we are stopping.
154 void stop() {
155 Lock lock(this);
156 m_stopped = true;
157 notifyAll(); // so all waiting threads can find out queue is stopped
160 void waitEmpty() {}
161 void signalEmpty() {}
164 * Keeps track of how many active workers are working on the queue.
166 void incActiveWorker() {
167 atomic_inc(m_workerCount);
169 int decActiveWorker() {
170 return atomic_dec(m_workerCount);
172 int getActiveWorker() {
173 return m_workerCount;
177 * Keep track of how many jobs are queued, but not yet been serviced.
179 int getQueuedJobs() {
180 return m_jobCount;
184 * One worker can be designated as the job reaper. The job reaper's job is to
185 * check if the oldest job on the queue has expired and if so, terminate that
186 * job without processing it. When the job reaper work calls
187 * dequeueMaybeExpired(), it'll only return the oldest job and only if it's
188 * expired. Otherwise dequeueMaybeExpired() will block until a job expires.
190 void setJobReaperId(int id) {
191 assert(m_maxJobQueuingMs > 0);
192 m_jobReaperId.store(id);
195 int getJobReaperId() const {
196 return m_jobReaperId.load();
199 private:
200 friend class JobQueue_Expiration_Test;
201 TJob dequeueMaybeExpiredImpl(int id, bool inc, const timespec& now,
202 bool* expired) {
203 *expired = false;
204 Lock lock(this);
205 bool flushed = false;
206 while (m_jobCount == 0) {
207 if (m_stopped) {
208 throw StopSignal();
210 if (m_dropCacheTimeout <= 0 || flushed) {
211 wait(id, false);
212 } else if (!wait(id, true, m_dropCacheTimeout)) {
213 // since we timed out, maybe we can turn idle without holding memory
214 if (m_jobCount == 0) {
215 ScopedUnlock unlock(this);
216 Util::flush_thread_caches();
217 if (m_dropStack && Util::s_stackLimit) {
218 Util::flush_thread_stack();
220 DropCachePolicy::dropCache();
221 flushed = true;
225 if (inc) incActiveWorker();
226 --m_jobCount;
228 // look across all our queues from highest priority to lowest.
229 for (auto& jobs : boost::adaptors::reverse(m_jobQueues)) {
230 if (jobs.empty()) {
231 continue;
234 // peek at the beginning of the queue to see if the request has already
235 // timed out.
236 if (m_maxJobQueuingMs > 0 &&
237 gettime_diff_us(jobs.front().second, now) >
238 m_maxJobQueuingMs * 1000) {
239 *expired = true;
240 TJob job = jobs.front().first;
241 jobs.pop_front();
242 return job;
246 if (m_jobCount >= m_lifoSwitchThreshold) {
247 TJob job = jobs.back().first;
248 jobs.pop_back();
249 return job;
251 TJob job = jobs.front().first;
252 jobs.pop_front();
253 return job;
255 assert(false);
256 return TJob(); // make compiler happy.
259 TJob dequeueOnlyExpiredImpl(int id, bool inc) {
260 Lock lock(this);
261 assert(m_maxJobQueuingMs > 0);
262 while(!m_stopped) {
263 long waitTimeUs = m_maxJobQueuingMs * 1000;
265 for (auto& jobs : boost::adaptors::reverse(m_jobQueues)) {
266 if (!jobs.empty()) {
267 timespec now;
268 Timer::GetMonotonicTime(now);
269 int64_t queuedTimeUs = gettime_diff_us(jobs.front().second, now);
270 if (queuedTimeUs > m_maxJobQueuingMs * 1000) {
271 if (inc) incActiveWorker();
272 --m_jobCount;
274 TJob job = jobs.front().first;
275 jobs.pop_front();
276 return job;
278 // oldest job hasn't expired yet. wake us up when it will.
279 long waitTimeForQueue = m_maxJobQueuingMs * 1000 - queuedTimeUs;
280 waitTimeUs = ((waitTimeUs < waitTimeForQueue) ?
281 waitTimeUs :
282 waitTimeForQueue);
285 if (wait(id, false, waitTimeUs / 1000000, waitTimeUs % 1000000)) {
286 // We got woken up by somebody calling notify (as opposed to timeout),
287 // then some work might be on the queue. We only expire things here,
288 // so let's notify somebody else as well.
289 notify();
292 throw StopSignal();
295 int m_jobCount;
296 std::vector<std::deque<std::pair<TJob, timespec>>> m_jobQueues;
297 bool m_stopped;
298 int m_workerCount;
299 const int m_dropCacheTimeout;
300 const bool m_dropStack;
301 const int m_lifoSwitchThreshold;
302 const int m_maxJobQueuingMs;
303 std::atomic<int> m_jobReaperId;
306 template<class TJob, class Policy>
307 struct JobQueue<TJob,true,Policy> : JobQueue<TJob,false,Policy> {
308 JobQueue(int threadCount, bool threadRoundRobin, int dropCacheTimeout,
309 bool dropStack, int lifoSwitchThreshold=INT_MAX,
310 int maxJobQueuingMs=-1, int numPriorities=1) :
311 JobQueue<TJob,false,Policy>(threadCount,
312 threadRoundRobin,
313 dropCacheTimeout,
314 dropStack,
315 lifoSwitchThreshold,
316 maxJobQueuingMs,
317 numPriorities) {
318 pthread_cond_init(&m_cond, nullptr);
320 ~JobQueue() {
321 pthread_cond_destroy(&m_cond);
323 void waitEmpty() {
324 Lock lock(this);
325 while (this->getActiveWorker() || this->getQueuedJobs()) {
326 pthread_cond_wait(&m_cond, &this->getMutex().getRaw());
329 bool pollEmpty() {
330 Lock lock(this);
331 return !(this->getActiveWorker() || this->getQueuedJobs());
333 void signalEmpty() {
334 pthread_cond_signal(&m_cond);
336 private:
337 pthread_cond_t m_cond;
340 ///////////////////////////////////////////////////////////////////////////////
343 * Base class for a customized worker.
345 * DropCachePolicy is an extra callback for specific actions to take
346 * when we decide to drop stack/caches.
348 template<typename TJob,
349 bool countActive = false,
350 bool waitable = false,
351 class Policy = detail::NoDropCachePolicy>
352 class JobQueueWorker {
353 public:
354 typedef TJob JobType;
355 typedef JobQueue<TJob,waitable,Policy> QueueType;
356 typedef Policy DropCachePolicy;
358 static const bool Waitable = waitable;
359 static const bool CountActive = countActive;
361 * Default constructor.
363 JobQueueWorker()
364 : m_func(nullptr), m_opaque(nullptr), m_stopped(false), m_queue(nullptr) {
367 virtual ~JobQueueWorker() {
371 * Two-phase object creation for easier derivation and for JobQueueDispatcher
372 * to easily create a vector of workers.
374 void create(int id, QueueType* queue, void *func, void *opaque) {
375 assert(queue);
376 m_id = id;
377 m_queue = queue;
378 m_func = func;
379 m_opaque = opaque;
383 * The only functions a subclass needs to implement.
385 virtual void doJob(TJob job) = 0;
386 virtual void abortJob(TJob job) {
387 Logger::Warning("Job dropped by JobQueueDispatcher because of timeout.");
389 virtual void onThreadEnter() {}
390 virtual void onThreadExit() {}
393 * Start this worker thread.
395 void start() {
396 assert(m_queue);
397 onThreadEnter();
398 while (!m_stopped) {
399 try {
400 bool expired = false;
401 TJob job = m_queue->dequeueMaybeExpired(m_id, countActive, &expired);
402 if (expired) {
403 abortJob(job);
404 } else {
405 doJob(job);
407 if (countActive) {
408 if (!m_queue->decActiveWorker() && waitable) {
409 Lock lock(m_queue);
410 if (!m_queue->getActiveWorker() &&
411 !m_queue->getQueuedJobs()) {
412 m_queue->signalEmpty();
416 } catch (const typename QueueType::StopSignal&) {
417 m_stopped = true; // queue is empty and stopped, so we are done
420 onThreadExit();
424 * Stop this worker thread.
426 void stop() {
427 m_stopped = true;
430 protected:
431 int m_id;
432 void *m_func;
433 void *m_opaque;
434 bool m_stopped;
436 private:
437 QueueType* m_queue;
440 ///////////////////////////////////////////////////////////////////////////////
443 * Driver class to push through the whole thing.
445 template<class TJob, class TWorker>
446 class JobQueueDispatcher {
447 public:
449 * Constructor.
451 JobQueueDispatcher(int threadCount, bool threadRoundRobin,
452 int dropCacheTimeout, bool dropStack, void *opaque,
453 int lifoSwitchThreshold = INT_MAX,
454 int maxJobQueuingMs = -1, int numPriorities = 1)
455 : m_stopped(true), m_id(0), m_opaque(opaque),
456 m_maxThreadCount(threadCount),
457 m_queue(threadCount, threadRoundRobin, dropCacheTimeout, dropStack,
458 lifoSwitchThreshold, maxJobQueuingMs, numPriorities),
459 m_startReaperThread(maxJobQueuingMs > 0) {
460 assert(threadCount >= 1);
461 if (!TWorker::CountActive) {
462 // If TWorker does not support counting the number of
463 // active workers, just start all of the workers eagerly
464 for (int i = 0; i < threadCount; i++) {
465 addWorkerImpl(false);
470 ~JobQueueDispatcher() {
471 stop();
472 for (typename
473 std::set<AsyncFunc<TWorker>*>::iterator iter = m_funcs.begin();
474 iter != m_funcs.end(); ++iter) {
475 delete *iter;
477 for (typename
478 std::set<TWorker*>::iterator iter = m_workers.begin();
479 iter != m_workers.end(); ++iter) {
480 delete *iter;
484 int getActiveWorker() {
485 return m_queue.getActiveWorker();
488 int getQueuedJobs() {
489 return m_queue.getQueuedJobs();
492 int getTargetNumWorkers() {
493 if (TWorker::CountActive) {
494 int target = getActiveWorker() + getQueuedJobs();
495 return (target > m_maxThreadCount) ? m_maxThreadCount : target;
496 } else {
497 return m_maxThreadCount;
502 * Creates worker threads and start running them. This is non-blocking.
504 void start() {
505 Lock lock(m_mutex);
506 // Spin up more worker threads if appropriate
507 int target = getTargetNumWorkers();
508 for (int n = m_workers.size(); n < target; ++n) {
509 addWorkerImpl(false);
511 for (typename
512 std::set<AsyncFunc<TWorker>*>::iterator iter = m_funcs.begin();
513 iter != m_funcs.end(); ++iter) {
514 (*iter)->start();
516 m_stopped = false;
518 if (m_startReaperThread) {
519 // If we have set a max timeout for requests on the queue, start a reaper
520 // thread just for expiring off old requests so we guarantee requests are
521 // taken off the queue as soon as possible when they expire even if all
522 // other worker threads are stalled.
523 m_queue.setJobReaperId(addWorkerImpl(true));
528 * Enqueue a new job.
530 void enqueue(TJob job, int priority = 0) {
531 m_queue.enqueue(job, priority);
532 // Spin up another worker thread if appropriate
533 int target = getTargetNumWorkers();
534 int n = m_workers.size();
535 if (n < target) {
536 addWorker();
541 * Add a worker thread on the fly.
543 void addWorker() {
544 Lock lock(m_mutex);
545 if (!m_stopped) {
546 addWorkerImpl(true);
551 * Add N new worker threads.
553 void addWorkers(int n) {
554 Lock lock(m_mutex);
555 if (m_stopped) return;
556 for (int i = 0; i < n; ++i) {
557 addWorkerImpl(true);
561 void getWorkers(std::vector<TWorker*> &workers) {
562 Lock lock(m_mutex);
563 workers.insert(workers.end(), m_workers.begin(), m_workers.end());
566 void waitEmpty(bool stop = true) {
567 if (m_stopped) return;
568 m_queue.waitEmpty();
569 if (stop) this->stop();
572 bool pollEmpty() {
573 if (m_stopped) return true;
574 return m_queue.pollEmpty();
578 * Stop all workers after all jobs are processed. No new jobs should be
579 * enqueued at this moment, or this call may block for longer time.
581 void stop() {
582 if (m_stopped) return;
583 m_stopped = true;
585 m_queue.stop();
586 bool exceptioned = false;
587 Exception exception;
589 while (true) {
590 AsyncFunc<TWorker> *func = nullptr;
592 Lock lock(m_mutex);
593 if (!m_funcs.empty()) {
594 func = *m_funcs.begin();
595 m_funcs.erase(func);
598 if (func == nullptr) {
599 break;
601 try {
602 func->waitForEnd();
603 } catch (Exception &e) {
604 exceptioned = true;
605 exception = e;
607 delete func;
609 if (exceptioned) {
610 throw exception;
614 void run() {
615 start();
616 stop();
619 private:
620 bool m_stopped;
621 int m_id;
622 void *m_opaque;
623 int m_maxThreadCount;
624 JobQueue<TJob,
625 TWorker::Waitable,
626 typename TWorker::DropCachePolicy> m_queue;
628 Mutex m_mutex;
629 std::set<TWorker*> m_workers;
630 std::set<AsyncFunc<TWorker> *> m_funcs;
631 const bool m_startReaperThread;
633 // return the id for the worker.
634 int addWorkerImpl(bool start) {
635 TWorker *worker = new TWorker();
636 AsyncFunc<TWorker> *func = new AsyncFunc<TWorker>(worker, &TWorker::start);
637 m_workers.insert(worker);
638 m_funcs.insert(func);
639 int id = m_id++;
640 worker->create(id, &m_queue, func, m_opaque);
642 if (start) {
643 func->start();
645 return id;
649 ///////////////////////////////////////////////////////////////////////////////
652 #endif