folly/folly/coro/test/AsyncScopeTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/Portability.h>

#include <folly/experimental/coro/AsyncScope.h>

#include <folly/executors/GlobalExecutor.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h>
#include <folly/experimental/coro/GtestHelpers.h>
#include <folly/experimental/coro/Sleep.h>
#include <folly/experimental/coro/Task.h>

#include <folly/portability/GTest.h>

#if FOLLY_HAS_COROUTINES

struct AsyncScopeTest : public testing::Test {};

TEST_F(AsyncScopeTest, ConstructDestruct) {
  // Safe to construct/destruct an AsyncScope without calling any methods.
  folly::coro::AsyncScope scope;
}

CO_TEST_F(AsyncScopeTest, AddAndJoin) {
  std::atomic<int> count = 0;
  auto makeTask = [&]() -> folly::coro::Task<> {
    ++count;
    co_return;
  };

  folly::coro::AsyncScope scope;
  for (int i = 0; i < 100; ++i) {
    scope.add(makeTask().scheduleOn(folly::getGlobalCPUExecutor()));
  }

  co_await scope.joinAsync();

  EXPECT_EQ(count, 100);
}

CO_TEST_F(AsyncScopeTest, StartChildTasksAfterCleanupStarted) {
  folly::coro::AsyncScope scope;
  folly::coro::Baton baton;
  bool childFinished = false;
  auto executor = co_await folly::coro::co_current_executor;

  auto childTask = [&]() -> folly::coro::Task<> {
    co_await folly::coro::co_reschedule_on_current_executor;
    childFinished = true;
  };

  auto parentTask = [&]() -> folly::coro::Task<> {
    co_await baton;
    scope.add(childTask().scheduleOn(executor));
  };

  scope.add(parentTask().scheduleOn(executor));

  co_await folly::coro::collectAll(
      scope.joinAsync(), [&]() -> folly::coro::Task<> {
        baton.post();
        co_return;
      }());

  EXPECT_TRUE(childFinished);
}

CO_TEST_F(AsyncScopeTest, QueryRemainingCount) {
  folly::coro::Baton baton;

  auto makeTask = [&]() -> folly::coro::Task<> { co_await baton; };
  auto executor = co_await folly::coro::co_current_executor;

  folly::coro::AsyncScope scope;

  CO_ASSERT_EQ(0, scope.remaining());
  for (int i = 0; i < 10; ++i) {
    scope.add(makeTask().scheduleOn(executor));
  }
  CO_ASSERT_EQ(10, scope.remaining());

  baton.post();

  co_await scope.joinAsync();
  CO_ASSERT_EQ(0, scope.remaining());
}

CO_TEST_F(AsyncScopeTest, QueryRemainingCountAfterJoined) {
  folly::coro::AsyncScope scope;
  folly::coro::Baton baton;

  auto makeTask = [&]() -> folly::coro::Task<> { co_await baton; };
  auto executor = co_await folly::coro::co_current_executor;
  scope.add(makeTask().scheduleOn(executor));

  EXPECT_EQ(scope.remaining(), 1);

  folly::coro::Baton validateBaton;
  auto validateTask = [&]() -> folly::coro::Task<> {
    EXPECT_EQ(scope.remaining(), 1);
    validateBaton.post();
    // sleep for scope.joinAsync() to get called.
    co_await folly::coro::sleep(std::chrono::milliseconds(10));
    EXPECT_EQ(scope.remaining(), 1);
    baton.post();
  };
  auto validateFut = validateTask().scheduleOn(executor).start();
  co_await validateBaton;
  co_await scope.joinAsync();
  co_await std::move(validateFut);
}

namespace {
folly::coro::Task<> crash() {
  folly::coro::AsyncScope scope{false};
  auto makeTask = [&]() -> folly::coro::Task<> {
    // sleep to force yielding
    co_await folly::coro::sleep(std::chrono::milliseconds(100));
    throw std::runtime_error("Computer says no");
  };
  scope.add(makeTask().scheduleOn(folly::getGlobalCPUExecutor()));
  co_return;
}
} // namespace

CO_TEST_F(AsyncScopeTest, DontThrowOnJoin) {
  EXPECT_DEATH(folly::coro::blockingWait(crash()), "not yet complete");
  co_return;
}

