folly/folly/futures/test/TimekeeperTestLib.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <thread>
#include <vector>

#include <fmt/format.h>
#include <glog/logging.h>
#include <folly/DefaultKeepAliveExecutor.h>
#include <folly/Random.h>
#include <folly/Singleton.h>
#include <folly/executors/GlobalExecutor.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/executors/SerialExecutor.h>
#include <folly/executors/VirtualExecutor.h>
#include <folly/futures/Future.h>
#include <folly/portability/GTest.h>

namespace folly {

using namespace std::chrono_literals;
using std::chrono::steady_clock;

/// Typed test fixture that runs the shared timekeeper tests against `Tk`.
///
/// Before each test the timekeeper singleton is replaced by a mock that
/// constructs a `Tk`; after each test all singleton instances are destroyed
/// and the vault is re-enabled.
template <class Tk>
class TimekeeperTest : public testing::Test {
 protected:
  void SetUp() override {
    // Install the class under test as the timekeeper singleton.
    using TimekeeperSingleton =
        Singleton<Timekeeper, detail::TimekeeperSingletonTag>;
    TimekeeperSingleton::make_mock([] { return new Tk; });

    // Abort the test right away if the singleton is not actually a `Tk`.
    ASSERT_TRUE(
        dynamic_cast<Tk*>(detail::getTimekeeperSingleton().get()) != nullptr);
  }

