// folly/folly/channels/FanoutChannel-inl.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/channels/Channel.h>
#include <folly/channels/FanoutSender.h>
#include <folly/container/F14Set.h>
#include <folly/executors/SequencedExecutor.h>
#include <folly/experimental/channels/detail/Utility.h>

namespace folly {
namespace channels {

template <typename ValueType, typename ContextType>
FanoutChannel<ValueType, ContextType>::FanoutChannel(TProcessor* processor)
    : processor_(processor) {}

/**
 * Move constructor: takes over the processor held by `other` and leaves
 * `other` empty (its processor pointer becomes null).
 */
template <typename ValueType, typename ContextType>
FanoutChannel<ValueType, ContextType>::FanoutChannel(
    FanoutChannel&& other) noexcept
    : processor_(other.processor_) {
  other.processor_ = nullptr;
}

/**
 * Move assignment. Self-assignment is a no-op. Otherwise any processor
 * currently held is closed first, then the processor of `other` is taken over
 * and `other` is left empty.
 */
template <typename ValueType, typename ContextType>
FanoutChannel<ValueType, ContextType>&
FanoutChannel<ValueType, ContextType>::operator=(
    FanoutChannel&& other) noexcept {
  if (this != &other) {
    if (processor_ != nullptr) {
      // Release the processor we currently own before adopting the new one.
      std::move(*this).close();
    }
    processor_ = other.processor_;
    other.processor_ = nullptr;
  }
  return *this;
}

/**
 * Destructor. A handle that still holds a processor is closed without an
 * exception; an empty (moved-from or already closed) handle does nothing.
 */
template <typename ValueType, typename ContextType>
FanoutChannel<ValueType, ContextType>::~FanoutChannel() {
  if (processor_ == nullptr) {
    return;
  }
  std::move(*this).close(exception_wrapper{});
}

/**
 * Returns true if this handle currently holds a processor, i.e. it has not
 * been moved from or closed.
 */
template <typename ValueType, typename ContextType>
FanoutChannel<ValueType, ContextType>::operator bool() const {
  const bool hasProcessor = (processor_ != nullptr);
  return hasProcessor;
}

/**
 * Forwards the subscription request to the processor and returns the receiver
 * it produces.
 *
 * @param getInitialValues: Callback handed to the processor unchanged; it is
 *     given the context and returns values for the new subscriber.
 *
 * The processor pointer is dereferenced unconditionally, so this must not be
 * called on an empty handle.
 */
template <typename ValueType, typename ContextType>
Receiver<ValueType> FanoutChannel<ValueType, ContextType>::subscribe(
    folly::Function<std::vector<ValueType>(const ContextType&)>
        getInitialValues) {
  auto outputReceiver = processor_->subscribe(std::move(getInitialValues));
  return outputReceiver;
}

/**
 * Asks the processor whether it currently has any subscribers.
 *
 * The processor pointer is dereferenced unconditionally, so this must not be
 * called on an empty handle.
 */
template <typename ValueType, typename ContextType>
bool FanoutChannel<ValueType, ContextType>::anySubscribers() const {
  const bool hasSubscribers = processor_->anySubscribers();
  return hasSubscribers;
}

/**
 * Asks the processor to close its subscribers.
 *
 * @param ex: If non-empty, it is wrapped in the CloseResult passed to the
 *     processor; otherwise an empty CloseResult is passed.
 *
 * The processor pointer is dereferenced unconditionally, so this must not be
 * called on an empty handle.
 */
template <typename ValueType, typename ContextType>
void FanoutChannel<ValueType, ContextType>::closeSubscribers(
    exception_wrapper ex) {
  if (ex) {
    processor_->closeSubscribers(detail::CloseResult(std::move(ex)));
  } else {
    processor_->closeSubscribers(detail::CloseResult());
  }
}

/**
 * Closes this handle: notifies the processor that the handle is gone and then
 * drops the processor pointer, leaving the handle empty.
 *
 * @param ex: If non-empty, it is wrapped in the CloseResult passed to the
 *     processor; otherwise an empty CloseResult is passed.
 *
 * The processor pointer is dereferenced unconditionally, so this must not be
 * called on an empty handle.
 */
template <typename ValueType, typename ContextType>
void FanoutChannel<ValueType, ContextType>::close(exception_wrapper ex) && {
  if (ex) {
    processor_->destroyHandle(detail::CloseResult(std::move(ex)));
  } else {
    processor_->destroyHandle(detail::CloseResult());
  }
  // The handle no longer refers to the processor after destroyHandle().
  processor_ = nullptr;
}

namespace detail {

/**
 * Interface through which the FanoutChannel handle talks to its processor.
 * It extends IChannelCallback, so implementations also receive the channel
 * callbacks declared there.
 */
template <typename ValueType, typename ContextType>
class IFanoutChannelProcessor : public IChannelCallback {
 public:
  /**
   * Creates a receiver for a new subscriber. getInitialValues is given the
   * context and returns a vector of values; FanoutChannelProcessor in this
   * file delivers those values to the new subscriber only.
   */
  virtual Receiver<ValueType> subscribe(
      folly::Function<std::vector<ValueType>(const ContextType&)>
          getInitialValues) = 0;

