/*
   +----------------------------------------------------------------------+
   | HipHop for PHP                                                       |
   +----------------------------------------------------------------------+
   | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com)  |
   +----------------------------------------------------------------------+
   | This source file is subject to version 3.01 of the PHP license,      |
   | that is bundled with this package in the file LICENSE, and is        |
   | available through the world-wide-web at the following url:           |
   | http://www.php.net/license/3_01.txt                                  |
   | If you did not receive a copy of the PHP license and are unable to   |
   | obtain it through the world-wide-web, please send a note to          |
   | license@php.net so we can mail you a copy immediately.               |
   +----------------------------------------------------------------------+
*/
#pragma once

#include "hphp/util/hash-map.h"
#include "hphp/util/lock-free-lazy.h"

#include <folly/Try.h>
#include <folly/Unit.h>
#include <folly/executors/CPUThreadPoolExecutor.h>

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
//////////////////////////////////////////////////////////////////////

/*
 * C++ coroutine portability layer
 *
 * Not all of the compilers we support have coroutine support, and
 * some that do are buggy. Nonetheless, we still want to be able to
 * use coroutines. This file provides a portability layer to allow for
 * the usage of coroutines (with very similar syntax), but supports
 * compilers without working coroutine support.
 *
 * If the compiler supports coroutines, this is just a simple wrapper
 * around the coroutine functionality we need. If not, all computation
 * is performed *eagerly* and the coroutine syntax mainly desugars
 * into nothing. For the most part, one can write the code assuming
 * coroutines are supported and it will just work.
 *
 * NB: There is one important difference. Since "emulated" coroutines
 * calculate their values eagerly, they may throw exceptions from
 * different places than "native" coroutines. Native coroutines will
 * only throw when they are awaited. Emulated coroutines will throw
 * when they are first called. When writing code when using this
 * layer, one should be mindful of this and structure their
 * try/catches appropriately.
 *
 * Emulated coroutines may be (much) slower than native coroutines
 * since they are all calculated eagerly, thus there's no room for
 * parallelism. We expect to use native coroutines in situations where
 * performance matters.
 *
 * Emulated coroutines can be forced by defining HPHP_DISABLE_CORO.
 */

