Deshim VirtualExecutor in folly
[hiphop-php.git] / hphp / hhbbc / parallel.h
blob36ab05c4c3b6e67861daa8dca5e32012bde39dcb
1 /*
2 +----------------------------------------------------------------------+
3 | HipHop for PHP |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
16 #pragma once
18 #include <stdexcept>
19 #include <iterator>
20 #include <cstdio>
21 #include <thread>
22 #include <vector>
23 #include <type_traits>
24 #include <string>
25 #include <atomic>
26 #include <algorithm>
27 #include <exception>
29 #include <folly/ScopeGuard.h>
31 #include "hphp/runtime/vm/treadmill.h"
32 #include "hphp/runtime/base/program-functions.h"
34 namespace HPHP::HHBBC {
36 namespace parallel {
38 //////////////////////////////////////////////////////////////////////
41 * Before using the parallel module, you can configure these to change
42 * how much parallelism is used.
44 extern size_t num_threads;
45 extern size_t final_threads;
47 //////////////////////////////////////////////////////////////////////
49 namespace detail {
51 template<class Items>
52 auto size_info(Items&& items) {
53 auto const size = items.size();
54 if (!size) return std::make_tuple(size, size);
55 auto const threads = std::min(num_threads, size);
56 return std::make_tuple(size, threads);
59 template<class Func, class Item>
60 auto caller(const Func& func, Item&& item, size_t worker) ->
61 decltype(func(std::forward<Item>(item), worker)) {
62 return func(std::forward<Item>(item), worker);
65 template<class Func, class Item>
66 auto caller(const Func& func, Item&& item, size_t worker) ->
67 decltype(func(std::forward<Item>(item))) {
68 return func(std::forward<Item>(item));
74 * Call a function on each element of `inputs', in parallel.
76 * If `func' throws an exception, some of the work will not be
77 * attempted.
79 template<class Func, class Items>
80 void for_each(Items&& inputs, Func func) {
81 std::atomic<bool> failed{false};
82 std::atomic<size_t> index{0};
83 auto const info = detail::size_info(inputs);
84 auto const size = std::get<0>(info);
85 if (!size) return;
86 auto const threads = std::get<1>(info);
88 std::vector<std::thread> workers;
89 for (auto worker = size_t{0}; worker < threads; ++worker) {
90 workers.push_back(std::thread([&, worker] {
91 try {
92 HphpSessionAndThread _{Treadmill::SessionKind::HHBBC};
93 while (true) {
94 auto const i = index++;
95 if (i >= size) break;
96 detail::caller(
97 func,
98 std::forward<Items>(inputs)[i],
99 worker
102 } catch (const std::exception& e) {
103 std::fprintf(stderr,
104 "worker thread exited with exception: %s\n", e.what());
105 failed = true;
107 }));
110 for (auto& t : workers) t.join();
112 if (failed) throw std::runtime_error("parallel::for_each failed");
115 //////////////////////////////////////////////////////////////////////
118 * Call a function that produces a return value for each element of
119 * `inputs' in parallel, and collect the results.
121 * Requires: the type returned from the function call must be
122 * DefaultConstructible, and either MoveAssignable or Assignable.
124 * If `func' throws an exception, the results of the output vector
125 * will contain some default-constructed values.
127 template<class Func, class Items>
128 auto map(Items&& inputs, Func func) -> std::vector<decltype(func(inputs[0]))> {
129 auto const info = detail::size_info(inputs);
130 auto const size = std::get<0>(info);
131 std::vector<decltype(func(inputs[0]))> retVec(size);
132 if (!size) return retVec;
133 auto const threads = std::get<1>(info);
135 auto const retMem = &retVec[0];
137 std::atomic<bool> failed{false};
138 std::atomic<size_t> index{0};
140 std::vector<std::thread> workers;
141 for (auto worker = size_t{0}; worker < threads; ++worker) {
142 workers.push_back(std::thread([&] {
143 try {
144 HphpSessionAndThread _{Treadmill::SessionKind::HHBBC};
145 while (true) {
146 auto const i = index++;
147 if (i >= size) break;
148 retMem[i] = func(std::forward<Items>(inputs)[i]);
150 } catch (const std::runtime_error& e) {
151 std::fprintf(stderr,
152 "worker thread exited with exception: %s\n", e.what());
153 failed = true;
155 }));
158 for (auto& t : workers) t.join();
159 if (failed) throw std::runtime_error("parallel::map failed");
161 return retVec;
164 //////////////////////////////////////////////////////////////////////
167 * Call a function that produces a return value for each integer from
168 * 0 to count-1 in parallel, and collect the results.
170 * Requires: the type returned from the function call must be
171 * DefaultConstructible, and either MoveAssignable or Assignable.
173 * If `func' throws an exception, the results of the output vector
174 * will contain some default-constructed values.
176 template<typename Func>
177 auto gen(size_t count, Func func) -> std::vector<decltype(func(0))> {
178 std::vector<decltype(func(0))> retVec(count);
179 if (!count) return retVec;
181 auto const threads = std::min(num_threads, count);
183 std::atomic<bool> failed{false};
184 std::atomic<size_t> index{0};
186 std::vector<std::thread> workers;
187 for (auto worker = size_t{0}; worker < threads; ++worker) {
188 workers.emplace_back(
189 [&] {
190 try {
191 HphpSessionAndThread _{Treadmill::SessionKind::HHBBC};
192 while (true) {
193 auto const i = index++;
194 if (i >= count) break;
195 retVec[i] = func(i);
197 } catch (const std::runtime_error& e) {
198 std::fprintf(
199 stderr, "worker thread exited with exception: %s\n", e.what()
201 failed = true;
207 for (auto& t : workers) t.join();
208 if (failed) throw std::runtime_error("parallel::gen failed");
210 return retVec;
213 //////////////////////////////////////////////////////////////////////
215 namespace detail {
217 template<size_t Total>
218 void populateThreads(std::array<std::thread, Total>&,
219 std::atomic<bool>&,
220 size_t) {}
222 template<size_t Total, typename Func, typename... Funcs>
223 void populateThreads(std::array<std::thread, Total>& threads,
224 std::atomic<bool>& failed,
225 size_t idx,
226 Func&& func,
227 Funcs&&... funcs) {
228 threads[idx] = std::thread(
229 [&] {
230 try {
231 HphpSessionAndThread _{Treadmill::SessionKind::HHBBC};
232 func();
233 } catch (const std::runtime_error& e) {
234 std::fprintf(
235 stderr, "worker thread exited with exception: %s\n", e.what()
237 failed = true;
241 populateThreads(threads, failed, idx+1, std::forward<Funcs>(funcs)...);
246 // Run N callables in parallel
247 template<typename... Funcs>
248 void parallel(Funcs&&... funcs) {
249 std::array<std::thread, sizeof...(Funcs)> threads;
250 std::atomic<bool> failed{false};
251 detail::populateThreads(threads, failed, 0, std::forward<Funcs>(funcs)...);
252 for (auto& t : threads) t.join();
253 if (failed) throw std::runtime_error("parallel::parallel failed");
256 //////////////////////////////////////////////////////////////////////
260 //////////////////////////////////////////////////////////////////////