folly/folly/channels/FanoutChannel.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/channels/Channel.h>
#include <folly/executors/SequencedExecutor.h>

namespace folly {
namespace channels {

namespace detail {
// Forward declaration of the processor interface that FanoutChannel holds a
// pointer to. Only the name is needed in this header.
template <typename TValue, typename TContext>
class IFanoutChannelProcessor;
} // namespace detail

/**
 * Default context type for FanoutChannel, used when the caller does not supply
 * a context of their own.
 *
 * It satisfies the context contract (a void update function taking a const
 * reference to the value) with a no-op, so no state is tracked.
 */
template <typename TValue>
struct NoContext {
  // Intentionally empty: there is no state to update.
  void update(const TValue&) {}
};

/**
 * A fanout channel allows fanning out updates from a single input receiver
 * to multiple output receivers.
 *
 * When a new output receiver is added, an optional function will be run that
 * computes a set of initial values. These initial values will only be sent to
 * the new receiver.
 *
 * FanoutChannel allows specifying an optional context object. If specified, the
 * context object must have a void update function:
 *
 *   void update(const ValueType&);
 *
 * This update function will be called on every value from the input receiver.
 * The context will be passed to the getInitialValues argument to subscribe,
 * allowing for initial updates to depend on the context. This facilitates the
 * common pattern of letting new subscribers know where they are starting from.
 *
 * Example without context:
 *
 *   // Function that returns a receiver:
 *   Receiver<int> getInputReceiver();
 *
 *   // Function that returns an executor
 *   folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();
 *
 *   auto fanoutChannel =
 *       createFanoutChannel(getInputReceiver(), getExecutor());
 *   auto receiver1 = fanoutChannel.subscribe();
 *   auto receiver2 = fanoutChannel.subscribe();
 *   auto receiver3 = fanoutChannel.subscribe(
 *       [](const auto&) { return std::vector<int>{1, 2, 3}; });
 *
 * Example with context:
 *
 *   struct Context {
 *     int lastValue{-1};
 *
 *     void update(const int& value) {
 *       lastValue = value;
 *     }
 *   };
 *
 *   auto fanoutChannel =
 *       createFanoutChannel(getInputReceiver(), getExecutor(), Context());
 *   auto receiver1 = fanoutChannel.subscribe([](const Context& context) {
 *     return std::vector<int>{context.lastValue};
 *   });
 *   auto receiver2 = fanoutChannel.subscribe([](const Context& context) {
 *     return std::vector<int>{context.lastValue};
 *   });
 *   std::move(fanoutChannel).close();
 */
template <typename ValueType, typename ContextType = NoContext<ValueType>>
class FanoutChannel {
  using TProcessor = detail::IFanoutChannelProcessor<ValueType, ContextType>;

 public:
  /**
   * Constructs a FanoutChannel backed by the given processor.
   *
   * NOTE(review): ownership and lifetime of the processor are handled in
   * FanoutChannel-inl.h, which is not visible here -- confirm before
   * constructing one directly.
   */
  explicit FanoutChannel(TProcessor* processor);

  /**
   * FanoutChannel is move-only: because the move operations are user-declared,
   * the copy constructor and copy assignment are implicitly deleted.
   */
  FanoutChannel(FanoutChannel&& other) noexcept;
  FanoutChannel& operator=(FanoutChannel&& other) noexcept;
  ~FanoutChannel();

  /**
   * Returns whether this FanoutChannel is a valid object.
   */
  explicit operator bool() const;

  /**
   * Returns a new output receiver that will receive all values from the input
   * receiver.
   *
   * If a getInitialValues parameter is provided, it will be executed
   * to determine the set of initial values that will (only) go to the new
   * output receiver. It is passed the channel's context by const reference.
   * Other functions on this class should not be called from within
   * getInitialValues, or a deadlock will occur.
   */
  Receiver<ValueType> subscribe(
      folly::Function<std::vector<ValueType>(const ContextType&)>
          getInitialValues = {});

  /**
   * Returns whether this fanout channel has any subscribers.
   */
  bool anySubscribers() const;

  /**
   * Closes all subscribers, without closing the fanout channel. New subscribers
   * can be added after this call.
   *
   * NOTE(review): ex defaults to an empty exception_wrapper; presumably a
   * non-empty one is delivered to the closed subscribers -- confirm in
   * FanoutChannel-inl.h.
   */
  void closeSubscribers(exception_wrapper ex = exception_wrapper());

  /**
   * Closes the fanout channel.
   *
   * This function is rvalue-qualified, so it must be invoked on an rvalue,
   * e.g. std::move(fanoutChannel).close().
   */
  void close(exception_wrapper ex = exception_wrapper()) &&;

 private:
  // Processor backing this channel. NOTE(review): presumably null once the
  // channel has been moved from or closed -- confirm in FanoutChannel-inl.h.
  TProcessor* processor_;
};

/**
 * Creates a new fanout channel that fans out updates from an input receiver.
 *
 * ValueType defaults to ReceiverType::ValueType, and ContextType defaults to
 * NoContext of that value type. If the context argument is omitted, a
 * default-constructed ContextType is used.
 *
 * Parameters:
 *   inputReceiver - the receiver whose values are fanned out (taken by value).
 *   executor      - keep-alive for a SequencedExecutor. NOTE(review):
 *                   presumably the executor on which input values are
 *                   processed -- confirm in FanoutChannel-inl.h.
 *   context       - optional context object; see the FanoutChannel class
 *                   comment for the required update function.
 */
template <
    typename ReceiverType,
    typename ValueType = typename ReceiverType::ValueType,
    typename ContextType = NoContext<typename ReceiverType::ValueType>>
FanoutChannel<ValueType, ContextType> createFanoutChannel(
    ReceiverType inputReceiver,
    folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
    ContextType context = ContextType());
} // namespace channels
} // namespace folly

#include <folly/channels/FanoutChannel-inl.h>