chromium/third_party/grpc/src/src/core/ext/transport/chttp2/transport/chttp2_transport.cc

//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"

#include <inttypes.h>
#include <limits.h>
#include <stdio.h>
#include <string.h>

#include <algorithm>
#include <initializer_list>
#include <memory>
#include <new>
#include <string>
#include <utility>

#include "absl/base/attributes.h"
#include "absl/status/status.h"
#include "absl/strings/cord.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/slice_buffer.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>

#include "src/core/ext/transport/chttp2/transport/context_list.h"
#include "src/core/ext/transport/chttp2/transport/flow_control.h"
#include "src/core/ext/transport/chttp2/transport/frame.h"
#include "src/core/ext/transport/chttp2/transport/frame_data.h"
#include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
#include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/ext/transport/chttp2/transport/http_trace.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/resource_quota/trace.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/http2_errors.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/status_conversion.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_impl.h"

#ifdef GRPC_POSIX_SOCKET_TCP
#include "src/core/lib/iomgr/ev_posix.h"
#endif

#define DEFAULT_CONNECTION_WINDOW_TARGET
#define MAX_WINDOW
#define MAX_WRITE_BUFFER_SIZE
#define DEFAULT_MAX_HEADER_LIST_SIZE

#define DEFAULT_CLIENT_KEEPALIVE_TIME_MS
#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS
#define DEFAULT_SERVER_KEEPALIVE_TIME_MS
#define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS
#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS
#define KEEPALIVE_TIME_BACKOFF_MULTIPLIER

#define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS
#define DEFAULT_MAX_PINGS_BETWEEN_DATA
#define DEFAULT_MAX_PING_STRIKES

#define DEFAULT_MAX_PENDING_INDUCED_FRAMES

static int g_default_client_keepalive_time_ms =;
static int g_default_client_keepalive_timeout_ms =;
static int g_default_server_keepalive_time_ms =;
static int g_default_server_keepalive_timeout_ms =;
static bool g_default_client_keepalive_permit_without_calls =;
static bool g_default_server_keepalive_permit_without_calls =;

static int g_default_min_recv_ping_interval_without_data_ms =;
static int g_default_max_pings_without_data =;
static int g_default_max_ping_strikes =;

#define MAX_CLIENT_STREAM_ID
grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
                                                         "chttp2_refcount");

// forward declarations of various callbacks that we'll build closures around
static void write_action_begin_locked(void* t, grpc_error_handle error);
static void write_action(void* t, grpc_error_handle error);
static void write_action_end(void* t, grpc_error_handle error);
static void write_action_end_locked(void* t, grpc_error_handle error);

static void read_action(void* t, grpc_error_handle error);
static void read_action_locked(void* t, grpc_error_handle error);
static void continue_read_action_locked(grpc_chttp2_transport* t);

// Set a transport level setting, and push it to our peer
static void queue_setting_update(grpc_chttp2_transport* t,
                                 grpc_chttp2_setting_id id, uint32_t value);

static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
                           grpc_error_handle error);

// Start new streams that have been created if we can
static void maybe_start_some_streams(grpc_chttp2_transport* t);

static void connectivity_state_set(grpc_chttp2_transport* t,
                                   grpc_connectivity_state state,
                                   const absl::Status& status,
                                   const char* reason);

static void benign_reclaimer_locked(void* arg, grpc_error_handle error);
static void destructive_reclaimer_locked(void* arg, grpc_error_handle error);

static void post_benign_reclaimer(grpc_chttp2_transport* t);
static void post_destructive_reclaimer(grpc_chttp2_transport* t);

static void close_transport_locked(grpc_chttp2_transport* t,
                                   grpc_error_handle error);
static void end_all_the_calls(grpc_chttp2_transport* t,
                              grpc_error_handle error);

