folly/folly/executors/test/SequencedExecutorTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <future>

#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/SerialExecutor.h>
#include <folly/io/async/ScopedEventBaseThread.h>
#include <folly/portability/GTest.h>

namespace folly {

bool isSequencedExecutor(folly::Executor& executor) {
  // Add can be called from different threads, but it should be sequenced.
  auto cpuExecutor = std::make_shared<CPUThreadPoolExecutor>(4);
  auto producer =
      SerialExecutor::create(Executor::getKeepAliveToken(cpuExecutor.get()));

  std::atomic<size_t> nextCallIndex{0};
  std::atomic<bool> result{true};

  auto joinPromise = std::make_shared<std::promise<void>>();
  auto joinFuture = joinPromise->get_future();

  constexpr size_t kNumCalls = 10000;
  for (size_t callIndex = 0; callIndex < kNumCalls; ++callIndex) {
    producer->add([&result, &executor, &nextCallIndex, callIndex, joinPromise] {
      executor.add([&result, &nextCallIndex, callIndex, joinPromise] {
        if (nextCallIndex != callIndex) {
          result = false;
        }
        std::this_thread::yield();
        if (nextCallIndex.exchange(callIndex + 1) != callIndex) {
          result = false;
        }
      });
    });
  }

  joinPromise.reset();
  joinFuture.wait();

  return result;
}

void testExecutor(folly::Executor& executor) {
  EXPECT_FALSE(isSequencedExecutor(executor));
}

void testExecutor(folly::SequencedExecutor& executor) {
  EXPECT_TRUE(isSequencedExecutor(executor));
}

TEST(SequencedExecutor, CPUThreadPoolExecutor) {
  CPUThreadPoolExecutor executor(4);
  testExecutor(executor);
}

TEST(SequencedExecutor, SerialCPUThreadPoolExecutor) {
  auto cpuExecutor = std::make_shared<CPUThreadPoolExecutor>(4);
  auto executor =
      SerialExecutor::create(Executor::getKeepAliveToken(cpuExecutor.get()));
  testExecutor(*executor);
}

TEST(SequencedExecutor, EventBase) {
  testExecutor(*ScopedEventBaseThread().getEventBase());
}

} // namespace folly