#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstring>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include "absl/cleanup/cleanup.h"
#include "absl/functional/any_invocable.h"
#include "absl/meta/type_traits.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/posix_engine/timer.h"
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/utils.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/sync.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include <errno.h>
#include <stdint.h>
#include <sys/socket.h>
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
#include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_listener.h"
#endif
// Brings the std::chrono duration literal suffixes (e.g. `5ms`, `1s`) into
// scope for the remainder of this file.
using namespace std::chrono_literals;
namespace grpc_event_engine {
namespace experimental {
// Takes no arguments and returns a bool. Defined before the
// GRPC_POSIX_SOCKET_TCP guard, so it is compiled whether or not that macro is
// defined.
// NOTE(review): judging by the name, this reports whether the posix
// EventEngine is required; the exact criteria should be confirmed against the
// implementation.
bool NeedPosixEngine() { … }
#ifdef GRPC_POSIX_SOCKET_TCP
// Starts this AsyncConnect operation.
// `timeout` is an EventEngine::Duration.
// NOTE(review): presumably `timeout` bounds how long the connect attempt may
// take before OnTimeoutExpired is invoked - TODO confirm.
void AsyncConnect::Start(EventEngine::Duration timeout) { … }
// Destructor for AsyncConnect.
// NOTE(review): which resources are released here is not evident from the
// declaration - confirm against the implementation.
AsyncConnect ::~AsyncConnect() { … }
// Handler that receives an absl::Status by value.
// NOTE(review): by its name, this runs when the connect timeout elapses;
// how `status` is interpreted should be confirmed against the implementation.
void AsyncConnect::OnTimeoutExpired(absl::Status status) { … }
// Handler that receives an absl::Status by value.
// ABSL_NO_THREAD_SAFETY_ANALYSIS opts this function out of Clang's static
// thread-safety analysis, so any locking done inside is not compiler-checked.
// NOTE(review): by its name, this runs when the connecting socket becomes
// writable - TODO confirm, and confirm why the analysis is disabled.
void AsyncConnect::OnWritable(absl::Status status)
ABSL_NO_THREAD_SAFETY_ANALYSIS { … }
// Internal connect routine returning an EventEngine::ConnectionHandle.
//
// Parameters:
//   sock       - PosixSocketWrapper, passed by value.
//   on_connect - OnConnectCallback, passed by value.
//   addr       - ResolvedAddress, passed by value.
//   allocator  - MemoryAllocator, taken by rvalue reference (caller gives it
//                up).
//   options    - PosixTcpOptions, read-only reference.
//   timeout    - Duration for the attempt.
// NOTE(review): presumably the returned handle is what CancelConnect accepts
// and `on_connect` receives the connect result - confirm.
EventEngine::ConnectionHandle PosixEventEngine::ConnectInternal(
PosixSocketWrapper sock, OnConnectCallback on_connect, ResolvedAddress addr,
MemoryAllocator&& allocator, const PosixTcpOptions& options,
Duration timeout) { … }
// Takes an integer `connection_handle` and returns nothing.
// NOTE(review): by its name, this is called once a connection attempt has
// finished so the engine can drop its bookkeeping for that handle - TODO
// confirm.
void PosixEventEngine::OnConnectFinishInternal(int connection_handle) { … }
// Constructs a poller manager from a shared ThreadPool `executor`
// (shared_ptr passed by value).
// NOTE(review): presumably this overload creates the poller itself rather
// than receiving one - confirm against the initializer list.
PosixEnginePollerManager::PosixEnginePollerManager(
std::shared_ptr<ThreadPool> executor)
: … { … }
// Constructs a poller manager around a caller-supplied `poller`.
// NOTE(review): `poller` is a raw pointer; whether the manager takes
// ownership of it is not evident from the signature - confirm.
PosixEnginePollerManager::PosixEnginePollerManager(PosixEventPoller* poller)
: … { … }
// Runs `closure`, an EventEngine::Closure passed by raw pointer.
// NOTE(review): whether the closure executes inline or is handed to another
// thread, and who owns it afterwards, is not evident from the signature -
// confirm.
void PosixEnginePollerManager::Run(
experimental::EventEngine::Closure* closure) { … }
// Runs `cb`, a move-only callable taken by value.
// NOTE(review): whether `cb` executes inline or is handed to another thread
// is not evident from the signature - confirm.
void PosixEnginePollerManager::Run(absl::AnyInvocable<void()> cb) { … }
// Takes no arguments and returns nothing.
// NOTE(review): by its name, this initiates shutdown of the poller manager;
// what state it changes and whether it blocks should be confirmed against the
// implementation.
void PosixEnginePollerManager::TriggerShutdown() { … }
// Destructor for PosixEnginePollerManager.
// NOTE(review): whether this releases the poller is not evident from the
// declaration - confirm.
PosixEnginePollerManager::~PosixEnginePollerManager() { … }
// Constructs the engine with a caller-supplied `poller`. Only compiled when
// GRPC_POSIX_SOCKET_TCP is defined.
// NOTE(review): `poller` is a raw pointer; ownership is not evident from the
// signature - confirm.
PosixEventEngine::PosixEventEngine(PosixEventPoller* poller)
: … { … }
// Default constructor. This definition sits inside the GRPC_POSIX_SOCKET_TCP
// guard.
// NOTE(review): presumably sets up the engine's own executor, timers and
// poller - confirm against the initializer list.
PosixEventEngine::PosixEventEngine()
: … { … }
// Takes `poller_manager` as a shared_ptr by value, so this call holds its own
// reference to the manager for its duration.
// NOTE(review): by its name, this performs a round of poller work; whether it
// reschedules itself should be confirmed against the implementation.
void PosixEventEngine::PollerWorkInternal(
std::shared_ptr<PosixEnginePollerManager> poller_manager) { … }
#endif
// Nested helper type: a `final` struct that publicly derives from
// EventEngine::Closure.
// NOTE(review): presumably bundles a callback with the bookkeeping needed to
// schedule and cancel it - confirm against the member list.
struct PosixEventEngine::ClosureData final : public EventEngine::Closure { … };
// Destructor for PosixEventEngine. Defined outside the top-level
// GRPC_POSIX_SOCKET_TCP guard.
// NOTE(review): the shutdown order of timers, executor and poller is not
// evident from the declaration - confirm.
PosixEventEngine::~PosixEventEngine() { … }
// Takes an EventEngine::TaskHandle and returns a bool.
// NOTE(review): presumably returns true when the task identified by `handle`
// was cancelled before it ran - confirm.
bool PosixEventEngine::Cancel(EventEngine::TaskHandle handle) { … }
// Overload taking the work as a move-only callable `closure` plus a delay
// `when`; returns an EventEngine::TaskHandle.
// NOTE(review): presumably the handle can be passed to Cancel - confirm.
EventEngine::TaskHandle PosixEventEngine::RunAfter(
Duration when, absl::AnyInvocable<void()> closure) { … }
// Overload taking the work as an EventEngine::Closure raw pointer plus a
// delay `when`; returns an EventEngine::TaskHandle.
// NOTE(review): lifetime requirements on `closure` are not evident from the
// signature - confirm.
EventEngine::TaskHandle PosixEventEngine::RunAfter(
Duration when, EventEngine::Closure* closure) { … }
// Overload taking the work as a move-only callable by value.
// NOTE(review): presumably hands `closure` to the engine's executor rather
// than running it inline - confirm.
void PosixEventEngine::Run(absl::AnyInvocable<void()> closure) { … }
// Overload taking the work as an EventEngine::Closure raw pointer.
// NOTE(review): presumably hands `closure` to the engine's executor; lifetime
// requirements on the pointer are not evident from the signature - confirm.
void PosixEventEngine::Run(EventEngine::Closure* closure) { … }
// Takes a delay `when` and a move-only callable `cb`; returns an
// EventEngine::TaskHandle.
// NOTE(review): by its name, this is the shared implementation behind the
// RunAfter overloads - confirm.
EventEngine::TaskHandle PosixEventEngine::RunAfterInternal(
Duration when, absl::AnyInvocable<void()> cb) { … }
// Returns a unique_ptr to an EventEngine::DNSResolver.
// The ResolverOptions parameter is unnamed, so this implementation cannot
// read it.
// NOTE(review): whether the returned pointer is ever non-null should be
// confirmed against the implementation.
std::unique_ptr<EventEngine::DNSResolver> PosixEventEngine::GetDNSResolver(
EventEngine::DNSResolver::ResolverOptions const& ) { … }
// Takes no arguments and returns a bool.
// NOTE(review): by its name, reports whether the calling thread is one of the
// engine's worker threads - TODO confirm.
bool PosixEventEngine::IsWorkerThread() { … }
// Takes an EventEngine::ConnectionHandle and returns a bool.
// NOTE(review): presumably returns true when the pending connection named by
// `handle` was cancelled before its callback ran - confirm.
bool PosixEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) { … }
// Connect routine returning an EventEngine::ConnectionHandle.
//
// Parameters:
//   on_connect       - OnConnectCallback, passed by value.
//   addr             - target ResolvedAddress, read-only reference.
//   args             - EndpointConfig, read-only reference.
//   memory_allocator - MemoryAllocator, passed by value.
//   timeout          - Duration for the attempt.
// NOTE(review): presumably `on_connect` receives the connect result and the
// returned handle is accepted by CancelConnect - confirm.
EventEngine::ConnectionHandle PosixEventEngine::Connect(
OnConnectCallback on_connect, const ResolvedAddress& addr,
const EndpointConfig& args, MemoryAllocator memory_allocator,
Duration timeout) { … }
// Returns a unique_ptr to a PosixEndpointWithFdSupport built from an integer
// file descriptor `fd`, an EndpointConfig `config` (read-only reference), and
// a MemoryAllocator taken by value.
// NOTE(review): whether the endpoint takes ownership of `fd`, and what is
// returned on failure, is not evident from the signature - confirm.
std::unique_ptr<PosixEndpointWithFdSupport>
PosixEventEngine::CreatePosixEndpointFromFd(int fd,
const EndpointConfig& config,
MemoryAllocator memory_allocator) { … }
// Returns either a unique_ptr to an EventEngine::Listener or an error
// status.
//
// Parameters:
//   on_accept                - Listener::AcceptCallback, passed by value.
//   on_shutdown              - move-only callable taking an absl::Status.
//   config                   - EndpointConfig, read-only reference.
//   memory_allocator_factory - ownership transferred in via unique_ptr.
// NOTE(review): when each callback is invoked should be confirmed against the
// implementation.
absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
PosixEventEngine::CreateListener(
Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) { … }
// Returns either a unique_ptr to a PosixListenerWithFdSupport or an error
// status. Differs from CreateListener in its accept-callback type
// (PosixEventEngineWithFdSupport::PosixAcceptCallback) and return type.
//
// Parameters:
//   on_accept                - PosixAcceptCallback, passed by value.
//   on_shutdown              - move-only callable taking an absl::Status.
//   config                   - EndpointConfig, read-only reference.
//   memory_allocator_factory - ownership transferred in via unique_ptr.
// NOTE(review): when each callback is invoked should be confirmed against the
// implementation.
absl::StatusOr<std::unique_ptr<PosixListenerWithFdSupport>>
PosixEventEngine::CreatePosixListener(
PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) { … }
}
}