2 +----------------------------------------------------------------------+
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-2013 Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #ifndef incl_HPHP_ASYNC_JOB_H_
18 #define incl_HPHP_ASYNC_JOB_H_
20 #include "hphp/util/base.h"
21 #include "hphp/util/async-func.h"
22 #include "hphp/util/lock.h"
27 ///////////////////////////////////////////////////////////////////////////////
28 // helper, skip to JobDispatcher at below to read comments and usages...
30 template<class TJob
, class TWorker
>
36 enum { DoInit
= true };
39 template<class TJob
, class TWorker
>
42 explicit WorkerWrapper(JobDispatcher
<TJob
, TWorker
> &dispatcher
)
43 : m_dispatcher(dispatcher
)
44 , m_func(this, &WorkerWrapper
<TJob
, TWorker
>::doJob
)
46 if (!WorkerInfo
<TJob
>::DoInit
) {
51 TWorker
*getWorker() { return &m_worker
;}
52 AsyncFunc
<WorkerWrapper
<TJob
, TWorker
> > &getAsyncFunc() {
57 m_worker
.onThreadEnter();
59 std::shared_ptr
<TJob
> job
= m_dispatcher
.getNextJob();
63 m_worker
.onThreadExit();
67 JobDispatcher
<TJob
, TWorker
> &m_dispatcher
;
68 AsyncFunc
<WorkerWrapper
<TJob
, TWorker
> > m_func
;
72 ///////////////////////////////////////////////////////////////////////////////
75 * The promise is to make job processing cross multi-worker as easy as
76 * possible. The final syntax is like,
79 * // prepare job list in "jobs"
81 * JobDispatcher<MyJob, MyWorker>(jobs, 10).run();
85 * JobDispatcher<MyJob, MyWorker> dispatcher(jobs, 10);
87 * // do something else;
88 * dispatcher.waitForEnd();
91 * The only requirement is MyWorker has to be a class that implements this:
95 * void doJob(MyJobPtr job);
98 template<class TJob
, class TWorker
>
101 JobDispatcher(std::vector
<std::shared_ptr
<TJob
> > &jobs
,
102 unsigned int workerCount
, bool showStatus
= false)
103 : m_index(0), m_jobs(jobs
), m_showStatus(showStatus
), m_lastPercent(0) {
104 std::random_shuffle(m_jobs
.begin(), m_jobs
.end());
105 if (workerCount
> jobs
.size()) {
106 workerCount
= jobs
.size();
108 m_workers
.resize(workerCount
);
109 for (unsigned int i
= 0; i
< m_workers
.size(); i
++) {
110 m_workers
[i
] = std::shared_ptr
<WorkerWrapper
<TJob
, TWorker
> >
111 (new WorkerWrapper
<TJob
, TWorker
>(*this));
116 * Start job processing asynchronously.
119 gettimeofday(&m_start
, 0);
120 for (unsigned int i
= 0; i
< m_workers
.size(); i
++) {
121 m_workers
[i
]->getAsyncFunc().start();
126 * Wait for all jobs finish running.
129 for (unsigned int i
= 0; i
< m_workers
.size(); i
++) {
130 m_workers
[i
]->getAsyncFunc().waitForEnd();
135 * Start job processing and wait for all jobs finish running.
138 if (m_workers
.size() == 1) {
139 m_workers
[0]->doJob();
146 unsigned int getWorkerCount() {
147 return m_workers
.size();
150 TWorker
*getWorker(unsigned int i
) {
151 assert(i
< m_workers
.size());
152 return m_workers
[i
]->getWorker();
155 void setStatus(int percent
) {
156 if (percent
> m_lastPercent
) {
158 gettimeofday(&tv
, 0);
159 long diff
= (tv
.tv_sec
- m_start
.tv_sec
) * 1000000 +
160 (tv
.tv_usec
- m_start
.tv_usec
);
161 m_lastPercent
= percent
;
162 int seconds
= diff
/ (percent
* 10000);
163 printf("%d%% (ETA: %d sec or %d min or %d hours)\n", percent
,
164 seconds
, seconds
/60, seconds
/3600);
168 std::shared_ptr
<TJob
> getNextJob() {
170 if (m_index
>= m_jobs
.size()) {
171 return std::shared_ptr
<TJob
>();
173 if (m_showStatus
&& m_index
> m_workers
.size()) {
174 setStatus((m_index
- m_workers
.size()) * 100 / m_jobs
.size());
176 return m_jobs
[m_index
++];
181 unsigned int m_index
;
182 std::vector
<std::shared_ptr
<TJob
> > &m_jobs
;
183 std::vector
<std::shared_ptr
<WorkerWrapper
<TJob
, TWorker
> > > m_workers
;
186 struct timeval m_start
;
189 ///////////////////////////////////////////////////////////////////////////////
192 #endif // incl_HPHP_ASYNC_JOB_H_