// chromium/third_party/grpc/src/src/core/lib/surface/completion_queue.cc

//
//
// Copyright 2015-2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>

#include "src/core/lib/surface/completion_queue.h"

#include <inttypes.h>
#include <stdio.h>

#include <algorithm>
#include <atomic>
#include <initializer_list>
#include <new>
#include <string>
#include <utility>
#include <vector>

#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"

#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gprpp/atomic_utils.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/event_string.h"

// Trace flags owned by this file, registered under the names "op_failure",
// "pending_tags" and "cq_refcount". Each is constructed with `false`
// (presumably "off by default" -- confirm against TraceFlag's constructor).
// The last two are DebugOnlyTraceFlag instances.
grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags");
grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount");

// File-local helpers: the thread-local completion cache and the
// "non-polling" poller entry points.
namespace {

// Specifies a cq thread local cache.
// The first event that occurs on a thread
// with a cq cache will go into that cache, and
// will only be returned on the thread that initialized the cache.
// NOTE: Only one event will ever be cached.
thread_local grpc_cq_completion* g_cached_event;
thread_local grpc_completion_queue* g_cached_cq;

// NOTE(review): both structs are declared without members here -- confirm.
struct plucker {};
struct cq_poller_vtable {};
// NOTE(review): the next line has no type specifier and does not compile as
// written; it looks like the tail of a truncated
// `typedef struct ... non_polling_worker;` -- restore the full declaration.
non_polling_worker;

// NOTE(review): declared without members here -- confirm.
struct non_polling_poller {};
// The functions below mirror the usual pollset operations (size, init,
// destroy, work, kick, shutdown) for the non-polling poller.
// NOTE(review): every body is empty, so the non-void ones (size, work, kick)
// return no value as written.
size_t non_polling_poller_size(void) {}

void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {}

void non_polling_poller_destroy(grpc_pollset* pollset) {}

grpc_error_handle non_polling_poller_work(grpc_pollset* pollset,
                                          grpc_pollset_worker** worker,
                                          grpc_core::Timestamp deadline) {}

grpc_error_handle non_polling_poller_kick(
    grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {}

void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) {}

// NOTE(review): the initializer after `=` is missing, so this does not
// compile as written. Per its name the table is indexed by poller type --
// restore the entries.
const cq_poller_vtable g_poller_vtable_by_poller_type[] =;

}  // namespace

// Element type of g_cq_vtable, the per-completion-type vtable array.
// NOTE(review): no members are declared here -- confirm.
struct cq_vtable {};

// File-local data types used by the completion queue implementations.
namespace {

// Queue that holds the cq_completion_events. Internally uses
// MultiProducerSingleConsumerQueue (a lockfree multiproducer single consumer
// queue). It uses a queue_lock to support multiple consumers.
// Only used in completion queues whose completion_type is GRPC_CQ_NEXT
// NOTE(review): Push() and Pop() are defined out of line in this file, but no
// members are declared in the class as written -- confirm.
class CqEventQueue {};

// Presumably the per-queue state for the "next", "pluck" and "callback"
// completion types respectively (inferred from the names only).
// NOTE(review): no members are declared here -- confirm.
struct cq_next_data {};

struct cq_pluck_data {};

struct cq_callback_data {};

}  // namespace

// Completion queue structure.
// NOTE(review): declared without members here -- confirm.
struct grpc_completion_queue {};

// Forward declarations.
// Shutdown and finish-shutdown entry points; one of each per completion type
// (next, pluck, callback).
static void cq_finish_shutdown_next(grpc_completion_queue* cq);
static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
static void cq_shutdown_next(grpc_completion_queue* cq);
static void cq_shutdown_pluck(grpc_completion_queue* cq);
static void cq_shutdown_callback(grpc_completion_queue* cq);

// Begin-op entry points, one per completion type. Each takes the queue and a
// tag and returns a bool.
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);

// A cq_end_op function is called when an operation on a given CQ with
// a given tag has completed. The storage argument is a reference to the
// space reserved for this completion as it is placed into the corresponding
// queue. The done argument is a callback that will be invoked when it is
// safe to free up that storage. The storage MUST NOT be freed until the
// done callback is invoked.
// There is one cq_end_op function per completion type (next, pluck,
// callback); all three share the same signature.
static void cq_end_op_for_next(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal);

