chromium/third_party/grpc/src/src/core/lib/event_engine/posix_engine/posix_endpoint.cc

// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>

#include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"

#include <errno.h>
#include <inttypes.h>
#include <limits.h>

#include <algorithm>
#include <cctype>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <string>
#include <type_traits>

#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/types/optional.h"

#include <grpc/event_engine/internal/slice_cast.h>
#include <grpc/event_engine/slice.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/status.h>
#include <grpc/support/log.h>

#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/load_file.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"

#ifdef GRPC_POSIX_SOCKET_TCP
#ifdef GRPC_LINUX_ERRQUEUE
#include <dirent.h>            // IWYU pragma: keep
#include <linux/capability.h>  // IWYU pragma: keep
#include <linux/errqueue.h>    // IWYU pragma: keep
#include <linux/netlink.h>     // IWYU pragma: keep
#include <sys/prctl.h>         // IWYU pragma: keep
#include <sys/resource.h>      // IWYU pragma: keep
#endif
#include <netinet/in.h>  // IWYU pragma: keep

// Fallback for platforms whose headers do not provide SOL_TCP.
// NOTE(review): the replacement text is empty in this view, so any use of
// SOL_TCP as a value expands to nothing — presumably the constant was elided;
// confirm against the full file.
#ifndef SOL_TCP
#define SOL_TCP
#endif

// Fallbacks used when the system headers lack TCP_INQ; TCP_CM_INQ is defined
// together with it.
// NOTE(review): both replacements are empty as shown — confirm the intended
// values.
#ifndef TCP_INQ
#define TCP_INQ
#define TCP_CM_INQ
#endif

// Flags macro selected on GRPC_HAVE_MSG_NOSIGNAL.
// NOTE(review): both branches define SENDMSG_FLAGS with the same (empty)
// replacement in this view, so the conditional currently has no effect.
// Presumably the first branch should expand to MSG_NOSIGNAL and the second to
// 0 — TODO confirm.
#ifdef GRPC_HAVE_MSG_NOSIGNAL
#define SENDMSG_FLAGS
#else
#define SENDMSG_FLAGS
#endif

// TCP zero copy sendmsg flag.
// NB: We define this here as a fallback in case we're using an older set of
// library headers that has not defined MSG_ZEROCOPY. Since this constant is
// part of the kernel, we are guaranteed it will never change/disagree so
// defining it here is safe.
// NOTE(review): the fallback below has no replacement text in this view, which
// does not match the "constant" described above — confirm the intended value.
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY
#endif

// NOTE(review): presumably the maximum number of iovec entries used per read;
// the value is not visible in this view — confirm.
#define MAX_READ_IOVEC