static void start_bdp_ping(void* tp, grpc_error_handle error);
static void finish_bdp_ping(void* tp, grpc_error_handle error);
static void start_bdp_ping_locked(void* tp, grpc_error_handle error);
static void finish_bdp_ping_locked(void* tp, grpc_error_handle error);
static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t);
static void next_bdp_ping_timer_expired_locked(
    void* tp, GRPC_UNUSED grpc_error_handle error);

static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error);
static void send_ping_locked(grpc_chttp2_transport* t,
                             grpc_closure* on_initiate, grpc_closure* on_ack);
static void retry_initiate_ping_locked(void* tp,
                                       GRPC_UNUSED grpc_error_handle error);

// keepalive-relevant functions
static void init_keepalive_ping(grpc_chttp2_transport* t);
static void init_keepalive_ping_locked(void* arg,
                                       GRPC_UNUSED grpc_error_handle error);
static void start_keepalive_ping(void* arg, grpc_error_handle error);
static void finish_keepalive_ping(void* arg, grpc_error_handle error);
static void start_keepalive_ping_locked(void* arg, grpc_error_handle error);
static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error);
static void keepalive_watchdog_fired(grpc_chttp2_transport* t);
static void keepalive_watchdog_fired_locked(
    void* arg, GRPC_UNUSED grpc_error_handle error);
static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t);

namespace {
void MaybeRecordTransportAnnotation(grpc_chttp2_stream* s,
                                    absl::string_view annotation) {}
}  // namespace

namespace grpc_core {

namespace {
TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback =;
TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback =;
bool test_only_disable_transient_failure_state_notification =;
}  // namespace

void TestOnlySetGlobalHttp2TransportInitCallback(
    TestOnlyGlobalHttp2TransportInitCallback callback) {}

void TestOnlySetGlobalHttp2TransportDestructCallback(
    TestOnlyGlobalHttp2TransportDestructCallback callback) {}

void TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
    bool disable) {}

}  // namespace grpc_core

//
// CONSTRUCTION/DESTRUCTION/REFCOUNTING
//

grpc_chttp2_transport::~grpc_chttp2_transport() {}

static const grpc_transport_vtable* get_vtable(void);

static void read_channel_args(grpc_chttp2_transport* t,
                              const grpc_core::ChannelArgs& channel_args,
                              bool is_client) {}

static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {}

static void configure_transport_ping_policy(grpc_chttp2_transport* t) {}

static void init_keepalive_pings_if_enabled_locked(
    void* arg, GRPC_UNUSED grpc_error_handle error) {}

grpc_chttp2_transport::grpc_chttp2_transport(
    const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
    bool is_client)
    :{}

static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) {}

static void destroy_transport(grpc_transport* gt) {}

static void close_transport_locked(grpc_chttp2_transport* t,
                                   grpc_error_handle error) {}

#ifndef NDEBUG
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {}
void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {}
#else
// NDEBUG variant of grpc_chttp2_stream_ref: takes no `reason` argument and
// simply forwards the stream's refcount to grpc_stream_ref().
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
  auto& stream_refcount = s->refcount;
  grpc_stream_ref(stream_refcount);
}
// NDEBUG variant of grpc_chttp2_stream_unref: takes no `reason` argument and
// simply forwards the stream's refcount to grpc_stream_unref().
void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
  auto& stream_refcount = s->refcount;
  grpc_stream_unref(stream_refcount);
}
#endif

grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) {}

grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
                                       grpc_stream_refcount* refcount,
                                       const void* server_data,
                                       grpc_core::Arena* arena)
    :{}

grpc_chttp2_stream::~grpc_chttp2_stream() {}

static int init_stream(grpc_transport* gt, grpc_stream* gs,
                       grpc_stream_refcount* refcount, const void* server_data,
                       grpc_core::Arena* arena) {}

static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {}

static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
                           grpc_closure* then_schedule_closure) {}

grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
                                                      uint32_t id) {}

//
// OUTPUT PROCESSING
//

static const char* write_state_name(grpc_chttp2_write_state st) {}

static void set_write_state(grpc_chttp2_transport* t,
                            grpc_chttp2_write_state st, const char* reason) {}

