#include "rtc_base/physical_socket_server.h"
#include <cstdint>
#include <utility>
#if defined(_MSC_VER) && _MSC_VER < 1300
#pragma warning(disable : 4786)
#endif
#ifdef MEMORY_SANITIZER
#include <sanitizer/msan_interface.h>
#endif
#if defined(WEBRTC_POSIX)
#include <fcntl.h>
#if defined(WEBRTC_USE_EPOLL)
#include <poll.h>
#elif defined(WEBRTC_USE_POLL)
#include <poll.h>
#endif
#include <sys/ioctl.h>
#include <sys/select.h>
#include <unistd.h>
#endif
#if defined(WEBRTC_WIN)
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#undef SetPort
#endif
#include <errno.h>
#include "rtc_base/async_dns_resolver.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/ip_address.h"
#include "rtc_base/logging.h"
#include "rtc_base/network/ecn_marking.h"
#include "rtc_base/network_monitor.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/field_trial.h"
#if defined(WEBRTC_LINUX)
#include <linux/sockios.h>
#endif
#if defined(WEBRTC_WIN)
#define LAST_SYSTEM_ERROR …
#elif defined(__native_client__) && __native_client__
#define LAST_SYSTEM_ERROR …
#elif defined(WEBRTC_POSIX)
#define LAST_SYSTEM_ERROR …
#endif
#if defined(WEBRTC_POSIX)
#include <netinet/tcp.h>
#define IP_MTU …
SockOptArg;
#endif
#if defined(WEBRTC_POSIX) && !defined(WEBRTC_MAC) && !defined(__native_client__)
int64_t GetSocketRecvTimestamp(int socket) { … }
#else
// Fallback definition compiled when the preceding #if condition does not hold
// (i.e. not POSIX, or Mac, or Native Client). No lookup is attempted: the
// argument is ignored and -1 is returned unconditionally.
int64_t GetSocketRecvTimestamp(int socket) {
  static_cast<void>(socket);  // Intentionally unused in this branch.
  constexpr int64_t kNoTimestamp = -1;
  return kNoTimestamp;
}
#endif
#if defined(WEBRTC_WIN)
typedef char* SockOptArg;
#endif
#if defined(WEBRTC_LINUX)
#if !defined(POLLRDHUP)
#define POLLRDHUP …
#endif
#if !defined(EPOLLRDHUP)
#define EPOLLRDHUP …
#endif
#endif
namespace {
static constexpr uint8_t kEcnMask = …;
#if defined(WEBRTC_POSIX)
rtc::EcnMarking EcnFromDs(uint8_t ds) { … }
#endif
class ScopedSetTrue { … };
}
namespace rtc {
PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s)
: … { … }
PhysicalSocket::~PhysicalSocket() { … }
bool PhysicalSocket::Create(int family, int type) { … }
SocketAddress PhysicalSocket::GetLocalAddress() const { … }
SocketAddress PhysicalSocket::GetRemoteAddress() const { … }
int PhysicalSocket::Bind(const SocketAddress& bind_addr) { … }
int PhysicalSocket::Connect(const SocketAddress& addr) { … }
int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) { … }
int PhysicalSocket::GetError() const { … }
void PhysicalSocket::SetError(int error) { … }
Socket::ConnState PhysicalSocket::GetState() const { … }
int PhysicalSocket::GetOption(Option opt, int* value) { … }
int PhysicalSocket::SetOption(Option opt, int value) { … }
int PhysicalSocket::Send(const void* pv, size_t cb) { … }
int PhysicalSocket::SendTo(const void* buffer,
size_t length,
const SocketAddress& addr) { … }
int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) { … }
int PhysicalSocket::RecvFrom(void* buffer,
size_t length,
SocketAddress* out_addr,
int64_t* timestamp) { … }
int PhysicalSocket::RecvFrom(ReceiveBuffer& buffer) { … }
int PhysicalSocket::DoReadFromSocket(void* buffer,
size_t length,
SocketAddress* out_addr,
int64_t* timestamp,
EcnMarking* ecn) { … }
int PhysicalSocket::Listen(int backlog) { … }
Socket* PhysicalSocket::Accept(SocketAddress* out_addr) { … }
int PhysicalSocket::Close() { … }
SOCKET PhysicalSocket::DoAccept(SOCKET socket,
sockaddr* addr,
socklen_t* addrlen) { … }
int PhysicalSocket::DoSend(SOCKET socket, const char* buf, int len, int flags) { … }
int PhysicalSocket::DoSendTo(SOCKET socket,
const char* buf,
int len,
int flags,
const struct sockaddr* dest_addr,
socklen_t addrlen) { … }
void PhysicalSocket::OnResolveResult(
const webrtc::AsyncDnsResolverResult& result) { … }
void PhysicalSocket::UpdateLastError() { … }
void PhysicalSocket::MaybeRemapSendError() { … }
void PhysicalSocket::SetEnabledEvents(uint8_t events) { … }
void PhysicalSocket::EnableEvents(uint8_t events) { … }
void PhysicalSocket::DisableEvents(uint8_t events) { … }
int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) { … }
SocketDispatcher::SocketDispatcher(PhysicalSocketServer* ss)
#if defined(WEBRTC_WIN)
: … { … }
SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
#if defined(WEBRTC_WIN)
: … { … }
SocketDispatcher::~SocketDispatcher() { … }
bool SocketDispatcher::Initialize() { … }
bool SocketDispatcher::Create(int type) { … }
bool SocketDispatcher::Create(int family, int type) { … }
#if defined(WEBRTC_WIN)
// Windows only. Always returns WSA_INVALID_EVENT: this dispatcher does not
// expose an event handle of its own through this accessor.
WSAEVENT SocketDispatcher::GetWSAEvent() {
return WSA_INVALID_EVENT;
}
// Windows only. Returns the socket handle currently stored in s_.
SOCKET SocketDispatcher::GetSocket() {
return s_;
}
// Delivers a previously recorded close notification, if one is pending.
//
// Returns false when no close is pending (signal_close_ is unset) or when a
// one-byte MSG_PEEK recv() on s_ still reports data (> 0). Otherwise marks the
// state as CS_CLOSED, clears the pending flag, fires SignalCloseEvent with the
// stored error code and returns true.
bool SocketDispatcher::CheckSignalClose() {
  if (signal_close_) {
    char peeked_byte;
    const bool data_still_pending = recv(s_, &peeked_byte, 1, MSG_PEEK) > 0;
    if (!data_still_pending) {
      state_ = CS_CLOSED;
      // Clear the flag before signalling so the close is reported only once.
      signal_close_ = false;
      SignalCloseEvent(this, signal_err_);
      return true;
    }
  }
  return false;
}
// Out-of-class definition of the static member next_id_, initialised to 0
// (compiled only in the WEBRTC_WIN branch of the enclosing #if).
int SocketDispatcher::next_id_ = 0;
#elif defined(WEBRTC_POSIX)
int SocketDispatcher::GetDescriptor() { … }
bool SocketDispatcher::IsDescriptorClosed() { … }
#endif
uint32_t SocketDispatcher::GetRequestedEvents() { … }
#if defined(WEBRTC_WIN)
void SocketDispatcher::OnEvent(uint32_t ff, int err) {
if ((ff & DE_CONNECT) != 0)
state_ = CS_CONNECTED;
int cache_id = id_;
if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
if (ff != DE_CONNECT)
RTC_LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
DisableEvents(DE_CONNECT);
#if !defined(NDEBUG)
dbg_addr_ = "Connected @ ";
dbg_addr_.append(GetRemoteAddress().ToString());
#endif
SignalConnectEvent(this);
}
if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
DisableEvents(DE_ACCEPT);
SignalReadEvent(this);
}
if ((ff & DE_READ) != 0) {
DisableEvents(DE_READ);
SignalReadEvent(this);
}
if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
DisableEvents(DE_WRITE);
SignalWriteEvent(this);
}
if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
signal_close_ = true;
signal_err_ = err;
}
}
#elif defined(WEBRTC_POSIX)
void SocketDispatcher::OnEvent(uint32_t ff, int err) { … }
#endif
#if defined(WEBRTC_USE_EPOLL)
inline static int GetEpollEvents(uint32_t ff) { … }
void SocketDispatcher::StartBatchedEventUpdates() { … }
void SocketDispatcher::FinishBatchedEventUpdates() { … }
void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) { … }
void SocketDispatcher::SetEnabledEvents(uint8_t events) { … }
void SocketDispatcher::EnableEvents(uint8_t events) { … }
void SocketDispatcher::DisableEvents(uint8_t events) { … }
#endif
int SocketDispatcher::Close() { … }
#if defined(WEBRTC_POSIX)
class Signaler : public Dispatcher { … };
#endif
#if defined(WEBRTC_WIN)
// Translates a bitmask of DE_* dispatcher flags into the corresponding
// Winsock FD_* network-event mask. FD_CLOSE is always included, regardless of
// the input; DE_READ, DE_WRITE, DE_CONNECT and DE_ACCEPT each add their FD_*
// counterpart when set.
static uint32_t FlagsToEvents(uint32_t events) {
  uint32_t fd_mask = FD_CLOSE;
  fd_mask |= (events & DE_READ) ? FD_READ : 0;
  fd_mask |= (events & DE_WRITE) ? FD_WRITE : 0;
  fd_mask |= (events & DE_CONNECT) ? FD_CONNECT : 0;
  fd_mask |= (events & DE_ACCEPT) ? FD_ACCEPT : 0;
  return fd_mask;
}
// Windows dispatcher backed by a WSA event object instead of a socket.
//
// On construction it creates an event and, if that succeeded, registers itself
// with the owning PhysicalSocketServer; the destructor unregisters and closes
// the event. Signal() sets the event; when the server dispatches it, OnEvent()
// resets the event and clears the bool that was passed in by reference.
class Signaler : public Dispatcher {
 public:
  // `flag_to_clear` is held by reference and written in OnEvent().
  Signaler(PhysicalSocketServer* ss, bool& flag_to_clear)
      : ss_(ss), hev_(WSACreateEvent()), flag_to_clear_(flag_to_clear) {
    // Only register with the server when an event handle was obtained.
    if (hev_ != nullptr) {
      ss_->Add(this);
    }
  }

