folly/folly/synchronization/test/ThrottledLifoSemTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/synchronization/ThrottledLifoSem.h>

#include <atomic>
#include <chrono>
#include <thread>
#include <vector>

#include <folly/Benchmark.h>
#include <folly/Random.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/SaturatingSemaphore.h>

namespace folly {

// Helper used by the tests below to observe how many threads are currently
// registered as waiters on a ThrottledLifoSem.
// NOTE(review): presumably lives in namespace folly so the semaphore can
// befriend it -- confirm against ThrottledLifoSem.h.
class ThrottledLifoSemTestHelper {
 public:
  // Busy-waits until sem.numWaiters() reports exactly n.
  //
  // There is no timeout: if the waiter count never equals n, this never
  // returns. When assertExact is true the count is read one more time after
  // the spin, and a mismatch is reported as a fatal gtest failure.
  static void spinUntilWaiters(
      folly::ThrottledLifoSem& sem, size_t n, bool assertExact = false) {
    for (;;) {
      if (sem.numWaiters() == n) {
        break;
      }
    }
    if (!assertExact) {
      return;
    }
    ASSERT_EQ(sem.numWaiters(), n);
  }
};

} // namespace folly

const auto kNoSpin =
    folly::WaitOptions{}.spin_max(std::chrono::nanoseconds::zero());

// Single-threaded smoke test of post() / try_wait() / try_wait_for().
TEST(ThrottledLifoSem, Basic) {
  constexpr auto kShortTimeout = std::chrono::milliseconds(1);

  folly::ThrottledLifoSem sem;

  // A zero-sized post is expected to report success.
  EXPECT_TRUE(sem.post(0));
  // No thread is waiting, so this post is expected to report false.
  EXPECT_FALSE(sem.post());

  // The unit posted above can be taken once, and only once.
  EXPECT_TRUE(sem.try_wait());
  EXPECT_FALSE(sem.try_wait());
  EXPECT_FALSE(sem.try_wait_for(kShortTimeout));
}

// Checks that the semaphore is left in a consistent state when waiters time
// out, both while blocked and while taking part in the waking chain.
TEST(ThrottledLifoSem, Timeouts) {
  constexpr auto kWakeUpInterval = std::chrono::milliseconds(200);
  constexpr size_t kNumWaiters = 3;
  // Two units are posted so that, after the first waiter succeeds, a second
  // waiter is pulled into the waking chain. With a single unit the chain would
  // end after the first wakeup and the "times out while sleeping through the
  // wakeup interval" path described below would never be exercised.
  // NOTE(review): relies on consecutive wakeups being spaced by
  // wakeUpInterval -- confirm against ThrottledLifoSem.h.
  constexpr size_t kNumPosts = 2;
  constexpr size_t kExpectedTimeouts = 2;

  folly::ThrottledLifoSem sem({.wakeUpInterval = kWakeUpInterval});
  std::atomic<size_t> timeouts = 0;
  std::vector<std::thread> threads;
  for (size_t i = 0; i < kNumWaiters; ++i) {
    threads.emplace_back([&] {
      // One waiter will succeed, one will timeout while waiting on the baton,
      // one while sleeping through the wakeup interval (the timeout is half
      // the interval). The purpose of this test is to ensure that the
      // invariants (checked at destruction) are restored in both timeout
      // cases.
      if (!sem.try_wait_for(kWakeUpInterval / 2, kNoSpin)) {
        ++timeouts;
      }
    });
  }

  // May hang forever if any waiters time out before we get here, but the large
  // timeout should make this unlikely unless the system is heavily loaded.
  folly::ThrottledLifoSemTestHelper::spinUntilWaiters(
      sem, kNumWaiters, /* assertExact */ true);

  // There are more waiters than posted units, so post() should report true.
  EXPECT_TRUE(sem.post(kNumPosts));
  for (auto& t : threads) {
    t.join();
  }
  // Only one of the posted units is consumed; the other two waiters time out.
  EXPECT_EQ(timeouts.load(), kExpectedTimeouts);
}

// Stress the race between post() and waiters whose timeout has just expired:
// many waiters poll with a very short timeout while the main thread posts one
// unit at a time.
TEST(ThrottledLifoSem, TimeoutsStress) {
  constexpr size_t kNumWaiters = 64;
  constexpr size_t kNumPosts = 100000;

  folly::ThrottledLifoSem sem(
      {.wakeUpInterval = std::chrono::nanoseconds::zero()});

  std::vector<std::thread> waiters;
  std::atomic<size_t> received = 0;

  // Each waiter keeps retrying until every post has been accounted for.
  const auto waiterLoop = [&] {
    while (received.load() != kNumPosts) {
      const bool acquired =
          sem.try_wait_for(std::chrono::microseconds{100}, kNoSpin);
      if (acquired) {
        received.fetch_add(1);
      }
    }
  };
  for (size_t w = 0; w < kNumWaiters; ++w) {
    waiters.emplace_back(waiterLoop);
  }

  for (size_t posted = 0; posted < kNumPosts; ++posted) {
    sem.post();
    // Do not post again until the previous unit has been taken.
    while (sem.valueGuess() > 0) {
    }
  }

  for (auto& waiter : waiters) {
    waiter.join();
  }
}

