chromium/third_party/grpc/src/src/core/ext/filters/client_channel/retry_filter.cc

//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/retry_filter.h"

#include <inttypes.h>
#include <limits.h>
#include <stddef.h>

#include <memory>
#include <new>
#include <string>
#include <utility>

#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/types/optional.h"

#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>

#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/client_channel_internal.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/retry_service_config.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/uri/uri_parser.h"

//
// Retry filter
//

// This filter is intended to be used in the DynamicFilter stack in the
// client channel, which is situated between the name resolver and the
// LB policy.  Normally, the last filter in the DynamicFilter stack is
// the DynamicTerminationFilter (see client_channel.cc), which creates a
// LoadBalancedCall and delegates to it.  However, when retries are
// enabled, this filter is used instead of the DynamicTerminationFilter.
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on an LB call.  When the child batches return, we
// then decide which pending batches have been completed and schedule their
// callbacks accordingly.  If a call attempt fails and we want to retry it,
// we create a new LB call and start again, constructing new "child" batches
// for the new LB call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses).  However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry.  To deal
// with this, we cache data for send ops, so that we can replay them on a
// different LB call even after we have completed the original batches.
//
// The code is structured as follows:
// - In CallData (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - There is a CallData::CallAttempt object for each retry attempt.
//   This object contains the LB call for that attempt and state to indicate
//   which ops from the CallData object have already been sent down to that
//   LB call.
// - There is a CallData::CallAttempt::BatchData object for each "child"
//   batch sent on the LB call.
//
// When constructing the "child" batches, we compare the state in the
// CallAttempt object against the state in the CallData object to see
// which batches need to be sent on the LB call for a given attempt.

// TODO(roth): In subsequent PRs:
// - implement hedging

// By default, we buffer 256 KiB per RPC for retries.
// The value is expressed in bytes (256 << 10 == 262144) and parenthesized so
// that it is safe to use inside larger expressions.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
// NOTE(review): the macro is defined with no replacement text, so it
// expands to nothing; presumably it should carry a numeric jitter factor
// for the retry backoff -- confirm the intended value.
#define RETRY_BACKOFF_JITTER

