folly/folly/channels/Producer.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <atomic>
#include <optional>

#include <folly/channels/ChannelCallbackHandle.h>
#include <folly/executors/SequencedExecutor.h>
#include <folly/experimental/coro/Task.h>

namespace folly {
namespace channels {

/**
 * A Producer is a base class for an object that produces a channel. The
 * subclass can call write to write a new value to the channel, and close to
 * close the channel. It is a useful way to generate output values for a
 * receiver, without having to keep alive an extraneous object that produces
 * those values.
 *
 * When the consumer of the channel stops consuming, the onClosed function will
 * be called. The subclass should cancel any ongoing work in this function.
 * After onClosed is called, the object will be deleted once the last
 * outstanding KeepAlive is destroyed.
 *
 * Example:
 *   // Function that returns an executor
 *   folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();
 *
 *   // Function that returns output values
 *   std::vector<int> getLatestOutputValues();
 *
 *   // Example producer implementation
 *   class PollingProducer : public Producer<int> {
 *    public:
 *     PollingProducer(
 *         Sender<int> sender,
 *         folly::Executor::KeepAlive<folly::SequencedExecutor> executor)
 *         : Producer<int>(std::move(sender), std::move(executor)) {
 *       // Start polling for values.
 *       folly::coro::co_withCancellation(
 *             cancelSource_.getToken(),
 *             [=, keepAlive = getKeepAlive()]() {
 *               return pollForOutputValues();
 *             })
 *         .scheduleOn(getExecutor())
 *         .start();
 *     }
 *
 *     folly::coro::Task<void> onClosed() override {
 *       // The consumer has stopped consuming our values. Stop polling.
 *       cancelSource_.requestCancellation();
 *       co_return;
 *     }
 *
 *    private:
 *     folly::coro::Task<void> pollForOutputValues() {
 *       auto cancelToken = co_await folly::coro::co_current_cancellation_token;
 *       while (!cancelToken.isCancellationRequested()) {
 *         auto outputValues = getLatestOutputValues();
 *         for (auto& outputValue : outputValues) {
 *           write(std::move(outputValue));
 *         }
 *         // Wait between polls so that the loop does not spin.
 *         co_await folly::coro::sleep(std::chrono::seconds(1));
 *       }
 *     }
 *
 *     folly::CancellationSource cancelSource_;
 *   };
 *
 *   // Producer usage
 *   Receiver<int> receiver = makeProducer<PollingProducer>(getExecutor());
 */
template <typename TValue>
class Producer : public detail::IChannelCallback {
 public:
  /**
   * The type of the values this producer writes (the parameter of write()).
   * makeProducer uses it to determine the type of the returned Receiver.
   */
  using ValueType = TValue;

 protected:
  /**
   * A move-only handle which ensures that the Producer that created it will
   * not be destroyed while the handle exists. Obtain one with getKeepAlive().
   */
  class KeepAlive {
   public:
    ~KeepAlive();
    KeepAlive(KeepAlive&&) noexcept;
    KeepAlive& operator=(KeepAlive&&) noexcept;

    // Copying was already implicitly deleted because the move operations are
    // user-declared. It is spelled out here so that the move-only contract is
    // visible in the interface.
    KeepAlive(const KeepAlive&) = delete;
    KeepAlive& operator=(const KeepAlive&) = delete;

   private:
    friend class Producer<TValue>;

    // Private so that only Producer (a friend) can create a KeepAlive.
    explicit KeepAlive(Producer<TValue>* ptr);

    // Non-owning pointer to the producer this handle refers to.
    // NOTE(review): presumably null once moved from -- confirm in
    // Producer-inl.h.
    Producer<TValue>* ptr_;
  };

  /**
   * Constructs a producer from the sender it will write to and the executor
   * it will use. The constructor is protected: subclasses forward these two
   * arguments from their own constructor, and instances are created through
   * makeProducer.
   */
  Producer(
      Sender<TValue> sender,
      folly::Executor::KeepAlive<folly::SequencedExecutor> executor);

  // `override` already implies `virtual`; only one of the two is spelled out.
  ~Producer() override = default;

  /**
   * Writes a value into the channel.
   */
  void write(TValue value);

  /**
   * Closes the channel. An exception may optionally be supplied; when ex is
   * omitted it defaults to std::nullopt (no exception).
   */
  void close(std::optional<exception_wrapper> ex = std::nullopt);

  /**
   * Returns whether or not this producer is closed or cancelled.
   */
  bool isClosed();

  /**
   * Returns the executor used for this producer.
   */
  folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();

  /**
   * Returns a KeepAlive object. This producer will not be destroyed before
   * all KeepAlive objects obtained from it are destroyed.
   */
  KeepAlive getKeepAlive();

  /**
   * Called when the corresponding receiver is cancelled, or the sender is
   * closed. Subclasses override this to stop any ongoing work. The default
   * implementation does nothing.
   */
  virtual folly::coro::Task<void> onClosed() { co_return; }

  /**
   * If you get an error that this function is not implemented, do not
   * implement it. Instead, create your object with makeProducer
   * below.
   */
  virtual void ensureMakeProducerUsedForCreation() = 0;

 private:
  // makeProducer is a friend so that it can reach the private and protected
  // members of this class when it creates a producer.
  template <typename TProducer, typename... Args>
  friend Receiver<typename TProducer::ValueType> makeProducer(
      folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
      Args&&... args);

  // detail::IChannelCallback overrides; the definitions are in
  // Producer-inl.h.
  // NOTE(review): presumably these are how the channel bridge notifies the
  // producer that the receiver side has changed state -- confirm there.
  void consume(detail::ChannelBridgeBase* bridge) override;

  void canceled(detail::ChannelBridgeBase* bridge) override;

  // Channel bridge for the sender passed to the constructor.
  detail::ChannelBridgePtr<TValue> sender_;
  // Executor returned by getExecutor().
  folly::Executor::KeepAlive<folly::SequencedExecutor> executor_;
  // Reference count, starting at 1.
  // NOTE(review): presumably one reference for the producer itself plus one
  // per outstanding KeepAlive -- confirm in Producer-inl.h.
  std::atomic<int> refCount_{1};
};

/**
 * Creates a new object of type TProducer, which must extend the Producer
 * class, and returns a receiver. The receiver will receive any values produced
 * by the producer. See the description of the Producer class for information
 * on how to implement a producer.
 *
 * TProducer must expose a ValueType member type (Producer provides one); it
 * determines the value type of the returned Receiver.
 *
 * executor is the executor for the new producer. args are any additional
 * arguments.
 * NOTE(review): args are presumably forwarded to TProducer's constructor
 * after the sender and executor -- confirm in Producer-inl.h.
 */
template <typename TProducer, typename... Args>
Receiver<typename TProducer::ValueType> makeProducer(
    folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
    Args&&... args);
} // namespace channels
} // namespace folly

#include <folly/channels/Producer-inl.h>