chromium/third_party/grpc/src/src/core/lib/surface/server.cc

//
// Copyright 2015-2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/lib/surface/server.h"

#include <inttypes.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <atomic>
#include <initializer_list>
#include <list>
#include <new>
#include <queue>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

#include "absl/cleanup/cleanup.h"
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"

#include <grpc/byte_buffer.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/basic_join.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"

namespace grpc_core {

TraceFlag grpc_server_channel_trace(false, "server_channel");

//
// Server::RequestedCall
//

struct Server::RequestedCall {};

//
// Server::RegisteredMethod
//

struct Server::RegisteredMethod {};

//
// Server::RequestMatcherInterface
//

// RPCs that come in from the transport must be matched against RPC requests
// from the application. An incoming request from the application can be matched
// to an RPC that has already arrived or can be queued up for later use.
// Likewise, an RPC coming in from the transport can either be matched to a
// request that already arrived from the application or can be queued up for
// later use (marked pending). If there is a match, the request's tag is posted
// on the request's notification CQ.
//
// RequestMatcherInterface is the base class to provide this functionality.
class Server::RequestMatcherInterface {};

// The RealRequestMatcher is an implementation of RequestMatcherInterface that
// actually uses all the features of RequestMatcherInterface: expecting the
// application to explicitly request RPCs and then matching those to incoming
// RPCs, along with a slow path by which incoming RPCs are put on a locked
// pending list if they aren't able to be matched to an application request.
class Server::RealRequestMatcher : public RequestMatcherInterface {};

// AllocatingRequestMatchers don't allow the application to request an RPC in
// advance or queue up any incoming RPC for later match. Instead, MatchOrQueue
// will call out to an allocation function passed in at the construction of the
// object. These request matchers are designed for the C++ callback API, so they
// only support 1 completion queue (passed in at the constructor). They are also
// used for the sync API.
class Server::AllocatingRequestMatcherBase : public RequestMatcherInterface {};

// An allocating request matcher for non-registered methods (used for generic
// API and unimplemented RPCs).
class Server::AllocatingRequestMatcherBatch
    : public AllocatingRequestMatcherBase {};

// An allocating request matcher for registered methods.
class Server::AllocatingRequestMatcherRegistered
    : public AllocatingRequestMatcherBase {};

//
// ChannelBroadcaster
//

namespace {

class ChannelBroadcaster {};

}  // namespace

//
// Server
//

const grpc_channel_filter Server::kServerTopFilter =;

namespace {

RefCountedPtr<channelz::ServerNode> CreateChannelzNode(
    const ChannelArgs& args) {}

}  // namespace

Server::Server(const ChannelArgs& args)
    :{}

Server::~Server() {}

void Server::AddListener(OrphanablePtr<ListenerInterface> listener) {}

void Server::Start() {}

grpc_error_handle Server::SetupTransport(
    grpc_transport* transport, grpc_pollset* accepting_pollset,
    const ChannelArgs& args,
    const RefCountedPtr<channelz::SocketNode>& socket_node) {}

bool Server::HasOpenConnections() {}

void Server::SetRegisteredMethodAllocator(
    grpc_completion_queue* cq, void* method_tag,
    std::function<RegisteredCallAllocation()> allocator) {}

void Server::SetBatchMethodAllocator(
    grpc_completion_queue* cq, std::function<BatchCallAllocation()> allocator) {}

void Server::RegisterCompletionQueue(grpc_completion_queue* cq) {}

namespace {

bool streq(const std::string& a, const char* b) {}

}  // namespace

Server::RegisteredMethod* Server::RegisterMethod(
    const char* method, const char* host,
    grpc_server_register_method_payload_handling payload_handling,
    uint32_t flags) {}

void Server::DoneRequestEvent(void* req, grpc_cq_completion* /*c*/) {}

void Server::FailCall(size_t cq_idx, RequestedCall* rc,
                      grpc_error_handle error) {}

// Before calling MaybeFinishShutdown(), we must hold mu_global_ and not
// hold mu_call_.
void Server::MaybeFinishShutdown() {}

void Server::KillPendingWorkLocked(grpc_error_handle error) {}

std::vector<RefCountedPtr<Channel>> Server::GetChannelsLocked() const {}

void Server::ListenerDestroyDone(void* arg, grpc_error_handle /*error*/) {}

namespace {

void DonePublishedShutdown(void* /*done_arg*/, grpc_cq_completion* storage) {}

}  // namespace

// - Kills all pending requests-for-incoming-RPC-calls (i.e., the requests made
//   via grpc_server_request_call() and grpc_server_request_registered_call()
//   will now be cancelled). See KillPendingWorkLocked().
//
// - Shuts down the listeners (i.e., the server will no longer listen on the
//   port for new incoming channels).
//
// - Iterates through all channels on the server and sends shutdown msg (see
//   ChannelBroadcaster::BroadcastShutdown() for details) to the clients via
//   the transport layer. The transport layer then guarantees the following:
//    -- Sends shutdown to the client (e.g., HTTP2 transport sends GOAWAY).
//    -- If the server has outstanding calls that are in the process, the
//       connection is NOT closed until the server is done with all those calls.
//    -- Once there are no more calls in progress, the channel is closed.
void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {}

void Server::StopListening() {}

void Server::CancelAllCalls() {}

void Server::SendGoaways() {}

void Server::Orphan() {}

grpc_call_error Server::ValidateServerRequest(
    grpc_completion_queue* cq_for_notification, void* tag,
    grpc_byte_buffer** optional_payload, RegisteredMethod* rm) {}

grpc_call_error Server::ValidateServerRequestAndCq(
    size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
    grpc_byte_buffer** optional_payload, RegisteredMethod* rm) {}

grpc_call_error Server::QueueRequestedCall(size_t cq_idx, RequestedCall* rc) {}

grpc_call_error Server::RequestCall(grpc_call** call,
                                    grpc_call_details* details,
                                    grpc_metadata_array* request_metadata,
                                    grpc_completion_queue* cq_bound_to_call,
                                    grpc_completion_queue* cq_for_notification,
                                    void* tag) {}

grpc_call_error Server::RequestRegisteredCall(
    RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
    grpc_metadata_array* request_metadata, grpc_byte_buffer** optional_payload,
    grpc_completion_queue* cq_bound_to_call,
    grpc_completion_queue* cq_for_notification, void* tag_new) {}

//
// Server::ChannelData::ConnectivityWatcher
//

class Server::ChannelData::ConnectivityWatcher
    : public AsyncConnectivityStateWatcherInterface {};

//
// Server::ChannelData
//

Server::ChannelData::~ChannelData() {}

void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
                                        RefCountedPtr<Channel> channel,
                                        size_t cq_idx,
                                        grpc_transport* transport,
                                        intptr_t channelz_socket_uuid) {}

Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod(
    const grpc_slice& host, const grpc_slice& path) {}

