folly/folly/io/async/AsyncSocket.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/io/async/AsyncSocket.h>

#include <sys/types.h>

#include <cerrno>
#include <limits>
#include <sstream>

#include <boost/preprocessor/control/if.hpp>

#include <folly/Exception.h>
#include <folly/Format.h>
#include <folly/Portability.h>
#include <folly/SocketAddress.h>
#include <folly/String.h>
#include <folly/io/Cursor.h>
#include <folly/io/IOBuf.h>
#include <folly/io/IOBufQueue.h>
#include <folly/io/SocketOptionMap.h>
#include <folly/lang/CheckedMath.h>
#include <folly/portability/Fcntl.h>
#include <folly/portability/Sockets.h>
#include <folly/portability/SysMman.h>
#include <folly/portability/SysUio.h>
#include <folly/portability/Unistd.h>

#if defined(__linux__)
#include <linux/if_packet.h>
#include <linux/sockios.h>
#include <sys/ioctl.h>
#endif

ZeroCopyMemStore;

namespace {
class ZeroCopyMMapMemStoreFallback : public ZeroCopyMemStore {};

#if TCP_ZEROCOPY_RECEIVE
// Wraps a zero-copy receive entry in an IOBuf without copying its bytes.
//
// Ownership of the entry is released from `ptr`; the raw entry is passed to
// IOBuf::takeOwnership() as the user data of a free callback that calls put()
// on it. Both the capacity and the length of the resulting IOBuf are set to
// entry->len.
std::unique_ptr<folly::IOBuf> getRXZeroCopyIOBuf(
    ZeroCopyMemStore::EntryPtr&& ptr) {
  // Free callback: `userData` is the Entry released from `ptr` below.
  auto releaseEntry = [](void* /*buf*/, void* userData) {
    reinterpret_cast<ZeroCopyMemStore::Entry*>(userData)->put();
  };

  auto* const rawEntry = ptr.release();
  return folly::IOBuf::takeOwnership(
      rawEntry->data, rawEntry->len, rawEntry->len, releaseEntry, rawEntry);
}

/**
 * ZeroCopyMemStore backed by a single read-only, shared mmap() region that is
 * carved into `entries` fixed-size slots of `size` bytes each.
 *
 * If creating the helper socket fails, the requested total length does not
 * fit in size_t, or mmap() fails, the store is left empty and get() always
 * returns nullptr.
 *
 * The free list is protected by a mutex, so get() and put() may be called
 * concurrently.
 */
class ZeroCopyMMapMemStoreReal : public ZeroCopyMemStore {
 public:
  /**
   * @param entries  number of slots to create
   * @param size     size of each slot in bytes
   */
  ZeroCopyMMapMemStoreReal(size_t entries, size_t size) {
    // Reject requests whose total length overflows size_t: a wrapped product
    // would map fewer bytes than the slots set up below would point into.
    if (size != 0 && entries > std::numeric_limits<size_t>::max() / size) {
      return;
    }
    const size_t totalLen = entries * size;

    // we just need a socket so the kernel
    // will set the vma->vm_ops = &tcp_vm_ops
    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (fd >= 0) {
      void* addr = ::mmap(nullptr, totalLen, PROT_READ, MAP_SHARED, fd, 0);
      // The socket is only needed for the mmap() call itself.
      ::close(fd);
      if (addr != MAP_FAILED) {
        addr_ = addr;
        numEntries_ = entries;
        entrySize_ = size;
        entries_.resize(numEntries_);
        // Point every slot at its slice of the mapping and mark it available.
        for (size_t i = 0; i < numEntries_; i++) {
          entries_[i].data =
              reinterpret_cast<uint8_t*>(addr_) + (i * entrySize_);
          entries_[i].capacity = entrySize_;
          entries_[i].store = this;
          avail_.push_back(&entries_[i]);
        }
      }
    }
  }

  ~ZeroCopyMMapMemStoreReal() override {
    // Every entry must have been returned via put() before destruction.
    CHECK_EQ(avail_.size(), numEntries_);
    if (addr_) {
      ::munmap(addr_, numEntries_ * entrySize_);
    }
  }

