chromium/third_party/grpc/src/src/core/lib/surface/call.cc

//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include "src/core/lib/surface/call.h"

#include <inttypes.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <atomic>
#include <initializer_list>
#include <memory>
#include <new>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/cleanup/cleanup.h"
#include "absl/meta/type_traits.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/variant.h"

#include <grpc/byte_buffer.h>
#include <grpc/compression.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/impl/propagation_bits.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>

#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/cpp_impl_of.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call_test_only.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/surface/validate_metadata.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"

grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");
grpc_core::TraceFlag grpc_call_trace(false, "call");
grpc_core::DebugOnlyTraceFlag grpc_call_refcount_trace(false, "call_refcount");

namespace grpc_core {

///////////////////////////////////////////////////////////////////////////////
// Call

class Call : public CppImplOf<Call, grpc_call> {};

Call::ParentCall* Call::GetOrCreateParentCall() {}

Call::ParentCall* Call::parent_call() {}

absl::Status Call::InitParent(Call* parent, uint32_t propagation_mask) {}

void Call::PublishToParent(Call* parent) {}

void Call::MaybeUnpublishFromParent() {}

void Call::CancelWithStatus(grpc_status_code status, const char* description) {}

void Call::PropagateCancellationToChildren() {}

char* Call::GetPeer() {}

void Call::DeleteThis() {}

///////////////////////////////////////////////////////////////////////////////
// FilterStackCall
// To be removed once promise conversion is complete

class FilterStackCall final : public Call {};

grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
                                          grpc_call** out_call) {}

void FilterStackCall::SetCompletionQueue(grpc_completion_queue* cq) {}

void FilterStackCall::ReleaseCall(void* call, grpc_error_handle /*error*/) {}

void FilterStackCall::DestroyCall(void* call, grpc_error_handle /*error*/) {}

void FilterStackCall::ExternalUnref() {}

// start_batch_closure points to a caller-allocated closure to be used
// for entering the call combiner.
void FilterStackCall::ExecuteBatch(grpc_transport_stream_op_batch* batch,
                                   grpc_closure* start_batch_closure) {}

namespace {
struct CancelState {};
}  // namespace

// The on_complete callback used when sending a cancel_stream batch down
// the filter stack.  Yields the call combiner when the batch is done.
static void done_termination(void* arg, grpc_error_handle /*error*/) {}

void FilterStackCall::CancelWithError(grpc_error_handle error) {}

void FilterStackCall::SetFinalStatus(grpc_error_handle error) {}

bool FilterStackCall::PrepareApplicationMetadata(size_t count,
                                                 grpc_metadata* metadata,
                                                 bool is_trailing) {}

namespace {
class PublishToAppEncoder {};
}  // namespace

void FilterStackCall::PublishAppMetadata(grpc_metadata_batch* b,
                                         bool is_trailing) {}

void FilterStackCall::RecvInitialFilter(grpc_metadata_batch* b) {}

void FilterStackCall::RecvTrailingFilter(grpc_metadata_batch* b,
                                         grpc_error_handle batch_error) {}

namespace {
bool AreWriteFlagsValid(uint32_t flags) {}

bool AreInitialMetadataFlagsValid(uint32_t flags) {}

size_t BatchSlotForOp(grpc_op_type type) {}
}  // namespace

FilterStackCall::BatchControl* FilterStackCall::ReuseOrAllocateBatchControl(
    const grpc_op* ops) {}

void FilterStackCall::BatchControl::PostCompletion() {}

void FilterStackCall::BatchControl::FinishStep(PendingOp op) {}

void FilterStackCall::BatchControl::ProcessDataAfterMetadata() {}

void FilterStackCall::BatchControl::ReceivingStreamReady(
    grpc_error_handle error) {}

void FilterStackCall::HandleCompressionAlgorithmDisabled(
    grpc_compression_algorithm compression_algorithm) {}

void FilterStackCall::HandleCompressionAlgorithmNotAccepted(
    grpc_compression_algorithm compression_algorithm) {}

void FilterStackCall::BatchControl::ValidateFilteredMetadata() {}

void FilterStackCall::BatchControl::ReceivingInitialMetadataReady(
    grpc_error_handle error) {}

