folly/folly/channels/FanoutSender.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/channels/Channel.h>
#include <folly/container/F14Set.h>
#include <folly/experimental/channels/detail/PointerVariant.h>

namespace folly {
namespace channels {

namespace detail {
template <typename ValueType>
class FanoutSenderProcessor;
}

/**
 * A FanoutSender allows fanning out updates to multiple output receivers.
 * Values can be written as with a normal Sender. When there is only one output
 * receiver, the memory used by a FanoutSender (and the corresponding output
 * receiver) is the same as the memory used by a normal channel.
 *
 * When a new output receiver is added, an optional vector of initial values
 * can be provided. These initial values will only be sent to the new receiver.
 *
 * Memory used by closed receivers is reclaimed lazily (when iterating over
 * receivers).
 *
 * Example:
 *
 *  FanoutSender<int> fanoutSender;
 *  auto receiver1 = fanoutSender.subscribe();
 *  auto receiver2 = fanoutSender.subscribe();
 *  auto receiver3 = fanoutSender.subscribe({1, 2, 3});
 *  std::move(fanoutSender).close();
 */
template <typename ValueType>
class FanoutSender {
 public:
  FanoutSender();
  FanoutSender(FanoutSender&& other) noexcept;
  FanoutSender& operator=(FanoutSender&& other) noexcept;
  ~FanoutSender();

  /**
   * Returns a new output receiver that will receive all values written to the
   * FanoutSender. If the initialValues parameter is provided, the given values
   * will (only) go to the new output receiver.
   */
  Receiver<ValueType> subscribe(std::vector<ValueType> initialValues = {});

  /**
   * Subscribes with an already-created sender.
   */
  void subscribe(Sender<ValueType> sender);

  /**
   * Returns whether this fanout sender has any active output receivers.
   */
  bool anySubscribers() const;

  /**
   * Returns the number of output receivers for this fanout sender.
   */
  std::uint64_t numSubscribers() const;

  /**
   * Sends the given value to all corresponding receivers.
   */
  template <typename U = ValueType>
  void write(U&& element);

  /**
   * Closes the fanout sender.
   */
  void close(exception_wrapper ex = exception_wrapper()) &&;

 private:
  bool anySubscribersImpl() const;

  bool hasProcessor() const;

  detail::ChannelBridge<ValueType>* getSingleSender() const;

  detail::FanoutSenderProcessor<ValueType>* getProcessor() const;

  void clearSendersWithClosedReceivers() const;

  mutable detail::PointerVariant<
      detail::ChannelBridge<ValueType>,
      detail::FanoutSenderProcessor<ValueType>>
      senders_;
};
} // namespace channels
} // namespace folly

#include <folly/channels/FanoutSender-inl.h>