#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include <inttypes.h>
#include <limits.h>
#include <algorithm>
#include <functional>
#include <new>
#include <type_traits>
#include <utility>
#include <vector>
#include "absl/cleanup/cleanup.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/cord.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/backend_metric.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/client_channel_internal.h"
#include "src/core/ext/filters/client_channel/client_channel_service_config.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/dynamic_filters.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/retry_filter.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_interface_internal.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/handshaker/proxy_mapper_registry.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/resolver/resolver_registry.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/service_config/service_config_impl.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
#define GRPC_ARG_HEALTH_CHECK_SERVICE_NAME …
namespace grpc_core {
// NOTE(review): the next line is a bare identifier followed by ';', which is
// not a complete namespace-scope declaration on its own. It looks like the
// remnant of a using-declaration -- TODO confirm against the upstream file.
ClientChannelMethodParsedConfig;
// Trace flags defined by this file. Each is constructed with `false` and the
// tracer name given as the second argument.
// NOTE(review): presumably the first argument is the default-enabled state --
// confirm against the TraceFlag constructor.
TraceFlag grpc_client_channel_trace(false, "client_channel");
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_lb_call_trace(false, "client_channel_lb_call");
// Nested class ClientChannel::CallData. It is the public base of
// ClientChannel::FilterBasedCallData.
// NOTE(review): presumably holds per-call state for name resolution and
// service-config application (see the CallData:: methods defined in this
// file) -- confirm against the class members.
class ClientChannel::CallData { … };
// CallData subclass (public inheritance from ClientChannel::CallData).
// NOTE(review): presumably the CallData variant used when the client channel
// runs as a filter in a channel stack -- confirm.
class ClientChannel::FilterBasedCallData : public ClientChannel::CallData { … };
// Out-of-class definition of the constant grpc_channel_filter vtable member
// for ClientChannel.
// NOTE(review): which callbacks it is populated with is not shown here.
const grpc_channel_filter ClientChannel::kFilterVtable = …;
// File-local helpers (anonymous namespace).
namespace {
// File-local filter class; it has its own nested CallData class and its own
// grpc_channel_filter vtable, both defined below.
// NOTE(review): presumably the terminal filter of the dynamic filter stack --
// confirm.
class DynamicTerminationFilter { … };
// Nested call-data class for DynamicTerminationFilter.
class DynamicTerminationFilter::CallData { … };
// Out-of-class definition of DynamicTerminationFilter's filter vtable.
const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = …;
}
// Implementation of the Resolver::ResultHandler interface for ClientChannel.
// NOTE(review): presumably forwards resolver results to
// ClientChannel::OnResolverResultChangedLocked() -- confirm.
class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler { … };
// Implementation of SubchannelInterface owned by ClientChannel.
// NOTE(review): by its name it presumably wraps an underlying Subchannel --
// confirm against the class members.
class ClientChannel::SubchannelWrapper : public SubchannelInterface { … };
// Constructs a watcher for `chand`. Takes the polling entity by value, a
// pointer to a connectivity-state variable, and two closures.
// NOTE(review): how `state`, `on_complete` and `watcher_timer_init` are used
// is not shown here -- presumably `*state` is compared/updated and
// `on_complete` is scheduled on a state change; confirm.
ClientChannel::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
ClientChannel* chand, grpc_polling_entity pollent,
grpc_connectivity_state* state, grpc_closure* on_complete,
grpc_closure* watcher_timer_init)
: … { … }
// Destructor.
// NOTE(review): presumably releases whatever the constructor acquired
// (e.g. polling-entity registration, channel-stack ref) -- confirm.
ClientChannel::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { … }
// Takes the channel explicitly rather than using a member.
// NOTE(review): presumably a static member that looks the watcher up by
// `on_complete` in `chand`'s external-watchers map and removes it, and
// additionally cancels it when `cancel` is true -- confirm.
void ClientChannel::ExternalConnectivityWatcher::
RemoveWatcherFromExternalWatchersMap(ClientChannel* chand,
grpc_closure* on_complete,
bool cancel) { … }
// Receives a connectivity-state notification. The status parameter is unnamed,
// so the body cannot reference it; only `state` is available.
void ClientChannel::ExternalConnectivityWatcher::Notify(
grpc_connectivity_state state, const absl::Status& ) { … }
// Cancels this watcher.
// NOTE(review): presumably completes `on_complete` with a cancellation error
// and schedules removal of the watcher -- confirm.
void ClientChannel::ExternalConnectivityWatcher::Cancel() { … }
// Registers this watcher.
// NOTE(review): the `Locked` suffix suggests the caller must already be inside
// the channel's synchronization context (which one is not shown) -- confirm.
void ClientChannel::ExternalConnectivityWatcher::AddWatcherLocked() { … }
// Unregisters this watcher; counterpart of AddWatcherLocked().
// NOTE(review): the `Locked` suffix suggests the caller must already be inside
// the channel's synchronization context -- confirm.
void ClientChannel::ExternalConnectivityWatcher::RemoveWatcherLocked() { … }
// Helper class nested in ClientChannel.
// NOTE(review): presumably used by AddConnectivityWatcher() to perform the
// registration asynchronously -- confirm.
class ClientChannel::ConnectivityWatcherAdder { … };
// Helper class nested in ClientChannel.
// NOTE(review): presumably used by RemoveConnectivityWatcher() to perform the
// removal asynchronously -- confirm.
class ClientChannel::ConnectivityWatcherRemover { … };
// ClientChannel's implementation of
// LoadBalancingPolicy::ChannelControlHelper.
// NOTE(review): presumably the object handed to LB policies so they can call
// back into the channel -- confirm.
class ClientChannel::ClientChannelControlHelper
: public LoadBalancingPolicy::ChannelControlHelper { … };
// Returns the ClientChannel associated with `channel`.
// NOTE(review): what is returned when `channel` has no client-channel filter
// is not shown here (possibly nullptr) -- confirm before dereferencing.
ClientChannel* ClientChannel::GetFromChannel(Channel* channel) { … }
// Channel-element initialization entry point: takes the element and its
// construction args and returns a grpc_error_handle.
// NOTE(review): presumably referenced from kFilterVtable and constructs the
// ClientChannel object in the element's channel data -- confirm.
grpc_error_handle ClientChannel::Init(grpc_channel_element* elem,
grpc_channel_element_args* args) { … }
// Channel-element teardown entry point; counterpart of Init().
// NOTE(review): presumably runs the ClientChannel destructor on the element's
// channel data -- confirm.
void ClientChannel::Destroy(grpc_channel_element* elem) { … }
// File-local helper (anonymous namespace).
namespace {
// Returns a ref-counted subchannel pool chosen from the channel args.
// NOTE(review): presumably selects between the local and global subchannel
// pools (both headers are included by this file) -- confirm which arg drives
// the choice.
RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
const ChannelArgs& args) { … }
}
// Constructs the channel from the element args. `error` is an out-pointer.
// NOTE(review): presumably `*error` is set when construction fails -- confirm
// whether it is always written or only on failure.
ClientChannel::ClientChannel(grpc_channel_element_args* args,
grpc_error_handle* error)
: … { … }
// Destructor.
// NOTE(review): presumably tears down the resolver and LB policy (see
// DestroyResolverAndLbPolicyLocked()) -- confirm.
ClientChannel::~ClientChannel() { … }
// Creates a FilterBasedLoadBalancedCall and returns it as an OrphanablePtr.
// The parameter list mirrors that of the FilterBasedLoadBalancedCall
// constructor defined in this file, minus the channel pointer.
// NOTE(review): presumably forwards the arguments with `this` as the channel
// -- confirm.
OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall>
ClientChannel::CreateLoadBalancedCall(
const grpc_call_element_args& args, grpc_polling_entity* pollent,
grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry) { … }
// Builds and returns the ChannelArgs for a subchannel from the channel-level
// args, the per-address args, the subchannel pool, and the channel's default
// authority. All inputs are taken by const reference.
// NOTE(review): precedence between `channel_args` and `address_args`, and
// which args are stripped, is not shown here -- confirm.
ChannelArgs ClientChannel::MakeSubchannelArgs(
const ChannelArgs& channel_args, const ChannelArgs& address_args,
const RefCountedPtr<SubchannelPoolInterface>& subchannel_pool,
const std::string& channel_default_authority) { … }
// NOTE(review): presumably re-runs resolution checks for calls that were
// queued while waiting for a resolver result -- confirm.
void ClientChannel::ReprocessQueuedResolverCalls() { … }
// File-local helper (anonymous namespace).
namespace {
// Returns the LB policy config to use, given the resolver result and the
// parsed global service config.
// NOTE(review): the selection order (service config vs. channel args vs.
// default) and the null-return conditions are not shown here -- confirm.
RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config) { … }
}
// Handles a new resolver result, taken by value.
// NOTE(review): the `Locked` suffix suggests the caller must already be inside
// the channel's synchronization context -- confirm.
void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) { … }
// Handles a resolver error, passed as an absl::Status by value.
// NOTE(review): the `Locked` suffix suggests the caller must already be inside
// the channel's synchronization context -- confirm.
void ClientChannel::OnResolverErrorLocked(absl::Status status) { … }
// Creates the LB policy if needed and/or updates it with the given config,
// optional health-check service name, and resolver result. Returns an
// absl::Status.
// NOTE(review): presumably the returned status is the LB policy's update
// result -- confirm. The `Locked` suffix suggests the caller must already be
// inside the channel's synchronization context.
absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
const absl::optional<std::string>& health_check_service_name,
Resolver::Result result) { … }
// Creates a new LoadBalancingPolicy from `args` and returns ownership of it.
// NOTE(review): the concrete policy type is not shown here (the
// child_policy_handler header is included by this file) -- confirm.
OrphanablePtr<LoadBalancingPolicy> ClientChannel::CreateLbPolicyLocked(
const ChannelArgs& args) { … }
// Takes the new service config, config selector, and LB policy name by value.
// NOTE(review): presumably stores them as the control-plane view of the
// channel's configuration; the data-plane counterpart is
// UpdateServiceConfigInDataPlaneLocked() -- confirm.
void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
RefCountedPtr<ServiceConfig> service_config,
RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) { … }
// Takes no arguments, so it must work from state already stored on the
// channel.
// NOTE(review): presumably publishes the saved service config / config
// selector to the data plane -- confirm.
void ClientChannel::UpdateServiceConfigInDataPlaneLocked() { … }
// Creates the channel's resolver.
// NOTE(review): failure behaviour (e.g. when no resolver matches the target
// URI) is not shown here -- confirm.
void ClientChannel::CreateResolverLocked() { … }
// Tears down the resolver and the LB policy.
// NOTE(review): whether this is safe to call when neither exists is not shown
// here -- confirm.
void ClientChannel::DestroyResolverAndLbPolicyLocked() { … }
// Takes a new connectivity state with its status, a human-readable `reason`
// C string, and a new subchannel picker (by value).
// NOTE(review): presumably records the state and swaps in the picker,
// re-processing queued LB picks -- confirm.
void ClientChannel::UpdateStateAndPickerLocked(
grpc_connectivity_state state, const absl::Status& status,
const char* reason,
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) { … }
// File-local helper (anonymous namespace).
namespace {
// Takes a pick result and four callbacks, one for each of the
// PickResult::Complete, Queue, Fail and Drop types; every callback returns T,
// as does this function.
// NOTE(review): presumably invokes exactly the callback matching the
// alternative held by `*result` and returns its value -- confirm.
template <typename T>
T HandlePickResult(
LoadBalancingPolicy::PickResult* result,
std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func,
std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func,
std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func,
std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) { … }
}
// Handles a ping carried by the transport op `op` and returns an error handle.
// NOTE(review): the conditions for a non-OK return (e.g. channel not
// connected) are not shown here -- confirm.
grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) { … }
// Processes a transport op.
// NOTE(review): presumably the synchronized half of StartTransportOp() --
// confirm.
void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) { … }
// Channel-element entry point for transport ops.
// NOTE(review): presumably referenced from kFilterVtable and hands the op to
// StartTransportOpLocked() -- confirm.
void ClientChannel::StartTransportOp(grpc_channel_element* elem,
grpc_transport_op* op) { … }
// Channel-element entry point for querying channel info. `info` is a pointer
// to a const grpc_channel_info, so the struct itself is not modified.
// NOTE(review): presumably results are written through pointer fields inside
// `info` -- confirm which fields and who owns the written strings.
void ClientChannel::GetChannelInfo(grpc_channel_element* elem,
const grpc_channel_info* info) { … }
// Requests that the channel attempt to connect.
// NOTE(review): presumably creates the resolver if absent or asks the LB
// policy to exit idle -- confirm.
void ClientChannel::TryToConnectLocked() { … }
// Returns the channel's connectivity state.
// NOTE(review): presumably a connection attempt is kicked off when
// `try_to_connect` is true and the channel is idle -- confirm.
grpc_connectivity_state ClientChannel::CheckConnectivityState(
bool try_to_connect) { … }
// Registers a connectivity watcher. Ownership of `watcher` is passed in via
// OrphanablePtr.
// NOTE(review): presumably `initial_state` is the state the caller last
// observed -- confirm.
void ClientChannel::AddConnectivityWatcher(
grpc_connectivity_state initial_state,
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) { … }
// Unregisters a watcher, identified by raw pointer.
// NOTE(review): presumably the pointer is only used as a lookup key for a
// watcher previously passed to AddConnectivityWatcher() -- confirm.
void ClientChannel::RemoveConnectivityWatcher(
AsyncConnectivityStateWatcherInterface* watcher) { … }
// Removes this call from the channel's set of calls queued on resolution.
// NOTE(review): the `Locked` suffix suggests a lock must already be held;
// which one is not shown here -- confirm.
void ClientChannel::CallData::RemoveCallFromResolverQueuedCallsLocked() { … }
// Adds this call to the channel's set of calls queued on resolution;
// counterpart of RemoveCallFromResolverQueuedCallsLocked().
// NOTE(review): presumably also invokes the OnAddToQueueLocked() hook --
// confirm.
void ClientChannel::CallData::AddCallToResolverQueuedCallsLocked() { … }
// Applies service-config data to this call using `config_selector`, which is
// a StatusOr and so may carry an error instead of a selector. Returns an
// error handle.
// NOTE(review): presumably a non-OK `config_selector` status is returned as
// the call error -- confirm.
grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector) { … }
// Checks whether name resolution is complete for this call. Returns an
// optional status.
// NOTE(review): presumably nullopt means the call was queued and a status
// value means resolution finished (OK or error); `was_queued` presumably
// tells whether this is a re-check of a queued call -- confirm.
absl::optional<absl::Status> ClientChannel::CallData::CheckResolution(
bool was_queued) { … }
// Lock-held part of the resolution check. `config_selector` is an out-pointer.
// NOTE(review): presumably returns true when resolution is complete (with
// `*config_selector` populated) and false when the call was queued -- confirm.
bool ClientChannel::CallData::CheckResolutionLocked(
absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector) { … }
// Constructs the per-call data from the call element and its construction
// args.
// NOTE(review): which fields of `args` are retained beyond construction is not
// shown here -- confirm.
ClientChannel::FilterBasedCallData::FilterBasedCallData(
grpc_call_element* elem, const grpc_call_element_args& args)
: … { … }
// Destructor.
// NOTE(review): presumably expects no pending batches to remain -- confirm.
ClientChannel::FilterBasedCallData::~FilterBasedCallData() { … }
// Call-element initialization entry point; returns an error handle.
// NOTE(review): presumably constructs a FilterBasedCallData in the element's
// call data -- confirm.
grpc_error_handle ClientChannel::FilterBasedCallData::Init(
grpc_call_element* elem, const grpc_call_element_args* args) { … }
// Call-element teardown entry point. The grpc_call_final_info parameter is
// unnamed, so the body cannot reference it.
// NOTE(review): presumably `then_schedule_closure` is run once destruction of
// the call (including any dynamic call) has completed -- confirm.
void ClientChannel::FilterBasedCallData::Destroy(
grpc_call_element* elem, const grpc_call_final_info* ,
grpc_closure* then_schedule_closure) { … }
// Call-element entry point for a stream op batch.
// NOTE(review): presumably batches are queued as pending until resolution
// completes and a dynamic call exists -- confirm.
void ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { … }
// Call-element entry point that supplies the call's polling entity.
// NOTE(review): presumably stores the pointer rather than copying the entity
// -- confirm the lifetime requirement.
void ClientChannel::FilterBasedCallData::SetPollent(
grpc_call_element* elem, grpc_polling_entity* pollent) { … }
// Maps a batch to an index (size_t).
// NOTE(review): presumably the slot in the pending-batches array, chosen by
// which op the batch contains -- confirm the mapping.
size_t ClientChannel::FilterBasedCallData::GetBatchIndex(
grpc_transport_stream_op_batch* batch) { … }
// Records `batch` as pending.
// NOTE(review): presumably stored at the slot given by GetBatchIndex() --
// confirm.
void ClientChannel::FilterBasedCallData::PendingBatchesAdd(
grpc_transport_stream_op_batch* batch) { … }
// Callback with a closure-style (void* arg, error) signature.
// NOTE(review): presumably `arg` is the batch to fail with `error`, and the
// callback runs under the call combiner -- confirm.
void ClientChannel::FilterBasedCallData::FailPendingBatchInCallCombiner(
void* arg, grpc_error_handle error) { … }
// Fails the pending batches with `error`.
// NOTE(review): presumably `yield_call_combiner_predicate` decides whether the
// call combiner is yielded afterwards -- confirm.
void ClientChannel::FilterBasedCallData::PendingBatchesFail(
grpc_error_handle error,
YieldCallCombinerPredicate yield_call_combiner_predicate) { … }
// Callback with a closure-style (void* arg, error) signature. The error
// parameter is unnamed, so the body cannot reference it.
// NOTE(review): presumably `arg` is the batch to resume -- confirm.
void ClientChannel::FilterBasedCallData::ResumePendingBatchInCallCombiner(
void* arg, grpc_error_handle ) { … }
// Resumes the pending batches.
// NOTE(review): presumably forwards them to the dynamic call once it exists
// -- confirm.
void ClientChannel::FilterBasedCallData::PendingBatchesResume() { … }
// Helper class nested in FilterBasedCallData.
// NOTE(review): presumably handles cancellation of a call while it is queued
// waiting for resolution -- confirm.
class ClientChannel::FilterBasedCallData::ResolverQueuedCallCanceller { … };
// Attempts the resolution check for this call.
// NOTE(review): presumably calls CheckResolution(was_queued) and, when it
// yields a status, either creates the dynamic call or fails pending batches
// -- confirm.
void ClientChannel::FilterBasedCallData::TryCheckResolution(bool was_queued) { … }
// Hook run when this call is added to the resolver queue.
// NOTE(review): presumably installs a ResolverQueuedCallCanceller -- confirm.
void ClientChannel::FilterBasedCallData::OnAddToQueueLocked() { … }
// Re-attempts the resolution check for a call that was queued.
// NOTE(review): presumably schedules TryCheckResolution(true) rather than
// running it inline -- confirm.
void ClientChannel::FilterBasedCallData::RetryCheckResolutionLocked() { … }
// Creates the dynamic call for this call.
// NOTE(review): behaviour on creation failure (presumably pending batches are
// failed) is not shown here -- confirm.
void ClientChannel::FilterBasedCallData::CreateDynamicCall() { … }
// Callback with a closure-style (void* arg, error) signature.
// NOTE(review): by its name, presumably intercepts recv_trailing_metadata
// completion to notify the config selector's call dispatch controller before
// chaining to the original callback -- confirm.
void ClientChannel::FilterBasedCallData::
RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
void* arg, grpc_error_handle error) { … }
// LoadBalancedCall's implementation of ClientChannelLbCallState.
// NOTE(review): presumably the call-state object exposed to LB pickers --
// confirm.
class ClientChannel::LoadBalancedCall::LbCallState
: public ClientChannelLbCallState { … };
// LoadBalancedCall's implementation of
// LoadBalancingPolicy::MetadataInterface.
// NOTE(review): presumably wraps a grpc_metadata_batch -- confirm.
class ClientChannel::LoadBalancedCall::Metadata
: public LoadBalancingPolicy::MetadataInterface { … };
// Looks up a call attribute by its UniqueTypeName and returns it as a
// string_view.
// NOTE(review): the return value for a missing attribute (presumably an empty
// view) and the lifetime of the returned data are not shown here -- confirm.
absl::string_view
ClientChannel::LoadBalancedCall::LbCallState::GetCallAttribute(
UniqueTypeName type) { … }
// LoadBalancedCall's implementation of
// LoadBalancingPolicy::BackendMetricAccessor.
// NOTE(review): presumably parses backend metric data from trailing metadata
// on demand -- confirm.
class ClientChannel::LoadBalancedCall::BackendMetricAccessor
: public LoadBalancingPolicy::BackendMetricAccessor { … };
// File-local helper (anonymous namespace).
namespace {
// Returns a call-attempt tracer obtained via the call context.
// NOTE(review): presumably returns nullptr when no call tracer is present in
// `context`, and otherwise starts a new attempt flagged with
// `is_transparent_retry` -- confirm.
CallTracer::CallAttemptTracer* GetCallAttemptTracer(
grpc_call_context_element* context, bool is_transparent_retry) { … }
}
// Constructs an LB call on `chand` with the given call context, call dispatch
// controller, and transparent-retry flag.
// NOTE(review): ownership of `call_dispatch_controller` and `call_context` is
// not shown here (raw pointers; presumably borrowed) -- confirm.
ClientChannel::LoadBalancedCall::LoadBalancedCall(
ClientChannel* chand, grpc_call_context_element* call_context,
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry)
: … { … }
// Destructor.
// NOTE(review): what is released here is not shown -- confirm.
ClientChannel::LoadBalancedCall::~LoadBalancedCall() { … }
// Orphan hook, invoked when the owning OrphanablePtr releases the call.
// NOTE(review): presumably records call completion if that has not already
// happened, then drops a ref -- confirm.
void ClientChannel::LoadBalancedCall::Orphan() { … }
// Records the completion of the call with its final status, received trailing
// metadata, transport stream stats, and peer address.
// NOTE(review): presumably reports to the call-attempt tracer and to the LB
// policy's per-call tracker; whether `recv_trailing_metadata` may be null is
// not shown here -- confirm.
void ClientChannel::LoadBalancedCall::RecordCallCompletion(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
grpc_transport_stream_stats* transport_stream_stats,
absl::string_view peer_address) { … }
// Removes this call from the channel's set of calls queued on an LB pick.
// NOTE(review): the `Locked` suffix suggests a lock must already be held;
// which one is not shown here -- confirm.
void ClientChannel::LoadBalancedCall::RemoveCallFromLbQueuedCallsLocked() { … }
// Adds this call to the channel's set of calls queued on an LB pick;
// counterpart of RemoveCallFromLbQueuedCallsLocked().
// NOTE(review): presumably also invokes the OnAddToQueueLocked() hook --
// confirm.
void ClientChannel::LoadBalancedCall::AddCallToLbQueuedCallsLocked() { … }
// Attempts an LB pick for this call. Returns an optional status.
// NOTE(review): presumably nullopt means the pick was queued and a status
// value means the pick finished (OK or error); `was_queued` presumably tells
// whether this is a retry of a queued pick -- confirm.
absl::optional<absl::Status> ClientChannel::LoadBalancedCall::PickSubchannel(
bool was_queued) { … }
// Performs a pick using `picker`. `error` is an out-pointer.
// NOTE(review): presumably returns true when the pick is complete (success or
// failure, with `*error` set on failure) and false when the call must be
// queued -- confirm.
bool ClientChannel::LoadBalancedCall::PickSubchannelImpl(
LoadBalancingPolicy::SubchannelPicker* picker, grpc_error_handle* error) { … }
// Constructs a filter-based LB call. Takes the same channel, dispatch
// controller and transparent-retry parameters as the LoadBalancedCall
// constructor, plus the call element args, the polling entity, and a closure
// named for call-destruction completion.
// NOTE(review): presumably `on_call_destruction_complete` is scheduled from
// the destructor -- confirm.
ClientChannel::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall(
ClientChannel* chand, const grpc_call_element_args& args,
grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry)
: … { … }
// Destructor.
// NOTE(review): presumably expects no pending batches to remain and schedules
// the on-call-destruction-complete closure -- confirm.
ClientChannel::FilterBasedLoadBalancedCall::~FilterBasedLoadBalancedCall() { … }
// Orphan hook for the filter-based LB call.
// NOTE(review): presumably delegates to LoadBalancedCall::Orphan() after any
// filter-specific cleanup -- confirm.
void ClientChannel::FilterBasedLoadBalancedCall::Orphan() { … }
// Maps a batch to an index (size_t); same signature as
// FilterBasedCallData::GetBatchIndex().
// NOTE(review): presumably the slot in this call's pending-batches array --
// confirm the mapping.
size_t ClientChannel::FilterBasedLoadBalancedCall::GetBatchIndex(
grpc_transport_stream_op_batch* batch) { … }
// Records `batch` as pending on this LB call.
// NOTE(review): presumably stored at the slot given by GetBatchIndex() --
// confirm.
void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesAdd(
grpc_transport_stream_op_batch* batch) { … }
// Callback with a closure-style (void* arg, error) signature.
// NOTE(review): presumably `arg` is the batch to fail with `error`, and the
// callback runs under the call combiner -- confirm.
void ClientChannel::FilterBasedLoadBalancedCall::FailPendingBatchInCallCombiner(
void* arg, grpc_error_handle error) { … }
// Fails the pending batches with `error`.
// NOTE(review): presumably `yield_call_combiner_predicate` decides whether the
// call combiner is yielded afterwards -- confirm.
void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesFail(
grpc_error_handle error,
YieldCallCombinerPredicate yield_call_combiner_predicate) { … }
// Callback with a closure-style (void* arg, error) signature. The error
// parameter is unnamed, so the body cannot reference it.
// NOTE(review): presumably `arg` is the batch to resume on the subchannel
// call -- confirm.
void ClientChannel::FilterBasedLoadBalancedCall::
ResumePendingBatchInCallCombiner(void* arg, grpc_error_handle ) { … }
// Resumes the pending batches.
// NOTE(review): presumably forwards them to the subchannel call once it
// exists -- confirm.
void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesResume() { … }
// Entry point for a stream op batch on this LB call.
// NOTE(review): presumably intercepts the recv_* callbacks (see the
// Recv*Ready functions defined in this file) and queues the batch until a
// subchannel call exists -- confirm.
void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch(
grpc_transport_stream_op_batch* batch) { … }
// Callback with a closure-style (void* arg, error) signature for
// recv_initial_metadata completion.
// NOTE(review): presumably `arg` is the FilterBasedLoadBalancedCall and the
// original callback is chained afterwards -- confirm.
void ClientChannel::FilterBasedLoadBalancedCall::RecvInitialMetadataReady(
void* arg, grpc_error_handle error) { … }
// Callback with a closure-style (void* arg, error) signature for recv_message
// completion.
// NOTE(review): presumably `arg` is the FilterBasedLoadBalancedCall and the
// original callback is chained afterwards -- confirm.
void ClientChannel::FilterBasedLoadBalancedCall::RecvMessageReady(
void* arg, grpc_error_handle error) { … }
// Callback with a closure-style (void* arg, error) signature for
// recv_trailing_metadata completion.
// NOTE(review): presumably derives the call's final status and calls
// RecordCallCompletion() before chaining to the original callback -- confirm.
void ClientChannel::FilterBasedLoadBalancedCall::RecvTrailingMetadataReady(
void* arg, grpc_error_handle error) { … }
// Helper class nested in FilterBasedLoadBalancedCall.
// NOTE(review): presumably handles cancellation of a call while it is queued
// waiting for an LB pick -- confirm.
class ClientChannel::FilterBasedLoadBalancedCall::LbQueuedCallCanceller { … };
// Attempts the LB pick for this call.
// NOTE(review): presumably calls PickSubchannel(was_queued) and, when it
// yields a status, either creates the subchannel call or fails pending
// batches -- confirm.
void ClientChannel::FilterBasedLoadBalancedCall::TryPick(bool was_queued) { … }
// Hook run when this call is added to the LB pick queue.
// NOTE(review): presumably installs an LbQueuedCallCanceller -- confirm.
void ClientChannel::FilterBasedLoadBalancedCall::OnAddToQueueLocked() { … }
// Re-attempts the LB pick for a call that was queued.
// NOTE(review): presumably schedules TryPick(true) rather than running it
// inline -- confirm.
void ClientChannel::FilterBasedLoadBalancedCall::RetryPickLocked() { … }
// Creates the subchannel call for this LB call.
// NOTE(review): behaviour on creation failure (presumably pending batches are
// failed, otherwise resumed) is not shown here -- confirm.
void ClientChannel::FilterBasedLoadBalancedCall::CreateSubchannelCall() { … }
}