chromium/third_party/grpc/src/src/core/lib/iomgr/ev_poll_posix.cc

//
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include <grpc/support/sync.h>

#include "src/core/lib/iomgr/port.h"

#ifdef GRPC_POSIX_SOCKET_EV_POLL

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <string>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_poll_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"

#define GRPC_POLLSET_KICK_BROADCAST

//******************************************************************************
// FD declarations
//
grpc_fd_watcher;

grpc_cached_wakeup_fd;

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
struct grpc_fork_fd_list {};

struct grpc_fd {};

// True when GRPC_ENABLE_FORK_SUPPORT=1.
static bool track_fds_for_fork =;

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
static grpc_fork_fd_list* fork_fd_list_head =;
static gpr_mu fork_fd_list_mu;

// Begin polling on an fd.
// Registers that the given pollset is interested in this fd - so that if read
// or writability interest changes, the pollset can be kicked to pick up that
// new interest.
// Return value is:
//   (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
// i.e. a combination of read_mask and write_mask determined by the fd's current
// interest in said events.
// Polling strategies that do not need to alter their behavior depending on the
// fd's current interest (such as epoll) do not need to call this function.
// MUST NOT be called with a pollset lock taken
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
                              grpc_pollset_worker* worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher* watcher);
// Complete polling previously started with fd_begin_poll
// MUST NOT be called with a pollset lock taken
// if got_read or got_write are 1, also does the become_{readable,writable} as
// appropriate.
static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write);

// Return 1 if this fd is orphaned, 0 otherwise
static bool fd_is_orphaned(grpc_fd* fd);

#ifndef NDEBUG
static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
                     int line);
#define GRPC_FD_REF(fd, reason)
#define GRPC_FD_UNREF(fd, reason)
#else
static void fd_ref(grpc_fd* fd);
static void fd_unref(grpc_fd* fd);
#define GRPC_FD_REF
#define GRPC_FD_UNREF
#endif

#define CLOSURE_NOT_READY
#define CLOSURE_READY

//******************************************************************************
// pollset declarations
//

grpc_cached_wakeup_fd;

struct grpc_pollset_worker {};

struct grpc_pollset {};

// Add an fd to a pollset
static void pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);

static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);

// Convert a timespec to milliseconds:
// - very small or negative poll times are clamped to zero to do a
//   non-blocking poll (which becomes spin polling)
// - other small values are rounded up to one millisecond
// - longer than a millisecond polls are rounded up to the next nearest
//   millisecond to avoid spinning
// - infinite timeouts are converted to -1
static int poll_deadline_to_millis_timeout(grpc_core::Timestamp deadline);

// Allow kick to wakeup the currently polling worker
#define GRPC_POLLSET_CAN_KICK_SELF
// Force the wakee to repoll when awoken
#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP
// As per pollset_kick, with an extended set of flags (defined above)
// -- mostly for fd_posix's use.
static grpc_error_handle pollset_kick_ext(grpc_pollset* p,
                                          grpc_pollset_worker* specific_worker,
                                          uint32_t flags) GRPC_MUST_USE_RESULT;

// Return 1 if the pollset has active threads in pollset_work (pollset must
// be locked)
static bool pollset_has_workers(grpc_pollset* pollset);

//******************************************************************************
// pollset_set definitions
//

struct grpc_pollset_set {};

//******************************************************************************
// functions to track opened fds. No-ops unless track_fds_for_fork is true.
//

static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {}

static void fork_fd_list_add_node(grpc_fork_fd_list* node) {}

static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {}

static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {}

//******************************************************************************
// fd_posix.c
//

#ifndef NDEBUG
#define REF_BY(fd, n, reason)
#define UNREF_BY(fd, n, reason)
static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                   int line) {}

#ifndef NDEBUG
static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                     int line) {}

static grpc_fd* fd_create(int fd, const char* name, bool track_err) {}

static bool fd_is_orphaned(grpc_fd* fd) {}