  ~Signaler() override {
    if (hev_ == nullptr) {
      return;  // Never registered; nothing to undo.
    }
    ss_->Remove(this);
    WSACloseEvent(hev_);
    hev_ = nullptr;
  }

  // Sets the event; a no-op when no event handle exists.
  virtual void Signal() {
    if (hev_ == nullptr) {
      return;
    }
    WSASetEvent(hev_);
  }

  // This dispatcher requests no DE_* socket events.
  uint32_t GetRequestedEvents() override { return 0; }

  // Both arguments are ignored: resets the event and clears the bound flag.
  void OnEvent(uint32_t ff, int err) override {
    WSAResetEvent(hev_);
    flag_to_clear_ = false;
  }

  WSAEVENT GetWSAEvent() override { return hev_; }

  // There is no socket behind this dispatcher.
  SOCKET GetSocket() override { return INVALID_SOCKET; }

  bool CheckSignalClose() override { return false; }

 private:
  PhysicalSocketServer* ss_;
  WSAEVENT hev_;
  bool& flag_to_clear_;
};
#endif
PhysicalSocketServer::PhysicalSocketServer()
: … { … }
PhysicalSocketServer::~PhysicalSocketServer() { … }
void PhysicalSocketServer::WakeUp() { … }
Socket* PhysicalSocketServer::CreateSocket(int family, int type) { … }
Socket* PhysicalSocketServer::WrapSocket(SOCKET s) { … }
void PhysicalSocketServer::Add(Dispatcher* pdispatcher) { … }
void PhysicalSocketServer::Remove(Dispatcher* pdispatcher) { … }
void PhysicalSocketServer::Update(Dispatcher* pdispatcher) { … }
int PhysicalSocketServer::ToCmsWait(webrtc::TimeDelta max_wait_duration) { … }
#if defined(WEBRTC_POSIX)
bool PhysicalSocketServer::Wait(webrtc::TimeDelta max_wait_duration,
bool process_io) { … }
static void ProcessEvents(Dispatcher* dispatcher,
bool readable,
bool writable,
bool error_event,
bool check_error) { … }
#if defined(WEBRTC_USE_POLL) || defined(WEBRTC_USE_EPOLL)
static void ProcessPollEvents(Dispatcher* dispatcher, const pollfd& pfd) { … }
static pollfd DispatcherToPollfd(Dispatcher* dispatcher) { … }
#endif
bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { … }
#if defined(WEBRTC_USE_EPOLL)
void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher, uint64_t key) { … }
void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) { … }
void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher, uint64_t key) { … }
bool PhysicalSocketServer::WaitEpoll(int cmsWait) { … }
bool PhysicalSocketServer::WaitPollOneDispatcher(int cmsWait,
Dispatcher* dispatcher) { … }
#elif defined(WEBRTC_USE_POLL)
bool PhysicalSocketServer::WaitPoll(int cmsWait, bool process_io) {
int64_t msWait = -1;
int64_t msStop = -1;
if (cmsWait != kForeverMs) {
msWait = cmsWait;
msStop = TimeAfter(cmsWait);
}
std::vector<pollfd> pollfds;
fWait_ = true;
while (fWait_) {
{
CritScope cr(&crit_);
current_dispatcher_keys_.clear();
pollfds.clear();
pollfds.reserve(dispatcher_by_key_.size());
for (auto const& kv : dispatcher_by_key_) {
uint64_t key = kv.first;
Dispatcher* pdispatcher = kv.second;
if (!process_io && (pdispatcher != signal_wakeup_))
continue;
current_dispatcher_keys_.push_back(key);
pollfds.push_back(DispatcherToPollfd(pdispatcher));
}
}
int n = poll(pollfds.data(), pollfds.size(), static_cast<int>(msWait));
if (n < 0) {
if (errno != EINTR) {
RTC_LOG_E(LS_ERROR, EN, errno) << "poll";
return false;
}
} else if (n == 0) {
return true;
} else {
CritScope cr(&crit_);
for (size_t i = 0; i < current_dispatcher_keys_.size(); ++i) {
uint64_t key = current_dispatcher_keys_[i];
if (!dispatcher_by_key_.count(key))
continue;
ProcessPollEvents(dispatcher_by_key_.at(key), pollfds[i]);
}
}
if (cmsWait != kForeverMs) {
msWait = TimeDiff(msStop, TimeMillis());
if (msWait < 0) {
return true;
}
}
}
return true;
}
#endif
#endif
#if defined(WEBRTC_WIN)
// Windows implementation of Wait().
//
// Repeatedly waits on a set of WSA events (the shared socket_ev_ plus one
// event per dispatcher that has no socket) and dispatches the resulting
// notifications, until fWait_ is cleared, the requested duration has elapsed,
// or the wait call fails.
//
// max_wait_duration: converted to milliseconds via ToCmsWait(); the result is
//   compared against kForeverMs to decide whether a deadline applies.
// process_io: when false, only the signal_wakeup_ dispatcher is considered and
//   socket network events are not enumerated.
//
// Returns false only if WSAWaitForMultipleEvents reports WSA_WAIT_FAILED;
// returns true on timeout or when the loop exits.
bool PhysicalSocketServer::Wait(webrtc::TimeDelta max_wait_duration,
bool process_io) {
// Wait() must not already be running; waiting_ is handed to a ScopedSetTrue
// (presumably holding it true for the lifetime of `s` -- TODO confirm).
RTC_DCHECK(!waiting_);
ScopedSetTrue s(&waiting_);
int cmsWait = ToCmsWait(max_wait_duration);
int64_t cmsTotal = cmsWait;
int64_t cmsElapsed = 0;
int64_t msStart = Time();
fWait_ = true;
while (fWait_) {
// events[0] is always the shared socket event. event_owners[i] is the
// dispatcher key that owns events[i + 1].
std::vector<WSAEVENT> events;
std::vector<uint64_t> event_owners;
events.push_back(socket_ev_);
{
CritScope cr(&crit_);
// Snapshot the keys first, then iterate over the snapshot and re-look-up
// each key, rather than iterating dispatcher_by_key_ directly.
current_dispatcher_keys_.clear();
for (auto const& kv : dispatcher_by_key_) {
current_dispatcher_keys_.push_back(kv.first);
}
for (uint64_t key : current_dispatcher_keys_) {
if (!dispatcher_by_key_.count(key)) {
continue;
}
Dispatcher* disp = dispatcher_by_key_.at(key);
if (!disp)
continue;
if (!process_io && (disp != signal_wakeup_))
continue;
// NOTE: this local `s` shadows the ScopedSetTrue `s` declared above.
// The socket is read before CheckSignalClose() is called.
SOCKET s = disp->GetSocket();
if (disp->CheckSignalClose()) {
// Close was just signalled for this dispatcher; it is not registered
// for this wait.
} else if (s != INVALID_SOCKET) {
// Socket-backed dispatcher: associate its requested network events with
// the shared socket event.
WSAEventSelect(s, events[0],
FlagsToEvents(disp->GetRequestedEvents()));
} else {
// No socket: wait on the dispatcher's own event and remember its owner.
events.push_back(disp->GetWSAEvent());
event_owners.push_back(key);
}
}
}
// Time left for this wait: the kForeverMs value is passed through unchanged,
// otherwise the remaining budget clamped at 0.
int64_t cmsNext;
if (cmsWait == kForeverMs) {
cmsNext = cmsWait;
} else {
cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
}
// Wait for any one of the events (wait-all is false).
DWORD dw =
WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), &events[0],
false, static_cast<DWORD>(cmsNext), false);
if (dw == WSA_WAIT_FAILED) {
// The result of WSAGetLastError() is discarded here.
WSAGetLastError();
RTC_DCHECK_NOTREACHED();
return false;
} else if (dw == WSA_WAIT_TIMEOUT) {
return true;
} else {
CritScope cr(&crit_);
// Index of the signalled event within `events`.
int index = dw - WSA_WAIT_EVENT_0;
if (index > 0) {
// A per-dispatcher event fired; shift by one because events[0] is the
// shared socket event and has no entry in event_owners.
--index;
uint64_t key = event_owners[index];
if (!dispatcher_by_key_.count(key)) {
// The owner is no longer registered. This `continue` restarts the outer
// while loop, skipping the WSAResetEvent and elapsed-time update below.
continue;
}
Dispatcher* disp = dispatcher_by_key_.at(key);
disp->OnEvent(0, 0);
} else if (process_io) {
// The shared socket event fired: query every socket-backed dispatcher
// from the snapshot for its pending network events.
for (uint64_t key : current_dispatcher_keys_) {
if (!dispatcher_by_key_.count(key)) {
continue;
}
Dispatcher* disp = dispatcher_by_key_.at(key);
SOCKET s = disp->GetSocket();
if (s == INVALID_SOCKET)
continue;
WSANETWORKEVENTS wsaEvents;
int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
if (err == 0) {
{
// Diagnostics only: log a warning for each reported event that carries
// a non-zero error code.
if ((wsaEvents.lNetworkEvents & FD_READ) &&
wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
RTC_LOG(LS_WARNING)
<< "PhysicalSocketServer got FD_READ_BIT error "
<< wsaEvents.iErrorCode[FD_READ_BIT];
}
if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
RTC_LOG(LS_WARNING)
<< "PhysicalSocketServer got FD_WRITE_BIT error "
<< wsaEvents.iErrorCode[FD_WRITE_BIT];
}
if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
RTC_LOG(LS_WARNING)
<< "PhysicalSocketServer got FD_CONNECT_BIT error "
<< wsaEvents.iErrorCode[FD_CONNECT_BIT];
}
if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
RTC_LOG(LS_WARNING)
<< "PhysicalSocketServer got FD_ACCEPT_BIT error "
<< wsaEvents.iErrorCode[FD_ACCEPT_BIT];
}
if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
RTC_LOG(LS_WARNING)
<< "PhysicalSocketServer got FD_CLOSE_BIT error "
<< wsaEvents.iErrorCode[FD_CLOSE_BIT];
}
}
// Translate the FD_* network events into DE_* dispatcher flags.
uint32_t ff = 0;
int errcode = 0;
if (wsaEvents.lNetworkEvents & FD_READ)
ff |= DE_READ;
if (wsaEvents.lNetworkEvents & FD_WRITE)
ff |= DE_WRITE;
if (wsaEvents.lNetworkEvents & FD_CONNECT) {
// A connect event with an error code is reported as a close carrying
// that error instead of as a connect.
if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
ff |= DE_CONNECT;
} else {
ff |= DE_CLOSE;
errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
}
}
if (wsaEvents.lNetworkEvents & FD_ACCEPT)
ff |= DE_ACCEPT;
if (wsaEvents.lNetworkEvents & FD_CLOSE) {
// Overwrites any error code stored by a failed connect above.
ff |= DE_CLOSE;
errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
}
if (ff != 0) {
disp->OnEvent(ff, errcode);
}
}
}
}
// Reset the shared socket event after handling the wake-up.
WSAResetEvent(socket_ev_);
}
// Stop if fWait_ was cleared during dispatch, or once the elapsed time
// reaches the requested wait (when a deadline applies).
if (!fWait_)
break;
cmsElapsed = TimeSince(msStart);
if ((cmsWait != kForeverMs) && (cmsElapsed >= cmsWait)) {
break;
}
}
return true;
}
#endif
}