Parallelize update phase
[hiphop-php.git] / hphp / hhbbc / parallel.h
blob66deecd643bd819a44e143a68cd2b388ed20dfda
1 /*
2 +----------------------------------------------------------------------+
3 | HipHop for PHP |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
16 #ifndef incl_HHBBC_PARALLEL_H_
17 #define incl_HHBBC_PARALLEL_H_
19 #include <stdexcept>
20 #include <iterator>
21 #include <cstdio>
22 #include <thread>
23 #include <vector>
24 #include <type_traits>
25 #include <string>
26 #include <atomic>
27 #include <algorithm>
28 #include <exception>
30 #include <folly/ScopeGuard.h>
32 #include "hphp/runtime/vm/treadmill.h"
33 #include "hphp/runtime/base/program-functions.h"
35 namespace HPHP { namespace HHBBC {
37 namespace parallel {
39 //////////////////////////////////////////////////////////////////////
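/*
 * Before using the parallel module, you can configure these to change
 * how much parallelism is used: `num_threads' is the number of worker
 * threads each call spawns, and `work_chunk' is the number of
 * consecutive items a worker claims at a time.
 */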
45 extern size_t num_threads;
46 extern size_t work_chunk;
48 //////////////////////////////////////////////////////////////////////
namespace detail {

/*
 * Invoke `func' with both the item and the index of the calling
 * worker.  The trailing return type removes this overload from
 * consideration unless `func(item, worker)' is a well-formed
 * expression.
 */
template<class F, class T>
auto caller(const F& func, T&& item, size_t worker)
  -> decltype(func(std::forward<T>(item), worker))
{
  return func(std::forward<T>(item), worker);
}

/*
 * Invoke `func' with the item only.  The worker index is accepted so
 * both overloads share one call signature, but it is not passed on.
 * Participates in overload resolution only when `func(item)' is a
 * well-formed expression.
 */
template<class F, class T>
auto caller(const F& func, T&& item, size_t worker)
  -> decltype(func(std::forward<T>(item)))
{
  return func(std::forward<T>(item));
}

}
67 * Call a function on each element of `inputs', in parallel.
69 * If `func' throws an exception, some of the work will not be
70 * attempted.
72 template<class Func, class Items>
73 void for_each(Items&& inputs, Func func) {
74 std::atomic<bool> failed{false};
75 std::atomic<size_t> index{0};
76 auto const size = inputs.size();
78 std::vector<std::thread> workers;
79 for (auto worker = size_t{0}; worker < num_threads; ++worker) {
80 workers.push_back(std::thread([&, worker] {
81 try {
82 hphp_thread_init();
83 hphp_session_init(Treadmill::SessionKind::HHBBC);
84 SCOPE_EXIT {
85 hphp_context_exit();
86 hphp_session_exit();
87 hphp_thread_exit();
90 for (;;) {
91 auto start = index.fetch_add(work_chunk);
92 auto const stop = std::min(start + work_chunk, size);
93 if (start >= stop) break;
94 for (auto i = start; i != stop; ++i) {
95 detail::caller(func,
96 std::forward<Items>(inputs)[i],
97 worker);
100 } catch (const std::exception& e) {
101 std::fprintf(stderr,
102 "worker thread exited with exception: %s\n", e.what());
103 failed = true;
105 }));
108 for (auto& t : workers) t.join();
110 if (failed) throw std::runtime_error("parallel::for_each failed");
113 //////////////////////////////////////////////////////////////////////
116 * Call a function that produces a return value for each element of
117 * `inputs' in parallel, and collect the results.
119 * Requires: the type returned from the function call must be
120 * DefaultConstructible, and either MoveAssignable or Assignable.
122 * If `func' throws an exception, the results of the output vector
123 * will contain some default-constructed values.
125 template<class Func, class Items>
126 auto map(Items&& inputs, Func func) -> std::vector<decltype(func(inputs[0]))> {
127 std::vector<decltype(func(inputs[0]))> retVec(inputs.size());
128 auto const retMem = &retVec[0];
130 std::atomic<bool> failed{false};
131 std::atomic<size_t> index{0};
133 std::vector<std::thread> workers;
134 for (auto worker = size_t{0}; worker < num_threads; ++worker) {
135 workers.push_back(std::thread([&] {
136 try {
137 hphp_thread_init();
138 hphp_session_init(Treadmill::SessionKind::HHBBC);
139 SCOPE_EXIT {
140 hphp_context_exit();
141 hphp_session_exit();
142 hphp_thread_exit();
145 for (;;) {
146 auto start = index.fetch_add(work_chunk);
147 auto const stop = std::min(start + work_chunk, inputs.size());
148 if (start >= stop) break;
150 std::transform(
151 begin(inputs) + start, begin(inputs) + stop,
152 retMem + start,
153 func
156 } catch (const std::runtime_error& e) {
157 std::fprintf(stderr,
158 "worker thread exited with exception: %s\n", e.what());
159 failed = true;
161 }));
164 for (auto& t : workers) t.join();
165 if (failed) throw std::runtime_error("parallel::map failed");
167 return retVec;
170 //////////////////////////////////////////////////////////////////////
176 #endif