/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/CPortability.h>
#include <folly/DefaultKeepAliveExecutor.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/ThreadPoolExecutor.h>
#include <folly/lang/Keep.h>
#include <folly/synchronization/Latch.h>
#include <atomic>
#include <memory>
#include <thread>
#include <boost/thread.hpp>
#include <folly/Exception.h>
#include <folly/container/F14Map.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/EDFThreadPoolExecutor.h>
#include <folly/executors/FutureExecutor.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/executors/VirtualExecutor.h>
#include <folly/executors/task_queue/LifoSemMPMCQueue.h>
#include <folly/executors/task_queue/UnboundedBlockingQueue.h>
#include <folly/executors/thread_factory/InitThreadFactory.h>
#include <folly/executors/thread_factory/PriorityThreadFactory.h>
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>
#include <folly/portability/PThread.h>
#include <folly/portability/SysResource.h>
#include <folly/synchronization/detail/Spin.h>
#include <folly/system/ThreadId.h>
using namespace folly;
using namespace std::chrono;
// ASSERT_NEAR for chrono durations: every argument is converted to
// nanoseconds and the resulting tick counts are compared.
#define ASSERT_NEAR_NS(lhs, rhs, tol) \
  do { \
    ASSERT_NEAR( \
        nanoseconds(lhs).count(), \
        nanoseconds(rhs).count(), \
        nanoseconds(tol).count()); \
  } while (0)