namespace grpc_core {

namespace {

// NOTE(review): the four lines below are bare identifiers, not declarations.
// Presumably they were using-declarations that pull the retry config and
// throttle types (see retry_service_config.h / retry_throttle.h) into this
// namespace -- confirm the source namespace and restore the `using`.
RetryGlobalConfig;
RetryMethodConfig;
RetryServiceConfigParser;
ServerRetryThrottleData;

// Trace flag named "retry", constructed with `false` (presumably meaning
// disabled by default -- confirm against TraceFlag's constructor).
TraceFlag grpc_retry_trace(false, "retry");

//
// RetryFilter
//

// The retry filter type (see the overview comment at the top of this file).
// NOTE(review): the member list is empty as written, yet GetRetryPolicy()
// and the nested CallData type are defined out-of-line below, so their
// declarations must live in this class -- confirm the body was not dropped.
class RetryFilter {};

//
// RetryFilter::CallData
//

// Per-call state.  Per the file overview, this is where the list of pending
// ops and the cached data for send ops are maintained.
// NOTE(review): the member list is empty as written although many members
// (CallAttempt, PendingBatch, the methods below) are defined out-of-line --
// confirm the body was not dropped.
class RetryFilter::CallData {};

//
// RetryFilter::CallData::CallStackDestructionBarrier
//

// A class to track the existence of LoadBalancedCall call stacks that
// we've created.  We wait until all such call stacks have been
// destroyed before we return the on_call_stack_destruction closure up
// to the surface.
//
// The parent RetryFilter::CallData object holds a ref to this object.
// When it is destroyed, it will store the on_call_stack_destruction
// closure from the surface in this object and then release its ref.
// We also take a ref to this object for each LB call we create, and
// those refs are not released until the LB call stack is destroyed.
// When this object is destroyed, it will invoke the
// on_call_stack_destruction closure from the surface.
//
// Ref-counting comes from RefCounted<> instantiated with
// PolymorphicRefCount and kUnrefCallDtor.
// NOTE(review): kUnrefCallDtor presumably means the last unref runs the
// destructor without freeing the storage -- confirm in ref_counted.h.
class RetryFilter::CallData::CallStackDestructionBarrier
    : public RefCounted<CallStackDestructionBarrier, PolymorphicRefCount,
                        kUnrefCallDtor> {};

//
// RetryFilter::CallData::CallAttempt
//

// Constructs the state for one attempt of the call owned by `calld`
// (the file overview states there is one CallAttempt per retry attempt).
// `is_transparent_retry` flags the attempt as a transparent retry.
// NOTE(review): the member-initializer list after ':' is empty as written,
// which does not parse -- restore the initializers.
RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld,
                                                bool is_transparent_retry)
    :{}

// Destructor for a single call attempt; the body is empty as written.
RetryFilter::CallData::CallAttempt::~CallAttempt() {}

// NOTE(review): presumably releases cached send-op data that is no longer
// needed once retries are committed -- confirm which ops are released.
void RetryFilter::CallData::CallAttempt::FreeCachedSendOpDataAfterCommit() {}

// Reports (as a bool) on the send ops contained in `pending`.
// NOTE(review): presumably returns true when `pending` holds send ops this
// attempt has not started yet -- confirm.  The body has no return statement
// as written.
bool RetryFilter::CallData::CallAttempt::PendingBatchContainsUnstartedSendOps(
    PendingBatch* pending) {}

// NOTE(review): presumably returns true when cached send ops exist that
// have not yet been sent on this attempt -- confirm.  The body has no
// return statement as written.
bool RetryFilter::CallData::CallAttempt::HaveSendOpsToReplay() {}

// NOTE(review): presumably stops proxying batches for this attempt once
// retries are committed and nothing is left to replay -- the exact
// conditions are not visible here; confirm.
void RetryFilter::CallData::CallAttempt::MaybeSwitchToFastPath() {}

// If there are any cached send ops that need to be replayed on the
// current call attempt, creates and returns a new batch to replay those ops.
// Otherwise, returns nullptr.
// NOTE(review): ownership of the returned BatchData* is not stated here --
// confirm.  The body has no return statement as written.
RetryFilter::CallData::CallAttempt::BatchData*
RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() {}

namespace {

// Closure-style callback.  The error argument is deliberately unused (its
// name is commented out as "ignored").
// NOTE(review): `arg` presumably carries the batch to start -- confirm.
void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {}

}  // namespace

// Takes a stream op `batch`, a human-readable `reason` string, and the
// closure list `closures` to add to.
// NOTE(review): presumably queues a closure that starts `batch` on this
// attempt's LB call, with `reason` used as the closure label -- confirm.
void RetryFilter::CallData::CallAttempt::AddClosureForBatch(
    grpc_transport_stream_op_batch* batch, const char* reason,
    CallCombinerClosureList* closures) {}

// NOTE(review): presumably creates a recv_trailing_metadata batch that
// originates inside the filter (not from the surface) and queues it on
// `closures` -- confirm when this is needed.
void RetryFilter::CallData::CallAttempt::
    AddBatchForInternalRecvTrailingMetadata(CallCombinerClosureList* closures) {}

// NOTE(review): presumably queues on `closures` a batch that cancels this
// attempt's stream with `error`, unless a cancel was already sent -- the
// "maybe" condition is not visible here; confirm.
void RetryFilter::CallData::CallAttempt::MaybeAddBatchForCancelOp(
    grpc_error_handle error, CallCombinerClosureList* closures) {}

// NOTE(review): presumably walks CallData's pending batches and queues on
// `closures` the child batches this attempt still needs (the file overview
// describes comparing attempt state against CallData state) -- confirm.
void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
    CallCombinerClosureList* closures) {}

// NOTE(review): presumably queues on `closures` both the replay batch (if
// any) and the batches derived from pending batches -- confirm ordering.
void RetryFilter::CallData::CallAttempt::AddRetriableBatches(
    CallCombinerClosureList* closures) {}

// NOTE(review): presumably builds the retriable batches for this attempt
// and runs them -- confirm whether this yields the call combiner.
void RetryFilter::CallData::CallAttempt::StartRetriableBatches() {}

