chromium/third_party/grpc/src/src/core/lib/iomgr/tcp_posix.cc

//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include <grpc/impl/grpc_types.h>

#include "src/core/lib/gprpp/global_config_generic.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/port.h"

#ifdef GRPC_POSIX_SOCKET_TCP

#include <errno.h>
#include <limits.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#include <unordered_map>

#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>

#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/debug/event_log.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/trace.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"

// Defines SOL_TCP only when the headers included above have not already done
// so.
#ifndef SOL_TCP
#define SOL_TCP
#endif

// Defines TCP_INQ and TCP_CM_INQ together when TCP_INQ is not already provided
// by the headers included above.
// NOTE(review): presumably a fallback for older system headers — confirm.
#ifndef TCP_INQ
#define TCP_INQ
#define TCP_CM_INQ
#endif

// SENDMSG_FLAGS is selected by whether GRPC_HAVE_MSG_NOSIGNAL is defined.
// NOTE(review): presumably the base flag set passed to sendmsg() — confirm the
// expansion on each branch.
#ifdef GRPC_HAVE_MSG_NOSIGNAL
#define SENDMSG_FLAGS
#else
#define SENDMSG_FLAGS
#endif

// TCP zero copy sendmsg flag.
// NB: We define this here as a fallback in case we're using an older set of
// library headers that has not defined MSG_ZEROCOPY. Since this constant is
// part of the kernel, we are guaranteed it will never change/disagree so
// defining it here is safe.
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY
#endif

// msg_iovlen_type is the return type of TcpZerocopySendRecord::PopulateIovs().
// When GRPC_MSG_IOVLEN_TYPE is defined it supplies the underlying type.
// NOTE(review): presumably meant to match the type of msghdr::msg_iovlen on
// the target platform — confirm.
#ifdef GRPC_MSG_IOVLEN_TYPE
typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
#else
msg_iovlen_type;
#endif

extern grpc_core::TraceFlag grpc_tcp_trace;

namespace grpc_core {

// PopulateIovs() and UpdateOffsetForBytesSent() of this class are defined out
// of line later in this file.
// NOTE(review): presumably tracks one in-flight zerocopy send — confirm.
class TcpZerocopySendRecord {};

// NOTE(review): presumably the per-endpoint context that hands out
// TcpZerocopySendRecord objects for zerocopy sends — confirm.
class TcpZerocopySendCtx {};

}  // namespace grpc_core

TcpZerocopySendCtx;
TcpZerocopySendRecord;

namespace {

// State object passed to nearly every function in this file. Code and lock
// annotations below refer to its `refcount` and `read_mu` members.
struct grpc_tcp {};

// Type of the object pointed to by the global g_backup_poller.
// NOTE(review): presumably also what BACKUP_POLLER_POLLSET(b) operates on —
// confirm.
struct backup_poller {};

}  // namespace

static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp);

#define BACKUP_POLLER_POLLSET(b)

// File-level backup-poller state: a mutex, a pending-notification counter and
// the current backup_poller instance.
// NOTE(review): the ABSL_GUARDED_BY annotations presumably name
// g_backup_poller_mu — confirm.
static grpc_core::Mutex* g_backup_poller_mu =;
static int g_uncovered_notifications_pending
    ABSL_GUARDED_BY();
static backup_poller* g_backup_poller ABSL_GUARDED_BY();

static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error);
static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error_handle error);
static void tcp_drop_uncovered_then_handle_write(void* arg /* grpc_tcp */,
                                                 grpc_error_handle error);

// Closure-style callback; the error argument is ignored.
// NOTE(review): `bp` is presumably a backup_poller* and this presumably runs
// once the backup poller has finished shutting down — confirm.
static void done_poller(void* bp, grpc_error_handle /*error_ignored*/) {}

// Closure-style callback; the error argument is ignored.
// NOTE(review): `bp` is presumably a backup_poller* and this presumably
// performs one polling pass on its pollset — confirm.
static void run_poller(void* bp, grpc_error_handle /*error_ignored*/) {}

// The grpc_tcp argument is unnamed and therefore unused.
// NOTE(review): presumably updates g_uncovered_notifications_pending and the
// global backup poller — confirm.
static void drop_uncovered(grpc_tcp* /*tcp*/) {}

