/*
   +----------------------------------------------------------------------+
   | HipHop for PHP                                                       |
   +----------------------------------------------------------------------+
   | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com)  |
   +----------------------------------------------------------------------+
   | This source file is subject to version 3.01 of the PHP license,      |
   | that is bundled with this package in the file LICENSE, and is        |
   | available through the world-wide-web at the following url:           |
   | http://www.php.net/license/3_01.txt                                  |
   | If you did not receive a copy of the PHP license and are unable to   |
   | obtain it through the world-wide-web, please send a note to          |
   | license@php.net so we can mail you a copy immediately.               |
   +----------------------------------------------------------------------+
*/

#pragma once

#include "hphp/util/hash-map.h"
#include "hphp/util/lock-free-lazy.h"

#include <folly/Try.h>
#include <folly/Unit.h>
#include <folly/executors/CPUThreadPoolExecutor.h>

#include <cstddef>
#include <chrono>
#include <functional>
#include <memory>
#include <string>

//////////////////////////////////////////////////////////////////////

/*
 * C++ coroutine portability layer
 *
 * Not all of the compilers we support have coroutine support, and
 * some that do are buggy. Nonetheless, we still want to be able to
 * use coroutines. This file provides a portability layer which allows
 * coroutines to be used (with very similar syntax) while still
 * supporting older compilers.
 *
 * If the compiler supports coroutines, this is just a simple wrapper
 * around the coroutine functionality we need. If not, all computation
 * is performed *eagerly* and the coroutine syntax mostly desugars
 * into nothing. For the most part, one can write code assuming
 * coroutines are supported and it will just work.
 *
 * NB: There is one important difference. Since "emulated" coroutines
 * calculate their values eagerly, they may throw exceptions from
 * different places than "native" coroutines. Native coroutines will
 * only throw when they are awaited. Emulated coroutines will throw
 * when they are first called. When writing code against this layer,
 * one should be mindful of this and structure try/catches
 * appropriately.
 *
 * Emulated coroutines may be (much) slower than native coroutines
 * since they are calculated eagerly, so there's no room for
 * parallelism. We expect to use native coroutines in situations where
 * performance matters.
 *
 * Emulated coroutines can be forced by defining HPHP_DISABLE_CORO.
 */

//////////////////////////////////////////////////////////////////////

// Co-routine support in GCC is currently broken....
#if FOLLY_HAS_COROUTINES && defined(__clang__) && !defined(HPHP_DISABLE_CORO)

// Native coroutines:

#include <folly/experimental/coro/AsyncScope.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h>
#include <folly/experimental/coro/Sleep.h>
#include <folly/experimental/coro/Task.h>

//////////////////////////////////////////////////////////////////////

namespace HPHP::coro {

//////////////////////////////////////////////////////////////////////

constexpr const bool using_coros = true;

// Operators. They must be macros since they're part of the syntax.

// Await a coroutine, blocking execution until it returns a value.
#define HPHP_CORO_AWAIT(x) (co_await (x))
// Return a value from a coroutine.
#define HPHP_CORO_RETURN(x) co_return (x)
// Return a value from a coroutine while moving it. This distinction
// doesn't matter for native coroutines.
#define HPHP_CORO_MOVE_RETURN(x) co_return (x)
// Return a void value from a coroutine. Due to syntactic issues, this
// must be a different macro than the value-returning one.
#define HPHP_CORO_RETURN_VOID co_return
// Return an Executor* representing the executor this coroutine is
// assigned to.
#define HPHP_CORO_CURRENT_EXECUTOR (co_await folly::coro::co_current_executor)
// Yield execution of the current coroutine and reschedule it to run
// on its assigned executor. The collect* functions can attempt to run
// a coroutine eagerly on the current executor. This forces them to
// run on their assigned executor.
#define HPHP_CORO_RESCHEDULE_ON_CURRENT_EXECUTOR \
  (co_await folly::coro::co_reschedule_on_current_executor)
// Throw a folly::OperationCancelled if this coroutine has been cancelled.
#define HPHP_CORO_SAFE_POINT (co_await folly::coro::co_safe_point)
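
// A minimal sketch (not part of this header) of a coroutine written
// against the macros above. addOne() and its values are hypothetical;
// the same source compiles in both native and emulated modes, with
// only the evaluation strategy differing.
//
//   coro::Task<int> addOne(coro::Task<int> t) {
//     HPHP_CORO_SAFE_POINT;
//     auto const v = HPHP_CORO_AWAIT(std::move(t));
//     HPHP_CORO_RETURN(v + 1);
//   }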

// The actual arguments of these can be complex, so we just perfect
// forward everything rather than replicating it here.

// Await the given coroutine, blocking the thread until it has a
// value. This can be used to "await" a coroutine without becoming a
// coroutine yourself.
template <typename... Args>
auto wait(Args&&... args) {
  return folly::coro::blockingWait(std::forward<Args>(args)...);
}

// Takes a set of coroutines and returns a coroutine representing all
// of the coroutines. Awaiting that coroutine returns a tuple of the
// values of the passed coroutines. This lets you await on multiple
// coroutines simultaneously.
template <typename... Args>
auto collect(Args&&... args) {
  return folly::coro::collectAll(std::forward<Args>(args)...);
}

// Like collect, but accepts a std::vector of coroutines.
template <typename T>
auto collectRange(std::vector<T>&& ts) {
  return folly::coro::collectAllRange(std::move(ts));
}

// Like collectRange, but will only run up to a fixed number of tasks
// simultaneously.
template <typename T>
auto collectRangeWindowed(std::vector<T>&& ts, std::size_t window) {
  return folly::coro::collectAllWindowed(std::move(ts), window);
}
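
// A minimal sketch of driving tasks from non-coroutine code; the
// makeWork() helper is hypothetical and stands in for anything
// returning a coro::Task<int>.
//
//   std::vector<coro::Task<int>> tasks;
//   tasks.emplace_back(makeWork(1));
//   tasks.emplace_back(makeWork(2));
//   auto results = coro::wait(coro::collectRange(std::move(tasks)));
//   // `results` holds one int per input task, in order.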

// Given a callable and a set of arguments, store the arguments, then
// invoke the callable. This is useful if you have a callable which
// takes params by reference. Passing a temporary will cause crashes
// because when the coroutine suspends and then runs later, the
// temporary will be gone, and the reference will point to
// nothing. This ensures that any such temporaries will be stored
// somewhere which lives as long as the coroutine. It is also needed
// with lambdas which are coroutines for similar reasons. If you call
// the lambda immediately, it will be gone when the coroutine resumes.
template <typename... Args>
auto invoke(Args&&... args) {
  return folly::coro::co_invoke(std::forward<Args>(args)...);
}
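
// A minimal sketch of why invoke() matters: the lambda below is a
// coroutine, so it is wrapped rather than called directly, keeping the
// lambda object and its arguments alive across suspension points. The
// argument value is arbitrary.
//
//   auto task = coro::invoke(
//     [] (int n) -> coro::Task<int> {
//       HPHP_CORO_RETURN(n * 2);
//     },
//     21
//   );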

// Block the execution of this coroutine for the given amount of time.
inline auto sleep(std::chrono::milliseconds d) {
  return folly::coro::sleep(d);
}

// All coroutines return Task<T>, which means they return a T when
// they finish. TaskWithExecutor<T> is a Task which has been bound to
// an executor.
template <typename T> using Task = folly::coro::Task<T>;
template <typename T>
using TaskWithExecutor = folly::coro::TaskWithExecutor<T>;

// Calculate a value asynchronously on the given executor. This has an
// advantage over a bare coroutine because normally a coroutine will
// only start execution when you await it (and then you block until
// it's done). This starts execution before you request the value,
// meaning it can run in the background.
template <typename T> struct AsyncValue {
  template <typename F>
  AsyncValue(F f, folly::Executor::KeepAlive<> executor) {
    auto work = folly::coro::co_invoke(
      [this, f = std::move(f)] () mutable -> Task<void> {
        try {
          m_try.emplace(co_await f());
        } catch (...) {
          m_try.emplaceException(
            folly::exception_wrapper(std::current_exception())
          );
        }
        m_baton.post();
        co_return;
      }
    );
    m_scope.add(std::move(work).scheduleOn(std::move(executor)));
  }

  ~AsyncValue() {
    // NB: It might seem useful here to access m_try after waiting, to
    // ensure that any exception is rethrown. However, we can't throw
    // from a dtor anyway, so it's pointless to check.
    folly::coro::blockingWait(m_scope.joinAsync());
  }

  AsyncValue(const AsyncValue&) = delete;
  AsyncValue(AsyncValue&&) = delete;
  AsyncValue& operator=(const AsyncValue&) = delete;
  AsyncValue& operator=(AsyncValue&&) = delete;

  // Coroutines which return either a pointer to the value, or a copy
  // of the value. If the calculation resulted in an exception, that
  // exception will be thrown.

  Task<const T*> operator*() const {
    co_await m_baton;
    co_return &*m_try;
  }

  Task<T> getCopy() const {
    co_await m_baton;
    co_return *m_try;
  }

private:
  folly::Try<T> m_try;
  folly::coro::Baton m_baton;
  folly::coro::AsyncScope m_scope;
};
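
// A minimal sketch of AsyncValue. Config, loadConfig(), and executor
// are hypothetical; loadConfig() is assumed to return a
// coro::Task<Config>. The calculation starts at construction time and
// runs in the background until awaited.
//
//   coro::AsyncValue<Config> value{[] { return loadConfig(); }, executor};
//
//   coro::Task<Config> useIt() {
//     auto copy = HPHP_CORO_AWAIT(value.getCopy());
//     HPHP_CORO_RETURN(copy);
//   }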

// Coro-aware semaphore (runs a different task when it blocks)
struct Semaphore {
  explicit Semaphore(uint32_t tokens) : m_sem{tokens} {}
  void signal() { m_sem.signal(); }
  Task<void> wait() { return m_sem.co_wait(); }
private:
  folly::fibers::Semaphore m_sem;
};
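
// A minimal sketch of bounding concurrency with the semaphore; the
// doWork() coroutine and the limit are hypothetical, and error
// handling is elided.
//
//   coro::Semaphore sem{4};  // at most four workers in the region at once
//
//   coro::Task<void> worker() {
//     HPHP_CORO_AWAIT(sem.wait());
//     HPHP_CORO_AWAIT(doWork());
//     sem.signal();
//     HPHP_CORO_RETURN_VOID;
//   }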

// Allows you to run coroutines asynchronously. Assign an executor to
// a Task, then add it to the AsyncScope. The coroutine will start
// running and will be automatically cleaned up when done. Since you
// cannot await the coroutine after you add it, this is only really
// useful for side-effectful coroutines which return void. The
// coroutine should not throw.
using AsyncScope = folly::coro::AsyncScope;
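
// A minimal sketch of fire-and-forget usage; logRequest() and executor
// are hypothetical, and the added coroutine must not throw.
//
//   coro::AsyncScope scope;
//   scope.add(logRequest().scheduleOn(executor));
//   // ... later, typically at shutdown:
//   coro::wait(scope.joinAsync());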

//////////////////////////////////////////////////////////////////////

}

//////////////////////////////////////////////////////////////////////

#else

#include <folly/synchronization/LifoSem.h>

// Emulated coroutines. The coroutine is just the value itself. The
// value is calculated eagerly when the coroutine is created (so
// basically like any other normal function call).

namespace HPHP::coro {

//////////////////////////////////////////////////////////////////////

constexpr const bool using_coros = false;

// Task and TaskWithExecutor are just simple wrappers around a value.

namespace detail {

template <typename T> struct DummyTask;

template <typename T>
struct DummyTask {
  explicit DummyTask(T v) : val{std::move(v)} {}

  DummyTask<T> scheduleOn(folly::Executor::KeepAlive<>) && {
    return DummyTask<T>{std::move(val)};
  }

  T take() && { return std::move(val); }
private:
  T val;
};

template<> struct DummyTask<void> {
  DummyTask<void> scheduleOn(folly::Executor::KeepAlive<>) && {
    return DummyTask<void>{};
  }
  void take() && {}
};

template <typename T> DummyTask<T> makeDummy(T v) {
  return DummyTask<T>{std::move(v)};
}

}

template <typename T> using Task = detail::DummyTask<T>;
template <typename T> using TaskWithExecutor = detail::DummyTask<T>;

// "Awaiting" is just taking the wrapped value
#define HPHP_CORO_AWAIT(x) ((x).take())
// Returning just wraps the value
#define HPHP_CORO_RETURN(x) return HPHP::coro::detail::makeDummy((x))
#define HPHP_CORO_MOVE_RETURN(x) \
  return HPHP::coro::detail::makeDummy(std::move(x))
#define HPHP_CORO_RETURN_VOID return HPHP::coro::detail::DummyTask<void>{}
// This isn't ideal, but there's no really good notion of a "current
// executor" if coroutines don't actually exist.
#define HPHP_CORO_CURRENT_EXECUTOR ((folly::Executor*)nullptr)
#define HPHP_CORO_RESCHEDULE_ON_CURRENT_EXECUTOR
#define HPHP_CORO_SAFE_POINT

template <typename T>
auto wait(Task<T>&& t) { return std::move(t).take(); }

inline void wait(Task<void>&&) {}

namespace detail {

template<typename T>
auto collectImpl(Task<T>&& t) {
  return std::move(t).take();
}

inline auto collectImpl(Task<void>&&) {
  return folly::Unit{};
}

}

template <typename... T>
auto collect(Task<T>&&... t) {
  return detail::makeDummy(
    std::make_tuple(detail::collectImpl(std::move(t))...)
  );
}

template <typename T>
auto collectRange(std::vector<Task<T>>&& ts) {
  std::vector<T> out;
  out.reserve(ts.size());
  for (auto& t : ts) out.emplace_back(std::move(t).take());
  return detail::makeDummy(std::move(out));
}

inline auto collectRange(std::vector<Task<void>>&&) {
  return Task<void>{};
}

template <typename T>
auto collectRangeWindowed(std::vector<Task<T>>&& ts, std::size_t) {
  return collectRange(std::move(ts));
}

template <typename F, typename... Args> auto invoke(F&& f, Args&&... args) {
  return f(std::forward<Args>(args)...);
}

inline Task<void> sleep(std::chrono::milliseconds d) {
  std::this_thread::sleep_for(d);
  HPHP_CORO_RETURN_VOID;
}

// Just calculate the value eagerly, fulfilling the interface.
template <typename T> struct AsyncValue {
  template <typename F>
  AsyncValue(F&& f, folly::Executor::KeepAlive<>) {
    try {
      m_try.emplace(f().take());
    } catch (...) {
      m_try.emplaceException(
        folly::exception_wrapper(std::current_exception())
      );
    }
  }

  AsyncValue(const AsyncValue&) = delete;
  AsyncValue(AsyncValue&&) = delete;
  AsyncValue& operator=(const AsyncValue&) = delete;
  AsyncValue& operator=(AsyncValue&&) = delete;

  Task<const T*> operator*() const {
    return Task<const T*>{&*m_try};
  }

  Task<T> getCopy() const {
    return Task<T>{*m_try};
  }
private:
  folly::Try<T> m_try;
};

// A Task<void> has by definition already executed whatever
// side-effects it has, so there's nothing to do there.
struct AsyncScope {
  void add(TaskWithExecutor<void>&&) {}
  Task<void> joinAsync() { return Task<void>{}; }
};

struct Semaphore {
  explicit Semaphore(uint32_t tokens) : m_sem{tokens} {}
  void signal() { m_sem.post(); }
  Task<void> wait() { m_sem.wait(); return Task<void>{}; }
private:
  folly::LifoSem m_sem;
};

//////////////////////////////////////////////////////////////////////

}

#endif

//////////////////////////////////////////////////////////////////////

namespace HPHP::coro {

// Generic functionality for both modes:

//////////////////////////////////////////////////////////////////////

// Maps a key to an asynchronously calculated value. This ensures that
// for any key, the value will be calculated precisely once.
template <typename K, typename V>
struct AsyncMap {
  // If the key is present, return a coroutine which can be awaited to
  // get the associated value. If the key isn't present, then the
  // callable is invoked on the given executor and a coroutine is
  // returned representing that calculation. Any other concurrent (or
  // later) calls will wait on the same value.
  template <typename F>
  Task<V> get(const K& key,
              F&& f,
              folly::Executor::KeepAlive<> executor) {
    // AsyncValue is responsible for doing the actual async
    // calculation. We wrap that in a LockFreeLazy to ensure we only
    // ever create one of them for each key. The LockFreeLazy is
    // inside a unique_ptr since the LockFreeLazy is not
    // moveable. The folly_concurrent_hash_map_simd ensures we only
    // ever get one LockFreeLazy per key.
    auto& lazy = [&] () -> Lazy& {
      // Look up the key. If there's an entry, return it. Otherwise
      // create one and return it. If there's a race with the
      // insertion, one thread will actually insert it, and the rest
      // will just free their unique_ptrs (they'll all return the one
      // actually inserted).
      auto const it = m_map.find(key);
      if (it != m_map.end()) return *it->second;
      return *m_map.insert(
        key, std::make_unique<Lazy>()
      ).first->second;
    }();

    // We got the appropriate LockFreeLazy, which might be new or
    // old. Either way, get its value, which ensures exactly one
    // thread creates the AsyncValue and starts its calculation.
    return lazy.get(
      [&] { return Async{f(), std::move(executor)}; }
    ).getCopy();
  }

private:
  using Async = AsyncValue<V>;
  using Lazy = LockFreeLazy<Async>;
  folly_concurrent_hash_map_simd<K, std::unique_ptr<Lazy>> m_map;
};

//////////////////////////////////////////////////////////////////////

/*
 * A CPUThreadPoolExecutor with stronger forward progress
 * guarantees. Using the standard Executor interface it behaves
 * exactly like a CPUThreadPoolExecutor. However, one can add tasks
 * with an associated "Ticket". Runnable tasks are always prioritized
 * by their tickets (lower tickets are higher priority). So far this
 * is pretty standard, but one can also obtain a "sticky" executor,
 * which remembers the assigned ticket. The same ticket will be used
 * no matter how many times the task is run and then re-added to the
 * executor.
 *
 * This is useful for coroutines. You can assign a coroutine a sticky
 * executor and that coroutine will always be prioritized over later
 * coroutines. This ensures that earlier coroutines will finish before
 * later coroutines.
 */

struct TicketExecutor : public folly::CPUThreadPoolExecutor {
  // Create a TicketExecutor with the given number of threads. Each
  // thread created will call threadInit before running and threadFini
  // before exiting. Any idle threads will exit after threadTimeout
  // passes without work. The created threads will set their name to
  // threadPrefix plus some numeric identifier.
  TicketExecutor(const std::string& threadPrefix,
                 size_t minThreads,
                 size_t maxThreads,
                 std::function<void()> threadInit,
                 std::function<void()> threadFini,
                 std::chrono::milliseconds threadTimeout);
  ~TicketExecutor();

  using Ticket = int64_t;

  // Add work to this executor with the specified ticket.
  void addWithTicket(folly::Func, Ticket);

  // Obtain a new ticket. Tickets are monotonically increasing. Each
  // stamp() returns a unique new ticket higher than all previous
  // ones.
  Ticket stamp();

  // Create a new Executor (of some unnamed type) which will schedule
  // work on this TicketExecutor, but using addWithTicket with the
  // same ticket.
  folly::Executor::KeepAlive<> sticky() { return sticky(stamp()); }
  folly::Executor::KeepAlive<> sticky(Ticket);
};
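
// A minimal sketch of pinning coroutine work to a ticket; the thread
// counts, callbacks, and step() coroutine are hypothetical.
//
//   coro::TicketExecutor exec{"coro", 0, 4, [] {}, [] {},
//                             std::chrono::milliseconds{60000}};
//   // Everything scheduled through this sticky executor reuses the
//   // ticket obtained when sticky() was called, so work started
//   // earlier keeps its priority each time it is re-queued.
//   auto bound = step().scheduleOn(exec.sticky());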

//////////////////////////////////////////////////////////////////////

}