  /**
   * Hands out the entry at the front of the free list.
   *
   * @return the entry, or nullptr when no entry is currently available.
   */
  ZeroCopyMemStore::EntryPtr get() override {
    std::unique_lock<std::mutex> lk(availMutex_);
    if (!avail_.empty()) {
      auto* entry = avail_.front();
      avail_.pop_front();
      DCHECK(entry->len == 0);
      DCHECK(entry->capacity == entrySize_);

      ZeroCopyMemStore::EntryPtr ret(entry);

      return ret;
    }

    return nullptr;
  }

  /**
   * Returns an entry to the free list. A null entry is ignored.
   *
   * If the entry holds data (len != 0), madvise(MADV_DONTNEED) is issued on
   * that range and len is reset to 0 before the entry is made available
   * again.
   */
  void put(ZeroCopyMemStore::Entry* entry) override {
    if (entry) {
      DCHECK(entry->store == this);
      if (entry->len) {
        auto ret = ::madvise(entry->data, entry->len, MADV_DONTNEED);
        entry->len = 0;
        DCHECK(!ret);
      }

      // Only the free-list update needs the lock.
      std::unique_lock<std::mutex> lk(availMutex_);
      avail_.push_back(entry);
    }
  }

 private:
  // Backing storage for all entries; avail_ holds pointers into this vector.
  std::vector<ZeroCopyMemStore::Entry> entries_;
  // Guards avail_.
  std::mutex availMutex_;
  // Entries currently available to get().
  std::deque<ZeroCopyMemStore::Entry*> avail_;
  // Base address of the mapping, or nullptr if it was never established.
  void* addr_{nullptr};
  size_t numEntries_{0};
  size_t entrySize_{0};
};

using ZeroCopyMMapMemStore = ZeroCopyMMapMemStoreReal;
#else
ZeroCopyMMapMemStore;
#endif
} // namespace

#if FOLLY_HAVE_VLA
#define FOLLY_HAVE_VLA_01
#else
#define FOLLY_HAVE_VLA_01
#endif

string;
unique_ptr;

fsp;