// The gRPC API considers a Write operation to be done the moment it clears
// 'flow control', i.e., not necessarily sent on the wire. This means that the
// application MIGHT not call `grpc_completion_queue_next/pluck` in a timely
// manner when its `Write()` API is acked.
//
// We need to ensure that the fd is 'covered' (i.e., being monitored by some
// polling thread so that progress is made), and hence add it to a backup
// poller here.
static void cover_self(grpc_tcp* tcp) {}

// NOTE(review): presumably arranges for tcp_handle_read to run once tcp's fd
// becomes readable — confirm.
static void notify_on_read(grpc_tcp* tcp) {}

// NOTE(review): presumably arranges for a write callback to run once tcp's fd
// becomes writable — confirm.
static void notify_on_write(grpc_tcp* tcp) {}

// Closure-style callback with the same signature as tcp_handle_write.
// NOTE(review): going by the name, presumably calls drop_uncovered() and then
// tcp_handle_write(arg, error) — confirm.
static void tcp_drop_uncovered_then_handle_write(void* arg,
                                                 grpc_error_handle error) {}

// NOTE(review): presumably accumulates `bytes` into a running size estimate
// kept on tcp — confirm.
static void add_to_estimate(grpc_tcp* tcp, size_t bytes) {}

// NOTE(review): presumably finalizes the estimate built up through
// add_to_estimate() — confirm.
static void finish_estimate(grpc_tcp* tcp) {}

// Takes an error and the endpoint state and returns a grpc_error_handle.
// NOTE(review): presumably returns src_error decorated with endpoint details
// (fd, peer address) — confirm.
static grpc_error_handle tcp_annotate_error(grpc_error_handle src_error,
                                            grpc_tcp* tcp) {}

static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error);
static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error_handle error);

// Shuts down endpoint `ep`; `why` carries the shutdown reason.
// NOTE(review): presumably an entry of the grpc_endpoint_vtable defined near
// the end of this file — confirm.
static void tcp_shutdown(grpc_endpoint* ep, grpc_error_handle why) {}

// Called by tcp_unref() once the reference count reports that the last
// reference has been dropped.
static void tcp_free(grpc_tcp* tcp) {}

#ifndef NDEBUG
#define TCP_UNREF(tcp, reason)
#define TCP_REF(tcp, reason)
// Variant of tcp_unref compiled when NDEBUG is not defined: besides `tcp` it
// takes a reason string and the caller's DebugLocation.
// NOTE(review): presumably forwards both to the traced Unref — confirm.
static void tcp_unref(grpc_tcp* tcp, const char* reason,
                      const grpc_core::DebugLocation& debug_location) {}

// Variant of tcp_ref compiled when NDEBUG is not defined: besides `tcp` it
// takes a reason string and the caller's DebugLocation.
// NOTE(review): presumably forwards both to the traced Ref — confirm.
static void tcp_ref(grpc_tcp* tcp, const char* reason,
                    const grpc_core::DebugLocation& debug_location) {}
#else
#define TCP_UNREF
#define TCP_REF
// NDEBUG variant: drops one reference on `tcp` and frees it via tcp_free()
// when Unref() reports that this was the last reference.
static void tcp_unref(grpc_tcp* tcp) {
  const bool released_last_ref = tcp->refcount.Unref();
  if (GPR_UNLIKELY(released_last_ref)) {
    tcp_free(tcp);
  }
}

// NDEBUG variant: takes one additional reference on `tcp`.
static void tcp_ref(grpc_tcp* tcp) {
  tcp->refcount.Ref();
}
#endif

// Destroys endpoint `ep`.
// NOTE(review): presumably releases the endpoint's own reference on the
// underlying grpc_tcp — confirm.
static void tcp_destroy(grpc_endpoint* ep) {}

// Must be called WITHOUT tcp->read_mu held (see the lock annotation).
// NOTE(review): presumably takes read_mu itself and releases buffered read
// memory back to the resource quota — confirm.
static void perform_reclamation(grpc_tcp* tcp)
    ABSL_LOCKS_EXCLUDED(tcp->read_mu) {}