void FilterStackCall::BatchControl::ReceivingTrailingMetadataReady(
    grpc_error_handle error) {}

void FilterStackCall::BatchControl::FinishBatch(grpc_error_handle error) {}

namespace {
void EndOpImmediately(grpc_completion_queue* cq, void* notify_tag,
                      bool is_notify_tag_closure) {}
}  // namespace

grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
                                            void* notify_tag,
                                            bool is_notify_tag_closure) {}

void FilterStackCall::ContextSet(grpc_context_index elem, void* value,
                                 void (*destroy)(void*)) {}

///////////////////////////////////////////////////////////////////////////////
// Metadata validation helpers

namespace {
bool ValidateMetadata(size_t count, grpc_metadata* metadata) {}
}  // namespace

///////////////////////////////////////////////////////////////////////////////
// PromiseBasedCall
// Will be folded into Call once the promise conversion is done

class PromiseBasedCall : public Call,
                         public Activity,
                         public Wakeable,
                         public grpc_event_engine::experimental::EventEngine::
                             Closure /* for deadlines */ {};

template <typename T>
grpc_error_handle MakePromiseBasedCall(grpc_call_create_args* args,
                                       grpc_call** out_call) {}

PromiseBasedCall::PromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
                                   const grpc_call_create_args& args)
    :{}

Waker PromiseBasedCall::MakeNonOwningWaker() {}

void PromiseBasedCall::CToMetadata(grpc_metadata* metadata, size_t count,
                                   grpc_metadata_batch* b) {}

void PromiseBasedCall::ContextSet(grpc_context_index elem, void* value,
                                  void (*destroy)(void*)) {}

void* PromiseBasedCall::ContextGet(grpc_context_index elem) const {}

PromiseBasedCall::Completion PromiseBasedCall::StartCompletion(
    void* tag, bool is_closure, const grpc_op* ops) {}

PromiseBasedCall::Completion PromiseBasedCall::AddOpToCompletion(
    const Completion& completion, PendingOp reason) {}

void PromiseBasedCall::FailCompletion(const Completion& completion,
                                      SourceLocation location) {}

void PromiseBasedCall::FinishOpOnCompletion(Completion* completion,
                                            PendingOp reason) {}

void PromiseBasedCall::Update() {}

void PromiseBasedCall::ForceImmediateRepoll() {}

void PromiseBasedCall::SetCompletionQueue(grpc_completion_queue* cq) {}

void PromiseBasedCall::SetCompletionQueueLocked(grpc_completion_queue* cq) {}

void PromiseBasedCall::UpdateDeadline(Timestamp deadline) {}

void PromiseBasedCall::ResetDeadline() {}

void PromiseBasedCall::Run() {}

void PromiseBasedCall::StartSendMessage(const grpc_op& op,
                                        const Completion& completion,
                                        PipeSender<MessageHandle>* sender) {}

bool PromiseBasedCall::PollSendMessage() {}

void PromiseBasedCall::CancelSendMessage() {}

void PromiseBasedCall::StartRecvMessage(const grpc_op& op,
                                        const Completion& completion,
                                        PipeReceiver<MessageHandle>* receiver) {}

void PromiseBasedCall::PollRecvMessage(
    grpc_compression_algorithm incoming_compression_algorithm) {}

void PromiseBasedCall::CancelRecvMessage() {}

///////////////////////////////////////////////////////////////////////////////
// CallContext

void CallContext::RunInContext(absl::AnyInvocable<void()> fn) {}

void CallContext::IncrementRefCount(const char* reason) {}

void CallContext::Unref(const char* reason) {}

void CallContext::UpdateDeadline(Timestamp deadline) {}

ServerCallContext* CallContext::server_call_context() {}

///////////////////////////////////////////////////////////////////////////////
// PublishMetadataArray

namespace {
void PublishMetadataArray(grpc_metadata_batch* md, grpc_metadata_array* array) {}
}  // namespace

///////////////////////////////////////////////////////////////////////////////
// ClientPromiseBasedCall

#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL
class ClientPromiseBasedCall final : public PromiseBasedCall {};

void ClientPromiseBasedCall::StartPromise(
    ClientMetadataHandle client_initial_metadata) {}

void ClientPromiseBasedCall::CancelWithErrorLocked(grpc_error_handle error) {}