// Handles a cancellation batch (`cancel_batch`) for this attempt.
// NOTE(review): presumably forwards the cancel to the attempt's LB call and
// stops any per-attempt timer -- confirm.
void RetryFilter::CallData::CallAttempt::CancelFromSurface(
    grpc_transport_stream_op_batch* cancel_batch) {}

// Returns a bool retry decision for this attempt.
// `status` is the attempt's status code when one is available;
// `server_pushback` is an optional Duration.
// NOTE(review): `server_pushback` is presumably a server-requested delay
// before the next attempt; the decision criteria (policy, throttling,
// attempt count) are not visible here -- confirm.  The body has no return
// statement as written.
bool RetryFilter::CallData::CallAttempt::ShouldRetry(
    absl::optional<grpc_status_code> status,
    absl::optional<Duration> server_pushback) {}

// NOTE(review): presumably marks this attempt as abandoned so its results
// are ignored and its deferred state is released -- confirm.
void RetryFilter::CallData::CallAttempt::Abandon() {}

// Closure-style callback for the per-attempt receive timer.
// NOTE(review): `arg` is presumably the CallAttempt, and this presumably
// hops into the call combiner to run OnPerAttemptRecvTimerLocked -- confirm.
void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimer(
    void* arg, grpc_error_handle error) {}

// "Locked" counterpart of OnPerAttemptRecvTimer.
// NOTE(review): presumably runs under the call combiner with `arg` being
// the CallAttempt, and handles the timeout (cancel and maybe retry) when
// `error` indicates the timer fired -- confirm.
void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
    void* arg, grpc_error_handle error) {}

// NOTE(review): presumably cancels the per-attempt receive timer if one is
// currently pending -- confirm.
void RetryFilter::CallData::CallAttempt::MaybeCancelPerAttemptRecvTimer() {}

//
// RetryFilter::CallData::CallAttempt::BatchData
//

// Constructs a "child" batch for `attempt` (the file overview states there
// is one BatchData per child batch sent on the LB call).
// `attempt` is passed as an owning RefCountedPtr; `refcount` is an int
// count and `set_on_complete` a bool flag.
// NOTE(review): presumably `refcount` is the initial ref count and
// `set_on_complete` controls whether an on_complete closure is installed --
// confirm.  The member-initializer list after ':' is empty as written,
// which does not parse -- restore the initializers.
RetryFilter::CallData::CallAttempt::BatchData::BatchData(
    RefCountedPtr<CallAttempt> attempt, int refcount, bool set_on_complete)
    :{}

// Destructor for a child batch; the body is empty as written.
RetryFilter::CallData::CallAttempt::BatchData::~BatchData() {}

// NOTE(review): presumably frees the cached send-op data corresponding to
// the send ops carried by this (now completed) batch -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    FreeCachedSendOpDataForCompletedBatch() {}

//
// recv_initial_metadata callback handling
//

// NOTE(review): presumably looks for a pending batch awaiting
// recv_initial_metadata and, if found, queues its ready callback on
// `closures` with `error` -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    MaybeAddClosureForRecvInitialMetadataCallback(
        grpc_error_handle error, CallCombinerClosureList* closures) {}

// Closure-style callback for recv_initial_metadata completion.
// NOTE(review): `arg` is presumably the BatchData; how the result is
// deferred or surfaced is not visible here -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
    void* arg, grpc_error_handle error) {}

//
// recv_message callback handling
//

// NOTE(review): presumably looks for a pending batch awaiting recv_message
// and, if found, queues its ready callback on `closures` with `error` --
// confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    MaybeAddClosureForRecvMessageCallback(grpc_error_handle error,
                                          CallCombinerClosureList* closures) {}

// Closure-style callback for recv_message completion.
// NOTE(review): `arg` is presumably the BatchData; how the result is
// deferred or surfaced is not visible here -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
    void* arg, grpc_error_handle error) {}

//
// recv_trailing_metadata handling
//