//////////////////////////////////////////////////////////////////////
67 // Co-routine support in GCC is currently broken....
68 #if FOLLY_HAS_COROUTINES && defined(__clang__) && !defined(HPHP_DISABLE_CORO)
72 #include <folly/experimental/coro/AsyncScope.h>
73 #include <folly/experimental/coro/Baton.h>
74 #include <folly/experimental/coro/BlockingWait.h>
75 #include <folly/experimental/coro/Collect.h>
76 #include <folly/experimental/coro/Sleep.h>
77 #include <folly/experimental/coro/Task.h>
79 //////////////////////////////////////////////////////////////////////
81 namespace HPHP::coro
{
83 //////////////////////////////////////////////////////////////////////
85 constexpr const bool using_coros
= true;
87 // Operators. They must be macros since they're part of the syntax.
89 // Await a coroutine, blocking execution until it returns a value.
90 #define HPHP_CORO_AWAIT(x) (co_await (x))
91 // Return a value from a coroutine
92 #define HPHP_CORO_RETURN(x) co_return (x)
93 // Return a value from a coroutine while moving it. This distinction
94 // doesn't matter for native coroutines.
95 #define HPHP_CORO_MOVE_RETURN(x) co_return (x)
96 // Return a void value from a coroutine. Due to syntactic issues, this
97 // must be a different macro than the value.
98 #define HPHP_CORO_RETURN_VOID co_return
99 // Return a Executor* representing the executor this coroutine is
101 #define HPHP_CORO_CURRENT_EXECUTOR (co_await folly::coro::co_current_executor)
102 // Yield execution of the current coroutine and reschedule it to run
103 // on its assigned executor. The collect* functions can attempt to run
104 // a coroutine eagerly on the current executor. This forces them to
105 // run on their assigned executor.
106 #define HPHP_CORO_RESCHEDULE_ON_CURRENT_EXECUTOR \
107 (co_await folly::coro::co_reschedule_on_current_executor)
108 // Throw a folly::OperationCancelled if this coroutine has been cancelled
109 #define HPHP_CORO_SAFE_POINT (co_await folly::coro::co_safe_point)
111 // The actual arguments of these can be complex, so we just perfect
112 // forward everything rather than replicating it here.
114 // Await the given coroutine, blocking the thread until it has a
115 // value. This can be used to "await" a coroutine without become a
116 // coroutine yourself.
117 template <typename
... Args
>
118 auto wait(Args
&&... args
) {
119 return folly::coro::blockingWait(std::forward
<Args
>(args
)...);
122 // Takes a set of coroutines and returns a coroutine representing all
123 // of the coroutines. Awaiting that coroutine returns a tuple of the
124 // values of the passed coroutines. This lets you await on multiple
125 // coroutines simultaneously.
126 template <typename
... Args
>
127 auto collect(Args
&&... args
) {
128 return folly::coro::collectAll(std::forward
<Args
>(args
)...);
131 // Like collect, but accepts a std::vector of coroutines.
132 template <typename T
>
133 auto collectRange(std::vector
<T
>&& ts
) {
134 return folly::coro::collectAllRange(std::move(ts
));
137 // Like collectRange, but will only run up to a fixed number of tasks
139 template <typename T
>
140 auto collectRangeWindowed(std::vector
<T
>&& ts
, std::size_t window
) {
141 return folly::coro::collectAllWindowed(std::move(ts
), window
);
144 // Given a callable and a set of arguments, store the arguments, then
145 // invoke the callable. This is useful if you have a callable which
146 // takes params by reference. Passing a temporary will cause crashes
147 // because when the coroutine suspends and then runs later, the
148 // temporary will be gone, and the reference will point to
149 // nothing. This ensures that any such temporaries will be stored
150 // somewhere which lives as long as the coroutine. It is also needed
151 // with lambdas which are coroutines for similar reasons. If you call
152 // the lambda immediately, it will be gone when the coroutine resumes.
153 template <typename
... Args
>
154 auto invoke(Args
&&... args
) {
155 return folly::coro::co_invoke(std::forward
<Args
>(args
)...);
158 // Block the execution of this coroutine for the given amount of time.
159 inline auto sleep(std::chrono::milliseconds d
) {
160 return folly::coro::sleep(d
);
163 // All coroutines return Task<T>, means they return T when they
164 // finish. TaskWithExecutor<T> is a Task which has been bound to an
166 template <typename T
> using Task
= folly::coro::Task
<T
>;
167 template <typename T
>
168 using TaskWithExecutor
= folly::coro::TaskWithExecutor
<T
>;
170 // Calculate a value asynchronously on the given executor. This has an
171 // advantage over a bare coroutine because normally a coroutine will
172 // only start execution when you await it (and then you block until
173 // its done). This starts execution before you request the value,
174 // meaning it can run in the background.
175 template <typename T
> struct AsyncValue
{
176 template <typename F
>
177 AsyncValue(F f
, folly::Executor::KeepAlive
<> executor
) {
178 auto work
= folly::coro::co_invoke(
179 [this, f
= std::move(f
)] () mutable -> Task
<void> {
181 m_try
.emplace(co_await
f());
183 m_try
.emplaceException(
184 folly::exception_wrapper(std::current_exception())
191 m_scope
.add(std::move(work
).scheduleOn(std::move(executor
)));
195 // NB: It might seem useful here to access the m_try after
196 // waiting, to ensure that any exception is rethrown. However, we
197 // can't throw from a dtor anyways, so its pointless to check.
198 folly::coro::blockingWait(m_scope
.joinAsync());
201 AsyncValue(const AsyncValue
&) = delete;
202 AsyncValue(AsyncValue
&&) = delete;
203 AsyncValue
& operator=(const AsyncValue
&) = delete;
204 AsyncValue
& operator=(AsyncValue
&&) = delete;
206 // Coroutines which return either a pointer to the value, or a copy
207 // of the value. If the calculation resulted in an exception, that
208 // exception will be thrown.
210 Task
<const T
*> operator*() const {
215 Task
<T
> getCopy() const {
222 folly::coro::Baton m_baton
;
223 folly::coro::AsyncScope m_scope
;
226 // Coro aware semaphore (runs a different task when blocks)
228 explicit Semaphore(uint32_t tokens
) : m_sem
{tokens
} {}
229 void signal() { m_sem
.signal(); }
230 Task
<void> wait() { return m_sem
.co_wait(); }
232 folly::fibers::Semaphore m_sem
;
235 // Allows you to run coroutines asynchronously. Assign an executor to
236 // a Task, then add it to the AsyncScope. The coroutine will start
237 // running and will be automatically cleaned up when done. Since you
238 // cannot await the coroutine after you add it, this is only really
239 // useful for side-effectful coroutines which return void. The
240 // coroutine should not throw.
241 using AsyncScope
= folly::coro::AsyncScope
;
243 //////////////////////////////////////////////////////////////////////
247 //////////////////////////////////////////////////////////////////////
251 #include <folly/synchronization/LifoSem.h>
253 // Emulated coroutines. The coroutine is just the value itself. The
254 // value is calculated eagerly when the coroutine is created (so
255 // basically like any other normal function call).
257 namespace HPHP::coro
{
259 //////////////////////////////////////////////////////////////////////
261 constexpr const bool using_coros
= false;
263 // Task and TaskWithExecutor are just simple wrappers around a value.
266 template <typename T
> struct DummyTask
;
268 template <typename T
>
270 explicit DummyTask(T v
) : val
{std::move(v
)} {}
272 DummyTask
<T
> scheduleOn(folly::Executor::KeepAlive
<>) && {
273 return DummyTask
<T
>{std::move(val
)};
276 T
take() && { return std::move(val
); }
281 template<> struct DummyTask
<void> {
282 DummyTask
<void> scheduleOn(folly::Executor::KeepAlive
<>) && {
283 return DummyTask
<void>{};
288 template <typename T
> DummyTask
<T
> makeDummy(T v
) {
289 return DummyTask
<T
>{std::move(v
)};
293 template <typename T
> using Task
= detail::DummyTask
<T
>;
294 template <typename T
> using TaskWithExecutor
= detail::DummyTask
<T
>;
// "Awaiting" is just taking the wrapper value
#define HPHP_CORO_AWAIT(x) ((x).take())
// Returning just wraps the value
#define HPHP_CORO_RETURN(x) return HPHP::coro::detail::makeDummy((x))
#define HPHP_CORO_MOVE_RETURN(x) \
  return HPHP::coro::detail::makeDummy(std::move(x))
#define HPHP_CORO_RETURN_VOID return HPHP::coro::detail::DummyTask<void>{}
// This isn't ideal but there's no real good notion of "current
// executor" if coroutines don't actually exist.
#define HPHP_CORO_CURRENT_EXECUTOR ((folly::Executor*)nullptr)
// No-ops: execution is always eager, so there is nothing to
// reschedule and no suspension point at which to observe cancellation.
#define HPHP_CORO_RESCHEDULE_ON_CURRENT_EXECUTOR
#define HPHP_CORO_SAFE_POINT
309 template <typename T
>
310 auto wait(Task
<T
>&& t
) { return std::move(t
).take(); }
312 inline void wait(Task
<void>&&) {}
316 auto collectImpl(Task
<T
>&& t
) {
317 return std::move(t
).take();
319 inline auto collectImpl(Task
<void>&&) {
320 return folly::Unit
{};
324 template <typename
... T
>
325 auto collect(Task
<T
>&&... t
) {
326 return detail::makeDummy(
327 std::make_tuple(detail::collectImpl(std::move(t
))...)
331 template <typename T
>
332 auto collectRange(std::vector
<Task
<T
>>&& ts
) {
334 out
.reserve(ts
.size());
335 for (auto& t
: ts
) out
.emplace_back(std::move(t
).take());
336 return detail::makeDummy(std::move(out
));
339 inline auto collectRange(std::vector
<Task
<void>>&&) {
343 template <typename T
>
344 auto collectRangeWindowed(std::vector
<Task
<T
>>&& ts
, std::size_t) {
345 return collectRange(std::move(ts
));
// Invoke the callable immediately. There is no suspension, so the
// argument-lifetime concerns of the native version do not apply.
template <typename F, typename... Args> auto invoke(F&& f, Args&&... args) {
  return f(std::forward<Args>(args)...);
}
352 inline Task
<void> sleep(std::chrono::milliseconds d
) {
353 std::this_thread::sleep_for(d
);
354 HPHP_CORO_RETURN_VOID
;
357 // Just calculate the value eagerly, fulfilling the interface.
358 template <typename T
> struct AsyncValue
{
359 template <typename F
>
360 AsyncValue(F
&& f
, folly::Executor::KeepAlive
<>) {
362 m_try
.emplace(f().take());
364 m_try
.emplaceException(
365 folly::exception_wrapper(std::current_exception())
370 AsyncValue(const AsyncValue
&) = delete;
371 AsyncValue(AsyncValue
&&) = delete;
372 AsyncValue
& operator=(const AsyncValue
&) = delete;
373 AsyncValue
& operator=(AsyncValue
&&) = delete;
375 Task
<const T
*> operator*() const {
376 return Task
<const T
*>{&*m_try
};
379 Task
<T
> getCopy() const {
380 return Task
<T
>{*m_try
};
386 // A Task<void> has by definition alreadly executed whatever
387 // side-effects it has, so there's nothing to do there.
389 void add(TaskWithExecutor
<void>&&) {}
390 Task
<void> joinAsync() { return Task
<void>{}; }
394 explicit Semaphore(uint32_t tokens
) : m_sem
{tokens
} {}
395 void signal() { m_sem
.post(); }
396 Task
<void> wait() { m_sem
.wait(); return Task
<void>{}; }
398 folly::LifoSem m_sem
;
401 //////////////////////////////////////////////////////////////////////
406 //////////////////////////////////////////////////////////////////////
408 namespace HPHP::coro
{
410 // Generic functionality for both modes:
412 //////////////////////////////////////////////////////////////////////
414 // Maps a key to an asynchronously calculated value. This ensures that
415 // for any key, the value will be calculated precisely once.
416 template <typename K
, typename V
>
418 // If the key is present, return a coroutine which can be awaited to
419 // get the associated value. If the key isn't present, then the
420 // callable is invoked on the given executor and a coroutine is
421 // returned representing that calculation. Any other concurrent (or
422 // later) calls will wait on the same value.
423 template <typename F
>
424 Task
<V
> get(const K
& key
,
426 folly::Executor::KeepAlive
<> executor
) {
427 // AsyncValue is responsible for doing the actual async
428 // calculation. We wrap that in a LockFreeLazy to ensure we only
429 // ever create one of them for each key. The LockFreeLazy is
430 // inside an unique_ptr since the LockFreeLazy is not
431 // moveable. The folly_concurrent_hash_map_simd ensures we only
432 // ever get one LockFreeLazy per key.
433 auto& lazy
= [&] () -> Lazy
& {
434 // Look up the key. If there's an entry, return it. Otherwise
435 // create one and return it. If there's a race with the
436 // insertion, one thread will actually insert it, and the rest
437 // will just free their unique_ptrs (they'll all return the one
438 // actually inserted).
439 auto const it
= m_map
.find(key
);
440 if (it
!= m_map
.end()) return *it
->second
;
441 return *m_map
.insert(
442 key
, std::make_unique
<Lazy
>()
446 // We got the appropriate LockFreeLazy, which might be new or
447 // old. Either way, get its value, which ensures exactly one
448 // thread creates the AsyncValue and starts its calculation.
450 [&] { return Async
{f(), std::move(executor
)}; }
455 using Async
= AsyncValue
<V
>;
456 using Lazy
= LockFreeLazy
<Async
>;
457 folly_concurrent_hash_map_simd
<K
, std::unique_ptr
<Lazy
>> m_map
;
460 //////////////////////////////////////////////////////////////////////
463 * A CPUThreadPoolExecutor with stronger forward progress
464 * guarantees. Using the standard Executor interface it behaves
465 * exactly as a CPUThreadPoolExecutor. However, one can add tasks with
466 * an associated "Ticket". Runnable tasks are always prioritized by
467 * their tickets (lower tickets are higher priority). So far this is
468 * pretty standard, but one can obtain a "sticky" executor, which
469 * remembers the assigned ticket. The same ticket will be used no
470 * matter how many times the task is run then re-added to the
473 * This is useful for coroutines. You can assign a coroutine a sticky
474 * executor and that coroutine will always be prioritized over later
475 * coroutines. This ensures that earlier coroutines will finish before
479 struct TicketExecutor
: public folly::CPUThreadPoolExecutor
{
480 // Create a TicketExecutor with the given number of threads. Each
481 // thread created will call threadInit before running, and
482 // threadFini before exiting. Any idle threads will exit after
483 // threadTimeout passes without work. The created threads will set
484 // to their name to threadPrefix plus some numeric identifier.
485 TicketExecutor(const std::string
& threadPrefix
,
488 std::function
<void()> threadInit
,
489 std::function
<void()> threadFini
,
490 std::chrono::milliseconds threadTimeout
);
493 using Ticket
= int64_t;
495 // Add work to this executor with the specified ticket.
496 void addWithTicket(folly::Func
, Ticket
);
498 // Obtain a new ticket. Tickets are monotonically increasing. Each
499 // stamp() returns an unique new ticket higher than all previous
503 // Create a new Executor (of some unnamed type) which will schedule
504 // work on this TicketExecutor, but using addWithTicket with the
506 folly::Executor::KeepAlive
<> sticky() { return sticky(stamp()); }
507 folly::Executor::KeepAlive
<> sticky(Ticket
);
510 //////////////////////////////////////////////////////////////////////