folly/folly/channels/Channel.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/channels/Channel-fwd.h>
#include <folly/experimental/channels/detail/ChannelBridge.h>

namespace folly {
namespace channels {

/*
 * A channel is a sender and receiver pair that allows one component to send
 * values to another. A sender and receiver pair is similar to an AsyncPipe and
 * AsyncGenerator pair. However, unlike AsyncPipe/AsyncGenerator, senders and
 * receivers can be used by memory-efficient higher level transformation
 * abstractions.
 *
 * Typical usage:
 *   auto [receiver, sender] = Channel<T>::create();
 *   sender.write(val1);
 *   auto val2 = co_await receiver.next();
 */
template <typename TValue>
class Channel {
 public:
  /**
   * Builds a new channel and returns its two endpoints as a
   * (receiver, sender) pair. Destroying the sender closes the channel;
   * destroying the receiver cancels it.
   */
  static std::pair<Receiver<TValue>, Sender<TValue>> create() {
    // The sender keeps the handle produced by ChannelBridge::create(); the
    // receiver is given the handle obtained from copy() on it (presumably a
    // second handle to the same bridge).
    auto bridgeForSender = detail::ChannelBridge<TValue>::create();
    auto bridgeForReceiver = bridgeForSender->copy();

    Receiver<TValue> receiver(std::move(bridgeForReceiver));
    Sender<TValue> sender(std::move(bridgeForSender));
    return std::make_pair(std::move(receiver), std::move(sender));
  }
};

/**
 * A sender sends values to be consumed by a receiver.
 */
template <typename TValue>
class Sender {
 public:
  friend Channel<TValue>;
  using ValueType = TValue;

  /**
   * Move-constructs a sender by taking over the bridge held by other.
   */
  Sender(Sender&& other) noexcept : bridge_(std::move(other.bridge_)) {}

  /**
   * Move-assigns from other. If this sender currently holds a bridge, that
   * channel is first closed (without an exception); afterwards the bridge held
   * by other is taken over. Self-assignment leaves the sender untouched.
   */
  Sender& operator=(Sender&& other) noexcept {
    if (this != &other) {
      if (bridge_) {
        std::move(*this).close();
      }
      bridge_ = std::move(other.bridge_);
    }
    return *this;
  }

  /**
   * Closes the channel (without an exception) if this sender still holds a
   * bridge.
   */
  ~Sender() {
    if (bridge_) {
      std::move(*this).close();
    }
  }

  /**
   * True while this sender holds a bridge. Becomes false once the sender has
   * been closed via close() or moved away.
   */
  explicit operator bool() const { return bridge_ != nullptr; }

  /**
   * Pushes a value into the pipe. The value is silently dropped if the sender
   * side of the bridge is already closed.
   *
   * The bridge is dereferenced unconditionally, so the sender must be valid
   * (operator bool returns true) when this is called.
   */
  template <typename U = TValue>
  void write(U&& element) {
    if (bridge_->isSenderClosed()) {
      return;
    }
    bridge_->senderPush(std::forward<U>(element));
  }

  /**
   * Closes the pipe without an exception and releases the bridge, leaving this
   * sender invalid.
   *
   * The bridge is dereferenced unconditionally, so the sender must be valid
   * when this is called.
   */
  void close() && {
    const auto senderAlreadyClosed = bridge_->isSenderClosed();
    if (!senderAlreadyClosed) {
      bridge_->senderClose();
    }
    bridge_ = nullptr;
  }

  /**
   * Closes the pipe with the given exception and releases the bridge, leaving
   * this sender invalid. The exception is not forwarded if the sender side of
   * the bridge was already closed.
   *
   * The bridge is dereferenced unconditionally, so the sender must be valid
   * when this is called.
   */
  void close(exception_wrapper exception) && {
    const auto senderAlreadyClosed = bridge_->isSenderClosed();
    if (!senderAlreadyClosed) {
      bridge_->senderClose(std::move(exception));
    }
    bridge_ = nullptr;
  }

