folly/folly/channels/ChannelProcessor.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/Executor.h>
#include <folly/channels/RateLimiter.h>

namespace folly {
namespace channels {

namespace detail {
template <typename KeyType>
class ChannelProcessorImpl;
}

/**
 * This object allows for memory-efficient processing of values many channels.
 *
 * A channel is added with a unique key and a callback. The callback will be
 * called for every value pushed to the receiver.
 *
 * A resumable channel can also be added. A resumable channel involves two
 * callbacks. An initialization callback is called to get the receiver, and the
 * update callback is called on every update (as for a normal channel). The
 * update callback can throw a ReinitializeException at any time, which will
 * trigger the initialize callback to re-run.
 *
 * Values for a given channel are processed until one of the following occurs:
 *     1. The channel is closed
 *     2. The channel callback throws an OnClosedException
 *     3. The channel callback throws a folly::OperationCancelled exception.
 *     4. The channel is removed with a call to removeChannel.
 *
 * If a channel is removed with removeChannel, processing will eventually stop
 * for that channel. This will not necessarily happen immediately.
 *
 * If a channel is added for an already existing key, the previous channel for
 * that key will be removed and processing will eventually stop.
 *
 * Processing for all channels will run on the user-provided executor. For any
 * particular channel, all processing will happen sequentially. For any two
 * distinct channels, processing may happen in parallel (subject to any
 * constraints of the provided executor).
 */
template <typename KeyType>
class ChannelProcessor {
 public:
  explicit ChannelProcessor(
      std::unique_ptr<detail::ChannelProcessorImpl<KeyType>> impl);

  /**
   * Returns whether this ChannelProcessor is a valid object.
   */
  explicit operator bool() const;

  /**
   * Processes a channel with a given key and callback. For a receiver of type
   * Receiver<InputValueType>, the callback must accept a single parameter of
   * type Try<InputValueType>, and return a void task. If the callback
   * throws an exception of type OperationCancelled or OnClosedException, the
   * channel will be removed. Any other exception thrown by the callback will
   * terminate the process.
   *
   * If there is an existing channel with the same key, it will be removed
   * before the new channel is added. The old channel's callback can check the
   * current cancellation token to see if it was removed while processing
   * values. See removeChannel for more details.
   *
   * Example:
   *
   *   // Example function that returns a receiver for a given entity
   *   Receiver<int> subscribe(const std::string& entity);
   *
   *   // Example function that returns an executor
   *   folly::Executor::KeepAlive<> getExecutor();
   *
   *   auto channelProcessor = createChannelProcessor<std::string>(
   *       getExecutor());
   *
   *   channelProcessor.addChannel(
   *       "abc",
   *       subscribe("abc"),
   *       [](Try<int> value) -> folly::coro::Task<void> {
   *         LOG(INFO) << fmt::format("Received value {}", *value);
   *         co_return;
   *       });
   */
  template <typename ReceiverType, typename OnUpdateFunc>
  void addChannel(KeyType key, ReceiverType receiver, OnUpdateFunc onUpdate);

