// folly/folly/coro/Collect-inl.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <atomic>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <limits>
#include <optional>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

#include <folly/CancellationToken.h>
#include <folly/ExceptionWrapper.h>
#include <folly/experimental/coro/AsyncPipe.h>
#include <folly/experimental/coro/AsyncScope.h>
#include <folly/experimental/coro/Mutex.h>
#include <folly/experimental/coro/detail/Barrier.h>
#include <folly/experimental/coro/detail/BarrierTask.h>
#include <folly/experimental/coro/detail/CurrentAsyncFrame.h>
#include <folly/experimental/coro/detail/Helpers.h>

#if FOLLY_HAS_COROUTINES

namespace folly {
namespace coro {
namespace detail {

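// Helpers that unwrap a completed Try<T> into the value stored in the result
// tuple. The Try<void> overload maps a successful void result to Unit so that
// collectAll() can return a uniform std::tuple even when some awaitables
// complete with void.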
template <typename T>
T&& getValueOrUnit(Try<T>&& value) {
  assert(value.hasValue());
  return std::move(value).value();
}

inline Unit getValueOrUnit([[maybe_unused]] Try<void>&& value) {
  assert(value.hasValue());
  return Unit{};
}

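// Materialises one inner task per element of the input range by invoking
// `make(element, index)`. The vector is reserved up front when the range can
// report its size (or its iterator distance is known) so that creating the
// tasks does not reallocate.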
template <
    typename InputRange,
    typename Make,
    typename Iter = invoke_result_t<access::begin_fn, InputRange&>,
    typename Elem = remove_cvref_t<decltype(*std::declval<Iter&>())>,
    typename RTask = invoke_result_t<Make&, Elem, std::size_t>>
std::vector<RTask> collectMakeInnerTaskVec(InputRange& awaitables, Make& make) {
  std::vector<RTask> tasks;

  auto abegin = access::begin(awaitables);
  auto aend = access::end(awaitables);

  if constexpr (is_invocable_v<folly::access::size_fn, InputRange&>) {
    tasks.reserve(static_cast<std::size_t>(folly::access::size(awaitables)));
  } else if constexpr (range_has_known_distance_v<InputRange&>) {
    tasks.reserve(static_cast<std::size_t>(std::distance(abegin, aend)));
  }

  std::size_t index = 0;
  for (auto aiter = abegin; aiter != aend; ++aiter) {
    tasks.push_back(make(std::move(*aiter), index++));
  }

  return tasks;
}

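// Awaits a single child awaitable on the given executor, under the given
// cancellation token, and stores its outcome (value or exception) in the
// caller-provided Try slot. It never propagates an exception itself, which is
// what gives collectAllTry() its "always returns every result" semantics.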
template <typename SemiAwaitable, typename Result>
BarrierTask makeCollectAllTryTask(
    Executor::KeepAlive<> executor,
    const CancellationToken& cancelToken,
    SemiAwaitable&& awaitable,
    Try<Result>& result) {
  try {
    if constexpr (std::is_void_v<Result>) {
      co_await co_viaIfAsync(
          std::move(executor),
          co_withCancellation(
              cancelToken, static_cast<SemiAwaitable&&>(awaitable)));
      result.emplace();
    } else {
      result.emplace(co_await co_viaIfAsync(
          std::move(executor),
          co_withCancellation(
              cancelToken, static_cast<SemiAwaitable&&>(awaitable))));
    }
  } catch (...) {
    result.emplaceException(current_exception());
  }
}

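// Variadic implementation of collectAllTry(): one BarrierTask per awaitable is
// started on the current executor, all sharing the parent cancellation token,
// and a Barrier is used to wait until every child has written its Try<T> into
// the results tuple.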
template <typename... SemiAwaitables, size_t... Indices>
auto collectAllTryImpl(
    std::index_sequence<Indices...>, SemiAwaitables... awaitables)
    -> folly::coro::Task<
        std::tuple<collect_all_try_component_t<SemiAwaitables>...>> {
  static_assert(sizeof...(Indices) == sizeof...(SemiAwaitables));
  if constexpr (sizeof...(SemiAwaitables) == 0) {
    co_return std::tuple<>{};
  } else {
    const Executor::KeepAlive<> executor = co_await co_current_executor;
    const CancellationToken& cancelToken =
        co_await co_current_cancellation_token;

    std::tuple<collect_all_try_component_t<SemiAwaitables>...> results;

    folly::coro::detail::BarrierTask tasks[sizeof...(SemiAwaitables)] = {
        makeCollectAllTryTask(
            executor.get_alias(),
            cancelToken,
            static_cast<SemiAwaitables&&>(awaitables),
            std::get<Indices>(results))...,
    };

    folly::coro::detail::Barrier barrier{sizeof...(SemiAwaitables) + 1};

    auto& asyncFrame = co_await detail::co_current_async_stack_frame;

    // Use std::initializer_list to ensure that the sub-tasks are launched
    // in the order they appear in the parameter pack.

    // Save the initial context and restore it after starting each task
    // as the task may have modified the context before suspending and we
    // want to make sure the next task is started with the same initial
    // context.
    const auto context = RequestContext::saveContext();
    (void)std::initializer_list<int>{
        (tasks[Indices].start(&barrier, asyncFrame),
         RequestContext::setContext(context),
         0)...};

    // Wait for all of the sub-tasks to finish execution.
    // Should be safe to avoid an executor transition here even if the
    // operation completes asynchronously since all of the child tasks
    // should already have transitioned to the correct executor due to
    // the use of co_viaIfAsync() within makeCollectAllTryTask().
    co_await UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};

    co_return results;
  }
}

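// Variadic implementation of collectAll(). Unlike the Try variant, the
// children share a CancellationSource merged with the parent token: the first
// child to fail requests cancellation of its siblings, and its exception is
// remembered and re-thrown once every child has finished.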
template <typename... SemiAwaitables, size_t... Indices>
auto collectAllImpl(
    std::index_sequence<Indices...>, SemiAwaitables... awaitables)
    -> folly::coro::Task<
        std::tuple<collect_all_component_t<SemiAwaitables>...>> {
  if constexpr (sizeof...(SemiAwaitables) == 0) {
    co_return std::tuple<>{};
  } else {
    const Executor::KeepAlive<> executor = co_await co_current_executor;
    const CancellationToken& parentCancelToken =
        co_await co_current_cancellation_token;

    const CancellationSource cancelSource;
    const CancellationToken cancelToken =
        CancellationToken::merge(parentCancelToken, cancelSource.getToken());

    exception_wrapper firstException;

    auto makeTask = [&](auto&& awaitable, auto& result) -> BarrierTask {
      using await_result = semi_await_result_t<decltype(awaitable)>;
      try {
        if constexpr (std::is_void_v<await_result>) {
          co_await co_viaIfAsync(
              executor.get_alias(),
              co_withCancellation(
                  cancelToken, static_cast<decltype(awaitable)>(awaitable)));
          result.emplace();
        } else {
          result.emplace(co_await co_viaIfAsync(
              executor.get_alias(),
              co_withCancellation(
                  cancelToken, static_cast<decltype(awaitable)>(awaitable))));
        }
      } catch (...) {
        if (!cancelSource.requestCancellation()) {
          // This was the first failure; remember its error.
          firstException = exception_wrapper{current_exception()};
        }
      }
    };

    std::tuple<collect_all_try_component_t<SemiAwaitables>...> results;

    folly::coro::detail::BarrierTask tasks[sizeof...(SemiAwaitables)] = {
        makeTask(
            static_cast<SemiAwaitables&&>(awaitables),
            std::get<Indices>(results))...,
    };

    folly::coro::detail::Barrier barrier{sizeof...(SemiAwaitables) + 1};

    // Save the initial context and restore it after starting each task
    // as the task may have modified the context before suspending and we
    // want to make sure the next task is started with the same initial
    // context.
    const auto context = RequestContext::saveContext();

    auto& asyncFrame = co_await detail::co_current_async_stack_frame;

    // Use std::initializer_list to ensure that the sub-tasks are launched
    // in the order they appear in the parameter pack.
    (void)std::initializer_list<int>{
        (tasks[Indices].start(&barrier, asyncFrame),
         RequestContext::setContext(context),
         0)...};

    // Wait for all of the sub-tasks to finish execution.
    // Should be safe to avoid an executor transition here even if the
    // operation completes asynchronously since all of the child tasks
    // should already have transitioned to the correct executor due to
    // the use of co_viaIfAsync() within makeTask().
    co_await UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};

    if (firstException) {
      co_yield co_error(std::move(firstException));
    }

    co_return std::tuple<collect_all_component_t<SemiAwaitables>...>{
        getValueOrUnit(std::get<Indices>(std::move(results)))...};
  }
}

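// Shared implementation for makeUnorderedAsyncGenerator() and
// makeUnorderedTryAsyncGenerator(). Each awaitable is scheduled on the given
// AsyncScope and writes its result into an AsyncPipe; the returned
// AsyncGenerator then yields results in completion order. In the non-Try mode
// the first error cancels the remaining work and the generator finishes with
// that error; in Try mode each result is yielded as a folly::Try.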
template <typename InputRange, typename IsTry, typename AsyncScope>
auto makeUnorderedAsyncGeneratorImpl(
    AsyncScope& scope, InputRange awaitables, IsTry) {
  using Item =
      async_generator_from_awaitable_range_item_t<InputRange, IsTry::value>;
  return [](AsyncScope& scopeParam,
            InputRange awaitablesParam) -> AsyncGenerator<Item&&> {
    auto [results, pipe] = AsyncPipe<Item, false>::create();
    const CancellationSource cancelSource;
    auto guard = folly::makeGuard([&] { cancelSource.requestCancellation(); });
    auto ex = co_await co_current_executor;
    size_t expected = 0;
    // Save the initial context and restore it after starting each task
    // as the task may have modified the context before suspending and we
    // want to make sure the next task is started with the same initial
    // context.
    const auto context = RequestContext::saveContext();

    for (auto&& semiAwaitable : static_cast<InputRange&&>(awaitablesParam)) {
      auto task = [](auto semiAwaitableParam,
                     auto& cancelSourceParam,
                     auto& p) -> Task<void> {
        auto result = co_await co_awaitTry(std::move(semiAwaitableParam));
        if (!result.hasValue() && !IsTry::value) {
          cancelSourceParam.requestCancellation();
        }
        p.write(std::move(result));
      }(static_cast<decltype(semiAwaitable)&&>(semiAwaitable),
        cancelSource,
        pipe);
      if constexpr (std::is_same_v<AsyncScope, folly::coro::AsyncScope>) {
        scopeParam.add(
            co_withCancellation(cancelSource.getToken(), std::move(task))
                .scheduleOn(ex));
      } else {
        static_assert(std::is_same_v<AsyncScope, CancellableAsyncScope>);
        scopeParam.add(std::move(task).scheduleOn(ex), cancelSource.getToken());
      }
      ++expected;
      RequestContext::setContext(context);
    }

    while (expected > 0) {
      CancellationCallback cancelCallback(
          co_await co_current_cancellation_token,
          [&]() noexcept { cancelSource.requestCancellation(); });

      if constexpr (!IsTry::value) {
        auto result = co_await co_awaitTry(results.next());
        if (result.hasValue() && result->has_value()) {
          co_yield std::move(**result);
          if (--expected) {
            continue;
          }
          result.emplace(); // completion result
        }
        guard.dismiss();
        co_yield co_result(std::move(result));
      } else {
        // Prevent the AsyncPipe from receiving cancellation so that we still
        // get the right number of OperationCancelled results.
        auto result = co_await co_withCancellation({}, results.next());
        co_yield std::move(*result);
        if (--expected == 0) {
          guard.dismiss();
          co_return;
        }
      }
    }
  }(scope, std::move(awaitables));
}

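// Variadic implementation of collectAny(): all awaitables run via collectAll()
// under a shared CancellationSource, and the first one to complete (with a
// value or an error) wins the race to requestCancellation(), records its index
// and result, and cancels the rest. The remaining children still run to
// completion (typically with OperationCancelled) before the winning result is
// returned.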
template <typename... SemiAwaitables, size_t... Indices>
auto collectAnyImpl(
    std::index_sequence<Indices...>, SemiAwaitables&&... awaitables)
    -> folly::coro::Task<std::pair<
        std::size_t,
        folly::Try<collect_any_component_t<SemiAwaitables...>>>> {
  const CancellationToken& parentCancelToken =
      co_await co_current_cancellation_token;
  const CancellationSource cancelSource;
  const CancellationToken cancelToken =
      CancellationToken::merge(parentCancelToken, cancelSource.getToken());

  std::pair<std::size_t, folly::Try<collect_any_component_t<SemiAwaitables...>>>
      firstCompletion;
  firstCompletion.first = size_t(-1);
  co_await folly::coro::collectAll(folly::coro::co_withCancellation(
      cancelToken,
      folly::coro::co_invoke(
          [&, aw = static_cast<SemiAwaitables&&>(awaitables)]() mutable
          -> folly::coro::Task<void> {
            auto result = co_await folly::coro::co_awaitTry(
                static_cast<SemiAwaitables&&>(aw));
            if (!cancelSource.requestCancellation()) {
              // This is the first entity to request cancellation.
              firstCompletion.first = Indices;
              firstCompletion.second = std::move(result);
            }
          }))...);

  co_return firstCompletion;
}

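// Like collectAnyImpl(), but an error does not win the race: a child records
// its result only if it produced a value, or if it was the last child to
// finish and every child failed (in which case the last exception is
// reported).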
template <typename... SemiAwaitables, size_t... Indices>
auto collectAnyWithoutExceptionImpl(
    std::index_sequence<Indices...>, SemiAwaitables&&... awaitables)
    -> folly::coro::Task<std::pair<
        std::size_t,
        folly::Try<detail::collect_any_component_t<SemiAwaitables...>>>> {
  const CancellationToken& parentCancelToken =
      co_await co_current_cancellation_token;
  const CancellationSource cancelSource;
  const CancellationToken cancelToken =
      CancellationToken::merge(parentCancelToken, cancelSource.getToken());

  constexpr std::size_t nAwaitables = sizeof...(SemiAwaitables);
  std::atomic<std::size_t> nAwaited = 1;
  std::pair<std::size_t, folly::Try<collect_any_component_t<SemiAwaitables...>>>
      firstValueOrLastException;
  firstValueOrLastException.first = std::numeric_limits<size_t>::max();
  co_await folly::coro::collectAll(folly::coro::co_withCancellation(
      cancelToken, [&]() -> folly::coro::Task<void> {
        auto result = co_await folly::coro::co_awaitTry(
            std::forward<SemiAwaitables>(awaitables));
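        // Record this result if it is the first value produced, or if this is
        // the last child to finish and none produced a value (fetch_add()
        // returns the number of value-less completions counting this one,
        // because nAwaited starts at 1).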
        if ((result.hasValue() ||
             nAwaited.fetch_add(1, std::memory_order_relaxed) == nAwaitables) &&
            !cancelSource.requestCancellation()) {
          firstValueOrLastException.first = Indices;
          firstValueOrLastException.second = std::move(result);
        }
      }())...);

  co_return firstValueOrLastException;
}

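// Variadic implementation of collectAnyNoDiscard(): the first awaitable to
// complete cancels the others, but every result (value, error, or
// OperationCancelled) is kept and returned as a tuple of Try values.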
template <typename... SemiAwaitables, size_t... Indices>
auto collectAnyNoDiscardImpl(
    std::index_sequence<Indices...>, SemiAwaitables&&... awaitables)
    -> folly::coro::Task<
        std::tuple<collect_all_try_component_t<SemiAwaitables>...>> {
  const CancellationSource cancelSource;
  const CancellationToken cancelToken = CancellationToken::merge(
      co_await co_current_cancellation_token, cancelSource.getToken());

  std::tuple<collect_all_try_component_t<SemiAwaitables>...> results;
  co_await folly::coro::collectAll(folly::coro::co_withCancellation(
      cancelToken, folly::coro::co_invoke([&]() -> folly::coro::Task<void> {
        auto result = co_await folly::coro::co_awaitTry(
            std::forward<SemiAwaitables>(awaitables));
        cancelSource.requestCancellation();
        std::get<Indices>(results) = std::move(result);
      }))...);

  co_return results;
}

} // namespace detail

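// collectAll(a, b, ...) runs the given awaitables concurrently on the current
// executor and completes with a tuple of their results once all of them have
// finished, cancelling the remaining work and re-throwing the first exception
// if any of them fails.
//
// Illustrative usage (fetchA()/fetchB() are hypothetical tasks):
//
//   folly::coro::Task<int> fetchA();
//   folly::coro::Task<std::string> fetchB();
//
//   folly::coro::Task<void> example() {
//     auto [a, b] = co_await folly::coro::collectAll(fetchA(), fetchB());
//   }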
template <typename... SemiAwaitables>
auto collectAll(SemiAwaitables&&... awaitables)
    -> folly::coro::Task<std::tuple<
        detail::collect_all_component_t<remove_cvref_t<SemiAwaitables>>...>> {
  return detail::collectAllImpl(
      std::make_index_sequence<sizeof...(SemiAwaitables)>{},
      static_cast<SemiAwaitables&&>(awaitables)...);
}

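// collectAllTry(a, b, ...) is the non-throwing variant of collectAll(): every
// awaitable runs to completion and the result is a tuple of folly::Try, so
// individual failures can be inspected without losing the other results, e.g.
// `auto [ta, tb] = co_await collectAllTry(fetchA(), fetchB());` (illustrative,
// reusing the hypothetical tasks sketched above).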
template <typename... SemiAwaitables>
auto collectAllTry(SemiAwaitables&&... awaitables)
    -> folly::coro::Task<std::tuple<detail::collect_all_try_component_t<
        remove_cvref_t<SemiAwaitables>>...>> {
  return detail::collectAllTryImpl(
      std::make_index_sequence<sizeof...(SemiAwaitables)>{},
      static_cast<SemiAwaitables&&>(awaitables)...);
}

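// Range-based collectAll(). Two overloads are selected via enable_if on
// whether awaiting an element yields void: this one collects the values into
// a std::vector, while the void overload below simply waits for all of the
// awaitables. Both cancel the remaining work on the first failure and re-throw
// that first exception.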
template <
    typename InputRange,
    std::enable_if_t<
        !std::is_void_v<
            semi_await_result_t<detail::range_reference_t<InputRange>>>,
        int>>
auto collectAllRange(InputRange awaitables)
    -> folly::coro::Task<std::vector<detail::collect_all_range_component_t<
        detail::range_reference_t<InputRange>>>> {
  const folly::Executor::KeepAlive<> executor = co_await co_current_executor;
  const CancellationSource cancelSource;
  const CancellationToken cancelToken = CancellationToken::merge(
      co_await co_current_cancellation_token, cancelSource.getToken());

  std::vector<detail::collect_all_try_range_component_t<
      detail::range_reference_t<InputRange>>>
      tryResults;

  exception_wrapper firstException;

  using awaitable_type = remove_cvref_t<detail::range_reference_t<InputRange>>;
  auto makeTask = [&](awaitable_type semiAwaitable,
                      std::size_t index) -> detail::BarrierTask {
    assert(index < tryResults.size());

    try {
      tryResults[index].emplace(co_await co_viaIfAsync(
          executor.get_alias(),
          co_withCancellation(cancelToken, std::move(semiAwaitable))));
    } catch (...) {
      if (!cancelSource.requestCancellation()) {
        firstException = exception_wrapper{current_exception()};
      }
    }
  };

  auto tasks = detail::collectMakeInnerTaskVec(awaitables, makeTask);

  tryResults.resize(tasks.size());

  // Save the initial context and restore it after starting each task
  // as the task may have modified the context before suspending and we
  // want to make sure the next task is started with the same initial
  // context.
  const auto context = RequestContext::saveContext();

  auto& asyncFrame = co_await detail::co_current_async_stack_frame;

  // Launch the tasks and wait for them all to finish.
  {
    detail::Barrier barrier{tasks.size() + 1};
    for (auto&& task : tasks) {
      task.start(&barrier, asyncFrame);
      RequestContext::setContext(context);
    }
    co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};
  }

  // Check if there were any exceptions and rethrow the first one.
  if (firstException) {
    co_yield co_error(std::move(firstException));
  }

  std::vector<detail::collect_all_range_component_t<
      detail::range_reference_t<InputRange>>>
      results;
  results.reserve(tryResults.size());
  for (auto& result : tryResults) {
    results.emplace_back(std::move(result).value());
  }

  co_return results;
}

template <
    typename InputRange,
    std::enable_if_t<
        std::is_void_v<
            semi_await_result_t<detail::range_reference_t<InputRange>>>,
        int>>
auto collectAllRange(InputRange awaitables) -> folly::coro::Task<void> {
  const folly::Executor::KeepAlive<> executor = co_await co_current_executor;
  const CancellationSource cancelSource;
  const CancellationToken cancelToken = CancellationToken::merge(
      co_await co_current_cancellation_token, cancelSource.getToken());

  exception_wrapper firstException;

  using awaitable_type = remove_cvref_t<detail::range_reference_t<InputRange>>;
  auto makeTask = [&](awaitable_type semiAwaitable,
                      std::size_t) -> detail::BarrierTask {
    try {
      co_await co_viaIfAsync(
          executor.get_alias(),
          co_withCancellation(cancelToken, std::move(semiAwaitable)));
    } catch (...) {
      if (!cancelSource.requestCancellation()) {
        firstException = exception_wrapper{current_exception()};
      }
    }
  };

  auto tasks = detail::collectMakeInnerTaskVec(awaitables, makeTask);

  // Save the initial context and restore it after starting each task
  // as the task may have modified the context before suspending and we
  // want to make sure the next task is started with the same initial
  // context.
  const auto context = RequestContext::saveContext();

  auto& asyncFrame = co_await detail::co_current_async_stack_frame;

  // Launch the tasks and wait for them all to finish.
  {
    detail::Barrier barrier{tasks.size() + 1};
    for (auto&& task : tasks) {
      task.start(&barrier, asyncFrame);
      RequestContext::setContext(context);
    }
    co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};
  }

  // Check if there were any exceptions and rethrow the first one.
  if (firstException) {
    co_yield co_error(std::move(firstException));
  }
}

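// Range-based collectAllTry(): awaits every element of the range concurrently
// and returns a vector of folly::Try results in input order. Individual
// failures are captured in their Try and do not cancel the sibling awaitables.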
template <typename InputRange>
auto collectAllTryRange(InputRange awaitables)
    -> folly::coro::Task<std::vector<detail::collect_all_try_range_component_t<
        detail::range_reference_t<InputRange>>>> {
  std::vector<detail::collect_all_try_range_component_t<
      detail::range_reference_t<InputRange>>>
      results;

  const folly::Executor::KeepAlive<> executor =
      folly::getKeepAliveToken(co_await co_current_executor);

  const CancellationToken& cancelToken = co_await co_current_cancellation_token;

  using awaitable_type = remove_cvref_t<detail::range_reference_t<InputRange>>;
  auto makeTask = [&](awaitable_type semiAwaitable,
                      std::size_t index) -> detail::BarrierTask {
    assert(index < results.size());
    auto& result = results[index];
    try {
      using await_result = semi_await_result_t<awaitable_type>;
      if constexpr (std::is_void_v<await_result>) {
        co_await co_viaIfAsync(
            executor.get_alias(),
            co_withCancellation(cancelToken, std::move(semiAwaitable)));
        result.emplace();
      } else {
        result.emplace(co_await co_viaIfAsync(
            executor.get_alias(),
            co_withCancellation(cancelToken, std::move(semiAwaitable))));
      }
    } catch (...) {
      result.emplaceException(current_exception());
    }
  };

  auto tasks = detail::collectMakeInnerTaskVec(awaitables, makeTask);

  // Now that we know how many tasks there are, allocate that
  // many Try objects to store the results before we start
  // executing the tasks.
  results.resize(tasks.size());

  // Save the initial context and restore it after starting each task
  // as the task may have modified the context before suspending and we
  // want to make sure the next task is started with the same initial
  // context.
  const auto context = RequestContext::saveContext();

  auto& asyncFrame = co_await detail::co_current_async_stack_frame;

  // Launch the tasks and wait for them all to finish.
  {
    detail::Barrier barrier{tasks.size() + 1};
    for (auto&& task : tasks) {
      task.start(&barrier, asyncFrame);
      RequestContext::setContext(context);
    }
    co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};
  }

  co_return results;
}

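// collectAllWindowed() bounds concurrency with a pool of up to maxConcurrency
// worker coroutines. Each worker repeatedly takes the next awaitable from the
// range under a coro::Mutex, awaits it with the mutex released, and loops
// until the range is exhausted or iteration fails. The first awaitable failure
// cancels the remaining work and is re-thrown at the end; an exception thrown
// while iterating the range takes precedence.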
template <
    typename InputRange,
    std::enable_if_t<
        std::is_void_v<
            semi_await_result_t<detail::range_reference_t<InputRange>>>,
        int>>
auto collectAllWindowed(InputRange awaitables, std::size_t maxConcurrency)
    -> folly::coro::Task<void> {
  assert(maxConcurrency > 0);

  const folly::Executor::KeepAlive<> executor = co_await co_current_executor;
  const CancellationSource cancelSource;
  const CancellationToken cancelToken = CancellationToken::merge(
      co_await co_current_cancellation_token, cancelSource.getToken());

  exception_wrapper firstException;

  const auto trySetFirstException = [&](exception_wrapper&& e) noexcept {
    if (!cancelSource.requestCancellation()) {
      // This is the first entity to request cancellation.
      firstException = std::move(e);
    }
  };

  auto iter = access::begin(awaitables);
  const auto iterEnd = access::end(awaitables);

  using iterator_t = decltype(iter);
  using awaitable_t = typename std::iterator_traits<iterator_t>::value_type;

  folly::coro::Mutex mutex;

  exception_wrapper iterationException;

  auto makeWorker = [&]() -> detail::BarrierTask {
    auto lock =
        co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());

    while (!iterationException && iter != iterEnd) {
      std::optional<awaitable_t> awaitable;
      try {
        awaitable.emplace(*iter);
        ++iter;
      } catch (...) {
        iterationException = exception_wrapper{current_exception()};
        cancelSource.requestCancellation();
      }

      if (!awaitable) {
        co_return;
      }

      lock.unlock();

      try {
        co_await co_viaIfAsync(
            executor.get_alias(),
            co_withCancellation(cancelToken, std::move(*awaitable)));
      } catch (...) {
        trySetFirstException(exception_wrapper{current_exception()});
      }

      lock =
          co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());
    }
  };

  std::vector<detail::BarrierTask> workerTasks;

  detail::Barrier barrier{1};

  // Save the initial context and restore it after starting each task
  // as the task may have modified the context before suspending and we
  // want to make sure the next task is started with the same initial
  // context.
  const auto context = RequestContext::saveContext();

  auto& asyncFrame = co_await detail::co_current_async_stack_frame;

  try {
    auto lock = co_await mutex.co_scoped_lock();

    while (!iterationException && iter != iterEnd &&
           workerTasks.size() < maxConcurrency) {
      // Unlock the mutex before starting the worker so that
      // it can consume as many results synchronously as it can before
      // returning here and letting us spawn another task.
      // This can avoid spawning more worker coroutines than is necessary
      // to consume all of the awaitables.
      lock.unlock();

      workerTasks.push_back(makeWorker());
      barrier.add(1);
      workerTasks.back().start(&barrier, asyncFrame);

      RequestContext::setContext(context);

      lock = co_await mutex.co_scoped_lock();
    }
  } catch (...) {
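    // Only a fatal error if we failed to create any worker tasks; otherwise
    // the workers that did start can still drain the rest of the range.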
    if (workerTasks.empty()) {
      iterationException = exception_wrapper{current_exception()};
    }
  }

  co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};

  if (auto& ex = iterationException ? iterationException : firstException) {
    co_yield co_error(std::move(ex));
  }
}

template <
    typename InputRange,
    std::enable_if_t<
        !std::is_void_v<
            semi_await_result_t<detail::range_reference_t<InputRange>>>,
        int>>
auto collectAllWindowed(InputRange awaitables, std::size_t maxConcurrency)
    -> folly::coro::Task<std::vector<detail::collect_all_range_component_t<
        detail::range_reference_t<InputRange>>>> {
  assert(maxConcurrency > 0);

  const folly::Executor::KeepAlive<> executor = co_await co_current_executor;

  const CancellationToken& parentCancelToken =
      co_await co_current_cancellation_token;
  const CancellationSource cancelSource;
  const CancellationToken cancelToken =
      CancellationToken::merge(parentCancelToken, cancelSource.getToken());

  exception_wrapper firstException;

  auto trySetFirstException = [&](exception_wrapper&& e) noexcept {
    if (!cancelSource.requestCancellation()) {
      // This is the first entity to request cancellation.
      firstException = std::move(e);
    }
  };

  auto iter = access::begin(awaitables);
  const auto iterEnd = access::end(awaitables);

  using iterator_t = decltype(iter);
  using awaitable_t = typename std::iterator_traits<iterator_t>::value_type;

  folly::coro::Mutex mutex;

  std::vector<detail::collect_all_try_range_component_t<
      detail::range_reference_t<InputRange>>>
      tryResults;

  exception_wrapper iterationException;

  auto makeWorker = [&]() -> detail::BarrierTask {
    auto lock =
        co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());

    while (!iterationException && iter != iterEnd) {
      const std::size_t thisIndex = tryResults.size();
      std::optional<awaitable_t> awaitable;
      try {
        tryResults.emplace_back();
        awaitable.emplace(*iter);
        ++iter;
      } catch (...) {
        iterationException = exception_wrapper{current_exception()};
        cancelSource.requestCancellation();
      }

      if (!awaitable) {
        co_return;
      }

      lock.unlock();

      detail::collect_all_try_range_component_t<
          detail::range_reference_t<InputRange>>
          tryResult;

      try {
        tryResult.emplace(co_await co_viaIfAsync(
            executor.get_alias(),
            co_withCancellation(
                cancelToken, static_cast<awaitable_t&&>(*awaitable))));
      } catch (...) {
        trySetFirstException(exception_wrapper{current_exception()});
      }

      lock =
          co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());

      try {
        tryResults[thisIndex] = std::move(tryResult);
      } catch (...) {
        trySetFirstException(exception_wrapper{current_exception()});
      }
    }
  };

  std::vector<detail::BarrierTask> workerTasks;

  detail::Barrier barrier{1};

  exception_wrapper workerCreationException;

  // Save the initial context and restore it after starting each task
  // as the task may have modified the context before suspending and we
  // want to make sure the next task is started with the same initial
  // context.
  const auto context = RequestContext::saveContext();

  auto& asyncFrame = co_await detail::co_current_async_stack_frame;

  try {
    auto lock = co_await mutex.co_scoped_lock();

    while (!iterationException && iter != iterEnd &&
           workerTasks.size() < maxConcurrency) {
      // Unlock the mutex before starting the worker so that
      // it can consume as many results synchronously as it can before
      // returning here and letting us spawn another task.
      // This can avoid spawning more worker coroutines than is necessary
      // to consume all of the awaitables.
      lock.unlock();

      workerTasks.push_back(makeWorker());
      barrier.add(1);
      workerTasks.back().start(&barrier, asyncFrame);

      RequestContext::setContext(context);

      lock = co_await mutex.co_scoped_lock();
    }
  } catch (...) {
    // Only a fatal error if we failed to create any worker tasks.
    if (workerTasks.empty()) {
      // No need to synchronise here. There are no concurrent tasks running.
      iterationException = exception_wrapper{current_exception()};
    }
  }

  co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};

  if (auto& ex = iterationException ? iterationException : firstException) {
    co_yield co_error(std::move(ex));
  }

  std::vector<detail::collect_all_range_component_t<
      detail::range_reference_t<InputRange>>>
      results;
  results.reserve(tryResults.size());

  for (auto&& tryResult : tryResults) {
    assert(tryResult.hasValue());
    results.emplace_back(std::move(tryResult).value());
  }

  co_return results;
}

template <typename InputRange>
auto collectAllTryWindowed(InputRange awaitables, std::size_t maxConcurrency)
    -> folly::coro::Task<std::vector<detail::collect_all_try_range_component_t<
        detail::range_reference_t<InputRange>>>> {
  assert(maxConcurrency > 0);

  std::vector<detail::collect_all_try_range_component_t<
      detail::range_reference_t<InputRange>>>
      results;

  exception_wrapper iterationException;

  folly::coro::Mutex mutex;

  const Executor::KeepAlive<> executor = co_await co_current_executor;
  const CancellationToken& cancelToken = co_await co_current_cancellation_token;

  auto iter = access::begin(awaitables);
  const auto iterEnd = access::end(awaitables);

  using iterator_t = decltype(iter);
  using awaitable_t = typename std::iterator_traits<iterator_t>::value_type;
  using result_t = semi_await_result_t<awaitable_t>;

  auto makeWorker = [&]() -> detail::BarrierTask {
    auto lock =
        co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());

    while (!iterationException && iter != iterEnd) {
      const std::size_t thisIndex = results.size();
      std::optional<awaitable_t> awaitable;

      try {
        results.emplace_back();
        awaitable.emplace(*iter);
        ++iter;
      } catch (...) {
        iterationException = exception_wrapper{current_exception()};
      }

      if (!awaitable) {
        co_return;
      }

      lock.unlock();

      detail::collect_all_try_range_component_t<
          detail::range_reference_t<InputRange>>
          result;

      try {
        if constexpr (std::is_void_v<result_t>) {
          co_await co_viaIfAsync(
              executor.get_alias(),
              co_withCancellation(cancelToken, std::move(*awaitable)));
          result.emplace();
        } else {
          result.emplace(co_await co_viaIfAsync(
              executor.get_alias(),
              co_withCancellation(cancelToken, std::move(*awaitable))));
        }
      } catch (...) {
        result.emplaceException(current_exception());
      }

      lock =
          co_await co_viaIfAsync(executor.get_alias(), mutex.co_scoped_lock());

      try {
        results[thisIndex] = std::move(result);
      } catch (...) {
        results[thisIndex].emplaceException(current_exception());
      }
    }
  };

  std::vector<detail::BarrierTask> workerTasks;

  detail::Barrier barrier{1};

  // Save the initial context and restore it after starting each task
  // as the task may have modified the context before suspending and we
  // want to make sure the next task is started with the same initial
  // context.
  const auto context = RequestContext::saveContext();

  auto& asyncFrame = co_await detail::co_current_async_stack_frame;

  try {
    auto lock = co_await mutex.co_scoped_lock();
    while (!iterationException && iter != iterEnd &&
           workerTasks.size() < maxConcurrency) {
      // Unlock the mutex before starting the child operation so that
      // it can consume as many results synchronously as it can before
      // returning here and letting us potentially spawn another task.
      // This can avoid spawning more worker coroutines than is necessary
      // to consume all of the awaitables.
      lock.unlock();

      workerTasks.push_back(makeWorker());
      barrier.add(1);
      workerTasks.back().start(&barrier, asyncFrame);

      RequestContext::setContext(context);

      lock = co_await mutex.co_scoped_lock();
    }
  } catch (...) {
    // Failure to create a worker is only an error if we failed to create
    // _any_ workers. As long as we created at least one, the algorithm can
    // still make forward progress.
    if (workerTasks.empty()) {
      iterationException = exception_wrapper{current_exception()};
    }
  }

  co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};

  if (iterationException) {
    co_yield co_error(std::move(iterationException));
  }

  co_return results;
}

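// makeUnorderedAsyncGenerator() schedules every awaitable in the range on the
// given AsyncScope and returns an AsyncGenerator that yields results as they
// complete, i.e. in completion order rather than input order. The Try variants
// yield folly::Try items so errors are delivered per item; the non-Try
// variants cancel the remaining work and finish the generator on the first
// error.
//
// Illustrative usage (`tasks` and `consume` are hypothetical):
//
//   folly::coro::AsyncScope scope;
//   auto gen = makeUnorderedAsyncGenerator(scope, std::move(tasks));
//   while (auto item = co_await gen.next()) {
//     consume(*item);
//   }
//   co_await scope.joinAsync();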
template <typename InputRange>
auto makeUnorderedAsyncGenerator(AsyncScope& scope, InputRange awaitables)
    -> AsyncGenerator<detail::async_generator_from_awaitable_range_item_t<
        InputRange,
        false>&&> {
  return detail::makeUnorderedAsyncGeneratorImpl(
      scope, std::move(awaitables), std::bool_constant<false>{});
}

template <typename InputRange>
auto makeUnorderedTryAsyncGenerator(AsyncScope& scope, InputRange awaitables)
    -> AsyncGenerator<detail::async_generator_from_awaitable_range_item_t<
        InputRange,
        true>&&> {
  return detail::makeUnorderedAsyncGeneratorImpl(
      scope, std::move(awaitables), std::bool_constant<true>{});
}

template <typename InputRange>
auto makeUnorderedAsyncGenerator(
    CancellableAsyncScope& scope, InputRange awaitables)
    -> AsyncGenerator<detail::async_generator_from_awaitable_range_item_t<
        InputRange,
        false>&&> {
  return detail::makeUnorderedAsyncGeneratorImpl(
      scope, std::move(awaitables), std::bool_constant<false>{});
}

template <typename InputRange>
auto makeUnorderedTryAsyncGenerator(
    CancellableAsyncScope& scope, InputRange awaitables)
    -> AsyncGenerator<detail::async_generator_from_awaitable_range_item_t<
        InputRange,
        true>&&> {
  return detail::makeUnorderedAsyncGeneratorImpl(
      scope, std::move(awaitables), std::bool_constant<true>{});
}

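// collectAny() races the given awaitables: it completes with the index of the
// first one to finish and a folly::Try holding its value or error, after
// cancelling and draining the rest. E.g.
// `auto [idx, result] = co_await collectAny(readA(), readB());`, where readA()
// and readB() are hypothetical tasks sharing the same result type.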
template <typename SemiAwaitable, typename... SemiAwaitables>
auto collectAny(SemiAwaitable&& awaitable, SemiAwaitables&&... awaitables)
    -> folly::coro::Task<std::pair<
        std::size_t,
        folly::Try<detail::collect_any_component_t<
            SemiAwaitable,
            SemiAwaitables...>>>> {
  return detail::collectAnyImpl(
      std::make_index_sequence<sizeof...(SemiAwaitables) + 1>{},
      static_cast<SemiAwaitable&&>(awaitable),
      static_cast<SemiAwaitables&&>(awaitables)...);
}

template <typename... SemiAwaitables>
auto collectAnyWithoutException(SemiAwaitables&&... awaitables)
    -> folly::coro::Task<std::pair<
        std::size_t,
        folly::Try<detail::collect_any_component_t<SemiAwaitables...>>>> {
  return detail::collectAnyWithoutExceptionImpl(
      std::make_index_sequence<sizeof...(SemiAwaitables)>{},
      static_cast<SemiAwaitables&&>(awaitables)...);
}

template <typename... SemiAwaitables>
auto collectAnyNoDiscard(SemiAwaitables&&... awaitables)
    -> folly::coro::Task<std::tuple<detail::collect_all_try_component_t<
        remove_cvref_t<SemiAwaitables>>...>> {
  return detail::collectAnyNoDiscardImpl(
      std::make_index_sequence<sizeof...(SemiAwaitables)>{},
      static_cast<SemiAwaitables&&>(awaitables)...);
}

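// Range-based versions of the collectAny() family. Each wraps the input
// awaitables in small recording tasks and runs them through collectAllRange()
// under a shared CancellationSource, so the first awaitable to complete
// cancels the remaining work.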
template <typename InputRange>
auto collectAnyRange(InputRange awaitables)
    -> folly::coro::Task<std::pair<
        size_t,
        folly::Try<detail::collect_all_range_component_t<
            detail::range_reference_t<InputRange>>>>> {
  const CancellationToken& parentCancelToken =
      co_await co_current_cancellation_token;
  const CancellationSource cancelSource;
  const CancellationToken cancelToken =
      CancellationToken::merge(parentCancelToken, cancelSource.getToken());

  std::pair<
      size_t,
      folly::Try<detail::collect_all_range_component_t<
          detail::range_reference_t<InputRange>>>>
      firstCompletion;
  firstCompletion.first = size_t(-1);

  using awaitable_type = remove_cvref_t<detail::range_reference_t<InputRange>>;
  auto makeTask = [&](awaitable_type semiAwaitable,
                      size_t index) -> folly::coro::Task<void> {
    auto result = co_await folly::coro::co_awaitTry(std::move(semiAwaitable));
    if (!cancelSource.requestCancellation()) {
      // This is the first entity to request cancellation.
      firstCompletion.first = index;
      firstCompletion.second = std::move(result);
    }
  };

  auto tasks = detail::collectMakeInnerTaskVec(awaitables, makeTask);

  co_await folly::coro::co_withCancellation(
      cancelToken, folly::coro::collectAllRange(detail::MoveRange(tasks)));

  co_return firstCompletion;
}

template <typename InputRange>
auto collectAnyWithoutExceptionRange(InputRange awaitables)
    -> folly::coro::Task<std::pair<
        size_t,
        folly::Try<detail::collect_all_range_component_t<
            detail::range_reference_t<InputRange>>>>> {
  const CancellationToken& parentCancelToken =
      co_await co_current_cancellation_token;
  const CancellationSource cancelSource;
  const CancellationToken cancelToken =
      CancellationToken::merge(parentCancelToken, cancelSource.getToken());

  size_t nAwaitables;
  std::atomic<std::size_t> nAwaited = 1;
  std::pair<
      size_t,
      folly::Try<detail::collect_all_range_component_t<
          detail::range_reference_t<InputRange>>>>
      firstValueOrLastException;
  firstValueOrLastException.first = std::numeric_limits<size_t>::max();

  using awaitable_type = remove_cvref_t<detail::range_reference_t<InputRange>>;
  auto makeTask = [&](awaitable_type semiAwaitable,
                      size_t index) -> folly::coro::Task<void> {
    auto result = co_await folly::coro::co_awaitTry(std::move(semiAwaitable));
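    // As in collectAnyWithoutExceptionImpl(): record the first value, or the
    // last exception if every awaitable fails.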
    if ((result.hasValue() ||
         nAwaited.fetch_add(1, std::memory_order_relaxed) == nAwaitables) &&
        !cancelSource.requestCancellation()) {
      firstValueOrLastException.first = index;
      firstValueOrLastException.second = std::move(result);
    }
  };

  auto tasks = detail::collectMakeInnerTaskVec(awaitables, makeTask);
  nAwaitables = tasks.size();
  co_await folly::coro::co_withCancellation(
      cancelToken, folly::coro::collectAllRange(detail::MoveRange(tasks)));

  co_return firstValueOrLastException;
}

template <typename InputRange>
auto collectAnyNoDiscardRange(InputRange awaitables)
    -> folly::coro::Task<std::vector<detail::collect_all_try_range_component_t<
        detail::range_reference_t<InputRange>>>> {
  const CancellationToken& parentCancelToken =
      co_await co_current_cancellation_token;
  const CancellationSource cancelSource;
  const CancellationToken cancelToken =
      CancellationToken::merge(parentCancelToken, cancelSource.getToken());

  std::vector<detail::collect_all_try_range_component_t<
      detail::range_reference_t<InputRange>>>
      results;

  using awaitable_type = remove_cvref_t<detail::range_reference_t<InputRange>>;
  auto makeTask = [&](awaitable_type semiAwaitable,
                      size_t index) -> folly::coro::Task<void> {
    auto result = co_await folly::coro::co_awaitTry(std::move(semiAwaitable));
    cancelSource.requestCancellation();
    results[index] = std::move(result);
  };

  auto tasks = detail::collectMakeInnerTaskVec(awaitables, makeTask);

  results.resize(tasks.size());
  co_await folly::coro::co_withCancellation(
      cancelToken, folly::coro::collectAllRange(detail::MoveRange(tasks)));

  co_return results;
}

} // namespace coro
} // namespace folly

#endif // FOLLY_HAS_COROUTINES