#include <grpc/support/port_platform.h>
#include "src/core/lib/surface/completion_queue.h"
#include <inttypes.h>
#include <stdio.h>
#include <algorithm>
#include <atomic>
#include <initializer_list>
#include <new>
#include <string>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gprpp/atomic_utils.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/event_string.h"
grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags");
grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount");
namespace {
thread_local grpc_cq_completion* g_cached_event;
thread_local grpc_completion_queue* g_cached_cq;
struct plucker { … };
struct cq_poller_vtable { … };
non_polling_worker;
struct non_polling_poller { … };
size_t non_polling_poller_size(void) { … }
void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) { … }
void non_polling_poller_destroy(grpc_pollset* pollset) { … }
grpc_error_handle non_polling_poller_work(grpc_pollset* pollset,
grpc_pollset_worker** worker,
grpc_core::Timestamp deadline) { … }
grpc_error_handle non_polling_poller_kick(
grpc_pollset* pollset, grpc_pollset_worker* specific_worker) { … }
void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) { … }
const cq_poller_vtable g_poller_vtable_by_poller_type[] = …;
}
struct cq_vtable { … };
namespace {
class CqEventQueue { … };
struct cq_next_data { … };
struct cq_pluck_data { … };
struct cq_callback_data { … };
}
struct grpc_completion_queue { … };
static void cq_finish_shutdown_next(grpc_completion_queue* cq);
static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
static void cq_shutdown_next(grpc_completion_queue* cq);
static void cq_shutdown_pluck(grpc_completion_queue* cq);
static void cq_shutdown_callback(grpc_completion_queue* cq);
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
static void cq_end_op_for_next(
grpc_completion_queue* cq, void* tag, grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal);
static void cq_end_op_for_pluck(
grpc_completion_queue* cq, void* tag, grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal);
static void cq_end_op_for_callback(
grpc_completion_queue* cq, void* tag, grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal);
static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
void* reserved);
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved);
static void cq_init_next(void* data,
grpc_completion_queue_functor* shutdown_callback);
static void cq_init_pluck(void* data,
grpc_completion_queue_functor* shutdown_callback);
static void cq_init_callback(void* data,
grpc_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
static void cq_destroy_callback(void* data);
static const cq_vtable g_cq_vtable[] = …;
#define DATA_FROM_CQ(cq) …
#define POLLSET_FROM_CQ(cq) …
grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) …
static void on_pollset_shutdown_done(void* arg, grpc_error_handle error);
void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) { … }
int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
void** tag, int* ok) { … }
bool CqEventQueue::Push(grpc_cq_completion* c) { … }
grpc_cq_completion* CqEventQueue::Pop() { … }
grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
grpc_completion_queue_functor* shutdown_callback) { … }
static void cq_init_next(void* data,
grpc_completion_queue_functor* ) { … }
static void cq_destroy_next(void* data) { … }
static void cq_init_pluck(
void* data, grpc_completion_queue_functor* ) { … }
static void cq_destroy_pluck(void* data) { … }
static void cq_init_callback(void* data,
grpc_completion_queue_functor* shutdown_callback) { … }
static void cq_destroy_callback(void* data) { … }
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) { … }
int grpc_get_cq_poll_num(grpc_completion_queue* cq) { … }
#ifndef NDEBUG
void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
const char* file, int line) { … }
static void on_pollset_shutdown_done(void* arg, grpc_error_handle ) { … }
#ifndef NDEBUG
void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
const char* file, int line) { … }
#ifndef NDEBUG
static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) { … }
#else
static void cq_check_tag(grpc_completion_queue* , void* ,
bool ) {}
#endif
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* ) { … }
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* ) { … }
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* ) { … }
bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { … }
static void cq_end_op_for_next(
grpc_completion_queue* cq, void* tag, grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool ) { … }
static void cq_end_op_for_pluck(
grpc_completion_queue* cq, void* tag, grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool ) { … }
static void functor_callback(void* arg, grpc_error_handle error) { … }
static void cq_end_op_for_callback(
grpc_completion_queue* cq, void* tag, grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal) { … }
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag,
grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage,
bool internal) { … }
struct cq_is_finished_arg { … };
class ExecCtxNext : public grpc_core::ExecCtx { … };
#ifndef NDEBUG
static void dump_pending_tags(grpc_completion_queue* cq) { … }
#else
static void dump_pending_tags(grpc_completion_queue* ) {}
#endif
static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
void* reserved) { … }
static void cq_finish_shutdown_next(grpc_completion_queue* cq) { … }
static void cq_shutdown_next(grpc_completion_queue* cq) { … }
grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
gpr_timespec deadline, void* reserved) { … }
static int add_plucker(grpc_completion_queue* cq, void* tag,
grpc_pollset_worker** worker) { … }
static void del_plucker(grpc_completion_queue* cq, void* tag,
grpc_pollset_worker** worker) { … }
class ExecCtxPluck : public grpc_core::ExecCtx { … };
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved) { … }
grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved) { … }
static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) { … }
static void cq_shutdown_pluck(grpc_completion_queue* cq) { … }
static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { … }
static void cq_shutdown_callback(grpc_completion_queue* cq) { … }
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) { … }
void grpc_completion_queue_destroy(grpc_completion_queue* cq) { … }
grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) { … }
bool grpc_cq_can_listen(grpc_completion_queue* cq) { … }