#include <grpc/support/port_platform.h>
#include "src/core/lib/surface/call.h"
#include <inttypes.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <atomic>
#include <initializer_list>
#include <memory>
#include <new>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/cleanup/cleanup.h"
#include "absl/meta/type_traits.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/variant.h"
#include <grpc/byte_buffer.h>
#include <grpc/compression.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/impl/propagation_bits.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/cpp_impl_of.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call_test_only.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/surface/validate_metadata.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
// Trace flags defined by this file. Each is constructed with `false` as its
// first argument and the flag's name string as its second.
// NOTE(review): the `false` presumably means "disabled by default" — confirm
// against the TraceFlag constructor.
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");
grpc_core::TraceFlag grpc_call_trace(false, "call");
// Unlike the three flags above, this one is a DebugOnlyTraceFlag.
grpc_core::DebugOnlyTraceFlag grpc_call_refcount_trace(false, "call_refcount");
namespace grpc_core {
// Common base class of the call implementations in this file:
// FilterStackCall and PromiseBasedCall both derive from it.
// NOTE(review): CppImplOf<Call, grpc_call> presumably supplies the mapping
// between the opaque C handle `grpc_call*` and this C++ type — confirm in
// cpp_impl_of.h.
class Call : public CppImplOf<Call, grpc_call> { … };
// Returns a pointer to this call's ParentCall record.
// NOTE(review): per the name, an existing record is reused and one is created
// on first use; whether creation is thread-safe is not visible here — confirm.
Call::ParentCall* Call::GetOrCreateParentCall() { … }
// Accessor for this call's ParentCall record.
// NOTE(review): presumably returns nullptr when no record has been created
// (contrast with GetOrCreateParentCall) — confirm.
Call::ParentCall* Call::parent_call() { … }
// Sets up this call's relationship to `parent`; the outcome is reported via
// the returned absl::Status.
// NOTE(review): `propagation_mask` presumably carries the GRPC_PROPAGATE_*
// bits from <grpc/impl/propagation_bits.h> — confirm which combinations are
// rejected.
absl::Status Call::InitParent(Call* parent, uint32_t propagation_mask) { … }
// Registers this call with `parent`.
// NOTE(review): presumably inserts this call into the parent's list of
// children so cancellation can be propagated — confirm.
void Call::PublishToParent(Call* parent) { … }
// Counterpart of PublishToParent.
// NOTE(review): the "Maybe" presumably means this is a no-op for calls that
// have no parent — confirm.
void Call::MaybeUnpublishFromParent() { … }
// Cancels the call with the given status code and description.
// NOTE(review): lifetime requirements on `description` (copied vs. retained)
// are not visible from the signature — confirm before passing temporaries.
void Call::CancelWithStatus(grpc_status_code status, const char* description) { … }
// NOTE(review): per the name, cancels the child calls registered under this
// call; presumably only children that opted into cancellation propagation are
// affected — confirm.
void Call::PropagateCancellationToChildren() { … }
// Returns the peer description as a raw `char*`.
// NOTE(review): ownership is not expressed in the type; the public
// grpc_call_get_peer() contract is that the caller frees the string with
// gpr_free — confirm this method follows the same rule.
char* Call::GetPeer() { … }
// Destroys this call object.
// NOTE(review): presumably also releases the arena the call lives in, so the
// object must not be touched after this returns — confirm.
void Call::DeleteThis() { … }
// Concrete (final) Call implementation.
// NOTE(review): per the name, this is the variant that runs on a channel
// filter stack (grpc_call_stack) — confirm.
class FilterStackCall final : public Call { … };
// Creates a FilterStackCall from `args` and hands it back through the
// `out_call` out-parameter; the result is reported as a grpc_error_handle.
// NOTE(review): check whether *out_call is populated when an error is
// returned — callers may need to unref it in that case.
grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
grpc_call** out_call) { … }
// Associates completion queue `cq` with this call.
// NOTE(review): presumably also takes a ref on `cq` and updates the polling
// entity — confirm.
void FilterStackCall::SetCompletionQueue(grpc_completion_queue* cq) { … }
// Callback taking the call as an untyped `void*`; the grpc_error_handle
// parameter is unnamed and therefore unused.
// NOTE(review): the (void*, grpc_error_handle) shape matches a grpc_closure
// callback — confirm where it is scheduled from.
void FilterStackCall::ReleaseCall(void* call, grpc_error_handle ) { … }
// Callback taking the call as an untyped `void*`; the grpc_error_handle
// parameter is unnamed and therefore unused.
// NOTE(review): presumably runs the final teardown of the call and its call
// stack — confirm the ordering relative to ReleaseCall.
void FilterStackCall::DestroyCall(void* call, grpc_error_handle ) { … }
// Drops one external reference on the call.
// NOTE(review): presumably the last external unref triggers cancellation of
// an unfinished call and starts destruction — confirm.
void FilterStackCall::ExternalUnref() { … }
// Starts the transport stream op `batch`, using the caller-supplied
// `start_batch_closure`.
// NOTE(review): presumably the closure is initialised here and run under the
// call combiner — confirm.
void FilterStackCall::ExecuteBatch(grpc_transport_stream_op_batch* batch,
grpc_closure* start_batch_closure) { … }
namespace {
// File-local state struct.
// NOTE(review): presumably the heap-allocated state shared between
// FilterStackCall::CancelWithError and done_termination — confirm.
struct CancelState { … };
}
// File-local (static) callback; receives an untyped `arg` and ignores the
// unnamed grpc_error_handle parameter.
// NOTE(review): `arg` is presumably a CancelState* that is freed here —
// confirm.
static void done_termination(void* arg, grpc_error_handle ) { … }
// Cancels the call with `error` (taken by value).
// NOTE(review): presumably idempotent — a second cancellation is ignored —
// confirm.
void FilterStackCall::CancelWithError(grpc_error_handle error) { … }
// Records the call's final status derived from `error`.
// NOTE(review): presumably writes to different destinations for client and
// server calls — confirm.
void FilterStackCall::SetFinalStatus(grpc_error_handle error) { … }
// Processes `count` application-supplied entries from `metadata`;
// `is_trailing` distinguishes trailing from initial metadata. Returns a bool.
// NOTE(review): presumably `false` means an entry failed validation —
// confirm.
bool FilterStackCall::PrepareApplicationMetadata(size_t count,
grpc_metadata* metadata,
bool is_trailing) { … }
namespace {
// File-local helper class.
// NOTE(review): presumably an encoder passed to grpc_metadata_batch::Encode
// that appends entries to a grpc_metadata_array — confirm.
class PublishToAppEncoder { … };
}
// Publishes the contents of metadata batch `b` to the application;
// `is_trailing` selects trailing vs. initial metadata.
// NOTE(review): the destination buffer is not visible in the signature —
// confirm.
void FilterStackCall::PublishAppMetadata(grpc_metadata_batch* b,
bool is_trailing) { … }
// Processes received initial metadata `b`.
// NOTE(review): presumably extracts compression/encoding fields before
// publishing the rest to the application — confirm.
void FilterStackCall::RecvInitialFilter(grpc_metadata_batch* b) { … }
// Processes received trailing metadata `b` together with the error reported
// for the batch.
// NOTE(review): precedence between `batch_error` and a status carried in `b`
// is not visible here — confirm.
void FilterStackCall::RecvTrailingFilter(grpc_metadata_batch* b,
grpc_error_handle batch_error) { … }
// File-local validation and lookup helpers for batch ops.
namespace {
// Returns whether `flags` is acceptable for a write op.
// NOTE(review): the permitted bit mask is defined in the body — confirm.
bool AreWriteFlagsValid(uint32_t flags) { … }
// Returns whether `flags` is acceptable for a send-initial-metadata op.
// NOTE(review): the permitted bit mask is defined in the body — confirm.
bool AreInitialMetadataFlagsValid(uint32_t flags) { … }
// Maps an op type to a slot index (size_t).
// NOTE(review): presumably the index into the call's array of active
// BatchControl objects — confirm.
size_t BatchSlotForOp(grpc_op_type type) { … }
}
// Returns a BatchControl for the batch described by `ops`.
// NOTE(review): per the name an existing object is reused when possible;
// presumably returns nullptr if the slot is still in use — confirm.
FilterStackCall::BatchControl* FilterStackCall::ReuseOrAllocateBatchControl(
const grpc_op* ops) { … }
// NOTE(review): per the name, delivers the batch's completion to the
// application (completion queue or closure) — confirm.
void FilterStackCall::BatchControl::PostCompletion() { … }
// Marks pending op `op` as finished for this batch.
// NOTE(review): presumably triggers PostCompletion once no ops remain
// pending — confirm.
void FilterStackCall::BatchControl::FinishStep(PendingOp op) { … }
// NOTE(review): per the name, handles a received message once initial
// metadata has been processed — confirm.
void FilterStackCall::BatchControl::ProcessDataAfterMetadata() { … }
// Invoked with the result (`error`) of a receive-message operation.
// NOTE(review): presumably defers processing if initial metadata has not yet
// arrived — confirm.
void FilterStackCall::BatchControl::ReceivingStreamReady(
grpc_error_handle error) { … }
// Handles the case where `compression_algorithm` is disabled.
// NOTE(review): presumably cancels the call with an UNIMPLEMENTED status —
// confirm.
void FilterStackCall::HandleCompressionAlgorithmDisabled(
grpc_compression_algorithm compression_algorithm) { … }
// Handles the case where `compression_algorithm` is not among the accepted
// encodings.
// NOTE(review): presumably only logs and does not fail the call — confirm.
void FilterStackCall::HandleCompressionAlgorithmNotAccepted(
grpc_compression_algorithm compression_algorithm) { … }
// NOTE(review): per the name, validates metadata after filtering; presumably
// this is where the HandleCompressionAlgorithm* helpers are invoked —
// confirm.
void FilterStackCall::BatchControl::ValidateFilteredMetadata() { … }
// Invoked with the result (`error`) of a receive-initial-metadata operation.
// NOTE(review): presumably runs RecvInitialFilter on success and cancels the
// call on error — confirm.
void FilterStackCall::BatchControl::ReceivingInitialMetadataReady(
grpc_error_handle error) { … }
// Invoked with the result (`error`) of a receive-trailing-metadata operation.
// NOTE(review): presumably forwards to RecvTrailingFilter — confirm.
void FilterStackCall::BatchControl::ReceivingTrailingMetadataReady(
grpc_error_handle error) { … }
// Invoked with the overall result (`error`) of the batch.
// NOTE(review): presumably records the error and cancels the call when it is
// not OK — confirm.
void FilterStackCall::BatchControl::FinishBatch(grpc_error_handle error) { … }
namespace {
// File-local helper that completes `notify_tag` without running any ops.
// `is_notify_tag_closure` tells whether `notify_tag` is a closure rather than
// a completion-queue tag.
// NOTE(review): presumably used for zero-op batches — confirm.
void EndOpImmediately(grpc_completion_queue* cq, void* notify_tag,
bool is_notify_tag_closure) { … }
}
// Starts a batch of `nops` operations from `ops`. Completion is signalled via
// `notify_tag`, which is a closure when `is_notify_tag_closure` is true and a
// completion-queue tag otherwise. Returns a grpc_call_error.
// NOTE(review): presumably no op is started if validation of any op fails —
// confirm.
grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
void* notify_tag,
bool is_notify_tag_closure) { … }
// Stores `value` in context slot `elem`, with `destroy` as its destructor
// callback.
// NOTE(review): presumably a previous value in the slot is destroyed first —
// confirm.
void FilterStackCall::ContextSet(grpc_context_index elem, void* value,
void (*destroy)(void*)) { … }
namespace {
// File-local check over `count` entries of `metadata`; returns a bool.
// NOTE(review): presumably validates header keys/values via
// validate_metadata.h and returns false on the first invalid entry — confirm.
bool ValidateMetadata(size_t count, grpc_metadata* metadata) { … }
}
// Base class for the promise-based call implementations
// (ClientPromiseBasedCall and ServerPromiseBasedCall derive from it).
// Besides Call it also inherits Activity, Wakeable and EventEngine::Closure.
// NOTE(review): presumably the call object is its own promise activity and
// waker, and is scheduled on the EventEngine through Run() — confirm.
class PromiseBasedCall : public Call,
public Activity,
public Wakeable,
public grpc_event_engine::experimental::EventEngine::
Closure { … };
// Factory template: creates a call of type `T` from `args` and returns it
// through the `out_call` out-parameter; the result is a grpc_error_handle.
// NOTE(review): `T` is presumably ClientPromiseBasedCall or
// ServerPromiseBasedCall, allocated in a freshly created arena — confirm.
template <typename T>
grpc_error_handle MakePromiseBasedCall(grpc_call_create_args* args,
grpc_call** out_call) { … }
// Constructs the call in `arena` from `args`.
// NOTE(review): `initial_external_refs` presumably seeds the external
// reference count (differs between client and server) — confirm.
PromiseBasedCall::PromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
const grpc_call_create_args& args)
: … { … }
// Returns a Waker for this call.
// NOTE(review): "non-owning" presumably means the waker does not keep the
// call alive on its own — confirm how dangling wakeups are prevented.
Waker PromiseBasedCall::MakeNonOwningWaker() { … }
// Converts `count` C-API entries from `metadata` into metadata batch `b`.
// NOTE(review): handling of invalid or duplicate keys is not visible here —
// confirm.
void PromiseBasedCall::CToMetadata(grpc_metadata* metadata, size_t count,
grpc_metadata_batch* b) { … }
// Stores `value` in context slot `elem`, with `destroy` as its destructor
// callback.
// NOTE(review): presumably a previous value in the slot is destroyed first —
// confirm.
void PromiseBasedCall::ContextSet(grpc_context_index elem, void* value,
void (*destroy)(void*)) { … }
// Const accessor returning the value stored in context slot `elem`.
// NOTE(review): presumably nullptr when the slot was never set — confirm.
void* PromiseBasedCall::ContextGet(grpc_context_index elem) const { … }
// Begins tracking a completion for `tag` and returns a Completion handle.
// `is_closure` indicates that `tag` is a closure rather than a
// completion-queue tag.
// NOTE(review): `ops` is presumably retained only for diagnostics — confirm.
PromiseBasedCall::Completion PromiseBasedCall::StartCompletion(
void* tag, bool is_closure, const grpc_op* ops) { … }
// Registers pending op `reason` against `completion` and returns a
// Completion handle.
// NOTE(review): presumably each returned handle must later be passed to
// FinishOpOnCompletion — confirm.
PromiseBasedCall::Completion PromiseBasedCall::AddOpToCompletion(
const Completion& completion, PendingOp reason) { … }
// Marks `completion` as failed; `location` identifies the call site.
// NOTE(review): `location` is presumably used for trace logging only —
// confirm.
void PromiseBasedCall::FailCompletion(const Completion& completion,
SourceLocation location) { … }
// Marks pending op `reason` as finished on `*completion`. The handle is
// taken by pointer, so it may be modified.
// NOTE(review): presumably the handle is consumed and the application is
// notified when the last pending op finishes — confirm.
void PromiseBasedCall::FinishOpOnCompletion(Completion* completion,
PendingOp reason) { … }
// NOTE(review): presumably the polling loop that repeatedly invokes the
// subclass's UpdateOnce() until no further repoll is requested — confirm.
void PromiseBasedCall::Update() { … }
// NOTE(review): presumably the Activity hook that requests another poll
// iteration from Update() — confirm.
void PromiseBasedCall::ForceImmediateRepoll() { … }
// Associates completion queue `cq` with this call.
// NOTE(review): presumably acquires the call's mutex and delegates to
// SetCompletionQueueLocked — confirm.
void PromiseBasedCall::SetCompletionQueue(grpc_completion_queue* cq) { … }
// Variant of SetCompletionQueue.
// NOTE(review): the "Locked" suffix presumably means the caller must already
// hold the call's mutex — confirm.
void PromiseBasedCall::SetCompletionQueueLocked(grpc_completion_queue* cq) { … }
// Updates the call's deadline to `deadline`.
// NOTE(review): presumably a later deadline than the current one is ignored
// and a timer is (re)armed on the EventEngine — confirm.
void PromiseBasedCall::UpdateDeadline(Timestamp deadline) { … }
// NOTE(review): presumably cancels any pending deadline timer and clears the
// stored deadline — confirm.
void PromiseBasedCall::ResetDeadline() { … }
// NOTE(review): presumably the EventEngine::Closure entry point, fired when
// the deadline timer expires to cancel the call with DEADLINE_EXCEEDED —
// confirm.
void PromiseBasedCall::Run() { … }
// Begins sending the message described by `op` through `sender`, tracked
// against `completion`.
// NOTE(review): presumably the pipe push is stored and later driven by
// PollSendMessage — confirm.
void PromiseBasedCall::StartSendMessage(const grpc_op& op,
const Completion& completion,
PipeSender<MessageHandle>* sender) { … }
// Polls an outstanding send; returns a bool.
// NOTE(review): the meaning of the return value (e.g. false on send failure)
// is not visible here — confirm.
bool PromiseBasedCall::PollSendMessage() { … }
// NOTE(review): presumably abandons an outstanding send and finishes its
// pending op on the completion — confirm.
void PromiseBasedCall::CancelSendMessage() { … }
// Begins receiving a message via `receiver` for `op`, tracked against
// `completion`.
// NOTE(review): presumably the pipe read is stored and later driven by
// PollRecvMessage — confirm.
void PromiseBasedCall::StartRecvMessage(const grpc_op& op,
const Completion& completion,
PipeReceiver<MessageHandle>* receiver) { … }
// Polls an outstanding receive; `incoming_compression_algorithm` is the
// compression algorithm of the incoming stream.
// NOTE(review): presumably used to build a compressed byte buffer when the
// message carries the compressed flag — confirm.
void PromiseBasedCall::PollRecvMessage(
grpc_compression_algorithm incoming_compression_algorithm) { … }
// NOTE(review): presumably abandons an outstanding receive, nulls the
// application's output buffer and finishes the pending op — confirm.
void PromiseBasedCall::CancelRecvMessage() { … }
// Runs `fn` (taken by value) in the context of the associated call.
// NOTE(review): whether `fn` runs synchronously or is deferred is not visible
// here — confirm.
void CallContext::RunInContext(absl::AnyInvocable<void()> fn) { … }
// Takes a reference; `reason` labels it.
// NOTE(review): presumably forwards to the owning call's internal refcount,
// with `reason` used for refcount tracing — confirm.
void CallContext::IncrementRefCount(const char* reason) { … }
// Releases a reference; `reason` labels it.
// NOTE(review): presumably pairs with IncrementRefCount and may destroy the
// call when the count reaches zero — confirm.
void CallContext::Unref(const char* reason) { … }
// Updates the deadline to `deadline`.
// NOTE(review): presumably forwards to PromiseBasedCall::UpdateDeadline on
// the owning call — confirm.
void CallContext::UpdateDeadline(Timestamp deadline) { … }
// Returns a ServerCallContext*.
// NOTE(review): presumably only valid on server-side calls — confirm what is
// returned (or asserted) for client calls.
ServerCallContext* CallContext::server_call_context() { … }
namespace {
// File-local helper that publishes metadata batch `md` into the C-API
// structure `array`.
// NOTE(review): presumably grows `array` as needed and stores slices that
// borrow from `md` — confirm the lifetime requirement on `md`.
void PublishMetadataArray(grpc_metadata_batch* md, grpc_metadata_array* array) { … }
}
#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL
// Client-side promise-based call (final). Only compiled when
// GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL is defined.
class ClientPromiseBasedCall final : public PromiseBasedCall { … };
// Starts the call's promise, taking ownership of `client_initial_metadata`
// (passed by value as a handle).
// NOTE(review): presumably constructs the channel's call promise and performs
// the first poll — confirm.
void ClientPromiseBasedCall::StartPromise(
ClientMetadataHandle client_initial_metadata) { … }
// Cancels the call with `error`.
// NOTE(review): the "Locked" suffix presumably means the caller must already
// hold the call's mutex — confirm.
void ClientPromiseBasedCall::CancelWithErrorLocked(grpc_error_handle error) { … }
// Const check of a batch of `nops` operations; returns a grpc_call_error.
// NOTE(review): presumably rejects server-only ops, duplicate ops and invalid
// flags without mutating the call — confirm.
grpc_call_error ClientPromiseBasedCall::ValidateBatch(const grpc_op* ops,
size_t nops) const { … }
// Applies a batch of `nops` operations, tracking them against `completion`.
// NOTE(review): presumably assumes the batch already passed ValidateBatch —
// confirm.
void ClientPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
const Completion& completion) { … }
// Starts a batch of `nops` operations from `ops`. Completion is signalled via
// `notify_tag`, which is a closure when `is_notify_tag_closure` is true and a
// completion-queue tag otherwise. Returns a grpc_call_error.
// NOTE(review): presumably ValidateBatch followed by CommitBatch and an
// Update() — confirm.
grpc_call_error ClientPromiseBasedCall::StartBatch(const grpc_op* ops,
size_t nops,
void* notify_tag,
bool is_notify_tag_closure) { … }
// Publishes server initial metadata `metadata` to the application.
// NOTE(review): presumably also records the incoming compression algorithm
// and finishes the pending recv-initial-metadata op — confirm.
void ClientPromiseBasedCall::PublishInitialMetadata(ServerMetadata* metadata) { … }
// NOTE(review): presumably performs a single poll iteration of the client
// call (metadata, send/recv message, overall promise) — confirm.
void ClientPromiseBasedCall::UpdateOnce() { … }
// Completes the call with `trailing_metadata` (ownership passed by value).
// NOTE(review): presumably clears the promise, resets the deadline and
// publishes the final status — confirm.
void ClientPromiseBasedCall::Finish(ServerMetadataHandle trailing_metadata) { … }
namespace {
// File-local helper that renders `trailing_metadata` as a std::string.
// NOTE(review): presumably a human-readable dump of status, message and the
// remaining metadata, used for the error_string output — confirm.
std::string MakeErrorString(const ServerMetadata* trailing_metadata) { … }
}
// Publishes the final status from `trailing_metadata` into the output
// locations described by `op_args` (the recv_status_on_client op payload).
// NOTE(review): presumably fills status, status_details, the optional
// error_string and the trailing metadata array — confirm.
void ClientPromiseBasedCall::PublishStatus(
grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args,
ServerMetadataHandle trailing_metadata) { … }
#endif
#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
// Server-side promise-based call (final). Only compiled when
// GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL is defined.
class ServerPromiseBasedCall final : public PromiseBasedCall { … };
// Constructs the server call in `arena` from `args`.
// NOTE(review): presumably also starts the channel's call promise for the
// incoming stream — confirm.
ServerPromiseBasedCall::ServerPromiseBasedCall(Arena* arena,
grpc_call_create_args* args)
: … { … }
// Poll function yielding the call's ServerMetadataHandle result.
// NOTE(review): presumably resolves once trailing metadata from the
// application (send_status_from_server) is available — confirm.
Poll<ServerMetadataHandle> ServerPromiseBasedCall::PollTopOfCall() { … }
// NOTE(review): presumably performs a single poll iteration of the server
// call — confirm.
void ServerPromiseBasedCall::UpdateOnce() { … }
// Const check of a batch of `nops` operations; returns a grpc_call_error.
// NOTE(review): presumably rejects client-only ops, duplicate ops and invalid
// flags without mutating the call — confirm.
grpc_call_error ServerPromiseBasedCall::ValidateBatch(const grpc_op* ops,
size_t nops) const { … }
// Applies a batch of `nops` operations, tracking them against `completion`.
// NOTE(review): presumably assumes the batch already passed ValidateBatch —
// confirm.
void ServerPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
const Completion& completion) { … }
// Starts a batch of `nops` operations from `ops`. Completion is signalled via
// `notify_tag`, which is a closure when `is_notify_tag_closure` is true and a
// completion-queue tag otherwise. Returns a grpc_call_error.
// NOTE(review): presumably ValidateBatch followed by CommitBatch and an
// Update() — confirm.
grpc_call_error ServerPromiseBasedCall::StartBatch(const grpc_op* ops,
size_t nops,
void* notify_tag,
bool is_notify_tag_closure) { … }
// Cancels the call with `error`. Note the parameter is spelled absl::Status
// here, whereas the client counterpart spells it grpc_error_handle.
// NOTE(review): the "Locked" suffix presumably means the caller must already
// hold the call's mutex — confirm.
void ServerPromiseBasedCall::CancelWithErrorLocked(absl::Status error) { … }
#endif
#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
// Variant compiled when GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
// is defined. Builds the promise that sits at the top of a server call from
// `call_args`, the completion queue `cq`, the application's initial-metadata
// array and a `publish` callback that receives the grpc_call*.
// NOTE(review): `publish` is an absl::FunctionRef, so it must not be retained
// beyond this call — confirm it is only invoked synchronously.
ArenaPromise<ServerMetadataHandle>
ServerCallContext::MakeTopOfServerCallPromise(
CallArgs call_args, grpc_completion_queue* cq,
grpc_metadata_array* publish_initial_metadata,
absl::FunctionRef<void(grpc_call* call)> publish) { … }
#else
// Fallback compiled when GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
// is not defined. Every argument is ignored and the process is terminated via
// Crash(), so this function never produces a promise.
ArenaPromise<ServerMetadataHandle>
ServerCallContext::MakeTopOfServerCallPromise(
    CallArgs /*call_args*/, grpc_completion_queue* /*cq*/,
    grpc_metadata_array* /*publish_initial_metadata*/,
    absl::FunctionRef<void(grpc_call*)> /*publish*/) {
  // Touch call_ without using its value.
  // NOTE(review): presumably silences an unused-member warning in this
  // configuration — confirm.
  static_cast<void>(call_);
  Crash("Promise-based server call is not enabled");
}
#endif
}
// Allocates `size` bytes on behalf of `call` and returns the memory.
// NOTE(review): per the name the memory comes from the call's arena and is
// presumably released with the call rather than freed individually — confirm.
void* grpc_call_arena_alloc(grpc_call* call, size_t size) { … }
// Returns an estimate, in bytes, of the initial size of a call.
// NOTE(review): which call implementation(s) the estimate covers is decided
// in the body — confirm.
size_t grpc_call_get_initial_size_estimate() { … }
// Creates a call from `args` and returns it through `out_call`; the result is
// reported as a grpc_error_handle.
// NOTE(review): presumably chooses between the promise-based and filter-stack
// implementations based on experiments and the channel — confirm.
grpc_error_handle grpc_call_create(grpc_call_create_args* args,
grpc_call** out_call) { … }
// C-surface wrapper: associates completion queue `cq` with `call`.
// NOTE(review): presumably forwards to Call::SetCompletionQueue — confirm.
void grpc_call_set_completion_queue(grpc_call* call,
grpc_completion_queue* cq) { … }
// Takes an application-level reference on call `c`.
// NOTE(review): presumably forwards to the call's ExternalRef — confirm.
void grpc_call_ref(grpc_call* c) { … }
// Releases an application-level reference on call `c`.
// NOTE(review): presumably sets up an ExecCtx and forwards to the call's
// ExternalUnref — confirm.
void grpc_call_unref(grpc_call* c) { … }
// Returns a string describing the peer of `call`.
// NOTE(review): the public contract in <grpc/grpc.h> is that the caller frees
// the result with gpr_free — confirm this implementation honours it.
char* grpc_call_get_peer(grpc_call* call) { … }
// Recovers the owning grpc_call from the top (surface) call-stack element.
// NOTE(review): presumably only meaningful for filter-stack calls — confirm.
grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) { … }
// Cancels `call` and returns a grpc_call_error.
// NOTE(review): `reserved` is presumably required to be nullptr, as with
// other reserved parameters in the public API — confirm.
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) { … }
// Cancels call `c` with an explicit `status` and `description`; returns a
// grpc_call_error.
// NOTE(review): `reserved` is presumably required to be nullptr, and
// `description` presumably only needs to outlive this call — confirm.
grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
grpc_status_code status,
const char* description,
void* reserved) { … }
// Internal cancellation entry point; unlike grpc_call_cancel it takes no
// `reserved` argument and returns nothing.
// NOTE(review): presumably cancels with a generic CANCELLED error — confirm.
void grpc_call_cancel_internal(grpc_call* call) { … }
// Test-only accessor returning a compression algorithm associated with
// `call`.
// NOTE(review): presumably the incoming-message algorithm — confirm.
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
grpc_call* call) { … }
// Test-only accessor returning message flags for `call` as a uint32_t.
// NOTE(review): presumably the flags of the most recently received message —
// confirm.
uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) { … }
// Test-only accessor returning the encodings accepted by the peer as a
// uint32_t.
// NOTE(review): presumably a bitset indexed by grpc_compression_algorithm —
// confirm.
uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) { … }
// Returns the Arena associated with `call`.
// NOTE(review): the returned pointer is presumably owned by the call and must
// not outlive it — confirm.
grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { … }
// Returns the grpc_call_stack associated with `call`.
// NOTE(review): presumably nullptr for promise-based calls, which have no
// filter call stack — confirm.
grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) { … }
// Public entry point: starts `nops` operations from `ops` on `call`, with
// completion reported under `tag`. Returns a grpc_call_error.
// NOTE(review): `reserved` is presumably required to be nullptr — confirm the
// error code returned otherwise.
grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
size_t nops, void* tag, void* reserved) { … }
// Internal variant of grpc_call_start_batch that reports completion by
// running `closure` instead of posting a tag to a completion queue.
// NOTE(review): presumably equivalent to StartBatch with
// is_notify_tag_closure=true — confirm.
grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
const grpc_op* ops,
size_t nops,
grpc_closure* closure) { … }
// C-surface wrapper: stores `value` in context slot `elem` of `call`, with
// `destroy` as its destructor callback.
// NOTE(review): presumably forwards to the call's ContextSet — confirm.
void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
void* value, void (*destroy)(void* value)) { … }
// C-surface wrapper: returns the value stored in context slot `elem` of
// `call`.
// NOTE(review): presumably nullptr when the slot was never set — confirm.
void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) { … }
// Reports whether `call` is a client-side call, as a uint8_t.
// NOTE(review): presumably 1 for client and 0 for server — confirm.
uint8_t grpc_call_is_client(grpc_call* call) { … }
// Maps the abstract compression `level` to a concrete algorithm for `call`.
// NOTE(review): presumably constrained to the encodings the peer has
// advertised as accepted — confirm.
grpc_compression_algorithm grpc_call_compression_for_level(
grpc_call* call, grpc_compression_level level) { … }
// Reports whether `call` received a trailers-only response.
// NOTE(review): presumably only meaningful once trailing metadata has been
// received — confirm.
bool grpc_call_is_trailers_only(const grpc_call* call) { … }
// Reports, as an int, whether call `c` failed before a message was received.
// NOTE(review): presumably non-zero means "failed" — confirm.
int grpc_call_failed_before_recv_message(const grpc_call* c) { … }
// Returns the server authority for `call` as a non-owning string_view.
// NOTE(review): the view presumably borrows from storage owned by the call,
// so it must not outlive `call` — confirm.
absl::string_view grpc_call_server_authority(const grpc_call* call) { … }
// Converts a grpc_call_error value to a C string.
// NOTE(review): presumably returns pointers to static string literals (no
// ownership transfer) — confirm.
const char* grpc_call_error_to_string(grpc_call_error error) { … }