namespace grpc_event_engine {
namespace experimental {

namespace {

// A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
// of bytes sent.
// \a saved_errno is an out-pointer and \a additional_flags defaults to 0;
// presumably the former receives errno on failure and the latter is OR-ed into
// the sendmsg flags — TODO confirm, the body is not visible here.
// NOTE(review): the body is empty in this view although the return type is
// ssize_t; flowing off the end of a value-returning function is undefined
// behavior. Presumably the implementation was elided — confirm.
ssize_t TcpSend(int fd, const struct msghdr* msg, int* saved_errno,
                int additional_flags = 0) {}

#ifdef GRPC_LINUX_ERRQUEUE

// Function-like macro taking a capability \a cap.
// NOTE(review): the replacement text is empty in this view, so
// CAP_IS_SUPPORTED(x) expands to nothing. Presumably it should test whether
// the capability is supported (e.g. via prctl) — TODO confirm.
#define CAP_IS_SUPPORTED(cap)

// Removes trailing whitespace from \a s in place. "Whitespace" is every
// character for which std::isspace is true in the current C locale, which
// covers spaces and newlines as well as tabs and carriage returns. A string
// that is empty or consists only of whitespace becomes empty; leading and
// interior whitespace is left untouched.
void rtrim(std::string& s) {
  // Search backwards for the last non-whitespace character. base() converts
  // that reverse iterator into the forward position just past the character,
  // which is exactly where the trailing run to erase begins.
  // The predicate takes unsigned char because calling std::isspace with a
  // negative char value is undefined behavior.
  s.erase(std::find_if(s.rbegin(), s.rend(),
                       [](unsigned char ch) { return !std::isspace(ch); })
              .base(),
          s.end());
}

// Takes a file name (by value) and returns a uint64_t.
// NOTE(review): presumably parses the named limits file for a hard memlock
// value — the body is not visible here, TODO confirm.
// NOTE(review): the body is empty in this view although the return type is
// non-void (falling off the end is undefined behavior) — presumably elided;
// confirm.
uint64_t ParseUlimitMemLockFromFile(std::string file_name) {}

// Ulimit hard memlock controls per socket limit for maximum locked memory in
// RAM. Parses all files under /etc/security/limits.d/ and
// /etc/security/limits.conf file for a line of the following format:
// * hard memlock <value>
// It extracts the first valid <value> and returns it. A value of UINT64_MAX
// represents unlimited or infinity. Hard memlock value should be set to
// allow zerocopy sendmsgs to succeed. It controls the maximum amount of
// memory that can be locked by a socket in RAM.
// NOTE(review): the body is empty in this view although the return type is
// uint64_t (falling off the end is undefined behavior), so the behavior
// described above cannot be checked here — presumably elided; confirm.
uint64_t GetUlimitHardMemLock() {}

// RLIMIT_MEMLOCK controls per process limit for maximum locked memory in RAM.
// NOTE(review): presumably returns the RLIMIT_MEMLOCK maximum for the process;
// the body is empty in this view although the return type is uint64_t
// (falling off the end is undefined behavior) — presumably elided; confirm.
uint64_t GetRLimitMemLockMax() {}

// Whether the cmsg received from error queue is of the IPv4 or IPv6 levels.
// NOTE(review): the body is empty in this view although the return type is
// bool (falling off the end is undefined behavior) — presumably elided;
// confirm.
bool CmsgIsIpLevel(const cmsghdr& cmsg) {}

// Predicate over a control message header.
// NOTE(review): presumably reports whether \a cmsg is a zerocopy completion
// notification — the body is not visible here, TODO confirm.
// NOTE(review): the body is empty in this view although the return type is
// bool (falling off the end is undefined behavior) — presumably elided;
// confirm.
bool CmsgIsZeroCopy(const cmsghdr& cmsg) {}
#endif  // GRPC_LINUX_ERRQUEUE

// Takes an errno-style number and the name of the call it relates to, and
// returns an absl::Status.
// NOTE(review): presumably builds an error status describing the failed
// \a call_name — the body is not visible here, TODO confirm.
// NOTE(review): the body is empty in this view although the return type is
// non-void (falling off the end is undefined behavior) — presumably elided;
// confirm.
absl::Status PosixOSError(int error_no, const char* call_name) {}

}  // namespace

// Selects MAX_WRITE_IOVEC depending on whether IOV_MAX is defined and smaller
// than 260.
// NOTE(review): both branches define the macro with the same (empty)
// replacement in this view, so the conditional currently has no effect.
// Presumably the first branch should expand to IOV_MAX and the second to 260
// — TODO confirm.
#if defined(IOV_MAX) && IOV_MAX < 260
#define MAX_WRITE_IOVEC
#else
#define MAX_WRITE_IOVEC
#endif
// Takes three size_t out-pointers and an iovec array and returns a
// msg_iovlen_type.
// NOTE(review): presumably fills \a iov from the record's pending data,
// reports the total via \a sending_length, stores the pre-call position in the
// unwind arguments and returns the number of entries filled — the body is not
// visible here, TODO confirm.
// NOTE(review): the body is empty in this view although the return type is
// non-void (falling off the end is undefined behavior) — presumably elided;
// confirm.
msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx,
                                                    size_t* unwind_byte_idx,
                                                    size_t* sending_length,
                                                    iovec* iov) {}

// Takes the length that was attempted and the length actually sent.
// NOTE(review): presumably adjusts the record's send offset after a partial
// write — the body is empty in this view, so as shown this is a no-op;
// presumably elided, confirm.
void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length,
                                                     size_t actually_sent) {}