// Caller must hold tcp->read_mu.
// NOTE(review): presumably registers a memory reclaimer (leading to
// perform_reclamation) if one is not already registered — confirm.
static void maybe_post_reclaimer(grpc_tcp* tcp)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {}

// Caller must hold tcp->read_mu.
// NOTE(review): presumably logs the outcome of a read when grpc_tcp_trace is
// enabled — confirm.
static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {}

// Caller must hold tcp->read_mu.
// NOTE(review): presumably adjusts the socket's SO_RCVLOWAT option — confirm.
static void update_rcvlowat(grpc_tcp* tcp)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {}

// NOTE(review): presumably the maximum number of iovec entries used by a
// single read — confirm.
#define MAX_READ_IOVEC
// Returns true if data is available to read or if an error other than EAGAIN
// occurred. Caller must hold tcp->read_mu.
// NOTE(review): presumably the error, if any, is reported through *error —
// confirm.
static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {}

// Caller must hold tcp->read_mu.
// NOTE(review): presumably allocates slices into the incoming buffer when
// more read space is needed — confirm.
static void maybe_make_read_slices(grpc_tcp* tcp)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {}

// Closure-style read callback; `arg` is the grpc_tcp.
// NOTE(review): presumably performs the read via tcp_do_read() and then
// invokes the pending read closure — confirm.
static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {}

// Endpoint read entry point.
// NOTE(review): presumably fills `incoming_buffer` and runs `cb` on
// completion; `urgent` and `min_progress_size` presumably tune how the read is
// scheduled — confirm.
static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
                     grpc_closure* cb, bool urgent, int min_progress_size) {}

// A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
// of bytes sent. \a additional_flags defaults to 0.
// NOTE(review): presumably *saved_errno receives errno from the final sendmsg
// call and additional_flags is combined with SENDMSG_FLAGS — confirm.
ssize_t tcp_send(int fd, const struct msghdr* msg, int* saved_errno,
                 int additional_flags = 0) {}

/// This is to be called if outgoing_buffer_arg is not null. On Linux platforms,
/// this will call sendmsg with socket options set to collect timestamps inside
/// the kernel. On return, sent_length is set to the return value of the sendmsg
/// call. Returns false if setting the socket options failed. This is not
/// currently implemented for non-Linux platforms, where it crashes instead.
///
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
                                      size_t sending_length,
                                      ssize_t* sent_length, int* saved_errno,
                                      int additional_flags = 0);

/// The callback function to be invoked when we get an error on the socket.
static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error_handle error);

static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
    grpc_tcp* tcp, grpc_slice_buffer* buf);

#ifdef GRPC_LINUX_ERRQUEUE
static bool process_errors(grpc_tcp* tcp);

// Variant compiled when GRPC_LINUX_ERRQUEUE is defined.
// NOTE(review): presumably returns a record to use for sending `buf` with
// zerocopy, or nullptr when zerocopy should not be used — confirm.
static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
    grpc_tcp* tcp, grpc_slice_buffer* buf) {}

// Variant compiled when GRPC_LINUX_ERRQUEUE is defined.
// NOTE(review): going by the name, presumably disables zerocopy on tcp and
// waits until outstanding zerocopy sends have completed — confirm.
static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) {}

// Variant compiled when GRPC_LINUX_ERRQUEUE is defined; the contract is
// described on the forward declaration above (sent_length receives the
// sendmsg return value; false means setting the socket options failed).
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
                                      size_t sending_length,
                                      ssize_t* sent_length, int* saved_errno,
                                      int additional_flags) {}

static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp,
                                            TcpZerocopySendRecord* record,
                                            uint32_t seq, const char* tag);
// Reads \a cmsg to process zerocopy control messages.
// NOTE(review): presumably releases the TcpZerocopySendRecord(s) that the
// message acknowledges via UnrefMaybePutZerocopySendRecord() — confirm.
static void process_zerocopy(grpc_tcp* tcp, struct cmsghdr* cmsg) {}

// Returns whether the cmsg received from the error queue is at the IPv4 or
// IPv6 level.
static bool CmsgIsIpLevel(const cmsghdr& cmsg) {}