namespace {

// Sets *status, *server_pushback, and *is_lb_drop based on md_batch
// and error.
// The signature also takes `deadline` and a fourth out-parameter,
// `stream_network_state`.
// NOTE(review): presumably `deadline` is used when converting `error` to a
// status, and *stream_network_state is filled from the trailing metadata --
// confirm.
void GetCallStatus(
    Timestamp deadline, grpc_metadata_batch* md_batch, grpc_error_handle error,
    grpc_status_code* status, absl::optional<Duration>* server_pushback,
    bool* is_lb_drop,
    absl::optional<GrpcStreamNetworkState::ValueType>* stream_network_state) {}

}  // namespace

// NOTE(review): presumably looks for a pending batch awaiting
// recv_trailing_metadata and, if found, queues its ready callback on
// `closures` with `error` -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    MaybeAddClosureForRecvTrailingMetadataReady(
        grpc_error_handle error, CallCombinerClosureList* closures) {}

// NOTE(review): presumably queues on `closures` any completion callbacks
// that were deferred earlier in the attempt -- which callbacks can be
// deferred is not visible here; confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForDeferredCompletionCallbacks(
        CallCombinerClosureList* closures) {}

// NOTE(review): presumably queues on `closures` failure callbacks (with
// `error`) for pending batches whose ops were never started on this
// attempt -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresToFailUnstartedPendingBatches(
        grpc_error_handle error, CallCombinerClosureList* closures) {}

// NOTE(review): presumably gathers and runs every closure needed to finish
// the call toward the surface once the attempt has completed with `error`
// -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::RunClosuresForCompletedCall(
    grpc_error_handle error) {}

// Closure-style callback for recv_trailing_metadata completion.
// NOTE(review): `arg` is presumably the BatchData; this is presumably where
// the call status is examined and the retry decision is made -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {}

//
// on_complete callback handling
//

// NOTE(review): presumably finds the pending batch whose send ops this
// child batch completed and queues its on_complete on `closures` with
// `error` -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForCompletedPendingBatch(grpc_error_handle error,
                                        CallCombinerClosureList* closures) {}

// NOTE(review): presumably queues on `closures` further batches when send
// ops remain to be replayed or are still pending -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForReplayOrPendingSendOps(CallCombinerClosureList* closures) {}

// Closure-style on_complete callback for a child batch.
// NOTE(review): `arg` is presumably the BatchData -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
    void* arg, grpc_error_handle error) {}

// Closure-style on_complete callback for a cancel-op batch.
// NOTE(review): `arg` is presumably the BatchData created for the cancel op
// -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::OnCompleteForCancelOp(
    void* arg, grpc_error_handle error) {}

//
// retriable batch construction
//

// NOTE(review): presumably adds a send_initial_metadata op to this child
// batch from the data cached in CallData -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendInitialMetadataOp() {}

// NOTE(review): presumably adds a send_message op to this child batch using
// the next cached message not yet sent on the attempt -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendMessageOp() {}

// NOTE(review): presumably adds a send_trailing_metadata op to this child
// batch from the data cached in CallData -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendTrailingMetadataOp() {}

// NOTE(review): presumably adds a recv_initial_metadata op to this child
// batch and points its ready closure at RecvInitialMetadataReady -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvInitialMetadataOp() {}

// NOTE(review): presumably adds a recv_message op to this child batch and
// points its ready closure at RecvMessageReady -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvMessageOp() {}

// NOTE(review): presumably adds a recv_trailing_metadata op to this child
// batch and points its ready closure at RecvTrailingMetadataReady --
// confirm.
void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvTrailingMetadataOp() {}

// NOTE(review): presumably adds a cancel_stream op carrying `error` to this
// child batch -- confirm.
void RetryFilter::CallData::CallAttempt::BatchData::AddCancelStreamOp(
    grpc_error_handle error) {}

//
// CallData vtable functions
//

// Call-element init hook (this section holds the CallData vtable functions).
// Takes the call element `elem` and its construction `args`; returns a
// grpc_error_handle.
// NOTE(review): presumably constructs the CallData in the element's call
// data storage -- confirm.  The body has no return statement as written.
grpc_error_handle RetryFilter::CallData::Init(
    grpc_call_element* elem, const grpc_call_element_args* args) {}