CO_TEST_F(AsyncScopeTest, ThrowOnJoin) {
  folly::coro::AsyncScope scope{true};
  auto makeTask = [&]() -> folly::coro::Task<> {
    // sleep to force yielding
    co_await folly::coro::sleep(std::chrono::milliseconds(100));
    throw std::runtime_error("Computer says no");
  };
  scope.add(makeTask().scheduleOn(folly::getGlobalCPUExecutor()));

  EXPECT_THROW(co_await scope.joinAsync(), std::runtime_error);
}

struct CancellableAsyncScopeTest : public testing::Test {};

TEST_F(CancellableAsyncScopeTest, ConstructDestruct) {
  // Safe to construct/destruct an AsyncScope without calling any methods.
  folly::coro::CancellableAsyncScope scope;
}

CO_TEST_F(CancellableAsyncScopeTest, AddAndJoin) {
  std::atomic<int> count = 0;
  auto makeTask = [&]() -> folly::coro::Task<> {
    ++count;
    co_return;
  };

  folly::coro::CancellableAsyncScope scope;
  for (int i = 0; i < 99; ++i) {
    scope.add(makeTask().scheduleOn(folly::getGlobalCPUExecutor()));
  }
  scope.addWithSourceLoc(makeTask().scheduleOn(folly::getGlobalCPUExecutor()));

  co_await scope.joinAsync();

  EXPECT_EQ(count, 100);
}

CO_TEST_F(CancellableAsyncScopeTest, StartChildTasksAfterCleanupStarted) {
  folly::coro::CancellableAsyncScope scope;
  folly::coro::Baton baton;
  bool childFinished = false;
  auto executor = co_await folly::coro::co_current_executor;

  auto childTask = [&]() -> folly::coro::Task<> {
    co_await folly::coro::co_reschedule_on_current_executor;
    childFinished = true;
  };

  auto parentTask = [&]() -> folly::coro::Task<> {
    co_await baton;
    scope.add(childTask().scheduleOn(executor));
  };

  scope.add(parentTask().scheduleOn(executor));

  co_await folly::coro::collectAll(
      scope.joinAsync(), [&]() -> folly::coro::Task<> {
        baton.post();
        co_return;
      }());

  EXPECT_TRUE(childFinished);
}

CO_TEST_F(CancellableAsyncScopeTest, QueryRemainingCount) {
  folly::coro::Baton baton;

  auto makeTask = [&]() -> folly::coro::Task<> { co_await baton; };
  auto executor = co_await folly::coro::co_current_executor;

  folly::coro::CancellableAsyncScope scope;

  CO_ASSERT_EQ(0, scope.remaining());
  for (int i = 0; i < 10; ++i) {
    scope.add(makeTask().scheduleOn(executor));
  }
  CO_ASSERT_EQ(10, scope.remaining());

  baton.post();

  co_await scope.joinAsync();
  CO_ASSERT_EQ(0, scope.remaining());
}

