2 +----------------------------------------------------------------------+
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
16 #ifndef incl_HHBBC_PARALLEL_H_
17 #define incl_HHBBC_PARALLEL_H_
24 #include <type_traits>
30 #include <folly/ScopeGuard.h>
32 #include "hphp/runtime/vm/treadmill.h"
33 #include "hphp/runtime/base/program-functions.h"
35 namespace HPHP
{ namespace HHBBC
{
39 //////////////////////////////////////////////////////////////////////
/*
 * Before using the parallel module, you can configure these to change
 * how much parallelism is used.
 */
// Upper bound on the number of worker threads spawned per parallel call.
extern size_t num_threads;

// Number of consecutive input elements a worker claims at a time.
extern size_t work_chunk;
48 //////////////////////////////////////////////////////////////////////
/*
 * Invoke `func' on `item', additionally passing the index of the
 * worker making the call.
 *
 * The trailing decltype removes this overload from overload
 * resolution unless `func(item, worker)' is a well-formed expression,
 * so it is only selected for callables that accept the worker index.
 * `item' is perfectly forwarded; the result of `func' is returned.
 */
template<class Func, class Item>
auto caller(const Func& func, Item&& item, size_t worker) ->
  decltype(func(std::forward<Item>(item), worker)) {
  return func(std::forward<Item>(item), worker);
}
/*
 * Invoke `func' on `item' for callables that do not take a worker
 * index.
 *
 * The trailing decltype removes this overload from overload
 * resolution unless `func(item)' is a well-formed expression.  The
 * worker index is accepted so that call sites can pass it
 * unconditionally, but it is ignored here (hence the unnamed
 * parameter).  `item' is perfectly forwarded; the result of `func' is
 * returned.
 */
template<class Func, class Item>
auto caller(const Func& func, Item&& item, size_t /* worker */) ->
  decltype(func(std::forward<Item>(item))) {
  return func(std::forward<Item>(item));
}
/*
 * Call a function on each element of `inputs', in parallel.
 *
 * If `func' throws an exception, some of the work will not be
 * attempted.
 */
// for_each: processes the elements of `inputs' on std::thread workers,
// one thread per value of `worker' below `num_threads'.
//
// NOTE(review): several interior lines of this definition are not
// visible in this view (the `try' matching the `catch' below, the call
// that consumes the forwarded element, and the closing braces).  The
// comments here describe only what the visible lines establish.
72 template<class Func
, class Items
>
73 void for_each(Items
&& inputs
, Func func
) {
// Failure flag, checked after all workers have been joined.
// NOTE(review): presumably set in the catch handler -- the assignment
// is not visible here; confirm.
74 std::atomic
<bool> failed
{false};
// Shared cursor into `inputs'; workers advance it with fetch_add.
75 std::atomic
<size_t> index
{0};
// Element count, read once up front and used to clamp each chunk.
76 auto const size
= inputs
.size();
78 std::vector
<std::thread
> workers
;
79 for (auto worker
= size_t{0}; worker
< num_threads
; ++worker
) {
// The lambda captures `worker' by value and everything else by
// reference.
80 workers
.push_back(std::thread([&, worker
] {
// Called inside the worker thread's lambda, with the HHBBC session
// kind.
83 hphp_session_init(Treadmill::SessionKind::HHBBC
);
// Claim the half-open range [start, stop): fetch_add returns the
// previous cursor value and moves the cursor on by `work_chunk'.
91 auto start
= index
.fetch_add(work_chunk
);
// Clamp the end of the claimed range to the number of elements.
92 auto const stop
= std::min(start
+ work_chunk
, size
);
// Nothing left to claim: leave the enclosing loop.
93 if (start
>= stop
) break;
// Visit every index in the claimed range.
94 for (auto i
= start
; i
!= stop
; ++i
) {
// The i-th element, indexed through the forwarded `inputs'.
// NOTE(review): the call receiving this argument is not visible.
96 std::forward
<Items
>(inputs
)[i
],
// Handler for anything derived from std::exception.
100 } catch (const std::exception
& e
) {
// Format string and argument reporting e.what().
102 "worker thread exited with exception: %s\n", e
.what());
// Wait for every worker before inspecting `failed'.
108 for (auto& t
: workers
) t
.join();
// Report failure to the caller as a std::runtime_error.
110 if (failed
) throw std::runtime_error("parallel::for_each failed");
113 //////////////////////////////////////////////////////////////////////
/*
 * Call a function that produces a return value for each element of
 * `inputs' in parallel, and collect the results.
 *
 * Requires: the type returned from the function call must be
 * DefaultConstructible, and either MoveAssignable or CopyAssignable.
 *
 * If `func' throws an exception, the results of the output vector
 * will contain some default-constructed values.
 */
// map: like a parallel transform -- produces one result per element of
// `inputs', using one std::thread per value of `worker' below
// `num_threads'.  The return type is a std::vector of whatever
// `func(inputs[0])' yields.
//
// NOTE(review): several interior lines of this definition are not
// visible in this view (the `try' matching the `catch' below, the call
// that receives the iterator range, the return statement, and the
// closing braces).  The comments here describe only what the visible
// lines establish.
125 template<class Func
, class Items
>
126 auto map(Items
&& inputs
, Func func
) -> std::vector
<decltype(func(inputs
[0]))> {
// Output vector, sized to inputs.size() so every slot starts out
// value-initialized.
127 std::vector
<decltype(func(inputs
[0]))> retVec(inputs
.size());
// Raw pointer to the first output slot.
// NOTE(review): when `inputs' is empty, retVec is empty and
// `retVec[0]' is out of range (undefined behaviour); `retVec.data()'
// would avoid that -- confirm whether empty inputs can reach here.
128 auto const retMem
= &retVec
[0];
// Failure flag, checked after all workers have been joined.
// NOTE(review): presumably set in the catch handler -- the assignment
// is not visible here; confirm.
130 std::atomic
<bool> failed
{false};
// Shared cursor into `inputs'; workers advance it with fetch_add.
131 std::atomic
<size_t> index
{0};
133 std::vector
<std::thread
> workers
;
134 for (auto worker
= size_t{0}; worker
< num_threads
; ++worker
) {
// The lambda captures everything by reference (the worker index is
// not captured by value here, unlike in for_each).
135 workers
.push_back(std::thread([&] {
// Called inside the worker thread's lambda, with the HHBBC session
// kind.
138 hphp_session_init(Treadmill::SessionKind::HHBBC
);
// Claim the half-open range [start, stop): fetch_add returns the
// previous cursor value and moves the cursor on by `work_chunk'.
146 auto start
= index
.fetch_add(work_chunk
);
// Clamp the end of the claimed range to the number of elements.
147 auto const stop
= std::min(start
+ work_chunk
, inputs
.size());
// Nothing left to claim: leave the enclosing loop.
148 if (start
>= stop
) break;
// Iterator range over the claimed elements.
// NOTE(review): the call receiving these arguments is not visible.
151 begin(inputs
) + start
, begin(inputs
) + stop
,
// NOTE(review): only std::runtime_error is caught here, whereas
// for_each catches std::exception.  Any other exception type leaving
// the thread function would terminate the process -- confirm intent.
156 } catch (const std::runtime_error
& e
) {
// Format string and argument reporting e.what().
158 "worker thread exited with exception: %s\n", e
.what());
// Wait for every worker before inspecting `failed'.
164 for (auto& t
: workers
) t
.join();
// Report failure to the caller as a std::runtime_error.
165 if (failed
) throw std::runtime_error("parallel::map failed");
170 //////////////////////////////////////////////////////////////////////