// Call-element destroy hook.  The final_info argument is unused (its name
// is commented out).
// NOTE(review): per the CallStackDestructionBarrier comment, the surface's
// on_call_stack_destruction closure is handed to the barrier when CallData
// is destroyed; presumably `then_schedule_closure` is that closure --
// confirm.
void RetryFilter::CallData::Destroy(grpc_call_element* elem,
                                    const grpc_call_final_info* /*final_info*/,
                                    grpc_closure* then_schedule_closure) {}

// Call-element entry point for a stream op `batch` arriving on `elem`.
// NOTE(review): presumably forwards to the single-argument
// StartTransportStreamOpBatch overload on this element's CallData --
// confirm.
void RetryFilter::CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {}

// Call-element hook receiving the polling entity `pollent` for `elem`.
// NOTE(review): presumably records `pollent` on the CallData for later use
// when creating LB calls -- confirm.
void RetryFilter::CallData::SetPollent(grpc_call_element* elem,
                                       grpc_polling_entity* pollent) {}

//
// CallData implementation
//

// Returns a pointer to a const RetryMethodConfig for the call described by
// `context`.
// NOTE(review): presumably looks the policy up in the call's service config
// data and returns nullptr when none applies -- confirm.  The body has no
// return statement as written.
const RetryMethodConfig* RetryFilter::GetRetryPolicy(
    const grpc_call_context_element* context) {}

// Constructs the per-call state for filter instance `chand` from the call
// element `args`.
// NOTE(review): the member-initializer list after ':' is empty as written,
// which does not parse -- restore the initializers.
RetryFilter::CallData::CallData(RetryFilter* chand,
                                const grpc_call_element_args& args)
    :{}

// Destructor for the per-call state; the body is empty as written.
RetryFilter::CallData::~CallData() {}

// Handles a stream op `batch` for this call.  Per the file overview,
// batches from the surface are added to the pending list and used to
// construct child batches on an LB call.
// NOTE(review): handling of cancellation and of already-committed calls is
// not visible here -- confirm.
void RetryFilter::CallData::StartTransportStreamOpBatch(
    grpc_transport_stream_op_batch* batch) {}

// Returns an owning OrphanablePtr to a new
// ClientChannel::FilterBasedLoadBalancedCall.
// `call_dispatch_controller` and `is_transparent_retry` are supplied by the
// caller.
// NOTE(review): per the CallStackDestructionBarrier comment, a barrier ref
// is taken for each LB call created -- presumably here; confirm.  The body
// has no return statement as written.
OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall>
RetryFilter::CallData::CreateLoadBalancedCall(
    ConfigSelector::CallDispatchController* call_dispatch_controller,
    bool is_transparent_retry) {}

// NOTE(review): presumably creates a new CallAttempt (flagged per
// `is_transparent_retry`) and starts its retriable batches -- confirm.
void RetryFilter::CallData::CreateCallAttempt(bool is_transparent_retry) {}

//
// send op data caching
//

// The file overview states that send-op data is cached so it can be
// replayed on a later LB call.
// NOTE(review): presumably caches the send ops of `pending` unless already
// cached -- confirm the "maybe" condition.
void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {}

// NOTE(review): presumably releases the cached send_initial_metadata --
// confirm.
void RetryFilter::CallData::FreeCachedSendInitialMetadata() {}

// NOTE(review): presumably releases the cached send message at index `idx`
// of the cached-message list -- confirm bounds expectations.
void RetryFilter::CallData::FreeCachedSendMessage(size_t idx) {}

// NOTE(review): presumably releases the cached send_trailing_metadata --
// confirm.
void RetryFilter::CallData::FreeCachedSendTrailingMetadata() {}

// NOTE(review): presumably releases every piece of cached send-op data
// (initial metadata, all messages, trailing metadata) -- confirm.
void RetryFilter::CallData::FreeAllCachedSendOpData() {}

//
// pending_batches management
//

// Returns a size_t index derived from `batch`.
// NOTE(review): presumably the slot in the pending-batches array chosen by
// which op types `batch` contains -- confirm.  The body has no return
// statement as written.
size_t RetryFilter::CallData::GetBatchIndex(
    grpc_transport_stream_op_batch* batch) {}