// NOTE(review): presumably adds \a bytes to a running size estimate kept by
// the endpoint — the body is empty in this view, so as shown this is a no-op;
// presumably elided, confirm.
void PosixEndpointImpl::AddToEstimate(size_t bytes) {}

// NOTE(review): presumably finalizes the estimate accumulated through
// AddToEstimate — the body is empty in this view, so as shown this is a
// no-op; presumably elided, confirm.
void PosixEndpointImpl::FinishEstimate() {}

// Takes a status by value and returns a status.
// NOTE(review): presumably decorates \a src_error with endpoint-specific
// details — the body is not visible here, TODO confirm.
// NOTE(review): the body is empty in this view although the return type is
// non-void (falling off the end is undefined behavior) — presumably elided;
// confirm.
absl::Status PosixEndpointImpl::TcpAnnotateError(absl::Status src_error) {}

// Returns true if data available to read or error other than EAGAIN.
// \a status is passed by non-const reference; presumably it receives the
// outcome of the read — TODO confirm.
// NOTE(review): the body is empty in this view although the return type is
// bool (falling off the end is undefined behavior) — presumably elided;
// confirm.
bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {}

// NOTE(review): presumably releases memory held by the endpoint when asked to
// reclaim — the body is empty in this view, so as shown this is a no-op;
// presumably elided, confirm.
void PosixEndpointImpl::PerformReclamation() {}

// NOTE(review): presumably registers a memory reclaimer if one is not already
// registered — the body is empty in this view, so as shown this is a no-op;
// presumably elided, confirm.
void PosixEndpointImpl::MaybePostReclaimer() {}

// NOTE(review): presumably updates the socket's receive low-water mark
// (SO_RCVLOWAT) — the body is empty in this view, so as shown this is a
// no-op; presumably elided, confirm.
void PosixEndpointImpl::UpdateRcvLowat() {}

// NOTE(review): presumably ensures the read buffer has slices to read into —
// the body is empty in this view, so as shown this is a no-op; presumably
// elided, confirm.
void PosixEndpointImpl::MaybeMakeReadSlices() {}

// Takes a status by value.
// NOTE(review): presumably the callback run when the socket becomes readable
// or the pending read fails — the body is empty in this view, so as shown
// this is a no-op; presumably elided, confirm.
void PosixEndpointImpl::HandleRead(absl::Status status) {}

// Takes a completion callback \a on_read, a destination \a buffer and optional
// read arguments \a args, and returns a bool.
// NOTE(review): the meaning of the return value (e.g. completed synchronously
// vs. pending) is not visible here — TODO confirm against the header.
// NOTE(review): the body is empty in this view although the return type is
// bool (falling off the end is undefined behavior) — presumably elided;
// confirm.
bool PosixEndpointImpl::Read(absl::AnyInvocable<void(absl::Status)> on_read,
                             SliceBuffer* buffer,
                             const EventEngine::Endpoint::ReadArgs* args) {}

#ifdef GRPC_LINUX_ERRQUEUE
// GRPC_LINUX_ERRQUEUE variant. Takes the buffer to send and returns a pointer
// to a zerocopy send record.
// NOTE(review): presumably returns nullptr when zerocopy is not used for this
// buffer — the body is not visible here, TODO confirm.
// NOTE(review): the body is empty in this view although the return type is
// non-void (falling off the end is undefined behavior) — presumably elided;
// confirm.
TcpZerocopySendRecord* PosixEndpointImpl::TcpGetSendZerocopyRecord(
    SliceBuffer& buf) {}

// For linux platforms, reads the socket's error queue and processes error
// messages from the queue.
// NOTE(review): the meaning of the bool result is not visible here — TODO
// confirm.
// NOTE(review): the body is empty in this view although the return type is
// bool (falling off the end is undefined behavior) — presumably elided;
// confirm.
bool PosixEndpointImpl::ProcessErrors() {}

// GRPC_LINUX_ERRQUEUE variant.
// NOTE(review): presumably disables further zerocopy sends and waits for
// outstanding ones to complete — the body is empty in this view, so as shown
// this is a no-op; presumably elided, confirm.
void PosixEndpointImpl::ZerocopyDisableAndWaitForRemaining() {}

