folly/folly/channels/MergeChannel.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/channels/Channel.h>
#include <folly/container/F14Set.h>
#include <folly/executors/SequencedExecutor.h>

namespace folly {
namespace channels {

namespace detail {
template <typename KeyType, typename ValueType>
class IMergeChannelProcessor;
}

struct MergeChannelReceiverAdded {};
struct MergeChannelReceiverRemoved {};
struct MergeChannelReceiverClosed {
  exception_wrapper exception;
};

template <typename KeyType, typename ValueType>
struct MergeChannelEvent {
  using EventType = std::variant<
      ValueType,
      MergeChannelReceiverAdded,
      MergeChannelReceiverRemoved,
      MergeChannelReceiverClosed>;

  KeyType key;
  EventType event;
};

/**
 * A merge channel allows one to merge multiple receivers into a single
 * output receiver. The set of receivers being merged can be changed at
 * runtime. Each receiver is added with a key that can be used to remove
 * the receiver at a later point.
 *
 * Example:
 *
 *  // Example function that returns a receiver for a given entity:
 *  Receiver<int> subscribe(std::string entity);
 *
 *  // Example function that returns an executor
 *  folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();
 *
 *  auto [outputReceiver, mergeChannel]
 *      = createMergeChannel<std::string, int>(getExecutor());
 *  mergeChannel.addNewReceiver("abc", subscribe("abc"));
 *  mergeChannel.addNewReceiver("def", subscribe("def"));
 *  mergeChannel.removeReceiver("abc");
 *  std::move(mergeChannel).close();
 */
template <typename KeyType, typename ValueType>
class MergeChannel {
  using TProcessor = detail::IMergeChannelProcessor<KeyType, ValueType>;

 public:
  explicit MergeChannel(
      detail::IMergeChannelProcessor<KeyType, ValueType>* processor);
  MergeChannel(MergeChannel&& other) noexcept;
  MergeChannel& operator=(MergeChannel&& other) noexcept;
  ~MergeChannel();

  /**
   * Returns whether this MergeChannel is a valid object.
   */
  explicit operator bool() const;

  /**
   * Adds a new receiver to be merged, along with a given key. If the key
   * matches the key of an existing receiver, that existing receiver is replaced
   * with the new one (and updates from the old receiver will no longer be
   * merged). An added receiver can later be removed by passing the same key to
   * removeReceiver.
   */
  template <typename TReceiver>
  void addNewReceiver(KeyType key, TReceiver receiver);

  /**
   * Removes the receiver added with the given key. The receiver will be
   * asynchronously removed, so the consumer may still receive some values from
   * this receiver after this call.
   */
  void removeReceiver(KeyType key);

  /**
   * Returns a set of keys for receivers that are merged into this MergeChannel.
   */
  folly::F14FastSet<KeyType> getReceiverKeys();

  /**
   * Closes the merge channel.
   */
  void close(std::optional<exception_wrapper> ex = std::nullopt) &&;

 private:
  TProcessor* processor_;
};

/**
 * Creates a new merge channel.
 *
 * @param executor: The SequencedExecutor to use for merging values.
 */
template <typename KeyType, typename ValueType>
std::pair<
    Receiver<MergeChannelEvent<KeyType, ValueType>>,
    MergeChannel<KeyType, ValueType>>
createMergeChannel(
    folly::Executor::KeepAlive<folly::SequencedExecutor> executor);
} // namespace channels
} // namespace folly

#include <folly/channels/MergeChannel-inl.h>