Re-sync with internal repository
[hiphop-php.git] / third-party / thrift / src / thrift / lib / cpp / concurrency / SFQThreadManager.cpp
blob4f571fa21654cc583a9245de998101a4eafd28ef
1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <memory>
18 #include <vector>
19 #include <thrift/lib/cpp/concurrency/SFQThreadManager.h>
21 #include <glog/logging.h>
22 #include <folly/executors/MeteredExecutor.h>
23 #include <folly/experimental/FunctionScheduler.h>
24 #include <folly/hash/Hash.h>
26 #include <thrift/lib/cpp/concurrency/SFQThreadManager.h>
27 #include <thrift/lib/cpp/concurrency/ThreadManager.h>
29 namespace apache {
30 namespace thrift {
31 namespace concurrency {
33 static constexpr uint64_t kDefaultTenantId{0};
35 SFQThreadManager::SFQThreadManager(SFQThreadManagerConfig config)
36 : ThreadManagerExecutorAdapter(config.getExecutors()), config_(config) {
37 if (config_.getPerturbInterval().count() > 0) {
38 initPerturbation();
41 initQueues();
44 SFQThreadManager::~SFQThreadManager() {
45 perturbationSchedule_.shutdown();
48 size_t SFQThreadManager::getTaskCount(ExecutionScope es) {
49 auto pri = es.getPriority();
50 auto tenantId = es.getTenantId().value_or(kDefaultTenantId);
51 return getMeteredExecutor(pri, tenantId)->pendingTasks();
54 void SFQThreadManager::initQueues() {
55 // We make fair queues to be used on UPSTREAM sources for each priority.
56 for (size_t pri = 0; pri < PRIORITY::N_PRIORITIES; ++pri) {
57 fqs_[pri].resize(config_.getNumFairQueuesForUpstream());
58 for (uint32_t ii = 0; ii < config_.getNumFairQueuesForUpstream(); ++ii) {
59 auto keepalive = ThreadManagerExecutorAdapter::getKeepAlive(
60 ExecutionScope(static_cast<PRIORITY>(pri)), Source::UPSTREAM);
61 fqs_[pri][ii] = std::make_unique<folly::MeteredExecutor>(keepalive);
66 [[nodiscard]] ThreadManager::KeepAlive<> SFQThreadManager::getKeepAlive(
67 ExecutionScope es, Source source) const {
68 // We only use the metered executor fair queuing for upstream sources. Bypass
69 // the FQs if it's any other source.
70 if (source != Source::UPSTREAM) {
71 return ThreadManagerExecutorAdapter::getKeepAlive(std::move(es), source);
74 const size_t pri = es.getPriority();
75 auto* mx =
76 getMeteredExecutor(pri, es.getTenantId().value_or(kDefaultTenantId));
77 return getKeepAliveToken(mx);
80 } // namespace concurrency
81 } // namespace thrift
82 } // namespace apache