#include <folly/io/async/AsyncSocket.h>
#include <sys/types.h>
#include <cerrno>
#include <sstream>
#include <boost/preprocessor/control/if.hpp>
#include <folly/Exception.h>
#include <folly/Format.h>
#include <folly/Portability.h>
#include <folly/SocketAddress.h>
#include <folly/String.h>
#include <folly/io/Cursor.h>
#include <folly/io/IOBuf.h>
#include <folly/io/IOBufQueue.h>
#include <folly/io/SocketOptionMap.h>
#include <folly/lang/CheckedMath.h>
#include <folly/portability/Fcntl.h>
#include <folly/portability/Sockets.h>
#include <folly/portability/SysMman.h>
#include <folly/portability/SysUio.h>
#include <folly/portability/Unistd.h>
#if defined(__linux__)
#include <linux/if_packet.h>
#include <linux/sockios.h>
#include <sys/ioctl.h>
#endif
ZeroCopyMemStore;
namespace {
class ZeroCopyMMapMemStoreFallback : public ZeroCopyMemStore { … };
#if TCP_ZEROCOPY_RECEIVE
std::unique_ptr<folly::IOBuf> getRXZeroCopyIOBuf(
ZeroCopyMemStore::EntryPtr&& ptr) {
auto* entry = ptr.release();
return folly::IOBuf::takeOwnership(
entry->data,
entry->len,
entry->len,
[](void* , void* userData) {
reinterpret_cast<ZeroCopyMemStore::Entry*>(userData)->put();
},
entry);
}
class ZeroCopyMMapMemStoreReal : public ZeroCopyMemStore {
public:
ZeroCopyMMapMemStoreReal(size_t entries, size_t size) {
int fd = ::socket(AF_INET, SOCK_STREAM, 0);
if (fd >= 0) {
void* addr =
::mmap(nullptr, entries * size, PROT_READ, MAP_SHARED, fd, 0);
::close(fd);
if (addr != MAP_FAILED) {
addr_ = addr;
numEntries_ = entries;
entrySize_ = size;
entries_.resize(numEntries_);
for (size_t i = 0; i < numEntries_; i++) {
entries_[i].data =
reinterpret_cast<uint8_t*>(addr_) + (i * entrySize_);
entries_[i].capacity = entrySize_;
entries_[i].store = this;
avail_.push_back(&entries_[i]);
}
}
}
}
~ZeroCopyMMapMemStoreReal() override {
CHECK_EQ(avail_.size(), numEntries_);
if (addr_) {
::munmap(addr_, numEntries_ * entrySize_);
}
}
ZeroCopyMemStore::EntryPtr get() override {
std::unique_lock<std::mutex> lk(availMutex_);
if (!avail_.empty()) {
auto* entry = avail_.front();
avail_.pop_front();
DCHECK(entry->len == 0);
DCHECK(entry->capacity == entrySize_);
ZeroCopyMemStore::EntryPtr ret(entry);
return ret;
}
return nullptr;
}
void put(ZeroCopyMemStore::Entry* entry) override {
if (entry) {
DCHECK(entry->store == this);
if (entry->len) {
auto ret = ::madvise(entry->data, entry->len, MADV_DONTNEED);
entry->len = 0;
DCHECK(!ret);
}
std::unique_lock<std::mutex> lk(availMutex_);
avail_.push_back(entry);
}
}
private:
std::vector<ZeroCopyMemStore::Entry> entries_;
std::mutex availMutex_;
std::deque<ZeroCopyMemStore::Entry*> avail_;
void* addr_{nullptr};
size_t numEntries_{0};
size_t entrySize_{0};
};
using ZeroCopyMMapMemStore = ZeroCopyMMapMemStoreReal;
#else
ZeroCopyMMapMemStore;
#endif
}
#if FOLLY_HAVE_VLA
#define FOLLY_HAVE_VLA_01 …
#else
#define FOLLY_HAVE_VLA_01 …
#endif
string;
unique_ptr;
fsp;
namespace folly {
static constexpr bool msgErrQueueSupported = …true;
#else
false;
#endif
std::unique_ptr<ZeroCopyMemStore> AsyncSocket::createDefaultZeroCopyMemStore(
size_t entries, size_t size) { … }
static AsyncSocketException const& getSocketClosedLocallyEx() { … }
static AsyncSocketException const& getSocketShutdownForWritesEx() { … }
namespace {
#if FOLLY_HAVE_SO_TIMESTAMPING
const sock_extended_err* FOLLY_NULLABLE
cmsgToSockExtendedErr(const cmsghdr& cmsg) {
if ((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
(cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR) ||
(cmsg.cmsg_level == SOL_PACKET &&
cmsg.cmsg_type == PACKET_TX_TIMESTAMP)) {
return reinterpret_cast<const sock_extended_err*>(CMSG_DATA(&cmsg));
}
(void)cmsg;
return nullptr;
}
const sock_extended_err* FOLLY_NULLABLE
cmsgToSockExtendedErrTimestamping(const cmsghdr& cmsg) {
const auto serr = cmsgToSockExtendedErr(cmsg);
if (serr && serr->ee_errno == ENOMSG &&
serr->ee_origin == SO_EE_ORIGIN_TIMESTAMPING) {
return serr;
}
(void)cmsg;
return nullptr;
}
const scm_timestamping* FOLLY_NULLABLE
cmsgToScmTimestamping(const cmsghdr& cmsg) {
if (cmsg.cmsg_level == SOL_SOCKET && cmsg.cmsg_type == SCM_TIMESTAMPING) {
return reinterpret_cast<const struct scm_timestamping*>(CMSG_DATA(&cmsg));
}
(void)cmsg;
return nullptr;
}
#endif
}
class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest { … };
int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(
folly::WriteFlags flags, bool zeroCopyEnabled) noexcept { … }
void AsyncSocket::SendMsgParamsCallback::getAncillaryData(
folly::WriteFlags flags,
void* data,
const WriteRequestTag& writeTag,
const bool byteEventsEnabled) noexcept { … }
uint32_t AsyncSocket::SendMsgParamsCallback::getAncillaryDataSize(
folly::WriteFlags flags,
const WriteRequestTag&,
const bool byteEventsEnabled) noexcept { … }
folly::Optional<AsyncSocket::ByteEvent>
AsyncSocket::ByteEventHelper::processCmsg(
const cmsghdr& cmsg, const size_t rawBytesWritten) { … }
namespace {
AsyncSocket::SendMsgParamsCallback defaultSendMsgParamsCallback;
void disableTransparentFunctions(
NetworkSocket fd, bool noTransparentTls, bool noTSocks) { … }
constexpr size_t kSmallIoVecSize = …;
}
AsyncSocket::AsyncSocket()
: … { … }
AsyncSocket::AsyncSocket(EventBase* evb)
: … { … }
AsyncSocket::AsyncSocket(
EventBase* evb,
const folly::SocketAddress& address,
uint32_t connectTimeout,
bool useZeroCopy)
: … { … }
AsyncSocket::AsyncSocket(
EventBase* evb,
const std::string& ip,
uint16_t port,
uint32_t connectTimeout,
bool useZeroCopy)
: … { … }
AsyncSocket::AsyncSocket(
EventBase* evb,
NetworkSocket fd,
uint32_t zeroCopyBufId,
const SocketAddress* peerAddress,
folly::Optional<std::chrono::steady_clock::time_point>
maybeConnectionEstablishTime)
: … { … }
AsyncSocket::AsyncSocket(AsyncSocket* oldAsyncSocket)
: … { … }
AsyncSocket::AsyncSocket(AsyncSocket::UniquePtr oldAsyncSocket)
: … { … }
void AsyncSocket::init() { … }
AsyncSocket::~AsyncSocket() { … }
void AsyncSocket::destroy() { … }
NetworkSocket AsyncSocket::detachNetworkSocket() { … }
void AsyncSocket::setShutdownSocketSet(
const std::weak_ptr<ShutdownSocketSet>& wNewSS) { … }
void AsyncSocket::setCloseOnExec() { … }
void AsyncSocket::connect(
ConnectCallback* callback,
const folly::SocketAddress& address,
int timeout,
const SocketOptionMap& options,
const folly::SocketAddress& bindAddr,
const std::string& ifName) noexcept { … }
int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) { … }
void AsyncSocket::scheduleConnectTimeout() { … }
void AsyncSocket::registerForConnectEvents() { … }
void AsyncSocket::connect(
ConnectCallback* callback,
const string& ip,
uint16_t port,
int timeout,
const SocketOptionMap& options) noexcept { … }
void AsyncSocket::cancelConnect() { … }
void AsyncSocket::setSendTimeout(uint32_t milliseconds) { … }
void AsyncSocket::setErrMessageCB(ErrMessageCallback* callback) { … }
AsyncSocket::ErrMessageCallback* AsyncSocket::getErrMessageCallback() const { … }
void AsyncSocket::setReadAncillaryDataCB(ReadAncillaryDataCallback* callback) { … }
AsyncSocket::ReadAncillaryDataCallback*
AsyncSocket::getReadAncillaryDataCallback() const { … }
void AsyncSocket::setSendMsgParamCB(SendMsgParamsCallback* callback) { … }
AsyncSocket::SendMsgParamsCallback* AsyncSocket::getSendMsgParamsCB() const { … }
void AsyncSocket::setReadCB(ReadCallback* callback) { … }
AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const { … }
bool AsyncSocket::setZeroCopy(bool enable) { … }
void AsyncSocket::setZeroCopyEnableFunc(AsyncWriter::ZeroCopyEnableFunc func) { … }
void AsyncSocket::setZeroCopyReenableThreshold(size_t threshold) { … }
void AsyncSocket::setZeroCopyDrainConfig(const ZeroCopyDrainConfig& config) { … }
bool AsyncSocket::isZeroCopyRequest(WriteFlags flags) { … }
void AsyncSocket::adjustZeroCopyFlags(folly::WriteFlags& flags) { … }
void AsyncSocket::addZeroCopyBuf(
std::unique_ptr<folly::IOBuf>&& buf, ReleaseIOBufCallback* cb) { … }
void AsyncSocket::addZeroCopyBuf(folly::IOBuf* ptr) { … }
void AsyncSocket::releaseZeroCopyBuf(uint32_t id) { … }
void AsyncSocket::drainZeroCopyQueue() { … }
void AsyncSocket::setZeroCopyBuf(
std::unique_ptr<folly::IOBuf>&& buf, ReleaseIOBufCallback* cb) { … }
bool AsyncSocket::containsZeroCopyBuf(folly::IOBuf* ptr) { … }
bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const { … }
void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) { … }
void AsyncSocket::releaseIOBuf(
std::unique_ptr<folly::IOBuf> buf, ReleaseIOBufCallback* callback) { … }
void AsyncSocket::enableByteEvents() { … }
void AsyncSocket::write(
WriteCallback* callback, const void* buf, size_t bytes, WriteFlags flags) { … }
void AsyncSocket::writev(
WriteCallback* callback, const iovec* vec, size_t count, WriteFlags flags) { … }
void AsyncSocket::writeChain(
WriteCallback* callback, unique_ptr<IOBuf>&& buf, WriteFlags flags) { … }
void AsyncSocket::writeChainImpl(
WriteCallback* callback,
iovec* vec,
size_t count,
unique_ptr<IOBuf>&& buf,
WriteFlags flags) { … }
void AsyncSocket::writeImpl(
WriteCallback* callback,
const iovec* vec,
size_t count,
unique_ptr<IOBuf>&& buf,
size_t totalBytes,
WriteFlags flags) { … }
void AsyncSocket::writeRequest(WriteRequest* req) { … }
void AsyncSocket::close() { … }
void AsyncSocket::closeNow() { … }
void AsyncSocket::closeWithReset() { … }
void AsyncSocket::shutdownWrite() { … }
void AsyncSocket::shutdownWriteNow() { … }
bool AsyncSocket::readable() const { … }
bool AsyncSocket::writable() const { … }
bool AsyncSocket::isPending() const { … }
bool AsyncSocket::hangup() const { … }
bool AsyncSocket::good() const { … }
bool AsyncSocket::error() const { … }
void AsyncSocket::attachEventBase(EventBase* eventBase) { … }
void AsyncSocket::detachEventBase() { … }
bool AsyncSocket::isDetachable() const { … }
void AsyncSocket::cacheAddresses() { … }
void AsyncSocket::cacheLocalAddress() const { … }
void AsyncSocket::cachePeerAddress() const { … }
void AsyncSocket::applyOptions(
const SocketOptionMap& options, SocketOptionKey::ApplyPos pos) { … }
bool AsyncSocket::isZeroCopyWriteInProgress() const noexcept { … }
void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const { … }
void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const { … }
bool AsyncSocket::getTFOSucceded() const { … }
int AsyncSocket::setNoDelay(bool noDelay) { … }
int AsyncSocket::setCongestionFlavor(const std::string& cname) { … }
int AsyncSocket::setQuickAck(bool quickack) { … }
int AsyncSocket::setSendBufSize(size_t bufsize) { … }
int AsyncSocket::setRecvBufSize(size_t bufsize) { … }
#if defined(__linux__)
size_t AsyncSocket::getSendBufInUse() const {
if (fd_ == NetworkSocket()) {
std::stringstream issueString;
issueString << "AsyncSocket::getSendBufInUse() called on non-open socket "
<< this << "(state=" << state_ << ")";
VLOG(4) << issueString.str();
throw std::logic_error(issueString.str());
}
size_t returnValue = 0;
if (-1 == ::ioctl(fd_.toFd(), SIOCOUTQ, &returnValue)) {
int errnoCopy = errno;
std::stringstream issueString;
issueString << "Failed to get the tx used bytes on Socket: " << this
<< "(fd=" << fd_ << ", state=" << state_
<< "): " << errnoStr(errnoCopy);
VLOG(2) << issueString.str();
throw std::logic_error(issueString.str());
}
return returnValue;
}
size_t AsyncSocket::getRecvBufInUse() const {
if (fd_ == NetworkSocket()) {
std::stringstream issueString;
issueString << "AsyncSocket::getRecvBufInUse() called on non-open socket "
<< this << "(state=" << state_ << ")";
VLOG(4) << issueString.str();
throw std::logic_error(issueString.str());
}
size_t returnValue = 0;
if (-1 == ::ioctl(fd_.toFd(), SIOCINQ, &returnValue)) {
std::stringstream issueString;
int errnoCopy = errno;
issueString << "Failed to get the rx used bytes on Socket: " << this
<< "(fd=" << fd_ << ", state=" << state_
<< "): " << errnoStr(errnoCopy);
VLOG(2) << issueString.str();
throw std::logic_error(issueString.str());
}
return returnValue;
}
#endif
int AsyncSocket::setTCPProfile(int profd) { … }
void AsyncSocket::ioReady(uint16_t events) noexcept { … }
AsyncSocket::ReadResult AsyncSocket::performReadMsg(
struct ::msghdr& msg,
AsyncReader::ReadCallback::ReadMode) { … }
void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) { … }
void AsyncSocket::prepareReadBuffers(IOBufIovecBuilder::IoVecVec& iovs) { … }
void AsyncSocket::drainErrorQueue() noexcept { … }
size_t AsyncSocket::handleErrMessages() noexcept { … }
bool AsyncSocket::processZeroCopyWriteInProgress() noexcept { … }
folly::Expected<folly::TcpInfo, std::errc> AsyncSocket::getTcpInfo(
const TcpInfo::LookupOptions& options) { … }
void AsyncSocket::addLifecycleObserver(
AsyncSocket::LegacyLifecycleObserver* observer) { … }
bool AsyncSocket::removeLifecycleObserver(
AsyncSocket::LegacyLifecycleObserver* observer) { … }
std::vector<AsyncSocket::LegacyLifecycleObserver*>
AsyncSocket::getLifecycleObservers() const { … }
void AsyncSocket::splitIovecArray(
const size_t startOffset,
const size_t endOffset,
const iovec* srcVec,
const size_t srcCount,
iovec* dstVec,
size_t& dstCount) { … }
AsyncSocket::ReadCode AsyncSocket::processZeroCopyRead() { … }
AsyncSocket::ReadCode AsyncSocket::processNormalRead() { … }
void AsyncSocket::handleRead() noexcept { … }
void AsyncSocket::handleWrite() noexcept { … }
void AsyncSocket::checkForImmediateRead() noexcept { … }
void AsyncSocket::handleInitialReadWrite() noexcept { … }
void AsyncSocket::handleConnect() noexcept { … }
void AsyncSocket::timeoutExpired() noexcept { … }
void AsyncSocket::handleNetworkSocketAttached() { … }
ssize_t AsyncSocket::tfoSendMsg(
NetworkSocket fd, struct msghdr* msg, int msg_flags) { … }
AsyncSocket::WriteResult AsyncSocket::sendSocketMessage(
const iovec* vec,
size_t count,
WriteFlags flags,
WriteRequestTag writeTag) { … }
AsyncSocket::WriteResult AsyncSocket::sendSocketMessage(
NetworkSocket fd, struct msghdr* msg, int msg_flags) { … }
AsyncSocket::WriteResult AsyncSocket::performWrite(
const iovec* vec,
uint32_t count,
WriteFlags flags,
uint32_t* countWritten,
uint32_t* partialWritten,
WriteRequestTag writeTag) { … }
bool AsyncSocket::updateEventRegistration() { … }
bool AsyncSocket::updateEventRegistration(uint16_t enable, uint16_t disable) { … }
void AsyncSocket::startFail() { … }
void AsyncSocket::invokeAllErrors(const AsyncSocketException& ex) { … }
void AsyncSocket::finishFail() { … }
void AsyncSocket::finishFail(const AsyncSocketException& ex) { … }
void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) { … }
void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) { … }
AsyncSocket::ReadCode AsyncSocket::failRead(
const char* fn, const AsyncSocketException& ex) { … }
void AsyncSocket::failErrMessageRead(
const char* fn, const AsyncSocketException& ex) { … }
void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) { … }
void AsyncSocket::failWrite(
const char* fn,
WriteCallback* callback,
size_t bytesWritten,
const AsyncSocketException& ex) { … }
void AsyncSocket::failAllWrites(const AsyncSocketException& ex) { … }
void AsyncSocket::failByteEvents(const AsyncSocketException& ex) { … }
void AsyncSocket::invalidState(ConnectCallback* callback) { … }
void AsyncSocket::invalidState(ErrMessageCallback* callback) { … }
void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) { … }
void AsyncSocket::invokeConnectSuccess() { … }
void AsyncSocket::invokeConnectAttempt() { … }
void AsyncSocket::invalidState(ReadCallback* callback) { … }
void AsyncSocket::invalidState(WriteCallback* callback) { … }
void AsyncSocket::doClose() { … }
std::ostream& operator<<(
std::ostream& os, const AsyncSocket::StateEnum& state) { … }
std::string AsyncSocket::withAddr(folly::StringPiece s) { … }
void AsyncSocket::setBufferCallback(BufferCallback* cb) { … }
std::ostream& operator<<(
std::ostream& os, const folly::AsyncSocket::WriteRequestTag& tag) { … }
}