Re-sync with internal repository
[hiphop-php.git] / third-party / folly / src / folly / experimental / coro / AsyncPipe.h
blobb2b3515a889769c58c88620128efe0a581d93d21
1 /*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #pragma once
19 #include <folly/Try.h>
20 #include <folly/experimental/coro/AsyncGenerator.h>
21 #include <folly/experimental/coro/Coroutine.h>
22 #include <folly/experimental/coro/Invoke.h>
23 #include <folly/experimental/coro/SmallUnboundedQueue.h>
24 #include <folly/experimental/coro/ViaIfAsync.h>
25 #include <folly/fibers/Semaphore.h>
27 #include <memory>
28 #include <utility>
30 #if FOLLY_HAS_COROUTINES
32 namespace folly {
33 namespace coro {
35 // An AsyncGenerator with a write end
37 // Usage:
38 // auto pipe = AsyncPipe<T>::create();
39 // pipe.second.write(std::move(val1));
40 // auto val2 = co_await pipe.first.next();
42 // write() returns false if the read end has been destroyed (unless
43 // SingleProducer is disabled, in which case this behavior is undefined).
44 // The generator is completed when the write end is destroyed or on close()
45 // close() can also be passed an exception, which is thrown when read.
47 // An optional onClosed callback can be passed to create(). This callback will
48 // be called either when the generator is destroyed by the consumer, or when
49 // the pipe is closed by the publisher (whichever comes first). The onClosed
50 // callback may destroy the AsyncPipe object inline, and must not call close()
51 // on the AsyncPipe object inline. If an onClosed callback is specified and the
52 // publisher would like to destroy the pipe outside of the callback, it must
53 // first close the pipe.
55 // If SingleProducer is disabled, AsyncPipe's write() method (but not its
56 // close() method) becomes thread-safe. close() must be sequenced after all
57 // write()s in this mode.
59 template <
60 typename T,
61 bool SingleProducer = true,
62 template <typename, bool, bool> typename QueueType = SmallUnboundedQueue>
63 class AsyncPipe {
64 public:
65 ~AsyncPipe() {
66 CHECK(!onClosed_ || onClosed_->wasInvokeRequested())
67 << "If an onClosed callback is specified and the generator still "
68 << "exists, the publisher must explicitly close the pipe prior to "
69 << "destruction.";
70 std::move(*this).close();
73 AsyncPipe(AsyncPipe&& pipe) noexcept {
74 queue_ = std::move(pipe.queue_);
75 onClosed_ = std::move(pipe.onClosed_);
78 AsyncPipe& operator=(AsyncPipe&& pipe) {
79 if (this != &pipe) {
80 CHECK(!onClosed_ || onClosed_->wasInvokeRequested())
81 << "If an onClosed callback is specified and the generator still "
82 << "exists, the publisher must explicitly close the pipe prior to "
83 << "destruction.";
84 std::move(*this).close();
85 queue_ = std::move(pipe.queue_);
86 onClosed_ = std::move(pipe.onClosed_);
88 return *this;
91 static std::pair<folly::coro::AsyncGenerator<T&&>, AsyncPipe> create(
92 folly::Function<void()> onClosed = nullptr) {
93 auto queue = std::make_shared<Queue>();
94 auto cancellationSource = std::optional<folly::CancellationSource>();
95 auto onClosedCallback = std::unique_ptr<OnClosedCallback>();
96 if (onClosed != nullptr) {
97 cancellationSource.emplace();
98 onClosedCallback = std::make_unique<OnClosedCallback>(
99 *cancellationSource, std::move(onClosed));
101 auto guard =
102 folly::makeGuard([cancellationSource = std::move(cancellationSource)] {
103 if (cancellationSource) {
104 cancellationSource->requestCancellation();
107 return {
108 folly::coro::co_invoke(
109 [queue,
110 guard = std::move(guard)]() -> folly::coro::AsyncGenerator<T&&> {
111 while (true) {
112 auto val = co_await co_nothrow(queue->dequeue());
113 if (val.hasValue() || val.hasException()) {
114 co_yield std::move(*val);
115 } else {
116 co_return;
120 AsyncPipe(queue, std::move(onClosedCallback))};
123 template <typename U = T>
124 bool write(U&& val) {
125 if (auto queue = queue_.lock()) {
126 queue->enqueue(folly::Try<T>(std::forward<U>(val)));
127 return true;
129 return false;
132 void close(folly::exception_wrapper ew) && {
133 if (auto queue = queue_.lock()) {
134 queue->enqueue(folly::Try<T>(std::move(ew)));
135 queue_.reset();
137 if (onClosed_ != nullptr) {
138 onClosed_->requestInvoke();
139 onClosed_.reset();
143 void close() && {
144 if (auto queue = queue_.lock()) {
145 queue->enqueue(folly::Try<T>());
146 queue_.reset();
148 if (onClosed_ != nullptr) {
149 onClosed_->requestInvoke();
150 onClosed_.reset();
154 bool isClosed() const { return queue_.expired(); }
156 private:
157 using Queue = QueueType<folly::Try<T>, SingleProducer, true>;
159 class OnClosedCallback {
160 public:
161 OnClosedCallback(
162 folly::CancellationSource cancellationSource,
163 folly::Function<void()> onClosedFunc)
164 : cancellationSource_(std::move(cancellationSource)),
165 cancellationCallback_(
166 cancellationSource_.getToken(), std::move(onClosedFunc)) {}
168 void requestInvoke() { cancellationSource_.requestCancellation(); }
170 bool wasInvokeRequested() {
171 return cancellationSource_.isCancellationRequested();
174 private:
175 folly::CancellationSource cancellationSource_;
176 folly::CancellationCallback cancellationCallback_;
179 explicit AsyncPipe(
180 std::weak_ptr<Queue> queue, std::unique_ptr<OnClosedCallback> onClosed)
181 : queue_(std::move(queue)), onClosed_(std::move(onClosed)) {}
183 std::weak_ptr<Queue> queue_;
184 std::unique_ptr<OnClosedCallback> onClosed_;
187 // Bounded variant of AsyncPipe which buffers a fixed number of writes
188 // before blocking new attempts to write until the buffer is drained.
190 // Usage:
191 // auto [generator, pipe] = BoundedAsyncPipe<T>::create(/* tokens */ 10);
192 // co_await pipe.write(std::move(entry));
193 // auto entry = co_await generator.next().value();
195 // write() is a coroutine which only blocks when
196 // no capacity is remaining. write() returns false if the read-end has been
197 // destroyed or was destroyed while blocking, only throwing OperationCanceled
198 // if the parent coroutine was canceled while blocking.
200 // try_write() is offered which will never block, but will return false
201 // and not write if no capacity is remaining or the read end is already
202 // destroyed.
204 // close() functions the same as AsyncPipe, and must be invoked before
205 // destruction if an onClose callback is attached.
206 template <
207 typename T,
208 bool SingleProducer = true,
209 template <typename, bool, bool> typename QueueType = SmallUnboundedQueue>
210 class BoundedAsyncPipe {
211 public:
212 using Pipe = AsyncPipe<T, SingleProducer, QueueType>;
214 static std::pair<AsyncGenerator<T&&>, BoundedAsyncPipe> create(
215 size_t tokens, folly::Function<void()> onClosed = nullptr) {
216 auto [generator, pipe] = Pipe::create(std::move(onClosed));
218 auto semaphore = std::make_shared<folly::fibers::Semaphore>(tokens);
220 folly::CancellationSource cancellationSource;
221 auto cancellationToken = cancellationSource.getToken();
222 auto guard = folly::makeGuard(
223 [cancellationSource = std::move(cancellationSource)]() {
224 cancellationSource.requestCancellation();
227 auto signalingGenerator = co_invoke(
228 [generator = std::move(generator),
229 guard = std::move(guard),
230 semaphore]() mutable -> folly::coro::AsyncGenerator<T&&> {
231 while (true) {
232 auto itemTry = co_await co_awaitTry(generator.next());
233 semaphore->signal();
234 co_yield co_result(std::move(itemTry));
237 return std::pair<AsyncGenerator<T&&>, BoundedAsyncPipe>(
238 std::move(signalingGenerator),
239 BoundedAsyncPipe(
240 std::move(pipe),
241 std::move(semaphore),
242 std::move(cancellationToken)));
245 template <typename U = T>
246 folly::coro::Task<bool> write(U&& u) {
247 auto parentToken = co_await co_current_cancellation_token;
249 auto waitResult = co_await co_awaitTry(co_withCancellation(
250 folly::CancellationToken::merge(
251 std::move(parentToken), cancellationToken_),
252 semaphore_->co_wait()));
253 if (cancellationToken_.isCancellationRequested()) {
254 // eagerly return false if the read-end was destroyed instead of throwing
255 // OperationCanceled, to have uniform behavior when the generator is
256 // destroyed
257 co_return false;
258 } else if (waitResult.hasException()) {
259 co_yield co_error(std::move(waitResult).exception());
262 co_return pipe_.write(std::forward<U>(u));
265 template <typename U = T>
266 bool try_write(U&& u) {
267 bool available = semaphore_->try_wait();
268 if (!available) {
269 return false;
271 return pipe_.write(std::forward<U>(u));
274 void close(exception_wrapper&& w) && { std::move(pipe_).close(std::move(w)); }
275 void close() && { std::move(pipe_).close(); }
277 bool isClosed() const { return pipe_.isClosed(); }
279 private:
280 BoundedAsyncPipe(
281 Pipe&& pipe,
282 std::shared_ptr<folly::fibers::Semaphore> semaphore,
283 folly::CancellationToken cancellationToken)
284 : pipe_(std::move(pipe)),
285 semaphore_(std::move(semaphore)),
286 cancellationToken_(std::move(cancellationToken)) {}
288 Pipe pipe_;
289 std::shared_ptr<folly::fibers::Semaphore> semaphore_;
290 folly::CancellationToken cancellationToken_;
293 } // namespace coro
294 } // namespace folly
296 #endif // FOLLY_HAS_COROUTINES