void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
                                grpc_chttp2_initiate_write_reason reason) {}

void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
                                      grpc_chttp2_stream* s) {}

static const char* begin_writing_desc(bool partial) {}

static void write_action_begin_locked(void* gt,
                                      grpc_error_handle /*error_ignored*/) {}

static void write_action(void* gt, grpc_error_handle /*error*/) {}

static void write_action_end(void* tp, grpc_error_handle error) {}

// Callback from the grpc_endpoint after bytes have been written by calling
// sendmsg
static void write_action_end_locked(void* tp, grpc_error_handle error) {}

// Dirties an HTTP2 setting to be sent out next time a writing path occurs.
// If the change needs to occur immediately, manually initiate a write.
static void queue_setting_update(grpc_chttp2_transport* t,
                                 grpc_chttp2_setting_id id, uint32_t value) {}

// Cancel out streams that haven't yet started if we have received a GOAWAY
static void cancel_unstarted_streams(grpc_chttp2_transport* t,
                                     grpc_error_handle error) {}

void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
                                     uint32_t goaway_error,
                                     uint32_t last_stream_id,
                                     absl::string_view goaway_text) {}

// Start new streams that have been created if we can
static void maybe_start_some_streams(grpc_chttp2_transport* t) {}

static grpc_closure* add_closure_barrier(grpc_closure* closure) {}

static void null_then_sched_closure(grpc_closure** closure) {}

void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
                                       grpc_chttp2_stream* s,
                                       grpc_closure** pclosure,
                                       grpc_error_handle error,
                                       const char* desc) {}

static bool contains_non_ok_status(grpc_metadata_batch* batch) {}

static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
                         bool is_client, bool is_initial) {}

static void perform_stream_op_locked(void* stream_op,
                                     grpc_error_handle /*error_ignored*/) {}

static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
                              grpc_transport_stream_op_batch* op) {}

static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error) {}

static void send_ping_locked(grpc_chttp2_transport* t,
                             grpc_closure* on_initiate, grpc_closure* on_ack) {}

// Specialized form of send_ping_locked for keepalive ping. If there is already
// a ping in progress, the keepalive ping would piggyback onto that ping,
// instead of waiting for that ping to complete and then starting a new ping.
static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {}

void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t) {}

static void retry_initiate_ping_locked(void* tp,
                                       GRPC_UNUSED grpc_error_handle error) {}

void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {}

namespace {

// Fire and forget (deletes itself on completion). Does a graceful shutdown by
// sending a GOAWAY frame with the last stream id set to 2^31-1, sending a ping
// and waiting for an ack (effectively waiting for an RTT) and then sending a
// final GOAWAY frame with an updated last stream identifier. This helps ensure
// that a connection can be cleanly shut down without losing requests.
// In the event that the client does not respond to the ping for some reason,
// we add a 20 second deadline, after which we send the second GOAWAY.
class GracefulGoaway : public grpc_core::RefCounted<GracefulGoaway> {};

}  // namespace

static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
                        bool immediate_disconnect_hint) {}

void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {}

void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) {}

static void perform_transport_op_locked(void* stream_op,
                                        grpc_error_handle /*error_ignored*/) {}

static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {}

//
// INPUT PROCESSING - GENERAL
//

void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
                                                      grpc_chttp2_stream* s) {}

void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
                                             grpc_chttp2_stream* s) {}

void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
                                                       grpc_chttp2_stream* s) {}

static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
                          grpc_error_handle error) {}

void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
                               grpc_error_handle due_to_error) {}

void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
                             grpc_error_handle error) {}

static void add_error(grpc_error_handle error, grpc_error_handle* refs,
                      size_t* nrefs) {}

static grpc_error_handle removal_error(grpc_error_handle extra_error,
                                       grpc_chttp2_stream* s,
                                       const char* main_error_msg) {}

static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
                             grpc_chttp2_write_cb** list,
                             grpc_error_handle error) {}

