move getMapIdByValue to FieldMask.h
[hiphop-php.git] / hphp / util / coro.h
blob8ab29240531d5b9e5089a32e3223e7dd879a6d28
1 /*
2 +----------------------------------------------------------------------+
3 | HipHop for PHP |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #pragma once
#include "hphp/util/hash-map.h"
#include "hphp/util/lock-free-lazy.h"

#include <folly/Try.h>
#include <folly/Unit.h>
#include <folly/executors/CPUThreadPoolExecutor.h>

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
32 //////////////////////////////////////////////////////////////////////
35 * C++ coroutine portability layer
37 * Not all of the compilers we support have coroutine support, and
38 * some that do are buggy. Nonetheless, we still want to be able to
39 * use coroutines. This file provides a portability layer to allow for
40 * the usage of coroutines (with very similar syntax), but supports
41 * older compilers.
43 * If the compiler supports coroutines, this is just a simple wrapper
44 * around the coroutine functionality we need. If not, all computation
45 * is performed *eagerly* and the coroutine syntax mainly desugars
46 * into nothing. For the most part, one can write the code assuming
47 * coroutines are supported and it will just work.
49 * NB: There is one important difference. Since "emulated" coroutines
50 * calculate their values eagerly, they may throw exceptions from
51 * different places than "native" coroutines. Native coroutines will
52 * only throw when they are awaited. Emulated coroutines will throw
53 * when they are first called. When writing code that uses this
54 * layer, one should be mindful of this and structure their
55 * try/catches appropriately.
57 * Emulated coroutines may be (much) slower than native coroutines
58 * since they are all calculated eagerly, thus there's no room for
59 * parallelism. We expect to use native coroutines in situations where
60 * performance matters.
62 * Emulated coroutines can be forced by defining HPHP_DISABLE_CORO.
65 //////////////////////////////////////////////////////////////////////
67 // Co-routine support in GCC is currently broken....
68 #if FOLLY_HAS_COROUTINES && defined(__clang__) && !defined(HPHP_DISABLE_CORO)
70 // Native coroutines:
72 #include <folly/experimental/coro/AsyncScope.h>
73 #include <folly/experimental/coro/Baton.h>
74 #include <folly/experimental/coro/BlockingWait.h>
75 #include <folly/experimental/coro/Collect.h>
76 #include <folly/experimental/coro/Sleep.h>
77 #include <folly/experimental/coro/Task.h>
79 //////////////////////////////////////////////////////////////////////
81 namespace HPHP::coro {
83 //////////////////////////////////////////////////////////////////////
// True in this (native) branch of the portability layer; the emulated
// branch defines the same constant as false. Declared `inline` so every
// translation unit including this header shares a single definition.
inline constexpr bool using_coros = true;
// Coroutine operators. These expand to coroutine keywords, which are
// part of the language syntax, so they have to be macros.

// Suspend the current coroutine until (x) has produced its value, and
// evaluate to that value.
#define HPHP_CORO_AWAIT(x) (co_await (x))
// Finish the current coroutine with the value (x).
#define HPHP_CORO_RETURN(x) co_return (x)
// Finish the current coroutine with the value (x), moving it. With
// native coroutines this is no different from HPHP_CORO_RETURN.
#define HPHP_CORO_MOVE_RETURN(x) co_return (x)
// Finish a coroutine which produces no value. For syntactic reasons
// this cannot share a macro with the value-returning form.
#define HPHP_CORO_RETURN_VOID co_return
// Evaluates to the Executor* which the current coroutine is assigned
// to.
#define HPHP_CORO_CURRENT_EXECUTOR (co_await folly::coro::co_current_executor)
// Yield the current coroutine and reschedule it on its assigned
// executor. The collect* functions may try to run a coroutine eagerly
// on the current executor; this forces it onto its assigned one.
#define HPHP_CORO_RESCHEDULE_ON_CURRENT_EXECUTOR \
  (co_await folly::coro::co_reschedule_on_current_executor)
// Throw a folly::OperationCancelled if this coroutine has been
// cancelled.
#define HPHP_CORO_SAFE_POINT (co_await folly::coro::co_safe_point)
111 // The actual arguments of these can be complex, so we just perfect
112 // forward everything rather than replicating it here.
114 // Await the given coroutine, blocking the thread until it has a
115 // value. This can be used to "await" a coroutine without become a
116 // coroutine yourself.
117 template <typename... Args>
118 auto wait(Args&&... args) {
119 return folly::coro::blockingWait(std::forward<Args>(args)...);
122 // Takes a set of coroutines and returns a coroutine representing all
123 // of the coroutines. Awaiting that coroutine returns a tuple of the
124 // values of the passed coroutines. This lets you await on multiple
125 // coroutines simultaneously.
126 template <typename... Args>
127 auto collect(Args&&... args) {
128 return folly::coro::collectAll(std::forward<Args>(args)...);
131 // Like collect, but accepts a std::vector of coroutines.
132 template <typename T>
133 auto collectRange(std::vector<T>&& ts) {
134 return folly::coro::collectAllRange(std::move(ts));
137 // Given a callable and a set of arguments, store the arguments, then
138 // invoke the callable. This is useful if you have a callable which
139 // takes params by reference. Passing a temporary will cause crashes
140 // because when the coroutine suspends and then runs later, the
141 // temporary will be gone, and the reference will point to
142 // nothing. This ensures that any such temporaries will be stored
143 // somewhere which lives as long as the coroutine. It is also needed
144 // with lambdas which are coroutines for similar reasons. If you call
145 // the lambda immediately, it will be gone when the coroutine resumes.
146 template <typename... Args>
147 auto invoke(Args&&... args) {
148 return folly::coro::co_invoke(std::forward<Args>(args)...);
151 // Block the execution of this coroutine for the given amount of time.
152 inline auto sleep(std::chrono::milliseconds d) {
153 return folly::coro::sleep(d);
156 // All coroutines return Task<T>, means they return T when they
157 // finish. TaskWithExecutor<T> is a Task which has been bound to an
158 // executor.
159 template <typename T> using Task = folly::coro::Task<T>;
160 template <typename T>
161 using TaskWithExecutor = folly::coro::TaskWithExecutor<T>;
163 // Calculate a value asynchronously on the given executor. This has an
164 // advantage over a bare coroutine because normally a coroutine will
165 // only start execution when you await it (and then you block until
166 // its done). This starts execution before you request the value,
167 // meaning it can run in the background.
168 template <typename T> struct AsyncValue {
169 template <typename F>
170 AsyncValue(F f, folly::Executor::KeepAlive<> executor) {
171 auto work = folly::coro::co_invoke(
172 [this, f = std::move(f)] () mutable -> Task<void> {
173 try {
174 m_try.emplace(co_await f());
175 } catch (...) {
176 m_try.emplaceException(
177 folly::exception_wrapper(std::current_exception())
180 m_baton.post();
181 co_return;
184 m_scope.add(std::move(work).scheduleOn(std::move(executor)));
187 ~AsyncValue() {
188 // NB: It might seem useful here to access the m_try after
189 // waiting, to ensure that any exception is rethrown. However, we
190 // can't throw from a dtor anyways, so its pointless to check.
191 folly::coro::blockingWait(m_scope.joinAsync());
194 AsyncValue(const AsyncValue&) = delete;
195 AsyncValue(AsyncValue&&) = delete;
196 AsyncValue& operator=(const AsyncValue&) = delete;
197 AsyncValue& operator=(AsyncValue&&) = delete;
199 // Coroutines which return either a pointer to the value, or a copy
200 // of the value. If the calculation resulted in an exception, that
201 // exception will be thrown.
203 Task<const T*> operator*() const {
204 co_await m_baton;
205 co_return &*m_try;
208 Task<T> getCopy() const {
209 co_await m_baton;
210 co_return *m_try;
213 private:
214 folly::Try<T> m_try;
215 folly::coro::Baton m_baton;
216 folly::coro::AsyncScope m_scope;
219 // Coro aware semaphore (runs a different task when blocks)
220 struct Semaphore {
221 explicit Semaphore(uint32_t tokens) : m_sem{tokens} {}
222 void signal() { m_sem.signal(); }
223 Task<void> wait() { return m_sem.co_wait(); }
224 private:
225 folly::fibers::Semaphore m_sem;
228 // Allows you to run coroutines asynchronously. Assign an executor to
229 // a Task, then add it to the AsyncScope. The coroutine will start
230 // running and will be automatically cleaned up when done. Since you
231 // cannot await the coroutine after you add it, this is only really
232 // useful for side-effectful coroutines which return void. The
233 // coroutine should not throw.
234 using AsyncScope = folly::coro::AsyncScope;
236 //////////////////////////////////////////////////////////////////////
240 //////////////////////////////////////////////////////////////////////
242 #else
244 #include <folly/synchronization/LifoSem.h>
246 // Emulated coroutines. The coroutine is just the value itself. The
247 // value is calculated eagerly when the coroutine is created (so
248 // basically like any other normal function call).
250 namespace HPHP::coro {
252 //////////////////////////////////////////////////////////////////////
// False in this (emulated) branch of the portability layer; the native
// branch defines the same constant as true. Declared `inline` so every
// translation unit including this header shares a single definition.
inline constexpr bool using_coros = false;
256 // Task and TaskWithExecutor are just simple wrappers around a value.
258 namespace detail {
259 template <typename T> struct DummyTask;
261 template <typename T>
262 struct DummyTask {
263 explicit DummyTask(T v) : val{std::move(v)} {}
265 DummyTask<T> scheduleOn(folly::Executor::KeepAlive<>) && {
266 return DummyTask<T>{std::move(val)};
269 T take() && { return std::move(val); }
270 private:
271 T val;
274 template<> struct DummyTask<void> {
275 DummyTask<void> scheduleOn(folly::Executor::KeepAlive<>) && {
276 return DummyTask<void>{};
278 void take() && {}
281 template <typename T> DummyTask<T> makeDummy(T v) {
282 return DummyTask<T>{std::move(v)};
286 template <typename T> using Task = detail::DummyTask<T>;
287 template <typename T> using TaskWithExecutor = detail::DummyTask<T>;
// "Awaiting" an emulated coroutine just extracts the wrapped value.
#define HPHP_CORO_AWAIT(x) ((x).take())
// Returning wraps the value in a task.
#define HPHP_CORO_RETURN(x) return HPHP::coro::detail::makeDummy((x))
// Returning by move wraps the moved value in a task.
#define HPHP_CORO_MOVE_RETURN(x) \
  return HPHP::coro::detail::makeDummy(std::move(x))
// Returning from a void coroutine produces an empty task.
#define HPHP_CORO_RETURN_VOID return HPHP::coro::detail::DummyTask<void>{}
// This isn't ideal, but there is no good notion of a "current
// executor" when coroutines don't actually exist, so this is always
// null.
#define HPHP_CORO_CURRENT_EXECUTOR (static_cast<folly::Executor*>(nullptr))
// These two expand to nothing when coroutines are emulated.
#define HPHP_CORO_RESCHEDULE_ON_CURRENT_EXECUTOR
#define HPHP_CORO_SAFE_POINT
302 template <typename T>
303 auto wait(Task<T>&& t) { return std::move(t).take(); }
305 inline void wait(Task<void>&&) {}
307 namespace detail {
308 template<typename T>
309 auto collectImpl(Task<T>&& t) {
310 return std::move(t).take();
312 inline auto collectImpl(Task<void>&&) {
313 return folly::Unit{};
317 template <typename... T>
318 auto collect(Task<T>&&... t) {
319 return detail::makeDummy(
320 std::make_tuple(detail::collectImpl(std::move(t))...)
324 template <typename T>
325 auto collectRange(std::vector<Task<T>>&& ts) {
326 std::vector<T> out;
327 out.reserve(ts.size());
328 for (auto& t : ts) out.emplace_back(std::move(t).take());
329 return detail::makeDummy(std::move(out));
332 inline auto collectRange(std::vector<Task<void>>&&) {
333 return Task<void>{};
// Emulated invoke: nothing ever suspends, so there is no need to store
// the callable or its arguments; the callable is simply called right
// away and its result returned.
//
// The callable is perfectly forwarded along with its arguments, so an
// rvalue callable is invoked as an rvalue. std::invoke additionally
// accepts pointers to members.
template <typename F, typename... Args> auto invoke(F&& f, Args&&... args) {
  return std::invoke(std::forward<F>(f), std::forward<Args>(args)...);
}
340 inline Task<void> sleep(std::chrono::milliseconds d) {
341 std::this_thread::sleep_for(d);
342 HPHP_CORO_RETURN_VOID;
345 // Just calculate the value eagerly, fulfilling the interface.
346 template <typename T> struct AsyncValue {
347 template <typename F>
348 AsyncValue(F&& f, folly::Executor::KeepAlive<>) {
349 try {
350 m_try.emplace(f().take());
351 } catch (...) {
352 m_try.emplaceException(
353 folly::exception_wrapper(std::current_exception())
358 AsyncValue(const AsyncValue&) = delete;
359 AsyncValue(AsyncValue&&) = delete;
360 AsyncValue& operator=(const AsyncValue&) = delete;
361 AsyncValue& operator=(AsyncValue&&) = delete;
363 Task<const T*> operator*() const {
364 return Task<const T*>{&*m_try};
367 Task<T> getCopy() const {
368 return Task<T>{*m_try};
370 private:
371 folly::Try<T> m_try;
374 // A Task<void> has by definition alreadly executed whatever
375 // side-effects it has, so there's nothing to do there.
376 struct AsyncScope {
377 void add(TaskWithExecutor<void>&&) {}
378 Task<void> joinAsync() { return Task<void>{}; }
381 struct Semaphore {
382 explicit Semaphore(uint32_t tokens) : m_sem{tokens} {}
383 void signal() { m_sem.post(); }
384 Task<void> wait() { m_sem.wait(); return Task<void>{}; }
385 private:
386 folly::LifoSem m_sem;
389 //////////////////////////////////////////////////////////////////////
392 #endif
394 //////////////////////////////////////////////////////////////////////
396 namespace HPHP::coro {
398 // Generic functionality for both modes:
400 //////////////////////////////////////////////////////////////////////
402 // Maps a key to an asynchronously calculated value. This ensures that
403 // for any key, the value will be calculated precisely once.
404 template <typename K, typename V>
405 struct AsyncMap {
406 // If the key is present, return a coroutine which can be awaited to
407 // get the associated value. If the key isn't present, then the
408 // callable is invoked on the given executor and a coroutine is
409 // returned representing that calculation. Any other concurrent (or
410 // later) calls will wait on the same value.
411 template <typename F>
412 Task<V> get(const K& key,
413 F&& f,
414 folly::Executor::KeepAlive<> executor) {
415 // AsyncValue is responsible for doing the actual async
416 // calculation. We wrap that in a LockFreeLazy to ensure we only
417 // ever create one of them for each key. The LockFreeLazy is
418 // inside an unique_ptr since the LockFreeLazy is not
419 // moveable. The folly_concurrent_hash_map_simd ensures we only
420 // ever get one LockFreeLazy per key.
421 auto& lazy = [&] () -> Lazy& {
422 // Look up the key. If there's an entry, return it. Otherwise
423 // create one and return it. If there's a race with the
424 // insertion, one thread will actually insert it, and the rest
425 // will just free their unique_ptrs (they'll all return the one
426 // actually inserted).
427 auto const it = m_map.find(key);
428 if (it != m_map.end()) return *it->second;
429 return *m_map.insert(
430 key, std::make_unique<Lazy>()
431 ).first->second;
432 }();
434 // We got the appropriate LockFreeLazy, which might be new or
435 // old. Either way, get its value, which ensures exactly one
436 // thread creates the AsyncValue and starts its calculation.
437 return lazy.get(
438 [&] { return Async{f(), std::move(executor)}; }
439 ).getCopy();
442 private:
443 using Async = AsyncValue<V>;
444 using Lazy = LockFreeLazy<Async>;
445 folly_concurrent_hash_map_simd<K, std::unique_ptr<Lazy>> m_map;
448 //////////////////////////////////////////////////////////////////////
451 * A CPUThreadPoolExecutor with stronger forward progress
452 * guarantees. Using the standard Executor interface it behaves
453 * exactly as a CPUThreadPoolExecutor. However, one can add tasks with
454 * an associated "Ticket". Runnable tasks are always prioritized by
455 * their tickets (lower tickets are higher priority). So far this is
456 * pretty standard, but one can obtain a "sticky" executor, which
457 * remembers the assigned ticket. The same ticket will be used no
458 * matter how many times the task is run then re-added to the
459 * executor.
461 * This is useful for coroutines. You can assign a coroutine a sticky
462 * executor and that coroutine will always be prioritized over later
463 * coroutines. This ensures that earlier coroutines will finish before
464 * later coroutines.
467 struct TicketExecutor : public folly::CPUThreadPoolExecutor {
468 // Create a TicketExecutor with the given number of threads. Each
469 // thread created will call threadInit before running, and
470 // threadFini before exiting. Any idle threads will exit after
471 // threadTimeout passes without work. The created threads will set
472 // to their name to threadPrefix plus some numeric identifier.
473 TicketExecutor(const std::string& threadPrefix,
474 size_t minThreads,
475 size_t maxThreads,
476 std::function<void()> threadInit,
477 std::function<void()> threadFini,
478 std::chrono::milliseconds threadTimeout);
479 ~TicketExecutor();
481 using Ticket = int64_t;
483 // Add work to this executor with the specified ticket.
484 void addWithTicket(folly::Func, Ticket);
486 // Obtain a new ticket. Tickets are monotonically increasing. Each
487 // stamp() returns an unique new ticket higher than all previous
488 // ones.
489 Ticket stamp();
491 // Create a new Executor (of some unnamed type) which will schedule
492 // work on this TicketExecutor, but using addWithTicket with the
493 // same ticket.
494 folly::Executor::KeepAlive<> sticky() { return sticky(stamp()); }
495 folly::Executor::KeepAlive<> sticky(Ticket);
498 //////////////////////////////////////////////////////////////////////