  /**
   * Reports whether the corresponding receiver has been cancelled or
   * destroyed.
   *
   * Returns true immediately if the sender side of the bridge is already
   * closed. Otherwise the bridge is polled through senderGetValues(); a
   * non-empty result is treated as the cancellation signal, in which case the
   * sender side is closed and true is returned.
   *
   * The bridge is dereferenced unconditionally, so the sender must be valid
   * when this is called.
   */
  bool isReceiverCancelled() {
    if (bridge_->isSenderClosed()) {
      return true;
    }
    auto values = bridge_->senderGetValues();
    if (values.empty()) {
      return false;
    }
    // Something came back on the sender side: stop sending for good.
    bridge_->senderClose();
    return true;
  }

 private:
  friend detail::ChannelBridgePtr<TValue>& detail::senderGetBridge<>(
      Sender<TValue>&);

  // Only Channel<TValue>::create() (a friend) builds senders from a bridge.
  explicit Sender(detail::ChannelBridgePtr<TValue> bridge)
      : bridge_(std::move(bridge)) {}

  // Handle to the underlying bridge; null once closed or moved away.
  detail::ChannelBridgePtr<TValue> bridge_;
};

/**
 * A receiver that receives values sent by a sender. There are several ways that
 * a receiver can be consumed:
 *
 * 1. Call co_await receiver.next() to get the next value. See the docstring of
 *    next() for more details. This is the easiest way to consume the values
 *    from a receiver, but it is also the most expensive memory-wise (as it
 *    creates a long-lived coroutine frame). This is typically used in scenarios
 *    where O(1) channels are being consumed (and therefore coroutine memory
 *    overhead is negligible).
 *
 * 2. Call consumeChannelWithCallback to get a callback when each value comes
 *    in. See ConsumeChannel.h for more details. This uses less memory than
 *    #1, as it only needs to allocate coroutine frames when processing values
 *    (rather than always having such frames allocated when waiting for values).
 *
 * 3. Use MergeChannel in folly/experimental/channels/MergeChannel.h.
 *    This construct allows you to consume the merged output of a dynamically
 *    changing set of receivers. This is the cheapest way to consume the output
 *    of a large number of receivers. It is useful when the consumer wants to
 *    process all values from all receivers sequentially.
 *
 * 4. Use ChannelProcessor in folly/experimental/channels/ChannelProcessor.h.
 *    This construct allows you to consume a dynamically changing set of
 *    receivers in parallel.
 *
 * 5. A receiver may also be passed to other framework primitives that consume
 *    the receiver (such as transform). As with options 2-4, these primitives
 *    do not require coroutine frames to be allocated when waiting for values.
 */
template <typename TValue>
class Receiver {
  class Waiter;
  struct NextSemiAwaitable;

 public:
  friend Channel<TValue>;
  using ValueType = TValue;

  /**
   * Constructs a receiver that is not attached to any channel. Such a receiver
   * is expected to report false from operator bool (assuming a
   * default-constructed bridge pointer compares equal to nullptr).
   */
  Receiver() {}

  /**
   * Move-constructs a receiver by taking over both the bridge and the buffered
   * values held by other.
   */
  Receiver(Receiver&& other) noexcept
      : bridge_(std::move(other.bridge_)), buffer_(std::move(other.buffer_)) {}

  /**
   * Move-assigns from other. If this receiver currently holds a bridge, that
   * channel is cancelled first; afterwards the bridge and buffer of other are
   * taken over. Self-assignment leaves the receiver untouched.
   */
  Receiver& operator=(Receiver&& other) noexcept {
    if (this == &other) {
      return *this;
    }
    // cancel() is a no-op when this receiver holds no bridge.
    std::move(*this).cancel();
    bridge_ = std::move(other.bridge_);
    buffer_ = std::move(other.buffer_);
    return *this;
  }

