/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/channels/Channel.h>
#include <folly/channels/OnClosedException.h>
#include <folly/channels/RateLimiter.h>
#include <folly/executors/SequencedExecutor.h>
namespace folly {
namespace channels {
/**
* Returns an output receiver that applies a given transformation function to
* each value from an input receiver.
*
* The TransformValue function takes a Try<InputValueType>, and returns a
* folly::coro::AsyncGenerator<OutputValueType>.
*
* InputValueType defaults to ReceiverType::ValueType. OutputValueType defaults
* to the value_type member of the type that TransformValueFunc returns when
* invoked with a Try<InputValueType>.
*
* - If the TransformValue function yields one or more output values, those
*   output values are sent to the output receiver.
*
* - If the TransformValue function throws an OnClosedException, the output
*   receiver is closed (without an exception).
*
* - If the TransformValue function throws any other type of exception, the
*   output receiver is closed with that exception.
*
* If the input receiver was closed, the TransformValue function is called with
* a Try containing an exception (either OnClosedException if the input receiver
* was closed without an exception, or the closure exception if the input
* receiver was closed with an exception). In this case, regardless of what the
* TransformValue function returns, the output receiver will be closed
* (potentially after receiving the last output values the TransformValue
* function returned, if any).
*
* @param inputReceiver: The input receiver.
*
* @param executor: A folly::SequencedExecutor used to transform the values.
*
* @param transformValue: A function as described above.
*
* @param rateLimiter: An optional rate limiter (defaults to nullptr). If
*   specified, the given rate limiter will limit the number of transformation
*   functions that are simultaneously running.
*
* @return The output receiver, of type Receiver<OutputValueType>.
*
* Example:
*
*   // Function that returns a receiver
*   Receiver<int> getInputReceiver();
*
*   // Function that returns an executor
*   folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();
*
*   Receiver<std::string> outputReceiver = transform(
*       getInputReceiver(),
*       getExecutor(),
*       [](Try<int> result) -> folly::coro::AsyncGenerator<std::string&&> {
*         co_yield folly::to<std::string>(result.value());
*       });
*/
template <
typename ReceiverType,
typename TransformValueFunc,
typename InputValueType = typename ReceiverType::ValueType,
typename OutputValueType = typename folly::invoke_result_t< //
TransformValueFunc,
Try<InputValueType>>::value_type>
Receiver<OutputValueType> transform(
ReceiverType inputReceiver,
folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
TransformValueFunc transformValue,
std::shared_ptr<RateLimiter> rateLimiter = nullptr);
/**
* This overload accepts arguments in the form of a transformer object. The
* transformer object must have the following functions:
*
*   folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();
*
*   folly::coro::AsyncGenerator<OutputValueType&&> transformValue(
*       Try<InputValueType> inputValue);
*
*   std::shared_ptr<RateLimiter> getRateLimiter(); // Can return nullptr
*
* InputValueType defaults to ReceiverType::ValueType. OutputValueType defaults
* to the value_type member of the type returned by calling
* transformer.transformValue with a Try<InputValueType>.
*
* @param inputReceiver: The input receiver.
*
* @param transformer: The transformer object as described above.
*
* @return The output receiver, of type Receiver<OutputValueType>.
*/
template <
typename ReceiverType,
typename TransformerType,
typename InputValueType = typename ReceiverType::ValueType,
typename OutputValueType =
typename decltype(std::declval<TransformerType>().transformValue(
std::declval<Try<InputValueType>>()))::value_type>
Receiver<OutputValueType> transform(
ReceiverType inputReceiver, TransformerType transformer);
/**
* This function is similar to the above transform function. However, instead of
* taking a single input receiver, it takes an initialization function that
* accepts a value of type InitializeArg, and returns (as in the example below)
* a folly::coro::Task<
*     std::pair<std::vector<OutputValueType>, Receiver<InputValueType>>>.
*
* ReceiverType defaults to StorageType::second_type of the type that
* InitializeTransformFunc returns when invoked with an InitializeArg.
* InputValueType defaults to ReceiverType::ValueType, and OutputValueType
* defaults to the value_type member of the type that TransformValueFunc returns
* when invoked with a Try<InputValueType>.
*
* - If the InitializeTransform function returns successfully, the vector's
*   output values will be immediately sent to the output receiver. The input
*   receiver is then processed as described in the transform function's
*   documentation, unless and until the TransformValue function throws a
*   ReinitializeException. At that point, the InitializeTransform function is
*   re-run with the InitializeArg specified in the ReinitializeException, and
*   the transform begins anew.
*
* - If the InitializeTransform function or the TransformValue function throws
*   an OnClosedException, the output receiver is closed (with no exception).
*
* - If the InitializeTransform function or the TransformValue function throws
*   any other type of exception, the output receiver is closed with that
*   exception.
*
* @param executor: A folly::SequencedExecutor used to transform the values.
*
* @param initializeArg: The initial argument passed to the InitializeTransform
*   function.
*
* @param initializeTransform: The InitializeTransform function as described
*   above.
*
* @param transformValue: The TransformValue function as described above.
*
* @param rateLimiter: An optional rate limiter (defaults to nullptr). If
*   specified, the given rate limiter will limit the number of transformation
*   functions that are simultaneously running.
*
* @return The output receiver, of type Receiver<OutputValueType>.
*
* Example:
*
*   struct InitializeArg {
*     std::string param;
*   };
*
*   // Function that returns a receiver
*   Receiver<int> getInputReceiver(InitializeArg initializeArg);
*
*   // Function that returns an executor
*   folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();
*
*   Receiver<std::string> outputReceiver = resumableTransform(
*       getExecutor(),
*       InitializeArg{"param"},
*       [](InitializeArg initializeArg) -> folly::coro::Task<
*           std::pair<std::vector<std::string>, Receiver<int>>> {
*         co_return std::make_pair(
*             std::vector<std::string>({"Initialized"}),
*             getInputReceiver(initializeArg));
*       },
*       [](Try<int> result) -> folly::coro::AsyncGenerator<std::string&&> {
*         try {
*           co_yield folly::to<std::string>(result.value());
*         } catch (const SomeApplicationException& ex) {
*           throw ReinitializeException(InitializeArg{ex.getParam()});
*         }
*       });
*
*/
template <
typename InitializeArg,
typename InitializeTransformFunc,
typename TransformValueFunc,
typename ReceiverType = typename folly::invoke_result_t<
InitializeTransformFunc,
InitializeArg>::StorageType::second_type,
typename InputValueType = typename ReceiverType::ValueType,
typename OutputValueType = typename folly::invoke_result_t< //
TransformValueFunc,
Try<InputValueType>>::value_type>
Receiver<OutputValueType> resumableTransform(
folly::Executor::KeepAlive<folly::SequencedExecutor> executor,
InitializeArg initializeArg,
InitializeTransformFunc initializeTransform,
TransformValueFunc transformValue,
std::shared_ptr<RateLimiter> rateLimiter = nullptr);
/**
* This overload accepts arguments in the form of a transformer object. The
* transformer object must have the following functions:
*
*   folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();
*
*   // NOTE(review): the return type must expose StorageType::second_type
*   // (see the ReceiverType default below), which a bare std::pair does not;
*   // a folly::coro::Task of the pair is presumably intended -- confirm.
*   folly::coro::Task<
*       std::pair<std::vector<OutputValueType>, Receiver<InputValueType>>>
*   initializeTransform(InitializeArg initializeArg);
*
*   folly::coro::AsyncGenerator<OutputValueType&&> transformValue(
*       Try<InputValueType> inputValue);
*
*   std::shared_ptr<RateLimiter> getRateLimiter(); // Can return nullptr
*
* ReceiverType defaults to StorageType::second_type of the type returned by
* calling transformer.initializeTransform with an InitializeArg.
* InputValueType defaults to ReceiverType::ValueType, and OutputValueType
* defaults to the value_type member of the type returned by calling
* transformer.transformValue with a Try<InputValueType>.
*
* @param initializeArg: The initial argument passed to the transformer's
*   initializeTransform function.
*
* @param transformer: The transformer object as described above.
*
* @return The output receiver, of type Receiver<OutputValueType>.
*/
template <
typename InitializeArg,
typename TransformerType,
typename ReceiverType =
typename decltype(std::declval<TransformerType>().initializeTransform(
std::declval<InitializeArg>()))::StorageType::second_type,
typename InputValueType = typename ReceiverType::ValueType,
typename OutputValueType =
typename decltype(std::declval<TransformerType>().transformValue(
std::declval<Try<InputValueType>>()))::value_type>
Receiver<OutputValueType> resumableTransform(
InitializeArg initializeArg, TransformerType transformer);
/**
* Exception a transform callback throws to signal that the resumable transform
* needs to be re-initialized. The InitializeArg it was constructed with is
* carried in the public initializeArg member.
*/
template <typename InitializeArg>
struct ReinitializeException : std::exception {
  // Argument carried by this exception; moved in from the constructor argument.
  InitializeArg initializeArg;

  // Explicit, so an InitializeArg never converts to an exception implicitly.
  explicit ReinitializeException(InitializeArg arg)
      : initializeArg(std::move(arg)) {}

  // Fixed description; it does not depend on the carried argument.
  auto what() const noexcept -> const char* override {
    return "This resumable transform should be re-initialized.";
  }
};
} // namespace channels
} // namespace folly
#include <folly/channels/Transform-inl.h>