void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
                                     grpc_chttp2_stream* s,
                                     grpc_error_handle error) {}

void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
                                    grpc_chttp2_stream* s, int close_reads,
                                    int close_writes, grpc_error_handle error) {}

static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
                           grpc_error_handle error) {}

struct cancel_stream_cb_args {};

static void cancel_stream_cb(void* user_data, uint32_t /*key*/, void* stream) {}

static void end_all_the_calls(grpc_chttp2_transport* t,
                              grpc_error_handle error) {}

//
// INPUT PROCESSING - PARSING
//

template <class F>
static void WithUrgency(grpc_chttp2_transport* t,
                        grpc_core::chttp2::FlowControlAction::Urgency urgency,
                        grpc_chttp2_initiate_write_reason reason, F action) {}

void grpc_chttp2_act_on_flowctl_action(
    const grpc_core::chttp2::FlowControlAction& action,
    grpc_chttp2_transport* t, grpc_chttp2_stream* s) {}

static grpc_error_handle try_http_parsing(grpc_chttp2_transport* t) {}

static void read_action(void* tp, grpc_error_handle error) {}

static void read_action_locked(void* tp, grpc_error_handle error) {}

static void continue_read_action_locked(grpc_chttp2_transport* t) {}

// t is reffed before this is called for the first time; once the callback
// chain that this kicks off finishes, t is unreffed
void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {}

static void start_bdp_ping(void* tp, grpc_error_handle error) {}

static void start_bdp_ping_locked(void* tp, grpc_error_handle error) {}

static void finish_bdp_ping(void* tp, grpc_error_handle error) {}

static void finish_bdp_ping_locked(void* tp, grpc_error_handle error) {}

static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t) {}

static void next_bdp_ping_timer_expired_locked(
    void* tp, GRPC_UNUSED grpc_error_handle error) {}

void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
                                               bool is_client) {}

static void init_keepalive_ping(grpc_chttp2_transport* t) {}

static void init_keepalive_ping_locked(void* arg,
                                       GRPC_UNUSED grpc_error_handle error) {}

static void start_keepalive_ping(void* arg, grpc_error_handle error) {}

static void start_keepalive_ping_locked(void* arg, grpc_error_handle error) {}

static void finish_keepalive_ping(void* arg, grpc_error_handle error) {}

static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) {}

static void keepalive_watchdog_fired(grpc_chttp2_transport* t) {}

static void keepalive_watchdog_fired_locked(
    void* arg, GRPC_UNUSED grpc_error_handle error) {}

static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) {}

//
// CALLBACK LOOP
//

static void connectivity_state_set(grpc_chttp2_transport* t,
                                   grpc_connectivity_state state,
                                   const absl::Status& status,
                                   const char* reason) {}

//
// POLLSET STUFF
//

static void set_pollset(grpc_transport* gt, grpc_stream* /*gs*/,
                        grpc_pollset* pollset) {}

static void set_pollset_set(grpc_transport* gt, grpc_stream* /*gs*/,
                            grpc_pollset_set* pollset_set) {}

//
// RESOURCE QUOTAS
//

static void post_benign_reclaimer(grpc_chttp2_transport* t) {}

static void post_destructive_reclaimer(grpc_chttp2_transport* t) {}

static void benign_reclaimer_locked(void* arg, grpc_error_handle error) {}

static void destructive_reclaimer_locked(void* arg, grpc_error_handle error) {}

//
// MONITORING
//

const char* grpc_chttp2_initiate_write_reason_string(
    grpc_chttp2_initiate_write_reason reason) {}

static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {}

static const grpc_transport_vtable vtable =;

static const grpc_transport_vtable* get_vtable(void) {}

grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
grpc_chttp2_transport_get_socket_node(grpc_transport* transport) {}

grpc_transport* grpc_create_chttp2_transport(
    const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
    bool is_client) {}

void grpc_chttp2_transport_start_reading(
    grpc_transport* transport, grpc_slice_buffer* read_buffer,
    grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) {}