// Parks a batch of waiters and releases them, alternating between one batch
// post and a sequence of single posts, over several rounds on the same
// semaphore.
TEST(ThrottledLifoSem, MultipleWaiters) {
  using Helper = folly::ThrottledLifoSemTestHelper;

  constexpr size_t kNumRounds = 10;
  constexpr size_t kNumWaiters = 64;
  constexpr auto kWakeUpInterval = std::chrono::microseconds(100);

  folly::ThrottledLifoSem sem({.wakeUpInterval = kWakeUpInterval});

  for (size_t round = 0; round < kNumRounds; ++round) {
    std::vector<std::thread> waiters;
    for (size_t w = 0; w < kNumWaiters; ++w) {
      waiters.emplace_back([&] { sem.wait(); });
    }

    // Proceed only once every thread is registered as a waiter.
    Helper::spinUntilWaiters(sem, kNumWaiters, /* assertExact */ true);

    // Even rounds use the batch post(), odd rounds post one unit at a time.
    const bool postOneByOne = (round % 2 != 0);
    if (postOneByOne) {
      size_t stillWaiting = kNumWaiters;
      for (size_t i = 0; i < kNumWaiters; ++i) {
        EXPECT_TRUE(sem.post());
        --stillWaiting;
        // Wait for the unit to be picked up before posting the next one.
        Helper::spinUntilWaiters(sem, stillWaiting);
        // Pause for a growing fraction of the wakeup interval between posts.
        /* sleep override */ std::this_thread::sleep_for(
            i * kWakeUpInterval / 10);
      }
    } else {
      EXPECT_TRUE(sem.post(kNumWaiters));
    }

    for (auto& waiter : waiters) {
      waiter.join();
    }
    // Every waiter is gone, so the post reports false and leaves one unit
    // behind, which is taken back to reset the semaphore for the next round.
    EXPECT_FALSE(sem.post());
    ASSERT_TRUE(sem.try_wait());
  }
}

// Multi-producer / multi-consumer stress test: every post() must be matched
// by exactly one successful wait().
TEST(ThrottledLifoSem, MPMCStress) {
  // Same number of producers and consumers.
  constexpr size_t kNumThreads = 16;
  constexpr size_t kNumPostsPerThread = 10000;
  constexpr size_t kExpectedHandoffs = kNumThreads * kNumPostsPerThread;
  constexpr auto kWakeUpInterval = std::chrono::microseconds(100);

  folly::ThrottledLifoSem sem({.wakeUpInterval = kWakeUpInterval});

  std::vector<std::thread> producers;
  std::vector<std::thread> consumers;
  folly::SaturatingSemaphore<> done;
  std::atomic<size_t> handoffs = 0;

  const auto produce = [&] {
    for (size_t left = kNumPostsPerThread; left > 0; --left) {
      // Force the consumers to go to sleep sometimes.
      /* sleep override */ std::this_thread::sleep_for(
          kWakeUpInterval * folly::Random::randDouble01());
      sem.post();
    }
  };

  const auto consume = [&] {
    for (;;) {
      sem.wait();
      // Once done is signalled, a wakeup is the shutdown post issued by the
      // main thread and must not be counted as a handoff.
      if (done.ready()) {
        break;
      }
      const size_t count = handoffs.fetch_add(1) + 1;
      if (count == kExpectedHandoffs) {
        done.post();
      }
    }
  };

  for (size_t t = 0; t < kNumThreads; ++t) {
    producers.emplace_back(produce);
    consumers.emplace_back(consume);
  }

  // Block until every produced unit has been consumed.
  done.wait();
  EXPECT_EQ(sem.valueGuess(), 0);

  // Wake up the consumers.
  sem.post(kNumThreads);

  for (auto& consumer : consumers) {
    consumer.join();
  }
  for (auto& producer : producers) {
    producer.join();
  }

  // Re-check that nothing has changed while joining the threads.
  EXPECT_EQ(handoffs.load(), kExpectedHandoffs);
  EXPECT_EQ(sem.valueGuess(), 0);
}

namespace {

// Benchmark the cost of post() under contention when no wakeup is performed (by
// not having waiters), which simulates the case where the waking chain is
// already active or the thread pool is saturated.
void post(size_t iters, size_t numThreads) {
  folly::ThrottledLifoSem sem;

  std::vector<std::thread> threads;
  for (size_t t = 0; t < numThreads; ++t) {
    threads.emplace_back([&] {
      for (size_t i = 0; i < iters / numThreads; ++i) {
        sem.post();
      }
    });
  }

  for (auto& t : threads) {
    t.join();
  }
}

} // namespace

// Register the post() benchmark once per thread count. The second argument
// names the variant; the last one is passed to post() as numThreads.
BENCHMARK_NAMED_PARAM(post, 1_thread, 1)
BENCHMARK_NAMED_PARAM(post, 2_threads, 2)
BENCHMARK_NAMED_PARAM(post, 4_threads, 4)
BENCHMARK_NAMED_PARAM(post, 8_threads, 8)

// Entry point: runs the gtest suite, then hands control to the folly
// benchmark runner (presumably a no-op unless the benchmark flag is set).
// The process exit code is the gtest result.
int main(int argc, char** argv) {
  // gtest strips the flags it recognizes before gflags parses the rest.
  testing::InitGoogleTest(&argc, argv);
  gflags::ParseCommandLineFlags(&argc, &argv, true);

  const int testStatus = RUN_ALL_TESTS();
  folly::runBenchmarksOnFlag();
  return testStatus;
}