/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/executors/GlobalExecutor.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/IOExecutor.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/executors/VirtualExecutor.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>
#include <folly/synchronization/SaturatingSemaphore.h>
#include <folly/system/HardwareConcurrency.h>
using namespace folly;
// Submits a single task to the immutable global CPU executor and verifies
// that it ran exactly once.
TEST(GlobalExecutorTest, GlobalImmutableCPUExecutor) {
  folly::Baton<> done;
  int runs = 0;

  // Fetching the executor here must not blow up: the default global CPU
  // executor is expected to be created lazily on first use.
  getGlobalCPUExecutor()->add([&]() {
    ++runs;
    // Wake the test thread only after the counter has been updated.
    done.post();
  });

  done.wait();
  EXPECT_EQ(1, runs);
}
// Submits a single task to the immutable global IO executor and verifies
// that it ran exactly once.
TEST(GlobalExecutorTest, GlobalImmutableIOExecutor) {
  folly::Baton<> done;
  int runs = 0;

  // Fetching the executor here must not blow up: the default global IO
  // executor is expected to be created lazily on first use.
  getGlobalIOExecutor()->add([&]() {
    ++runs;
    // Wake the test thread only after the counter has been updated.
    done.post();
  });

  done.wait();
  EXPECT_EQ(1, runs);
}
// Exercises the deprecated mutable CPU executor accessors: installs a custom
// executor, checks that it receives the submitted work, and checks that
// getCPUExecutor() is still usable after the custom executor is destroyed.
TEST(GlobalExecutorTest, GlobalCPUExecutor) {
  // Runs each task inline on the calling thread and counts the tasks it ran.
  class InlineCountingExecutor : public folly::Executor {
   public:
    void add(Func task) override {
      task();
      ++runCount;
    }

    int runCount{0};
  };

  // The default CPU executor is a thread pool, so the task is a no-op that
  // touches no test state.
  auto noop = [&]() {};

  // Must not explode: the default global CPUExecutor should be created
  // lazily by this first access.
  FOLLY_PUSH_WARNING
  FOLLY_GNU_DISABLE_WARNING("-Wdeprecated-declarations")
  getCPUExecutor()->add(noop);

  {
    auto custom = std::make_shared<InlineCountingExecutor>();
    setCPUExecutor(custom);
    getCPUExecutor()->add(noop);

    // The task must have been routed to the executor we just installed.
    EXPECT_EQ(1, custom->runCount);
  }

  // Must not explode: the weak reference to the custom executor has expired,
  // so the default global CPUExecutor should be restored.
  getCPUExecutor()->add(noop);
  FOLLY_POP_WARNING
}
// Exercises the deprecated mutable IO executor accessors: installs a custom
// executor, checks that it receives the submitted work, and checks that
// getIOExecutor() is still usable after the custom executor is destroyed.
TEST(GlobalExecutorTest, GlobalIOExecutor) {
  // Counts submitted tasks without running them; exposes no event base.
  class CountingIOExecutor : public IOExecutor {
   public:
    void add(Func) override { ++submitted; }

    folly::EventBase* getEventBase() override { return nullptr; }

    int submitted{0};
  };

  auto noop = []() {};

  FOLLY_PUSH_WARNING
  FOLLY_GNU_DISABLE_WARNING("-Wdeprecated-declarations")
  // Must not explode: the default global IOExecutor should be created lazily
  // by this first access.
  getIOExecutor()->add(noop);

  {
    auto custom = std::make_shared<CountingIOExecutor>();
    setIOExecutor(custom);
    getIOExecutor()->add(noop);

    // The task must have been routed to the executor we just installed.
    EXPECT_EQ(1, custom->submitted);
  }

  // Must not explode: the weak reference to the custom executor has expired,
  // so the default global IOExecutor should be restored.
  getIOExecutor()->add(noop);
  FOLLY_POP_WARNING
}
// Checks that the global IO executor reports as many threads as
// folly::hardware_concurrency(). Per the test name, this is the expected
// sizing when the thread-count flag has not been set.
TEST(GlobalExecutorTest, IOThreadCountFlagUnset) {
  gflags::FlagSaver flagsaver;

  auto io_threadpool = dynamic_cast<folly::IOThreadPoolExecutor*>(
      folly::getGlobalIOExecutor().get());
  // dynamic_cast yields nullptr when the global IO executor is not an
  // IOThreadPoolExecutor. Fail the test here rather than crash the whole
  // test binary by dereferencing a null pointer below.
  ASSERT_NE(io_threadpool, nullptr);

  EXPECT_EQ(io_threadpool->numThreads(), folly::hardware_concurrency());
}
// Checks that the global CPU executor reports as many threads as
// folly::hardware_concurrency().
TEST(GlobalExecutorTest, CPUThreadCountFlagUnset) {
  gflags::FlagSaver savedFlags;

  // Take one snapshot of the executor's counters and compare its thread count.
  const auto counters = getGlobalCPUExecutorCounters();
  EXPECT_EQ(counters.numThreads, folly::hardware_concurrency());
}
// Floods the global CPU executor with blocked tasks and checks that the
// reported counters stay within the expected bounds until every thread is
// active.
TEST(GlobalExecutorTest, GetGlobalCPUExecutorCounters) {
  const size_t numThreads = getGlobalCPUExecutorCounters().numThreads;
  const size_t numTasks = 1000 + numThreads;

  // Gate that keeps every submitted task parked until the test releases it.
  folly::SaturatingSemaphore<> gate;
  // Declared after the gate so the gate stays alive until all tasks have run.
  folly::VirtualExecutor tasks(getGlobalCPUExecutor());

  for (size_t submitted = 0; submitted != numTasks; ++submitted) {
    tasks.add([&] { gate.wait(); });
  }

  // Poll until the executor has started all of its threads. If it never
  // does, this loop spins forever and the test times out.
  bool allThreadsActive = false;
  while (!allThreadsActive) {
    const auto snapshot = getGlobalCPUExecutorCounters();

    // The number of tasks already picked up is unknown, but since every task
    // is blocked it can be at most numThreads.
    EXPECT_GE(snapshot.numPendingTasks, numTasks - numThreads);
    EXPECT_LE(snapshot.numPendingTasks, numTasks);

    allThreadsActive = snapshot.numActiveThreads == snapshot.numThreads;
    if (!allThreadsActive) {
      /* sleep override */
      std::this_thread::sleep_for(std::chrono::microseconds(1));
    }
  }

  // Release all tasks so everything can shut down.
  gate.post();
}