  void TearDown() override {
    // Invalidate any mocks that were installed, then allow singletons to be
    // created again.
    auto* vault = folly::SingletonVault::singleton();
    vault->destroyInstances();
    vault->reenableInstances();
  }
};

TYPED_TEST_SUITE_P(TimekeeperTest);

// after() must return a future that is not yet ready and that completes no
// sooner than the requested delay.
TYPED_TEST_P(TimekeeperTest, After) {
  const auto start = steady_clock::now();
  auto pending = detail::getTimekeeperSingleton()->after(10ms);
  EXPECT_FALSE(pending.isReady());
  std::move(pending).get();
  const auto elapsed = steady_clock::now() - start;

  EXPECT_GE(elapsed, 10ms);
}

// Same check as After, but going through afterUnsafe().
TYPED_TEST_P(TimekeeperTest, AfterUnsafe) {
  const auto start = steady_clock::now();
  auto pending = detail::getTimekeeperSingleton()->afterUnsafe(10ms);
  EXPECT_FALSE(pending.isReady());
  std::move(pending).get();
  const auto elapsed = steady_clock::now() - start;

  EXPECT_GE(elapsed, 10ms);
}

// A blocking get() without a timeout returns the value set from another
// thread.
TYPED_TEST_P(TimekeeperTest, FutureGet) {
  Promise<int> promise;
  std::thread fulfiller{[&promise] { promise.setValue(42); }};
  EXPECT_EQ(42, promise.getFuture().get());
  fulfiller.join();
}

// get(timeout) returns the value when the promise is fulfilled before the
// timeout expires.
TYPED_TEST_P(TimekeeperTest, FutureGetBeforeTimeout) {
  Promise<int> promise;
  std::thread fulfiller{[&promise] { promise.setValue(42); }};
  // This is inherently racy: on a badly overloaded machine the fulfilling
  // thread could take longer than the timeout to run, making the test flaky.
  // The timeout is deliberately short in human terms so that, if this
  // regresses and someone runs it by hand, they get a useful error message
  // quickly instead of staring at a blocked test. If it does turn out flaky,
  // raise the timeout empirically until a spurious failure is very improbable.
  EXPECT_EQ(42, promise.getFuture().get(2s));
  fulfiller.join();
}

// get(timeout) on a promise that is never fulfilled throws FutureTimeout.
TYPED_TEST_P(TimekeeperTest, FutureGetTimeout) {
  Promise<int> neverFulfilled;
  EXPECT_THROW(neverFulfilled.getFuture().get(1ms), folly::FutureTimeout);
}

// futures::sleep() completes no sooner than the requested duration.
TYPED_TEST_P(TimekeeperTest, FutureSleep) {
  const auto start = steady_clock::now();
  futures::sleep(1ms).get();
  const auto elapsed = steady_clock::now() - start;
  EXPECT_GE(elapsed, 1ms);
}

// sleepUnsafe() is called on purpose here, so the deprecation warning is
// silenced for the duration of this test.
FOLLY_PUSH_WARNING
FOLLY_GNU_DISABLE_WARNING("-Wdeprecated-declarations")
TYPED_TEST_P(TimekeeperTest, FutureSleepUnsafe) {
  const auto start = steady_clock::now();
  futures::sleepUnsafe(1ms).get();
  const auto elapsed = steady_clock::now() - start;
  EXPECT_GE(elapsed, 1ms);
}
FOLLY_POP_WARNING

// A continuation chained after Future::delayed() runs no sooner than the
// requested delay.
TYPED_TEST_P(TimekeeperTest, FutureDelayed) {
  auto start = steady_clock::now();
  auto delayed = makeFuture().delayed(1ms).thenValue(
      [start](auto&&) { return steady_clock::now() - start; });
  auto elapsed = std::move(delayed).get();

  EXPECT_GE(elapsed, 1ms);
}

// A continuation chained after SemiFuture::delayed() runs no sooner than the
// requested delay.
TYPED_TEST_P(TimekeeperTest, SemiFutureDelayed) {
  auto start = steady_clock::now();
  auto delayed = makeSemiFuture().delayed(1ms).toUnsafeFuture().thenValue(
      [start](auto&&) { return steady_clock::now() - start; });
  auto elapsed = std::move(delayed).get();

  EXPECT_GE(elapsed, 1ms);
}

// Checks which thread runs a continuation chained after Future::delayed(),
// both without an executor and with an explicit one.
TYPED_TEST_P(TimekeeperTest, FutureDelayedStickyExecutor) {
  // Check that delayed without an executor binds the inline executor.
  {
    auto start = steady_clock::now();
    // The timeout is long enough that the continuation is almost certainly
    // run in the timekeeper's thread, which lets us learn that thread's id.
    auto timekeeperProbe =
        folly::detail::getTimekeeperSingleton()
            ->after(100ms)
            .toUnsafeFuture()
            .thenValue([](auto&&) { return std::this_thread::get_id(); });
    std::thread::id timekeeperThread = std::move(timekeeperProbe).get();

    std::thread::id continuationThread{};
    auto elapsedFuture = makeFuture().delayed(1ms).thenValue(
        [start, &continuationThread](auto&&) {
          continuationThread = std::this_thread::get_id();
          return steady_clock::now() - start;
        });
    auto elapsed = std::move(elapsedFuture).get();

    EXPECT_GE(elapsed, 1ms);
    EXPECT_EQ(timekeeperThread, continuationThread);
  }

  // Check that delayed applied to an executor returns a future that binds
  // to the same executor as was input.
  {
    auto start = steady_clock::now();
    std::thread::id driverThread{};
    std::thread::id beforeDelayThread{};
    std::thread::id afterDelayThread{};
    folly::ManualExecutor manual;
    std::atomic<bool> stopDriver{false};
    // Dedicated thread that keeps draining the manual executor until told to
    // stop; every task bound to `manual` therefore runs on this thread.
    std::thread driver{[&manual, &driverThread, &stopDriver] {
      driverThread = std::this_thread::get_id();
      while (!stopDriver) {
        manual.run();
      }
    }};
    auto elapsed = makeSemiFuture()
                       .via(&manual)
                       .thenValue([&beforeDelayThread](auto&&) {
                         beforeDelayThread = std::this_thread::get_id();
                       })
                       .delayed(1ms)
                       .thenValue([start, &afterDelayThread](auto&&) {
                         afterDelayThread = std::this_thread::get_id();
                         return steady_clock::now() - start;
                       })
                       .get();
    stopDriver = true;
    driver.join();
    EXPECT_GE(elapsed, 1ms);
    EXPECT_EQ(driverThread, beforeDelayThread);
    EXPECT_EQ(driverThread, afterDelayThread);
  }
}

// Future::within() on a promise that is never fulfilled fails with
// FutureTimeout, which the error handler maps to -1.
TYPED_TEST_P(TimekeeperTest, FutureWithinThrows) {
  Promise<int> neverFulfilled;
  auto recovered =
      neverFulfilled.getFuture()
          .within(1ms)
          .thenError(tag_t<FutureTimeout>{}, [](auto&&) { return -1; });

  EXPECT_EQ(-1, std::move(recovered).get());
}

// SemiFuture::within() on a promise that is never fulfilled fails with
// FutureTimeout, which the error handler maps to -1.
TYPED_TEST_P(TimekeeperTest, SemiFutureWithinThrows) {
  Promise<int> neverFulfilled;
  auto recovered =
      neverFulfilled.getSemiFuture()
          .within(1ms)
          .toUnsafeFuture()
          .thenError(tag_t<FutureTimeout>{}, [](auto&&) { return -1; });

  EXPECT_EQ(-1, std::move(recovered).get());
}

// within() on an already-completed Future yields the original value; the
// timeout handler's -1 must not show up.
TYPED_TEST_P(TimekeeperTest, FutureWithinAlreadyComplete) {
  auto bounded =
      makeFuture(42)
          .within(1ms)
          .thenError(tag_t<FutureTimeout>{}, [](auto&&) { return -1; });

  EXPECT_EQ(42, std::move(bounded).get());
}

// within() on an already-completed SemiFuture yields the original value; the
// timeout handler's -1 must not show up.
TYPED_TEST_P(TimekeeperTest, SemiFutureWithinAlreadyComplete) {
  auto bounded =
      makeSemiFuture(42)
          .within(1ms)
          .toUnsafeFuture()
          .thenError(tag_t<FutureTimeout>{}, [](auto&&) { return -1; });

  EXPECT_EQ(42, std::move(bounded).get());
}

// A Future fulfilled well inside its within() window yields its value rather
// than the timeout handler's -1.
TYPED_TEST_P(TimekeeperTest, FutureWithinFinishesInTime) {
  Promise<int> promise;
  auto bounded =
      promise.getFuture()
          .within(1min)
          .thenError(tag_t<FutureTimeout>{}, [](auto&&) { return -1; });
  promise.setValue(42);

  EXPECT_EQ(42, std::move(bounded).get());
}

// A SemiFuture fulfilled well inside its within() window yields its value
// rather than the timeout handler's -1.
TYPED_TEST_P(TimekeeperTest, SemiFutureWithinFinishesInTime) {
  Promise<int> promise;
  auto bounded =
      promise.getSemiFuture()
          .within(1min)
          .toUnsafeFuture()
          .thenError(tag_t<FutureTimeout>{}, [](auto&&) { return -1; });
  promise.setValue(42);

  EXPECT_EQ(42, std::move(bounded).get());
}

// Exercises within() on a Future created by makeFuture() with no value; the
// result is intentionally discarded and nothing is asserted.
TYPED_TEST_P(TimekeeperTest, FutureWithinVoidSpecialization) {
  static_cast<void>(makeFuture().within(1ms));
}

// Exercises within() on a SemiFuture created by makeSemiFuture() with no
// value; the result is intentionally discarded and nothing is asserted.
TYPED_TEST_P(TimekeeperTest, SemiFutureWithinVoidSpecialization) {
  static_cast<void>(makeSemiFuture().within(1ms));
}

// Future::within() with a caller-supplied exception surfaces that exception
// when the promise is never fulfilled.
TYPED_TEST_P(TimekeeperTest, FutureWithinException) {
  Promise<Unit> neverFulfilled;
  auto bounded =
      neverFulfilled.getFuture().within(10ms, std::runtime_error("expected"));
  EXPECT_THROW(std::move(bounded).get(), std::runtime_error);
}

// SemiFuture::within() with a caller-supplied exception surfaces that
// exception when the promise is never fulfilled.
TYPED_TEST_P(TimekeeperTest, SemiFutureWithinException) {
  Promise<Unit> neverFulfilled;
  auto bounded = neverFulfilled.getSemiFuture().within(
      10ms, std::runtime_error("expected"));
  EXPECT_THROW(std::move(bounded).get(), std::runtime_error);
}

// onTimeout() with a zero timeout on a future delayed by 10ms must invoke the
// timeout callback.
TYPED_TEST_P(TimekeeperTest, OnTimeout) {
  bool callbackRan = false;
  auto guarded = makeFuture(42).delayed(10ms).onTimeout(0ms, [&callbackRan] {
    callbackRan = true;
    return -1;
  });
  std::move(guarded).get();
  EXPECT_TRUE(callbackRan);
}

// onTimeout() on an already-completed future must not invoke the timeout
// callback, even with a zero timeout.
TYPED_TEST_P(TimekeeperTest, OnTimeoutComplete) {
  bool callbackRan = false;
  auto guarded = makeFuture(42).onTimeout(0ms, [&callbackRan] {
    callbackRan = true;
    return -1;
  });
  std::move(guarded).get();
  EXPECT_FALSE(callbackRan);
}

// Same as OnTimeout, but the timeout callback returns a Future instead of a
// plain value.
TYPED_TEST_P(TimekeeperTest, OnTimeoutReturnsFuture) {
  bool callbackRan = false;
  auto guarded = makeFuture(42).delayed(10ms).onTimeout(0ms, [&callbackRan] {
    callbackRan = true;
    return makeFuture(-1);
  });
  std::move(guarded).get();
  EXPECT_TRUE(callbackRan);
}

// Compilation-only check: onTimeout() on a valueless future must accept both
// a callback returning nothing and a callback returning Future<Unit>.
TYPED_TEST_P(TimekeeperTest, OnTimeoutVoid) {
  makeFuture().delayed(1ms).onTimeout(0ms, [] {});
  makeFuture().delayed(1ms).onTimeout(0ms, [] {
    return makeFuture<Unit>(std::runtime_error("expected"));
  });
}

// Cancelling a long sleep must not crash; nothing else is asserted.
TYPED_TEST_P(TimekeeperTest, InterruptDoesntCrash) {
  auto longSleep = futures::sleep(10s);
  longSleep.cancel();
}

// Cancelling a sleep with a deferred continuation attached must complete the
// future without ever running that continuation.
TYPED_TEST_P(TimekeeperTest, ChainedInterruptTest) {
  bool continuationRan = false;
  auto chained = futures::sleep(100ms).deferValue(
      [&continuationRan](auto&&) { continuationRan = true; });
  chained.cancel();
  chained.wait();
  EXPECT_FALSE(continuationRan);
}

// Cancelling the Future returned by within() must reach the interrupt handler
// of the underlying promise as a FutureCancellation.
TYPED_TEST_P(TimekeeperTest, FutureWithinChainedInterruptTest) {
  bool sawCancellation = false;
  Promise<Unit> promise;
  // The handler records whether the interrupt is a cancellation and then
  // fails the promise with that same exception so wait() below can return.
  promise.setInterruptHandler(
      [&sawCancellation, &promise](const exception_wrapper& interrupt) {
        interrupt.handle([&sawCancellation](const FutureCancellation&) {
          sawCancellation = true;
        });
        promise.setException(interrupt);
      });
  auto bounded = promise.getFuture().within(100ms);
  EXPECT_FALSE(sawCancellation) << "Sanity check";
  bounded.cancel();
  bounded.wait();
  EXPECT_TRUE(sawCancellation);
}

// Cancelling the SemiFuture returned by within() must reach the interrupt
// handler of the underlying promise as a FutureCancellation.
TYPED_TEST_P(TimekeeperTest, SemiFutureWithinChainedInterruptTest) {
  bool sawCancellation = false;
  Promise<Unit> promise;
  // The handler records whether the interrupt is a cancellation and then
  // fails the promise with that same exception so wait() below can return.
  promise.setInterruptHandler(
      [&sawCancellation, &promise](const exception_wrapper& interrupt) {
        interrupt.handle([&sawCancellation](const FutureCancellation&) {
          sawCancellation = true;
        });
        promise.setException(interrupt);
      });
  auto bounded = promise.getSemiFuture().within(100ms);
  EXPECT_FALSE(sawCancellation) << "Sanity check";
  bounded.cancel();
  bounded.wait();
  EXPECT_TRUE(sawCancellation);
}

// Counts how many tasks within() submits to the future's executor, both when
// the promise is fulfilled in time and when the timeout fires first.
TYPED_TEST_P(TimekeeperTest, Executor) {
  // Executor that runs each task inline on the submitting thread and counts
  // the submissions.
  class ExecutorTester : public DefaultKeepAliveExecutor {
   public:
    void add(Func f) override {
      ++count;
      f();
    }
    void join() { joinKeepAlive(); }
    std::atomic<int> count{0};
  };

  // Promise fulfilled well before the 10s timeout.
  {
    Promise<Unit> promise;
    ExecutorTester tester;
    auto chained =
        promise.getFuture().via(&tester).within(10s).thenValue([](auto&&) {});
    promise.setValue();
    std::move(chained).get();
    tester.join();
    EXPECT_EQ(3, tester.count);
  }

  // Timeout fires first; the promise is only fulfilled afterwards. The same
  // number of submissions is expected.
  {
    Promise<Unit> promise;
    ExecutorTester tester;
    auto chained =
        promise.getFuture().via(&tester).within(10ms).thenValue([](auto&&) {});
    EXPECT_THROW(std::move(chained).get(), FutureTimeout);
    promise.setValue();
    tester.join();
    EXPECT_EQ(3, tester.count);
  }
}

// TODO(5921764)
/*
TYPED_TEST_P(TimekeeperTest, OnTimeoutPropagates) {
  bool flag = false;
  EXPECT_THROW(
    makeFuture(42).delayed(1ms)
      .onTimeout(0ms, [&]{ flag = true; })
      .get(),
    FutureTimeout);
  EXPECT_TRUE(flag);
}
*/

// at() with a time point in the past returns a future that is already
// fulfilled, and fulfilled without an exception.
TYPED_TEST_P(TimekeeperTest, AtBeforeNow) {
  auto expired = detail::getTimekeeperSingleton()->at(steady_clock::now() - 10s);
  EXPECT_TRUE(expired.isReady());
  EXPECT_FALSE(expired.hasException());
}

// Usage example: converting a finer-grained duration to folly's Duration
// before handing it to after(). Nothing is asserted.
TYPED_TEST_P(TimekeeperTest, HowToCastDuration) {
  // Whether the cast rounds up or down does not matter for this example.
  const auto oneNanosecond = std::chrono::nanoseconds(1);
  auto f = detail::getTimekeeperSingleton()->after(
      std::chrono::duration_cast<Duration>(oneNanosecond));
}

// Destroying a timekeeper must complete its still-pending timeouts with an
// exception instead of leaving them unfulfilled.
TYPED_TEST_P(TimekeeperTest, Destruction) {
  folly::Optional<TypeParam> timekeeper{std::in_place};
  auto pending = timekeeper->after(10s);
  EXPECT_FALSE(pending.isReady());
  timekeeper.reset();
  EXPECT_TRUE(pending.isReady());
  EXPECT_TRUE(pending.hasException());
}

// Cancelling a pending timeout from another thread while the timekeeper is
// being destroyed must still leave the future completed with an exception.
TYPED_TEST_P(TimekeeperTest, ConcurrentDestructionAndCancellation) {
  folly::Optional<TypeParam> timekeeper{std::in_place};
  auto pending = timekeeper->after(10s);
  EXPECT_FALSE(pending.isReady());
  std::thread canceller{[&pending] { pending.cancel(); }};
  timekeeper.reset();
  canceller.join();
  EXPECT_TRUE(pending.isReady());
  EXPECT_TRUE(pending.hasException());
}

namespace {

/// Stress-tests a standalone `Tk` instance with concurrent scheduling and
/// cancellation of timeouts, then destroys it with timeouts still pending.
///
/// Three kinds of load run against the same timekeeper:
///  - 8 worker threads that keep scheduling timeouts with random durations
///    and record, for each one, how late it fired relative to its expected
///    time;
///  - 1 worker thread that schedules timeouts and cancels every one of them;
///  - 10 timeouts of 10 minutes each, which are expected to fail with
///    `FutureNoTimekeeper` once the timekeeper is destroyed.
///
/// Aggregate delay and cancellation figures are only logged, not asserted on.
///
/// @tparam Tk       Timekeeper implementation under test; it is constructed
///                  and destroyed inside this function.
/// @param duration  How long each worker keeps scheduling new timeouts; also
///                  the average passed to `jitter` for the timeout durations
///                  of the 8 measuring workers.
/// @param period    Average pause between two consecutive timeouts scheduled
///                  by one of the 8 measuring workers (the cancelling worker
///                  uses a fixed 1ms average instead).
template <class Tk>
void stressTest(
    std::chrono::microseconds duration, std::chrono::microseconds period) {
  using usec = std::chrono::microseconds;

  folly::Optional<Tk> tk{std::in_place};
  std::vector<std::thread> workers;

  // Run continuations on a serial executor so we don't need synchronization to
  // modify shared state.
  folly::Optional<VirtualExecutor> continuationsThread{
      std::in_place, SerialExecutor::create(folly::getGlobalCPUExecutor())};
  // Statistics updated only from continuations running on
  // `continuationsThread`.
  size_t numCompletions = 0;
  usec sumDelay{0};
  usec maxDelay{0};

  // Wait for any lazy initialization in the timekeeper and executor.
  tk->after(1ms).via(&*continuationsThread).then([](auto&&) {}).get();

  // Random duration obtained from Random::rand64(2 * avg).
  static const auto jitter = [](usec avg) {
    // Center around average.
    return usec(folly::Random::rand64(2 * avg.count()));
  };

  // Advances `now` by a jittered amount and blocks until that time point is
  // reached, so `now` tracks the intended schedule rather than the actual
  // wake-up time.
  static const auto jitterSleep = [](steady_clock::time_point& now, usec avg) {
    now += jitter(avg);
    if (now - steady_clock::now() < 10us) {
      // Busy-sleep if yielding the CPU would take too long.
      while (now > steady_clock::now()) {
      }
    } else {
      /* sleep override */ std::this_thread::sleep_until(now);
    }
  };

  // Spawn the workers that schedule timeouts and measure how late they fire.
  for (size_t i = 0; i < 8; ++i) {
    workers.emplace_back([&] {
      std::vector<Future<Unit>> futures;
      for (auto start = steady_clock::now(), now = start;
           now < start + duration;
           jitterSleep(now, period)) {
        // Use the test duration as rough range for the timeouts.
        auto dur = jitter(duration);
        auto expected = steady_clock::now() + dur;
        // The firing time is sampled in the first continuation, before the
        // chain is moved onto the serial executor; the second continuation
        // then updates the shared statistics on that executor.
        futures.push_back(
            tk->after(dur)
                .toUnsafeFuture()
                .thenValue([](auto&&) { return steady_clock::now(); })
                .via(&*continuationsThread)
                .thenValue([&, expected](auto fired) {
                  auto delay =
                      std::chrono::duration_cast<usec>(fired - expected);
                  // TODO(ott): HHWheelTimer-based timekeepers round down the
                  // timeout, so they may fire early, for now ignore this.
                  if (delay < 0us && delay > -1ms) {
                    delay = 0us;
                  }
                  // On failure this returns from the continuation, so the
                  // statistics below are not updated for this timeout.
                  ASSERT_GE(delay.count(), 0);
                  ++numCompletions;
                  sumDelay += delay;
                  maxDelay = std::max(maxDelay, delay);
                }));
      }

      for (auto& f : futures) {
        // While at it, check that canceling the future after it has been
        // fulfilled has no effect. To do so, we wait non-destructively.
        while (!f.isReady()) {
          /* sleep override */ std::this_thread::sleep_for(1ms);
        }
        f.cancel();
        EXPECT_NO_THROW(std::move(f).get());
      }
    });
  }

  // Add a worker that cancels all its futures.
  // Both counters are written only by that worker and read after it is joined.
  size_t numAttemptedCancellations = 0;
  size_t numCancellations = 0;
  workers.emplace_back([&] {
    std::vector<SemiFuture<Unit>> futures;
    for (auto start = steady_clock::now(), now = start; now < start + duration;
         jitterSleep(now, 1ms)) {
      // Pick a wide range of durations to exercise various positions in the
      // sequence of timeouts.
      auto dur = 5ms + jitter(5s);
      futures.push_back(tk->after(dur));
      // Cancel the future scheduled in the previous iteration.
      if (futures.size() > 1) {
        futures[futures.size() - 2].cancel();
      }
    }

    // The loop always leaves the most recent future uncancelled; cancel it
    // here so that every scheduled future has had a cancellation attempt.
    futures.back().cancel();
    numAttemptedCancellations = futures.size();

    // Count the futures that actually completed with FutureCancellation.
    for (auto& f : futures) {
      if (std::move(f).getTry().hasException<FutureCancellation>()) {
        ++numCancellations;
      }
    }
  });

  // Add a few timeouts that will not survive the timekeeper.
  std::vector<SemiFuture<Unit>> shutdownFutures;
  for (size_t i = 0; i < 10; ++i) {
    shutdownFutures.push_back(tk->after(10min));
  }

  for (auto& worker : workers) {
    worker.join();
  }

  continuationsThread.reset(); // Wait for all continuations.
  ASSERT_GT(numCompletions, 0);

  // In principle the delay is unbounded (depending on the state of the system),
  // so we cannot have any upper bound that is both meaningful and reliable, but
  // we can log it to manually inspect the behavior.
  LOG(INFO) << fmt::format(
      "Successful completions: {}, avg delay: {} us, max delay: {} us ",
      numCompletions,
      sumDelay.count() / numCompletions,
      maxDelay.count());

  // Similarly, a cancellation may be processed only after the future has fired,
  // but in normal conditions this should never happen.
  LOG(INFO) << fmt::format(
      "Attempted cancellations: {}, successful: {}",
      numAttemptedCancellations,
      numCancellations);

  // Destroy the timekeeper while the 10-minute timeouts are still pending and
  // check that each of them is completed with FutureNoTimekeeper.
  tk.reset();
  for (auto& f : shutdownFutures) {
    EXPECT_TRUE(std::move(f).getTry().hasException<FutureNoTimekeeper>());
  }
}

} // namespace

// Baseline stress run: one second of load, with each measuring worker
// scheduling a timeout roughly every 10ms.
TYPED_TEST_P(TimekeeperTest, Stress) {
  const std::chrono::microseconds duration = 1s;
  const std::chrono::microseconds period = 10ms;
  stressTest<TypeParam>(duration, period);
}

// Checks that nothing breaks when a large number of timeouts is scheduled
// concurrently. The timekeeper thread is going to be overloaded in this
// configuration, so large measured delays are expected.
TYPED_TEST_P(TimekeeperTest, StressHighContention) {
  const std::chrono::microseconds duration = 50ms;
  const std::chrono::microseconds period = 5us;
  stressTest<TypeParam>(duration, period);
}

// Registers the type-parameterized tests defined above with the
// TimekeeperTest suite. A TYPED_TEST_P added to this file must also be listed
// here. No INSTANTIATE_TYPED_TEST_SUITE_P appears in this header, so the suite
// is presumably instantiated per timekeeper type by the including test files.
REGISTER_TYPED_TEST_SUITE_P(
    TimekeeperTest,
    After,
    AfterUnsafe,
    FutureGet,
    FutureGetBeforeTimeout,
    FutureGetTimeout,
    FutureSleep,
    FutureSleepUnsafe,
    FutureDelayed,
    SemiFutureDelayed,
    FutureDelayedStickyExecutor,
    FutureWithinThrows,
    SemiFutureWithinThrows,
    FutureWithinAlreadyComplete,
    SemiFutureWithinAlreadyComplete,
    FutureWithinFinishesInTime,
    SemiFutureWithinFinishesInTime,
    FutureWithinVoidSpecialization,
    SemiFutureWithinVoidSpecialization,
    FutureWithinException,
    SemiFutureWithinException,
    OnTimeout,
    OnTimeoutComplete,
    OnTimeoutReturnsFuture,
    OnTimeoutVoid,
    InterruptDoesntCrash,
    ChainedInterruptTest,
    FutureWithinChainedInterruptTest,
    SemiFutureWithinChainedInterruptTest,
    Executor,
    AtBeforeNow,
    HowToCastDuration,
    Destruction,
    ConcurrentDestructionAndCancellation,
    Stress,
    StressHighContention);

} // namespace folly