#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/connected_channel.h"
#include <inttypes.h>
#include <string.h>
#include <algorithm>
#include <functional>
#include <initializer_list>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/grpc.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/call_trace.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_fwd.h"
#include "src/core/lib/transport/transport_impl.h"
// Object-like macro constant; its value is elided in this view.
// NOTE(review): presumably an upper bound on a buffer size used by this
// file -- confirm against the definition and its use sites.
#define MAX_BUFFER_LENGTH …
// Tail of the `channel_data` type declaration; its members are not visible in
// this view.
// NOTE(review): presumably the per-channel state of this filter -- confirm.
channel_data;
// State record for one intercepted callback. Instances are handed to
// intercept_callback() and returned by get_state_for_batch() (see their
// signatures below). Fields are elided in this view.
struct callback_state { … };
// Tail of the `call_data` type declaration; its members are not visible in
// this view. Pointers to it are taken by intercept_callback() and
// get_state_for_batch().
// NOTE(review): presumably the per-call state of this filter -- confirm.
call_data;
// Closure-style callback taking an opaque `arg` and an `error`.
// NOTE(review): the name suggests it re-enters the call combiner before
// running an intercepted closure; body is elided here -- confirm.
static void run_in_call_combiner(void* arg, grpc_error_handle error) { … }
// Closure-style callback taking an opaque `arg` and an `error`.
// NOTE(review): presumably the cancellation-path counterpart of
// run_in_call_combiner(); body is elided here -- confirm.
static void run_cancel_in_call_combiner(void* arg, grpc_error_handle error) { … }
// Intercepts a closure on behalf of a call.
//
// Parameters (from the signature; body is elided in this view):
//   calld            - call the callback belongs to.
//   state            - callback_state record used for the interception.
//   free_when_done   - NOTE(review): presumably requests that `state` be
//                      released once the callback has run -- confirm.
//   reason           - C string describing the interception.
//   original_closure - in/out pointer to the closure being intercepted.
//                      NOTE(review): presumably overwritten with a wrapper
//                      closure -- confirm.
static void intercept_callback(call_data* calld, callback_state* state,
bool free_when_done, const char* reason,
grpc_closure** original_closure) { … }
// Returns the callback_state to use for `batch` on the call `calld`.
// NOTE(review): the selection rule (and whether the result is owned by
// `calld` or freshly allocated) is in the elided body -- confirm.
static callback_state* get_state_for_batch(
call_data* calld, grpc_transport_stream_op_batch* batch) { … }
// Function-like macro taking a `calld` argument; expansion elided in this
// view. NOTE(review): per its name, presumably yields the transport stream
// associated with that call data -- confirm.
#define TRANSPORT_STREAM_FROM_CALL_DATA(calld) …
// Function-like macro taking a `transport_stream` argument; expansion elided
// in this view. NOTE(review): per its name, presumably the inverse of
// TRANSPORT_STREAM_FROM_CALL_DATA -- confirm.
#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) …
// Per-call entry point that receives a transport stream op batch for `elem`.
// NOTE(review): presumably intercepts the batch's callbacks and forwards the
// batch to the transport; body is elided here -- confirm.
static void connected_channel_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { … }
// Per-channel entry point that receives a transport op for `elem`.
// NOTE(review): presumably hands `op` to the underlying transport; body is
// elided here -- confirm.
static void connected_channel_start_transport_op(grpc_channel_element* elem,
grpc_transport_op* op) { … }
// Initializes the call element `elem` from `args` and returns a
// grpc_error_handle describing the outcome.
// NOTE(review): which failures are reported is determined by the elided
// body -- confirm.
static grpc_error_handle connected_channel_init_call_elem(
grpc_call_element* elem, const grpc_call_element_args* args) { … }
// Receives the polling entity `pollent` for the call element `elem`.
// NOTE(review): presumably associates it with the call's transport stream;
// body is elided here -- confirm.
static void set_pollset_or_pollset_set(grpc_call_element* elem,
grpc_polling_entity* pollent) { … }
// Tears down the call element `elem`. The final-info argument is unnamed, so
// it is not used by the body.
// NOTE(review): `then_schedule_closure` is presumably scheduled once
// destruction completes -- confirm against the elided body.
static void connected_channel_destroy_call_elem(
grpc_call_element* elem, const grpc_call_final_info* ,
grpc_closure* then_schedule_closure) { … }
// Initializes the channel element `elem` from `args` and returns a
// grpc_error_handle describing the outcome.
// NOTE(review): presumably records the transport taken from the channel
// args; body is elided here -- confirm.
static grpc_error_handle connected_channel_init_channel_elem(
grpc_channel_element* elem, grpc_channel_element_args* args) { … }
// Tears down the channel element `elem`.
// NOTE(review): what is released (e.g. the transport) is determined by the
// elided body -- confirm.
static void connected_channel_destroy_channel_elem(grpc_channel_element* elem) { … }
// Channel-info hook. Both parameters are unnamed, so the body cannot read
// either argument.
static void connected_channel_get_channel_info(
grpc_channel_element* , const grpc_channel_info* ) { … }
namespace grpc_core {
namespace {
#if defined(GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL) || \
defined(GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL)
// Orphanable base class for the ClientStream and ServerStream types declared
// below; compiled only when at least one promise-based call experiment is
// included. Members are elided in this view.
class ConnectedChannelStream : public Orphanable { … };
#endif
#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL
// Client-side ConnectedChannelStream; compiled only when the promise-based
// client call experiment is included. Members are elided in this view.
class ClientStream : public ConnectedChannelStream { … };
// Compiled only when the promise-based client call experiment is included.
// NOTE(review): presumably the promise object driving a ClientStream;
// members are elided in this view -- confirm.
class ClientConnectedCallPromise { … };
#endif
#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
// Server-side ConnectedChannelStream (final); compiled only when the
// promise-based server call experiment is included. Members are elided in
// this view.
class ServerStream final : public ConnectedChannelStream { … };
// Compiled only when the promise-based server call experiment is included.
// NOTE(review): presumably the promise object driving a ServerStream;
// members are elided in this view -- confirm.
class ServerConnectedCallPromise { … };
#endif
// Builds a grpc_channel_filter value, parameterized at compile time by
// `make_call_promise`: a function pointer taking (grpc_transport*, CallArgs,
// NextPromiseFactory) and returning ArenaPromise<ServerMetadataHandle>.
// This file also instantiates it with nullptr (see the #else filter
// definitions below).
// NOTE(review): how a null `make_call_promise` is reflected in the returned
// filter is decided by the elided body -- confirm.
template <ArenaPromise<ServerMetadataHandle> (*make_call_promise)(
grpc_transport*, CallArgs, NextPromiseFactory)>
grpc_channel_filter MakeConnectedFilter() { … }
// Produces the call promise for `call_args` on `transport`. Its signature
// matches the `make_call_promise` template parameter of MakeConnectedFilter.
// The NextPromiseFactory argument is unnamed, so the body does not use it.
ArenaPromise<ServerMetadataHandle> MakeTransportCallPromise(
grpc_transport* transport, CallArgs call_args, NextPromiseFactory) { … }
// Filter table for promise-based transports; initializer elided in this view.
// NOTE(review): presumably MakeConnectedFilter<MakeTransportCallPromise>()
// -- confirm.
const grpc_channel_filter kPromiseBasedTransportFilter = …;
#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL
// Client-side filter used when the promise-based client call experiment is
// compiled in; initializer elided in this view.
const grpc_channel_filter kClientEmulatedFilter = …;
#else
// Fallback when the promise-based client call experiment is compiled out:
// the connected filter is instantiated with a null `make_call_promise`.
const grpc_channel_filter kClientEmulatedFilter{
MakeConnectedFilter<nullptr>()};
#endif
#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
// Server-side filter used when the promise-based server call experiment is
// compiled in; initializer elided in this view.
const grpc_channel_filter kServerEmulatedFilter = …;
#else
// Fallback when the promise-based server call experiment is compiled out:
// the connected filter is instantiated with a null `make_call_promise`.
const grpc_channel_filter kServerEmulatedFilter{
MakeConnectedFilter<nullptr>()};
#endif
}
}
// Public entry point operating on a channel stack under construction
// (`builder`); returns a bool.
// NOTE(review): presumably appends one of the filters defined above to
// `builder`, chosen by transport/channel type, with the return value
// signalling success -- confirm against the elided body.
bool grpc_add_connected_filter(grpc_core::ChannelStackBuilder* builder) { … }