static void cq_end_op_for_pluck(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal);

static void cq_end_op_for_callback(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal);

// Event-retrieval entry points. Both take a deadline and a `reserved`
// pointer; cq_pluck additionally takes the tag to look for.
static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
                          void* reserved);

static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
                           gpr_timespec deadline, void* reserved);

// Init/destroy entry points for the per-completion-type data, passed as an
// untyped `data` pointer.
// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
// (their definitions leave that parameter unnamed).
static void cq_init_next(void* data,
                         grpc_completion_queue_functor* shutdown_callback);
static void cq_init_pluck(void* data,
                          grpc_completion_queue_functor* shutdown_callback);
static void cq_init_callback(void* data,
                             grpc_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
static void cq_destroy_callback(void* data);

// Completion queue vtables based on the completion-type.
// NOTE(review): the initializer after `=` is missing, so this does not
// compile as written -- restore the per-completion-type entries.
static const cq_vtable g_cq_vtable[] =;

// Accessor macros taking a completion queue.
// NOTE(review): both expand to nothing as written; per their names they
// should yield the queue's data area and its pollset -- restore the bodies.
#define DATA_FROM_CQ(cq)
#define POLLSET_FROM_CQ(cq)

// Trace flag registered under the name "queue_pluck", constructed with
// `false`.
grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");

// NOTE(review): expands to nothing as written; per its name it should trace
// an event being returned from `cq` -- restore the body.
#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event)

// Forward declaration; the definition appears later in this file.
static void on_pollset_shutdown_done(void* arg, grpc_error_handle error);

// Per its name, sets up the calling thread's completion cache for `cq`
// (see g_cached_event / g_cached_cq).
// NOTE(review): the body is empty as written -- confirm.
void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {}

// Per its name, flushes the calling thread's completion cache for `cq`.
// `tag` and `ok` are pointer parameters, presumably outputs describing the
// cached event -- confirm.
// NOTE(review): the body is empty, so no int is returned as written.
int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
                                                   void** tag, int* ok) {}

// Adds completion `c` to the event queue (per the method name).
// NOTE(review): the body is empty, so no bool is returned as written; the
// meaning of the return value cannot be determined from here.
bool CqEventQueue::Push(grpc_cq_completion* c) {}

// Removes and returns a completion from the event queue (per the method
// name).
// NOTE(review): the body is empty, so no pointer is returned as written.
grpc_cq_completion* CqEventQueue::Pop() {}

// Creates a completion queue for the given completion type and polling type.
// `shutdown_callback` is only used by the callback completion type (the
// next/pluck init functions ignore it).
// NOTE(review): the body is empty, so no queue is returned as written.
grpc_completion_queue* grpc_completion_queue_create_internal(
    grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
    grpc_completion_queue_functor* shutdown_callback) {}

// Initializes the per-queue data at `data` for the "next" completion type.
// The shutdown callback parameter is unnamed and therefore unused.
static void cq_init_next(void* data,
                         grpc_completion_queue_functor* /*shutdown_callback*/) {}

// Tears down the per-queue data at `data` for the "next" completion type
// (counterpart of cq_init_next).
static void cq_destroy_next(void* data) {}

// Initializes the per-queue data at `data` for the "pluck" completion type.
// The shutdown callback parameter is unnamed and therefore unused.
static void cq_init_pluck(
    void* data, grpc_completion_queue_functor* /*shutdown_callback*/) {}

// Tears down the per-queue data at `data` for the "pluck" completion type
// (counterpart of cq_init_pluck).
static void cq_destroy_pluck(void* data) {}

// Initializes the per-queue data at `data` for the "callback" completion
// type. Unlike the next/pluck variants, `shutdown_callback` is a named
// parameter here.
// NOTE(review): the body is empty as written, so it is not actually used.
static void cq_init_callback(void* data,
                             grpc_completion_queue_functor* shutdown_callback) {}

// Tears down the per-queue data at `data` for the "callback" completion type
// (counterpart of cq_init_callback).
static void cq_destroy_callback(void* data) {}

// Returns the completion type of `cq` (per the function name).
// NOTE(review): the body is empty, so no value is returned as written.
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {}

// Returns a poll count for `cq` (per the function name; the exact meaning is
// not visible here).
// NOTE(review): the body is empty, so no value is returned as written.
int grpc_get_cq_poll_num(grpc_completion_queue* cq) {}

