2 * Copyright (c) Meta Platforms, Inc. and affiliates.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/Try.h>
20 #include <folly/experimental/coro/AsyncGenerator.h>
21 #include <folly/experimental/coro/Coroutine.h>
22 #include <folly/experimental/coro/Invoke.h>
23 #include <folly/experimental/coro/SmallUnboundedQueue.h>
24 #include <folly/experimental/coro/ViaIfAsync.h>
25 #include <folly/fibers/Semaphore.h>
30 #if FOLLY_HAS_COROUTINES
35 // An AsyncGenerator with a write end
38 // auto pipe = AsyncPipe<T>::create();
39 // pipe.second.write(std::move(val1));
40 // auto val2 = co_await pipe.first.next();
42 // write() returns false if the read end has been destroyed (unless
43 // SingleProducer is disabled, in which case this behavior is undefined).
44 // The generator is completed when the write end is destroyed or on close()
45 // close() can also be passed an exception, which is thrown when read.
47 // An optional onClosed callback can be passed to create(). This callback will
48 // be called either when the generator is destroyed by the consumer, or when
49 // the pipe is closed by the publisher (whichever comes first). The onClosed
50 // callback may destroy the AsyncPipe object inline, and must not call close()
51 // on the AsyncPipe object inline. If an onClosed callback is specified and the
52 // publisher would like to destroy the pipe outside of the callback, it must
53 // first close the pipe.
55 // If SingleProducer is disabled, AsyncPipe's write() method (but not its
56 // close() method) becomes thread-safe. close() must be sequenced after all
57 // write()s in this mode.
61 bool SingleProducer
= true,
62 template <typename
, bool, bool> typename QueueType
= SmallUnboundedQueue
>
66 CHECK(!onClosed_
|| onClosed_
->wasInvokeRequested())
67 << "If an onClosed callback is specified and the generator still "
68 << "exists, the publisher must explicitly close the pipe prior to "
70 std::move(*this).close();
73 AsyncPipe(AsyncPipe
&& pipe
) noexcept
{
74 queue_
= std::move(pipe
.queue_
);
75 onClosed_
= std::move(pipe
.onClosed_
);
78 AsyncPipe
& operator=(AsyncPipe
&& pipe
) {
80 CHECK(!onClosed_
|| onClosed_
->wasInvokeRequested())
81 << "If an onClosed callback is specified and the generator still "
82 << "exists, the publisher must explicitly close the pipe prior to "
84 std::move(*this).close();
85 queue_
= std::move(pipe
.queue_
);
86 onClosed_
= std::move(pipe
.onClosed_
);
91 static std::pair
<folly::coro::AsyncGenerator
<T
&&>, AsyncPipe
> create(
92 folly::Function
<void()> onClosed
= nullptr) {
93 auto queue
= std::make_shared
<Queue
>();
94 auto cancellationSource
= std::optional
<folly::CancellationSource
>();
95 auto onClosedCallback
= std::unique_ptr
<OnClosedCallback
>();
96 if (onClosed
!= nullptr) {
97 cancellationSource
.emplace();
98 onClosedCallback
= std::make_unique
<OnClosedCallback
>(
99 *cancellationSource
, std::move(onClosed
));
102 folly::makeGuard([cancellationSource
= std::move(cancellationSource
)] {
103 if (cancellationSource
) {
104 cancellationSource
->requestCancellation();
108 folly::coro::co_invoke(
110 guard
= std::move(guard
)]() -> folly::coro::AsyncGenerator
<T
&&> {
112 auto val
= co_await
co_nothrow(queue
->dequeue());
113 if (val
.hasValue() || val
.hasException()) {
114 co_yield
std::move(*val
);
120 AsyncPipe(queue
, std::move(onClosedCallback
))};
123 template <typename U
= T
>
124 bool write(U
&& val
) {
125 if (auto queue
= queue_
.lock()) {
126 queue
->enqueue(folly::Try
<T
>(std::forward
<U
>(val
)));
132 void close(folly::exception_wrapper ew
) && {
133 if (auto queue
= queue_
.lock()) {
134 queue
->enqueue(folly::Try
<T
>(std::move(ew
)));
137 if (onClosed_
!= nullptr) {
138 onClosed_
->requestInvoke();
144 if (auto queue
= queue_
.lock()) {
145 queue
->enqueue(folly::Try
<T
>());
148 if (onClosed_
!= nullptr) {
149 onClosed_
->requestInvoke();
154 bool isClosed() const { return queue_
.expired(); }
157 using Queue
= QueueType
<folly::Try
<T
>, SingleProducer
, true>;
159 class OnClosedCallback
{
162 folly::CancellationSource cancellationSource
,
163 folly::Function
<void()> onClosedFunc
)
164 : cancellationSource_(std::move(cancellationSource
)),
165 cancellationCallback_(
166 cancellationSource_
.getToken(), std::move(onClosedFunc
)) {}
168 void requestInvoke() { cancellationSource_
.requestCancellation(); }
170 bool wasInvokeRequested() {
171 return cancellationSource_
.isCancellationRequested();
175 folly::CancellationSource cancellationSource_
;
176 folly::CancellationCallback cancellationCallback_
;
180 std::weak_ptr
<Queue
> queue
, std::unique_ptr
<OnClosedCallback
> onClosed
)
181 : queue_(std::move(queue
)), onClosed_(std::move(onClosed
)) {}
183 std::weak_ptr
<Queue
> queue_
;
184 std::unique_ptr
<OnClosedCallback
> onClosed_
;
187 // Bounded variant of AsyncPipe which buffers a fixed number of writes
188 // before blocking new attempts to write until the buffer is drained.
191 // auto [generator, pipe] = BoundedAsyncPipe<T>::create(/* tokens */ 10);
192 // co_await pipe.write(std::move(entry));
193 // auto entry = co_await generator.next().value();
195 // write() is a coroutine which only blocks when
196 // no capacity is remaining. write() returns false if the read-end has been
197 // destroyed or was destroyed while blocking, only throwing OperationCanceled
198 // if the parent coroutine was canceled while blocking.
200 // try_write() is offered which will never block, but will return false
201 // and not write if no capacity is remaining or the read end is already
204 // close() functions the same as AsyncPipe, and must be invoked before
205 // destruction if an onClose callback is attached.
208 bool SingleProducer
= true,
209 template <typename
, bool, bool> typename QueueType
= SmallUnboundedQueue
>
210 class BoundedAsyncPipe
{
212 using Pipe
= AsyncPipe
<T
, SingleProducer
, QueueType
>;
214 static std::pair
<AsyncGenerator
<T
&&>, BoundedAsyncPipe
> create(
215 size_t tokens
, folly::Function
<void()> onClosed
= nullptr) {
216 auto [generator
, pipe
] = Pipe::create(std::move(onClosed
));
218 auto semaphore
= std::make_shared
<folly::fibers::Semaphore
>(tokens
);
220 folly::CancellationSource cancellationSource
;
221 auto cancellationToken
= cancellationSource
.getToken();
222 auto guard
= folly::makeGuard(
223 [cancellationSource
= std::move(cancellationSource
)]() {
224 cancellationSource
.requestCancellation();
227 auto signalingGenerator
= co_invoke(
228 [generator
= std::move(generator
),
229 guard
= std::move(guard
),
230 semaphore
]() mutable -> folly::coro::AsyncGenerator
<T
&&> {
232 auto itemTry
= co_await
co_awaitTry(generator
.next());
234 co_yield
co_result(std::move(itemTry
));
237 return std::pair
<AsyncGenerator
<T
&&>, BoundedAsyncPipe
>(
238 std::move(signalingGenerator
),
241 std::move(semaphore
),
242 std::move(cancellationToken
)));
245 template <typename U
= T
>
246 folly::coro::Task
<bool> write(U
&& u
) {
247 auto parentToken
= co_await co_current_cancellation_token
;
249 auto waitResult
= co_await
co_awaitTry(co_withCancellation(
250 folly::CancellationToken::merge(
251 std::move(parentToken
), cancellationToken_
),
252 semaphore_
->co_wait()));
253 if (cancellationToken_
.isCancellationRequested()) {
254 // eagerly return false if the read-end was destroyed instead of throwing
255 // OperationCanceled, to have uniform behavior when the generator is
258 } else if (waitResult
.hasException()) {
259 co_yield
co_error(std::move(waitResult
).exception());
262 co_return pipe_
.write(std::forward
<U
>(u
));
265 template <typename U
= T
>
266 bool try_write(U
&& u
) {
267 bool available
= semaphore_
->try_wait();
271 return pipe_
.write(std::forward
<U
>(u
));
274 void close(exception_wrapper
&& w
) && { std::move(pipe_
).close(std::move(w
)); }
275 void close() && { std::move(pipe_
).close(); }
277 bool isClosed() const { return pipe_
.isClosed(); }
282 std::shared_ptr
<folly::fibers::Semaphore
> semaphore
,
283 folly::CancellationToken cancellationToken
)
284 : pipe_(std::move(pipe
)),
285 semaphore_(std::move(semaphore
)),
286 cancellationToken_(std::move(cancellationToken
)) {}
289 std::shared_ptr
<folly::fibers::Semaphore
> semaphore_
;
290 folly::CancellationToken cancellationToken_
;
296 #endif // FOLLY_HAS_COROUTINES