namespace folly {

static constexpr bool msgErrQueueSupported =true;
#else
    false;
#endif // FOLLY_HAVE_MSG_ERRQUEUE

std::unique_ptr<ZeroCopyMemStore> AsyncSocket::createDefaultZeroCopyMemStore(
    size_t entries, size_t size) {}

static AsyncSocketException const& getSocketClosedLocallyEx() {}

static AsyncSocketException const& getSocketShutdownForWritesEx() {}

namespace {
#if FOLLY_HAVE_SO_TIMESTAMPING
// Returns the sock_extended_err payload of `cmsg` when the control message is
// an IPv4 or IPv6 RECVERR message or a PACKET_TX_TIMESTAMP message; returns
// nullptr for any other control message.
const sock_extended_err* FOLLY_NULLABLE
cmsgToSockExtendedErr(const cmsghdr& cmsg) {
  const bool isIpv4RecvErr =
      cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR;
  const bool isIpv6RecvErr =
      cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR;
  const bool isPacketTxTimestamp =
      cmsg.cmsg_level == SOL_PACKET && cmsg.cmsg_type == PACKET_TX_TIMESTAMP;

  if (!(isIpv4RecvErr || isIpv6RecvErr || isPacketTxTimestamp)) {
    return nullptr;
  }
  return reinterpret_cast<const sock_extended_err*>(CMSG_DATA(&cmsg));
}

// Like cmsgToSockExtendedErr(), but only returns the extended error when it
// has ee_errno == ENOMSG and ee_origin == SO_EE_ORIGIN_TIMESTAMPING; returns
// nullptr otherwise.
const sock_extended_err* FOLLY_NULLABLE
cmsgToSockExtendedErrTimestamping(const cmsghdr& cmsg) {
  const sock_extended_err* const extErr = cmsgToSockExtendedErr(cmsg);
  if (extErr == nullptr) {
    return nullptr;
  }

  const bool isTimestampingNotification = extErr->ee_errno == ENOMSG &&
      extErr->ee_origin == SO_EE_ORIGIN_TIMESTAMPING;
  return isTimestampingNotification ? extErr : nullptr;
}

// Returns the scm_timestamping payload of `cmsg` when it is a
// SOL_SOCKET / SCM_TIMESTAMPING control message; nullptr otherwise.
const scm_timestamping* FOLLY_NULLABLE
cmsgToScmTimestamping(const cmsghdr& cmsg) {
  const bool carriesTimestamps =
      cmsg.cmsg_level == SOL_SOCKET && cmsg.cmsg_type == SCM_TIMESTAMPING;
  if (!carriesTimestamps) {
    return nullptr;
  }
  return reinterpret_cast<const struct scm_timestamping*>(CMSG_DATA(&cmsg));
}

#endif // FOLLY_HAVE_SO_TIMESTAMPING
} // namespace

// TODO: It might help performance to provide a version of BytesWriteRequest
// that users could derive from, so we can avoid the extra allocation for each
// call to write()/writev().
//
// We would need the version for external users where they provide the iovec
// storage space, and only our internal version would allocate it at the end of
// the WriteRequest.

/* The default WriteRequest implementation, used for write(), writev() and
 * writeChain()
 *
 * A new BytesWriteRequest operation is allocated on the heap for all write
 * operations that cannot be completed immediately.
 */
class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {};

int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(
    folly::WriteFlags flags, bool zeroCopyEnabled) noexcept {}

void AsyncSocket::SendMsgParamsCallback::getAncillaryData(
    folly::WriteFlags flags,
    void* data,
    const WriteRequestTag& writeTag,
    const bool byteEventsEnabled) noexcept {}

uint32_t AsyncSocket::SendMsgParamsCallback::getAncillaryDataSize(
    folly::WriteFlags flags,
    const WriteRequestTag&,
    const bool byteEventsEnabled) noexcept {}

folly::Optional<AsyncSocket::ByteEvent>
AsyncSocket::ByteEventHelper::processCmsg(
    const cmsghdr& cmsg, const size_t rawBytesWritten) {}

namespace {
AsyncSocket::SendMsgParamsCallback defaultSendMsgParamsCallback;

// Based on flags, signal the transparent handler to disable certain functions
void disableTransparentFunctions(
    NetworkSocket fd, bool noTransparentTls, bool noTSocks) {}

constexpr size_t kSmallIoVecSize =;

} // namespace

AsyncSocket::AsyncSocket()
    :{}

AsyncSocket::AsyncSocket(EventBase* evb)
    :{}

AsyncSocket::AsyncSocket(
    EventBase* evb,
    const folly::SocketAddress& address,
    uint32_t connectTimeout,
    bool useZeroCopy)
    :{}

AsyncSocket::AsyncSocket(
    EventBase* evb,
    const std::string& ip,
    uint16_t port,
    uint32_t connectTimeout,
    bool useZeroCopy)
    :{}

AsyncSocket::AsyncSocket(
    EventBase* evb,
    NetworkSocket fd,
    uint32_t zeroCopyBufId,
    const SocketAddress* peerAddress,
    folly::Optional<std::chrono::steady_clock::time_point>
        maybeConnectionEstablishTime)
    :{}

AsyncSocket::AsyncSocket(AsyncSocket* oldAsyncSocket)
    :{}

AsyncSocket::AsyncSocket(AsyncSocket::UniquePtr oldAsyncSocket)
    :{}

// init() method, since constructor forwarding isn't supported in most
// compilers yet.
void AsyncSocket::init() {}

AsyncSocket::~AsyncSocket() {}

void AsyncSocket::destroy() {}

NetworkSocket AsyncSocket::detachNetworkSocket() {}

void AsyncSocket::setShutdownSocketSet(
    const std::weak_ptr<ShutdownSocketSet>& wNewSS) {}

void AsyncSocket::setCloseOnExec() {}

void AsyncSocket::connect(
    ConnectCallback* callback,
    const folly::SocketAddress& address,
    int timeout,
    const SocketOptionMap& options,
    const folly::SocketAddress& bindAddr,
    const std::string& ifName) noexcept {}

int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {}

void AsyncSocket::scheduleConnectTimeout() {}

void AsyncSocket::registerForConnectEvents() {}

void AsyncSocket::connect(
    ConnectCallback* callback,
    const string& ip,
    uint16_t port,
    int timeout,
    const SocketOptionMap& options) noexcept {}

void AsyncSocket::cancelConnect() {}

void AsyncSocket::setSendTimeout(uint32_t milliseconds) {}

void AsyncSocket::setErrMessageCB(ErrMessageCallback* callback) {}

AsyncSocket::ErrMessageCallback* AsyncSocket::getErrMessageCallback() const {}

void AsyncSocket::setReadAncillaryDataCB(ReadAncillaryDataCallback* callback) {}

AsyncSocket::ReadAncillaryDataCallback*
AsyncSocket::getReadAncillaryDataCallback() const {}

void AsyncSocket::setSendMsgParamCB(SendMsgParamsCallback* callback) {}

AsyncSocket::SendMsgParamsCallback* AsyncSocket::getSendMsgParamsCB() const {}

void AsyncSocket::setReadCB(ReadCallback* callback) {}

AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {}

bool AsyncSocket::setZeroCopy(bool enable) {}

void AsyncSocket::setZeroCopyEnableFunc(AsyncWriter::ZeroCopyEnableFunc func) {}

void AsyncSocket::setZeroCopyReenableThreshold(size_t threshold) {}

void AsyncSocket::setZeroCopyDrainConfig(const ZeroCopyDrainConfig& config) {}

bool AsyncSocket::isZeroCopyRequest(WriteFlags flags) {}

void AsyncSocket::adjustZeroCopyFlags(folly::WriteFlags& flags) {}

void AsyncSocket::addZeroCopyBuf(
    std::unique_ptr<folly::IOBuf>&& buf, ReleaseIOBufCallback* cb) {}

void AsyncSocket::addZeroCopyBuf(folly::IOBuf* ptr) {}

void AsyncSocket::releaseZeroCopyBuf(uint32_t id) {}

void AsyncSocket::drainZeroCopyQueue() {}

void AsyncSocket::setZeroCopyBuf(
    std::unique_ptr<folly::IOBuf>&& buf, ReleaseIOBufCallback* cb) {}

bool AsyncSocket::containsZeroCopyBuf(folly::IOBuf* ptr) {}

bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const {}

void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) {}

