#include <grpc/support/port_platform.h>
#include <grpc/impl/grpc_types.h>
#include "src/core/lib/gprpp/global_config_generic.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include <errno.h>
#include <limits.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
#include <unordered_map>
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/debug/event_log.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/trace.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#ifndef SOL_TCP
#define SOL_TCP …
#endif
#ifndef TCP_INQ
#define TCP_INQ …
#define TCP_CM_INQ …
#endif
#ifdef GRPC_HAVE_MSG_NOSIGNAL
#define SENDMSG_FLAGS …
#else
#define SENDMSG_FLAGS …
#endif
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY …
#endif
#ifdef GRPC_MSG_IOVLEN_TYPE
typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
#else
msg_iovlen_type;
#endif
extern grpc_core::TraceFlag grpc_tcp_trace;
namespace grpc_core {
class TcpZerocopySendRecord { … };
class TcpZerocopySendCtx { … };
}
TcpZerocopySendCtx;
TcpZerocopySendRecord;
namespace {
struct grpc_tcp { … };
struct backup_poller { … };
}
static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp);
#define BACKUP_POLLER_POLLSET(b) …
static grpc_core::Mutex* g_backup_poller_mu = …;
static int g_uncovered_notifications_pending
ABSL_GUARDED_BY(…);
static backup_poller* g_backup_poller ABSL_GUARDED_BY(…);
static void tcp_handle_read(void* arg , grpc_error_handle error);
static void tcp_handle_write(void* arg , grpc_error_handle error);
static void tcp_drop_uncovered_then_handle_write(void* arg ,
grpc_error_handle error);
static void done_poller(void* bp, grpc_error_handle ) { … }
static void run_poller(void* bp, grpc_error_handle ) { … }
static void drop_uncovered(grpc_tcp* ) { … }
static void cover_self(grpc_tcp* tcp) { … }
static void notify_on_read(grpc_tcp* tcp) { … }
static void notify_on_write(grpc_tcp* tcp) { … }
static void tcp_drop_uncovered_then_handle_write(void* arg,
grpc_error_handle error) { … }
static void add_to_estimate(grpc_tcp* tcp, size_t bytes) { … }
static void finish_estimate(grpc_tcp* tcp) { … }
static grpc_error_handle tcp_annotate_error(grpc_error_handle src_error,
grpc_tcp* tcp) { … }
static void tcp_handle_read(void* arg , grpc_error_handle error);
static void tcp_handle_write(void* arg , grpc_error_handle error);
static void tcp_shutdown(grpc_endpoint* ep, grpc_error_handle why) { … }
static void tcp_free(grpc_tcp* tcp) { … }
#ifndef NDEBUG
#define TCP_UNREF(tcp, reason) …
#define TCP_REF(tcp, reason) …
static void tcp_unref(grpc_tcp* tcp, const char* reason,
const grpc_core::DebugLocation& debug_location) { … }
static void tcp_ref(grpc_tcp* tcp, const char* reason,
const grpc_core::DebugLocation& debug_location) { … }
#else
#define TCP_UNREF …
#define TCP_REF …
// Release-build (NDEBUG defined) variant of the endpoint unref helper: takes no
// reason string or debug location.
//
// Drops one reference on `tcp`. If refcount.Unref() reports true (presumably
// meaning this was the last outstanding reference -- confirm against the
// RefCount implementation), the endpoint is torn down with tcp_free().
static void tcp_unref(grpc_tcp* tcp) {
  const bool should_free = tcp->refcount.Unref();
  // Freeing is the rare path; bail out early in the common case.
  if (!GPR_UNLIKELY(should_free)) {
    return;
  }
  tcp_free(tcp);
}
// Release-build (NDEBUG defined) variant of the endpoint ref helper: takes one
// additional reference on `tcp` without any debug bookkeeping arguments.
static void tcp_ref(grpc_tcp* tcp) {
  tcp->refcount.Ref();
}
#endif
static void tcp_destroy(grpc_endpoint* ep) { … }
// Locking contract (declared by the ABSL_LOCKS_EXCLUDED annotation below): the
// caller must NOT hold tcp->read_mu when calling this function.
static void perform_reclamation(grpc_tcp* tcp)
ABSL_LOCKS_EXCLUDED(tcp->read_mu) { … }
// Locking contract (declared by the ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation
// below): the caller must already hold tcp->read_mu exclusively.
static void maybe_post_reclaimer(grpc_tcp* tcp)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) { … }
// Locking contract (declared by the ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation
// below): the caller must already hold tcp->read_mu exclusively.
static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) { … }
// Locking contract (declared by the ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation
// below): the caller must already hold tcp->read_mu exclusively.
static void update_rcvlowat(grpc_tcp* tcp)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) { … }
#define MAX_READ_IOVEC …
// Locking contract (declared by the ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation
// below): the caller must already hold tcp->read_mu exclusively.
// `error` is an out-parameter.
// NOTE(review): the meaning of the bool result is not documented at this
// declaration -- confirm against the callers before relying on it.
static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) { … }
// Locking contract (declared by the ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation
// below): the caller must already hold tcp->read_mu exclusively.
static void maybe_make_read_slices(grpc_tcp* tcp)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) { … }
static void tcp_handle_read(void* arg , grpc_error_handle error) { … }
static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
grpc_closure* cb, bool urgent, int min_progress_size) { … }
ssize_t tcp_send(int fd, const struct msghdr* msg, int* saved_errno,
int additional_flags = 0) { … }
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
size_t sending_length,
ssize_t* sent_length, int* saved_errno,
int additional_flags = 0);
static void tcp_handle_error(void* arg , grpc_error_handle error);
static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
grpc_tcp* tcp, grpc_slice_buffer* buf);
#ifdef GRPC_LINUX_ERRQUEUE
static bool process_errors(grpc_tcp* tcp);
static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
grpc_tcp* tcp, grpc_slice_buffer* buf) { … }
static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) { … }
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
size_t sending_length,
ssize_t* sent_length, int* saved_errno,
int additional_flags) { … }
static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp,
TcpZerocopySendRecord* record,
uint32_t seq, const char* tag);
static void process_zerocopy(grpc_tcp* tcp, struct cmsghdr* cmsg) { … }
static bool CmsgIsIpLevel(const cmsghdr& cmsg) { … }
static bool CmsgIsZeroCopy(const cmsghdr& cmsg) { … }
struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
struct cmsghdr* cmsg) { … }
static bool process_errors(grpc_tcp* tcp) { … }
static void tcp_handle_error(void* arg ,
grpc_error_handle error) { … }
#else
// Fallback definition compiled when GRPC_LINUX_ERRQUEUE is not defined.
// Ignores both arguments and always returns nullptr, i.e. it never hands out
// a zerocopy send record.
static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
grpc_tcp* /*tcp*/, grpc_slice_buffer* /*buf*/) {
return nullptr;
}
// Fallback definition compiled when GRPC_LINUX_ERRQUEUE is not defined:
// intentionally a no-op.
static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* /*tcp*/) {}
// Fallback definition compiled when GRPC_LINUX_ERRQUEUE is not defined.
// Writing with timestamps is unsupported in this configuration: the function
// ignores all of its arguments, logs an error, and then fails an unconditional
// GPR_ASSERT(0).
static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/,
size_t /*sending_length*/,
ssize_t* /*sent_length*/,
int* /*saved_errno*/,
int /*additional_flags*/) {
gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform");
GPR_ASSERT(0);
// Only reached if the assertion above returns; also satisfies the bool
// return type.
return false;
}
// Fallback definition compiled when GRPC_LINUX_ERRQUEUE is not defined.
// Error handling via this callback is unsupported in this configuration: the
// function ignores both arguments, logs an error, and then fails an
// unconditional GPR_ASSERT(0).
static void tcp_handle_error(void* /*arg*/,
grpc_error_handle /*error*/) {
gpr_log(GPR_ERROR, "Error handling is not supported for this platform");
GPR_ASSERT(0);
}
#endif
void tcp_shutdown_buffer_list(grpc_tcp* tcp) { … }
#if defined(IOV_MAX) && IOV_MAX < 260
#define MAX_WRITE_IOVEC …
#else
#define MAX_WRITE_IOVEC …
#endif
msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx,
size_t* unwind_byte_idx,
size_t* sending_length,
iovec* iov) { … }
void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length,
size_t actually_sent) { … }
static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
grpc_error_handle* error) { … }
static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp,
TcpZerocopySendRecord* record,
uint32_t ,
const char* ) { … }
static bool tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
grpc_error_handle* error) { … }
static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) { … }
static void tcp_handle_write(void* arg ,
grpc_error_handle error) { … }
static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
grpc_closure* cb, void* arg, int ) { … }
static void tcp_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) { … }
static void tcp_add_to_pollset_set(grpc_endpoint* ep,
grpc_pollset_set* pollset_set) { … }
static void tcp_delete_from_pollset_set(grpc_endpoint* ep,
grpc_pollset_set* pollset_set) { … }
static absl::string_view tcp_get_peer(grpc_endpoint* ep) { … }
static absl::string_view tcp_get_local_address(grpc_endpoint* ep) { … }
static int tcp_get_fd(grpc_endpoint* ep) { … }
static bool tcp_can_track_err(grpc_endpoint* ep) { … }
static const grpc_endpoint_vtable vtable = …;
grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
const grpc_core::PosixTcpOptions& options,
absl::string_view peer_string) { … }
int grpc_tcp_fd(grpc_endpoint* ep) { … }
void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
grpc_closure* done) { … }
void grpc_tcp_posix_init() { … }
void grpc_tcp_posix_shutdown() { … }
#endif