declare_folded_class NO LONGER _in_file
[hiphop-php.git] / hphp / util / async-job.h
blob234db76e11f037fd17e63f52fb380dd26a8e696a
/*
   +----------------------------------------------------------------------+
   | HipHop for PHP                                                       |
   +----------------------------------------------------------------------+
   | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com)  |
   +----------------------------------------------------------------------+
   | This source file is subject to version 3.01 of the PHP license,      |
   | that is bundled with this package in the file LICENSE, and is        |
   | available through the world-wide-web at the following url:           |
   | http://www.php.net/license/3_01.txt                                  |
   | If you did not receive a copy of the PHP license and are unable to   |
   | obtain it through the world-wide-web, please send a note to          |
   | license@php.net so we can mail you a copy immediately.               |
   +----------------------------------------------------------------------+
*/
17 #pragma once
19 #include <algorithm>
20 #include <random>
22 #include <folly/portability/SysTime.h>
24 #include "hphp/util/async-func.h"
25 #include "hphp/util/lock.h"
27 namespace HPHP {
28 ///////////////////////////////////////////////////////////////////////////////
// Helpers; skip down to JobDispatcher below for documentation and usage.
31 template<class TJob, class TWorker>
32 struct JobDispatcher;
/**
 * Per-job-type settings consulted by WorkerWrapper.
 *
 * DoInitFini: when false, WorkerWrapper's constructor calls setNoInitFini()
 * on the AsyncFunc it owns. The primary template leaves it true; a
 * specialization for a particular TJob could set it to false.
 */
template<class TJob>
struct WorkerInfo {
  enum { DoInitFini = true };
};
39 template<class TJob, class TWorker>
40 struct WorkerWrapper {
41 explicit WorkerWrapper(JobDispatcher<TJob, TWorker> &dispatcher)
42 : m_dispatcher(dispatcher)
43 , m_func(this, &WorkerWrapper<TJob, TWorker>::doJob)
45 if (!WorkerInfo<TJob>::DoInitFini) {
46 m_func.setNoInitFini();
50 TWorker *getWorker() { return &m_worker;}
51 AsyncFunc<WorkerWrapper<TJob, TWorker> > &getAsyncFunc() {
52 return m_func;
55 void doJob() {
56 m_worker.onThreadEnter();
57 while (true) {
58 std::shared_ptr<TJob> job = m_dispatcher.getNextJob();
59 if (!job) break;
60 m_worker.doJob(job);
62 m_worker.onThreadExit();
65 private:
66 JobDispatcher<TJob, TWorker> &m_dispatcher;
67 AsyncFunc<WorkerWrapper<TJob, TWorker> > m_func;
68 TWorker m_worker;
71 ///////////////////////////////////////////////////////////////////////////////
73 /**
74 * The promise is to make job processing cross multi-worker as easy as
75 * possible. The final syntax is like,
77 * MyJobPtrVec jobs;
78 * // prepare job list in "jobs"
80 * JobDispatcher<MyJob, MyWorker>(std::move(jobs), 10).run();
82 * or
84 * JobDispatcher<MyJob, MyWorker> dispatcher(std::move(jobs), 10);
85 * dispatcher.start();
86 * // do something else;
87 * dispatcher.waitForEnd();
90 * The only requirement is MyWorker has to be a class that implements this:
92 * struct MyWorker {
93 * void onThreadEnter();
94 * void doJob(MyJobPtr job);
95 * void onThreadExit();
96 * };
98 template<class TJob, class TWorker>
99 struct JobDispatcher {
100 JobDispatcher(const std::vector<std::shared_ptr<TJob> > &&jobs,
101 unsigned int workerCount, bool showStatus = false)
102 : m_index(0),
103 m_jobs(std::move(jobs)),
104 m_showStatus(showStatus),
105 m_lastPercent(0) {
106 std::mt19937 prng(42);
107 std::shuffle(m_jobs.begin(), m_jobs.end(), prng);
108 if (workerCount > m_jobs.size()) {
109 workerCount = m_jobs.size();
111 m_workers.resize(workerCount);
112 for (unsigned int i = 0; i < m_workers.size(); i++) {
113 m_workers[i] = std::shared_ptr<WorkerWrapper<TJob, TWorker> >
114 (new WorkerWrapper<TJob, TWorker>(*this));
119 * Start job processing asynchronously.
121 void start() {
122 gettimeofday(&m_start, 0);
123 for (unsigned int i = 0; i < m_workers.size(); i++) {
124 m_workers[i]->getAsyncFunc().start();
129 * Wait for all jobs finish running.
131 void waitForEnd() {
132 for (unsigned int i = 0; i < m_workers.size(); i++) {
133 m_workers[i]->getAsyncFunc().waitForEnd();
138 * Start job processing and wait for all jobs finish running.
140 void run() {
141 if (m_workers.size() == 1) {
142 m_workers[0]->doJob();
143 } else {
144 start();
145 waitForEnd();
149 unsigned int getWorkerCount() {
150 return m_workers.size();
153 TWorker *getWorker(unsigned int i) {
154 assert(i < m_workers.size());
155 return m_workers[i]->getWorker();
158 void setStatus(int percent) {
159 if (percent > m_lastPercent) {
160 struct timeval tv;
161 gettimeofday(&tv, 0);
162 long diff = (tv.tv_sec - m_start.tv_sec ) * 1000000 +
163 (tv.tv_usec - m_start.tv_usec);
164 m_lastPercent = percent;
165 int seconds = diff / (percent * 10000);
166 printf("%d%% (ETA: %d sec or %d min or %d hours)\n", percent,
167 seconds, seconds/60, seconds/3600);
171 std::shared_ptr<TJob> getNextJob() {
172 Lock lock(m_mutex);
173 if (m_index >= m_jobs.size()) {
174 return std::shared_ptr<TJob>();
176 if (m_showStatus && m_index > m_workers.size()) {
177 setStatus((m_index - m_workers.size()) * 100 / m_jobs.size());
179 return m_jobs[m_index++];
182 private:
183 Mutex m_mutex;
184 unsigned int m_index;
185 std::vector<std::shared_ptr<TJob> > m_jobs;
186 std::vector<std::shared_ptr<WorkerWrapper<TJob, TWorker> > > m_workers;
187 bool m_showStatus;
188 int m_lastPercent;
189 struct timeval m_start;
192 ///////////////////////////////////////////////////////////////////////////////