2 +----------------------------------------------------------------------+
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
22 #include <folly/portability/SysTime.h>
24 #include "hphp/util/async-func.h"
25 #include "hphp/util/lock.h"
28 ///////////////////////////////////////////////////////////////////////////////
29 // Helper types; skip down to JobDispatcher below to read the comments and usage...
// Forward declaration: WorkerWrapper (below) stores a reference to the
// JobDispatcher that owns it, and JobDispatcher is only defined later on.
template<typename TJob, typename TWorker>
struct JobDispatcher;

/**
 * Compile-time traits keyed on the job type.
 *
 * WorkerWrapper's constructor reads WorkerInfo<TJob>::DoInitFini and calls
 * setNoInitFini() on its AsyncFunc when the flag is false. The primary
 * template leaves the flag set to true.
 *
 * NOTE(review): presumably job types that want to skip init/fini specialize
 * this template elsewhere -- confirm against callers.
 */
template<typename TJob>
struct WorkerInfo {
  enum { DoInitFini = true };
};
39 template<class TJob
, class TWorker
>
40 struct WorkerWrapper
{
41 explicit WorkerWrapper(JobDispatcher
<TJob
, TWorker
> &dispatcher
)
42 : m_dispatcher(dispatcher
)
43 , m_func(this, &WorkerWrapper
<TJob
, TWorker
>::doJob
)
45 if (!WorkerInfo
<TJob
>::DoInitFini
) {
46 m_func
.setNoInitFini();
50 TWorker
*getWorker() { return &m_worker
;}
51 AsyncFunc
<WorkerWrapper
<TJob
, TWorker
> > &getAsyncFunc() {
56 m_worker
.onThreadEnter();
58 std::shared_ptr
<TJob
> job
= m_dispatcher
.getNextJob();
62 m_worker
.onThreadExit();
66 JobDispatcher
<TJob
, TWorker
> &m_dispatcher
;
67 AsyncFunc
<WorkerWrapper
<TJob
, TWorker
> > m_func
;
71 ///////////////////////////////////////////////////////////////////////////////
74 * The promise is to make job processing across multiple workers as easy as
75 * possible. The final syntax looks like this:
78 * // prepare job list in "jobs"
80 * JobDispatcher<MyJob, MyWorker>(std::move(jobs), 10).run();
84 * JobDispatcher<MyJob, MyWorker> dispatcher(std::move(jobs), 10);
86 * // do something else;
87 * dispatcher.waitForEnd();
90 * The only requirement is MyWorker has to be a class that implements this:
93 * void onThreadEnter();
94 * void doJob(MyJobPtr job);
95 * void onThreadExit();
98 template<class TJob
, class TWorker
>
99 struct JobDispatcher
{
100 JobDispatcher(const std::vector
<std::shared_ptr
<TJob
> > &&jobs
,
101 unsigned int workerCount
, bool showStatus
= false)
103 m_jobs(std::move(jobs
)),
104 m_showStatus(showStatus
),
106 std::mt19937
prng(42);
107 std::shuffle(m_jobs
.begin(), m_jobs
.end(), prng
);
108 if (workerCount
> m_jobs
.size()) {
109 workerCount
= m_jobs
.size();
111 m_workers
.resize(workerCount
);
112 for (unsigned int i
= 0; i
< m_workers
.size(); i
++) {
113 m_workers
[i
] = std::shared_ptr
<WorkerWrapper
<TJob
, TWorker
> >
114 (new WorkerWrapper
<TJob
, TWorker
>(*this));
119 * Start job processing asynchronously.
122 gettimeofday(&m_start
, 0);
123 for (unsigned int i
= 0; i
< m_workers
.size(); i
++) {
124 m_workers
[i
]->getAsyncFunc().start();
129 * Wait for all jobs finish running.
132 for (unsigned int i
= 0; i
< m_workers
.size(); i
++) {
133 m_workers
[i
]->getAsyncFunc().waitForEnd();
138 * Start job processing and wait for all jobs finish running.
141 if (m_workers
.size() == 1) {
142 m_workers
[0]->doJob();
149 unsigned int getWorkerCount() {
150 return m_workers
.size();
153 TWorker
*getWorker(unsigned int i
) {
154 assert(i
< m_workers
.size());
155 return m_workers
[i
]->getWorker();
158 void setStatus(int percent
) {
159 if (percent
> m_lastPercent
) {
161 gettimeofday(&tv
, 0);
162 long diff
= (tv
.tv_sec
- m_start
.tv_sec
) * 1000000 +
163 (tv
.tv_usec
- m_start
.tv_usec
);
164 m_lastPercent
= percent
;
165 int seconds
= diff
/ (percent
* 10000);
166 printf("%d%% (ETA: %d sec or %d min or %d hours)\n", percent
,
167 seconds
, seconds
/60, seconds
/3600);
171 std::shared_ptr
<TJob
> getNextJob() {
173 if (m_index
>= m_jobs
.size()) {
174 return std::shared_ptr
<TJob
>();
176 if (m_showStatus
&& m_index
> m_workers
.size()) {
177 setStatus((m_index
- m_workers
.size()) * 100 / m_jobs
.size());
179 return m_jobs
[m_index
++];
184 unsigned int m_index
;
185 std::vector
<std::shared_ptr
<TJob
> > m_jobs
;
186 std::vector
<std::shared_ptr
<WorkerWrapper
<TJob
, TWorker
> > > m_workers
;
189 struct timeval m_start
;
192 ///////////////////////////////////////////////////////////////////////////////