chromium/third_party/grpc/src/src/core/ext/filters/client_channel/client_channel.cc

//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>

#include <algorithm>
#include <functional>
#include <new>
#include <type_traits>
#include <utility>
#include <vector>

#include "absl/cleanup/cleanup.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/cord.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>

#include "src/core/ext/filters/client_channel/backend_metric.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/client_channel_internal.h"
#include "src/core/ext/filters/client_channel/client_channel_service_config.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/dynamic_filters.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/retry_filter.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_interface_internal.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/handshaker/proxy_mapper_registry.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/resolver/resolver_registry.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/service_config/service_config_impl.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"

//
// Client channel filter
//

#define GRPC_ARG_HEALTH_CHECK_SERVICE_NAME

namespace grpc_core {

ClientChannelMethodParsedConfig;

TraceFlag grpc_client_channel_trace(false, "client_channel");
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_lb_call_trace(false, "client_channel_lb_call");

//
// ClientChannel::CallData definition
//

class ClientChannel::CallData {};

class ClientChannel::FilterBasedCallData : public ClientChannel::CallData {};

//
// Filter vtable
//

const grpc_channel_filter ClientChannel::kFilterVtable =;

//
// dynamic termination filter
//

namespace {

class DynamicTerminationFilter {};

class DynamicTerminationFilter::CallData {};

const grpc_channel_filter DynamicTerminationFilter::kFilterVtable =;

}  // namespace

//
// ClientChannel::ResolverResultHandler
//

class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler {};

//
// ClientChannel::SubchannelWrapper
//

// This class is a wrapper for Subchannel that hides details of the
// channel's implementation (such as the health check service name and
// connected subchannel) from the LB policy API.
//
// Note that no synchronization is needed here, because even if the
// underlying subchannel is shared between channels, this wrapper will only
// be used within one channel, so it will always be synchronized by the
// control plane work_serializer.
class ClientChannel::SubchannelWrapper : public SubchannelInterface {};

//
// ClientChannel::ExternalConnectivityWatcher
//

ClientChannel::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
    ClientChannel* chand, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init)
    :{}

ClientChannel::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {}

void ClientChannel::ExternalConnectivityWatcher::
    RemoveWatcherFromExternalWatchersMap(ClientChannel* chand,
                                         grpc_closure* on_complete,
                                         bool cancel) {}

void ClientChannel::ExternalConnectivityWatcher::Notify(
    grpc_connectivity_state state, const absl::Status& /* status */) {}

void ClientChannel::ExternalConnectivityWatcher::Cancel() {}

void ClientChannel::ExternalConnectivityWatcher::AddWatcherLocked() {}

void ClientChannel::ExternalConnectivityWatcher::RemoveWatcherLocked() {}

//
// ClientChannel::ConnectivityWatcherAdder
//

class ClientChannel::ConnectivityWatcherAdder {};

//
// ClientChannel::ConnectivityWatcherRemover
//

class ClientChannel::ConnectivityWatcherRemover {};

//
// ClientChannel::ClientChannelControlHelper
//

class ClientChannel::ClientChannelControlHelper
    : public LoadBalancingPolicy::ChannelControlHelper {};

//
// ClientChannel implementation
//

ClientChannel* ClientChannel::GetFromChannel(Channel* channel) {}

grpc_error_handle ClientChannel::Init(grpc_channel_element* elem,
                                      grpc_channel_element_args* args) {}

void ClientChannel::Destroy(grpc_channel_element* elem) {}

namespace {

RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
    const ChannelArgs& args) {}

}  // namespace

ClientChannel::ClientChannel(grpc_channel_element_args* args,
                             grpc_error_handle* error)
    :{}

ClientChannel::~ClientChannel() {}

OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall>
ClientChannel::CreateLoadBalancedCall(
    const grpc_call_element_args& args, grpc_polling_entity* pollent,
    grpc_closure* on_call_destruction_complete,
    ConfigSelector::CallDispatchController* call_dispatch_controller,
    bool is_transparent_retry) {}

ChannelArgs ClientChannel::MakeSubchannelArgs(
    const ChannelArgs& channel_args, const ChannelArgs& address_args,
    const RefCountedPtr<SubchannelPoolInterface>& subchannel_pool,
    const std::string& channel_default_authority) {}

void ClientChannel::ReprocessQueuedResolverCalls() {}

namespace {

RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
    const Resolver::Result& resolver_result,
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {}

}  // namespace

void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {}

void ClientChannel::OnResolverErrorLocked(absl::Status status) {}

absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked(
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
    const absl::optional<std::string>& health_check_service_name,
    Resolver::Result result) {}

// Creates a new LB policy.
OrphanablePtr<LoadBalancingPolicy> ClientChannel::CreateLbPolicyLocked(
    const ChannelArgs& args) {}

void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
    RefCountedPtr<ServiceConfig> service_config,
    RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) {}

void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {}

void ClientChannel::CreateResolverLocked() {}

void ClientChannel::DestroyResolverAndLbPolicyLocked() {}

void ClientChannel::UpdateStateAndPickerLocked(
    grpc_connectivity_state state, const absl::Status& status,
    const char* reason,
    RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {}

namespace {

// TODO(roth): Remove this in favor of the gprpp Match() function once
// we can do that without breaking lock annotations.
template <typename T>
T HandlePickResult(
    LoadBalancingPolicy::PickResult* result,
    std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func,
    std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func,
    std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func,
    std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) {}

}  // namespace

grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) {}

void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) {}

void ClientChannel::StartTransportOp(grpc_channel_element* elem,
                                     grpc_transport_op* op) {}

void ClientChannel::GetChannelInfo(grpc_channel_element* elem,
                                   const grpc_channel_info* info) {}

void ClientChannel::TryToConnectLocked() {}

grpc_connectivity_state ClientChannel::CheckConnectivityState(
    bool try_to_connect) {}

void ClientChannel::AddConnectivityWatcher(
    grpc_connectivity_state initial_state,
    OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {}

void ClientChannel::RemoveConnectivityWatcher(
    AsyncConnectivityStateWatcherInterface* watcher) {}

//
// CallData implementation
//

void ClientChannel::CallData::RemoveCallFromResolverQueuedCallsLocked() {}

void ClientChannel::CallData::AddCallToResolverQueuedCallsLocked() {}

grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
    const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector) {}

absl::optional<absl::Status> ClientChannel::CallData::CheckResolution(
    bool was_queued) {}

bool ClientChannel::CallData::CheckResolutionLocked(
    absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector) {}

//
// FilterBasedCallData implementation
//

ClientChannel::FilterBasedCallData::FilterBasedCallData(
    grpc_call_element* elem, const grpc_call_element_args& args)
    :{}

ClientChannel::FilterBasedCallData::~FilterBasedCallData() {}

grpc_error_handle ClientChannel::FilterBasedCallData::Init(
    grpc_call_element* elem, const grpc_call_element_args* args) {}

void ClientChannel::FilterBasedCallData::Destroy(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* then_schedule_closure) {}

void ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {}

void ClientChannel::FilterBasedCallData::SetPollent(
    grpc_call_element* elem, grpc_polling_entity* pollent) {}

size_t ClientChannel::FilterBasedCallData::GetBatchIndex(
    grpc_transport_stream_op_batch* batch) {}

// This is called via the call combiner, so access to calld is synchronized.
void ClientChannel::FilterBasedCallData::PendingBatchesAdd(
    grpc_transport_stream_op_batch* batch) {}

// This is called via the call combiner, so access to calld is synchronized.
void ClientChannel::FilterBasedCallData::FailPendingBatchInCallCombiner(
    void* arg, grpc_error_handle error) {}

// This is called via the call combiner, so access to calld is synchronized.
void ClientChannel::FilterBasedCallData::PendingBatchesFail(
    grpc_error_handle error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {}

// This is called via the call combiner, so access to calld is synchronized.
void ClientChannel::FilterBasedCallData::ResumePendingBatchInCallCombiner(
    void* arg, grpc_error_handle /*ignored*/) {}

// This is called via the call combiner, so access to calld is synchronized.
void ClientChannel::FilterBasedCallData::PendingBatchesResume() {}

// A class to handle the call combiner cancellation callback for a
// queued pick.
class ClientChannel::FilterBasedCallData::ResolverQueuedCallCanceller {};

void ClientChannel::FilterBasedCallData::TryCheckResolution(bool was_queued) {}

void ClientChannel::FilterBasedCallData::OnAddToQueueLocked() {}

void ClientChannel::FilterBasedCallData::RetryCheckResolutionLocked() {}

void ClientChannel::FilterBasedCallData::CreateDynamicCall() {}

void ClientChannel::FilterBasedCallData::
    RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
        void* arg, grpc_error_handle error) {}

//
// ClientChannel::LoadBalancedCall::LbCallState
//

class ClientChannel::LoadBalancedCall::LbCallState
    : public ClientChannelLbCallState {};

