#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/call_combiner.h"
#include <inttypes.h>
#include <grpc/support/log.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/gprpp/crash.h"
namespace grpc_core {
DebugOnlyTraceFlag grpc_call_combiner_trace(false, "call_combiner");
namespace {
constexpr intptr_t kErrorBit = …;
grpc_error_handle DecodeCancelStateError(gpr_atm cancel_state) { … }
}
CallCombiner::CallCombiner() { … }
CallCombiner::~CallCombiner() { … }
#ifdef GRPC_TSAN_ENABLED
void CallCombiner::TsanClosure(void* arg, grpc_error_handle error) {
CallCombiner* self = static_cast<CallCombiner*>(arg);
RefCountedPtr<TsanLock> lock = self->tsan_lock_;
bool prev = false;
if (lock->taken.compare_exchange_strong(prev, true)) {
TSAN_ANNOTATE_RWLOCK_ACQUIRED(&lock->taken, true);
} else {
lock.reset();
}
Closure::Run(DEBUG_LOCATION, self->original_closure_, error);
if (lock != nullptr) {
TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true);
bool prev = true;
GPR_ASSERT(lock->taken.compare_exchange_strong(prev, false));
}
}
#endif
void CallCombiner::ScheduleClosure(grpc_closure* closure,
grpc_error_handle error) { … }
#ifndef NDEBUG
#define DEBUG_ARGS …
#define DEBUG_FMT_STR …
#define DEBUG_FMT_ARGS …
#else
#define DEBUG_ARGS
#define DEBUG_FMT_STR
#define DEBUG_FMT_ARGS
#endif
void CallCombiner::Start(grpc_closure* closure, grpc_error_handle error,
DEBUG_ARGS const char* reason) { … }
void CallCombiner::Stop(DEBUG_ARGS const char* reason) { … }
void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) { … }
void CallCombiner::Cancel(grpc_error_handle error) { … }
}