2 * Copyright (c) Meta Platforms, Inc. and affiliates.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <fmt/format.h>
23 #include <glog/logging.h>
24 #include <folly/DefaultKeepAliveExecutor.h>
25 #include <folly/Random.h>
26 #include <folly/Singleton.h>
27 #include <folly/executors/GlobalExecutor.h>
28 #include <folly/executors/ManualExecutor.h>
29 #include <folly/executors/SerialExecutor.h>
30 #include <folly/executors/VirtualExecutor.h>
31 #include <folly/futures/Future.h>
32 #include <folly/portability/GTest.h>
36 using namespace std::chrono_literals
;
37 using std::chrono::steady_clock
;
// GoogleTest fixture shared by every test in this suite.
// SetUp() installs a mock for the Timekeeper singleton whose factory returns
// `new Tk`, then checks via dynamic_cast that the singleton really is a Tk.
// TearDown() destroys and re-enables all singleton instances so a mock does
// not leak into the next test.
// NOTE(review): `Tk` is not declared in the lines shown here -- presumably the
// fixture's template parameter; confirm against the full file.
40 class TimekeeperTest
: public testing::Test
{
42 void SetUp() override
{
43 // Replace the default timekeeper with the class under test, and verify that
44 // the replacement was successful.
45 Singleton
<Timekeeper
, detail::TimekeeperSingletonTag
>::make_mock(
46 [] { return new Tk
; });
48 dynamic_cast<Tk
*>(detail::getTimekeeperSingleton().get()) != nullptr);
51 void TearDown() override
{
52 // Invalidate any mocks that were installed.
53 folly::SingletonVault::singleton()->destroyInstances();
54 folly::SingletonVault::singleton()->reenableInstances();
// Declares TimekeeperTest as a type-parameterized GoogleTest suite; the
// individual cases below are defined with TYPED_TEST_P.
58 TYPED_TEST_SUITE_P(TimekeeperTest
);
// after(10ms) on the singleton timekeeper must return a future that is not
// ready immediately, and at least 10ms must separate the two clock samples.
// NOTE(review): the statement between the two samples (presumably a wait on
// `f`) is not among the lines shown here.
60 TYPED_TEST_P(TimekeeperTest
, After
) {
61 auto t1
= steady_clock::now();
62 auto f
= detail::getTimekeeperSingleton()->after(10ms
);
63 EXPECT_FALSE(f
.isReady());
65 auto t2
= steady_clock::now();
67 EXPECT_GE(t2
- t1
, 10ms
);
// Same check as the After test, but through afterUnsafe(10ms): the returned
// future is not ready immediately and at least 10ms elapse between the two
// clock samples.
// NOTE(review): the statement between the two samples (presumably a wait on
// `f`) is not among the lines shown here.
70 TYPED_TEST_P(TimekeeperTest
, AfterUnsafe
) {
71 auto t1
= steady_clock::now();
72 auto f
= detail::getTimekeeperSingleton()->afterUnsafe(10ms
);
73 EXPECT_FALSE(f
.isReady());
75 auto t2
= steady_clock::now();
77 EXPECT_GE(t2
- t1
, 10ms
);
// A value (42) set on `p` from a second thread must be returned by a blocking
// get() on p's future.
// NOTE(review): the declaration of `p` and the join of `t` are not among the
// lines shown here.
80 TYPED_TEST_P(TimekeeperTest
, FutureGet
) {
82 auto t
= std::thread([&] { p
.setValue(42); });
83 EXPECT_EQ(42, p
.getFuture().get());
// get(timeout) must return the value (42) when it is set by another thread
// before the 2-second timeout expires.
87 TYPED_TEST_P(TimekeeperTest
, FutureGetBeforeTimeout
) {
89 auto t
= std::thread([&] { p
.setValue(42); });
90 // Technically this is a race and if the test server is REALLY overloaded
91 // and that thread takes longer than the 2s timeout below it could be flaky. But
92 // I want a low timeout (in human terms) so if this regresses and someone
93 // runs it by hand they're not sitting there forever wondering why it's
94 // blocked, and get a useful error message instead. If it does get flaky,
95 // empirically increase the timeout to the point where it's very improbable.
96 EXPECT_EQ(42, p
.getFuture().get(std::chrono::seconds(2)));
// get(1ms) on the future of `p` must throw folly::FutureTimeout.
// NOTE(review): `p` is declared on a line not shown here; no visible line
// fulfils it, which is what makes the timeout expected.
100 TYPED_TEST_P(TimekeeperTest
, FutureGetTimeout
) {
102 EXPECT_THROW(p
.getFuture().get(1ms
), folly::FutureTimeout
);
// Blocking on futures::sleep(1ms) must take at least 1ms of steady_clock time.
105 TYPED_TEST_P(TimekeeperTest
, FutureSleep
) {
106 auto t1
= steady_clock::now();
107 futures::sleep(1ms
).get();
108 EXPECT_GE(steady_clock::now() - t1
, 1ms
);
// Blocking on futures::sleepUnsafe(1ms) must take at least 1ms of steady_clock
// time. The -Wdeprecated-declarations warning is disabled just before the test.
// NOTE(review): presumably because sleepUnsafe is marked deprecated; the
// matching warning push/pop lines are not among the lines shown here.
112 FOLLY_GNU_DISABLE_WARNING("-Wdeprecated-declarations")
113 TYPED_TEST_P(TimekeeperTest
, FutureSleepUnsafe
) {
114 auto t1
= steady_clock::now();
115 futures::sleepUnsafe(1ms
).get();
116 EXPECT_GE(steady_clock::now() - t1
, 1ms
);
// Computes `dur`, the time elapsed between the start of the test and the
// moment a continuation chained on makeFuture() runs.
// NOTE(review): the step that introduces the delay and the assertion on `dur`
// are not among the lines shown here.
120 TYPED_TEST_P(TimekeeperTest
, FutureDelayed
) {
121 auto t1
= steady_clock::now();
122 auto dur
= makeFuture()
124 .thenValue([=](auto&&) { return steady_clock::now() - t1
; })
// SemiFuture counterpart of the FutureDelayed test: computes `dur`, the time
// elapsed between the start of the test and the moment a continuation chained
// on makeSemiFuture() runs.
// NOTE(review): the step that introduces the delay and the assertion on `dur`
// are not among the lines shown here.
130 TYPED_TEST_P(TimekeeperTest
, SemiFutureDelayed
) {
131 auto t1
= steady_clock::now();
132 auto dur
= makeSemiFuture()
135 .thenValue([=](auto&&) { return steady_clock::now() - t1
; })
// Checks which thread runs continuations that follow a delay, in two parts:
//  1. Without an explicit executor: the thread id recorded by the
//     continuation must equal the id obtained from a continuation attached to
//     the timekeeper singleton (timekeeper_thread_id).
//  2. With a ManualExecutor driven by a dedicated thread (me_driver): both
//     recorded continuation thread ids must equal the driver thread's id.
// NOTE(review): the delay calls, the executor hand-off, the driver loop body
// and the shutdown of me_driver are not among the lines shown here.
141 TYPED_TEST_P(TimekeeperTest
, FutureDelayedStickyExecutor
) {
142 // Check that delayed without an executor binds the inline executor.
144 auto t1
= steady_clock::now();
145 std::thread::id timekeeper_thread_id
=
146 folly::detail::getTimekeeperSingleton()
147 // Ensure that the continuation is run almost certainly in the
148 // timekeeper's thread.
151 .thenValue([](auto&&) { return std::this_thread::get_id(); })
153 std::thread::id task_thread_id
{};
154 auto dur
= makeFuture()
156 .thenValue([=, &task_thread_id
](auto&&) {
157 task_thread_id
= std::this_thread::get_id();
158 return steady_clock::now() - t1
;
163 EXPECT_EQ(timekeeper_thread_id
, task_thread_id
);
166 // Check that delayed applied to an executor returns a future that binds
167 // to the same executor as was input.
169 auto t1
= steady_clock::now();
170 std::thread::id driver_thread_id
{};
171 std::thread::id first_task_thread_id
{};
172 std::thread::id second_task_thread_id
{};
173 folly::ManualExecutor me
;
174 std::atomic
<bool> stop_signal
{false};
175 std::thread me_driver
{[&me
, &driver_thread_id
, &stop_signal
] {
176 driver_thread_id
= std::this_thread::get_id();
177 while (!stop_signal
) {
181 auto dur
= makeSemiFuture()
183 .thenValue([&first_task_thread_id
](auto&&) {
184 first_task_thread_id
= std::this_thread::get_id();
187 .thenValue([=, &second_task_thread_id
](auto&&) {
188 second_task_thread_id
= std::this_thread::get_id();
189 return steady_clock::now() - t1
;
195 EXPECT_EQ(driver_thread_id
, first_task_thread_id
);
196 EXPECT_EQ(driver_thread_id
, second_task_thread_id
);
// within(1ms) on the future of `p` must fail with FutureTimeout: the thenError
// handler registered for FutureTimeout maps it to -1, which is the value the
// test expects from get().
// NOTE(review): `p` is declared on a line not shown here.
200 TYPED_TEST_P(TimekeeperTest
, FutureWithinThrows
) {
202 auto f
= p
.getFuture().within(1ms
).thenError(
203 tag_t
<FutureTimeout
>{}, [](auto&&) { return -1; });
205 EXPECT_EQ(-1, std::move(f
).get());
// SemiFuture counterpart of the FutureWithinThrows test: within(1ms) on the
// semi-future of `p`, converted with toUnsafeFuture(), must fail with
// FutureTimeout, which the thenError handler maps to the expected -1.
// NOTE(review): `p` is declared on a line not shown here.
208 TYPED_TEST_P(TimekeeperTest
, SemiFutureWithinThrows
) {
210 auto f
= p
.getSemiFuture().within(1ms
).toUnsafeFuture().thenError(
211 tag_t
<FutureTimeout
>{}, [](auto&&) { return -1; });
213 EXPECT_EQ(-1, std::move(f
).get());
// within(1ms) on an already-completed future (makeFuture(42)) must pass the
// value through: get() is expected to return 42, not the -1 that the
// FutureTimeout handler would produce.
216 TYPED_TEST_P(TimekeeperTest
, FutureWithinAlreadyComplete
) {
217 auto f
= makeFuture(42).within(1ms
).thenError(
218 tag_t
<FutureTimeout
>{}, [&](auto&&) { return -1; });
220 EXPECT_EQ(42, std::move(f
).get());
// SemiFuture counterpart of the FutureWithinAlreadyComplete test:
// within(1ms) on makeSemiFuture(42) must pass the value through, so get() is
// expected to return 42 rather than the timeout handler's -1.
223 TYPED_TEST_P(TimekeeperTest
, SemiFutureWithinAlreadyComplete
) {
224 auto f
= makeSemiFuture(42).within(1ms
).toUnsafeFuture().thenError(
225 tag_t
<FutureTimeout
>{}, [&](auto&&) { return -1; });
227 EXPECT_EQ(42, std::move(f
).get());
// With a generous one-minute within() window, the future of `p` must yield
// its value (42) rather than the -1 produced by the FutureTimeout handler.
// NOTE(review): the declaration of `p` and the statement that fulfils it are
// not among the lines shown here.
230 TYPED_TEST_P(TimekeeperTest
, FutureWithinFinishesInTime
) {
232 auto f
= p
.getFuture()
233 .within(std::chrono::minutes(1))
234 .thenError(tag_t
<FutureTimeout
>{}, [&](auto&&) { return -1; });
237 EXPECT_EQ(42, std::move(f
).get());
// SemiFuture counterpart of the FutureWithinFinishesInTime test: with a
// one-minute within() window the result must be 42, not the -1 produced by
// the FutureTimeout handler.
// NOTE(review): the declaration of `p`, one step of the chain, and the
// statement that fulfils `p` are not among the lines shown here.
240 TYPED_TEST_P(TimekeeperTest
, SemiFutureWithinFinishesInTime
) {
242 auto f
= p
.getSemiFuture()
243 .within(std::chrono::minutes(1))
245 .thenError(tag_t
<FutureTimeout
>{}, [&](auto&&) { return -1; });
248 EXPECT_EQ(42, std::move(f
).get());
// Calls within(1ms) on the value-less makeFuture() and discards the result;
// the test contains no assertion in the lines shown, so it only exercises the
// call itself.
251 TYPED_TEST_P(TimekeeperTest
, FutureWithinVoidSpecialization
) {
252 makeFuture().within(1ms
);
// Calls within(1ms) on the value-less makeSemiFuture() and discards the
// result; the test contains no assertion in the lines shown, so it only
// exercises the call itself.
255 TYPED_TEST_P(TimekeeperTest
, SemiFutureWithinVoidSpecialization
) {
256 makeSemiFuture().within(1ms
);
// within(10ms, e) with a caller-supplied exception: get() on the resulting
// future is expected to throw std::runtime_error (the type passed in) once the
// window elapses.
// NOTE(review): `p` is declared on a line not shown here.
259 TYPED_TEST_P(TimekeeperTest
, FutureWithinException
) {
261 auto f
= p
.getFuture().within(10ms
, std::runtime_error("expected"));
262 EXPECT_THROW(std::move(f
).get(), std::runtime_error
);
// SemiFuture counterpart of the FutureWithinException test: get() on
// getSemiFuture().within(10ms, std::runtime_error) is expected to throw
// std::runtime_error.
// NOTE(review): `p` is declared on a line not shown here.
265 TYPED_TEST_P(TimekeeperTest
, SemiFutureWithinException
) {
267 auto f
= p
.getSemiFuture().within(10ms
, std::runtime_error("expected"));
268 EXPECT_THROW(std::move(f
).get(), std::runtime_error
);
271 TYPED_TEST_P(TimekeeperTest
, OnTimeout
) {
285 TYPED_TEST_P(TimekeeperTest
, OnTimeoutComplete
) {
298 TYPED_TEST_P(TimekeeperTest
, OnTimeoutReturnsFuture
) {
306 return makeFuture(-1);
// Compile-only coverage of onTimeout() on a value-less future with two handler
// shapes: one returning nothing, and one returning a Future<Unit> built from
// an exception. Both resulting futures are discarded.
312 TYPED_TEST_P(TimekeeperTest
, OnTimeoutVoid
) {
313 makeFuture().delayed(1ms
).onTimeout(0ms
, [&] {});
314 makeFuture().delayed(1ms
).onTimeout(
315 0ms
, [&] { return makeFuture
<Unit
>(std::runtime_error("expected")); });
316 // just testing compilation here
// Creates a 10-second sleep future. Per the test name, interrupting it must
// not crash.
// NOTE(review): the interrupting call itself is not among the lines shown
// here.
319 TYPED_TEST_P(TimekeeperTest
, InterruptDoesntCrash
) {
320 auto f
= futures::sleep(10s
);
// Chains a deferred continuation that sets `test` to true after a 100ms
// sleep.
// NOTE(review): the declaration of `test`, the interruption of `f`, and the
// final assertion are not among the lines shown here.
324 TYPED_TEST_P(TimekeeperTest
, ChainedInterruptTest
) {
326 auto f
= futures::sleep(100ms
).deferValue([&](auto&&) { test
= true; });
// Installs an interrupt handler on `p` that sets `test` to true when the
// interrupt carries a FutureCancellation, then derives a future with
// within(100ms). `test` must still be false right after setup (the sanity
// check).
// NOTE(review): the declarations of `test` and `p`, the dispatch on `ex`, the
// cancellation of `f`, and the final assertion are not among the lines shown
// here.
332 TYPED_TEST_P(TimekeeperTest
, FutureWithinChainedInterruptTest
) {
335 p
.setInterruptHandler([&test
, &p
](const exception_wrapper
& ex
) {
337 [&test
](const FutureCancellation
& /* cancellation */) { test
= true; });
340 auto f
= p
.getFuture().within(100ms
);
341 EXPECT_FALSE(test
) << "Sanity check";
// SemiFuture counterpart of the FutureWithinChainedInterruptTest test:
// installs an interrupt handler on `p` that sets `test` to true when the
// interrupt carries a FutureCancellation, then derives a semi-future with
// within(100ms). `test` must still be false right after setup.
// NOTE(review): the declarations of `test` and `p`, the dispatch on `ex`, the
// cancellation of `f`, and the final assertion are not among the lines shown
// here.
347 TYPED_TEST_P(TimekeeperTest
, SemiFutureWithinChainedInterruptTest
) {
350 p
.setInterruptHandler([&test
, &p
](const exception_wrapper
& ex
) {
352 [&test
](const FutureCancellation
& /* cancellation */) { test
= true; });
355 auto f
= p
.getSemiFuture().within(100ms
);
356 EXPECT_FALSE(test
) << "Sanity check";
// Uses a local DefaultKeepAliveExecutor subclass (ExecutorTester) that
// overrides add() and exposes an atomic `count`, and checks that the count is
// 3 in two scenarios:
//  - a within(10s) chain followed by a thenValue continuation;
//  - a within(10ms) chain whose get() throws FutureTimeout.
// NOTE(review): the body of add() (presumably where `count` is incremented),
// the step that routes the chain through `tester`, the declarations of `p`,
// and the calls to join() are not among the lines shown here.
362 TYPED_TEST_P(TimekeeperTest
, Executor
) {
363 class ExecutorTester
: public DefaultKeepAliveExecutor
{
365 virtual void add(Func f
) override
{
369 void join() { joinKeepAlive(); }
370 std::atomic
<int> count
{0};
375 ExecutorTester tester
;
376 auto f
= p
.getFuture()
378 .within(std::chrono::seconds(10))
379 .thenValue([&](auto&&) {});
383 EXPECT_EQ(3, tester
.count
);
388 ExecutorTester tester
;
389 auto f
= p
.getFuture()
391 .within(std::chrono::milliseconds(10))
392 .thenValue([&](auto&&) {});
393 EXPECT_THROW(std::move(f
).get(), FutureTimeout
);
396 EXPECT_EQ(3, tester
.count
);
402 TYPED_TEST_P(TimekeeperTest, OnTimeoutPropagates) {
405 makeFuture(42).delayed(1ms)
406 .onTimeout(0ms, [&]{ flag = true; })
// at() with a deadline 10 seconds in the past must return a future that is
// already ready and holds no exception.
413 TYPED_TEST_P(TimekeeperTest
, AtBeforeNow
) {
414 auto f
= detail::getTimekeeperSingleton()->at(steady_clock::now() - 10s
);
415 EXPECT_TRUE(f
.isReady());
416 EXPECT_FALSE(f
.hasException());
// Usage example: converting a std::chrono duration of another unit (here
// 1 nanosecond) to the timekeeper's `Duration` type before calling after().
// Note that std::chrono::duration_cast truncates toward zero when the target
// representation is integral.
// NOTE(review): `Duration` is declared elsewhere; its unit is not visible here.
419 TYPED_TEST_P(TimekeeperTest
, HowToCastDuration
) {
420 // I'm not sure whether this rounds up or down but it's irrelevant for the
421 // purpose of this example.
422 auto f
= detail::getTimekeeperSingleton()->after(
423 std::chrono::duration_cast
<Duration
>(std::chrono::nanoseconds(1)));
// Builds a timekeeper of the type under test in a folly::Optional and
// schedules a 10-second timeout on it. The future must start out not ready,
// and later be ready and hold an exception.
// NOTE(review): the statement between the two groups of checks (presumably the
// destruction of `tk`, per the test name) is not among the lines shown here.
426 TYPED_TEST_P(TimekeeperTest
, Destruction
) {
427 folly::Optional
<TypeParam
> tk
{std::in_place
};
428 auto f
= tk
->after(std::chrono::seconds(10));
429 EXPECT_FALSE(f
.isReady());
431 EXPECT_TRUE(f
.isReady());
432 EXPECT_TRUE(f
.hasException());
// Like the Destruction test, but a second thread calls f.cancel() on the
// pending 10-second timeout. Whatever the interleaving, the future must end
// up ready and holding an exception.
// NOTE(review): the destruction of `tk` and the join of `t` are not among the
// lines shown here.
435 TYPED_TEST_P(TimekeeperTest
, ConcurrentDestructionAndCancellation
) {
436 folly::Optional
<TypeParam
> tk
{std::in_place
};
437 auto f
= tk
->after(std::chrono::seconds(10));
438 EXPECT_FALSE(f
.isReady());
439 std::thread t
{[&] { f
.cancel(); }};
442 EXPECT_TRUE(f
.isReady());
443 EXPECT_TRUE(f
.hasException());
450 std::chrono::microseconds duration
, std::chrono::microseconds period
) {
451 using usec
= std::chrono::microseconds
;
453 folly::Optional
<Tk
> tk
{std::in_place
};
454 std::vector
<std::thread
> workers
;
456 // Run continuations on a serial executor so we don't need synchronization to
457 // modify shared state.
458 folly::Optional
<VirtualExecutor
> continuationsThread
{
459 std::in_place
, SerialExecutor::create(folly::getGlobalCPUExecutor())};
460 size_t numCompletions
= 0;
464 // Wait for any lazy initialization in the timekeeper and executor.
465 tk
->after(1ms
).via(&*continuationsThread
).then([](auto&&) {}).get();
467 static const auto jitter
= [](usec avg
) {
468 // Center around average.
469 return usec(folly::Random::rand64(2 * avg
.count()));
472 static const auto jitterSleep
= [](steady_clock::time_point
& now
, usec avg
) {
474 if (now
- steady_clock::now() < 10us
) {
475 // Busy-sleep if yielding the CPU would take too long.
476 while (now
> steady_clock::now()) {
479 /* sleep override */ std::this_thread::sleep_until(now
);
483 for (size_t i
= 0; i
< 8; ++i
) {
484 workers
.emplace_back([&] {
485 std::vector
<Future
<Unit
>> futures
;
486 for (auto start
= steady_clock::now(), now
= start
;
487 now
< start
+ duration
;
488 jitterSleep(now
, period
)) {
489 // Use the test duration as rough range for the timeouts.
490 auto dur
= jitter(duration
);
491 auto expected
= steady_clock::now() + dur
;
495 .thenValue([](auto&&) { return steady_clock::now(); })
496 .via(&*continuationsThread
)
497 .thenValue([&, expected
](auto fired
) {
499 std::chrono::duration_cast
<usec
>(fired
- expected
);
500 // TODO(ott): HHWheelTimer-based timekeepers round down the
501 // timeout, so they may fire early, for now ignore this.
502 if (delay
< 0us
&& delay
> -1ms
) {
505 ASSERT_GE(delay
.count(), 0);
508 maxDelay
= std::max(maxDelay
, delay
);
512 for (auto& f
: futures
) {
513 // While at it, check that canceling the future after it has been
514 // fulfilled has no effect. To do so, we wait non-destructively.
515 while (!f
.isReady()) {
516 /* sleep override */ std::this_thread::sleep_for(1ms
);
519 EXPECT_NO_THROW(std::move(f
).get());
524 // Add a worker that cancels all its futures.
525 size_t numAttemptedCancellations
= 0;
526 size_t numCancellations
= 0;
527 workers
.emplace_back([&] {
528 std::vector
<SemiFuture
<Unit
>> futures
;
529 for (auto start
= steady_clock::now(), now
= start
; now
< start
+ duration
;
530 jitterSleep(now
, 1ms
)) {
531 // Pick a wide range of durations to exercise various positions in the
532 // sequence of timeouts.
533 auto dur
= 5ms
+ jitter(5s
);
534 futures
.push_back(tk
->after(dur
));
535 // Cancel the future scheduled in the previous iteration.
536 if (futures
.size() > 1) {
537 futures
[futures
.size() - 2].cancel();
541 futures
.back().cancel();
542 numAttemptedCancellations
= futures
.size();
544 for (auto& f
: futures
) {
545 if (std::move(f
).getTry().hasException
<FutureCancellation
>()) {
551 // Add a few timeouts that will not survive the timekeeper.
552 std::vector
<SemiFuture
<Unit
>> shutdownFutures
;
553 for (size_t i
= 0; i
< 10; ++i
) {
554 shutdownFutures
.push_back(tk
->after(10min
));
557 for (auto& worker
: workers
) {
561 continuationsThread
.reset(); // Wait for all continuations.
562 ASSERT_GT(numCompletions
, 0);
564 // In principle the delay is unbounded (depending on the state of the system),
565 // so we cannot have any upper bound that is both meaningful and reliable, but
566 // we can log it to manually inspect the behavior.
567 LOG(INFO
) << fmt::format(
568 "Successful completions: {}, avg delay: {} us, max delay: {} us ",
570 sumDelay
.count() / numCompletions
,
573 // Similarly, a cancellation may be processed only after the future has fired,
574 // but in normal conditions this should never happen.
575 LOG(INFO
) << fmt::format(
576 "Attempted cancellations: {}, successful: {}",
577 numAttemptedCancellations
,
581 for (auto& f
: shutdownFutures
) {
582 EXPECT_TRUE(std::move(f
).getTry().hasException
<FutureNoTimekeeper
>());
// Runs the shared stressTest helper for the type under test with a 1-second
// test duration and a 10ms period.
588 TYPED_TEST_P(TimekeeperTest
, Stress
) {
589 stressTest
<TypeParam
>(/* duration */ 1s
, /* period */ 10ms
);
// Runs the shared stressTest helper with a short 50ms duration and a very
// small 5us period.
592 TYPED_TEST_P(TimekeeperTest
, StressHighContention
) {
593 // Test that nothing breaks when scheduling a large number of timeouts
594 // concurrently. In this case the timekeeper thread will be overloaded, so the
595 // measured delays are going to be large.
596 stressTest
<TypeParam
>(/* duration */ 50ms
, /* period */ 5us
);
// Registers the suite's test cases by name with GoogleTest; a TYPED_TEST_P
// case must be listed here to be instantiated.
// NOTE(review): only part of the argument list is among the lines shown here
// (the suite name and several test names are on lines not shown).
599 REGISTER_TYPED_TEST_SUITE_P(
604 FutureGetBeforeTimeout
,
610 FutureDelayedStickyExecutor
,
612 SemiFutureWithinThrows
,
613 FutureWithinAlreadyComplete
,
614 SemiFutureWithinAlreadyComplete
,
615 FutureWithinFinishesInTime
,
616 SemiFutureWithinFinishesInTime
,
617 FutureWithinVoidSpecialization
,
618 SemiFutureWithinVoidSpecialization
,
619 FutureWithinException
,
620 SemiFutureWithinException
,
623 OnTimeoutReturnsFuture
,
625 InterruptDoesntCrash
,
626 ChainedInterruptTest
,
627 FutureWithinChainedInterruptTest
,
628 SemiFutureWithinChainedInterruptTest
,
633 ConcurrentDestructionAndCancellation
,
635 StressHighContention
);