#include <grpc/support/port_platform.h>
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include <inttypes.h>
#include <limits.h>
#include <stdio.h>
#include <string.h>
#include <algorithm>
#include <initializer_list>
#include <memory>
#include <new>
#include <string>
#include <utility>
#include "absl/base/attributes.h"
#include "absl/status/status.h"
#include "absl/strings/cord.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/slice_buffer.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/ext/transport/chttp2/transport/context_list.h"
#include "src/core/ext/transport/chttp2/transport/flow_control.h"
#include "src/core/ext/transport/chttp2/transport/frame.h"
#include "src/core/ext/transport/chttp2/transport/frame_data.h"
#include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
#include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/ext/transport/chttp2/transport/http_trace.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/resource_quota/trace.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/http2_errors.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/status_conversion.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_impl.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include "src/core/lib/iomgr/ev_posix.h"
#endif
#define DEFAULT_CONNECTION_WINDOW_TARGET …
#define MAX_WINDOW …
#define MAX_WRITE_BUFFER_SIZE …
#define DEFAULT_MAX_HEADER_LIST_SIZE …
#define DEFAULT_CLIENT_KEEPALIVE_TIME_MS …
#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS …
#define DEFAULT_SERVER_KEEPALIVE_TIME_MS …
#define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS …
#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS …
#define KEEPALIVE_TIME_BACKOFF_MULTIPLIER …
#define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS …
#define DEFAULT_MAX_PINGS_BETWEEN_DATA …
#define DEFAULT_MAX_PING_STRIKES …
#define DEFAULT_MAX_PENDING_INDUCED_FRAMES …
static int g_default_client_keepalive_time_ms = …;
static int g_default_client_keepalive_timeout_ms = …;
static int g_default_server_keepalive_time_ms = …;
static int g_default_server_keepalive_timeout_ms = …;
static bool g_default_client_keepalive_permit_without_calls = …;
static bool g_default_server_keepalive_permit_without_calls = …;
static int g_default_min_recv_ping_interval_without_data_ms = …;
static int g_default_max_pings_without_data = …;
static int g_default_max_ping_strikes = …;
#define MAX_CLIENT_STREAM_ID …
grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
"chttp2_refcount");
static void write_action_begin_locked(void* t, grpc_error_handle error);
static void write_action(void* t, grpc_error_handle error);
static void write_action_end(void* t, grpc_error_handle error);
static void write_action_end_locked(void* t, grpc_error_handle error);
static void read_action(void* t, grpc_error_handle error);
static void read_action_locked(void* t, grpc_error_handle error);
static void continue_read_action_locked(grpc_chttp2_transport* t);
static void queue_setting_update(grpc_chttp2_transport* t,
grpc_chttp2_setting_id id, uint32_t value);
static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle error);
static void maybe_start_some_streams(grpc_chttp2_transport* t);
static void connectivity_state_set(grpc_chttp2_transport* t,
grpc_connectivity_state state,
const absl::Status& status,
const char* reason);
static void benign_reclaimer_locked(void* arg, grpc_error_handle error);
static void destructive_reclaimer_locked(void* arg, grpc_error_handle error);
static void post_benign_reclaimer(grpc_chttp2_transport* t);
static void post_destructive_reclaimer(grpc_chttp2_transport* t);
static void close_transport_locked(grpc_chttp2_transport* t,
grpc_error_handle error);
static void end_all_the_calls(grpc_chttp2_transport* t,
grpc_error_handle error);
static void start_bdp_ping(void* tp, grpc_error_handle error);
static void finish_bdp_ping(void* tp, grpc_error_handle error);
static void start_bdp_ping_locked(void* tp, grpc_error_handle error);
static void finish_bdp_ping_locked(void* tp, grpc_error_handle error);
static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t);
static void next_bdp_ping_timer_expired_locked(
void* tp, GRPC_UNUSED grpc_error_handle error);
static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error);
static void send_ping_locked(grpc_chttp2_transport* t,
grpc_closure* on_initiate, grpc_closure* on_ack);
static void retry_initiate_ping_locked(void* tp,
GRPC_UNUSED grpc_error_handle error);
static void init_keepalive_ping(grpc_chttp2_transport* t);
static void init_keepalive_ping_locked(void* arg,
GRPC_UNUSED grpc_error_handle error);
static void start_keepalive_ping(void* arg, grpc_error_handle error);
static void finish_keepalive_ping(void* arg, grpc_error_handle error);
static void start_keepalive_ping_locked(void* arg, grpc_error_handle error);
static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error);
static void keepalive_watchdog_fired(grpc_chttp2_transport* t);
static void keepalive_watchdog_fired_locked(
void* arg, GRPC_UNUSED grpc_error_handle error);
static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t);
namespace {
void MaybeRecordTransportAnnotation(grpc_chttp2_stream* s,
absl::string_view annotation) { … }
}
namespace grpc_core {
namespace {
TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback = …;
TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback = …;
bool test_only_disable_transient_failure_state_notification = …;
}
void TestOnlySetGlobalHttp2TransportInitCallback(
TestOnlyGlobalHttp2TransportInitCallback callback) { … }
void TestOnlySetGlobalHttp2TransportDestructCallback(
TestOnlyGlobalHttp2TransportDestructCallback callback) { … }
void TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
bool disable) { … }
}
grpc_chttp2_transport::~grpc_chttp2_transport() { … }
static const grpc_transport_vtable* get_vtable(void);
static void read_channel_args(grpc_chttp2_transport* t,
const grpc_core::ChannelArgs& channel_args,
bool is_client) { … }
static void init_transport_keepalive_settings(grpc_chttp2_transport* t) { … }
static void configure_transport_ping_policy(grpc_chttp2_transport* t) { … }
static void init_keepalive_pings_if_enabled_locked(
void* arg, GRPC_UNUSED grpc_error_handle error) { … }
grpc_chttp2_transport::grpc_chttp2_transport(
const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
bool is_client)
: … { … }
static void destroy_transport_locked(void* tp, grpc_error_handle ) { … }
static void destroy_transport(grpc_transport* gt) { … }
static void close_transport_locked(grpc_chttp2_transport* t,
grpc_error_handle error) { … }
#ifndef NDEBUG
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) { … }
void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) { … }
#else
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
grpc_stream_ref(s->refcount);
}
void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
grpc_stream_unref(s->refcount);
}
#endif
grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) { … }
grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
grpc_stream_refcount* refcount,
const void* server_data,
grpc_core::Arena* arena)
: … { … }
grpc_chttp2_stream::~grpc_chttp2_stream() { … }
static int init_stream(grpc_transport* gt, grpc_stream* gs,
grpc_stream_refcount* refcount, const void* server_data,
grpc_core::Arena* arena) { … }
static void destroy_stream_locked(void* sp, grpc_error_handle ) { … }
static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
grpc_closure* then_schedule_closure) { … }
grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
uint32_t id) { … }
static const char* write_state_name(grpc_chttp2_write_state st) { … }
static void set_write_state(grpc_chttp2_transport* t,
grpc_chttp2_write_state st, const char* reason) { … }
void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
grpc_chttp2_initiate_write_reason reason) { … }
void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) { … }
static const char* begin_writing_desc(bool partial) { … }
static void write_action_begin_locked(void* gt,
grpc_error_handle ) { … }
static void write_action(void* gt, grpc_error_handle ) { … }
static void write_action_end(void* tp, grpc_error_handle error) { … }
static void write_action_end_locked(void* tp, grpc_error_handle error) { … }
static void queue_setting_update(grpc_chttp2_transport* t,
grpc_chttp2_setting_id id, uint32_t value) { … }
static void cancel_unstarted_streams(grpc_chttp2_transport* t,
grpc_error_handle error) { … }
void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
uint32_t goaway_error,
uint32_t last_stream_id,
absl::string_view goaway_text) { … }
static void maybe_start_some_streams(grpc_chttp2_transport* t) { … }
static grpc_closure* add_closure_barrier(grpc_closure* closure) { … }
static void null_then_sched_closure(grpc_closure** closure) { … }
void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
grpc_chttp2_stream* s,
grpc_closure** pclosure,
grpc_error_handle error,
const char* desc) { … }
static bool contains_non_ok_status(grpc_metadata_batch* batch) { … }
static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
bool is_client, bool is_initial) { … }
static void perform_stream_op_locked(void* stream_op,
grpc_error_handle ) { … }
static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op) { … }
static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error) { … }
static void send_ping_locked(grpc_chttp2_transport* t,
grpc_closure* on_initiate, grpc_closure* on_ack) { … }
static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { … }
void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t) { … }
static void retry_initiate_ping_locked(void* tp,
GRPC_UNUSED grpc_error_handle error) { … }
void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { … }
namespace {
class GracefulGoaway : public grpc_core::RefCounted<GracefulGoaway> { … };
}
static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
bool immediate_disconnect_hint) { … }
void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) { … }
void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) { … }
static void perform_transport_op_locked(void* stream_op,
grpc_error_handle ) { … }
static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { … }
void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) { … }
void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) { … }
void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) { … }
static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
grpc_error_handle error) { … }
void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle due_to_error) { … }
void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle error) { … }
static void add_error(grpc_error_handle error, grpc_error_handle* refs,
size_t* nrefs) { … }
static grpc_error_handle removal_error(grpc_error_handle extra_error,
grpc_chttp2_stream* s,
const char* main_error_msg) { … }
static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_chttp2_write_cb** list,
grpc_error_handle error) { … }
void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
grpc_chttp2_stream* s,
grpc_error_handle error) { … }
void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
grpc_chttp2_stream* s, int close_reads,
int close_writes, grpc_error_handle error) { … }
static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle error) { … }
struct cancel_stream_cb_args { … };
static void cancel_stream_cb(void* user_data, uint32_t , void* stream) { … }
static void end_all_the_calls(grpc_chttp2_transport* t,
grpc_error_handle error) { … }
template <class F>
static void WithUrgency(grpc_chttp2_transport* t,
grpc_core::chttp2::FlowControlAction::Urgency urgency,
grpc_chttp2_initiate_write_reason reason, F action) { … }
void grpc_chttp2_act_on_flowctl_action(
const grpc_core::chttp2::FlowControlAction& action,
grpc_chttp2_transport* t, grpc_chttp2_stream* s) { … }
static grpc_error_handle try_http_parsing(grpc_chttp2_transport* t) { … }
static void read_action(void* tp, grpc_error_handle error) { … }
static void read_action_locked(void* tp, grpc_error_handle error) { … }
static void continue_read_action_locked(grpc_chttp2_transport* t) { … }
void schedule_bdp_ping_locked(grpc_chttp2_transport* t) { … }
static void start_bdp_ping(void* tp, grpc_error_handle error) { … }
static void start_bdp_ping_locked(void* tp, grpc_error_handle error) { … }
static void finish_bdp_ping(void* tp, grpc_error_handle error) { … }
static void finish_bdp_ping_locked(void* tp, grpc_error_handle error) { … }
static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t) { … }
static void next_bdp_ping_timer_expired_locked(
void* tp, GRPC_UNUSED grpc_error_handle error) { … }
void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
bool is_client) { … }
static void init_keepalive_ping(grpc_chttp2_transport* t) { … }
static void init_keepalive_ping_locked(void* arg,
GRPC_UNUSED grpc_error_handle error) { … }
static void start_keepalive_ping(void* arg, grpc_error_handle error) { … }
static void start_keepalive_ping_locked(void* arg, grpc_error_handle error) { … }
static void finish_keepalive_ping(void* arg, grpc_error_handle error) { … }
static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) { … }
static void keepalive_watchdog_fired(grpc_chttp2_transport* t) { … }
static void keepalive_watchdog_fired_locked(
void* arg, GRPC_UNUSED grpc_error_handle error) { … }
static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) { … }
static void connectivity_state_set(grpc_chttp2_transport* t,
grpc_connectivity_state state,
const absl::Status& status,
const char* reason) { … }
static void set_pollset(grpc_transport* gt, grpc_stream* ,
grpc_pollset* pollset) { … }
static void set_pollset_set(grpc_transport* gt, grpc_stream* ,
grpc_pollset_set* pollset_set) { … }
static void post_benign_reclaimer(grpc_chttp2_transport* t) { … }
static void post_destructive_reclaimer(grpc_chttp2_transport* t) { … }
static void benign_reclaimer_locked(void* arg, grpc_error_handle error) { … }
static void destructive_reclaimer_locked(void* arg, grpc_error_handle error) { … }
const char* grpc_chttp2_initiate_write_reason_string(
grpc_chttp2_initiate_write_reason reason) { … }
static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) { … }
static const grpc_transport_vtable vtable = …;
static const grpc_transport_vtable* get_vtable(void) { … }
grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
grpc_chttp2_transport_get_socket_node(grpc_transport* transport) { … }
grpc_transport* grpc_create_chttp2_transport(
const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
bool is_client) { … }
void grpc_chttp2_transport_start_reading(
grpc_transport* transport, grpc_slice_buffer* read_buffer,
grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) { … }