/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#include <folly/CancellationToken.h>
#include <folly/Chrono.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h>
#include <folly/experimental/coro/Coroutine.h>
#include <folly/experimental/coro/CurrentExecutor.h>
#include <folly/experimental/coro/DetachOnCancel.h>
#include <folly/experimental/coro/Invoke.h>
#include <folly/experimental/coro/Sleep.h>
#include <folly/experimental/coro/Task.h>
#include <folly/experimental/coro/TimedWait.h>
#include <folly/experimental/coro/WithCancellation.h>
#include <folly/fibers/Semaphore.h>
#include <folly/futures/Future.h>
#include <folly/io/async/ScopedEventBaseThread.h>
#include <folly/lang/Assume.h>
#include <folly/portability/GTest.h>
#if FOLLY_HAS_COROUTINES
using namespace folly;
class CoroTest : public testing::Test {};
TEST_F(CoroTest, Basic) {
ManualExecutor executor;
auto task42 = []() -> coro::Task<int> { co_return 42; };
auto future = task42().scheduleOn(&executor).start();
EXPECT_FALSE(future.isReady());
executor.drain();
EXPECT_TRUE(future.isReady());
EXPECT_EQ(42, std::move(future).get());
}
TEST_F(CoroTest, BasicSemiFuture) {
ManualExecutor executor;
auto task42 = []() -> coro::Task<int> { co_return 42; };
auto future = task42().semi().via(&executor);
EXPECT_FALSE(future.isReady());
executor.drain();
EXPECT_TRUE(future.isReady());
EXPECT_EQ(42, std::move(future).get());
}
TEST_F(CoroTest, BasicFuture) {
ManualExecutor executor;
auto task42 = []() -> coro::Task<int> { co_return 42; };
auto future = task42().scheduleOn(&executor).start();
EXPECT_FALSE(future.isReady());
EXPECT_EQ(42, std::move(future).via(&executor).getVia(&executor));
}
coro::Task<void> taskVoid() {
auto task42 = []() -> coro::Task<int> { co_return 42; };
(void)co_await task42();
co_return;
}
TEST_F(CoroTest, Basic2) {
ManualExecutor executor;
auto future = taskVoid().scheduleOn(&executor).start();
EXPECT_FALSE(future.isReady());
executor.drain();
EXPECT_TRUE(future.isReady());
}
TEST_F(CoroTest, TaskOfMoveOnly) {
auto f = []() -> coro::Task<std::unique_ptr<int>> {
co_return std::make_unique<int>(123);
};
auto p = coro::blockingWait(f());
EXPECT_TRUE(p);
EXPECT_EQ(123, *p);
}
coro::Task<void> taskSleep() {
(void)co_await coro::sleep(std::chrono::seconds{1});
co_return;
}
TEST_F(CoroTest, Sleep) {
ScopedEventBaseThread evbThread;
auto startTime = std::chrono::steady_clock::now();
auto task = taskSleep().scheduleOn(evbThread.getEventBase());
coro::blockingWait(std::move(task));
// The total time should be roughly 1 second. Some builds, especially
// optimized ones, may result in slightly less than 1 second, so we perform
// rounding here.
auto totalTime = std::chrono::steady_clock::now() - startTime;
EXPECT_GE(
chrono::round<std::chrono::seconds>(totalTime), std::chrono::seconds{1});
}
TEST_F(CoroTest, ExecutorKeepAlive) {
auto future = [] {
ScopedEventBaseThread evbThread;
return taskSleep().scheduleOn(evbThread.getEventBase()).start();
}();
EXPECT_TRUE(future.isReady());
}
TEST_F(CoroTest, ExecutorKeepAliveDummy) {
struct CountingExecutor : public ManualExecutor {
bool keepAliveAcquire() noexcept override {
++keepAliveCounter;
return true;
}
void keepAliveRelease() noexcept override { --keepAliveCounter; }
size_t keepAliveCounter{0};
};
struct ExecutorRec {
static coro::Task<void> go(int depth) {
if (depth == 0) {
co_return;
}
auto executor =
dynamic_cast<CountingExecutor*>(co_await coro::co_current_executor);
DCHECK(executor);
// Note, extra keep-alives are being kept by the Futures.
EXPECT_EQ(3, executor->keepAliveCounter);
co_await go(depth - 1);
}
};
CountingExecutor executor;
ExecutorRec::go(42).scheduleOn(&executor).start().via(&executor).getVia(
&executor);
}
TEST_F(CoroTest, FutureThrow) {
auto taskException = []() -> coro::Task<int> {
throw std::runtime_error("Test exception");
co_return 42;
};
ManualExecutor executor;
auto future = taskException().scheduleOn(&executor).start();
EXPECT_FALSE(future.isReady());
executor.drain();
EXPECT_TRUE(future.isReady());
EXPECT_THROW(std::move(future).get(), std::runtime_error);
}
coro::Task<int> taskRecursion(int depth) {
if (depth > 0) {
EXPECT_EQ(depth - 1, co_await taskRecursion(depth - 1));
} else {
(void)co_await coro::sleep(std::chrono::seconds{1});
}
co_return depth;
}
TEST_F(CoroTest, LargeStack) {
ScopedEventBaseThread evbThread;
auto task = taskRecursion(50000).scheduleOn(evbThread.getEventBase());
EXPECT_EQ(50000, coro::blockingWait(std::move(task)));
}
TEST_F(CoroTest, NestedThreads) {
auto taskThreadNested = [](std::thread::id threadId) -> coro::Task<void> {
EXPECT_EQ(threadId, std::this_thread::get_id());
(void)co_await coro::sleep(std::chrono::seconds{1});
EXPECT_EQ(threadId, std::this_thread::get_id());
co_return;
};
auto taskThread = [&]() -> coro::Task<int> {
auto threadId = std::this_thread::get_id();
// BUG: Under @mode/clang-opt builds this object is placed on the coroutine
// frame and the code for the constructor assumes that it is allocated on
// a 16-byte boundary. However, when placed in the coroutine frame it can
// end up at a location that is not 16-byte aligned. This causes a SIGSEGV
// when performing a store to members that uses SSE instructions.
folly::ScopedEventBaseThread evbThread;
co_await taskThreadNested(evbThread.getThreadId())
.scheduleOn(evbThread.getEventBase());
EXPECT_EQ(threadId, std::this_thread::get_id());
co_return 42;
};
ScopedEventBaseThread evbThread;
auto task = taskThread().scheduleOn(evbThread.getEventBase());
EXPECT_EQ(42, coro::blockingWait(std::move(task)));
}
TEST_F(CoroTest, CurrentExecutor) {
auto taskGetCurrentExecutor = [](Executor* executor) -> coro::Task<int> {
auto current = co_await coro::co_current_executor;
EXPECT_EQ(executor, current);
auto task42 = []() -> coro::Task<int> { co_return 42; };
co_return co_await task42().scheduleOn(current);
};
ScopedEventBaseThread evbThread;
auto task = taskGetCurrentExecutor(evbThread.getEventBase())
.scheduleOn(evbThread.getEventBase());
EXPECT_EQ(42, coro::blockingWait(std::move(task)));
}
TEST_F(CoroTest, TimedWaitFuture) {
auto taskTimedWaitFuture = []() -> coro::Task<void> {
auto ex = co_await coro::co_current_executor;
auto fastFuture = futures::sleep(std::chrono::milliseconds{50})
.via(ex)
.thenValue([](Unit) { return 42; });
auto fastResult = co_await coro::timed_wait(
std::move(fastFuture), std::chrono::milliseconds{100});
EXPECT_TRUE(fastResult);
EXPECT_EQ(42, *fastResult);
struct ExpectedException : public std::runtime_error {
ExpectedException() : std::runtime_error("ExpectedException") {}
};
auto throwingFuture =
futures::sleep(std::chrono::milliseconds{50})
.via(ex)
.thenValue([](Unit) { throw ExpectedException(); });
EXPECT_THROW(
(void)co_await coro::timed_wait(
std::move(throwingFuture), std::chrono::milliseconds{100}),
ExpectedException);
auto [lifetimePromise, lifetimeFuture] =
folly::makePromiseContract<folly::Unit>(ex);
auto slowFuture =
futures::sleep(std::chrono::milliseconds{200})
.via(ex)
.thenValue(
[lifetimePromise = std::move(lifetimePromise)](Unit) mutable {
lifetimePromise.setValue();
return 42;
});
auto slowResult = co_await coro::timed_wait(
std::move(slowFuture), std::chrono::milliseconds{100});
EXPECT_FALSE(slowResult);
// Ensure that task completes for safe executor lifetimes
(void)co_await std::move(lifetimeFuture);
co_return;
};
coro::blockingWait(taskTimedWaitFuture());
}
TEST_F(CoroTest, TimedWaitTask) {
auto taskTimedWaitTask = []() -> coro::Task<void> {
auto fastTask = []() -> coro::Task<int> {
co_await coro::sleep(std::chrono::milliseconds{50});
co_return 42;
}();
auto fastResult = co_await coro::timed_wait(
std::move(fastTask), std::chrono::milliseconds{100});
EXPECT_TRUE(fastResult);
EXPECT_EQ(42, *fastResult);
struct ExpectedException : public std::runtime_error {
ExpectedException() : std::runtime_error("ExpectedException") {}
};
auto throwingTask = []() -> coro::Task<void> {
co_await coro::sleep(std::chrono::milliseconds{50});
throw ExpectedException();
}();
EXPECT_THROW(
(void)co_await coro::timed_wait(
std::move(throwingTask), std::chrono::milliseconds{100}),
ExpectedException);
auto slowTask = []() -> coro::Task<int> {
co_await futures::sleep(std::chrono::milliseconds{200});
co_return 42;
}();
auto slowResult = co_await coro::timed_wait(
std::move(slowTask), std::chrono::milliseconds{100});
EXPECT_FALSE(slowResult);
co_return;
};
coro::blockingWait(taskTimedWaitTask());
}
TEST_F(CoroTest, TimedWaitKeepAlive) {
auto start = std::chrono::steady_clock::now();
coro::blockingWait([]() -> coro::Task<void> {
co_await coro::timed_wait(
coro::sleep(std::chrono::milliseconds{100}), std::chrono::seconds{60});
co_return;
}());
auto duration = std::chrono::steady_clock::now() - start;
EXPECT_LE(duration, std::chrono::seconds{30});
}
TEST_F(CoroTest, TimedWaitNonCopyable) {
auto task = []() -> coro::Task<std::unique_ptr<int>> {
co_return std::make_unique<int>(42);
}();
EXPECT_EQ(
42,
**coro::blockingWait(
[&]() -> coro::Task<folly::Optional<std::unique_ptr<int>>> {
co_return co_await coro::timed_wait(
std::move(task), std::chrono::seconds{60});
}()));
}
namespace {
template <int value>
struct AwaitableInt {
bool await_ready() const { return true; }
bool await_suspend(coro::coroutine_handle<>) { assume_unreachable(); }
int await_resume() { return value; }
};
struct AwaitableWithOperator {};
AwaitableInt<42> operator co_await(const AwaitableWithOperator&) {
return {};
}
struct AwaitableWithMemberOperator {
AwaitableInt<42> operator co_await() { return {}; }
};
[[maybe_unused]] AwaitableInt<24> operator co_await(
const AwaitableWithMemberOperator&) {
return {};
}
} // namespace
TEST_F(CoroTest, AwaitableWithOperator) {
auto taskAwaitableWithOperator = []() -> coro::Task<int> {
co_return co_await AwaitableWithOperator();
};
EXPECT_EQ(42, coro::blockingWait(taskAwaitableWithOperator()));
}
TEST_F(CoroTest, AwaitableWithMemberOperator) {
auto taskAwaitableWithMemberOperator = []() -> coro::Task<int> {
co_return co_await AwaitableWithMemberOperator();
};
EXPECT_EQ(42, coro::blockingWait(taskAwaitableWithMemberOperator()));
}
TEST_F(CoroTest, Baton) {
auto taskBaton = [](fibers::Baton& baton) -> coro::Task<int> {
co_await baton;
co_return 42;
};
ManualExecutor executor;
fibers::Baton baton;
auto future = taskBaton(baton).scheduleOn(&executor).start();
EXPECT_FALSE(future.isReady());
executor.drain();
EXPECT_FALSE(future.isReady());
baton.post();
executor.drain();
EXPECT_TRUE(future.isReady());
EXPECT_EQ(42, std::move(future).get());
}
TEST_F(CoroTest, FulfilledFuture) {
auto taskFuture = [](auto value) -> coro::Task<decltype(value)> {
co_return co_await folly::makeFuture(std::move(value));
};
EXPECT_EQ(42, coro::blockingWait(taskFuture(42)));
}
TEST_F(CoroTest, MoveOnlyReturn) {
auto taskFuture = [](auto value) -> coro::Task<decltype(value)> {
co_return co_await folly::makeFuture(std::move(value));
};
EXPECT_EQ(42, *coro::blockingWait(taskFuture(std::make_unique<int>(42))));
}
TEST_F(CoroTest, co_invoke) {
ManualExecutor executor;
Promise<folly::Unit> p;
auto coroFuture =
coro::co_invoke([f = p.getSemiFuture()]() mutable -> coro::Task<void> {
(void)co_await std::move(f);
co_return;
})
.scheduleOn(&executor)
.start();
executor.drain();
EXPECT_FALSE(coroFuture.isReady());
p.setValue(folly::unit);
executor.drain();
EXPECT_TRUE(coroFuture.isReady());
}
TEST_F(CoroTest, Semaphore) {
static constexpr size_t kTasks = 10;
static constexpr size_t kIterations = 10000;
static constexpr size_t kNumTokens = 10;
static constexpr size_t kNumThreads = 16;
fibers::Semaphore sem(kNumTokens);
struct Worker {
explicit Worker(fibers::Semaphore& s) : sem(s), t([&] { run(); }) {}
void run() {
folly::EventBase evb;
{
std::shared_ptr<folly::EventBase> completionCounter(
&evb, [](folly::EventBase* evb_) { evb_->terminateLoopSoon(); });
for (size_t i = 0; i < kTasks; ++i) {
coro::co_invoke([&, completionCounter]() -> coro::Task<void> {
for (size_t j = 0; j < kIterations; ++j) {
co_await sem.co_wait();
++counter;
sem.signal();
--counter;
EXPECT_LT(counter, kNumTokens);
EXPECT_GE(counter, 0);
}
})
.scheduleOn(&evb)
.start();
}
}
evb.loopForever();
}
fibers::Semaphore& sem;
int counter{0};
std::thread t;
};
std::vector<Worker> workers;
workers.reserve(kNumThreads);
for (size_t i = 0; i < kNumThreads; ++i) {
workers.emplace_back(sem);
}
for (auto& worker : workers) {
worker.t.join();
}
for (auto& worker : workers) {
EXPECT_EQ(0, worker.counter);
}
}
TEST_F(CoroTest, SemaphoreWaitWhenCancellationAlreadyRequested) {
folly::coro::blockingWait([&]() -> folly::coro::Task<> {
folly::CancellationSource cancelSource;
cancelSource.requestCancellation();
// Run some logic while in an already-cancelled state.
co_await folly::coro::co_withCancellation(
cancelSource.getToken(), []() -> folly::coro::Task<> {
folly::fibers::Semaphore sem{1};
// If in a signalled state then should complete normally
co_await sem.co_wait();
// And the semaphore should no longer be in the signalled state.
// But if not signalled then should complete with cancellation
// immediately.
EXPECT_THROW(co_await sem.co_wait(), folly::OperationCancelled);
}());
}());
}
TEST_F(CoroTest, CancelOutstandingSemaphoreWait) {
struct ExpectedError : std::exception {};
folly::coro::blockingWait([&]() -> folly::coro::Task<> {
folly::fibers::Semaphore sem{0};
try {
co_await folly::coro::collectAll(
[&]() -> folly::coro::Task<> {
EXPECT_THROW(co_await sem.co_wait(), OperationCancelled);
}(),
[]() -> folly::coro::Task<> {
co_await folly::coro::co_reschedule_on_current_executor;
// Completing the second task with an error will cause
// collectAll() to request cancellation of the other task
// whcih should reqeust cancellation of sem.co_wait().
co_yield folly::coro::co_error(ExpectedError{});
}());
} catch (const ExpectedError&) {
}
}());
}
TEST_F(CoroTest, CancelOneSemaphoreWaitDoesNotAffectOthers) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::fibers::Semaphore sem{0};
folly::CancellationSource cancelSource;
co_await folly::coro::collectAll(
[&]() -> folly::coro::Task<> {
EXPECT_THROW(
(co_await folly::coro::co_withCancellation(
cancelSource.getToken(), sem.co_wait())),
OperationCancelled);
}(),
[&]() -> folly::coro::Task<> { co_await sem.co_wait(); }(),
[&]() -> folly::coro::Task<> {
co_await folly::coro::co_reschedule_on_current_executor;
cancelSource.requestCancellation();
sem.signal();
}());
}());
}
TEST_F(CoroTest, FutureTry) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
auto task42 = []() -> coro::Task<int> { co_return 42; };
auto taskException = []() -> coro::Task<int> {
throw std::runtime_error("Test exception");
co_return 42;
};
{
auto result = co_await folly::coro::co_awaitTry(task42().semi());
EXPECT_TRUE(result.hasValue());
EXPECT_EQ(42, result.value());
}
{
auto result = co_await folly::coro::co_awaitTry(taskException().semi());
EXPECT_TRUE(result.hasException());
}
{
auto result = co_await folly::coro::co_awaitTry(
task42().semi().via(co_await folly::coro::co_current_executor));
EXPECT_TRUE(result.hasValue());
EXPECT_EQ(42, result.value());
}
{
auto result =
co_await folly::coro::co_awaitTry(taskException().semi().via(
co_await folly::coro::co_current_executor));
EXPECT_TRUE(result.hasException());
}
}());
}
TEST_F(CoroTest, CancellableSleep) {
using namespace std::chrono;
using namespace std::chrono_literals;
CancellationSource cancelSrc;
auto start = steady_clock::now();
EXPECT_THROW(
coro::blockingWait([&]() -> coro::Task<void> {
co_await coro::collectAll(
[&]() -> coro::Task<void> {
co_await coro::co_withCancellation(
cancelSrc.getToken(), coro::sleep(10s));
}(),
[&]() -> coro::Task<void> {
co_await coro::co_reschedule_on_current_executor;
co_await coro::co_reschedule_on_current_executor;
co_await coro::co_reschedule_on_current_executor;
cancelSrc.requestCancellation();
}());
}()),
OperationCancelled);
auto end = steady_clock::now();
CHECK((end - start) < 1s);
}
TEST_F(CoroTest, DefaultConstructible) {
coro::blockingWait([]() -> coro::Task<void> {
struct S {
int x = 42;
};
auto taskS = []() -> coro::Task<S> { co_return {}; };
auto s = co_await taskS();
EXPECT_EQ(42, s.x);
}());
}
TEST(Coro, CoReturnTry) {
EXPECT_EQ(42, folly::coro::blockingWait([]() -> folly::coro::Task<int> {
co_return folly::Try<int>(42);
}()));
struct ExpectedException : public std::runtime_error {
ExpectedException() : std::runtime_error("ExpectedException") {}
};
EXPECT_THROW(
folly::coro::blockingWait([]() -> folly::coro::Task<int> {
co_return folly::Try<int>(ExpectedException());
}()),
ExpectedException);
EXPECT_EQ(42, folly::coro::blockingWait([]() -> folly::coro::Task<int> {
folly::Try<int> t(42);
co_return t;
}()));
EXPECT_EQ(42, folly::coro::blockingWait([]() -> folly::coro::Task<int> {
const folly::Try<int> tConst(42);
co_return tConst;
}()));
}
TEST(Coro, CoThrow) {
struct ExpectedException : public std::runtime_error {
ExpectedException() : std::runtime_error("ExpectedException") {}
};
EXPECT_THROW(
folly::coro::blockingWait([]() -> folly::coro::Task<int> {
co_yield folly::coro::co_error(ExpectedException());
ADD_FAILURE() << "unreachable";
// Intential lack of co_return statement to check
// that compiler treats code after co_yield co_error()
// as unreachable.
}()),
ExpectedException);
EXPECT_THROW(
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
co_yield folly::coro::co_error(ExpectedException());
ADD_FAILURE() << "unreachable";
}()),
ExpectedException);
}
TEST_F(CoroTest, DetachOnCancel) {
folly::coro::blockingWait([&]() -> folly::coro::Task<> {
folly::CancellationSource cancelSource;
cancelSource.requestCancellation();
// Run some logic while in an already-cancelled state.
co_await folly::coro::co_withCancellation(
cancelSource.getToken(), []() -> folly::coro::Task<> {
EXPECT_THROW(
co_await folly::coro::detachOnCancel(
folly::futures::sleep(std::chrono::seconds{1})
.deferValue([](auto) { return 42; })),
folly::OperationCancelled);
}());
}());
folly::coro::blockingWait([&]() -> folly::coro::Task<> {
folly::CancellationSource cancelSource;
co_await folly::coro::co_withCancellation(
cancelSource.getToken(), []() -> folly::coro::Task<> {
EXPECT_EQ(
42,
co_await folly::coro::detachOnCancel(
folly::futures::sleep(std::chrono::milliseconds{10})
.deferValue([](auto) { return 42; })));
}());
}());
folly::coro::blockingWait([&]() -> folly::coro::Task<> {
folly::CancellationSource cancelSource;
co_await folly::coro::co_withCancellation(
cancelSource.getToken(), []() -> folly::coro::Task<> {
co_await folly::coro::detachOnCancel([]() -> folly::coro::Task<void> {
co_await folly::futures::sleep(std::chrono::milliseconds{10});
}());
}());
}());
}
struct CountingManualExecutor : public folly::ManualExecutor {
void add(Func func) override {
++addCount_;
ManualExecutor::add(std::move(func));
}
ssize_t getAddCount() const { return addCount_.load(); }
private:
std::atomic<ssize_t> addCount_{0};
};
ssize_t runAndCountExecutorAdd(folly::coro::Task<void> task) {
CountingManualExecutor executor;
auto future = std::move(task).scheduleOn(&executor).start();
executor.drain();
EXPECT_TRUE(future.isReady());
return executor.getAddCount();
}
TEST_F(CoroTest, SemiNoReschedule) {
auto task42 = []() -> coro::Task<int> { co_return 42; };
auto semiFuture42 = []() {
return makeSemiFuture().deferValue([](auto) { return 42; });
};
EXPECT_EQ(
2, // One extra for keepAlive release logic of ManualExecutor
runAndCountExecutorAdd(folly::coro::co_invoke(
[&]() -> coro::Task<void> { EXPECT_EQ(42, co_await task42()); })));
EXPECT_EQ(
2, // One extra for keepAlive release logic of ManualExecutor
runAndCountExecutorAdd(folly::coro::co_invoke([&]() -> coro::Task<void> {
EXPECT_EQ(42, co_await task42().semi());
})));
EXPECT_EQ(
2, // One extra for keepAlive release logic of ManualExecutor
runAndCountExecutorAdd(folly::coro::co_invoke([&]() -> coro::Task<void> {
EXPECT_EQ(42, co_await semiFuture42());
})));
}
#endif