// NOTE(review): presumably returns whether `cmsg` is a zerocopy completion
// notification from the error queue — confirm.
static bool CmsgIsZeroCopy(const cmsghdr& cmsg) {}

/// Reads \a cmsg to derive timestamps from the control messages. If a valid
/// timestamp is found, the traced buffer list is updated with this timestamp.
/// The caller of this function should be looping over the control messages
/// found in \a msg; \a cmsg should point to the control message that the
/// caller wants processed.
/// Returns a pointer to a control message. On the next iteration,
/// CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg.
struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
                                  struct cmsghdr* cmsg) {}

/// On Linux platforms, reads the socket's error queue and processes error
/// messages from the queue.
/// NOTE(review): the meaning of the bool result is not documented here —
/// presumably whether any message was processed; confirm.
static bool process_errors(grpc_tcp* tcp) {}

// Variant compiled when GRPC_LINUX_ERRQUEUE is defined: the callback invoked
// when an error is reported on the socket; `arg` is the grpc_tcp.
// NOTE(review): presumably drains the error queue via process_errors() —
// confirm.
static void tcp_handle_error(void* arg /* grpc_tcp */,
                             grpc_error_handle error) {}

#else   // GRPC_LINUX_ERRQUEUE
// Fallback used when GRPC_LINUX_ERRQUEUE is not defined: never provides a
// zerocopy send record, so it always returns nullptr. Both arguments are
// ignored.
static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
    grpc_tcp* tcp, grpc_slice_buffer* buf) {
  (void)tcp;
  (void)buf;
  return nullptr;
}

static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* /*tcp*/) {}

// Fallback used when GRPC_LINUX_ERRQUEUE is not defined: writing with
// timestamps is unsupported, so this logs an error and then trips an
// assertion. All arguments are ignored. The trailing return only satisfies
// the signature.
static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/,
                                      size_t /*sending_length*/,
                                      ssize_t* /*sent_length*/,
                                      int* /* saved_errno */,
                                      int /*additional_flags*/) {
  static const char kUnsupportedMsg[] =
      "Write with timestamps not supported for this platform";
  gpr_log(GPR_ERROR, "%s", kUnsupportedMsg);
  GPR_ASSERT(0);
  return false;
}

// Fallback used when GRPC_LINUX_ERRQUEUE is not defined: socket error
// handling is unsupported, so this logs an error and then trips an assertion.
// Both arguments are ignored.
static void tcp_handle_error(void* /*arg*/ /* grpc_tcp */,
                             grpc_error_handle /*error*/) {
  static const char kUnsupportedMsg[] =
      "Error handling is not supported for this platform";
  gpr_log(GPR_ERROR, "%s", kUnsupportedMsg);
  GPR_ASSERT(0);
}
#endif  // GRPC_LINUX_ERRQUEUE

// If outgoing_buffer_arg is set, shuts down the buffer list early so that any
// release operations needed can be performed on the arg.
void tcp_shutdown_buffer_list(grpc_tcp* tcp) {}

// MAX_WRITE_IOVEC is chosen by whether IOV_MAX is defined and smaller than
// 260.
// NOTE(review): presumably IOV_MAX itself in that case and a fixed cap
// otherwise — confirm.
#if defined(IOV_MAX) && IOV_MAX < 260
#define MAX_WRITE_IOVEC
#else
#define MAX_WRITE_IOVEC
#endif
// NOTE(review): presumably fills `iov` from the record's pending slices,
// returns the number of entries written, reports the total byte count through
// *sending_length, and saves the starting position in *unwind_slice_idx /
// *unwind_byte_idx so a failed send can be rolled back — confirm.
msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx,
                                                    size_t* unwind_byte_idx,
                                                    size_t* sending_length,
                                                    iovec* iov) {}

// NOTE(review): presumably adjusts the record's position after a partial
// send, where `actually_sent` of the `sending_length` bytes offered were
// accepted — confirm.
void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length,
                                                     size_t actually_sent) {}

// Returns true if the flush is done and false if it is still pending. When
// returning true, *error is set.
static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
                                  grpc_error_handle* error) {}

