folly/folly/coro/Promise.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <utility>

#include <folly/CancellationToken.h>
#include <folly/Try.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/Coroutine.h>
#include <folly/futures/Promise.h>
#include <folly/synchronization/RelaxedAtomic.h>

#if FOLLY_HAS_COROUTINES

namespace folly::coro {
// Forward declarations of the two halves of a promise contract, so that the
// factory functions declared below can name them before the classes are
// defined.
// Promise<T>: the producer side, used to publish a result.
template <typename T>
class Promise;
// Future<T>: the consumer side, awaited to obtain the result.
template <typename T>
class Future;

// Creates a Promise together with its associated, not yet fulfilled Future.
// Both objects refer to the same shared state; the definition is further down
// in this file.
template <typename T>
std::pair<Promise<T>, Future<T>> makePromiseContract();

// Creates an already fulfilled Future.
//
// - makeFuture(T&&): fulfilled with the given value; the Future's value type
//   is T with reference and cv-qualifiers removed.
// - makeFuture<T>(exception_wrapper&&): fulfilled with the given exception;
//   T cannot be deduced and must be spelled explicitly.
// - makeFuture(): a fulfilled Future<void>.
template <typename T>
Future<remove_cvref_t<T>> makeFuture(T&&);
template <typename T>
Future<T> makeFuture(exception_wrapper&&);
Future<void> makeFuture();

namespace detail {
// State shared by a Promise<T> and its Future<T>. makePromiseContract()
// creates one instance and hands a reference to it to both sides.
//
// Writers follow a fixed protocol (see Promise::trySet* and Future::cancel):
//   1. exchange `fulfilled` to true; only the caller that observed false
//      proceeds,
//   2. store the outcome in `result`,
//   3. post `ready`.
template <typename T>
struct PromiseState {
  PromiseState() = default;

  // The outcome (value or exception) published to the Future.
  Try<T> result;
  // Claim flag. Must be exchanged to true before setting result, so that at
  // most one writer ever touches `result`.
  folly::relaxed_atomic<bool> fulfilled{false};
  // Baton the Future waits on. Must be posted after setting result.
  coro::Baton ready;
};
} // namespace detail

/**
 * Producer side of a contract created by makePromiseContract().
 *
 * A Promise is move-only. The shared state it refers to is fulfilled at most
 * once: the first successful fulfillment wins and every later attempt is a
 * no-op that reports false. If a Promise that still refers to an unfulfilled
 * state is destroyed or move-assigned over, that state is fulfilled with
 * BrokenPromise.
 */
template <typename T>
class Promise {
 public:
  /**
   * Construct an empty Promise.
   *
   * An empty Promise is not valid to use until a valid Promise has been
   * move-assigned into it.
   */
  Promise() = default;

  Promise(const Promise&) = delete;
  Promise& operator=(const Promise&) = delete;

  /**
   * Takes over the cancellation token and the state of other. other is left
   * without a state, so its destructor will not break the contract.
   */
  Promise(Promise&& other) noexcept
      : ct_(std::move(other.ct_)),
        state_(std::exchange(other.state_, nullptr)) {}

  /**
   * Replaces the contract held by this Promise with the one held by other.
   *
   * Unless this is a self-assignment, a still unfulfilled contract currently
   * held by this Promise is first fulfilled with BrokenPromise.
   */
  Promise& operator=(Promise&& other) noexcept {
    if (this != &other) {
      breakIfPending();
    }
    ct_ = std::move(other.ct_);
    state_ = std::exchange(other.state_, nullptr);
    return *this;
  }

  /**
   * Fulfills a still unfulfilled contract with BrokenPromise.
   */
  ~Promise() { breakIfPending(); }

  /**
   * @returns Whether this Promise refers to a shared state.
   */
  bool valid() const noexcept { return state_ != nullptr; }

  /**
   * @returns Whether this Promise refers to a shared state that has already
   *          been claimed for fulfillment. false for an empty Promise.
   */
  bool isFulfilled() const noexcept { return valid() && state_->fulfilled; }

  /**
   * Same as trySetValue(), but does not report whether the fulfillment took
   * place.
   */
  template <typename... Args>
  void setValue(Args&&... args) {
    trySetValue(std::forward<Args>(args)...);
  }

  /**
   * Same as trySetException(), but does not report whether the fulfillment
   * took place.
   */
  template <typename... Args>
  void setException(Args&&... args) {
    trySetException(std::forward<Args>(args)...);
  }

  /**
   * Same as trySetResult(), but does not report whether the fulfillment took
   * place.
   */
  void setResult(Try<T>&& result) { trySetResult(std::move(result)); }

  /**
   * Fulfills the promise with a value if not already fulfilled.
   *
   * The arguments are forwarded to construct the value in place. For
   * T = void no arguments are accepted and the result slot is left as it is.
   *
   * @returns Whether the fulfillment took place.
   */
  template <typename... Args>
  bool trySetValue(Args&&... args) {
    DCHECK(state_);
    if (!claim()) {
      return false;
    }
    if constexpr (!std::is_void_v<T>) {
      state_->result.emplace(std::forward<Args>(args)...);
    } else {
      static_assert(sizeof...(Args) == 0);
    }
    state_->ready.post();
    return true;
  }

