/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/coro/AsyncGenerator.h>
#include <folly/experimental/coro/Coroutine.h>
#if FOLLY_HAS_COROUTINES
namespace folly {
namespace coro {
// Merge the results of a number of input streams.
//
// The 'executor' parameter specifies the execution context to
// be used for awaiting each value from the sources.
// The 'sources' parameter represents an async-stream of async-streams.
// The resulting generator merges the results from each of the streams
// produced by 'sources', interleaving them in the order that the values
// are produced.
//
// The resulting stream will terminate when the end of the 'sources' stream has
// been reached and the ends of all of the input streams it produced have been
// reached.
//
// On exception or cancellation, cancels remaining input streams and 'sources',
// discards any remaining values, and produces an exception (if an input stream
// produced an exception) or end-of-stream (if next() call was cancelled).
//
// Structured concurrency: if the output stream produced an empty value
// (end-of-stream) or an exception, it's guaranteed that 'sources' and all input
// generators have been destroyed.
// If the output stream is destroyed early (before reaching end-of-stream or
// exception), the remaining input generators are cancelled and detached; beware
// of use-after-free.
//
// Normally cancelling output stream's next() call cancels the stream, discards
// any remaining values, and returns an end-of-stream. But there are caveats:
// * If there's an item ready to be delivered, next() call returns it without
// checking for cancellation. So if input streams are fast, and next() is
// called infrequently, cancellation may go unprocessed indefinitely unless
// you also check for cancellation on your side (which you should probably do
// anyway unless you're calling next() in a tight loop).
// * It's possible that the cancelled next() registers the cancellation but
// returns a value anyway (if it was produced at just the right moment). Then
// a later next() call would return end-of-stream even if it was called with
// a different, non-cancelled cancellation token.
template <typename Reference, typename Value>
AsyncGenerator<Reference, Value> merge(
folly::Executor::KeepAlive<> executor,
AsyncGenerator<AsyncGenerator<Reference, Value>> sources);
} // namespace coro
} // namespace folly
#endif // FOLLY_HAS_COROUTINES
#include <folly/coro/Merge-inl.h>