// Takes an internal reference on `cq`.
// Debug builds (NDEBUG undefined) also receive the caller-supplied `reason`,
// `file` and `line`; release builds take only `cq`.
// The conditional was previously left open (no #else / #endif), which left
// an unterminated #ifndef and no definition at all for release builds.
// NOTE(review): the release signature is assumed to match the declaration in
// completion_queue.h -- confirm against the header.
#ifndef NDEBUG
void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
                          const char* file, int line) {}
#else
void grpc_cq_internal_ref(grpc_completion_queue* cq) {}
#endif

// Closure-style callback (void* arg + error) run when the pollset has shut
// down, per its name. The error parameter is unnamed and therefore unused.
// NOTE(review): `arg` is presumably the completion queue -- confirm.
static void on_pollset_shutdown_done(void* arg, grpc_error_handle /*error*/) {}

// Drops an internal reference on `cq`.
// Debug builds (NDEBUG undefined) also receive the caller-supplied `reason`,
// `file` and `line`; release builds take only `cq`.
// The conditional was previously left open (no #else / #endif), which left
// an unterminated #ifndef and no definition at all for release builds.
// NOTE(review): the release signature is assumed to match the declaration in
// completion_queue.h -- confirm against the header.
#ifndef NDEBUG
void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
                            const char* file, int line) {}
#else
void grpc_cq_internal_unref(grpc_completion_queue* cq) {}
#endif

// Debug-only tag check. In debug builds (NDEBUG undefined) the function
// receives the queue, the tag and a `lock_cq` flag; in release builds every
// parameter is unnamed, so the function ignores its arguments.
#ifndef NDEBUG
static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {}
#else
static void cq_check_tag(grpc_completion_queue* /*cq*/, void* /*tag*/,
                         bool /*lock_cq*/) {}
#endif

// Begin-op implementation for the "next" completion type. The tag parameter
// is unnamed and therefore unused.
// NOTE(review): the body is empty, so no bool is returned as written.
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* /*tag*/) {}

// Begin-op implementation for the "pluck" completion type. The tag parameter
// is unnamed and therefore unused.
// NOTE(review): the body is empty, so no bool is returned as written.
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* /*tag*/) {}

// Begin-op implementation for the "callback" completion type. The tag
// parameter is unnamed and therefore unused.
// NOTE(review): the body is empty, so no bool is returned as written.
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {}

// Public begin-op entry point for `cq` and `tag`; presumably dispatches to
// the cq_begin_op_for_* variant matching the queue's completion type --
// confirm.
// NOTE(review): the body is empty, so no bool is returned as written.
bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {}

// Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
// completion type of GRPC_CQ_NEXT).
// `storage` is the space reserved for the completion and `done` is invoked
// with `done_arg` once that storage may be freed. The `internal` parameter
// is unnamed and therefore unused.
static void cq_end_op_for_next(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool /*internal*/) {}

// Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
// completion type of GRPC_CQ_PLUCK).
// `storage` is the space reserved for the completion and `done` is invoked
// with `done_arg` once that storage may be freed. The `internal` parameter
// is unnamed and therefore unused.
static void cq_end_op_for_pluck(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool /*internal*/) {}

// Closure-style callback (void* arg + error).
// NOTE(review): presumably runs a grpc_completion_queue_functor passed via
// `arg` -- the body is empty as written, so confirm.
static void functor_callback(void* arg, grpc_error_handle error) {}

// Complete an event on a completion queue of type GRPC_CQ_CALLBACK.
// `storage` is the space reserved for the completion and `done` is invoked
// with `done_arg` once that storage may be freed. Unlike the next/pluck
// variants, `internal` is a named parameter here.
static void cq_end_op_for_callback(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal) {}

// Public end-op entry point: reports that the operation identified by `tag`
// on `cq` has completed with `error`. `storage` must stay valid until `done`
// has been invoked with `done_arg`.
// NOTE(review): presumably dispatches to the cq_end_op_for_* variant for the
// queue's completion type -- confirm.
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag,
                    grpc_error_handle error,
                    void (*done)(void* done_arg, grpc_cq_completion* storage),
                    void* done_arg, grpc_cq_completion* storage,
                    bool internal) {}