void AsyncSocket::releaseIOBuf(
    std::unique_ptr<folly::IOBuf> buf, ReleaseIOBufCallback* callback) {}

void AsyncSocket::enableByteEvents() {}

void AsyncSocket::write(
    WriteCallback* callback, const void* buf, size_t bytes, WriteFlags flags) {}

void AsyncSocket::writev(
    WriteCallback* callback, const iovec* vec, size_t count, WriteFlags flags) {}

void AsyncSocket::writeChain(
    WriteCallback* callback, unique_ptr<IOBuf>&& buf, WriteFlags flags) {}

void AsyncSocket::writeChainImpl(
    WriteCallback* callback,
    iovec* vec,
    size_t count,
    unique_ptr<IOBuf>&& buf,
    WriteFlags flags) {}

void AsyncSocket::writeImpl(
    WriteCallback* callback,
    const iovec* vec,
    size_t count,
    unique_ptr<IOBuf>&& buf,
    size_t totalBytes,
    WriteFlags flags) {}

void AsyncSocket::writeRequest(WriteRequest* req) {}

void AsyncSocket::close() {}

void AsyncSocket::closeNow() {}

void AsyncSocket::closeWithReset() {}

void AsyncSocket::shutdownWrite() {}

void AsyncSocket::shutdownWriteNow() {}

bool AsyncSocket::readable() const {}

bool AsyncSocket::writable() const {}

bool AsyncSocket::isPending() const {}

bool AsyncSocket::hangup() const {}

bool AsyncSocket::good() const {}

bool AsyncSocket::error() const {}

void AsyncSocket::attachEventBase(EventBase* eventBase) {}

void AsyncSocket::detachEventBase() {}

bool AsyncSocket::isDetachable() const {}

void AsyncSocket::cacheAddresses() {}

void AsyncSocket::cacheLocalAddress() const {}

void AsyncSocket::cachePeerAddress() const {}

void AsyncSocket::applyOptions(
    const SocketOptionMap& options, SocketOptionKey::ApplyPos pos) {}

bool AsyncSocket::isZeroCopyWriteInProgress() const noexcept {}