  /**
   * Fulfills the promise with an exception if not already fulfilled.
   *
   * The arguments are forwarded to Try<T>::emplaceException().
   *
   * @returns Whether the fulfillment took place.
   */
  template <typename... Args>
  bool trySetException(Args&&... args) {
    DCHECK(state_);
    if (!claim()) {
      return false;
    }
    state_->result.emplaceException(std::forward<Args>(args)...);
    state_->ready.post();
    return true;
  }

  /**
   * Fulfills the promise with a Try if not already fulfilled.
   * @returns Whether the fulfillment took place.
   */
  bool trySetResult(Try<T>&& result) {
    DCHECK(state_);
    if (!claim()) {
      return false;
    }
    state_->result = std::move(result);
    state_->ready.post();
    return true;
  }

  /**
   * Fulfills the promise with a value/Try returned from calling func if not
   * already fulfilled. func is not called when the promise was already
   * fulfilled.
   *
   * If either the call to func or the result's constructor completes with an
   * exception then the exception is caught and stored as the result.
   *
   * @returns Whether the fulfillment took place.
   */
  template <typename Func>
  bool trySetWith(Func&& func) {
    DCHECK(state_);
    if (!claim()) {
      return false;
    }
    try {
      state_->result = Try<T>(std::forward<Func>(func)());
    } catch (...) {
      state_->result.emplaceException(current_exception());
    }
    // Posted on both paths: the state was claimed above, so the waiter must
    // be woken whichever outcome was stored.
    state_->ready.post();
    return true;
  }

  /**
   * Fulfills the promise with an exception returned from calling func if not
   * already fulfilled. func is not called when the promise was already
   * fulfilled.
   *
   * If either the call to func or the result's constructor completes with an
   * exception then the exception is caught and stored as the result.
   *
   * @returns Whether the fulfillment took place.
   */
  template <typename Func>
  bool trySetExceptionWith(Func&& func) {
    DCHECK(state_);
    if (!claim()) {
      return false;
    }
    try {
      state_->result.emplaceException(std::forward<Func>(func)());
    } catch (...) {
      state_->result.emplaceException(current_exception());
    }
    state_->ready.post();
    return true;
  }

  /**
   * @returns The cancellation token this Promise was created with (a
   *          default-constructed token for an empty Promise).
   */
  const CancellationToken& getCancellationToken() const { return ct_; }

 private:
  // Only makePromiseContract() creates non-empty promises.
  Promise(CancellationToken ct, detail::PromiseState<T>& state)
      : ct_(std::move(ct)), state_(&state) {}

  // Attempts to become the one writer allowed to set the result.
  // Returns true iff the state had not been claimed before this call.
  bool claim() { return !state_->fulfilled.exchange(true); }

  // Fulfills the contract with BrokenPromise if this Promise refers to a
  // state that nobody has fulfilled yet; otherwise does nothing.
  void breakIfPending() {
    if (valid() && !isFulfilled()) {
      setException(BrokenPromise{tag<T>});
    }
  }

  CancellationToken ct_;
  detail::PromiseState<T>* state_{nullptr};

  friend std::pair<Promise<T>, Future<T>> makePromiseContract<T>();
};

/**
 * Consumer side of a contract created by makePromiseContract().
 *
 * A Future is move-only and is consumed by awaiting it as an rvalue
 * (`co_await std::move(future)`). Awaiting completes once the shared state's
 * `ready` baton has been posted, either because the Promise side fulfilled
 * the state or because the awaiting side was cancelled first (see cancel()).
 */
template <typename T>
class Future {
 public:
  /**
   * Construct an empty Future.
   *
   * An empty Future is not valid to use until a valid Future has been
   * move-assigned into it. isReady() reports false for an empty Future;
   * awaiting one is not supported.
   */
  Future() = default;

  Future(Future&&) noexcept = default;
  Future& operator=(Future&&) noexcept = default;
  Future(const Future&) = delete;
  Future& operator=(const Future&) = delete;

  /**
   * Awaiter returned by operator co_await.
   *
   * Suspension and resumption are delegated to Baton::WaitOperation on the
   * shared state's `ready` baton. In addition, a CancellationCallback is
   * registered on the token the Future inherited (see co_withCancellation);
   * when it fires it calls Future::cancel().
   *
   * The awaiter keeps a reference to the Future, so the Future must outlive
   * it.
   */
  class WaitOperation : private Baton::WaitOperation {
   public:
    // Members are initialized in declaration order, so future_ is already
    // bound when the callback capturing `this` is registered.
    // NOTE(review): if CancellationCallback runs the callback during
    // construction for an already-cancelled token, cancel() is invoked right
    // here - confirm against CancellationToken.h.
    explicit WaitOperation(Future& future) noexcept
        : Baton::WaitOperation(future.state_->ready),
          future_(future),
          cb_(std::move(future.ct_), [&] { future_.cancel(); }) {}