// Argument bundle for an "is finished" check (per the name).
// NOTE(review): no members are declared here -- confirm.
struct cq_is_finished_arg {};
// ExecCtx subclass; by its name, used by the "next" path.
// NOTE(review): no members or overrides are declared here -- confirm.
class ExecCtxNext : public grpc_core::ExecCtx {};

// Debug-only helper that, per its name, dumps the tags still pending on the
// queue. In release builds the parameter is unnamed, so the function ignores
// its argument.
#ifndef NDEBUG
static void dump_pending_tags(grpc_completion_queue* cq) {}
#else
static void dump_pending_tags(grpc_completion_queue* /*cq*/) {}
#endif

// "Next" implementation: returns an event from `cq`, given a `deadline` and
// a `reserved` pointer.
// NOTE(review): the body is empty, so no grpc_event is returned as written.
static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
                          void* reserved) {}

// Finishes the completion queue shutdown. This means that there are no more
// completion events / tags expected from the completion queue.
// - Must be called under the completion queue lock
// - Must be called only once in the completion queue's lifetime
// - grpc_completion_queue_shutdown() MUST have been called before calling
//   this function
static void cq_finish_shutdown_next(grpc_completion_queue* cq) {}

// Shutdown entry point for the "next" completion type.
static void cq_shutdown_next(grpc_completion_queue* cq) {}

// Public API: returns the next event from `cq`, given a `deadline` and a
// `reserved` pointer.
// NOTE(review): the body is empty, so no grpc_event is returned as written.
grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
                                      gpr_timespec deadline, void* reserved) {}

// Registers a plucker for `tag` with its pollset `worker` slot on `cq` (per
// the function name).
// NOTE(review): the body is empty, so no int is returned as written; the
// meaning of the return value cannot be determined from here.
static int add_plucker(grpc_completion_queue* cq, void* tag,
                       grpc_pollset_worker** worker) {}

// Removes the plucker registered for `tag` / `worker` on `cq` (counterpart
// of add_plucker, per the name).
static void del_plucker(grpc_completion_queue* cq, void* tag,
                        grpc_pollset_worker** worker) {}

// ExecCtx subclass; by its name, used by the "pluck" path.
// NOTE(review): no members or overrides are declared here -- confirm.
class ExecCtxPluck : public grpc_core::ExecCtx {};

// "Pluck" implementation: returns the event for `tag` from `cq`, given a
// `deadline` and a `reserved` pointer.
// NOTE(review): the body is empty, so no grpc_event is returned as written.
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
                           gpr_timespec deadline, void* reserved) {}

// Public API: returns the event for `tag` from `cq`, given a `deadline` and
// a `reserved` pointer.
// NOTE(review): the body is empty, so no grpc_event is returned as written.
grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
                                       gpr_timespec deadline, void* reserved) {}

// Finish-shutdown step for the "pluck" completion type.
// NOTE(review): presumably subject to the same preconditions documented on
// cq_finish_shutdown_next -- confirm.
static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {}

// Shutdown entry point for the "pluck" completion type.
// NOTE: This function is almost exactly identical to cq_shutdown_next() but
// merging them is a bit tricky and probably not worth it
static void cq_shutdown_pluck(grpc_completion_queue* cq) {}

// Finish-shutdown step for the "callback" completion type.
// NOTE(review): presumably where the shutdown callback passed to
// cq_init_callback gets run -- confirm.
static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {}

// Shutdown entry point for the "callback" completion type.
static void cq_shutdown_callback(grpc_completion_queue* cq) {}

// Public API: begins shutdown of `cq`.
// Shutdown simply drops a ref that we reserved at creation time; if we drop
// to zero here, then enter shutdown mode and wake up any waiters
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {}

// Public API: destroys `cq`.
// NOTE(review): any precondition (e.g. shutdown having completed first) is
// not visible here -- confirm.
void grpc_completion_queue_destroy(grpc_completion_queue* cq) {}

// Returns the pollset associated with `cq` (per the function name).
// NOTE(review): the body is empty, so no pointer is returned as written.
grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {}

// Reports whether `cq` can be used for listening (per the function name; the
// criterion is not visible here).
// NOTE(review): the body is empty, so no bool is returned as written.
bool grpc_cq_can_listen(grpc_completion_queue* cq) {}