void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {}

void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {}

bool AsyncSocket::getTFOSucceded() const {}

int AsyncSocket::setNoDelay(bool noDelay) {}

int AsyncSocket::setCongestionFlavor(const std::string& cname) {}

int AsyncSocket::setQuickAck(bool quickack) {}

int AsyncSocket::setSendBufSize(size_t bufsize) {}

int AsyncSocket::setRecvBufSize(size_t bufsize) {}

#if defined(__linux__)
/**
 * Queries the kernel via the SIOCOUTQ ioctl for the socket's "tx used bytes".
 *
 * @return the value reported by SIOCOUTQ.
 * @throws std::logic_error if the socket has no open file descriptor or the
 *         ioctl fails.
 */
size_t AsyncSocket::getSendBufInUse() const {
  if (fd_ == NetworkSocket()) {
    std::stringstream issueString;
    issueString << "AsyncSocket::getSendBufInUse() called on non-open socket "
                << this << "(state=" << state_ << ")";
    VLOG(4) << issueString.str();
    throw std::logic_error(issueString.str());
  }

  // SIOCOUTQ stores its result through an int*. Passing the address of a
  // size_t only yields the right value on little-endian targets, so read into
  // an int and widen afterwards.
  int queuedBytes = 0;
  if (-1 == ::ioctl(fd_.toFd(), SIOCOUTQ, &queuedBytes)) {
    // Capture errno before any other call has a chance to overwrite it.
    int errnoCopy = errno;
    std::stringstream issueString;
    issueString << "Failed to get the tx used bytes on Socket: " << this
                << "(fd=" << fd_ << ", state=" << state_
                << "): " << errnoStr(errnoCopy);
    VLOG(2) << issueString.str();
    throw std::logic_error(issueString.str());
  }

  return static_cast<size_t>(queuedBytes);
}

/**
 * Queries the kernel via the SIOCINQ ioctl for the socket's "rx used bytes".
 *
 * @return the value reported by SIOCINQ.
 * @throws std::logic_error if the socket has no open file descriptor or the
 *         ioctl fails.
 */
size_t AsyncSocket::getRecvBufInUse() const {
  if (fd_ == NetworkSocket()) {
    std::stringstream issueString;
    issueString << "AsyncSocket::getRecvBufInUse() called on non-open socket "
                << this << "(state=" << state_ << ")";
    VLOG(4) << issueString.str();
    throw std::logic_error(issueString.str());
  }

  // SIOCINQ stores its result through an int*. Passing the address of a
  // size_t only yields the right value on little-endian targets, so read into
  // an int and widen afterwards.
  int queuedBytes = 0;
  if (-1 == ::ioctl(fd_.toFd(), SIOCINQ, &queuedBytes)) {
    // Capture errno first: constructing the stringstream below could
    // overwrite it before it is reported.
    int errnoCopy = errno;
    std::stringstream issueString;
    issueString << "Failed to get the rx used bytes on Socket: " << this
                << "(fd=" << fd_ << ", state=" << state_
                << "): " << errnoStr(errnoCopy);
    VLOG(2) << issueString.str();
    throw std::logic_error(issueString.str());
  }

  return static_cast<size_t>(queuedBytes);
}
#endif

int AsyncSocket::setTCPProfile(int profd) {}

void AsyncSocket::ioReady(uint16_t events) noexcept {}

AsyncSocket::ReadResult AsyncSocket::performReadMsg(
    struct ::msghdr& msg,
    // This is here only to preserve AsyncSSLSocket's legacy semi-broken
    // behavior (D43648653 for context).
    AsyncReader::ReadCallback::ReadMode) {}

void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) {}

void AsyncSocket::prepareReadBuffers(IOBufIovecBuilder::IoVecVec& iovs) {}

void AsyncSocket::drainErrorQueue() noexcept {}

size_t AsyncSocket::handleErrMessages() noexcept {}

bool AsyncSocket::processZeroCopyWriteInProgress() noexcept {}

folly::Expected<folly::TcpInfo, std::errc> AsyncSocket::getTcpInfo(
    const TcpInfo::LookupOptions& options) {}

void AsyncSocket::addLifecycleObserver(
    AsyncSocket::LegacyLifecycleObserver* observer) {}