CO_TEST_F(CancellableAsyncScopeTest, QueryIsCancellationRequested) {
  using namespace std::chrono_literals;

  auto makeTask = [&]() -> folly::coro::Task<> {
    while (true) {
      co_await folly::coro::sleep(500s);
    }
  };
  auto executor = co_await folly::coro::co_current_executor;

  // default constructed scope
  folly::coro::CancellableAsyncScope scope;
  CO_ASSERT_EQ(false, scope.isScopeCancellationRequested());
  for (int i = 0; i < 10; ++i) {
    scope.add(makeTask().scheduleOn(executor));
  }
  CO_ASSERT_EQ(10, scope.remaining());

  co_await scope.cancelAndJoinAsync();
  CO_ASSERT_EQ(true, scope.isScopeCancellationRequested());
  CO_ASSERT_EQ(0, scope.remaining());

  // construct scope using external CancellationSource and cancel using the
  // external cancellationSource
  folly::CancellationSource source;
  folly::coro::CancellableAsyncScope scope2(source.getToken());

  CO_ASSERT_EQ(0, scope2.remaining());
  for (int i = 0; i < 10; ++i) {
    scope2.add(makeTask().scheduleOn(folly::getGlobalCPUExecutor()));
  }
  CO_ASSERT_EQ(10, scope2.remaining());
  CO_ASSERT_EQ(false, scope2.isScopeCancellationRequested());

  source.requestCancellation();
  CO_ASSERT_EQ(true, scope2.isScopeCancellationRequested());
  co_await scope2.joinAsync();
  CO_ASSERT_EQ(0, scope2.remaining());

  source = {};
  // construct scope using external CancellationSource and cancel using the
  // class's cancellation source
  folly::coro::CancellableAsyncScope scope3(source.getToken());

  CO_ASSERT_EQ(0, scope3.remaining());
  for (int i = 0; i < 10; ++i) {
    scope3.add(makeTask().scheduleOn(folly::getGlobalCPUExecutor()));
  }
  CO_ASSERT_EQ(10, scope3.remaining());
  CO_ASSERT_EQ(false, scope3.isScopeCancellationRequested());
  co_await scope3.cancelAndJoinAsync();
  CO_ASSERT_EQ(true, scope3.isScopeCancellationRequested());
  CO_ASSERT_EQ(0, scope3.remaining());

  source = {};
  // default scope construction; each task is added with custom cancellation
  // token
  folly::coro::CancellableAsyncScope scope4;
  CO_ASSERT_EQ(0, scope4.remaining());
  for (int i = 0; i < 10; ++i) {
    scope4.add(
        makeTask().scheduleOn(folly::getGlobalCPUExecutor()),
        source.getToken());
  }
  CO_ASSERT_EQ(10, scope4.remaining());
  source.requestCancellation();
  CO_ASSERT_EQ(source.isCancellationRequested(), true);
  CO_ASSERT_EQ(false, scope4.isScopeCancellationRequested());
  co_await scope4.joinAsync();
  // this is false since we the token that is used is not part of the AsyncScope
  // state
  CO_ASSERT_EQ(false, scope4.isScopeCancellationRequested());
  CO_ASSERT_EQ(0, scope4.remaining());
}

CO_TEST_F(CancellableAsyncScopeTest, CancelSuspendedWork) {
  using namespace std::chrono_literals;

  auto makeTask = [&]() -> folly::coro::Task<> {
    co_await folly::coro::sleep(300s);
  };

  folly::coro::CancellableAsyncScope scope;

  CO_ASSERT_EQ(0, scope.remaining());
  for (int i = 0; i < 10; ++i) {
    scope.add(makeTask().scheduleOn(folly::getGlobalCPUExecutor()));
  }
  CO_ASSERT_EQ(10, scope.remaining());

  // Although we are suspended while sleeping, cancelAndJoinAsync will handle
  // this correctly.
  co_await scope.cancelAndJoinAsync();
  CO_ASSERT_EQ(0, scope.remaining());

  folly::CancellationSource source;
  folly::coro::CancellableAsyncScope scope2(source.getToken());

  CO_ASSERT_EQ(0, scope2.remaining());
  for (int i = 0; i < 10; ++i) {
    scope2.add(makeTask().scheduleOn(folly::getGlobalCPUExecutor()));
  }
  CO_ASSERT_EQ(10, scope2.remaining());

  source.requestCancellation();
  co_await scope2.joinAsync();
  CO_ASSERT_EQ(0, scope2.remaining());

  source = {};
  folly::coro::CancellableAsyncScope scope3;

  CO_ASSERT_EQ(0, scope3.remaining());
  for (int i = 0; i < 10; ++i) {
    scope3.add(
        makeTask().scheduleOn(folly::getGlobalCPUExecutor()),
        source.getToken());
  }
  CO_ASSERT_EQ(10, scope3.remaining());

  source.requestCancellation();
  co_await scope3.joinAsync();
  CO_ASSERT_EQ(0, scope3.remaining());
}

CO_TEST_F(CancellableAsyncScopeTest, CancelSuspendedWorkCoSchedule) {
  using namespace std::chrono_literals;

  auto makeTask = [&]() -> folly::coro::Task<> {
    co_await folly::coro::sleep(300s);
  };

  folly::coro::CancellableAsyncScope scope;

  CO_ASSERT_EQ(0, scope.remaining());
  for (int i = 0; i < 10; ++i) {
    co_await scope.co_schedule(makeTask());
  }
  CO_ASSERT_EQ(10, scope.remaining());

  // Although we are suspended while sleeping, cancelAndJoinAsync will handle
  // this correctly.
  co_await scope.cancelAndJoinAsync();
  CO_ASSERT_EQ(0, scope.remaining());
}

#endif // FOLLY_HAS_COROUTINES