/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/DetachOnCancel.h>
#include <folly/experimental/coro/SharedPromise.h>
#include <folly/portability/GTest.h>
#if FOLLY_HAS_COROUTINES
using namespace folly::coro;
class BlockingWaitWaitInterface {
public:
std::string waitAndGetValue(folly::coro::Future<std::string> future) {
return folly::coro::blocking_wait(std::move(future));
}
};
class CPUThreadPoolWaitInterface {
public:
std::string waitAndGetValue(folly::coro::Future<std::string> future) {
return coGet(std::move(future))
.scheduleOn(cpuThreadPoolExecutor_.get())
.start()
.get();
}
private:
folly::coro::Task<std::string> coGet(
folly::coro::Future<std::string> future) {
co_return co_await std::move(future);
}
std::unique_ptr<folly::CPUThreadPoolExecutor> cpuThreadPoolExecutor_{
std::make_unique<folly::CPUThreadPoolExecutor>(1)};
};
template <typename WaitInterface>
class ValueInterface : public WaitInterface {
public:
void set(std::string value, SharedPromise<std::string>& promise) {
promise.setValue(std::move(value));
}
std::string get(folly::coro::Future<std::string> future) {
return this->waitAndGetValue(std::move(future));
}
};
template <typename WaitInterface>
class ExceptionInterface : public WaitInterface {
class StringException : public std::exception {
public:
explicit StringException(std::string string) : string_{string} {}
std::string get() { return string_; }
private:
std::string string_;
};
public:
void set(std::string value, SharedPromise<std::string>& promise) {
promise.setException(StringException{value});
}
std::string get(folly::coro::Future<std::string> future) {
try {
this->waitAndGetValue(std::move(future));
} catch (StringException& exception) {
return exception.get();
}
CHECK(false) << "Expected value in exception, "
<< " but didn't get any exception";
}
};
template <typename TestInterface>
class SharedPromiseTest : public ::testing::Test, public TestInterface {};
using TestTypes = ::testing::Types<
ValueInterface<BlockingWaitWaitInterface>,
ValueInterface<CPUThreadPoolWaitInterface>,
ExceptionInterface<BlockingWaitWaitInterface>,
ExceptionInterface<CPUThreadPoolWaitInterface>>;
TYPED_TEST_SUITE(SharedPromiseTest, TestTypes);
TYPED_TEST(SharedPromiseTest, Basic) {
auto promise = SharedPromise<std::string>{};
auto future = promise.getFuture();
this->set("ynwa", promise);
EXPECT_EQ("ynwa", this->get(std::move(future)));
}
TYPED_TEST(SharedPromiseTest, MultipleFutures) {
auto promise = SharedPromise<std::string>{};
auto futures = std::vector<folly::coro::Future<std::string>>{};
for (auto i = 0; i < 10; ++i) {
futures.push_back(promise.getFuture());
}
this->set("liverpool_best_club", promise);
for (auto& future : futures) {
EXPECT_EQ("liverpool_best_club", this->get(std::move(future)));
}
}
TYPED_TEST(SharedPromiseTest, BrokenPromise) {
auto testFutures = []() {
auto promise = SharedPromise<std::string>{};
auto futures = std::vector<folly::coro::Future<std::string>>{};
for (auto i = 0; i < 10; ++i) {
futures.push_back(promise.getFuture());
}
return futures;
}();
for (auto& future : testFutures) {
EXPECT_THROW(this->get(std::move(future)), folly::BrokenPromise);
}
}
TYPED_TEST(SharedPromiseTest, PromiseAlreadySatisfied) {
{
auto promise = SharedPromise<std::string>{};
this->set("liverpool_epl_champions", promise);
EXPECT_THROW(this->set("ynwa", promise), folly::PromiseAlreadySatisfied);
}
{
auto promise = SharedPromise<std::string>{};
auto futures = std::vector<folly::coro::Future<std::string>>{};
for (auto i = 0; i < 10; ++i) {
futures.push_back(promise.getFuture());
}
this->set("ynwa_1", promise);
EXPECT_THROW(this->set("ynwa_2", promise), folly::PromiseAlreadySatisfied);
for (auto& future : futures) {
EXPECT_EQ("ynwa_1", this->get(std::move(future)));
}
}
}
TYPED_TEST(SharedPromiseTest, FutureAfterFulfilled) {
auto promise = SharedPromise<std::string>{};
auto futures = std::vector<folly::coro::Future<std::string>>{};
for (auto i = 0; i < 10; ++i) {
futures.push_back(promise.getFuture());
}
this->set("ynwa", promise);
for (auto& future : futures) {
EXPECT_EQ(this->get(std::move(future)), "ynwa");
}
EXPECT_EQ(this->get(promise.getFuture()), "ynwa");
}
TYPED_TEST(SharedPromiseTest, PostMoveConstruction) {
auto promise = SharedPromise<std::string>{};
auto future = promise.getFuture();
this->set("ynwa_1", promise);
auto anotherPromise = std::move(promise);
EXPECT_EQ(this->get(std::move(future)), "ynwa_1");
EXPECT_EQ(this->get(anotherPromise.getFuture()), "ynwa_1");
// @lint-ignore CLANGTIDY
auto anotherFuture = promise.getFuture();
this->set("ynwa_2", promise);
EXPECT_EQ(this->get(std::move(anotherFuture)), "ynwa_2");
EXPECT_EQ(this->get(promise.getFuture()), "ynwa_2");
}
TYPED_TEST(SharedPromiseTest, PostMoveAssignment) {
auto promise = SharedPromise<std::string>{};
auto future = promise.getFuture();
this->set("ynwa_1", promise);
auto anotherPromise = SharedPromise<std::string>{};
anotherPromise = std::move(promise);
EXPECT_EQ(this->get(std::move(future)), "ynwa_1");
EXPECT_EQ(this->get(anotherPromise.getFuture()), "ynwa_1");
// @lint-ignore CLANGTIDY
auto anotherFuture = promise.getFuture();
this->set("ynwa_2", promise);
EXPECT_EQ(this->get(std::move(anotherFuture)), "ynwa_2");
EXPECT_EQ(this->get(promise.getFuture()), "ynwa_2");
}
namespace {
class FallibleExecutor : public folly::Executor {
public:
explicit FallibleExecutor(std::unique_ptr<folly::Executor> executor)
: executor_{std::move(executor)} {}
void add(folly::Function<void()> function) override {
if (!failed_.load()) {
executor_->add(std::move(function));
return;
}
CHECK(false) << "Cannot add to FallibleExecutor after it has entered the "
<< "fail state.";
}
void fail() { failed_.store(true); }
private:
std::unique_ptr<folly::Executor> executor_;
std::atomic<bool> failed_{false};
};
} // namespace
TYPED_TEST(SharedPromiseTest, CleanlyCancellableWait) {
// This test makes sure that there are no detached tasks lying around on the
// executor _after_ the cancellation exception has propagated to the user. In
// particular, this bug was found in the falcon codebase and fixed in
// D39738899.
//
// folly::coro::Promise and folly::coro::Future should natively support
// cancellable waits. We test for this assertion here.
//
// If the promise and future types were to be switched to folly::Promise and
// folly::Future, this test would never return because cancellation would
// never get triggered. Adding detachOnCancel() leaves around a detached
// coroutine on the executor, which we put in a "failed" state, similar to
// folly::Future's internal WaitExecutor. So using detachOnCancel() will
// cause the test to fail as well
auto promise = SharedPromise<std::string>{};
auto future = promise.getFuture();
auto baton = folly::Baton<>{};
auto task = folly::coro::co_invoke([&]() -> folly::coro::Task<std::string> {
co_return co_await std::move(future);
});
auto cpu = std::make_unique<folly::CPUThreadPoolExecutor>(1);
auto fallibleExecutor = std::make_unique<FallibleExecutor>(std::move(cpu));
auto cancellationSource = folly::CancellationSource{};
auto cancellationToken = cancellationSource.getToken();
auto started =
folly::coro::co_withCancellation(cancellationToken, std::move(task))
.scheduleOn(fallibleExecutor.get())
.start();
cancellationSource.requestCancellation();
EXPECT_THROW(std::move(started).get(), folly::OperationCancelled);
// make the executor go in a failed state and set the promise, so if the
// future was detached, we would get the detached coroutine to run on the
// executor
fallibleExecutor->fail();
promise.setValue("ynwa");
}
TYPED_TEST(SharedPromiseTest, NoHeapAllocation) {
char allocBuffer[1024];
auto promise = new (allocBuffer) SharedPromise<std::string>{};
promise->setValue("foo");
// If SharedPromise has heap allocation, it would trigger ASAN failures
}
TEST(SharedPromiseTest, BasicVoid) {
{
auto promise = SharedPromise<void>{};
auto future = promise.getFuture();
promise.setValue();
blocking_wait(std::move(future));
}
{
auto promise = SharedPromise<void>{};
promise.setValue();
blocking_wait(promise.getFuture());
}
}
#endif