bool AsyncSocket::removeLifecycleObserver(
    AsyncSocket::LegacyLifecycleObserver* observer) {}

std::vector<AsyncSocket::LegacyLifecycleObserver*>
AsyncSocket::getLifecycleObservers() const {}

void AsyncSocket::splitIovecArray(
    const size_t startOffset,
    const size_t endOffset,
    const iovec* srcVec,
    const size_t srcCount,
    iovec* dstVec,
    size_t& dstCount) {}

AsyncSocket::ReadCode AsyncSocket::processZeroCopyRead() {}

AsyncSocket::ReadCode AsyncSocket::processNormalRead() {}

void AsyncSocket::handleRead() noexcept {}

/**
 * This function attempts to write as much data as possible, until no more
 * data can be written.
 *
 * - If it sends all available data, it unregisters for write events, and
 *   stops the writeTimeout_.
 *
 * - If not all of the data can be sent immediately, it reschedules
 *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
 *   registered for write events.
 */
void AsyncSocket::handleWrite() noexcept {}

void AsyncSocket::checkForImmediateRead() noexcept {}

void AsyncSocket::handleInitialReadWrite() noexcept {}

void AsyncSocket::handleConnect() noexcept {}

void AsyncSocket::timeoutExpired() noexcept {}

void AsyncSocket::handleNetworkSocketAttached() {}

ssize_t AsyncSocket::tfoSendMsg(
    NetworkSocket fd, struct msghdr* msg, int msg_flags) {}

AsyncSocket::WriteResult AsyncSocket::sendSocketMessage(
    const iovec* vec,
    size_t count,
    WriteFlags flags,
    WriteRequestTag writeTag) {}

AsyncSocket::WriteResult AsyncSocket::sendSocketMessage(
    NetworkSocket fd, struct msghdr* msg, int msg_flags) {}

AsyncSocket::WriteResult AsyncSocket::performWrite(
    const iovec* vec,
    uint32_t count,
    WriteFlags flags,
    uint32_t* countWritten,
    uint32_t* partialWritten,
    WriteRequestTag writeTag) {}

/**
 * Re-register the EventHandler after eventFlags_ has changed.
 *
 * If an error occurs, fail() is called to move the socket into the error
 * state and call all currently installed callbacks.  After an error, the
 * AsyncSocket is completely unregistered.
 *
 * @return Returns true on success, or false on error.
 */
bool AsyncSocket::updateEventRegistration() {}

bool AsyncSocket::updateEventRegistration(uint16_t enable, uint16_t disable) {}

void AsyncSocket::startFail() {}

void AsyncSocket::invokeAllErrors(const AsyncSocketException& ex) {}

void AsyncSocket::finishFail() {}

void AsyncSocket::finishFail(const AsyncSocketException& ex) {}

void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {}

void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {}

AsyncSocket::ReadCode AsyncSocket::failRead(
    const char* fn, const AsyncSocketException& ex) {}

void AsyncSocket::failErrMessageRead(
    const char* fn, const AsyncSocketException& ex) {}

void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {}

void AsyncSocket::failWrite(
    const char* fn,
    WriteCallback* callback,
    size_t bytesWritten,
    const AsyncSocketException& ex) {}

void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {}

void AsyncSocket::failByteEvents(const AsyncSocketException& ex) {}

void AsyncSocket::invalidState(ConnectCallback* callback) {}

void AsyncSocket::invalidState(ErrMessageCallback* callback) {}

void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {}

void AsyncSocket::invokeConnectSuccess() {}

void AsyncSocket::invokeConnectAttempt() {}

void AsyncSocket::invalidState(ReadCallback* callback) {}

void AsyncSocket::invalidState(WriteCallback* callback) {}

void AsyncSocket::doClose() {}

std::ostream& operator<<(
    std::ostream& os, const AsyncSocket::StateEnum& state) {}

std::string AsyncSocket::withAddr(folly::StringPiece s) {}

void AsyncSocket::setBufferCallback(BufferCallback* cb) {}

std::ostream& operator<<(
    std::ostream& os, const folly::AsyncSocket::WriteRequestTag& tag) {}

} // namespace folly