#include <grpc/support/port_platform.h>
#include "src/core/lib/surface/server.h"
#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <atomic>
#include <initializer_list>
#include <list>
#include <new>
#include <queue>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
#include "absl/cleanup/cleanup.h"
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/byte_buffer.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/basic_join.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
namespace grpc_core {
TraceFlag grpc_server_channel_trace(false, "server_channel");
struct Server::RequestedCall { … };
struct Server::RegisteredMethod { … };
class Server::RequestMatcherInterface { … };
class Server::RealRequestMatcher : public RequestMatcherInterface { … };
class Server::AllocatingRequestMatcherBase : public RequestMatcherInterface { … };
class Server::AllocatingRequestMatcherBatch
: public AllocatingRequestMatcherBase { … };
class Server::AllocatingRequestMatcherRegistered
: public AllocatingRequestMatcherBase { … };
namespace {
class ChannelBroadcaster { … };
}
const grpc_channel_filter Server::kServerTopFilter = …;
namespace {
RefCountedPtr<channelz::ServerNode> CreateChannelzNode(
const ChannelArgs& args) { … }
}
Server::Server(const ChannelArgs& args)
: … { … }
Server::~Server() { … }
void Server::AddListener(OrphanablePtr<ListenerInterface> listener) { … }
void Server::Start() { … }
grpc_error_handle Server::SetupTransport(
grpc_transport* transport, grpc_pollset* accepting_pollset,
const ChannelArgs& args,
const RefCountedPtr<channelz::SocketNode>& socket_node) { … }
bool Server::HasOpenConnections() { … }
void Server::SetRegisteredMethodAllocator(
grpc_completion_queue* cq, void* method_tag,
std::function<RegisteredCallAllocation()> allocator) { … }
void Server::SetBatchMethodAllocator(
grpc_completion_queue* cq, std::function<BatchCallAllocation()> allocator) { … }
void Server::RegisterCompletionQueue(grpc_completion_queue* cq) { … }
namespace {
bool streq(const std::string& a, const char* b) { … }
}
Server::RegisteredMethod* Server::RegisterMethod(
const char* method, const char* host,
grpc_server_register_method_payload_handling payload_handling,
uint32_t flags) { … }
void Server::DoneRequestEvent(void* req, grpc_cq_completion* ) { … }
void Server::FailCall(size_t cq_idx, RequestedCall* rc,
grpc_error_handle error) { … }
void Server::MaybeFinishShutdown() { … }
void Server::KillPendingWorkLocked(grpc_error_handle error) { … }
std::vector<RefCountedPtr<Channel>> Server::GetChannelsLocked() const { … }
void Server::ListenerDestroyDone(void* arg, grpc_error_handle ) { … }
namespace {
void DonePublishedShutdown(void* , grpc_cq_completion* storage) { … }
}
void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) { … }
void Server::StopListening() { … }
void Server::CancelAllCalls() { … }
void Server::SendGoaways() { … }
void Server::Orphan() { … }
grpc_call_error Server::ValidateServerRequest(
grpc_completion_queue* cq_for_notification, void* tag,
grpc_byte_buffer** optional_payload, RegisteredMethod* rm) { … }
grpc_call_error Server::ValidateServerRequestAndCq(
size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
grpc_byte_buffer** optional_payload, RegisteredMethod* rm) { … }
grpc_call_error Server::QueueRequestedCall(size_t cq_idx, RequestedCall* rc) { … }
grpc_call_error Server::RequestCall(grpc_call** call,
grpc_call_details* details,
grpc_metadata_array* request_metadata,
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification,
void* tag) { … }
grpc_call_error Server::RequestRegisteredCall(
RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
grpc_metadata_array* request_metadata, grpc_byte_buffer** optional_payload,
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification, void* tag_new) { … }
class Server::ChannelData::ConnectivityWatcher
: public AsyncConnectivityStateWatcherInterface { … };
Server::ChannelData::~ChannelData() { … }
void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
RefCountedPtr<Channel> channel,
size_t cq_idx,
grpc_transport* transport,
intptr_t channelz_socket_uuid) { … }
Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod(
const grpc_slice& host, const grpc_slice& path) { … }
void Server::ChannelData::AcceptStream(void* arg, grpc_transport* ,
const void* transport_server_data) { … }
ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory) { … }
void Server::ChannelData::FinishDestroy(void* arg,
grpc_error_handle ) { … }
void Server::ChannelData::Destroy() { … }
grpc_error_handle Server::ChannelData::InitChannelElement(
grpc_channel_element* elem, grpc_channel_element_args* args) { … }
void Server::ChannelData::DestroyChannelElement(grpc_channel_element* elem) { … }
Server::CallData::CallData(grpc_call_element* elem,
const grpc_call_element_args& args,
RefCountedPtr<Server> server)
: … { … }
Server::CallData::~CallData() { … }
void Server::CallData::SetState(CallState state) { … }
bool Server::CallData::MaybeActivate() { … }
void Server::CallData::FailCallCreation() { … }
void Server::CallData::Start(grpc_call_element* elem) { … }
void Server::CallData::Publish(size_t cq_idx, RequestedCall* rc) { … }
void Server::CallData::PublishNewRpc(void* arg, grpc_error_handle error) { … }
namespace {
void KillZombieClosure(void* call, grpc_error_handle ) { … }
}
void Server::CallData::KillZombie() { … }
void Server::CallData::StartNewRpc(grpc_call_element* elem) { … }
void Server::CallData::RecvInitialMetadataBatchComplete(
void* arg, grpc_error_handle error) { … }
void Server::CallData::StartTransportStreamOpBatchImpl(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { … }
void Server::CallData::RecvInitialMetadataReady(void* arg,
grpc_error_handle error) { … }
void Server::CallData::RecvTrailingMetadataReady(void* arg,
grpc_error_handle error) { … }
grpc_error_handle Server::CallData::InitCallElement(
grpc_call_element* elem, const grpc_call_element_args* args) { … }
void Server::CallData::DestroyCallElement(
grpc_call_element* elem, const grpc_call_final_info* ,
grpc_closure* ) { … }
void Server::CallData::StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { … }
}
grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) { … }
void grpc_server_register_completion_queue(grpc_server* server,
grpc_completion_queue* cq,
void* reserved) { … }
void* grpc_server_register_method(
grpc_server* server, const char* method, const char* host,
grpc_server_register_method_payload_handling payload_handling,
uint32_t flags) { … }
void grpc_server_start(grpc_server* server) { … }
void grpc_server_shutdown_and_notify(grpc_server* server,
grpc_completion_queue* cq, void* tag) { … }
void grpc_server_cancel_all_calls(grpc_server* server) { … }
void grpc_server_destroy(grpc_server* server) { … }
grpc_call_error grpc_server_request_call(
grpc_server* server, grpc_call** call, grpc_call_details* details,
grpc_metadata_array* request_metadata,
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification, void* tag) { … }
grpc_call_error grpc_server_request_registered_call(
grpc_server* server, void* registered_method, grpc_call** call,
gpr_timespec* deadline, grpc_metadata_array* request_metadata,
grpc_byte_buffer** optional_payload,
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification, void* tag_new) { … }
void grpc_server_set_config_fetcher(
grpc_server* server, grpc_server_config_fetcher* server_config_fetcher) { … }
void grpc_server_config_fetcher_destroy(
grpc_server_config_fetcher* server_config_fetcher) { … }