// Returns a task that puts the thread running it to sleep for `ms`
// milliseconds.
static Func burnMs(uint64_t ms) {
  const milliseconds pause(ms);
  return [pause]() { std::this_thread::sleep_for(pause); };
}
#ifdef __linux__
// Reads the CPU-time clock of the calling thread and returns the value as a
// nanosecond duration. Aborts via CHECK if either system call fails.
static std::chrono::nanoseconds thread_clock_now() {
  clockid_t cpuClock;
  CHECK(!pthread_getcpuclockid(pthread_self(), &cpuClock));
  timespec reading;
  CHECK(!clock_gettime(cpuClock, &reading));
  return std::chrono::seconds(reading.tv_sec) +
      std::chrono::nanoseconds(reading.tv_nsec);
}
// Busy-spins until the calling thread's CPU clock (thread_clock_now()) has
// advanced by `ms`.
static void burnThreadCpu(milliseconds ms) {
  const auto deadline = thread_clock_now() + ms;
  for (;;) {
    if (!(thread_clock_now() < deadline)) {
      break;
    }
  }
}
// Waits until `ms` of wall-clock time has elapsed, sleeping in 100ms slices
// instead of spinning so that very little CPU time is consumed.
static void idleLoopFor(milliseconds ms) {
  const auto deadline = high_resolution_clock::now() + ms;
  for (;;) {
    if (!(high_resolution_clock::now() < deadline)) {
      return;
    }
    /* sleep override */ std::this_thread::sleep_for(100ms);
  }
}
#endif
// Holds the WorkerProvider most recently handed to
// make_queue_observer_factory() below; the ExecutorWorkerProviderTest cases
// read it to collect thread ids.
static WorkerProvider* kWorkerProviderGlobal = nullptr;
namespace folly {
#if FOLLY_HAVE_WEAK_SYMBOLS
// Test-local definition of folly::make_queue_observer_factory: stores the
// supplied workerProvider in kWorkerProviderGlobal and returns an empty
// factory pointer. Only compiled when FOLLY_HAVE_WEAK_SYMBOLS is set.
// NOTE(review): presumably this replaces a weak default in the library and is
// called while an executor is being constructed -- confirm against folly.
FOLLY_KEEP std::unique_ptr<QueueObserverFactory> make_queue_observer_factory(
const std::string&, size_t, WorkerProvider* workerProvider) {
kWorkerProviderGlobal = workerProvider;
return {};
}
#endif
} // namespace folly
// Empty fixture whose only purpose is to parameterize the TYPED_TEST cases in
// this file over the executor type.
template <typename T>
class ThreadPoolExecutorTypedTest : public ::testing::Test {};
// Executor implementations each TYPED_TEST of the suite is instantiated with.
using ValueTypes = ::testing::
Types<CPUThreadPoolExecutor, IOThreadPoolExecutor, EDFThreadPoolExecutor>;
TYPED_TEST_SUITE(ThreadPoolExecutorTypedTest, ValueTypes);
// Smoke test: a ten-thread pool is constructed and torn down again when it
// leaves scope.
template <class TPE>
static void basic() {
  TPE pool(10);
}
TYPED_TEST(ThreadPoolExecutorTypedTest, Basic) {
  basic<TypeParam>();
}
// Checks that numThreads() reflects the constructor argument and each later
// setNumThreads() call, both when shrinking and when growing an idle pool.
template <class TPE>
static void resize() {
  TPE pool(100);
  EXPECT_EQ(100, pool.numThreads());
  for (const int target : {50, 150}) {
    pool.setNumThreads(target);
    EXPECT_EQ(target, pool.numThreads());
  }
}
TYPED_TEST(ThreadPoolExecutorTypedTest, Resize) {
  resize<TypeParam>();
}
// Queues 1000 tasks of ~10ms each on a single-threaded pool, calls stop()
// immediately and expects that not all of them have run.
template <class TPE>
static void stop() {
  constexpr int kTasks = 1000;
  TPE pool(1);
  std::atomic<int> finished(0);
  auto slowTask = [&finished] {
    burnMs(10)();
    ++finished;
  };
  for (int remaining = kTasks; remaining > 0; --remaining) {
    pool.add(slowTask);
  }
  pool.stop();
  EXPECT_GT(kTasks, finished);
}
// For IOThreadPoolExecutor, stop() acts like join(): queued tasks are owned by
// the event base, get executed when it is destroyed, and cannot be taken
// back. All ten tasks are therefore expected to have run.
template <>
void stop<IOThreadPoolExecutor>() {
  constexpr int kTasks = 10;
  IOThreadPoolExecutor pool(1);
  std::atomic<int> finished(0);
  auto slowTask = [&finished] {
    burnMs(10)();
    ++finished;
  };
  for (int remaining = kTasks; remaining > 0; --remaining) {
    pool.add(slowTask);
  }
  pool.stop();
  EXPECT_EQ(kTasks, finished);
}
TYPED_TEST(ThreadPoolExecutorTypedTest, Stop) {
  stop<TypeParam>();
}
// Queues 1000 short tasks on a ten-thread pool and expects join() to return
// only after every one of them has run.
template <class TPE>
static void join() {
  constexpr int kTasks = 1000;
  TPE pool(10);
  std::atomic<int> finished(0);
  auto quickTask = [&finished] {
    burnMs(1)();
    ++finished;
  };
  for (int queued = 0; queued != kTasks; ++queued) {
    pool.add(quickTask);
  }
  pool.join();
  EXPECT_EQ(kTasks, finished);
}
TYPED_TEST(ThreadPoolExecutorTypedTest, Join) {
  join<TypeParam>();
}
template <class TPE>
static void destroy() {
TPE tpe(1);
std::atomic<int> completed(0);
auto f = [&]() {
burnMs(10)();
completed++;
};
for (int i = 0; i < 1000; i++) {
tpe.add(f);
}
tpe.stop();
EXPECT_GT(1000, completed);
}
// IOThreadPoolExecutor's destructor joins all tasks. Outstanding tasks belong
// to the event base, will be executed upon its destruction, and cannot be
// taken back.
// IO variant of the destroy test: the executor is destroyed explicitly through
// Optional::reset() and all ten queued tasks are expected to have run.
template <>
void destroy<IOThreadPoolExecutor>() {
  constexpr int kTasks = 10;
  Optional<IOThreadPoolExecutor> pool(std::in_place, 1);
  std::atomic<int> finished(0);
  auto slowTask = [&finished] {
    burnMs(10)();
    ++finished;
  };
  for (int remaining = kTasks; remaining > 0; --remaining) {
    pool->add(slowTask);
  }
  pool.reset();
  EXPECT_EQ(kTasks, finished);
}
TYPED_TEST(ThreadPoolExecutorTypedTest, Destroy) {
  destroy<TypeParam>();
}
// Shrinks and then grows the pool while 1000 short tasks are queued and
// expects every task to have run once join() returns.
template <class TPE>
static void resizeUnderLoad() {
  constexpr int kTasks = 1000;
  TPE pool(10);
  std::atomic<int> finished(0);
  auto quickTask = [&finished] {
    burnMs(1)();
    ++finished;
  };
  for (int queued = 0; queued != kTasks; ++queued) {
    pool.add(quickTask);
  }
  pool.setNumThreads(5);
  pool.setNumThreads(15);
  pool.join();
  EXPECT_EQ(kTasks, finished);
}
TYPED_TEST(ThreadPoolExecutorTypedTest, ResizeUnderLoad) {
  resizeUnderLoad<TypeParam>();
}
template <class TPE>
static void poolStats() {
folly::Baton<> startBaton, endBaton;
TPE tpe(1);
auto stats = tpe.getPoolStats();
EXPECT_GE(1, stats.threadCount);
EXPECT_GE(1, stats.idleThreadCount);
EXPECT_EQ(0, stats.activeThreadCount);
EXPECT_EQ(0, stats.pendingTaskCount);
EXPECT_EQ(0, tpe.getPendingTaskCount());
EXPECT_EQ(0, stats.totalTaskCount);
tpe.add([&]() {
startBaton.post();
endBaton.wait();
});
tpe.add([&]() {});
startBaton.wait();
stats = tpe.getPoolStats();
EXPECT_EQ(1, stats.threadCount);
EXPECT_EQ(0, stats.idleThreadCount);
EXPECT_EQ(1, stats.activeThreadCount);
EXPECT_EQ(1, stats.pendingTaskCount);
EXPECT_EQ(1, tpe.getPendingTaskCount());
EXPECT_EQ(2, stats.totalTaskCount);
endBaton.post();
}
TEST(ThreadPoolExecutorTest, CPUPoolStats) {
poolStats<CPUThreadPoolExecutor>();
}
TEST(ThreadPoolExecutorTest, IOPoolStats) {
poolStats<IOThreadPoolExecutor>();
}
// Subscribes to task stats, runs two tasks (the second one added inside a
// RequestContextScopeGuard) and checks the stats reported for each.
template <class TPE>
static void taskStats() {
  TPE pool(1);
  std::atomic<int> callbacks(0);
  const auto testStart = std::chrono::steady_clock::now();
  pool.subscribeToTaskStats([&](const ThreadPoolExecutor::TaskStats& stats) {
    const bool isSecondCallback = (callbacks++ == 1);
    EXPECT_LT(testStart, stats.enqueueTime);
    EXPECT_LT(milliseconds(0), stats.runTime);
    if (!isSecondCallback) {
      EXPECT_EQ(0, stats.requestId);
      return;
    }
    // The second task had to wait behind the first and was added with a
    // request context in place.
    EXPECT_LT(milliseconds(0), stats.waitTime);
    EXPECT_NE(0, stats.requestId);
  });
  pool.add(burnMs(10));
  RequestContextScopeGuard requestScope;
  pool.add(burnMs(10));
  pool.join();
  EXPECT_EQ(2, callbacks);
}
TYPED_TEST(ThreadPoolExecutorTypedTest, TaskStats) {
  taskStats<TypeParam>();
}
// Installs a TaskObserver that tracks every task by id and verifies that the
// enqueued, dequeued and processed callbacks report consistent data; then
// checks that all ten submitted tasks were observed as run.
TYPED_TEST(ThreadPoolExecutorTypedTest, TaskObserver) {
  struct TestTaskObserver : ThreadPoolExecutor::TaskObserver {
    // What has been observed about a single task so far.
    struct TaskState {
      std::chrono::steady_clock::time_point enqueueTime;
      std::optional<std::chrono::nanoseconds> waitTime;
      bool ran = false;
    };

    Synchronized<F14FastMap<uint64_t, TaskState>> taskStates;

    void taskEnqueued(
        const ThreadPoolExecutor::TaskInfo& info) noexcept override {
      auto states = taskStates.wlock();
      auto [slot, isNew] = states->try_emplace(info.taskId);
      // A task id must not be reported as enqueued twice.
      ASSERT_TRUE(isNew);
      slot->second.enqueueTime = info.enqueueTime;
    }

    void taskDequeued(
        const ThreadPoolExecutor::DequeuedTaskInfo& info) noexcept override {
      auto states = taskStates.wlock();
      auto entry = states->find(info.taskId);
      ASSERT_TRUE(entry != states->end());
      TaskState& state = entry->second;
      EXPECT_EQ(state.enqueueTime, info.enqueueTime);
      // Record the wait time; it must not have been set before.
      ASSERT_FALSE(std::exchange(state.waitTime, info.waitTime));
    }

    void taskProcessed(
        const ThreadPoolExecutor::ProcessedTaskInfo& info) noexcept override {
      auto states = taskStates.wlock();
      auto entry = states->find(info.taskId);
      ASSERT_TRUE(entry != states->end());
      TaskState& state = entry->second;
      EXPECT_EQ(state.enqueueTime, info.enqueueTime);
      ASSERT_TRUE(state.waitTime);
      EXPECT_EQ(*state.waitTime, info.waitTime);
      EXPECT_GT(info.runTime.count(), 0);
      state.ran = true;
    }
  };

  TypeParam ex{4};
  auto owned = std::make_unique<TestTaskObserver>();
  auto* observerPtr = owned.get();
  ex.addTaskObserver(std::move(owned));

  static constexpr size_t kNumTasks = 10;
  for (size_t remaining = kNumTasks; remaining != 0; --remaining) {
    ex.add(burnMs(10));
  }
  ex.join();

  auto recorded = observerPtr->taskStates.exchange({});
  EXPECT_EQ(recorded.size(), kNumTasks);
  for (const auto& entry : recorded) {
    EXPECT_TRUE(entry.second.ran);
  }
}
// Checks getUsedCpuTime(). On Linux the reported value is compared against
// known amounts of CPU burned by pool threads; elsewhere the test only checks
// that zero is reported.
TEST(ThreadPoolExecutorTest, GetUsedCpuTime) {
#ifdef __linux__
  CPUThreadPoolExecutor pool(4);
  ASSERT_EQ(pool.numActiveThreads(), 0);
  ASSERT_EQ(pool.getUsedCpuTime(), nanoseconds(0));

  // Two threads spin for 1s of CPU time each, two merely sleep for 1s.
  Latch allFinished(4);
  auto spinTask = [&] {
    burnThreadCpu(1s);
    allFinished.count_down();
  };
  auto sleepTask = [&] {
    idleLoopFor(1s);
    allFinished.count_down();
  };
  pool.add(spinTask); // +1s cpu time
  pool.add(spinTask); // +1s cpu time
  pool.add(sleepTask); // +0s cpu time
  pool.add(sleepTask); // +0s cpu time
  allFinished.wait();

  // Expect about 2s of CPU time (spent in about 1s of wall-clock time).
  const auto afterBurst = pool.getUsedCpuTime();
  ASSERT_NEAR_NS(afterBurst, 2s, 100ms);

  // Removing every thread must not change the accumulated total.
  pool.setNumThreads(0);
  ASSERT_EQ(pool.numActiveThreads(), 0);
  const auto afterShrink = pool.getUsedCpuTime();
  ASSERT_NEAR_NS(afterBurst, afterShrink, 100ms);

  // A new thread running a trivial task should add next to nothing.
  pool.setNumThreads(1);
  Baton<> taskDone;
  pool.add([&] { taskDone.post(); });
  taskDone.wait();
  ASSERT_EQ(pool.numActiveThreads(), 1);
  const auto afterTrivialTask = pool.getUsedCpuTime();
  ASSERT_NEAR_NS(afterShrink, afterTrivialTask, 100ms);

  // Burning another 500ms must show up in the total.
  taskDone.reset();
  pool.add([&] {
    burnThreadCpu(500ms);
    taskDone.post();
  });
  taskDone.wait();
  const auto afterSecondBurn = pool.getUsedCpuTime();
  ASSERT_NEAR_NS(afterSecondBurn, afterTrivialTask + 500ms, 100ms);
#else
  CPUThreadPoolExecutor pool(1);
  // Zero is expected both before and after a task has spun for 500ms.
  ASSERT_EQ(pool.getUsedCpuTime(), nanoseconds(0));
  Baton<> taskDone;
  pool.add([&] {
    const auto deadline = steady_clock::now() + 500ms;
    while (steady_clock::now() < deadline) {
    }
    taskDone.post();
  });
  taskDone.wait();
  ASSERT_EQ(pool.getUsedCpuTime(), nanoseconds(0));
#endif
}
// Adds two ~10ms tasks to a single-thread pool: one with a 60s expiration and
// one with a 10ms expiration. Expects the first to run, the second to be
// reported as expired, and the expiration callback to fire exactly once.
template <class TPE>
static void expiration() {
  TPE pool(1);
  std::atomic<int> statsCalls(0);
  pool.subscribeToTaskStats([&](const ThreadPoolExecutor::TaskStats& stats) {
    switch (statsCalls++) {
      case 0:
        EXPECT_FALSE(stats.expired);
        break;
      case 1:
        EXPECT_TRUE(stats.expired);
        break;
      default:
        FAIL();
    }
  });
  std::atomic<int> expireCalls(0);
  auto onExpire = [&expireCalls]() { ++expireCalls; };
  pool.add(burnMs(10), seconds(60), onExpire);
  pool.add(burnMs(10), milliseconds(10), onExpire);
  pool.join();
  EXPECT_EQ(2, statsCalls);
  EXPECT_EQ(1, expireCalls);
}
TEST(ThreadPoolExecutorTest, CPUExpiration) {
  expiration<CPUThreadPoolExecutor>();
}
TEST(ThreadPoolExecutorTest, IOExpiration) {
  expiration<IOThreadPoolExecutor>();
}
template <typename TPE>
static void futureExecutor() {
FutureExecutor<TPE> fe(2);
std::atomic<int> c{0};
fe.addFuture([]() { return makeFuture<int>(42); }).then([&](Try<int>&& t) {
c++;
EXPECT_EQ(42, t.value());
});
fe.addFuture([]() { return 100; }).then([&](Try<int>&& t) {
c++;
EXPECT_EQ(100, t.value());
});
fe.addFuture([]() { return makeFuture(); }).then([&](Try<Unit>&& t) {
c++;
EXPECT_NO_THROW(t.value());
});
fe.addFuture([]() { return; }).then([&](Try<Unit>&& t) {
c++;
EXPECT_NO_THROW(t.value());
});
fe.addFuture([]() {
throw std::runtime_error("oops");
}).then([&](Try<Unit>&& t) {
c++;
EXPECT_THROW(t.value(), std::runtime_error);
});
// Test doing actual async work
folly::Baton<> baton;
fe.addFuture([&]() {
auto p = std::make_shared<Promise<int>>();
std::thread t([p]() {
burnMs(10)();
p->setValue(42);
});
t.detach();
return p->getFuture();
}).then([&](Try<int>&& t) {
EXPECT_EQ(42, t.value());
c++;
baton.post();
});
baton.wait();
fe.join();
EXPECT_EQ(6, c);
}
TYPED_TEST(ThreadPoolExecutorTypedTest, FuturePool) {
futureExecutor<TypeParam>();
}
// Queues 50 low-priority tasks followed by 50 high-priority tasks on a pool
// that has no threads yet, then starts a single thread. Every high-priority
// task asserts that no low-priority task has run before it.
TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
  constexpr int kPerPriority = 50;
  bool sawLowPriority = false;
  int ran = 0;
  auto highTask = [&] {
    EXPECT_FALSE(sawLowPriority);
    ++ran;
  };
  auto lowTask = [&] {
    sawLowPriority = true;
    ++ran;
  };
  CPUThreadPoolExecutor pool(0, 2);
  {
    VirtualExecutor ve(pool);
    for (int queued = 0; queued != kPerPriority; ++queued) {
      ve.addWithPriority(lowTask, Executor::LO_PRI);
    }
    for (int queued = 0; queued != kPerPriority; ++queued) {
      ve.addWithPriority(highTask, Executor::HI_PRI);
    }
    pool.setNumThreads(1);
  }
  EXPECT_EQ(100, ran);
}
// Observer keeping a running balance of threads: the two "started" callbacks
// increment it and the two "stopped" callbacks decrement it. checkCalls()
// asserts that the balance has returned to zero.
class TestObserver : public ThreadPoolExecutor::Observer {
 public:
  void threadStarted(ThreadPoolExecutor::ThreadHandle*) override {
    ++threads_;
  }
  void threadStopped(ThreadPoolExecutor::ThreadHandle*) override {
    --threads_;
  }
  void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) override {
    ++threads_;
  }
  void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) override {
    --threads_;
  }
  void checkCalls() { ASSERT_EQ(threads_, 0); }

 private:
  std::atomic<int> threads_{0};
};
// Attaches a TestObserver to a pool, resizes the pool several times, detaches
// the observer, resizes once more, and finally checks that the observer's
// start/stop notifications balanced out.
template <typename TPE>
static void testObserver() {
  const auto watcher = std::make_shared<TestObserver>();
  {
    TPE pool(10);
    pool.addObserver(watcher);
    pool.setNumThreads(3);
    pool.setNumThreads(0);
    pool.setNumThreads(7);
    pool.removeObserver(watcher);
    pool.setNumThreads(10);
  }
  watcher->checkCalls();
}
TYPED_TEST(ThreadPoolExecutorTypedTest, Observer) {
  testObserver<TypeParam>();
}
// addWithPriority() is expected to throw on the IO and EDF executors, and to
// accept in-range, out-of-range and symbolic priorities on a CPU executor
// built with three priority levels.
TEST(ThreadPoolExecutorTest, AddWithPriority) {
  std::atomic_int ran{0};
  auto task = [&ran] { ++ran; };

  // IO exe doesn't support priorities
  IOThreadPoolExecutor ioExe(10);
  EXPECT_THROW(ioExe.addWithPriority(task, 0), std::runtime_error);

  // EDF exe doesn't support priorities
  EDFThreadPoolExecutor edfExe(10);
  EXPECT_THROW(edfExe.addWithPriority(task, 0), std::runtime_error);

  CPUThreadPoolExecutor cpuExe(10, 3);
  // -2 will add at the lowest priority, 2 at the highest priority.
  static constexpr int8_t kNumericPriorities[] = {-1, 0, 1, -2, 2};
  for (const int8_t priority : kNumericPriorities) {
    cpuExe.addWithPriority(task, priority);
  }
  cpuExe.addWithPriority(task, Executor::LO_PRI);
  cpuExe.addWithPriority(task, Executor::HI_PRI);
  cpuExe.join();
  EXPECT_EQ(7, ran);
}
// Runs a CPU executor on top of a capacity-1 queue configured to block when
// full, and checks that five add() calls neither throw nor lose a task.
TEST(ThreadPoolExecutorTest, BlockingQueue) {
  using BlockingTaskQueue = LifoSemMPMCQueue<
      CPUThreadPoolExecutor::CPUTask,
      QueueBehaviorIfFull::BLOCK>;
  constexpr int kQueueCapacity = 1;
  constexpr int kThreads = 1;
  constexpr int kAdds = 5;

  std::atomic_int ran{0};
  auto task = [&ran] {
    burnMs(1)();
    ++ran;
  };
  CPUThreadPoolExecutor cpuExe(
      kThreads,
      std::make_unique<BlockingTaskQueue>(kQueueCapacity),
      std::make_shared<NamedThreadFactory>("CPUThreadPool"));
  // Each task sleeps for 1ms and there is only one worker thread, so
  // `cpuExe.add()` is *almost* guaranteed to block on the full queue.
  for (int added = 0; added != kAdds; ++added) {
    EXPECT_NO_THROW(cpuExe.add(task));
  }
  cpuExe.join();
  EXPECT_EQ(5, ran);
}
// Creates a thread through PriorityThreadFactory and checks that the priority
// (nice value) observed inside that thread equals the one requested.
TEST(PriorityThreadFactoryTest, ThreadPriority) {
  // getpriority() can legitimately return -1, so errors are detected through
  // errno rather than the return value.
  errno = 0;
  auto currentPriority = getpriority(PRIO_PROCESS, 0);
  if (errno != 0) {
    throwSystemError("failed to get current priority");
  }
  // Non-root users can only increase the priority value. Make sure we are
  // trying to go to a higher priority value than we are currently running as.
  // The cap is 19: Linux silently clamps larger requests to 19, so asking for
  // 20 while already at 19 would make the equality check below fail.
  constexpr int kMaxPriorityValue = 19;
  int desiredPriority = std::min(kMaxPriorityValue, currentPriority + 1);
  PriorityThreadFactory factory(
      std::make_shared<NamedThreadFactory>("stuff"), desiredPriority);
  // -21 lies outside the valid range, so a thread that never ran is detected.
  int actualPriority = -21;
  factory.newThread([&]() { actualPriority = getpriority(PRIO_PROCESS, 0); })
      .join();
  EXPECT_EQ(desiredPriority, actualPriority);
}
// The initializer given to InitThreadFactory must have run exactly once by the
// time the thread body executes, and not again afterwards.
TEST(InitThreadFactoryTest, InitializerCalled) {
  int initCalls = 0;
  InitThreadFactory factory(
      std::make_shared<NamedThreadFactory>("test"), [&initCalls] {
        ++initCalls;
      });
  auto thread = factory.newThread([&initCalls]() { EXPECT_EQ(initCalls, 1); });
  thread.join();
  EXPECT_EQ(initCalls, 1);
}
// Verifies the ordering initializer -> thread body -> finalizer for a thread
// created through InitThreadFactory: each stage checks which of the other
// stages have already run.
TEST(InitThreadFactoryTest, InitializerAndFinalizerCalled) {
  bool initRan = false;
  bool bodyRan = false;
  bool finalizerRan = false;

  auto initializer = [&] {
    EXPECT_FALSE(initRan);
    EXPECT_FALSE(bodyRan);
    EXPECT_FALSE(finalizerRan);
    initRan = true;
  };
  auto finalizer = [&] {
    EXPECT_TRUE(initRan);
    EXPECT_TRUE(bodyRan);
    EXPECT_FALSE(finalizerRan);
    finalizerRan = true;
  };
  InitThreadFactory factory(
      std::make_shared<NamedThreadFactory>("test"), initializer, finalizer);

  auto thread = factory.newThread([&]() {
    EXPECT_TRUE(initRan);
    EXPECT_FALSE(bodyRan);
    EXPECT_FALSE(finalizerRan);
    bodyRan = true;
  });
  thread.join();

  EXPECT_TRUE(initRan);
  EXPECT_TRUE(bodyRan);
  EXPECT_TRUE(finalizerRan);
}
class TestData : public folly::RequestData {
public:
explicit TestData(int data) : data_(data) {}
~TestData() override {}
bool hasCallback() override { return false; }
int data_;
};
// Stores a TestData(42) in the current request context and checks that it is
// visible both while a task submitted to a CPU executor runs and while such a
// task (with the guard it captured) is destroyed.
TEST(ThreadPoolExecutorTest, RequestContext) {
  RequestContextScopeGuard rctx; // create new request context for this scope
  EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
  RequestContext::get()->setContextData("test", std::make_unique<TestData>(42));
  auto stored = RequestContext::get()->getContextData("test");
  EXPECT_EQ(42, dynamic_cast<TestData*>(stored)->data_);

  static constexpr auto checkContext = +[] {
    auto found = RequestContext::get()->getContextData("test");
    EXPECT_TRUE(found != nullptr);
    if (found == nullptr) {
      return;
    }
    EXPECT_EQ(42, dynamic_cast<TestData*>(found)->data_);
  };

  {
    CPUThreadPoolExecutor executor(1);
    // Checked while the task body runs.
    executor.add([] { checkContext(); });
    // Checked when the captured guard is destroyed.
    executor.add([x = makeGuard(checkContext)] {});
  }
}
// Global step counter; SlowMover bumps it at the start and at the end of every
// move so tests can synchronize on how far a move has progressed.
std::atomic<int> g_sequence{};