static grpc_error_handle pollset_kick_locked(grpc_fd_watcher* watcher) {}

static void maybe_wake_one_watcher_locked(grpc_fd* fd) {}

static void wake_all_watchers_locked(grpc_fd* fd) {}

static int has_watchers(grpc_fd* fd) {}

static void close_fd_locked(grpc_fd* fd) {}

static int fd_wrapped_fd(grpc_fd* fd) {}

static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {}

// increment refcount by two to avoid changing the orphan bit
#ifndef NDEBUG
static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
                   int line) {}

static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
                     int line) {}
#else
static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }

static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
#endif

static grpc_error_handle fd_shutdown_error(grpc_fd* fd) {}

static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
                             grpc_closure* closure) {}

// returns 1 if state becomes not ready
static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {}

static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {}

static bool fd_is_shutdown(grpc_fd* fd) {}

static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {}

static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {}

static void fd_notify_on_error(grpc_fd* /*fd*/, grpc_closure* closure) {}

static void fd_set_readable(grpc_fd* fd) {}

static void fd_set_writable(grpc_fd* fd) {}

static void fd_set_error(grpc_fd* /*fd*/) {}

static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
                              grpc_pollset_worker* worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher* watcher) {}

static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {}

static void fd_set_pre_allocated(grpc_fd* fd) {}

//******************************************************************************
// pollset_posix.c
//

static thread_local grpc_pollset* g_current_thread_poller;
static thread_local grpc_pollset_worker* g_current_thread_worker;

static void remove_worker(grpc_pollset* /*p*/, grpc_pollset_worker* worker) {}

static bool pollset_has_workers(grpc_pollset* p) {}

static bool pollset_in_pollset_sets(grpc_pollset* p) {}

static bool pollset_has_observers(grpc_pollset* p) {}

static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {}

static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {}

static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {}

static void kick_append_error(grpc_error_handle* composite,
                              grpc_error_handle error) {}

static grpc_error_handle pollset_kick_ext(grpc_pollset* p,
                                          grpc_pollset_worker* specific_worker,
                                          uint32_t flags) {}

static grpc_error_handle pollset_kick(grpc_pollset* p,
                                      grpc_pollset_worker* specific_worker) {}

// global state management

static grpc_error_handle pollset_global_init(void) {}

// main interface

static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {}

static void pollset_destroy(grpc_pollset* pollset) {}

static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {}

static void finish_shutdown(grpc_pollset* pollset) {}

static void work_combine_error(grpc_error_handle* composite,
                               grpc_error_handle error) {}

static grpc_error_handle pollset_work(grpc_pollset* pollset,
                                      grpc_pollset_worker** worker_hdl,
                                      grpc_core::Timestamp deadline) {}

static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {}

static int poll_deadline_to_millis_timeout(grpc_core::Timestamp deadline) {}

//******************************************************************************
// pollset_set_posix.c
//

static grpc_pollset_set* pollset_set_create(void) {}

static void pollset_set_destroy(grpc_pollset_set* pollset_set) {}

static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
                                    grpc_pollset* pollset) {}

static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
                                    grpc_pollset* pollset) {}

static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {}

static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {}

static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {}

static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {}

//******************************************************************************
// event engine binding
//

static bool is_any_background_poller_thread(void) {}

static void shutdown_background_closure(void) {}

static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
                                             grpc_error_handle /*error*/) {}

// Called by the child process's post-fork handler to close open fds, including
// worker wakeup fds. This allows gRPC to shutdown in the child process without
// interfering with connections or RPCs ongoing in the parent.
static void reset_event_manager_on_fork() {}

const grpc_event_engine_vtable grpc_ev_poll_posix =;

namespace {

grpc_poll_function_type real_poll_function;

int phony_poll(struct pollfd fds[], nfds_t nfds, int timeout) {}

}  // namespace

const grpc_event_engine_vtable grpc_ev_none_posix =v =;

#endif  // GRPC_POSIX_SOCKET_EV_POLL