  /**
   * Cancels the channel if this receiver still holds a bridge.
   */
  ~Receiver() {
    // cancel() is a no-op when this receiver holds no bridge.
    std::move(*this).cancel();
  }

  /**
   * Returns whether or not this receiver instance is valid. This will return
   * false if the receiver was cancelled or moved away.
   */
  explicit operator bool() const { return bridge_ != nullptr; }

  /**
   * Returns the next value sent by a sender. The behavior is similar to the
   * behavior of next() on folly::coro::AsyncGenerator<TValue>.
   *
   * When closeOnCancel is true, if the returned semi-awaitable is cancelled,
   * the underlying channel will be closed. No more values will be received,
   * even if they were sent by the sender. This matches the behavior of
   * folly::coro::AsyncGenerator.
   *
   * When closeOnCancel is false, cancelling the returned semi-awaitable will
   * not close the underlying channel. Instead, it will just cancel the next()
   * operation. This means that the caller can call next() again and continue
   * to receive values sent by the sender.
   *
   * If consumed directly with co_await, next() will return an std::optional:
   *
   *    std::optional<TValue> value = co_await receiver.next();
   *
   *    - If a value is sent, the std::optional will contain the value.
   *    - If the channel is closed by the sender with no exception, the optional
   *        will be empty.
   *    - If the channel is closed by the sender with an exception, next() will
   *        throw the exception.
   *    - If the next() call was cancelled, next() will throw an exception of
   *        type folly::OperationCancelled.
   *
   * If consumed with folly::coro::co_awaitTry, this will return a Try:
   *
   *    Try<TValue> value = co_await folly::coro::co_awaitTry(
   *        receiver.next());
   *
   *    - If a value is sent, the Try will contain the value.
   *    - If the channel is closed by the sender with no exception, the try will
   *        be empty (with no value or exception).
   *    - If the channel is closed by the sender with an exception, the try will
   *        contain the exception.
   *    - If the next() call was cancelled, the try will contain an exception of
   *        type folly::OperationCancelled.
   */
  NextSemiAwaitable next(bool closeOnCancel = true) {
    // An invalid receiver is represented to the awaitable as a null pointer.
    return NextSemiAwaitable(*this ? this : nullptr, closeOnCancel);
  }

  /**
   * Cancels this receiver. If the receiver is currently being consumed, the
   * consumer will receive a folly::OperationCancelled exception.
   *
   * After this call the receiver is invalid (operator bool returns false) and
   * any locally buffered values have been discarded. Calling this on a
   * receiver that is already invalid (default-constructed, moved away, or
   * previously cancelled) does nothing.
   */
  void cancel() && {
    if (!*this) {
      // No bridge to cancel; avoid dereferencing a null bridge pointer.
      return;
    }
    bridge_->receiverCancel();
    bridge_ = nullptr;
    buffer_.clear();
  }

 private:
  // Only Channel<TValue>::create() (a friend) builds receivers from a bridge.
  explicit Receiver(detail::ChannelBridgePtr<TValue> bridge)
      : bridge_(std::move(bridge)) {}

  friend bool detail::receiverWait<>(
      Receiver<TValue>&, detail::IChannelCallback*);

  friend detail::IChannelCallback* detail::cancelReceiverWait<>(
      Receiver<TValue>&);

  friend std::optional<Try<TValue>> detail::receiverGetValue<>(
      Receiver<TValue>&);

  friend std::
      pair<detail::ChannelBridgePtr<TValue>, detail::ReceiverQueue<TValue>>
      detail::receiverUnbuffer<>(Receiver<TValue>&& receiver);

  // Handle to the underlying bridge; null once cancelled or moved away.
  detail::ChannelBridgePtr<TValue> bridge_;
  // Values held on the receiver side; cleared on cancel().
  detail::ReceiverQueue<TValue> buffer_;
};
} // namespace channels
} // namespace folly

#include <folly/channels/Channel-inl.h>