void Server::ChannelData::AcceptStream(void* arg, grpc_transport* /*transport*/,
                                       const void* transport_server_data) {}

ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
    grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory) {}

void Server::ChannelData::FinishDestroy(void* arg,
                                        grpc_error_handle /*error*/) {}

void Server::ChannelData::Destroy() {}

grpc_error_handle Server::ChannelData::InitChannelElement(
    grpc_channel_element* elem, grpc_channel_element_args* args) {}

void Server::ChannelData::DestroyChannelElement(grpc_channel_element* elem) {}

//
// Server::CallData
//

Server::CallData::CallData(grpc_call_element* elem,
                           const grpc_call_element_args& args,
                           RefCountedPtr<Server> server)
    :{}

Server::CallData::~CallData() {}

void Server::CallData::SetState(CallState state) {}

bool Server::CallData::MaybeActivate() {}

void Server::CallData::FailCallCreation() {}

void Server::CallData::Start(grpc_call_element* elem) {}

void Server::CallData::Publish(size_t cq_idx, RequestedCall* rc) {}

void Server::CallData::PublishNewRpc(void* arg, grpc_error_handle error) {}

namespace {

void KillZombieClosure(void* call, grpc_error_handle /*error*/) {}

}  // namespace

void Server::CallData::KillZombie() {}

// If this changes, change MakeCallPromise too.
void Server::CallData::StartNewRpc(grpc_call_element* elem) {}

void Server::CallData::RecvInitialMetadataBatchComplete(
    void* arg, grpc_error_handle error) {}

void Server::CallData::StartTransportStreamOpBatchImpl(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {}

void Server::CallData::RecvInitialMetadataReady(void* arg,
                                                grpc_error_handle error) {}

void Server::CallData::RecvTrailingMetadataReady(void* arg,
                                                 grpc_error_handle error) {}

grpc_error_handle Server::CallData::InitCallElement(
    grpc_call_element* elem, const grpc_call_element_args* args) {}

void Server::CallData::DestroyCallElement(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* /*ignored*/) {}

void Server::CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {}

}  // namespace grpc_core

//
// C-core API
//

grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {}

void grpc_server_register_completion_queue(grpc_server* server,
                                           grpc_completion_queue* cq,
                                           void* reserved) {}

void* grpc_server_register_method(
    grpc_server* server, const char* method, const char* host,
    grpc_server_register_method_payload_handling payload_handling,
    uint32_t flags) {}

void grpc_server_start(grpc_server* server) {}

void grpc_server_shutdown_and_notify(grpc_server* server,
                                     grpc_completion_queue* cq, void* tag) {}

void grpc_server_cancel_all_calls(grpc_server* server) {}

void grpc_server_destroy(grpc_server* server) {}

grpc_call_error grpc_server_request_call(
    grpc_server* server, grpc_call** call, grpc_call_details* details,
    grpc_metadata_array* request_metadata,
    grpc_completion_queue* cq_bound_to_call,
    grpc_completion_queue* cq_for_notification, void* tag) {}

grpc_call_error grpc_server_request_registered_call(
    grpc_server* server, void* registered_method, grpc_call** call,
    gpr_timespec* deadline, grpc_metadata_array* request_metadata,
    grpc_byte_buffer** optional_payload,
    grpc_completion_queue* cq_bound_to_call,
    grpc_completion_queue* cq_for_notification, void* tag_new) {}

void grpc_server_set_config_fetcher(
    grpc_server* server, grpc_server_config_fetcher* server_config_fetcher) {}

void grpc_server_config_fetcher_destroy(
    grpc_server_config_fetcher* server_config_fetcher) {}