2 * Copyright (c) Facebook, Inc. and its affiliates.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <thrift/lib/cpp/concurrency/SFQThreadManager.h>
21 #include <glog/logging.h>
22 #include <folly/executors/MeteredExecutor.h>
23 #include <folly/experimental/FunctionScheduler.h>
24 #include <folly/hash/Hash.h>
26 #include <thrift/lib/cpp/concurrency/SFQThreadManager.h>
27 #include <thrift/lib/cpp/concurrency/ThreadManager.h>
31 namespace concurrency
{
33 static constexpr uint64_t kDefaultTenantId
{0};
35 SFQThreadManager::SFQThreadManager(SFQThreadManagerConfig config
)
36 : ThreadManagerExecutorAdapter(config
.getExecutors()), config_(config
) {
37 if (config_
.getPerturbInterval().count() > 0) {
44 SFQThreadManager::~SFQThreadManager() {
45 perturbationSchedule_
.shutdown();
48 size_t SFQThreadManager::getTaskCount(ExecutionScope es
) {
49 auto pri
= es
.getPriority();
50 auto tenantId
= es
.getTenantId().value_or(kDefaultTenantId
);
51 return getMeteredExecutor(pri
, tenantId
)->pendingTasks();
54 void SFQThreadManager::initQueues() {
55 // We make fair queues to be used on UPSTREAM sources for each priority.
56 for (size_t pri
= 0; pri
< PRIORITY::N_PRIORITIES
; ++pri
) {
57 fqs_
[pri
].resize(config_
.getNumFairQueuesForUpstream());
58 for (uint32_t ii
= 0; ii
< config_
.getNumFairQueuesForUpstream(); ++ii
) {
59 auto keepalive
= ThreadManagerExecutorAdapter::getKeepAlive(
60 ExecutionScope(static_cast<PRIORITY
>(pri
)), Source::UPSTREAM
);
61 fqs_
[pri
][ii
] = std::make_unique
<folly::MeteredExecutor
>(keepalive
);
66 [[nodiscard
]] ThreadManager::KeepAlive
<> SFQThreadManager::getKeepAlive(
67 ExecutionScope es
, Source source
) const {
68 // We only use the metered executor fair queuing for upstream sources. Bypass
69 // the FQs if it's any other source.
70 if (source
!= Source::UPSTREAM
) {
71 return ThreadManagerExecutorAdapter::getKeepAlive(std::move(es
), source
);
74 const size_t pri
= es
.getPriority();
76 getMeteredExecutor(pri
, es
.getTenantId().value_or(kDefaultTenantId
));
77 return getKeepAliveToken(mx
);
80 } // namespace concurrency