  /**
   * Processing a resumable channel involves two callbacks. The initialization
   * callback accepts an initialization argument of a user-defined type, and
   * must return a folly::coro::Task<Receiver<InputValueType>>. The onUpdate
   * callback accepts a Try<InputValueType>, and returns a void task. The
   * onUpdate callback can throw a ReinitializeException<InitializeArg> at any
   * time, which will trigger the initialize function to be run again. In
   * addition, if either callback throws an exception of type OperationCancelled
   * or OnClosedException, the channel will be removed. Any other exception
   * thrown by either callback will terminate the process.
   *
   * If there is an existing channel with the same key, it will be removed
   * before the new channel is added. The old channel's callbacks can check the
   * current cancellation token to see if it was removed while processing
   * values. See removeChannel for more details.
   *
   * Example:
   *
   *   struct InitializeArg {
   *     std::string param;
   *   }
   *
   *   // Example function that returns a receiver for a given entity
   *   Receiver<int> subscribe(const InitializeArg& initializeArg);
   *
   *   // Example function that returns an executor
   *   folly::Executor::KeepAlive<> getExecutor();
   *
   *   auto channelProcessor = createChannelProcessor<std::string>(
   *       getExecutor());
   *
   *   channelProcessor.addResumableChannel(
   *       "abc",
   *       InitializeArg({"param"}),
   *       [](InitializeArg initializeArg) -> folly::coro::Task<Receiver<int>> {
   *         co_return subscribe(initializeArg);
   *       },
   *       [](Try<int> value) -> folly::coro::Task<void> {
   *         if (*value == -1) {
   *           throw ReinitializeException(InitializeArg({"param"}));
   *         }
   *         LOG(INFO) << fmt::format("Received value {}", *value);
   *         co_return;
   *       });
   */
  template <
      typename InitializeArg,
      typename InitializeFunc,
      typename OnUpdateFunc>
  void addResumableChannel(
      KeyType key,
      InitializeArg initializeArg,
      InitializeFunc initialize,
      OnUpdateFunc onUpdate);

  /*
   * This is similar to addResumableChannel. However, it allows a user-provided
   * state object to be stored with the channel. That state object will be
   * passed to both callbacks, and will be destructed when the channel is
   * removed or closed.
   *
   * * Example:
   *
   *   struct InitializeArg {
   *     std::string param;
   *   }
   *
   *   struct State {
   *     int prevValue{-1};
   *   }
   *
   *   // Example function that returns a receiver for a given entity
   *   Receiver<int> subscribe(const InitializeArg& initializeArg);
   *
   *   // Example function that returns an executor
   *   folly::Executor::KeepAlive<> getExecutor();
   *
   *   auto channelProcessor = createChannelProcessor<std::string>(
   *       getExecutor());
   *
   *   channelProcessor.addResumableChannelWithState(
   *       "abc",
   *       InitializeArg({"param"}),
   *       [](InitializeArg initializeArg, State& state)
   *                        -> folly::coro::Task<Receiver<int>> {
   *         co_return subscribe(initializeArg);
   *       },
   *       [](Try<int> value, State& state) -> folly::coro::Task<void> {
   *         if (*value == -1) {
   *           throw ReinitializeException(InitializeArg({"param"}));
   *         }
   *         LOG(INFO) << fmt::format(
   *             "Received value {}. Previous: {}.", *value, state.prevValue);
   *         state.prevValue = *value;
   *         co_return;
   *       },
   *       State());
   */
  template <
      typename InitializeArg,
      typename InitializeFunc,
      typename OnUpdateFunc,
      typename ChannelState>
  void addResumableChannelWithState(
      KeyType key,
      InitializeArg initializeArg,
      InitializeFunc initialize,
      OnUpdateFunc onUpdate,
      ChannelState channelState);

  /**
   * Removes the channel with the given key, if such a channel exists. The
   * channel will be asynchronously removed, so the channels' callback may
   * still receive some values after this call. The callback can detect whether
   * or not the channel was removed by examining its current cancellation token.
   */
  void removeChannel(const KeyType& keyType);

  /**
   * Closes all channels being processed, causing all processing to eventually
   * stop. Calling this function will make the object invalid.
   */
  void close() &&;

 private:
  std::unique_ptr<detail::ChannelProcessorImpl<KeyType>> impl_;
};

/**
 * Creates a new channel processor.
 */
template <typename KeyType>
ChannelProcessor<KeyType> createChannelProcessor(
    folly::Executor::KeepAlive<> executor,
    std::shared_ptr<RateLimiter> rateLimiter = nullptr,
    size_t numSequencedExecutors = 1);

/**
 * Creates a new channel processor.
 */
template <typename KeyType>
ChannelProcessor<KeyType> createChannelProcessor(
    std::vector<folly::Executor::KeepAlive<folly::SequencedExecutor>> executors,
    std::shared_ptr<RateLimiter> rateLimiter = nullptr);
} // namespace channels
} // namespace folly

#include <folly/channels/ChannelProcessor-inl.h>