#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/subchannel.h"
#include <inttypes.h>
#include <limits.h>
#include <algorithm>
#include <memory>
#include <new>
#include <utility>
#include "absl/status/statusor.h"
#include "absl/strings/cord.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/health/health_check_client.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/ext/filters/client_channel/subchannel_stream_client.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder_impl.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/handshaker/proxy_mapper_registry.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/init_internally.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
// Tuning constants for subchannel connection backoff. Going by the names they
// cover the initial backoff, the growth multiplier, the minimum connect
// timeout, the maximum backoff and the jitter.
// NOTE(review): confirm the units (the *_SECONDS suffix suggests seconds) and
// where each value is consumed — presumably ParseArgsForBackoffValues().
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS …
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER …
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS …
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS …
#define GRPC_SUBCHANNEL_RECONNECT_JITTER …
// Conversions between a subchannel call and its call stack, in both
// directions (going by the macro names).
// NOTE(review): presumably the call stack lives at a fixed offset from the
// SubchannelCall object — confirm against the macro expansions.
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) …
#define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) …
namespace grpc_core {
// NOTE(review): a bare `EventEngine;` does not declare anything by itself.
// This looks like the tail of a using-declaration (presumably
// `using ::grpc_event_engine::experimental::EventEngine;`) — confirm and
// restore the full statement.
EventEngine;
// Trace flags registered under the names "subchannel" and
// "subchannel_refcount". Both are constructed with `false` as the first
// argument (presumably "disabled by default" — confirm against the TraceFlag
// constructor). The refcount flag is a DebugOnlyTraceFlag.
TraceFlag grpc_trace_subchannel(false, "subchannel");
DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
// Constructs a ConnectedSubchannel from a channel stack, the channel args and
// a channelz SubchannelNode.
// NOTE(review): ownership of `channel_stack` (a raw pointer) is not visible
// from the signature — confirm whether this object takes over the caller's
// ref.
ConnectedSubchannel::ConnectedSubchannel(
grpc_channel_stack* channel_stack, const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: … { … }
// Destructor.
// NOTE(review): presumably releases the channel stack held by this object —
// confirm.
ConnectedSubchannel::~ConnectedSubchannel() { … }
// Starts a connectivity state watch using `watcher`. The watcher is passed by
// value as an OrphanablePtr, so ownership moves to the callee.
// NOTE(review): presumably `interested_parties` is the pollset set to be
// polled on behalf of the watch — confirm how it is used.
void ConnectedSubchannel::StartWatch(
grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) { … }
// Issues a ping on this connected subchannel.
// NOTE(review): presumably `on_initiate` runs when the ping is sent and
// `on_ack` when it is acknowledged — confirm, including whether either
// closure may be null.
void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
grpc_closure* on_ack) { … }
// Returns an estimate, as a size_t, of the initial size needed for a call on
// this connected subchannel. Const: does not modify this object.
// NOTE(review): confirm what the estimate includes (presumably the
// SubchannelCall object plus the call stack) and that it is in bytes.
size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const { … }
// Factory for SubchannelCall. Returns a RefCountedPtr to the call; `error` is
// an out-parameter.
// NOTE(review): presumably `*error` is set when call creation fails — confirm,
// and confirm whether the returned pointer can be null in that case.
RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
grpc_error_handle* error) { … }
// Constructs a SubchannelCall from `args`; `error` is an out-parameter.
// NOTE(review): presumably `*error` reports a call stack initialization
// failure — confirm.
SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error)
: … { … }
// Starts the transport stream op batch `batch` on this call.
// NOTE(review): presumably the batch is handed to the call stack, possibly
// after MaybeInterceptRecvTrailingMetadata() — confirm.
void SubchannelCall::StartTransportStreamOpBatch(
grpc_transport_stream_op_batch* batch) { … }
// Returns a raw pointer to the grpc_call_stack associated with this call.
// NOTE(review): presumably computed via SUBCHANNEL_CALL_TO_CALL_STACK —
// confirm.
grpc_call_stack* SubchannelCall::GetCallStack() { … }
// Registers `closure` to run after the call stack has been destroyed (going
// by the name).
// NOTE(review): confirm whether this may be called more than once per call.
void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) { … }
// Returns a RefCountedPtr<SubchannelCall>.
// NOTE(review): presumably takes a new ref on this call and wraps `this` —
// confirm.
RefCountedPtr<SubchannelCall> SubchannelCall::Ref() { … }
// Overload of Ref() that also takes a debug location and a reason string.
// NOTE(review): presumably `location` and `reason` are only used for refcount
// tracing — confirm.
RefCountedPtr<SubchannelCall> SubchannelCall::Ref(const DebugLocation& location,
const char* reason) { … }
// Releases one ref on this call.
// NOTE(review): presumably dropping the last ref triggers destruction of the
// call stack — confirm.
void SubchannelCall::Unref() { … }
// Overload of Unref() taking a debug location and a reason string. The
// DebugLocation parameter is unnamed, so the body cannot use it.
// NOTE(review): presumably `reason` is used for refcount tracing — confirm.
void SubchannelCall::Unref(const DebugLocation& ,
const char* reason) { … }
// Callback with a closure-style signature (void* arg, grpc_error_handle). The
// error parameter is unnamed, so the body cannot use it.
// NOTE(review): presumably `arg` is the SubchannelCall being destroyed —
// confirm.
void SubchannelCall::Destroy(void* arg, grpc_error_handle ) { … }
// Conditionally intercepts the recv_trailing_metadata op of `batch` (going by
// the name).
// NOTE(review): confirm the condition under which interception happens and
// that RecvTrailingMetadataReady() is the installed callback.
void SubchannelCall::MaybeInterceptRecvTrailingMetadata(
grpc_transport_stream_op_batch* batch) { … }
namespace {
// File-local helper (anonymous namespace). Writes a status code through the
// `status` out-parameter, given the call deadline, a metadata batch and an
// error.
// NOTE(review): presumably the status comes from `error` when it is set and
// from `md_batch` otherwise — confirm the precedence.
void GetCallStatus(grpc_status_code* status, Timestamp deadline,
grpc_metadata_batch* md_batch, grpc_error_handle error) { … }
}
// Callback with a closure-style signature (void* arg, grpc_error_handle),
// named for the completion of recv_trailing_metadata.
// NOTE(review): presumably `arg` is the SubchannelCall and the call status is
// recorded here before the original callback is resumed — confirm.
void SubchannelCall::RecvTrailingMetadataReady(void* arg,
grpc_error_handle error) { … }
// Increments this call's ref count (going by the name). Unlike Ref(), it
// returns void rather than a RefCountedPtr.
void SubchannelCall::IncrementRefCount() { … }
// Overload of IncrementRefCount() taking a debug location and a reason
// string. The DebugLocation parameter is unnamed, so the body cannot use it.
// NOTE(review): presumably `reason` is used for refcount tracing — confirm.
void SubchannelCall::IncrementRefCount(const DebugLocation& ,
const char* reason) { … }
// Connectivity state watcher nested in Subchannel, derived from
// AsyncConnectivityStateWatcherInterface.
// NOTE(review): presumably it observes the connected subchannel's state and
// updates the owning Subchannel when the connection is lost — confirm.
class Subchannel::ConnectedSubchannelStateWatcher
: public AsyncConnectivityStateWatcherInterface { … };
// Adds `watcher` to the list. The watcher arrives by value as a
// RefCountedPtr, so the callee receives its own ref.
// NOTE(review): the `Locked` suffix suggests the caller must already hold the
// subchannel's mutex — confirm.
void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
RefCountedPtr<ConnectivityStateWatcherInterface> watcher) { … }
// Removes the watcher identified by the raw pointer `watcher` from the list.
// NOTE(review): the `Locked` suffix suggests the caller must already hold the
// subchannel's mutex — confirm. Also confirm the behavior when `watcher` is
// not in the list.
void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
ConnectivityStateWatcherInterface* watcher) { … }
// Notifies the watchers in the list of `state` and `status` (going by the
// name).
// NOTE(review): the `Locked` suffix suggests the caller must already hold the
// subchannel's mutex — confirm.
void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
grpc_connectivity_state state, const absl::Status& status) { … }
// Watcher nested in Subchannel::HealthWatcherMap, derived from
// AsyncConnectivityStateWatcherInterface.
// NOTE(review): presumably there is one instance per health check service
// name, combining raw connectivity state with health check results — confirm.
class Subchannel::HealthWatcherMap::HealthWatcher
: public AsyncConnectivityStateWatcherInterface { … };
// Adds `watcher` under `health_check_service_name`. Both `subchannel` (a weak
// ref) and `watcher` arrive by value, so the callee receives its own refs.
// NOTE(review): presumably a HealthWatcher is created on first use of a
// service name — confirm. The `Locked` suffix suggests the caller must
// already hold the subchannel's mutex — confirm.
void Subchannel::HealthWatcherMap::AddWatcherLocked(
WeakRefCountedPtr<Subchannel> subchannel,
const std::string& health_check_service_name,
RefCountedPtr<ConnectivityStateWatcherInterface> watcher) { … }
// Removes the watcher identified by the raw pointer `watcher` from the entry
// for `health_check_service_name`.
// NOTE(review): confirm whether the map entry itself is dropped once its last
// watcher is removed. The `Locked` suffix suggests the caller must already
// hold the subchannel's mutex — confirm.
void Subchannel::HealthWatcherMap::RemoveWatcherLocked(
const std::string& health_check_service_name,
ConnectivityStateWatcherInterface* watcher) { … }
// Propagates `state` and `status` to the entries in the map (going by the
// name).
// NOTE(review): the `Locked` suffix suggests the caller must already hold the
// subchannel's mutex — confirm.
void Subchannel::HealthWatcherMap::NotifyLocked(grpc_connectivity_state state,
const absl::Status& status) { … }
// Returns a grpc_connectivity_state for `health_check_service_name`.
// NOTE(review): presumably this falls back to the state of `subchannel` when
// no entry exists for that service name — confirm. The `Locked` suffix
// suggests the caller must already hold the subchannel's mutex — confirm.
grpc_connectivity_state
Subchannel::HealthWatcherMap::CheckConnectivityStateLocked(
Subchannel* subchannel, const std::string& health_check_service_name) { … }
// Shuts down the health watcher map.
// NOTE(review): presumably drops all entries — confirm. The `Locked` suffix
// suggests the caller must already hold the subchannel's mutex — confirm.
void Subchannel::HealthWatcherMap::ShutdownLocked() { … }
namespace {
// File-local helper (anonymous namespace). Returns BackOff::Options derived
// from `args` and writes a Duration through the `min_connect_timeout`
// out-parameter.
// NOTE(review): presumably the GRPC_SUBCHANNEL_* macros supply the defaults
// when a channel arg is absent — confirm.
BackOff::Options ParseArgsForBackoffValues(const ChannelArgs& args,
Duration* min_connect_timeout) { … }
}
// Constructs a Subchannel from its key, a connector and the channel args.
// `key` and `connector` arrive by value; the OrphanablePtr hands ownership of
// the connector to the callee.
Subchannel::Subchannel(SubchannelKey key,
OrphanablePtr<SubchannelConnector> connector,
const ChannelArgs& args)
: … { … }
// Destructor.
// NOTE(review): confirm which resources are released here and which are
// released earlier in Orphan().
Subchannel::~Subchannel() { … }
// Factory: returns a RefCountedPtr<Subchannel> for `address` and `args`,
// taking ownership of `connector` (an OrphanablePtr passed by value).
// NOTE(review): presumably a subchannel pool is consulted so that an existing
// subchannel with the same key is reused — confirm.
RefCountedPtr<Subchannel> Subchannel::Create(
OrphanablePtr<SubchannelConnector> connector,
const grpc_resolved_address& address, const ChannelArgs& args) { … }
// Applies `new_keepalive_time` as a throttled keepalive time for this
// subchannel (going by the name).
// NOTE(review): confirm the units (presumably milliseconds) and whether only
// increases over the current value take effect.
void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) { … }
// Returns a raw pointer to this subchannel's channelz SubchannelNode.
// NOTE(review): confirm whether the result can be null (e.g. when channelz is
// disabled).
channelz::SubchannelNode* Subchannel::channelz_node() { … }
// Starts watching this subchannel's connectivity state with `watcher`, which
// arrives by value as a RefCountedPtr. `health_check_service_name` is
// optional.
// NOTE(review): presumably an absent name registers a plain connectivity
// watcher and a present name registers a health-checking one — confirm.
void Subchannel::WatchConnectivityState(
const absl::optional<std::string>& health_check_service_name,
RefCountedPtr<ConnectivityStateWatcherInterface> watcher) { … }
// Cancels a connectivity state watch, identified by the raw pointer `watcher`
// and the optional health check service name.
// NOTE(review): presumably `health_check_service_name` must match the value
// passed to WatchConnectivityState() for the same watcher — confirm.
void Subchannel::CancelConnectivityStateWatch(
const absl::optional<std::string>& health_check_service_name,
ConnectivityStateWatcherInterface* watcher) { … }
// Requests that this subchannel attempt to connect.
// NOTE(review): presumably a no-op unless the subchannel is currently idle —
// confirm.
void Subchannel::RequestConnection() { … }
// Resets this subchannel's connection backoff state.
// NOTE(review): confirm whether a pending retry timer is also cancelled or
// fired early.
void Subchannel::ResetBackoff() { … }
// Orphans this subchannel.
// NOTE(review): presumably invoked when the last strong ref is released,
// shutting the subchannel down while weak refs may still exist — confirm.
void Subchannel::Orphan() { … }
// Looks up or registers a data producer of the given `type`. The `get_or_add`
// callback receives a DataProducerInterface** argument.
// NOTE(review): presumably that argument points at the slot for `type`, so
// the callback can either use the existing producer or store a new one —
// confirm, including whether the callback runs under the subchannel's mutex.
void Subchannel::GetOrAddDataProducer(
UniqueTypeName type,
std::function<void(DataProducerInterface**)> get_or_add) { … }
// Removes `data_producer` from this subchannel.
// NOTE(review): confirm the behavior when a different producer is registered
// for the same type.
void Subchannel::RemoveDataProducer(DataProducerInterface* data_producer) { … }
// Sets this subchannel's connectivity state to `state` with the accompanying
// `status`.
// NOTE(review): presumably also notifies the registered watchers — confirm.
// The `Locked` suffix suggests the caller must already hold the subchannel's
// mutex — confirm.
void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
const absl::Status& status) { … }
// Handler for the retry timer (going by the name).
// NOTE(review): presumably acquires the subchannel's mutex and delegates to
// OnRetryTimerLocked() — confirm.
void Subchannel::OnRetryTimer() { … }
// Retry timer handling for callers that already hold the lock.
// NOTE(review): the `Locked` suffix suggests the caller must already hold the
// subchannel's mutex — confirm. Also confirm which state transition happens
// here.
void Subchannel::OnRetryTimerLocked() { … }
// Begins a connection attempt.
// NOTE(review): presumably completion is reported through
// OnConnectingFinished() — confirm. The `Locked` suffix suggests the caller
// must already hold the subchannel's mutex — confirm.
void Subchannel::StartConnectingLocked() { … }
// Callback with a closure-style signature (void* arg, grpc_error_handle),
// named for the completion of a connection attempt.
// NOTE(review): presumably `arg` is the Subchannel, and the work is delegated
// to OnConnectingFinishedLocked() under the mutex — confirm.
void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) { … }
// Handles the outcome `error` of a connection attempt.
// NOTE(review): presumably publishes the transport on success and schedules
// a retry on failure — confirm. The `Locked` suffix suggests the caller must
// already hold the subchannel's mutex — confirm.
void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { … }
// Publishes the newly connected transport; returns a bool.
// NOTE(review): presumably `true` means success — confirm. The `Locked`
// suffix suggests the caller must already hold the subchannel's mutex —
// confirm.
bool Subchannel::PublishTransportLocked() { … }
}