// Reads \a cmsg to process zerocopy control messages.
// NOTE(review): the body is empty in this view, so as shown this is a no-op —
// presumably elided; confirm.
void PosixEndpointImpl::ProcessZerocopy(struct cmsghdr* cmsg) {}

// Reads \a cmsg to derive timestamps from the control messages. If a valid
// timestamp is found, the traced buffer list is updated with this timestamp.
// The caller of this function should be looping on the control messages found
// in \a msg. \a cmsg should point to the control message that the caller wants
// processed. On return, a pointer to a control message is returned. On the next
// iteration, CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg.
// NOTE(review): the body is empty in this view although a cmsghdr pointer must
// be returned (falling off the end is undefined behavior) — presumably
// elided; confirm.
struct cmsghdr* PosixEndpointImpl::ProcessTimestamp(msghdr* msg,
                                                    struct cmsghdr* cmsg) {}

// GRPC_LINUX_ERRQUEUE variant. Takes a status by value.
// NOTE(review): presumably the callback run when the socket reports an error
// event — the body is empty in this view, so as shown this is a no-op;
// presumably elided, confirm.
void PosixEndpointImpl::HandleError(absl::Status status) {}

// GRPC_LINUX_ERRQUEUE variant. Takes the message to send, the intended
// \a sending_length, out-pointers \a sent_length and \a saved_errno, and
// \a additional_flags, and returns a bool.
// NOTE(review): presumably sends \a msg with timestamping enabled and reports
// success through the return value — the body is not visible here, TODO
// confirm.
// NOTE(review): the body is empty in this view although the return type is
// bool (falling off the end is undefined behavior) — presumably elided;
// confirm.
bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* msg,
                                            size_t sending_length,
                                            ssize_t* sent_length,
                                            int* saved_errno,
                                            int additional_flags) {}

#else   // GRPC_LINUX_ERRQUEUE
// Fallback compiled when GRPC_LINUX_ERRQUEUE is not defined: the buffer
// argument is ignored and nullptr is always returned, i.e. no zerocopy send
// record is ever handed out in this configuration.
TcpZerocopySendRecord* PosixEndpointImpl::TcpGetSendZerocopyRecord(
    SliceBuffer& /*buf*/) {
  return nullptr;
}

// Fallback compiled when GRPC_LINUX_ERRQUEUE is not defined: the status
// argument is ignored and the call goes straight to grpc_core::Crash with a
// fixed message.
void PosixEndpointImpl::HandleError(absl::Status /*status*/) {
  grpc_core::Crash("Error handling not supported on this platform");
}

// Fallback compiled when GRPC_LINUX_ERRQUEUE is not defined. As shown, this is
// a no-op.
void PosixEndpointImpl::ZerocopyDisableAndWaitForRemaining() {}

// Fallback compiled when GRPC_LINUX_ERRQUEUE is not defined: every argument is
// ignored and the call goes straight to grpc_core::Crash with a fixed message.
// There is no return statement after the Crash call.
bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* /*msg*/,
                                            size_t /*sending_length*/,
                                            ssize_t* /*sent_length*/,
                                            int* /*saved_errno*/,
                                            int /*additional_flags*/) {
  grpc_core::Crash("Write with timestamps not supported for this platform");
}
#endif  // GRPC_LINUX_ERRQUEUE

// Takes a pointer to a zerocopy send record.
// NOTE(review): presumably drops one reference on \a record and returns it to
// a pool once unreferenced — the body is empty in this view, so as shown this
// is a no-op; presumably elided, confirm.
void PosixEndpointImpl::UnrefMaybePutZerocopySendRecord(
    TcpZerocopySendRecord* record) {}

// If outgoing_buffer_arg is filled, shuts down the list early, so that any
// release operations needed can be performed on the arg.
// NOTE(review): the body is empty in this view, so as shown this is a no-op —
// presumably elided; confirm.
void PosixEndpointImpl::TcpShutdownTracedBufferList() {}

// Returns true if done, false if pending; if returning true, \a status is set.
// NOTE(review): the body is empty in this view although the return type is
// bool (falling off the end is undefined behavior) — presumably elided;
// confirm.
bool PosixEndpointImpl::DoFlushZerocopy(TcpZerocopySendRecord* record,
                                        absl::Status& status) {}

