#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <algorithm>
#include <cctype>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <string>
#include <type_traits>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/internal/slice_cast.h>
#include <grpc/event_engine/slice.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/load_file.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#ifdef GRPC_LINUX_ERRQUEUE
#include <dirent.h>
#include <linux/capability.h>
#include <linux/errqueue.h>
#include <linux/netlink.h>
#include <sys/prctl.h>
#include <sys/resource.h>
#endif
#include <netinet/in.h>
#ifndef SOL_TCP
#define SOL_TCP …
#endif
#ifndef TCP_INQ
#define TCP_INQ …
#define TCP_CM_INQ …
#endif
#ifdef GRPC_HAVE_MSG_NOSIGNAL
#define SENDMSG_FLAGS …
#else
#define SENDMSG_FLAGS …
#endif
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY …
#endif
#define MAX_READ_IOVEC …
namespace grpc_event_engine {
namespace experimental {
namespace {
ssize_t TcpSend(int fd, const struct msghdr* msg, int* saved_errno,
int additional_flags = 0) { … }
#ifdef GRPC_LINUX_ERRQUEUE
#define CAP_IS_SUPPORTED(cap) …
void rtrim(std::string& s) { … }
uint64_t ParseUlimitMemLockFromFile(std::string file_name) { … }
uint64_t GetUlimitHardMemLock() { … }
uint64_t GetRLimitMemLockMax() { … }
bool CmsgIsIpLevel(const cmsghdr& cmsg) { … }
bool CmsgIsZeroCopy(const cmsghdr& cmsg) { … }
#endif
absl::Status PosixOSError(int error_no, const char* call_name) { … }
}
#if defined(IOV_MAX) && IOV_MAX < 260
#define MAX_WRITE_IOVEC …
#else
#define MAX_WRITE_IOVEC …
#endif
msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx,
size_t* unwind_byte_idx,
size_t* sending_length,
iovec* iov) { … }
void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length,
size_t actually_sent) { … }
void PosixEndpointImpl::AddToEstimate(size_t bytes) { … }
void PosixEndpointImpl::FinishEstimate() { … }
absl::Status PosixEndpointImpl::TcpAnnotateError(absl::Status src_error) { … }
bool PosixEndpointImpl::TcpDoRead(absl::Status& status) { … }
void PosixEndpointImpl::PerformReclamation() { … }
void PosixEndpointImpl::MaybePostReclaimer() { … }
void PosixEndpointImpl::UpdateRcvLowat() { … }
void PosixEndpointImpl::MaybeMakeReadSlices() { … }
void PosixEndpointImpl::HandleRead(absl::Status status) { … }
bool PosixEndpointImpl::Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer,
const EventEngine::Endpoint::ReadArgs* args) { … }
#ifdef GRPC_LINUX_ERRQUEUE
TcpZerocopySendRecord* PosixEndpointImpl::TcpGetSendZerocopyRecord(
SliceBuffer& buf) { … }
bool PosixEndpointImpl::ProcessErrors() { … }
void PosixEndpointImpl::ZerocopyDisableAndWaitForRemaining() { … }
void PosixEndpointImpl::ProcessZerocopy(struct cmsghdr* cmsg) { … }
struct cmsghdr* PosixEndpointImpl::ProcessTimestamp(msghdr* msg,
struct cmsghdr* cmsg) { … }
void PosixEndpointImpl::HandleError(absl::Status status) { … }
bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* msg,
size_t sending_length,
ssize_t* sent_length,
int* saved_errno,
int additional_flags) { … }
#else
TcpZerocopySendRecord* PosixEndpointImpl::TcpGetSendZerocopyRecord(
SliceBuffer& ) {
return nullptr;
}
void PosixEndpointImpl::HandleError(absl::Status ) {
grpc_core::Crash("Error handling not supported on this platform");
}
void PosixEndpointImpl::ZerocopyDisableAndWaitForRemaining() {}
bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* ,
size_t ,
ssize_t* ,
int* ,
int ) {
grpc_core::Crash("Write with timestamps not supported for this platform");
}
#endif
void PosixEndpointImpl::UnrefMaybePutZerocopySendRecord(
TcpZerocopySendRecord* record) { … }
void PosixEndpointImpl::TcpShutdownTracedBufferList() { … }
bool PosixEndpointImpl::DoFlushZerocopy(TcpZerocopySendRecord* record,
absl::Status& status) { … }
bool PosixEndpointImpl::TcpFlushZerocopy(TcpZerocopySendRecord* record,
absl::Status& status) { … }
bool PosixEndpointImpl::TcpFlush(absl::Status& status) { … }
void PosixEndpointImpl::HandleWrite(absl::Status status) { … }
bool PosixEndpointImpl::Write(
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
const EventEngine::Endpoint::WriteArgs* args) { … }
void PosixEndpointImpl::MaybeShutdown(
absl::Status why,
absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) { … }
PosixEndpointImpl ::~PosixEndpointImpl() { … }
PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle,
PosixEngineClosure* on_done,
std::shared_ptr<EventEngine> engine,
MemoryAllocator&& ,
const PosixTcpOptions& options)
: … { … }
std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
EventHandle* handle, PosixEngineClosure* on_shutdown,
std::shared_ptr<EventEngine> engine, MemoryAllocator&& allocator,
const PosixTcpOptions& options) { … }
}
}
#else
namespace grpc_event_engine {
namespace experimental {
std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
EventHandle* , PosixEngineClosure* ,
std::shared_ptr<EventEngine> ,
const PosixTcpOptions& ) {
grpc_core::Crash("Cannot create PosixEndpoint on this platform");
}
}
}
#endif