// Movable type whose move operations increment g_sequence twice and, when
// `slow` is set on the source, sleep for 50ms in between.
struct SlowMover {
  explicit SlowMover(bool slow_ = false) : slow(slow_) {}

  // Move construction is implemented in terms of move assignment.
  SlowMover(SlowMover&& other) noexcept { operator=(std::move(other)); }

  SlowMover& operator=(SlowMover&& other) noexcept {
    g_sequence.fetch_add(1);
    slow = other.slow;
    if (slow) {
      /* sleep override */ std::this_thread::sleep_for(milliseconds(50));
    }
    g_sequence.fetch_add(1);
    return *this;
  }

  bool slow;
};
// Regression test: the queue must not get stuck when writes complete in the
// opposite order to the one in which they were started. Two consumers block in
// take(); producer1 starts a slow add and producer2 a fast one, sequenced
// through the shared `turn` counter.
template <typename Q>
void bugD3527722_test() {
  Q queue(1024);
  std::atomic<int> turn{};

  std::thread consumer1([&] {
    ++turn;
    queue.take();
  });
  std::thread consumer2([&] {
    ++turn;
    queue.take();
  });
  std::thread producer1([&] {
    ++turn;
    // Spin until all four threads have checked in.
    while (turn < 4) {
    }
    ++turn;
    queue.add(SlowMover(true));
  });
  std::thread producer2([&] {
    ++turn;
    // Spin until producer1 has announced that it is about to add.
    while (turn < 5) {
    }
    queue.add(SlowMover(false));
  });

  producer1.join();
  producer2.join();
  consumer1.join();
  consumer2.join();
}
TEST(ThreadPoolExecutorTest, LifoSemMPMCQueueBugD3527722) {
  bugD3527722_test<LifoSemMPMCQueue<SlowMover>>();
}
// Adapter giving UnboundedBlockingQueue a constructor that accepts (and
// ignores) an int, so it can be plugged into bugD3527722_test.
template <typename Item>
struct UBQ : public UnboundedBlockingQueue<Item> {
  explicit UBQ(int /*unused*/) {}
};
TEST(ThreadPoolExecutorTest, UnboundedBlockingQueueBugD3527722) {
  bugD3527722_test<UBQ<SlowMover>>();
}
// LifoSemMPMCQueue should not throw from add() when it is not full, even if
// active consumers are delayed. Threads are sequenced through g_sequence,
// which SlowMover increments twice per move.
template <typename Q>
void nothrow_not_full_test() {
  Q queue(2);
  g_sequence = 0;

  std::thread consumer1([&] {
    while (g_sequence < 4) {
    }
    queue.take(); // ++g_sequence to 5 then slow
  });
  std::thread consumer2([&] {
    while (g_sequence < 5) {
    }
    queue.take(); // ++g_sequence to 6 and 7 - fast
  });
  std::thread producer([&] {
    queue.add(SlowMover(true)); // ++g_sequence to 1 and 2
    queue.add(SlowMover(false)); // ++g_sequence to 3 and 4
    // g_sequence == 7 implies queue is not full
    while (g_sequence < 7) {
    }
    EXPECT_NO_THROW(queue.add(SlowMover(false)));
  });

  producer.join();
  consumer1.join();
  consumer2.join();
}
TEST(ThreadPoolExecutorTest, LifoSemMPMCQueueNoThrowNotFull) {
  nothrow_not_full_test<LifoSemMPMCQueue<SlowMover>>();
}
// Chains two continuations on a two-thread pool, shrinks the pool to one
// thread right after scheduling them, and expects the chain to complete with
// the value 77.
template <typename TPE>
static void removeThreadTest() {
// Test that adding a .then() after we have removed some threads
// doesn't cause deadlock and they are executed on different threads.
folly::Optional<folly::Future<int>> f;
std::thread::id id1, id2;
TPE fe(2);
f = folly::makeFuture()
.via(&fe)
.thenValue([&id1](auto&&) {
burnMs(100)();
id1 = std::this_thread::get_id();
})
.thenValue([&id2](auto&&) {
// NOTE(review): the assignment to id2 below comes after the return and is
// never executed, so id2 keeps its default-constructed value.
return 77;
id2 = std::this_thread::get_id();
});
fe.setNumThreads(1);
// The future should be fulfilled because another thread is still available.
EXPECT_EQ(77, std::move(*f).get());
// The two thread ids are meant to differ because the second continuation
// should be rescheduled onto the other thread.
// NOTE(review): since id2 is never assigned (see above), this check compares
// id1 with a default-constructed id and does not verify which thread ran the
// second continuation -- confirm the intended behavior before relying on it.
EXPECT_NE(id1, id2);
}
TYPED_TEST(ThreadPoolExecutorTypedTest, RemoveThread) {
removeThreadTest<TypeParam>();
}
// Resizes a busy pool (1000 queued ~10ms tasks) down and up again, checking
// numThreads() after every step and that all tasks have run after join().
template <typename TPE>
static void resizeThreadWhileExecutingTest() {
  constexpr int kTasks = 1000;
  TPE pool(10);
  EXPECT_EQ(10, pool.numThreads());

  std::atomic<int> finished(0);
  auto slowTask = [&finished] {
    burnMs(10)();
    ++finished;
  };
  for (int queued = 0; queued != kTasks; ++queued) {
    pool.add(slowTask);
  }

  for (const int target : {8, 5, 15}) {
    pool.setNumThreads(target);
    EXPECT_EQ(target, pool.numThreads());
  }
  pool.join();
  EXPECT_EQ(kTasks, finished);
}
TYPED_TEST(ThreadPoolExecutorTypedTest, ResizeThreadWhileExecuting) {
  resizeThreadWhileExecutingTest<TypeParam>();
}
// A continuation holding a keep-alive token is scheduled on the executor;
// after the executor has been destroyed the future is expected to be ready
// and to hold 42.
template <typename TPE>
void keepAliveTest() {
  auto pool = std::make_unique<TPE>(4);
  auto* rawPool = pool.get();
  auto result =
      futures::sleep(std::chrono::milliseconds{100})
          .via(rawPool)
          .thenValue([keepAlive = getKeepAliveToken(rawPool)](auto&&) {
            return 42;
          })
          .semi();
  pool.reset();
  EXPECT_TRUE(result.isReady());
  EXPECT_EQ(42, std::move(result).get());
}
TYPED_TEST(ThreadPoolExecutorTypedTest, KeepAlive) {
  keepAliveTest<TypeParam>();
}
// Counts the executors visited by ThreadPoolExecutor::withAll().
int getNumThreadPoolExecutors() {
  int total = 0;
  ThreadPoolExecutor::withAll([&](ThreadPoolExecutor&) { ++total; });
  return total;
}
// The number of executors reported by getNumThreadPoolExecutors() must follow
// the construction and destruction of two nested pools.
template <typename TPE>
static void registersToExecutorListTest() {
  EXPECT_EQ(0, getNumThreadPoolExecutors());
  {
    TPE outer(10);
    EXPECT_EQ(1, getNumThreadPoolExecutors());
    {
      TPE inner(5);
      EXPECT_EQ(2, getNumThreadPoolExecutors());
    }
    EXPECT_EQ(1, getNumThreadPoolExecutors());
  }
  EXPECT_EQ(0, getNumThreadPoolExecutors());
}
TYPED_TEST(ThreadPoolExecutorTypedTest, RegistersToExecutorList) {
  registersToExecutorListTest<TypeParam>();
}
// The executor's name must come from the NamedThreadFactory even when that
// factory is wrapped in an InitThreadFactory.
template <typename TPE>
static void testUsesNameFromNamedThreadFactory() {
  auto named = std::make_shared<NamedThreadFactory>("my_executor");
  auto wrapped = std::make_shared<InitThreadFactory>(std::move(named), [] {});
  TPE pool(10, wrapped);
  EXPECT_EQ("my_executor", pool.getName());
}
TYPED_TEST(ThreadPoolExecutorTypedTest, UsesNameFromNamedThreadFactory) {
  testUsesNameFromNamedThreadFactory<TypeParam>();
}
// With a 100ms thread death timeout, two tasks are held mid-flight on a
// barrier; once they are released, the active thread count is expected to
// drop to zero within one second.
TEST(ThreadPoolExecutorTest, DynamicThreadsTest) {
  boost::barrier rendezvous{3};
  auto waitTwice = [&] {
    rendezvous.wait();
    rendezvous.wait();
  };
  CPUThreadPoolExecutor pool(2);
  pool.setThreadDeathTimeout(std::chrono::milliseconds(100));
  pool.add(waitTwice);
  pool.add(waitTwice);

  rendezvous.wait(); // ensure both tasks are mid-flight
  EXPECT_EQ(2, pool.getPoolStats().activeThreadCount) << "sanity check";
  auto noActiveThreads = [&] {
    return pool.getPoolStats().activeThreadCount == 0;
  };
  EXPECT_FALSE(noActiveThreads()) << "sanity check";

  rendezvous.wait(); // let both mid-flight tasks complete
  const auto deadline =
      std::chrono::steady_clock::now() + std::chrono::seconds(1);
  EXPECT_EQ(
      folly::detail::spin_result::success,
      folly::detail::spin_yield_until(deadline, noActiveThreads));
}
// Checks the executor's thread-id collector: it reports no ids before any
// task has run, reports the worker's id while a task runs, and the collected
// result keeps join() from completing until it is released.
TEST(ThreadPoolExecutorTest, GetThreadIdCollector) {
  CPUThreadPoolExecutor pool(1);
  auto* collector = pool.getThreadIdCollector();
  ASSERT_TRUE(collector != nullptr);
  EXPECT_THAT(collector->collectThreadIds().threadIds, testing::IsEmpty());

  pid_t workerTid;
  Baton<> taskStarted;
  Baton<> releaseTask;
  pool.add([&] {
    workerTid = getOSThreadID();
    taskStarted.post();
    releaseTask.wait(); // Wait until we acquire the keepalive.
  });
  taskStarted.wait();
  auto collected = collector->collectThreadIds();
  EXPECT_THAT(collected.threadIds, testing::ElementsAre(workerTid));
  releaseTask.post();

  Baton<> joinFinished;
  std::thread joiner([&] {
    pool.join();
    joinFinished.post();
  });
  // Until we release the collected ids, the executor join cannot complete
  EXPECT_FALSE(joinFinished.try_wait_for(100ms));
  // But things should eventually complete when released.
  collected = {};
  joiner.join();
  EXPECT_THAT(collector->collectThreadIds().threadIds, testing::IsEmpty());
}
// With a zero thread death timeout, submits 10000 tasks one at a time (waiting
// for each to finish) and expects every one of them to have run.
TEST(ThreadPoolExecutorTest, DynamicThreadAddRemoveRace) {
  constexpr int kRounds = 10000;
  CPUThreadPoolExecutor pool(1);
  pool.setThreadDeathTimeout(std::chrono::milliseconds(0));
  std::atomic<uint64_t> ran{0};
  for (int round = 0; round != kRounds; ++round) {
    Baton<> taskDone;
    pool.add([&]() {
      ran.fetch_add(1, std::memory_order_relaxed);
      taskDone.post();
    });
    taskDone.wait();
  }
  pool.join();
  EXPECT_EQ(ran, kRounds);
}
// Stress test: 10000 tasks, each of which adds a further 1ms task to the same
// 1000-thread executor, are submitted before stop() is called.
TEST(ThreadPoolExecutorTest, AddPerf) {
  using TaskQueue = UnboundedBlockingQueue<CPUThreadPoolExecutor::CPUTask>;
  CPUThreadPoolExecutor pool(
      1000,
      std::make_unique<TaskQueue>(),
      std::make_shared<NamedThreadFactory>("CPUThreadPool"));
  pool.setThreadDeathTimeout(std::chrono::milliseconds(1));
  for (int submitted = 0; submitted != 10000; ++submitted) {
    pool.add([&pool, ka = getKeepAliveToken(pool)]() {
      // holding a keep-alive here permits the following add()
      // to occur safely, concurrently with the stop() below
      pool.add([]() { /* sleep override */ usleep(1000); });
    });
  }
  pool.stop();
}
class ExecutorWorkerProviderTest : public ::testing::Test {
protected:
void SetUp() override { kWorkerProviderGlobal = nullptr; }
void TearDown() override { kWorkerProviderGlobal = nullptr; }
};
// Starts 4 threads and has all of them work on a task, then invokes
// collectThreadIds() on the captured WorkerProvider and checks that the
// reported ids are exactly the ids the tasks recorded themselves.
TEST_F(ExecutorWorkerProviderTest, ThreadCollectorBasicTest) {
  // Four tasks plus the test thread meet at this barrier.
  boost::barrier barrier{5};
  Synchronized<std::vector<pid_t>> expectedTids;
  auto task = [&]() {
    expectedTids.wlock()->push_back(folly::getOSThreadID());
    barrier.wait();
  };
  CPUThreadPoolExecutor e(4);
  for (int i = 0; i < 4; ++i) {
    e.add(task);
  }
  // Once this returns, every task has recorded its thread id.
  barrier.wait();
  {
    const auto threadIdsWithKA = kWorkerProviderGlobal->collectThreadIds();
    const auto& ids = threadIdsWithKA.threadIds;
    auto locked = expectedTids.rlock();
    EXPECT_EQ(ids.size(), locked->size());
    // Both ranges are bounded explicitly: the size check above is non-fatal,
    // and the three-iterator overload would read past the end of `locked` if
    // `ids` turned out to be longer.
    EXPECT_TRUE(std::is_permutation(
        ids.begin(), ids.end(), locked->begin(), locked->end()));
  }
  e.join();
}
// Invokes WorkerProvider::collectThreadIds() repeatedly, including while the
// pool grows and runs tasks, to make sure that repeated invocations do not
// deadlock and report the expected number of threads.
TEST_F(ExecutorWorkerProviderTest, ThreadCollectorMultipleInvocationTest) {
  CPUThreadPoolExecutor pool(1);
  pool.add([&]() {});
  {
    // Two back-to-back collections must agree.
    auto first = kWorkerProviderGlobal->collectThreadIds();
    auto second = kWorkerProviderGlobal->collectThreadIds();
    auto& firstIds = first.threadIds;
    auto& secondIds = second.threadIds;
    EXPECT_EQ(firstIds.size(), 1);
    EXPECT_EQ(firstIds.size(), secondIds.size());
    EXPECT_EQ(firstIds, secondIds);
  }
  // Add some more threads and schedule tasks while the collector
  // is capturing thread Ids.
  std::array<folly::Baton<>, 4> gates;
  {
    auto before = kWorkerProviderGlobal->collectThreadIds();
    pool.setNumThreads(4);
    for (size_t slot = 0; slot < 4; ++slot) {
      pool.add([slot, &gates]() { gates[slot].wait(); });
    }
    for (auto& gate : gates) {
      gate.post();
    }
    auto after = kWorkerProviderGlobal->collectThreadIds();
    auto& beforeIds = before.threadIds;
    auto& afterIds = after.threadIds;
    EXPECT_EQ(beforeIds.size(), 1);
    EXPECT_EQ(afterIds.size(), 4);
  }
  pool.join();
}
// The collector's keep alive must prevent the executor's threads from
// exiting. This is verified by checking that a setNumThreads() call which
// reduces the worker count does not complete while a background thread still
// holds the keep alive.
TEST_F(ExecutorWorkerProviderTest, ThreadCollectorBlocksThreadExitTest) {
  constexpr size_t kNumThreads = 4;
  std::array<folly::Baton<>, kNumThreads> gates;
  CPUThreadPoolExecutor pool(kNumThreads);
  for (size_t slot = 0; slot < kNumThreads; ++slot) {
    pool.add([slot, &gates]() { gates[slot].wait(); });
  }

  Baton<> idsCollected;
  Baton<> resizeReturned;
  std::thread bgCollector([&]() {
    auto idsWithKA = kWorkerProviderGlobal->collectThreadIds();
    idsCollected.post();
    // Since this thread is holding the KeepAlive, it should block
    // the main thread's `setNumThreads()` call which is trying to
    // reduce the thread count of the executor. We verify that by
    // checking that the baton isn't posted after a 100ms wait.
    const bool posted =
        resizeReturned.try_wait_for(std::chrono::milliseconds(100));
    EXPECT_FALSE(posted);
    // The thread count should still be 4 since the collector's
    // keep alive is active.
    auto& ids = idsWithKA.threadIds;
    EXPECT_EQ(ids.size(), kNumThreads);
  });

  idsCollected.wait();
  for (auto& gate : gates) {
    gate.post();
  }
  pool.setNumThreads(2);
  resizeReturned.post();
  bgCollector.join();
  // The thread count should now be reduced to 2.
  EXPECT_EQ(pool.numThreads(), 2);
  pool.join();
}
template <typename TPE>
static void WeakRefTest() {
// test that adding a .then() after we have
// started shutting down does not deadlock
folly::Optional<folly::Future<folly::Unit>> f;
int counter{0};
{
TPE fe(1);
f = folly::makeFuture()
.via(&fe)
.thenValue([](auto&&) { burnMs(100)(); })
.thenValue([&](auto&&) { ++counter; })
.via(getWeakRef(fe))
.thenValue([](auto&&) { burnMs(100)(); })
.thenValue([&](auto&&) { ++counter; });
}
EXPECT_THROW(std::move(*f).get(), folly::BrokenPromise);
EXPECT_EQ(1, counter);
}
template <typename TPE>
static void virtualExecutorTest() {
using namespace std::literals;
folly::Optional<folly::SemiFuture<folly::Unit>> f;
int counter{0};
{
TPE fe(1);
{
VirtualExecutor ve(fe);
f = futures::sleep(100ms)
.via(&ve)
.thenValue([&](auto&&) {
++counter;
return futures::sleep(100ms);
})
.via(&fe)
.thenValue([&](auto&&) { ++counter; })
.semi();
}
EXPECT_EQ(1, counter);
bool functionDestroyed{false};
bool functionCalled{false};
{
VirtualExecutor ve(fe);
auto guard = makeGuard([&functionDestroyed] {
std::this_thread::sleep_for(100ms);
functionDestroyed = true;
});
ve.add([&functionCalled, guard = std::move(guard)] {
functionCalled = true;
});
}
EXPECT_TRUE(functionCalled);
EXPECT_TRUE(functionDestroyed);
}
EXPECT_TRUE(f->isReady());
EXPECT_NO_THROW(std::move(*f).get());
EXPECT_EQ(2, counter);
}
class SingleThreadedCPUThreadPoolExecutor : public CPUThreadPoolExecutor,
public SequencedExecutor {
public:
explicit SingleThreadedCPUThreadPoolExecutor(size_t)
: CPUThreadPoolExecutor(1) {}
~SingleThreadedCPUThreadPoolExecutor() override { stop(); }
};
// Runs WeakRefTest for every executor type in the typed suite.
TYPED_TEST(ThreadPoolExecutorTypedTest, WeakRef) {
  WeakRefTest<TypeParam>();
}
// Runs WeakRefTest for the single-threaded sequenced CPU executor.
TEST(ThreadPoolExecutorTest, WeakRefTestSingleThreadedCPU) {
  WeakRefTest<SingleThreadedCPUThreadPoolExecutor>();
}
// getWeakRef() on an executor that is also a SequencedExecutor must yield a
// KeepAlive<SequencedExecutor>.
TEST(ThreadPoolExecutorTest, WeakRefTestSequential) {
  SingleThreadedCPUThreadPoolExecutor ex(1);
  auto weakRef = getWeakRef(ex);
  using Actual = decltype(weakRef);
  using Expected = Executor::KeepAlive<SequencedExecutor>;
  EXPECT_TRUE((std::is_same_v<Actual, Expected>));
}
// Runs virtualExecutorTest for every executor type in the typed suite.
TYPED_TEST(ThreadPoolExecutorTypedTest, VirtualExecutor) {
  virtualExecutorTest<TypeParam>();
}
// A task running on the executor should see an executor blocking context
// whose tag equals the name given to the executor's thread factory.
template <class TPE>
static void currentThreadTest(folly::StringPiece executorName) {
  folly::Optional<ExecutorBlockingContext> seen{};
  TPE pool(1, std::make_shared<NamedThreadFactory>(executorName));
  pool.add([&seen]() { seen = getExecutorBlockingContext(); });
  pool.join();
  EXPECT_EQ(seen->tag, executorName);
}
// Nesting of the permit guard: while an ExecutorBlockingGuard constructed with
// PermitTag is in scope no blocking context should be reported; once it is
// gone, the executor's context (tagged with its name) should be visible again.
template <class TPE>
static void currentThreadTestDisabled(folly::StringPiece executorName) {
  folly::Optional<ExecutorBlockingContext> whilePermitted{};
  folly::Optional<ExecutorBlockingContext> afterPermit{};
  TPE pool(1, std::make_shared<NamedThreadFactory>(executorName));
  pool.add([&]() {
    {
      // Nest the guard that permits blocking
      ExecutorBlockingGuard permit{ExecutorBlockingGuard::PermitTag{}};
      whilePermitted = getExecutorBlockingContext();
    }
    afterPermit = getExecutorBlockingContext();
  });
  pool.join();
  EXPECT_FALSE(whilePermitted.has_value());
  EXPECT_EQ(afterPermit->tag, executorName);
}
TYPED_TEST(ThreadPoolExecutorTypedTest, CurrentThreadExecutor) {
  currentThreadTest<TypeParam>("ExecutorName");
  currentThreadTestDisabled<TypeParam>("ExecutorName");
}