// Takes a zerocopy send record and a status passed by non-const reference, and
// returns a bool.
// NOTE(review): presumably flushes \a record and reports completion through
// the return value and \a status — the body is not visible here, TODO
// confirm.
// NOTE(review): the body is empty in this view although the return type is
// bool (falling off the end is undefined behavior) — presumably elided;
// confirm.
bool PosixEndpointImpl::TcpFlushZerocopy(TcpZerocopySendRecord* record,
                                         absl::Status& status) {}

// Takes a status passed by non-const reference and returns a bool.
// NOTE(review): presumably writes out pending outgoing data and reports
// completion through the return value and \a status — the body is not visible
// here, TODO confirm.
// NOTE(review): the body is empty in this view although the return type is
// bool (falling off the end is undefined behavior) — presumably elided;
// confirm.
bool PosixEndpointImpl::TcpFlush(absl::Status& status) {}

// Takes a status by value.
// NOTE(review): presumably the callback run when the socket becomes writable
// or the pending write fails — the body is empty in this view, so as shown
// this is a no-op; presumably elided, confirm.
void PosixEndpointImpl::HandleWrite(absl::Status status) {}

// Takes a completion callback \a on_writable, the \a data to send and optional
// write arguments \a args, and returns a bool.
// NOTE(review): the meaning of the return value (e.g. completed synchronously
// vs. pending) is not visible here — TODO confirm against the header.
// NOTE(review): the body is empty in this view although the return type is
// bool (falling off the end is undefined behavior) — presumably elided;
// confirm.
bool PosixEndpointImpl::Write(
    absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
    const EventEngine::Endpoint::WriteArgs* args) {}

// Takes the shutdown reason \a why and a callback \a on_release_fd that
// accepts an absl::StatusOr<int>.
// NOTE(review): presumably shuts the endpoint down at most once and, when the
// callback is set, hands the file descriptor to it instead of closing it —
// the body is empty in this view, so as shown this is a no-op; presumably
// elided, confirm.
void PosixEndpointImpl::MaybeShutdown(
    absl::Status why,
    absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {}

// Out-of-line destructor.
// NOTE(review): the body is empty in this view; any teardown it is expected to
// perform is not visible here — confirm against the full file.
PosixEndpointImpl::~PosixEndpointImpl() {}

// Constructor. Takes the event handle, an on-done closure, a shared
// EventEngine, a memory allocator (unnamed, so unused in the body) and the TCP
// options.
// NOTE(review): as shown, the ':' introducing the member-initializer list is
// followed directly by the body with no initializers, which is not valid C++.
// Presumably the initializer list and body were elided — confirm against the
// full file before building.
PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle,
                                     PosixEngineClosure* on_done,
                                     std::shared_ptr<EventEngine> engine,
                                     MemoryAllocator&& /*allocator*/,
                                     const PosixTcpOptions& options)
    :{}

// Factory compiled when GRPC_POSIX_SOCKET_TCP is defined. Takes the event
// handle, an on-shutdown closure, a shared EventEngine, a memory allocator and
// the TCP options, and returns an owning pointer to a PosixEndpoint.
// NOTE(review): the body is empty in this view although the return type is
// non-void (falling off the end is undefined behavior) — presumably elided;
// confirm.
std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
    EventHandle* handle, PosixEngineClosure* on_shutdown,
    std::shared_ptr<EventEngine> engine, MemoryAllocator&& allocator,
    const PosixTcpOptions& options) {}

}  // namespace experimental
}  // namespace grpc_event_engine

#else  // GRPC_POSIX_SOCKET_TCP

namespace grpc_event_engine {
namespace experimental {

std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
    EventHandle* /*handle*/, PosixEngineClosure* /*on_shutdown*/,
    std::shared_ptr<EventEngine> /*engine*/,
    const PosixTcpOptions& /*options*/) {
  grpc_core::Crash("Cannot create PosixEndpoint on this platform");
}

}  // namespace experimental
}  // namespace grpc_event_engine

#endif  // GRPC_POSIX_SOCKET_TCP