2 * Copyright (c) Meta Platforms, Inc. and affiliates.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/executors/SerialExecutor.h>
19 #include <glog/logging.h>
21 #include <folly/ExceptionString.h>
25 SerialExecutor::SerialExecutor(KeepAlive
<Executor
> parent
)
26 : parent_(std::move(parent
)) {}
28 SerialExecutor::~SerialExecutor() {
29 DCHECK(!keepAliveCounter_
);
32 Executor::KeepAlive
<SerialExecutor
> SerialExecutor::create(
33 KeepAlive
<Executor
> parent
) {
34 return makeKeepAlive
<SerialExecutor
>(new SerialExecutor(std::move(parent
)));
37 SerialExecutor::UniquePtr
SerialExecutor::createUnique(
38 std::shared_ptr
<Executor
> parent
) {
39 auto executor
= new SerialExecutor(getKeepAliveToken(parent
.get()));
40 return {executor
, Deleter
{std::move(parent
)}};
43 bool SerialExecutor::keepAliveAcquire() noexcept
{
44 auto keepAliveCounter
=
45 keepAliveCounter_
.fetch_add(1, std::memory_order_relaxed
);
46 DCHECK(keepAliveCounter
> 0);
50 void SerialExecutor::keepAliveRelease() noexcept
{
51 auto keepAliveCounter
=
52 keepAliveCounter_
.fetch_sub(1, std::memory_order_acq_rel
);
53 DCHECK(keepAliveCounter
> 0);
54 if (keepAliveCounter
== 1) {
59 void SerialExecutor::add(Func func
) {
60 queue_
.enqueue(Task
{std::move(func
), RequestContext::saveContext()});
61 parent_
->add([keepAlive
= getKeepAliveToken(this)] { keepAlive
->run(); });
64 void SerialExecutor::addWithPriority(Func func
, int8_t priority
) {
65 queue_
.enqueue(Task
{std::move(func
), RequestContext::saveContext()});
66 parent_
->addWithPriority(
67 [keepAlive
= getKeepAliveToken(this)] { keepAlive
->run(); }, priority
);
70 void SerialExecutor::run() {
71 // We want scheduled_ to guard side-effects of completed tasks, so we can't
72 // use std::memory_order_relaxed here.
73 if (scheduled_
.fetch_add(1, std::memory_order_acquire
) > 0) {
81 folly::RequestContextScopeGuard
ctxGuard(std::move(task
.ctx
));
82 invokeCatchingExns("SerialExecutor: func", std::exchange(task
.func
, {}));
84 // We want scheduled_ to guard side-effects of completed tasks, so we can't
85 // use std::memory_order_relaxed here.
86 } while (scheduled_
.fetch_sub(1, std::memory_order_release
) > 1);