// Copyright 2021 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef GRPC_SRC_CORE_LIB_PROMISE_PIPE_H #define GRPC_SRC_CORE_LIB_PROMISE_PIPE_H #include <grpc/support/port_platform.h> #include <stdint.h> #include <stdlib.h> #include <memory> #include <string> #include <type_traits> #include <utility> #include "absl/base/attributes.h" #include "absl/strings/str_cat.h" #include "absl/types/optional.h" #include "absl/types/variant.h" #include <grpc/support/log.h> #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/if.h" #include "src/core/lib/promise/interceptor_list.h" #include "src/core/lib/promise/intra_activity_waiter.h" #include "src/core/lib/promise/map.h" #include "src/core/lib/promise/poll.h" #include "src/core/lib/promise/seq.h" #include "src/core/lib/promise/trace.h" #include "src/core/lib/resource_quota/arena.h" namespace grpc_core { namespace pipe_detail { template <typename T> class Center; } template <typename T> struct Pipe; // Result of Pipe::Next - represents a received value. // If has_value() is false, the pipe was closed by the time we polled for the // next value. No value was received, nor will there ever be. // This type is movable but not copyable. // Once the final move is destroyed the pipe will ack the read and unblock the // send. template <typename T> class NextResult final { … }; namespace pipe_detail { template <typename T> class Push; template <typename T> class Next; // Center sits between a sender and a receiver to provide a one-deep buffer of // Ts template <typename T> class Center : public InterceptorList<T> { … }; } // namespace pipe_detail // Send end of a Pipe. template <typename T> class PipeSender { … }; // Receive end of a Pipe. template <typename T> class PipeReceiver { … }; namespace pipe_detail { // Implementation of PipeSender::Push promise. template <typename T> class Push { … }; // Implementation of PipeReceiver::Next promise. template <typename T> class Next { … }; } // namespace pipe_detail template <typename T> pipe_detail::Push<T> PipeSender<T>::Push(T value) { … } template <typename T> auto PipeReceiver<T>::Next() { … } PipeReceiverNextType; template <typename T> bool NextResult<T>::has_value() const { … } template <typename T> T& NextResult<T>::operator*() { … } template <typename T> const T& NextResult<T>::operator*() const { … } template <typename T> NextResult<T>::~NextResult() { … } template <typename T> void NextResult<T>::reset() { … } // A Pipe is an intra-Activity communications channel that transmits T's from // one end to the other. // It is only safe to use a Pipe within the context of a single Activity. // No synchronization is performed internally. // The primary Pipe data structure is allocated from an arena, so the activity // must have an arena as part of its context. // By performing that allocation we can ensure stable pointer to shared data // allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their // implementation. // This type has been optimized with the expectation that there are relatively // few pipes per activity. If this assumption does not hold then a design // allowing inline filtering of pipe contents (instead of connecting pipes with // polling code) would likely be more appropriate. template <typename T> struct Pipe { … }; } // namespace grpc_core #endif // GRPC_SRC_CORE_LIB_PROMISE_PIPE_H