// The `seq` and `tag` parameters are unnamed here and therefore unused.
// NOTE(review): going by the name, presumably drops a reference on `record`
// and returns it to tcp's zerocopy context once unreferenced — confirm.
static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp,
                                            TcpZerocopySendRecord* record,
                                            uint32_t /*seq*/,
                                            const char* /*tag*/) {}

// NOTE(review): presumably wraps do_tcp_flush_zerocopy() with the same return
// contract (true when done, with *error set) and releases `record` when
// finished — confirm.
static bool tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
                               grpc_error_handle* error) {}

// NOTE(review): presumably the non-zerocopy counterpart of
// do_tcp_flush_zerocopy(): returns true when the write is done (with *error
// set) and false while it is still pending — confirm.
static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) {}

// Closure-style write callback; `arg` is the grpc_tcp.
// NOTE(review): presumably continues flushing the pending write and invokes
// the write closure once complete — confirm.
static void tcp_handle_write(void* arg /* grpc_tcp */,
                             grpc_error_handle error) {}

// Endpoint write entry point. The max_frame_size parameter is unnamed and
// therefore unused.
// NOTE(review): presumably writes `buf`, runs `cb` on completion, and treats
// a non-null `arg` as the outgoing_buffer_arg that enables timestamped writes
// — confirm.
static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
                      grpc_closure* cb, void* arg, int /*max_frame_size*/) {}

// NOTE(review): presumably registers the endpoint's fd with `pollset` —
// confirm.
static void tcp_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {}

// NOTE(review): presumably registers the endpoint's fd with `pollset_set` —
// confirm.
static void tcp_add_to_pollset_set(grpc_endpoint* ep,
                                   grpc_pollset_set* pollset_set) {}

// NOTE(review): presumably removes the endpoint's fd from `pollset_set` —
// confirm.
static void tcp_delete_from_pollset_set(grpc_endpoint* ep,
                                        grpc_pollset_set* pollset_set) {}

// Returns a string_view for endpoint `ep`.
// NOTE(review): presumably the peer address string stored on the endpoint;
// the view's lifetime is presumably tied to the endpoint — confirm.
static absl::string_view tcp_get_peer(grpc_endpoint* ep) {}

// Returns a string_view for endpoint `ep`.
// NOTE(review): presumably the local address string stored on the endpoint;
// the view's lifetime is presumably tied to the endpoint — confirm.
static absl::string_view tcp_get_local_address(grpc_endpoint* ep) {}

// NOTE(review): presumably returns the file descriptor wrapped by endpoint
// `ep` — confirm.
static int tcp_get_fd(grpc_endpoint* ep) {}

// NOTE(review): presumably reports whether the endpoint's socket supports
// error-queue tracking (see GRPC_LINUX_ERRQUEUE) — confirm.
static bool tcp_can_track_err(grpc_endpoint* ep) {}

// Endpoint vtable for this implementation.
// NOTE(review): presumably populated with the static tcp_* endpoint functions
// defined above — confirm the entries and their order.
static const grpc_endpoint_vtable vtable =;

// Public constructor: returns a grpc_endpoint* built from `em_fd`, `options`
// and `peer_string`.
// NOTE(review): presumably the returned endpoint takes ownership of em_fd —
// confirm.
grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
                               const grpc_core::PosixTcpOptions& options,
                               absl::string_view peer_string) {}

// NOTE(review): presumably returns the file descriptor of an endpoint created
// by grpc_tcp_create() — confirm.
int grpc_tcp_fd(grpc_endpoint* ep) {}

// NOTE(review): going by the name, presumably destroys `ep` without closing
// its socket, hands the descriptor back through *fd, and schedules `done` once
// the release has completed — confirm.
void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
                                     grpc_closure* done) {}

// Module initialization hook.
// NOTE(review): presumably creates g_backup_poller_mu — confirm.
void grpc_tcp_posix_init() {}

// Module shutdown hook, the counterpart of grpc_tcp_posix_init().
// NOTE(review): presumably destroys g_backup_poller_mu — confirm.
void grpc_tcp_posix_shutdown() {}

#endif  // GRPC_POSIX_SOCKET_TCP