#include <limits.h>
#include <string.h>
#include <algorithm>
#include <atomic>
#include <cstdlib>
#include <memory>
#include <new>
#include <sstream>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include <grpcpp/channel.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/generic/async_generic_service.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_op_set.h>
#include <grpcpp/impl/call_op_set_interface.h>
#include <grpcpp/impl/completion_queue_tag.h>
#include <grpcpp/impl/interceptor_common.h>
#include <grpcpp/impl/metadata_map.h>
#include <grpcpp/impl/rpc_method.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/impl/server_callback_handlers.h>
#include <grpcpp/impl/server_initializer.h>
#include <grpcpp/impl/service_type.h>
#include <grpcpp/impl/sync.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_context.h>
#include <grpcpp/server_interface.h>
#include <grpcpp/support/channel_arguments.h>
#include <grpcpp/support/client_interceptor.h>
#include <grpcpp/support/interceptor.h>
#include <grpcpp/support/method_handler.h>
#include <grpcpp/support/server_interceptor.h>
#include <grpcpp/support/slice.h>
#include <grpcpp/support/status.h>
#include "src/core/ext/transport/inproc/inproc_transport.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/server/external_connection_acceptor_impl.h"
#include "src/cpp/server/health/default_health_check_service.h"
#include "src/cpp/thread_manager/thread_manager.h"
namespace grpc {
namespace {
// Macro whose name suggests the default cap on synchronous-server threads.
// Its value is not shown here. NOTE(review): confirm where it is consumed.
#define DEFAULT_MAX_SYNC_SERVER_THREADS …
// File-local C-string messages (anonymous namespace). Judging by the names,
// one is presumably reported when the server thread pool is exhausted and the
// other when an RPC method is not recognized -- TODO confirm against users.
const char* kServerThreadpoolExhausted = …;
const char* kUnknownRpcMethod = …;
// File-local, final implementation of Server::GlobalCallbacks. The class body
// is not shown here; presumably this is the fallback used when no custom
// callbacks have been installed -- TODO confirm.
class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { … };
// File-local shared_ptr to a Server::GlobalCallbacks object (initializer not
// shown here).
std::shared_ptr<Server::GlobalCallbacks> g_callbacks = …;
// gpr_once control variable. By its name it presumably guards a one-time run
// of InitGlobalCallbacks() -- verify at the call site.
gpr_once g_once_init_callbacks = …;
// Takes no arguments and returns nothing. Presumably populates g_callbacks
// and is meant to be run once via g_once_init_callbacks -- TODO confirm.
void InitGlobalCallbacks() { … }
// File-local internal::CompletionQueueTag subclass. NOTE(review): the name
// suggests it is the tag used for server-shutdown notifications -- confirm.
class ShutdownTag : public internal::CompletionQueueTag { … };
// File-local internal::CompletionQueueTag subclass. NOTE(review): the name
// suggests a placeholder tag with no meaningful result -- confirm.
class PhonyTag : public internal::CompletionQueueTag { … };
// File-local helper class that Server::UnimplementedAsyncRequest inherits
// from privately. Its members are not shown here.
class UnimplementedAsyncRequestContext { … };
}
// Constructs the state common to asynchronous request objects.
// Parameter roles below are inferred from names and types only (the
// initializer list and body are not shown here) -- TODO confirm:
//   server             - the ServerInterface the request belongs to.
//   context            - ServerContext associated with the incoming call.
//   stream             - async streaming interface bound to the call.
//   call_cq            - completion queue for the call itself.
//   notification_cq    - server completion queue for the new-call event.
//   tag                - opaque user tag.
//   delete_on_finalize - presumably makes the object delete itself when
//                        finalized.
ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
ServerInterface* server, ServerContext* context,
internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
: … { … }
// Destructor; has a user-provided body (not shown here), so it presumably
// releases something the constructor acquired -- TODO confirm what.
ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { … }
// Takes pointers to a tag and a status flag and returns a bool.
// NOTE(review): presumably the completion-queue-tag finalization hook, where
// the return value says whether the event is surfaced to the user -- confirm
// against internal::CompletionQueueTag.
bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
bool* status) { … }
// No arguments, no return value. By its name, presumably resumes the
// FinalizeResult flow once interceptors have run -- TODO confirm.
void ServerInterface::BaseAsyncRequest::
ContinueFinalizeResultAfterInterception() { … }
// Constructs an async request for a registered method. In addition to the
// server/context/stream/call_cq/notification_cq/tag arguments, it takes:
//   name - C string, presumably the method name -- TODO confirm.
//   type - the internal::RpcMethod::RpcType of the method.
// The initializer list and body are not shown here.
ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
ServerInterface* server, ServerContext* context,
internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag, const char* name,
internal::RpcMethod::RpcType type)
: … { … }
// Issues the request for a registered method.
//   registered_method - opaque handle to the registered method.
//   payload           - pointer to a grpc_byte_buffer*; presumably where the
//                       request payload is delivered -- TODO confirm, and
//                       confirm whether nullptr is permitted.
//   notification_cq   - server completion queue passed along with the request.
void ServerInterface::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq) { … }
// Constructs an async request that takes a GenericServerContext rather than a
// plain ServerContext. The remaining parameters have the same names and types
// as those of the BaseAsyncRequest constructor. Initializer list and body are
// not shown here.
ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
ServerInterface* server, GenericServerContext* context,
internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
: … { … }
// Finalization hook for generic async requests; same signature as
// BaseAsyncRequest::FinalizeResult. NOTE(review): what it does beyond the
// base behavior is not visible here -- confirm.
bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
bool* status) { … }
namespace {
// File-local class deriving publicly from grpc_completion_queue_functor.
// NOTE(review): by its name, presumably the functor run when a callback
// completion queue finishes shutting down -- confirm.
class ShutdownCallback : public grpc_completion_queue_functor { … };
}
// Final class combining a private UnimplementedAsyncRequestContext base with
// a public GenericAsyncRequest base. NOTE(review): presumably used to catch
// calls to methods that have no registered handler -- confirm.
class Server::UnimplementedAsyncRequest final
: private grpc::UnimplementedAsyncRequestContext,
public GenericAsyncRequest { … };
// Final CallOpSet made up of two ops: CallOpSendInitialMetadata and
// CallOpServerSendStatus. It is constructed from an UnimplementedAsyncRequest*.
// NOTE(review): presumably sends the "unimplemented" status back -- confirm.
class Server::UnimplementedAsyncResponse final
: public grpc::internal::CallOpSet<
grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpServerSendStatus> { … };
// Final completion-queue tag type. NOTE(review): by its name, presumably
// represents one incoming call handled by the synchronous API -- confirm.
class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { … };
// Final completion-queue tag type, templated on the server-context type.
// This file provides explicit specializations of FinalizeResult() and
// method_name() for grpc::CallbackServerContext and
// grpc::GenericCallbackServerContext.
template <class ServerContextType>
class Server::CallbackRequest final
: public grpc::internal::CompletionQueueTag { … };
// FinalizeResult specialization for grpc::CallbackServerContext. Both
// parameters are unnamed, so the body cannot read or write them.
template <>
bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult(
void** , bool* ) { … }
// FinalizeResult specialization for grpc::GenericCallbackServerContext. The
// tag parameter is unnamed (unused); unlike the CallbackServerContext
// specialization, `status` is named. How it is used is not visible here.
template <>
bool Server::CallbackRequest<
grpc::GenericCallbackServerContext>::FinalizeResult(void** ,
bool* status) { … }
// method_name() specialization for grpc::CallbackServerContext. Const member
// returning a C string; where the name comes from is not visible here.
template <>
const char* Server::CallbackRequest<grpc::CallbackServerContext>::method_name()
const { … }
// method_name() specialization for grpc::GenericCallbackServerContext. Const
// member returning a C string. NOTE(review): the lifetime of the returned
// pointer is not visible here -- confirm before storing it.
template <>
const char* Server::CallbackRequest<
grpc::GenericCallbackServerContext>::method_name() const { … }
// grpc::ThreadManager subclass nested in Server. NOTE(review): by its name,
// presumably manages the threads that service synchronous requests -- confirm.
class Server::SyncRequestThreadManager : public grpc::ThreadManager { … };
// Constructs a Server. Parameter roles are inferred from names and types
// only (initializer list and body are not shown here) -- TODO confirm:
//   args                    - channel arguments (mutable pointer).
//   sync_server_cqs         - shared vector of owned ServerCompletionQueues,
//                             presumably for the synchronous API.
//   min_pollers/max_pollers - presumably bounds on poller threads.
//   sync_cq_timeout_msec    - a timeout in milliseconds (per the name).
//   acceptors               - external connection acceptors, taken by value.
//   server_config_fetcher   - core config fetcher handle.
//   server_rq               - core resource quota handle.
//   interceptor_creators    - owned server interceptor factories, by value.
//   server_metric_recorder  - metric recorder; ownership not visible here.
Server::Server(
grpc::ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
sync_server_cqs,
int min_pollers, int max_pollers, int sync_cq_timeout_msec,
std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
acceptors,
grpc_server_config_fetcher* server_config_fetcher,
grpc_resource_quota* server_rq,
std::vector<
std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
interceptor_creators,
experimental::ServerMetricRecorder* server_metric_recorder)
: … { … }
// Destructor with a user-provided body (not shown here). NOTE(review):
// whether it shuts down a server that is still running is not visible here.
Server::~Server() { … }
// Takes a raw GlobalCallbacks pointer. NOTE(review): ownership transfer is
// not visible here; g_callbacks is a shared_ptr, so the pointer is presumably
// adopted -- confirm before passing a stack or shared object.
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { … }
// Returns a grpc_server* -- presumably the underlying core server handle;
// confirm ownership stays with this object.
grpc_server* Server::c_server() { … }
// Returns a shared grpc::Channel built from the given channel arguments.
// NOTE(review): per the name, the channel presumably connects to this server
// in-process -- confirm.
std::shared_ptr<grpc::Channel> Server::InProcessChannel(
const grpc::ChannelArguments& args) { … }
// Variant of InProcessChannel on the experimental interface that also takes
// a vector of owned client interceptor factories by value, so the caller
// gives up ownership of the factories.
std::shared_ptr<grpc::Channel>
Server::experimental_type::InProcessChannelWithInterceptors(
const grpc::ChannelArguments& args,
std::vector<
std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
interceptor_creators) { … }
// File-local (static) helper that yields a
// grpc_server_register_method_payload_handling value for the given method.
// The selection logic is not shown here.
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
grpc::internal::RpcServiceMethod* method) { … }
// Registers `service` and returns a bool (presumably success -- confirm).
// `addr` is passed by pointer; NOTE(review): the meaning of a null addr is
// not visible here -- verify against callers.
bool Server::RegisterService(const std::string* addr, grpc::Service* service) { … }
// Registers an AsyncGenericService. No return value, so any failure is
// reported some other way (not visible here).
void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) { … }
// Registers a CallbackGenericService. No return value, so any failure is
// reported some other way (not visible here).
void Server::RegisterCallbackGenericService(
grpc::CallbackGenericService* service) { … }
// Adds a listening address with the given credentials and returns an int.
// NOTE(review): presumably the bound port, with a sentinel on failure --
// confirm the exact contract.
int Server::AddListeningPort(const std::string& addr,
grpc::ServerCredentials* creds) { … }
// No arguments, no return value. Presumably increments an internal reference
// count paired with the Unref* methods -- TODO confirm.
void Server::Ref() { … }
// No arguments, no return value. By its name, presumably drops a reference
// and may wake a waiter -- TODO confirm the condition.
void Server::UnrefWithPossibleNotify() { … }
// No arguments, no return value. NOTE(review): the "Locked" suffix suggests
// the caller must already hold a lock -- confirm which one.
void Server::UnrefAndWaitLocked() { … }
// Starts the server. `cqs` is an array of ServerCompletionQueue pointers and
// `num_cqs` its length (per the names). NOTE(review): whether cqs may be
// null when num_cqs is 0 is not visible here.
void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { … }
// Shutdown entry point taking a gpr_timespec deadline. NOTE(review): what
// happens to calls still in flight at the deadline is not visible here.
void Server::ShutdownInternal(gpr_timespec deadline) { … }
// No arguments, no return value. Presumably blocks the caller until the
// server has shut down -- TODO confirm.
void Server::Wait() { … }
// Takes an op set and a call and returns nothing. Presumably hands `ops` to
// `call` for execution -- TODO confirm.
void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops,
grpc::internal::Call* call) { … }
// Finalization hook for UnimplementedAsyncRequest; same signature as the
// other FinalizeResult definitions in this file. NOTE(review): presumably
// triggers an UnimplementedAsyncResponse and hides the event from the user --
// confirm.
bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
bool* status) { … }
// Constructs the response from the request that triggered it. The
// initializer list and body are not shown here. NOTE(review): confirm who
// owns `request` afterwards.
Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
UnimplementedAsyncRequest* request)
: … { … }
// Returns a raw ServerInitializer pointer. NOTE(review): presumably owned by
// the server; confirm callers must not delete it.
grpc::ServerInitializer* Server::initializer() { … }
// Returns a raw CompletionQueue pointer. NOTE(review): presumably the queue
// backing the callback API, possibly created on first use -- confirm,
// including thread-safety.
grpc::CompletionQueue* Server::CallbackCQ() { … }
}