//
// ClientChannel::LoadBalancedCall::Metadata
//

class ClientChannel::LoadBalancedCall::Metadata
    : public LoadBalancingPolicy::MetadataInterface {};

//
// ClientChannel::LoadBalancedCall::LbCallState
//

absl::string_view
ClientChannel::LoadBalancedCall::LbCallState::GetCallAttribute(
    UniqueTypeName type) {}

//
// ClientChannel::LoadBalancedCall::BackendMetricAccessor
//

class ClientChannel::LoadBalancedCall::BackendMetricAccessor
    : public LoadBalancingPolicy::BackendMetricAccessor {};

//
// ClientChannel::LoadBalancedCall
//

namespace {

CallTracer::CallAttemptTracer* GetCallAttemptTracer(
    grpc_call_context_element* context, bool is_transparent_retry) {}

}  // namespace

ClientChannel::LoadBalancedCall::LoadBalancedCall(
    ClientChannel* chand, grpc_call_context_element* call_context,
    ConfigSelector::CallDispatchController* call_dispatch_controller,
    bool is_transparent_retry)
    :{}

ClientChannel::LoadBalancedCall::~LoadBalancedCall() {}

void ClientChannel::LoadBalancedCall::Orphan() {}

void ClientChannel::LoadBalancedCall::RecordCallCompletion(
    absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
    grpc_transport_stream_stats* transport_stream_stats,
    absl::string_view peer_address) {}

void ClientChannel::LoadBalancedCall::RemoveCallFromLbQueuedCallsLocked() {}

void ClientChannel::LoadBalancedCall::AddCallToLbQueuedCallsLocked() {}

absl::optional<absl::Status> ClientChannel::LoadBalancedCall::PickSubchannel(
    bool was_queued) {}

bool ClientChannel::LoadBalancedCall::PickSubchannelImpl(
    LoadBalancingPolicy::SubchannelPicker* picker, grpc_error_handle* error) {}

//
// ClientChannel::FilterBasedLoadBalancedCall
//

ClientChannel::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall(
    ClientChannel* chand, const grpc_call_element_args& args,
    grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
    ConfigSelector::CallDispatchController* call_dispatch_controller,
    bool is_transparent_retry)
    :{}

ClientChannel::FilterBasedLoadBalancedCall::~FilterBasedLoadBalancedCall() {}

void ClientChannel::FilterBasedLoadBalancedCall::Orphan() {}

size_t ClientChannel::FilterBasedLoadBalancedCall::GetBatchIndex(
    grpc_transport_stream_op_batch* batch) {}

// This is called via the call combiner, so access to calld is synchronized.
void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesAdd(
    grpc_transport_stream_op_batch* batch) {}

// This is called via the call combiner, so access to calld is synchronized.
void ClientChannel::FilterBasedLoadBalancedCall::FailPendingBatchInCallCombiner(
    void* arg, grpc_error_handle error) {}

// This is called via the call combiner, so access to calld is synchronized.
void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesFail(
    grpc_error_handle error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {}

// This is called via the call combiner, so access to calld is synchronized.
void ClientChannel::FilterBasedLoadBalancedCall::
    ResumePendingBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {}

// This is called via the call combiner, so access to calld is synchronized.
void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesResume() {}

void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch(
    grpc_transport_stream_op_batch* batch) {}

void ClientChannel::FilterBasedLoadBalancedCall::RecvInitialMetadataReady(
    void* arg, grpc_error_handle error) {}

void ClientChannel::FilterBasedLoadBalancedCall::RecvMessageReady(
    void* arg, grpc_error_handle error) {}

void ClientChannel::FilterBasedLoadBalancedCall::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {}

// A class to handle the call combiner cancellation callback for a
// queued pick.
// TODO(roth): When we implement hedging support, we won't be able to
// register a call combiner cancellation closure for each LB pick,
// because there may be multiple LB picks happening in parallel.
// Instead, we will probably need to maintain a list in the CallData
// object of pending LB picks to be cancelled when the closure runs.
class ClientChannel::FilterBasedLoadBalancedCall::LbQueuedCallCanceller {};

void ClientChannel::FilterBasedLoadBalancedCall::TryPick(bool was_queued) {}

void ClientChannel::FilterBasedLoadBalancedCall::OnAddToQueueLocked() {}

void ClientChannel::FilterBasedLoadBalancedCall::RetryPickLocked() {}

void ClientChannel::FilterBasedLoadBalancedCall::CreateSubchannelCall() {}

}  // namespace grpc_core