// This is called via the call combiner, so access to calld is synchronized.
// Adds `batch` to the pending batches and returns a PendingBatch pointer.
// NOTE(review): the returned pointer presumably refers to the slot that now
// holds `batch` -- confirm.  The body has no return statement as written.
RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchesAdd(
    grpc_transport_stream_op_batch* batch) {}

// NOTE(review): presumably empties the slot `pending` so it no longer
// refers to a surface batch -- confirm.
void RetryFilter::CallData::PendingBatchClear(PendingBatch* pending) {}

// NOTE(review): presumably clears `pending` only once all of its callbacks
// have been scheduled -- the exact condition is not visible here; confirm.
void RetryFilter::CallData::MaybeClearPendingBatch(PendingBatch* pending) {}

// This is called via the call combiner, so access to calld is synchronized.
// Closure-style callback taking an opaque `arg` and the `error` to fail
// with.
// NOTE(review): `arg` is presumably the batch to fail -- confirm.
void RetryFilter::CallData::FailPendingBatchInCallCombiner(
    void* arg, grpc_error_handle error) {}

// This is called via the call combiner, so access to calld is synchronized.
// NOTE(review): presumably fails every pending batch with `error` --
// confirm whether this yields the call combiner.
void RetryFilter::CallData::PendingBatchesFail(grpc_error_handle error) {}

// Searches for a pending batch using the caller-supplied `predicate` and
// returns a PendingBatch pointer; `log_message` is a caller-supplied
// string.
// NOTE(review): presumably returns the first pending batch satisfying
// `predicate` (nullptr if none) and uses `log_message` for trace output --
// confirm.  The body has no return statement as written.
template <typename Predicate>
RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchFind(
    const char* log_message, Predicate predicate) {}

//
// retry code
//

// The file overview notes that retries are committed when data is received
// from the server (except for Trailers-Only responses).
// NOTE(review): presumably marks retries as committed for this call, with
// `call_attempt` being the attempt that is kept -- confirm whether nullptr
// is accepted.
void RetryFilter::CallData::RetryCommit(CallAttempt* call_attempt) {}

// Starts the timer that precedes the next attempt.  `server_pushback` is an
// optional Duration.
// NOTE(review): presumably `server_pushback`, when present, overrides the
// computed backoff delay -- confirm.
void RetryFilter::CallData::StartRetryTimer(
    absl::optional<Duration> server_pushback) {}

// Closure-style callback for the retry timer.
// NOTE(review): `arg` is presumably the CallData, and this presumably hops
// into the call combiner to run OnRetryTimerLocked -- confirm.
void RetryFilter::CallData::OnRetryTimer(void* arg, grpc_error_handle error) {}

// "Locked" counterpart of OnRetryTimer.
// NOTE(review): presumably runs under the call combiner with `arg` being
// the CallData, and creates the next call attempt when `error` indicates
// the timer fired rather than being cancelled -- confirm.
void RetryFilter::CallData::OnRetryTimerLocked(void* arg,
                                               grpc_error_handle error) {}

// NOTE(review): presumably queues on `closures` a closure that invokes
// StartTransparentRetry for this call -- confirm.
void RetryFilter::CallData::AddClosureToStartTransparentRetry(
    CallCombinerClosureList* closures) {}

// Closure-style callback; the error argument is unused (its name is
// commented out).
// NOTE(review): `arg` is presumably the CallData, and this presumably
// creates a new transparent-retry attempt unless the call was cancelled in
// the meantime -- confirm.
void RetryFilter::CallData::StartTransparentRetry(void* arg,
                                                  grpc_error_handle /*error*/) {}

}  // namespace

// Channel filter vtable for the retry filter.
// NOTE(review): the initializer after '=' is missing as written, which does
// not parse.  Presumably it wires up the CallData vtable functions defined
// above (Init, Destroy, StartTransportStreamOpBatch, SetPollent) together
// with channel-level hooks -- confirm and restore.
const grpc_channel_filter kRetryFilterVtable =;

}  // namespace grpc_core