    using Baton::WaitOperation::await_ready;
    using Baton::WaitOperation::await_suspend;

    /**
     * Produces the outcome of the contract.
     *
     * For non-void T the stored value is moved out of the shared state via
     * Try::value() (presumably throwing if an exception was stored - see
     * folly/Try.h); for void only throwIfFailed() is called.
     */
    T await_resume() {
      if constexpr (!std::is_void_v<T>) {
        return std::move(future_.state_->result.value());
      } else {
        future_.state_->result.throwIfFailed();
      }
    }

    /**
     * Moves the stored Try out of the shared state without unwrapping it.
     */
    folly::Try<T> await_resume_try() {
      return std::move(future_.state_->result);
    }

   private:
    Future& future_;
    CancellationCallback cb_;
  };

  [[nodiscard]] WaitOperation operator co_await() && noexcept {
    return WaitOperation{*this};
  }

  /**
   * @returns Whether the shared state's `ready` baton has been posted, i.e.
   *          a result is available. false for an empty Future.
   */
  bool isReady() const noexcept {
    // Guard against a default-constructed Future, mirroring
    // Promise::isFulfilled(); without it this would dereference nullptr.
    return state_ && state_->ready.ready();
  }

  /**
   * Attaches the cancellation token that the await will observe.
   *
   * Only the first call on a given Future has an effect: once a token has
   * been attached, tokens passed by later calls are dropped.
   */
  friend Future co_withCancellation(
      folly::CancellationToken ct, Future&& future) noexcept {
    if (!std::exchange(future.hasCancelTokenOverride_, true)) {
      future.ct_ = std::move(ct);
    }
    return std::move(future);
  }

 private:
  // Only makePromiseContract() creates non-empty futures.
  Future(CancellationSource cs, detail::PromiseState<T>& state)
      : cs_(std::move(cs)), state_(&state) {}

  // Invoked by the WaitOperation's cancellation callback. If nobody has
  // claimed the state yet, claims it, requests cancellation on the source
  // whose token the Promise holds, stores OperationCancelled as the result
  // and wakes the waiter. If the state was already claimed this is a no-op.
  void cancel() {
    if (!state_->fulfilled.exchange(true)) {
      cs_.requestCancellation();
      state_->result.emplaceException(OperationCancelled{});
      state_->ready.post();
    }
  }

  // Source of the token handed to the Promise.
  CancellationSource cs_;
  detail::PromiseState<T>* state_{nullptr};
  // The token inherited when the future is awaited
  CancellationToken ct_;
  // Set by the first co_withCancellation() call; later calls leave ct_ alone.
  bool hasCancelTokenOverride_{false};

  friend std::pair<Promise<T>, Future<T>> makePromiseContract<T>();
};

/**
 * makePromiseContract helps you migrate a non-coroutine code base to
 * coroutines. If your code already uses Future/SemiFuture, you don't need this
 * tool. A common use case is async callback functions: in the example, a
 * callback is passed into the legacy function sleepAndNotify, and that
 * callback fulfills the promise on completion. Consider using detachOnCancel
 * together with makePromiseContract to handle long running (longer than your
 * timeout) tasks that don't handle cancellation properly.
 *
 * The shared state is created together with a CancellationSource; the Promise
 * receives a token of that source and the Future receives the source itself.
 *
 * \refcode folly/docs/examples/folly/experimental/coro/Promise.cpp
 */
template <typename T>
std::pair<Promise<T>, Future<T>> makePromiseContract() {
  using State = detail::PromiseState<T>;

  auto [cs, data] =
      CancellationSource::create(folly::detail::WithDataTag<State>{});
  State& state = std::get<0>(*data);

  // The token must be taken before the source is moved into the Future.
  Promise<T> promise{cs.getToken(), state};
  Future<T> future{std::move(cs), state};
  return {std::move(promise), std::move(future)};
}

/**
 * Creates a Future that is already fulfilled with the given value.
 *
 * The Future's value type is T with reference and cv-qualifiers removed; the
 * argument is perfectly forwarded into the stored value.
 */
template <typename T>
Future<remove_cvref_t<T>> makeFuture(T&& t) {
  using Value = remove_cvref_t<T>;
  auto contract = makePromiseContract<Value>();
  contract.first.setValue(std::forward<T>(t));
  // Explicit move: a member of a local is not implicitly moved on return.
  return std::move(contract.second);
}
/**
 * Creates a Future<T> that is already fulfilled with the given exception.
 *
 * T cannot be deduced from the argument and has to be specified explicitly.
 */
template <typename T>
Future<T> makeFuture(exception_wrapper&& ex) {
  auto contract = makePromiseContract<T>();
  contract.first.setException(std::move(ex));
  // Explicit move: a member of a local is not implicitly moved on return.
  return std::move(contract.second);
}
/**
 * Creates a Future<void> that is already fulfilled.
 */
inline Future<void> makeFuture() {
  auto contract = makePromiseContract<void>();
  contract.first.setValue();
  // Explicit move: a member of a local is not implicitly moved on return.
  return std::move(contract.second);
}

} // namespace folly::coro

#endif