grpc_call_error ClientPromiseBasedCall::ValidateBatch(const grpc_op* ops,
                                                      size_t nops) const {}

void ClientPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
                                         const Completion& completion) {}

grpc_call_error ClientPromiseBasedCall::StartBatch(const grpc_op* ops,
                                                   size_t nops,
                                                   void* notify_tag,
                                                   bool is_notify_tag_closure) {}

void ClientPromiseBasedCall::PublishInitialMetadata(ServerMetadata* metadata) {}

void ClientPromiseBasedCall::UpdateOnce() {}

void ClientPromiseBasedCall::Finish(ServerMetadataHandle trailing_metadata) {}

namespace {
std::string MakeErrorString(const ServerMetadata* trailing_metadata) {}
}  // namespace

void ClientPromiseBasedCall::PublishStatus(
    grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args,
    ServerMetadataHandle trailing_metadata) {}
#endif

///////////////////////////////////////////////////////////////////////////////
// ServerPromiseBasedCall

#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL

class ServerPromiseBasedCall final : public PromiseBasedCall {};

ServerPromiseBasedCall::ServerPromiseBasedCall(Arena* arena,
                                               grpc_call_create_args* args)
    :{}

Poll<ServerMetadataHandle> ServerPromiseBasedCall::PollTopOfCall() {}

void ServerPromiseBasedCall::UpdateOnce() {}

grpc_call_error ServerPromiseBasedCall::ValidateBatch(const grpc_op* ops,
                                                      size_t nops) const {}

void ServerPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
                                         const Completion& completion) {}

grpc_call_error ServerPromiseBasedCall::StartBatch(const grpc_op* ops,
                                                   size_t nops,
                                                   void* notify_tag,
                                                   bool is_notify_tag_closure) {}

void ServerPromiseBasedCall::CancelWithErrorLocked(absl::Status error) {}

#endif

#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
ArenaPromise<ServerMetadataHandle>
ServerCallContext::MakeTopOfServerCallPromise(
    CallArgs call_args, grpc_completion_queue* cq,
    grpc_metadata_array* publish_initial_metadata,
    absl::FunctionRef<void(grpc_call* call)> publish) {}
#else
ArenaPromise<ServerMetadataHandle>
ServerCallContext::MakeTopOfServerCallPromise(
    CallArgs, grpc_completion_queue*, grpc_metadata_array*,
    absl::FunctionRef<void(grpc_call*)>) {
  (void)call_;
  Crash("Promise-based server call is not enabled");
}
#endif

}  // namespace grpc_core

///////////////////////////////////////////////////////////////////////////////
// C-based API

void* grpc_call_arena_alloc(grpc_call* call, size_t size) {}

size_t grpc_call_get_initial_size_estimate() {}

grpc_error_handle grpc_call_create(grpc_call_create_args* args,
                                   grpc_call** out_call) {}

void grpc_call_set_completion_queue(grpc_call* call,
                                    grpc_completion_queue* cq) {}

void grpc_call_ref(grpc_call* c) {}

void grpc_call_unref(grpc_call* c) {}

char* grpc_call_get_peer(grpc_call* call) {}

grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) {}

grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {}

grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
                                             grpc_status_code status,
                                             const char* description,
                                             void* reserved) {}

void grpc_call_cancel_internal(grpc_call* call) {}

grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
    grpc_call* call) {}

uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {}

uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {}

grpc_core::Arena* grpc_call_get_arena(grpc_call* call) {}

grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {}

grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
                                      size_t nops, void* tag, void* reserved) {}

grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
                                                  const grpc_op* ops,
                                                  size_t nops,
                                                  grpc_closure* closure) {}

void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
                           void* value, void (*destroy)(void* value)) {}

void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {}

uint8_t grpc_call_is_client(grpc_call* call) {}

grpc_compression_algorithm grpc_call_compression_for_level(
    grpc_call* call, grpc_compression_level level) {}

bool grpc_call_is_trailers_only(const grpc_call* call) {}

int grpc_call_failed_before_recv_message(const grpc_call* c) {}

absl::string_view grpc_call_server_authority(const grpc_call* call) {}

const char* grpc_call_error_to_string(grpc_call_error error) {}