#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/retry_filter.h"
#include <inttypes.h>
#include <limits.h>
#include <stddef.h>
#include <memory>
#include <new>
#include <string>
#include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/types/optional.h"
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/client_channel_internal.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/retry_service_config.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/uri/uri_parser.h"
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE …
#define RETRY_BACKOFF_JITTER …
namespace grpc_core {
namespace {
RetryGlobalConfig;
RetryMethodConfig;
RetryServiceConfigParser;
ServerRetryThrottleData;
TraceFlag grpc_retry_trace(false, "retry");
class RetryFilter { … };
class RetryFilter::CallData { … };
class RetryFilter::CallData::CallStackDestructionBarrier
: public RefCounted<CallStackDestructionBarrier, PolymorphicRefCount,
kUnrefCallDtor> { … };
RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld,
bool is_transparent_retry)
: … { … }
RetryFilter::CallData::CallAttempt::~CallAttempt() { … }
void RetryFilter::CallData::CallAttempt::FreeCachedSendOpDataAfterCommit() { … }
bool RetryFilter::CallData::CallAttempt::PendingBatchContainsUnstartedSendOps(
PendingBatch* pending) { … }
bool RetryFilter::CallData::CallAttempt::HaveSendOpsToReplay() { … }
void RetryFilter::CallData::CallAttempt::MaybeSwitchToFastPath() { … }
RetryFilter::CallData::CallAttempt::BatchData*
RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() { … }
namespace {
void StartBatchInCallCombiner(void* arg, grpc_error_handle ) { … }
}
void RetryFilter::CallData::CallAttempt::AddClosureForBatch(
grpc_transport_stream_op_batch* batch, const char* reason,
CallCombinerClosureList* closures) { … }
void RetryFilter::CallData::CallAttempt::
AddBatchForInternalRecvTrailingMetadata(CallCombinerClosureList* closures) { … }
void RetryFilter::CallData::CallAttempt::MaybeAddBatchForCancelOp(
grpc_error_handle error, CallCombinerClosureList* closures) { … }
void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
CallCombinerClosureList* closures) { … }
void RetryFilter::CallData::CallAttempt::AddRetriableBatches(
CallCombinerClosureList* closures) { … }
void RetryFilter::CallData::CallAttempt::StartRetriableBatches() { … }
void RetryFilter::CallData::CallAttempt::CancelFromSurface(
grpc_transport_stream_op_batch* cancel_batch) { … }
bool RetryFilter::CallData::CallAttempt::ShouldRetry(
absl::optional<grpc_status_code> status,
absl::optional<Duration> server_pushback) { … }
void RetryFilter::CallData::CallAttempt::Abandon() { … }
void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimer(
void* arg, grpc_error_handle error) { … }
void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
void* arg, grpc_error_handle error) { … }
void RetryFilter::CallData::CallAttempt::MaybeCancelPerAttemptRecvTimer() { … }
RetryFilter::CallData::CallAttempt::BatchData::BatchData(
RefCountedPtr<CallAttempt> attempt, int refcount, bool set_on_complete)
: … { … }
RetryFilter::CallData::CallAttempt::BatchData::~BatchData() { … }
void RetryFilter::CallData::CallAttempt::BatchData::
FreeCachedSendOpDataForCompletedBatch() { … }
void RetryFilter::CallData::CallAttempt::BatchData::
MaybeAddClosureForRecvInitialMetadataCallback(
grpc_error_handle error, CallCombinerClosureList* closures) { … }
void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
void* arg, grpc_error_handle error) { … }
void RetryFilter::CallData::CallAttempt::BatchData::
MaybeAddClosureForRecvMessageCallback(grpc_error_handle error,
CallCombinerClosureList* closures) { … }
void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
void* arg, grpc_error_handle error) { … }
namespace {
void GetCallStatus(
Timestamp deadline, grpc_metadata_batch* md_batch, grpc_error_handle error,
grpc_status_code* status, absl::optional<Duration>* server_pushback,
bool* is_lb_drop,
absl::optional<GrpcStreamNetworkState::ValueType>* stream_network_state) { … }
}
void RetryFilter::CallData::CallAttempt::BatchData::
MaybeAddClosureForRecvTrailingMetadataReady(
grpc_error_handle error, CallCombinerClosureList* closures) { … }
void RetryFilter::CallData::CallAttempt::BatchData::
AddClosuresForDeferredCompletionCallbacks(
CallCombinerClosureList* closures) { … }
void RetryFilter::CallData::CallAttempt::BatchData::
AddClosuresToFailUnstartedPendingBatches(
grpc_error_handle error, CallCombinerClosureList* closures) { … }
void RetryFilter::CallData::CallAttempt::BatchData::RunClosuresForCompletedCall(
grpc_error_handle error) { … }
void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
void* arg, grpc_error_handle error) { … }
void RetryFilter::CallData::CallAttempt::BatchData::
AddClosuresForCompletedPendingBatch(grpc_error_handle error,
CallCombinerClosureList* closures) { … }
void RetryFilter::CallData::CallAttempt::BatchData::
AddClosuresForReplayOrPendingSendOps(CallCombinerClosureList* closures) { … }
void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
void* arg, grpc_error_handle error) { … }
void RetryFilter::CallData::CallAttempt::BatchData::OnCompleteForCancelOp(
void* arg, grpc_error_handle error) { … }
void RetryFilter::CallData::CallAttempt::BatchData::
AddRetriableSendInitialMetadataOp() { … }
void RetryFilter::CallData::CallAttempt::BatchData::
AddRetriableSendMessageOp() { … }
void RetryFilter::CallData::CallAttempt::BatchData::
AddRetriableSendTrailingMetadataOp() { … }
void RetryFilter::CallData::CallAttempt::BatchData::
AddRetriableRecvInitialMetadataOp() { … }
void RetryFilter::CallData::CallAttempt::BatchData::
AddRetriableRecvMessageOp() { … }
void RetryFilter::CallData::CallAttempt::BatchData::
AddRetriableRecvTrailingMetadataOp() { … }
void RetryFilter::CallData::CallAttempt::BatchData::AddCancelStreamOp(
grpc_error_handle error) { … }
grpc_error_handle RetryFilter::CallData::Init(
grpc_call_element* elem, const grpc_call_element_args* args) { … }
void RetryFilter::CallData::Destroy(grpc_call_element* elem,
const grpc_call_final_info* ,
grpc_closure* then_schedule_closure) { … }
void RetryFilter::CallData::StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { … }
void RetryFilter::CallData::SetPollent(grpc_call_element* elem,
grpc_polling_entity* pollent) { … }
const RetryMethodConfig* RetryFilter::GetRetryPolicy(
const grpc_call_context_element* context) { … }
RetryFilter::CallData::CallData(RetryFilter* chand,
const grpc_call_element_args& args)
: … { … }
RetryFilter::CallData::~CallData() { … }
void RetryFilter::CallData::StartTransportStreamOpBatch(
grpc_transport_stream_op_batch* batch) { … }
OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall>
RetryFilter::CallData::CreateLoadBalancedCall(
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry) { … }
void RetryFilter::CallData::CreateCallAttempt(bool is_transparent_retry) { … }
void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) { … }
void RetryFilter::CallData::FreeCachedSendInitialMetadata() { … }
void RetryFilter::CallData::FreeCachedSendMessage(size_t idx) { … }
void RetryFilter::CallData::FreeCachedSendTrailingMetadata() { … }
void RetryFilter::CallData::FreeAllCachedSendOpData() { … }
size_t RetryFilter::CallData::GetBatchIndex(
grpc_transport_stream_op_batch* batch) { … }
RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchesAdd(
grpc_transport_stream_op_batch* batch) { … }
void RetryFilter::CallData::PendingBatchClear(PendingBatch* pending) { … }
void RetryFilter::CallData::MaybeClearPendingBatch(PendingBatch* pending) { … }
void RetryFilter::CallData::FailPendingBatchInCallCombiner(
void* arg, grpc_error_handle error) { … }
void RetryFilter::CallData::PendingBatchesFail(grpc_error_handle error) { … }
template <typename Predicate>
RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchFind(
const char* log_message, Predicate predicate) { … }
void RetryFilter::CallData::RetryCommit(CallAttempt* call_attempt) { … }
void RetryFilter::CallData::StartRetryTimer(
absl::optional<Duration> server_pushback) { … }
void RetryFilter::CallData::OnRetryTimer(void* arg, grpc_error_handle error) { … }
void RetryFilter::CallData::OnRetryTimerLocked(void* arg,
grpc_error_handle error) { … }
void RetryFilter::CallData::AddClosureToStartTransparentRetry(
CallCombinerClosureList* closures) { … }
void RetryFilter::CallData::StartTransparentRetry(void* arg,
grpc_error_handle ) { … }
}
const grpc_channel_filter kRetryFilterVtable = …;
}