Deshim VirtualExecutor in folly
[hiphop-php.git] / third-party / folly / src / folly / futures / test / TimekeeperTestLib.h
blobe6ccd2eb00c4365cc3b9025028349003ff6f8966
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 #pragma once
19 #include <thread>
20 #include <vector>
22 #include <fmt/format.h>
23 #include <glog/logging.h>
24 #include <folly/DefaultKeepAliveExecutor.h>
25 #include <folly/Random.h>
26 #include <folly/Singleton.h>
27 #include <folly/executors/GlobalExecutor.h>
28 #include <folly/executors/ManualExecutor.h>
29 #include <folly/executors/SerialExecutor.h>
30 #include <folly/executors/VirtualExecutor.h>
31 #include <folly/futures/Future.h>
32 #include <folly/portability/GTest.h>
34 namespace folly {
36 using namespace std::chrono_literals;
37 using std::chrono::steady_clock;
39 template <class Tk>
40 class TimekeeperTest : public testing::Test {
41 protected:
42 void SetUp() override {
43 // Replace the default timekeeper with the class under test, and verify that
44 // the replacement was successful.
45 Singleton<Timekeeper, detail::TimekeeperSingletonTag>::make_mock(
46 [] { return new Tk; });
47 ASSERT_TRUE(
48 dynamic_cast<Tk*>(detail::getTimekeeperSingleton().get()) != nullptr);
51 void TearDown() override {
52 // Invalidate any mocks that were installed.
53 folly::SingletonVault::singleton()->destroyInstances();
54 folly::SingletonVault::singleton()->reenableInstances();
58 TYPED_TEST_SUITE_P(TimekeeperTest);
60 TYPED_TEST_P(TimekeeperTest, After) {
61 auto t1 = steady_clock::now();
62 auto f = detail::getTimekeeperSingleton()->after(10ms);
63 EXPECT_FALSE(f.isReady());
64 std::move(f).get();
65 auto t2 = steady_clock::now();
67 EXPECT_GE(t2 - t1, 10ms);
70 TYPED_TEST_P(TimekeeperTest, AfterUnsafe) {
71 auto t1 = steady_clock::now();
72 auto f = detail::getTimekeeperSingleton()->afterUnsafe(10ms);
73 EXPECT_FALSE(f.isReady());
74 std::move(f).get();
75 auto t2 = steady_clock::now();
77 EXPECT_GE(t2 - t1, 10ms);
80 TYPED_TEST_P(TimekeeperTest, FutureGet) {
81 Promise<int> p;
82 auto t = std::thread([&] { p.setValue(42); });
83 EXPECT_EQ(42, p.getFuture().get());
84 t.join();
87 TYPED_TEST_P(TimekeeperTest, FutureGetBeforeTimeout) {
88 Promise<int> p;
89 auto t = std::thread([&] { p.setValue(42); });
90 // Technically this is a race and if the test server is REALLY overloaded
91 // and it takes more than a second to do that thread it could be flaky. But
92 // I want a low timeout (in human terms) so if this regresses and someone
93 // runs it by hand they're not sitting there forever wondering why it's
94 // blocked, and get a useful error message instead. If it does get flaky,
95 // empirically increase the timeout to the point where it's very improbable.
96 EXPECT_EQ(42, p.getFuture().get(std::chrono::seconds(2)));
97 t.join();
100 TYPED_TEST_P(TimekeeperTest, FutureGetTimeout) {
101 Promise<int> p;
102 EXPECT_THROW(p.getFuture().get(1ms), folly::FutureTimeout);
105 TYPED_TEST_P(TimekeeperTest, FutureSleep) {
106 auto t1 = steady_clock::now();
107 futures::sleep(1ms).get();
108 EXPECT_GE(steady_clock::now() - t1, 1ms);
111 FOLLY_PUSH_WARNING
112 FOLLY_GNU_DISABLE_WARNING("-Wdeprecated-declarations")
113 TYPED_TEST_P(TimekeeperTest, FutureSleepUnsafe) {
114 auto t1 = steady_clock::now();
115 futures::sleepUnsafe(1ms).get();
116 EXPECT_GE(steady_clock::now() - t1, 1ms);
118 FOLLY_POP_WARNING
120 TYPED_TEST_P(TimekeeperTest, FutureDelayed) {
121 auto t1 = steady_clock::now();
122 auto dur = makeFuture()
123 .delayed(1ms)
124 .thenValue([=](auto&&) { return steady_clock::now() - t1; })
125 .get();
127 EXPECT_GE(dur, 1ms);
130 TYPED_TEST_P(TimekeeperTest, SemiFutureDelayed) {
131 auto t1 = steady_clock::now();
132 auto dur = makeSemiFuture()
133 .delayed(1ms)
134 .toUnsafeFuture()
135 .thenValue([=](auto&&) { return steady_clock::now() - t1; })
136 .get();
138 EXPECT_GE(dur, 1ms);
141 TYPED_TEST_P(TimekeeperTest, FutureDelayedStickyExecutor) {
142 // Check that delayed without an executor binds the inline executor.
144 auto t1 = steady_clock::now();
145 std::thread::id timekeeper_thread_id =
146 folly::detail::getTimekeeperSingleton()
147 // Ensure that the continuation is run almost certainly in the
148 // timekeeper's thread.
149 ->after(100ms)
150 .toUnsafeFuture()
151 .thenValue([](auto&&) { return std::this_thread::get_id(); })
152 .get();
153 std::thread::id task_thread_id{};
154 auto dur = makeFuture()
155 .delayed(1ms)
156 .thenValue([=, &task_thread_id](auto&&) {
157 task_thread_id = std::this_thread::get_id();
158 return steady_clock::now() - t1;
160 .get();
162 EXPECT_GE(dur, 1ms);
163 EXPECT_EQ(timekeeper_thread_id, task_thread_id);
166 // Check that delayed applied to an executor returns a future that binds
167 // to the same executor as was input.
169 auto t1 = steady_clock::now();
170 std::thread::id driver_thread_id{};
171 std::thread::id first_task_thread_id{};
172 std::thread::id second_task_thread_id{};
173 folly::ManualExecutor me;
174 std::atomic<bool> stop_signal{false};
175 std::thread me_driver{[&me, &driver_thread_id, &stop_signal] {
176 driver_thread_id = std::this_thread::get_id();
177 while (!stop_signal) {
178 me.run();
181 auto dur = makeSemiFuture()
182 .via(&me)
183 .thenValue([&first_task_thread_id](auto&&) {
184 first_task_thread_id = std::this_thread::get_id();
186 .delayed(1ms)
187 .thenValue([=, &second_task_thread_id](auto&&) {
188 second_task_thread_id = std::this_thread::get_id();
189 return steady_clock::now() - t1;
191 .get();
192 stop_signal = true;
193 me_driver.join();
194 EXPECT_GE(dur, 1ms);
195 EXPECT_EQ(driver_thread_id, first_task_thread_id);
196 EXPECT_EQ(driver_thread_id, second_task_thread_id);
200 TYPED_TEST_P(TimekeeperTest, FutureWithinThrows) {
201 Promise<int> p;
202 auto f = p.getFuture().within(1ms).thenError(
203 tag_t<FutureTimeout>{}, [](auto&&) { return -1; });
205 EXPECT_EQ(-1, std::move(f).get());
208 TYPED_TEST_P(TimekeeperTest, SemiFutureWithinThrows) {
209 Promise<int> p;
210 auto f = p.getSemiFuture().within(1ms).toUnsafeFuture().thenError(
211 tag_t<FutureTimeout>{}, [](auto&&) { return -1; });
213 EXPECT_EQ(-1, std::move(f).get());
216 TYPED_TEST_P(TimekeeperTest, FutureWithinAlreadyComplete) {
217 auto f = makeFuture(42).within(1ms).thenError(
218 tag_t<FutureTimeout>{}, [&](auto&&) { return -1; });
220 EXPECT_EQ(42, std::move(f).get());
223 TYPED_TEST_P(TimekeeperTest, SemiFutureWithinAlreadyComplete) {
224 auto f = makeSemiFuture(42).within(1ms).toUnsafeFuture().thenError(
225 tag_t<FutureTimeout>{}, [&](auto&&) { return -1; });
227 EXPECT_EQ(42, std::move(f).get());
230 TYPED_TEST_P(TimekeeperTest, FutureWithinFinishesInTime) {
231 Promise<int> p;
232 auto f = p.getFuture()
233 .within(std::chrono::minutes(1))
234 .thenError(tag_t<FutureTimeout>{}, [&](auto&&) { return -1; });
235 p.setValue(42);
237 EXPECT_EQ(42, std::move(f).get());
240 TYPED_TEST_P(TimekeeperTest, SemiFutureWithinFinishesInTime) {
241 Promise<int> p;
242 auto f = p.getSemiFuture()
243 .within(std::chrono::minutes(1))
244 .toUnsafeFuture()
245 .thenError(tag_t<FutureTimeout>{}, [&](auto&&) { return -1; });
246 p.setValue(42);
248 EXPECT_EQ(42, std::move(f).get());
251 TYPED_TEST_P(TimekeeperTest, FutureWithinVoidSpecialization) {
252 makeFuture().within(1ms);
255 TYPED_TEST_P(TimekeeperTest, SemiFutureWithinVoidSpecialization) {
256 makeSemiFuture().within(1ms);
259 TYPED_TEST_P(TimekeeperTest, FutureWithinException) {
260 Promise<Unit> p;
261 auto f = p.getFuture().within(10ms, std::runtime_error("expected"));
262 EXPECT_THROW(std::move(f).get(), std::runtime_error);
265 TYPED_TEST_P(TimekeeperTest, SemiFutureWithinException) {
266 Promise<Unit> p;
267 auto f = p.getSemiFuture().within(10ms, std::runtime_error("expected"));
268 EXPECT_THROW(std::move(f).get(), std::runtime_error);
271 TYPED_TEST_P(TimekeeperTest, OnTimeout) {
272 bool flag = false;
273 makeFuture(42)
274 .delayed(10 * 1ms)
275 .onTimeout(
276 0ms,
277 [&] {
278 flag = true;
279 return -1;
281 .get();
282 EXPECT_TRUE(flag);
285 TYPED_TEST_P(TimekeeperTest, OnTimeoutComplete) {
286 bool flag = false;
287 makeFuture(42)
288 .onTimeout(
289 0ms,
290 [&] {
291 flag = true;
292 return -1;
294 .get();
295 EXPECT_FALSE(flag);
298 TYPED_TEST_P(TimekeeperTest, OnTimeoutReturnsFuture) {
299 bool flag = false;
300 makeFuture(42)
301 .delayed(10 * 1ms)
302 .onTimeout(
303 0ms,
304 [&] {
305 flag = true;
306 return makeFuture(-1);
308 .get();
309 EXPECT_TRUE(flag);
312 TYPED_TEST_P(TimekeeperTest, OnTimeoutVoid) {
313 makeFuture().delayed(1ms).onTimeout(0ms, [&] {});
314 makeFuture().delayed(1ms).onTimeout(
315 0ms, [&] { return makeFuture<Unit>(std::runtime_error("expected")); });
316 // just testing compilation here
319 TYPED_TEST_P(TimekeeperTest, InterruptDoesntCrash) {
320 auto f = futures::sleep(10s);
321 f.cancel();
324 TYPED_TEST_P(TimekeeperTest, ChainedInterruptTest) {
325 bool test = false;
326 auto f = futures::sleep(100ms).deferValue([&](auto&&) { test = true; });
327 f.cancel();
328 f.wait();
329 EXPECT_FALSE(test);
332 TYPED_TEST_P(TimekeeperTest, FutureWithinChainedInterruptTest) {
333 bool test = false;
334 Promise<Unit> p;
335 p.setInterruptHandler([&test, &p](const exception_wrapper& ex) {
336 ex.handle(
337 [&test](const FutureCancellation& /* cancellation */) { test = true; });
338 p.setException(ex);
340 auto f = p.getFuture().within(100ms);
341 EXPECT_FALSE(test) << "Sanity check";
342 f.cancel();
343 f.wait();
344 EXPECT_TRUE(test);
347 TYPED_TEST_P(TimekeeperTest, SemiFutureWithinChainedInterruptTest) {
348 bool test = false;
349 Promise<Unit> p;
350 p.setInterruptHandler([&test, &p](const exception_wrapper& ex) {
351 ex.handle(
352 [&test](const FutureCancellation& /* cancellation */) { test = true; });
353 p.setException(ex);
355 auto f = p.getSemiFuture().within(100ms);
356 EXPECT_FALSE(test) << "Sanity check";
357 f.cancel();
358 f.wait();
359 EXPECT_TRUE(test);
362 TYPED_TEST_P(TimekeeperTest, Executor) {
363 class ExecutorTester : public DefaultKeepAliveExecutor {
364 public:
365 virtual void add(Func f) override {
366 count++;
367 f();
369 void join() { joinKeepAlive(); }
370 std::atomic<int> count{0};
374 Promise<Unit> p;
375 ExecutorTester tester;
376 auto f = p.getFuture()
377 .via(&tester)
378 .within(std::chrono::seconds(10))
379 .thenValue([&](auto&&) {});
380 p.setValue();
381 std::move(f).get();
382 tester.join();
383 EXPECT_EQ(3, tester.count);
387 Promise<Unit> p;
388 ExecutorTester tester;
389 auto f = p.getFuture()
390 .via(&tester)
391 .within(std::chrono::milliseconds(10))
392 .thenValue([&](auto&&) {});
393 EXPECT_THROW(std::move(f).get(), FutureTimeout);
394 p.setValue();
395 tester.join();
396 EXPECT_EQ(3, tester.count);
// TODO(5921764)
/*
TYPED_TEST_P(TimekeeperTest, OnTimeoutPropagates) {
  bool flag = false;
  EXPECT_THROW(
    makeFuture(42).delayed(1ms)
      .onTimeout(0ms, [&]{ flag = true; })
      .get(),
    FutureTimeout);
  EXPECT_TRUE(flag);
}
*/
413 TYPED_TEST_P(TimekeeperTest, AtBeforeNow) {
414 auto f = detail::getTimekeeperSingleton()->at(steady_clock::now() - 10s);
415 EXPECT_TRUE(f.isReady());
416 EXPECT_FALSE(f.hasException());
419 TYPED_TEST_P(TimekeeperTest, HowToCastDuration) {
420 // I'm not sure whether this rounds up or down but it's irrelevant for the
421 // purpose of this example.
422 auto f = detail::getTimekeeperSingleton()->after(
423 std::chrono::duration_cast<Duration>(std::chrono::nanoseconds(1)));
426 TYPED_TEST_P(TimekeeperTest, Destruction) {
427 folly::Optional<TypeParam> tk{std::in_place};
428 auto f = tk->after(std::chrono::seconds(10));
429 EXPECT_FALSE(f.isReady());
430 tk.reset();
431 EXPECT_TRUE(f.isReady());
432 EXPECT_TRUE(f.hasException());
435 TYPED_TEST_P(TimekeeperTest, ConcurrentDestructionAndCancellation) {
436 folly::Optional<TypeParam> tk{std::in_place};
437 auto f = tk->after(std::chrono::seconds(10));
438 EXPECT_FALSE(f.isReady());
439 std::thread t{[&] { f.cancel(); }};
440 tk.reset();
441 t.join();
442 EXPECT_TRUE(f.isReady());
443 EXPECT_TRUE(f.hasException());
446 namespace {
448 template <class Tk>
449 void stressTest(
450 std::chrono::microseconds duration, std::chrono::microseconds period) {
451 using usec = std::chrono::microseconds;
453 folly::Optional<Tk> tk{std::in_place};
454 std::vector<std::thread> workers;
456 // Run continuations on a serial executor so we don't need synchronization to
457 // modify shared state.
458 folly::Optional<VirtualExecutor> continuationsThread{
459 std::in_place, SerialExecutor::create(folly::getGlobalCPUExecutor())};
460 size_t numCompletions = 0;
461 usec sumDelay{0};
462 usec maxDelay{0};
464 // Wait for any lazy initialization in the timekeeper and executor.
465 tk->after(1ms).via(&*continuationsThread).then([](auto&&) {}).get();
467 static const auto jitter = [](usec avg) {
468 // Center around average.
469 return usec(folly::Random::rand64(2 * avg.count()));
472 static const auto jitterSleep = [](steady_clock::time_point& now, usec avg) {
473 now += jitter(avg);
474 if (now - steady_clock::now() < 10us) {
475 // Busy-sleep if yielding the CPU would take too long.
476 while (now > steady_clock::now()) {
478 } else {
479 /* sleep override */ std::this_thread::sleep_until(now);
483 for (size_t i = 0; i < 8; ++i) {
484 workers.emplace_back([&] {
485 std::vector<Future<Unit>> futures;
486 for (auto start = steady_clock::now(), now = start;
487 now < start + duration;
488 jitterSleep(now, period)) {
489 // Use the test duration as rough range for the timeouts.
490 auto dur = jitter(duration);
491 auto expected = steady_clock::now() + dur;
492 futures.push_back(
493 tk->after(dur)
494 .toUnsafeFuture()
495 .thenValue([](auto&&) { return steady_clock::now(); })
496 .via(&*continuationsThread)
497 .thenValue([&, expected](auto fired) {
498 auto delay =
499 std::chrono::duration_cast<usec>(fired - expected);
500 // TODO(ott): HHWheelTimer-based timekeepers round down the
501 // timeout, so they may fire early, for now ignore this.
502 if (delay < 0us && delay > -1ms) {
503 delay = 0us;
505 ASSERT_GE(delay.count(), 0);
506 ++numCompletions;
507 sumDelay += delay;
508 maxDelay = std::max(maxDelay, delay);
509 }));
512 for (auto& f : futures) {
513 // While at it, check that canceling the future after it has been
514 // fulfilled has no effect. To do so, we wait non-destructively.
515 while (!f.isReady()) {
516 /* sleep override */ std::this_thread::sleep_for(1ms);
518 f.cancel();
519 EXPECT_NO_THROW(std::move(f).get());
524 // Add a worker that cancels all its futures.
525 size_t numAttemptedCancellations = 0;
526 size_t numCancellations = 0;
527 workers.emplace_back([&] {
528 std::vector<SemiFuture<Unit>> futures;
529 for (auto start = steady_clock::now(), now = start; now < start + duration;
530 jitterSleep(now, 1ms)) {
531 // Pick a wide range of durations to exercise various positions in the
532 // sequence of timeouts.
533 auto dur = 5ms + jitter(5s);
534 futures.push_back(tk->after(dur));
535 // Cancel the future scheduled in the previous iteration.
536 if (futures.size() > 1) {
537 futures[futures.size() - 2].cancel();
541 futures.back().cancel();
542 numAttemptedCancellations = futures.size();
544 for (auto& f : futures) {
545 if (std::move(f).getTry().hasException<FutureCancellation>()) {
546 ++numCancellations;
551 // Add a few timeouts that will not survive the timekeeper.
552 std::vector<SemiFuture<Unit>> shutdownFutures;
553 for (size_t i = 0; i < 10; ++i) {
554 shutdownFutures.push_back(tk->after(10min));
557 for (auto& worker : workers) {
558 worker.join();
561 continuationsThread.reset(); // Wait for all continuations.
562 ASSERT_GT(numCompletions, 0);
564 // In principle the delay is unbounded (depending on the state of the system),
565 // so we cannot have any upper bound that is both meaningful and reliable, but
566 // we can log it to manually inspect the behavior.
567 LOG(INFO) << fmt::format(
568 "Successful completions: {}, avg delay: {} us, max delay: {} us ",
569 numCompletions,
570 sumDelay.count() / numCompletions,
571 maxDelay.count());
573 // Similarly, a cancellation may be processed only after the future has fired,
574 // but in normal conditions this should never happen.
575 LOG(INFO) << fmt::format(
576 "Attempted cancellations: {}, successful: {}",
577 numAttemptedCancellations,
578 numCancellations);
580 tk.reset();
581 for (auto& f : shutdownFutures) {
582 EXPECT_TRUE(std::move(f).getTry().hasException<FutureNoTimekeeper>());
586 } // namespace
588 TYPED_TEST_P(TimekeeperTest, Stress) {
589 stressTest<TypeParam>(/* duration */ 1s, /* period */ 10ms);
592 TYPED_TEST_P(TimekeeperTest, StressHighContention) {
593 // Test that nothing breaks when scheduling a large number of timeouts
594 // concurrently. In this case the timekeeper thread will be overloaded, so the
595 // measured delays are going to be large.
596 stressTest<TypeParam>(/* duration */ 50ms, /* period */ 5us);
599 REGISTER_TYPED_TEST_SUITE_P(
600 TimekeeperTest,
601 After,
602 AfterUnsafe,
603 FutureGet,
604 FutureGetBeforeTimeout,
605 FutureGetTimeout,
606 FutureSleep,
607 FutureSleepUnsafe,
608 FutureDelayed,
609 SemiFutureDelayed,
610 FutureDelayedStickyExecutor,
611 FutureWithinThrows,
612 SemiFutureWithinThrows,
613 FutureWithinAlreadyComplete,
614 SemiFutureWithinAlreadyComplete,
615 FutureWithinFinishesInTime,
616 SemiFutureWithinFinishesInTime,
617 FutureWithinVoidSpecialization,
618 SemiFutureWithinVoidSpecialization,
619 FutureWithinException,
620 SemiFutureWithinException,
621 OnTimeout,
622 OnTimeoutComplete,
623 OnTimeoutReturnsFuture,
624 OnTimeoutVoid,
625 InterruptDoesntCrash,
626 ChainedInterruptTest,
627 FutureWithinChainedInterruptTest,
628 SemiFutureWithinChainedInterruptTest,
629 Executor,
630 AtBeforeNow,
631 HowToCastDuration,
632 Destruction,
633 ConcurrentDestructionAndCancellation,
634 Stress,
635 StressHighContention);
637 } // namespace folly