fixup yaml to be c++ compliant
[hiphop-php.git] / hphp / util / async-job.h
blob5c64ce24de78529aa2ed6d36a153169a6339f717
/*
   +----------------------------------------------------------------------+
   | HipHop for PHP                                                       |
   +----------------------------------------------------------------------+
   | Copyright (c) 2010-2013 Facebook, Inc. (http://www.facebook.com)     |
   +----------------------------------------------------------------------+
   | This source file is subject to version 3.01 of the PHP license,      |
   | that is bundled with this package in the file LICENSE, and is        |
   | available through the world-wide-web at the following url:           |
   | http://www.php.net/license/3_01.txt                                  |
   | If you did not receive a copy of the PHP license and are unable to   |
   | obtain it through the world-wide-web, please send a note to          |
   | license@php.net so we can mail you a copy immediately.               |
   +----------------------------------------------------------------------+
*/
17 #ifndef incl_HPHP_ASYNC_JOB_H_
18 #define incl_HPHP_ASYNC_JOB_H_
#include "hphp/util/base.h"
#include "hphp/util/async-func.h"
#include "hphp/util/lock.h"

#include <sys/time.h>

#include <algorithm>
#include <cassert>
#include <cstdio>
#include <memory>
#include <random>
#include <vector>
26 namespace HPHP {
27 ///////////////////////////////////////////////////////////////////////////////
// Helpers; skip down to JobDispatcher below for documentation and usage.
// Forward declaration: WorkerWrapper stores a reference to its dispatcher,
// which is defined further down in this header.
template <typename TJob, typename TWorker>
class JobDispatcher;
/**
 * Compile-time traits for a job type, consulted by WorkerWrapper.
 *
 * DoInit: when false, WorkerWrapper calls setNoInit() on the AsyncFunc it
 * owns; the primary template leaves it true. (Presumably specialized per job
 * type where that is needed -- no specialization is visible in this header.)
 */
template <typename TJob>
struct WorkerInfo {
  enum { DoInit = true };
};
39 template<class TJob, class TWorker>
40 class WorkerWrapper {
41 public:
42 explicit WorkerWrapper(JobDispatcher<TJob, TWorker> &dispatcher)
43 : m_dispatcher(dispatcher)
44 , m_func(this, &WorkerWrapper<TJob, TWorker>::doJob)
46 if (!WorkerInfo<TJob>::DoInit) {
47 m_func.setNoInit();
51 TWorker *getWorker() { return &m_worker;}
52 AsyncFunc<WorkerWrapper<TJob, TWorker> > &getAsyncFunc() {
53 return m_func;
56 void doJob() {
57 m_worker.onThreadEnter();
58 while (true) {
59 std::shared_ptr<TJob> job = m_dispatcher.getNextJob();
60 if (!job) break;
61 m_worker.doJob(job);
63 m_worker.onThreadExit();
66 private:
67 JobDispatcher<TJob, TWorker> &m_dispatcher;
68 AsyncFunc<WorkerWrapper<TJob, TWorker> > m_func;
69 TWorker m_worker;
72 ///////////////////////////////////////////////////////////////////////////////
74 /**
75 * The promise is to make job processing cross multi-worker as easy as
76 * possible. The final syntax is like,
78 * MyJobPtrVec jobs;
79 * // prepare job list in "jobs"
81 * JobDispatcher<MyJob, MyWorker>(jobs, 10).run();
83 * or
85 * JobDispatcher<MyJob, MyWorker> dispatcher(jobs, 10);
86 * dispatcher.start();
87 * // do something else;
88 * dispatcher.waitForEnd();
91 * The only requirement is MyWorker has to be a class that implements this:
93 * class MyWorker {
94 * public:
95 * void doJob(MyJobPtr job);
96 * };
98 template<class TJob, class TWorker>
99 class JobDispatcher {
100 public:
101 JobDispatcher(std::vector<std::shared_ptr<TJob> > &jobs,
102 unsigned int workerCount, bool showStatus = false)
103 : m_index(0), m_jobs(jobs), m_showStatus(showStatus), m_lastPercent(0) {
104 std::random_shuffle(m_jobs.begin(), m_jobs.end());
105 if (workerCount > jobs.size()) {
106 workerCount = jobs.size();
108 m_workers.resize(workerCount);
109 for (unsigned int i = 0; i < m_workers.size(); i++) {
110 m_workers[i] = std::shared_ptr<WorkerWrapper<TJob, TWorker> >
111 (new WorkerWrapper<TJob, TWorker>(*this));
116 * Start job processing asynchronously.
118 void start() {
119 gettimeofday(&m_start, 0);
120 for (unsigned int i = 0; i < m_workers.size(); i++) {
121 m_workers[i]->getAsyncFunc().start();
126 * Wait for all jobs finish running.
128 void waitForEnd() {
129 for (unsigned int i = 0; i < m_workers.size(); i++) {
130 m_workers[i]->getAsyncFunc().waitForEnd();
135 * Start job processing and wait for all jobs finish running.
137 void run() {
138 if (m_workers.size() == 1) {
139 m_workers[0]->doJob();
140 } else {
141 start();
142 waitForEnd();
146 unsigned int getWorkerCount() {
147 return m_workers.size();
150 TWorker *getWorker(unsigned int i) {
151 assert(i < m_workers.size());
152 return m_workers[i]->getWorker();
155 void setStatus(int percent) {
156 if (percent > m_lastPercent) {
157 struct timeval tv;
158 gettimeofday(&tv, 0);
159 long diff = (tv.tv_sec - m_start.tv_sec ) * 1000000 +
160 (tv.tv_usec - m_start.tv_usec);
161 m_lastPercent = percent;
162 int seconds = diff / (percent * 10000);
163 printf("%d%% (ETA: %d sec or %d min or %d hours)\n", percent,
164 seconds, seconds/60, seconds/3600);
168 std::shared_ptr<TJob> getNextJob() {
169 Lock lock(m_mutex);
170 if (m_index >= m_jobs.size()) {
171 return std::shared_ptr<TJob>();
173 if (m_showStatus && m_index > m_workers.size()) {
174 setStatus((m_index - m_workers.size()) * 100 / m_jobs.size());
176 return m_jobs[m_index++];
179 private:
180 Mutex m_mutex;
181 unsigned int m_index;
182 std::vector<std::shared_ptr<TJob> > &m_jobs;
183 std::vector<std::shared_ptr<WorkerWrapper<TJob, TWorker> > > m_workers;
184 bool m_showStatus;
185 int m_lastPercent;
186 struct timeval m_start;
189 ///////////////////////////////////////////////////////////////////////////////
192 #endif // incl_HPHP_ASYNC_JOB_H_