  /**
   * Returns whether there are currently any subscribers.
   */
  virtual bool anySubscribers() = 0;

  /**
   * Closes the current subscribers with the given close result.
   */
  virtual void closeSubscribers(CloseResult closeResult) = 0;

  /**
   * Called by FanoutChannel::close() to tell the processor that the handle is
   * going away. The handle drops its pointer right after this call.
   */
  virtual void destroyHandle(CloseResult closeResult) = 0;
};

/**
 * This object fans out values from the input receiver to all output receivers.
 * The lifetime of this object is described by the following state machine.
 *
 * The input receiver can be in one of three conceptual states: Active,
 * CancellationTriggered, or CancellationProcessed (removed). When the input
 * receiver reaches the CancellationProcessed state AND the user's FanoutChannel
 * object is deleted, this object is deleted.
 *
 * When an input receiver receives a value indicating that the channel has
 * been closed, the state of the input receiver transitions from Active directly
 * to CancellationProcessed (and this object will be deleted once the user
 * destroys their FanoutChannel object).
 *
 * When the user destroys their FanoutChannel object, the state of the input
 * receiver transitions from Active to CancellationTriggered. This object will
 * then be deleted once the input receiver transitions to the
 * CancellationProcessed state.
 *
 * All mutable state lives in state_ and is only touched while holding its
 * write lock. The object deletes itself (see maybeDelete); it must therefore
 * be allocated with new and never destroyed by its creator.
 */
template <typename ValueType, typename ContextType>
class FanoutChannelProcessor
    : public IFanoutChannelProcessor<ValueType, ContextType> {
 private:
  struct State {
    // NOTE(review): left implicit on purpose. state_ is initialized directly
    // from a ContextType in the constructor below, which appears to rely on
    // this conversion -- confirm before marking it explicit.
    State(ContextType _context) : context(std::move(_context)) {}

    /**
     * Returns the state of the input receiver as reported by
     * detail::getReceiverState() for the current (possibly null) bridge.
     */
    ChannelState getReceiverState() {
      return detail::getReceiverState(receiver.get());
    }

    // Bridge to the input receiver. Reset to nullptr once its cancellation
    // has been processed.
    ChannelBridgePtr<ValueType> receiver;
    // Sender that every output receiver subscribes to.
    FanoutSender<ValueType> fanoutSender;
    // User-provided context: updated on every input value and handed to the
    // getInitialValues callback of new subscribers.
    ContextType context;
    // Set once the user's FanoutChannel handle has been destroyed.
    bool handleDeleted{false};
  };

  using WLockedStatePtr = typename folly::Synchronized<State>::WLockedPtr;

 public:
  /**
   * @param executor: Executor on which the consume()/canceled() callbacks are
   *     processed.
   * @param context: Initial context stored in the processor's state.
   */
  explicit FanoutChannelProcessor(
      folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
      ContextType context)
      : executor_(std::move(executor)), state_(std::move(context)) {}

  /**
   * Starts fanning out values from the input receiver to all output receivers.
   *
   * @param inputReceiver: The input receiver to fan out values from.
   */
  void start(Receiver<ValueType> inputReceiver) {
    auto state = state_.wlock();
    auto [unbufferedInputReceiver, buffer] =
        detail::receiverUnbuffer(std::move(inputReceiver));
    state->receiver = std::move(unbufferedInputReceiver);

    // Start processing new values that come in from the input receiver.
    processAllAvailableValues(state, std::move(buffer));
  }

  /**
   * Returns a new output receiver that will receive all values from the input
   * receiver. If a getInitialValues parameter is provided, it will be executed
   * (with the current context, under the state lock) to determine the set of
   * initial values that will (only) go to the new output receiver.
   *
   * If the input receiver is already gone, the returned receiver gets the
   * initial values and is then closed immediately.
   */
  Receiver<ValueType> subscribe(
      folly::Function<std::vector<ValueType>(const ContextType&)>
          getInitialValues) override {
    auto state = state_.wlock();
    auto initialValues = getInitialValues ? getInitialValues(state->context)
                                          : std::vector<ValueType>();
    if (!state->receiver) {
      // No input receiver any more: hand back a standalone channel that only
      // carries the initial values and is closed right away.
      auto [receiver, sender] = Channel<ValueType>::create();
      for (auto&& value : initialValues) {
        sender.write(std::move(value));
      }
      std::move(sender).close();
      return std::move(receiver);
    }
    return state->fanoutSender.subscribe(std::move(initialValues));
  }

  /**
   * Closes all subscribers without closing the fanout channel. The subscribers
   * are closed with the exception held by closeResult, if any.
   */
  void closeSubscribers(CloseResult closeResult) override {
    auto state = state_.wlock();
    std::move(state->fanoutSender)
        .close(
            closeResult.exception.has_value() ? closeResult.exception.value()
                                              : exception_wrapper());
  }

  /**
   * This is called when the user's FanoutChannel object has been destroyed.
   * This object may delete itself before the call returns (see maybeDelete),
   * so the caller must not use it afterwards.
   */
  void destroyHandle(CloseResult closeResult) override {
    auto state = state_.wlock();
    processHandleDestroyed(state, std::move(closeResult));
  }

  /**
   * Returns whether this fanout channel has any output receivers.
   */
  bool anySubscribers() override {
    return state_.wlock()->fanoutSender.anySubscribers();
  }

 private:
  /**
   * Called when the input receiver has an update. The available values are
   * processed on the executor rather than on the calling thread.
   */
  void consume(ChannelBridgeBase*) override {
    executor_->add([this]() {
      // One or more values are now available from the input receiver.
      auto state = state_.wlock();
      CHECK_NE(state->getReceiverState(), ChannelState::CancellationProcessed);
      processAllAvailableValues(state);
    });
  }

  /**
   * Called when the cancellation of the input receiver has gone through. The
   * cancellation is processed on the executor rather than on the calling
   * thread.
   */
  void canceled(ChannelBridgeBase*) override {
    executor_->add([this]() {
      // We previously cancelled this input receiver, due to the destruction of
      // the handle. Process the cancellation for this input receiver.
      auto state = state_.wlock();
      processReceiverCancelled(state, CloseResult());
    });
  }

  /**
   * Processes all available values from the input receiver (starting from the
   * provided buffer, if present).
   *
   * If a value was received indicating that the input channel has been closed,
   * we will process cancellation for the input receiver. An input receiver
   * that is already cancelled on entry is treated as closed without an
   * exception.
   */
  void processAllAvailableValues(
      WLockedStatePtr& state,
      std::optional<ReceiverQueue<ValueType>> buffer = std::nullopt) {
    auto closeResult = state->receiver->isReceiverCancelled()
        ? CloseResult()
        : (buffer.has_value() ? processValues(state, std::move(buffer.value()))
                              : std::nullopt);
    while (!closeResult.has_value()) {
      if (state->receiver->receiverWait(this)) {
        // There are no more values available right now. We will stop processing
        // until the channel fires the consume() callback (indicating that more
        // values are available).
        break;
      }
      auto values = state->receiver->receiverGetValues();
      CHECK(!values.empty());
      closeResult = processValues(state, std::move(values));
    }
    if (closeResult.has_value()) {
      // The receiver received a value indicating channel closure.
      state->receiver->receiverCancel();
      processReceiverCancelled(state, std::move(closeResult.value()));
    }
  }

  /**
   * Processes the given set of values for the input receiver. Returns a
   * CloseResult if channel was closed, so the caller can stop attempting to
   * process values from it. Returns std::nullopt if every queued entry was a
   * regular value.
   */
  std::optional<CloseResult> processValues(
      WLockedStatePtr& state, ReceiverQueue<ValueType> values) {
    while (!values.empty()) {
      auto inputResult = std::move(values.front());
      values.pop();
      if (inputResult.hasValue()) {
        // We have received a normal value from the input receiver. Update the
        // context first (it sees the value and the current subscriber count),
        // then write the value to all output senders.
        state->context.update(
            inputResult.value(), state->fanoutSender.numSubscribers());
        state->fanoutSender.write(std::move(inputResult.value()));
      } else {
        // The input receiver was closed. Anything still queued behind this
        // entry is dropped.
        return inputResult.hasException()
            ? CloseResult(std::move(inputResult.exception()))
            : CloseResult();
      }
    }
    return std::nullopt;
  }

  /**
   * Processes the cancellation of the input receiver. We will close all senders
   * with the exception received from the input receiver (if any). The input
   * receiver must be in the CancellationTriggered state when this is called.
   */
  void processReceiverCancelled(
      WLockedStatePtr& state, CloseResult closeResult) {
    CHECK_EQ(state->getReceiverState(), ChannelState::CancellationTriggered);
    state->receiver = nullptr;
    std::move(state->fanoutSender)
        .close(
            closeResult.exception.has_value() ? closeResult.exception.value()
                                              : exception_wrapper());
    maybeDelete(state);
  }

  /**
   * Processes the destruction of the user's FanoutChannel object.  We will
   * cancel the receiver (if it is still active) and close all senders with the
   * exception held by closeResult (if any).
   */
  void processHandleDestroyed(WLockedStatePtr& state, CloseResult closeResult) {
    state->handleDeleted = true;
    if (state->getReceiverState() == ChannelState::Active) {
      state->receiver->receiverCancel();
    }
    std::move(state->fanoutSender)
        .close(
            closeResult.exception.has_value() ? closeResult.exception.value()
                                              : exception_wrapper());
    maybeDelete(state);
  }

  /**
   * Deletes this object if we have already processed cancellation for the
   * input receiver and the user's FanoutChannel object was destroyed. When
   * this deletes the object, `state` has been unlocked and neither it nor
   * `this` may be used afterwards.
   */
  void maybeDelete(WLockedStatePtr& state) {
    if (state->getReceiverState() == ChannelState::CancellationProcessed &&
        state->handleDeleted) {
      // The lock lives inside state_, a member of this object, so it has to be
      // released before the object is destroyed.
      state.unlock();
      delete this;
    }
  }

  folly::Executor::KeepAlive<folly::SequencedExecutor> executor_;
  folly::Synchronized<State> state_;
};
} // namespace detail

/**
 * Creates a fanout channel for the given input receiver.
 *
 * A FanoutChannelProcessor is heap-allocated with the executor and context,
 * started on the input receiver, and wrapped in the returned FanoutChannel
 * handle. The processor manages its own deletion, so the raw pointer is not
 * freed here.
 *
 * @param inputReceiver: The receiver whose values are fanned out.
 * @param executor: Executor handed to the processor.
 * @param context: Initial context handed to the processor.
 */
template <typename TReceiver, typename ValueType, typename ContextType>
FanoutChannel<ValueType, ContextType> createFanoutChannel(
    TReceiver inputReceiver,
    folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
    ContextType context) {
  using Processor = detail::FanoutChannelProcessor<ValueType, ContextType>;
  using Handle = FanoutChannel<ValueType, ContextType>;

  Processor* const fanoutProcessor =
      new Processor(std::move(executor), std::move(context));
  fanoutProcessor->start(std::move(inputReceiver));
  return Handle(fanoutProcessor);
}
} // namespace channels
} // namespace folly