folly/folly/executors/test/IOThreadPoolExecutorBaseTestLib.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <array>
#include <atomic>
#include <memory>
#include <optional>
#include <random>

#include <fmt/format.h>
#include <glog/logging.h>
#include <folly/Random.h>
#include <folly/container/F14Set.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>

namespace folly {
namespace test {

template <typename T>
class IOThreadPoolExecutorBaseTest : public ::testing::Test {
 public:
  IOThreadPoolExecutorBaseTest() = default;
};

TYPED_TEST_SUITE_P(IOThreadPoolExecutorBaseTest);

TYPED_TEST_P(IOThreadPoolExecutorBaseTest, IOObserver) {
  struct EventBaseAccumulator : IOThreadPoolExecutorBase::IOObserver {
    void registerEventBase(EventBase& evb) override {
      // Observers should be called while the evbs are running, so this
      // operation should complete.
      evb.runInEventBaseThreadAndWait([&] { evbs.insert(&evb); });
    }
    void unregisterEventBase(EventBase& evb) override {
      // Same as registerEventBase().
      evb.runInEventBaseThreadAndWait([&] { evbs.erase(&evb); });
    }

    F14FastSet<EventBase*> evbs;
  };

  static constexpr size_t kNumThreads = 16;

  TypeParam ex{kNumThreads};
  auto observer1 = std::make_shared<EventBaseAccumulator>();
  auto observer2 = std::make_shared<EventBaseAccumulator>();
  ex.addObserver(observer1);
  ex.addObserver(observer2);
  EXPECT_EQ(observer1->evbs.size(), kNumThreads);
  EXPECT_EQ(observer2->evbs.size(), kNumThreads);
  ex.removeObserver(observer1);
  EXPECT_EQ(observer1->evbs.size(), 0);
  EXPECT_EQ(observer2->evbs.size(), kNumThreads);
  ex.join();
  EXPECT_EQ(observer1->evbs.size(), 0);
  EXPECT_EQ(observer2->evbs.size(), 0);
}

TYPED_TEST_P(IOThreadPoolExecutorBaseTest, GetEventBaseFromEvb) {
  static constexpr size_t kNumThreads = 16;
  TypeParam ex{kNumThreads};
  auto evbs = ex.getAllEventBases();
  std::shuffle(evbs.begin(), evbs.end(), std::default_random_engine{});

  for (auto ka : evbs) {
    auto* evb = ka.get();
    // If called from the EventBase thread, getEventBase() and add() stick to
    // the current EventBase.
    evb->runInEventBaseThreadAndWait([&] {
      EXPECT_EQ(evb, EventBaseManager::get()->getExistingEventBase());
      EXPECT_EQ(evb, ex.getEventBase());
      ex.add([&ex, evb] {
        EXPECT_EQ(evb, ex.getEventBase());
        EXPECT_EQ(evb, EventBaseManager::get()->getExistingEventBase());
      });
    });
  }
}

TYPED_TEST_P(IOThreadPoolExecutorBaseTest, StressTimeouts) {
  // Exercise multiplexed executors.
  static constexpr size_t kNumExecutors = 2;
  static constexpr size_t kNumThreads = 16;
  static constexpr size_t kNumTimersPerEvb = 100;
  static constexpr size_t kNumItersPerTimer = 16;
  static constexpr uint32_t kMaxTimeoutMs = 100;

  struct Timer : AsyncTimeout {
    using AsyncTimeout::AsyncTimeout;
    using Clock = std::chrono::steady_clock;

    void schedule() {
      timeout =
          std::chrono::milliseconds(folly::Random::rand32(1, kMaxTimeoutMs));
      start = Clock::now();
      scheduleTimeout(timeout);
    }

    void timeoutExpired() noexcept override {
      auto delay = std::chrono::duration_cast<std::chrono::milliseconds>(
          Clock::now() - (start + timeout));
      EXPECT_GE(delay.count(), 0);
      sumDelay += delay;
      maxDelay = std::max(maxDelay, delay);

      if (++count < kNumItersPerTimer) {
        schedule();
      } else {
        done.post();
      }
    }

    std::chrono::milliseconds timeout;
    Clock::time_point start;

    size_t count = 0;
    std::chrono::milliseconds sumDelay{0};
    std::chrono::milliseconds maxDelay{0};
    Baton<> done;
  };

  std::vector<std::unique_ptr<TypeParam>> executors;
  std::vector<folly::Executor::KeepAlive<folly::EventBase>> evbs;
  for (size_t i = 0; i < kNumExecutors; ++i) {
    auto& ex = executors.emplace_back(std::make_unique<TypeParam>(kNumThreads));
    auto exEvbs = ex->getAllEventBases();
    evbs.insert(evbs.end(), exEvbs.begin(), exEvbs.end());
  }

  using TimerList = std::array<Timer, kNumTimersPerEvb>;
  std::vector<std::unique_ptr<TimerList>> timerLists;
  timerLists.reserve(evbs.size());
  for (const auto& evb : evbs) {
    auto& timerList = timerLists.emplace_back();
    timerList = std::make_unique<TimerList>();

    evb->runInEventBaseThread([&timerList, &evb] {
      for (auto& timer : *timerList) {
        timer.attachEventBase(evb.get());
        timer.schedule();
      }
    });
  }

  size_t count = 0;
  std::chrono::milliseconds sumDelay{0};
  std::chrono::milliseconds maxDelay{0};
  for (auto& timerList : timerLists) {
    for (auto& timer : *timerList) {
      timer.done.wait();
      count += timer.count;
      sumDelay += timer.sumDelay;
      maxDelay = std::max(maxDelay, timer.maxDelay);
    }
  }

  EXPECT_EQ(
      count,
      kNumExecutors * kNumThreads * kNumTimersPerEvb * kNumItersPerTimer);
  LOG(INFO) << fmt::format(
      "Avg delay: {:.3f}ms, max delay: {}ms",
      static_cast<double>(sumDelay.count()) / count,
      maxDelay.count());
}

REGISTER_TYPED_TEST_SUITE_P(
    IOThreadPoolExecutorBaseTest,
    IOObserver,
    GetEventBaseFromEvb,
    StressTimeouts);

} // namespace test
} // namespace folly