#pragma once
#include <sys/types.h>
#include <chrono>
#include <map>
#include <set>
#include <vector>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/slist.hpp>
#include <glog/logging.h>
#include <folly/CPortability.h>
#include <folly/Conv.h>
#include <folly/CppAttributes.h>
#include <folly/ExceptionString.h>
#include <folly/Function.h>
#include <folly/Optional.h>
#include <folly/Range.h>
#include <folly/experimental/io/Liburing.h>
#include <folly/io/IOBuf.h>
#include <folly/io/async/EventBaseBackendBase.h>
#include <folly/io/async/IoUringBase.h>
#include <folly/portability/Asm.h>
#include <folly/small_vector.h>
#if __has_include(<poll.h>)
#include <poll.h>
#endif
#if FOLLY_HAS_LIBURING
#include <liburing.h>
namespace folly {
class IoUringBackend : public EventBaseBackendBase {
public:
// Thrown when io_uring functionality is requested but cannot be provided
// (e.g. kernel/liburing support missing or ring setup failed).
class FOLLY_EXPORT NotAvailable : public std::runtime_error {
public:
using std::runtime_error::runtime_error;
};
// Construction-time configuration for IoUringBackend. Setters are fluent
// (return *this) so they can be chained; fields below hold the defaults.
struct Options {
// Bit values for the 'flags' field; select kernel-side polling modes for
// the submission/completion queues.
enum Flags {
POLL_SQ = 0x1,
POLL_CQ = 0x2,
POLL_SQ_IMMEDIATE_IO = 0x4,
};
Options() = default;
Options& setCapacity(size_t v) {
capacity = v;
return *this;
}
Options& setMinCapacity(size_t v) {
minCapacity = v;
return *this;
}
Options& setMaxSubmit(size_t v) {
maxSubmit = v;
return *this;
}
Options& setSqeSize(size_t v) {
sqeSize = v;
return *this;
}
Options& setMaxGet(size_t v) {
maxGet = v;
return *this;
}
Options& setUseRegisteredFds(size_t v) {
registeredFds = v;
return *this;
}
Options& setFlags(uint32_t v) {
flags = v;
return *this;
}
Options& setSQIdle(std::chrono::milliseconds v) {
sqIdle = v;
return *this;
}
Options& setCQIdle(std::chrono::milliseconds v) {
cqIdle = v;
return *this;
}
// Adds (does not replace) a CPU to the set the SQ poll thread may run on.
Options& setSQCpu(uint32_t v) {
sqCpus.insert(v);
return *this;
}
// Merges 'cpus' into the existing SQ poll thread CPU set.
Options& setSQCpus(std::set<uint32_t> const& cpus) {
sqCpus.insert(cpus.begin(), cpus.end());
return *this;
}
Options& setSQGroupName(const std::string& v) {
sqGroupName = v;
return *this;
}
Options& setSQGroupNumThreads(size_t v) {
sqGroupNumThreads = v;
return *this;
}
// Pre-register 'count' provided buffers of 'eachSize' bytes each.
Options& setInitialProvidedBuffers(size_t eachSize, size_t count) {
initialProvidedBuffersCount = count;
initialProvidedBuffersEachSize = eachSize;
return *this;
}
Options& setRegisterRingFd(bool v) {
registerRingFd = v;
return *this;
}
Options& setTaskRunCoop(bool v) {
taskRunCoop = v;
return *this;
}
Options& setDeferTaskRun(bool v) {
deferTaskRun = v;
return *this;
}
// timeout + batchSize together enable request batching; see
// useReqBatching(), which requires both to be > 0.
Options& setTimeout(std::chrono::microseconds v) {
timeout = v;
return *this;
}
Options& setBatchSize(int v) {
batchSize = v;
return *this;
}
// -1 means "unset"; presumably derived from capacity at init — confirm in .cpp.
ssize_t sqeSize{-1};
size_t capacity{256};
size_t minCapacity{0};
size_t maxSubmit{128};
size_t maxGet{256};
size_t registeredFds{0}; // number of registered-fd slots (0 disables)
size_t sqGroupNumThreads{1};
size_t initialProvidedBuffersCount{0};
size_t initialProvidedBuffersEachSize{0};
uint32_t flags{0}; // combination of Flags above
int batchSize{0};
bool registerRingFd{false};
bool taskRunCoop{false};
bool deferTaskRun{false};
std::chrono::microseconds timeout{0};
std::chrono::milliseconds sqIdle{0};
std::chrono::milliseconds cqIdle{0};
std::set<uint32_t> sqCpus;
std::string sqGroupName;
};
// Throws NotAvailable if the ring cannot be set up — confirm in .cpp.
explicit IoUringBackend(Options options);
~IoUringBackend() override;
Options const& options() const { return options_; }
// True if there are SQEs prepared-but-unsubmitted or queued for submission.
bool isWaitingToSubmit() const {
return waitingToSubmit_ || !submitList_.empty();
}
struct io_uring* ioRingPtr() { return &ioRing_; }
struct io_uring_params const& params() const { return params_; }
// Request batching is enabled only when both a timeout and a batch size
// were configured via Options.
bool useReqBatching() const {
return options_.timeout.count() > 0 && options_.batchSize > 0;
}
// The ring fd itself is pollable for completions.
int getPollableFd() const override { return ioRing_.ring_fd; }
// This backend does not wrap a libevent event_base.
event_base* getEventBase() override { return nullptr; }
// EventBaseBackendBase interface.
int eb_event_base_loop(int flags) override;
int eb_event_base_loopbreak() override;
int eb_event_add(Event& event, const struct timeval* timeout) override;
int eb_event_del(Event& event) override;
bool eb_event_active(Event&, int) override { return false; }
size_t loopPoll();
void submitOutstanding();
unsigned int processCompleted();
// Whole-process capability probes.
static bool isAvailable();
bool kernelHasNonBlockWriteFixes() const;
static bool kernelSupportsRecvmsgMultishot();
static bool kernelSupportsDeferTaskrun();
static bool kernelSupportsSendZC();
// Registered-fd support: alloc/free slots in the ring's fixed-file table.
IoUringFdRegistrationRecord* registerFd(int fd) noexcept {
return fdRegistry_.alloc(fd);
}
bool unregisterFd(IoUringFdRegistrationRecord* rec) {
return fdRegistry_.free(rec);
}
// Invoked from the CQ poll loop when POLL_CQ is in use — confirm in .cpp.
using CQPollLoopCallback = folly::Function<void()>;
void setCQPollLoopCallback(CQPollLoopCallback&& cb) {
cqPollLoopCallback_ = std::move(cb);
}
// Async file-op API: the callback receives the operation's CQE result
// (byte count or negative errno).
using FileOpCallback = folly::Function<void(int)>;
void queueRead(
int fd,
void* buf,
unsigned int nbytes,
off_t offset,
FileOpCallback&& cb);
void queueWrite(
int fd,
const void* buf,
unsigned int nbytes,
off_t offset,
FileOpCallback&& cb);
void queueReadv(
int fd,
Range<const struct iovec*> iovecs,
off_t offset,
FileOpCallback&& cb);
void queueWritev(
int fd,
Range<const struct iovec*> iovecs,
off_t offset,
FileOpCallback&& cb);
void queueFsync(int fd, FileOpCallback&& cb);
void queueFdatasync(int fd, FileOpCallback&& cb);
void queueOpenat(
int dfd, const char* path, int flags, mode_t mode, FileOpCallback&& cb);
void queueOpenat2(
int dfd, const char* path, struct open_how* how, FileOpCallback&& cb);
void queueClose(int fd, FileOpCallback&& cb);
void queueStatx(
int dirfd,
const char* pathname,
int flags,
unsigned int mask,
struct statx* statxbuf,
FileOpCallback&& cb);
void queueFallocate(
int fd, int mode, off_t offset, off_t len, FileOpCallback&& cb);
void queueSendmsg(
int fd,
const struct msghdr* msg,
unsigned int flags,
FileOpCallback&& cb);
void queueRecvmsg(
int fd, struct msghdr* msg, unsigned int flags, FileOpCallback&& cb);
// Submission entry points with different urgency semantics.
void submit(IoSqeBase& ioSqe) {
submitImmediateIoSqe(ioSqe);
}
void submitNextLoop(IoSqeBase& ioSqe) noexcept;
void submitSoon(IoSqeBase& ioSqe) noexcept;
void submitNow(IoSqeBase& ioSqe);
void cancel(IoSqeBase* sqe);
// Provided-buffer support (may be null if not configured).
IoUringBufferProviderBase* bufferProvider() { return bufferProvider_.get(); }
uint16_t nextBufferProviderGid() { return bufferProviderGidNext_++; }
protected:
enum class WaitForEventsMode { WAIT, DONT_WAIT };
// RAII wrapper around a socketpair used for signal wakeups; fds are created
// in the .cpp constructor and closed in the destructor.
class SocketPair {
public:
SocketPair();
SocketPair(const SocketPair&) = delete;
SocketPair& operator=(const SocketPair&) = delete;
~SocketPair();
// Note the read end is fds_[1] and the write end fds_[0].
int readFd() const { return fds_[1]; }
int writeFd() const { return fds_[0]; }
private:
std::array<int, 2> fds_{-1, -1};
};
// Packs a pointer into the integer io_uring user_data field and back.
struct UserData {
uintptr_t value;
explicit UserData(void* p) noexcept
: value{reinterpret_cast<uintptr_t>(p)} {}
operator uint64_t() const noexcept { return value; }
operator void*() const noexcept {
return reinterpret_cast<void*>(value);
}
};
// Translate libevent-style event bits (EV_READ/EV_WRITE) into poll(2)
// interest flags for a poll-add SQE.
static uint32_t getPollFlags(short events) {
uint32_t pollFlags = 0;
pollFlags |= (events & EV_READ) ? static_cast<uint32_t>(POLLIN) : 0u;
pollFlags |= (events & EV_WRITE) ? static_cast<uint32_t>(POLLOUT) : 0u;
return pollFlags;
}
// Translate poll(2) result flags back into libevent-style event bits.
// POLLERR/POLLHUP are surfaced as both read and write readiness; the result
// is masked by the events the caller actually registered for.
static short getPollEvents(uint32_t flags, short events) {
int out = 0;
out |= (flags & POLLIN) ? EV_READ : 0;
out |= (flags & POLLOUT) ? EV_WRITE : 0;
out |= (flags & (POLLERR | POLLHUP)) ? (EV_READ | EV_WRITE) : 0;
return static_cast<short>(out & events);
}
// Timer handling: timers_ is driven by a timerfd read through the ring.
bool addTimerFd();
void scheduleTimeout();
void scheduleTimeout(const std::chrono::microseconds& us);
void addTimerEvent(Event& event, const struct timeval* timeout);
void removeTimerEvent(Event& event);
size_t processTimers();
void setProcessTimers();
size_t processActiveEvents();
struct IoSqe;
// Static completion dispatchers installed as IoSqe::backendCb_.
static void processPollIoSqe(
IoUringBackend* backend, IoSqe* ioSqe, int res, uint32_t flags);
static void processTimerIoSqe(
IoUringBackend* backend,
IoSqe* ,
int ,
uint32_t );
static void processSignalReadIoSqe(
IoUringBackend* backend,
IoSqe* ,
int ,
uint32_t );
// Signal handling: events are multiplexed over the SocketPair read fd.
void addSignalEvent(Event& event);
void removeSignalEvent(Event& event);
bool addSignalFds();
size_t processSignals();
void setProcessSignals();
void processPollIo(IoSqe* ioSqe, int res, uint32_t flags) noexcept;
// May return nullptr if no pooled entry is available — confirm in .cpp.
IoSqe* FOLLY_NULLABLE allocIoSqe(const EventCallback& cb);
void releaseIoSqe(IoSqe* aioIoSqe) noexcept;
void submitImmediateIoSqe(IoSqeBase& ioSqe);
void internalSubmit(IoSqeBase& ioSqe) noexcept;
// Controls how internalProcessCqe drains the completion queue.
enum class InternalProcessCqeMode {
NORMAL,
AVAILABLE_ONLY,
CANCEL_ALL,
};
unsigned int internalProcessCqe(
unsigned int maxGet, InternalProcessCqeMode mode) noexcept;
int eb_event_modify_inserted(Event& event, IoSqe* ioSqe);
// Manages the ring's fixed-file table: a fixed pool of registration records
// plus a free list; alloc()/free() hand out and reclaim slots.
struct FdRegistry {
FdRegistry() = delete;
FdRegistry(struct io_uring& ioRing, size_t n);
IoUringFdRegistrationRecord* alloc(int fd) noexcept;
bool free(IoUringFdRegistrationRecord* record);
int init();
size_t update();
bool err_{false}; // sticky error flag set on registration failure — confirm in .cpp
struct io_uring& ioRing_;
std::vector<int> files_; // fd table handed to io_uring_register_files
size_t inUse_;
std::vector<IoUringFdRegistrationRecord> records_;
// Free records; cache_last<false> since only the front is used.
boost::intrusive::
slist<IoUringFdRegistrationRecord, boost::intrusive::cache_last<false>>
free_;
};
// In-flight request record: couples an io_uring submission with the backend
// callback that consumes its completion.
struct IoSqe : public IoSqeBase {
using BackendCb = void(IoUringBackend*, IoSqe*, int, uint32_t);
explicit IoSqe(
IoUringBackend* backend = nullptr,
bool poolAlloc = false,
bool persist = false)
: backend_(backend), poolAlloc_(poolAlloc), persist_(persist) {}
// Completion path: forward the CQE result/flags to the installed handler.
void callback(const io_uring_cqe* cqe) noexcept override {
backendCb_(backend_, this, cqe->res, cqe->flags);
}
// Cancelled entries simply release themselves.
void callbackCancelled(const io_uring_cqe*) noexcept override { release(); }
virtual void release() noexcept;
IoUringBackend* backend_;
BackendCb* backendCb_{nullptr};
const bool poolAlloc_; // presumably returned to freeList_ instead of deleted — confirm in .cpp
const bool persist_;
Event* event_{nullptr}; // associated Event, if this sqe backs one
IoUringFdRegistrationRecord* fdRecord_{nullptr}; // registered-fd slot, if used
size_t useCount_{0};
int res_; // latest CQE result (valid after completion)
uint32_t cqeFlags_; // latest CQE flags
// Unlink from any intrusive list and detach from the owning Event.
FOLLY_ALWAYS_INLINE void resetEvent() {
unlink();
if (event_) {
event_->setUserData(nullptr);
event_ = nullptr;
}
}
// Fill in the SQE for this event at submit time. Depending on the Event's
// callback type this becomes a direct read, recvmsg, multishot recvmsg, or
// (the fallback) a plain poll-add on the event's fd.
void processSubmit(struct io_uring_sqe* sqe) noexcept override {
auto* ev = event_->getEvent();
if (ev) {
const auto& cb = event_->getCallback();
switch (cb.type_) {
case EventCallback::Type::TYPE_NONE:
break;
case EventCallback::Type::TYPE_READ:
// Callback supplies a buffer: issue the read directly rather than
// polling for readiness first. EV_PERSIST selects the
// registered-fd path inside prepRead.
if (auto* iov = cb.readCb_->allocateData()) {
prepRead(
sqe,
ev->ev_fd,
&iov->data_,
0,
(ev->ev_events & EV_PERSIST) != 0);
cbData_.set(iov);
return;
}
break;
case EventCallback::Type::TYPE_RECVMSG:
if (auto* msg = cb.recvmsgCb_->allocateData()) {
prepRecvmsg(
sqe,
ev->ev_fd,
&msg->data_,
(ev->ev_events & EV_PERSIST) != 0);
cbData_.set(msg);
return;
}
break;
case EventCallback::Type::TYPE_RECVMSG_MULTISHOT:
if (auto* hdr =
cb.recvmsgMultishotCb_->allocateRecvmsgMultishotData()) {
prepRecvmsgMultishot(sqe, ev->ev_fd, &hdr->data_);
cbData_.set(hdr);
return;
}
break;
}
// Fallback (TYPE_NONE, or data allocation failed above): poll only.
prepPollAdd(sqe, ev->ev_fd, getPollFlags(ev->ev_events));
}
}
virtual void processActive() {}
// Tagged-union payload for the Event callback attached to this sqe; the
// union member that is live is selected by type_.
struct EventCallbackData {
EventCallback::Type type_{EventCallback::Type::TYPE_NONE};
union {
EventReadCallback::IoVec* ioVec_;
EventRecvmsgCallback::MsgHdr* msgHdr_;
EventRecvmsgMultishotCallback::Hdr* hdr_;
};
void set(EventReadCallback::IoVec* ioVec) {
type_ = EventCallback::Type::TYPE_READ;
ioVec_ = ioVec;
}
void set(EventRecvmsgCallback::MsgHdr* msgHdr) {
type_ = EventCallback::Type::TYPE_RECVMSG;
msgHdr_ = msgHdr;
}
void set(EventRecvmsgMultishotCallback::Hdr* hdr) {
type_ = EventCallback::Type::TYPE_RECVMSG_MULTISHOT;
hdr_ = hdr;
}
void reset() { type_ = EventCallback::Type::TYPE_NONE; }
// Dispatch the completion to the user callback. Returns true if a
// callback was run. 'released' tracks whether the payload's ownership
// passed back to the user (the cb/free functions presumably consume the
// object — confirm against EventReadCallback contract), so the tag is
// only cleared in that case.
bool processCb(IoUringBackend* backend, int res, uint32_t flags) {
bool ret = false;
bool released = false;
switch (type_) {
case EventCallback::Type::TYPE_READ: {
released = ret = true;
// Copy the fn ptr first: the callback may recycle ioVec_.
auto cbFunc = ioVec_->cbFunc_;
cbFunc(ioVec_, res);
break;
}
case EventCallback::Type::TYPE_RECVMSG: {
released = ret = true;
auto cbFunc = msgHdr_->cbFunc_;
cbFunc(msgHdr_, res);
break;
}
case EventCallback::Type::TYPE_RECVMSG_MULTISHOT: {
ret = true;
std::unique_ptr<IOBuf> buf;
// Multishot completions carry a provided-buffer id in the upper
// 16 bits of the CQE flags when IORING_CQE_F_BUFFER is set.
if (flags & IORING_CQE_F_BUFFER) {
if (IoUringBufferProviderBase* bp = backend->bufferProvider()) {
buf = bp->getIoBuf(flags >> 16, res);
}
}
hdr_->cbFunc_(hdr_, res, std::move(buf));
// No F_MORE means the multishot chain ended: free the header now.
if (!(flags & IORING_CQE_F_MORE)) {
hdr_->freeFunc_(hdr_);
released = true;
}
break;
}
case EventCallback::Type::TYPE_NONE:
break;
}
if (released) {
type_ = EventCallback::Type::TYPE_NONE;
}
return ret;
}
// Free the live payload (e.g. on cancellation) without invoking the
// user callback, then clear the tag.
void releaseData() {
switch (type_) {
case EventCallback::Type::TYPE_READ: {
auto freeFunc = ioVec_->freeFunc_;
freeFunc(ioVec_);
break;
}
case EventCallback::Type::TYPE_RECVMSG: {
auto freeFunc = msgHdr_->freeFunc_;
freeFunc(msgHdr_);
break;
}
case EventCallback::Type::TYPE_RECVMSG_MULTISHOT:
hdr_->freeFunc_(hdr_);
break;
case EventCallback::Type::TYPE_NONE:
break;
}
type_ = EventCallback::Type::TYPE_NONE;
}
};
EventCallbackData cbData_;
// SQE preparation helpers. Each sets this IoSqe as the SQE's user_data so
// the completion can be routed back here.
void prepPollAdd(
struct io_uring_sqe* sqe, int fd, uint32_t events) noexcept {
CHECK(sqe);
::io_uring_prep_poll_add(sqe, fd, events);
::io_uring_sqe_set_data(sqe, this);
}
void prepRead(
struct io_uring_sqe* sqe,
int fd,
const struct iovec* iov,
off_t offset,
bool registerFd) noexcept {
prepUtilFunc(
::io_uring_prep_read,
sqe,
registerFd,
fd,
iov->iov_base,
(unsigned int)iov->iov_len,
offset);
}
void prepWrite(
struct io_uring_sqe* sqe,
int fd,
const struct iovec* iov,
off_t offset,
bool registerFd) noexcept {
prepUtilFunc(
::io_uring_prep_write,
sqe,
registerFd,
fd,
iov->iov_base,
(unsigned int)iov->iov_len,
offset);
}
void prepRecvmsg(
struct io_uring_sqe* sqe,
int fd,
struct msghdr* msg,
bool registerFd) noexcept {
// MSG_TRUNC so oversized datagrams report their full length.
prepUtilFunc(
::io_uring_prep_recvmsg, sqe, registerFd, fd, msg, MSG_TRUNC);
}
// Common prep wrapper: optionally swaps the raw fd for a registered-fd
// slot (IOSQE_FIXED_FILE) before invoking the liburing prep function.
template <typename Fn, typename... Args>
void prepUtilFunc(
Fn fn,
struct io_uring_sqe* sqe,
bool registerFd,
int fd,
Args... args) {
CHECK(sqe);
if (registerFd && !fdRecord_) {
fdRecord_ = backend_->registerFd(fd);
}
if (fdRecord_) {
fn(sqe, fdRecord_->idx_, std::forward<Args>(args)...);
sqe->flags |= IOSQE_FIXED_FILE;
} else {
fn(sqe, fd, std::forward<Args>(args)...);
}
::io_uring_sqe_set_data(sqe, this);
}
// Multishot recvmsg: a single SQE yields multiple CQEs (IORING_CQE_F_MORE)
// until the kernel terminates the chain.
void prepRecvmsgMultishot(
struct io_uring_sqe* sqe, int fd, struct msghdr* msg) noexcept {
CHECK(sqe);
::io_uring_prep_recvmsg(sqe, fd, msg, MSG_TRUNC);
// 1U << 1 matches IORING_RECV_MULTISHOT; presumably set by hand to keep
// compiling against older liburing headers — confirm.
constexpr uint16_t kMultishotFlag = 1U << 1;
sqe->ioprio |= kMultishotFlag;
if (IoUringBufferProviderBase* bp = backend_->bufferProvider()) {
// Use kernel-provided buffers from our buffer group.
sqe->buf_group = bp->gid();
sqe->flags |= IOSQE_BUFFER_SELECT;
}
::io_uring_sqe_set_data(sqe, this);
}
// Prepare a cancellation targeting cancel_sqe (matched by its user_data).
FOLLY_ALWAYS_INLINE void prepCancel(
struct io_uring_sqe* sqe, IoSqe* cancel_sqe) {
CHECK(sqe);
::io_uring_prep_cancel(sqe, UserData{cancel_sqe}, 0);
::io_uring_sqe_set_data(sqe, this);
}
};
// Intrusive lists of pending/active sqes; constant_time_size<false> keeps
// unlink O(1) without a size counter.
using IoSqeBaseList = boost::intrusive::
list<IoSqeBase, boost::intrusive::constant_time_size<false>>;
using IoSqeList = boost::intrusive::
list<IoSqe, boost::intrusive::constant_time_size<false>>;
// Base for the queueXxx() file operations: stores the fd and the user's
// completion callback, invoked with the CQE result.
struct FileOpIoSqe : public IoSqe {
FileOpIoSqe(IoUringBackend* backend, int fd, FileOpCallback&& cb)
: IoSqe(backend, false), fd_(fd), cb_(std::move(cb)) {}
void processActive() override { cb_(res_); }
int fd_{-1};
FileOpCallback cb_;
};
// Base for read/write-style operations: owns the iovec array and offset.
struct ReadWriteIoSqe : public FileOpIoSqe {
// Single-iovec convenience constructor (copies the one entry).
ReadWriteIoSqe(
IoUringBackend* backend,
int fd,
const struct iovec* iov,
off_t offset,
FileOpCallback&& cb)
: FileOpIoSqe(backend, fd, std::move(cb)),
iov_(iov, iov + 1),
offset_(offset) {}
ReadWriteIoSqe(
IoUringBackend* backend,
int fd,
Range<const struct iovec*> iov,
off_t offset,
FileOpCallback&& cb)
: FileOpIoSqe(backend, fd, std::move(cb)), iov_(iov), offset_(offset) {}
void processActive() override { cb_(res_); }
// Fix: wire kNumInlineIoVec into the small_vector type so the common case
// (<= 4 iovecs) stays inline with no heap allocation; the constant was
// previously declared but never used.
static constexpr size_t kNumInlineIoVec = 4;
folly::small_vector<struct iovec, kNumInlineIoVec> iov_;
off_t offset_;
};
// Single-buffer read; uses the iovec's base/len via prepRead.
struct ReadIoSqe : public ReadWriteIoSqe {
using ReadWriteIoSqe::ReadWriteIoSqe;
void processSubmit(struct io_uring_sqe* sqe) noexcept override {
prepRead(sqe, fd_, iov_.data(), offset_, false);
}
};
// Single-buffer write.
struct WriteIoSqe : public ReadWriteIoSqe {
using ReadWriteIoSqe::ReadWriteIoSqe;
void processSubmit(struct io_uring_sqe* sqe) noexcept override {
prepWrite(sqe, fd_, iov_.data(), offset_, false);
}
};
// Vectored read over all stored iovecs.
struct ReadvIoSqe : public ReadWriteIoSqe {
using ReadWriteIoSqe::ReadWriteIoSqe;
void processSubmit(struct io_uring_sqe* sqe) noexcept override {
::io_uring_prep_readv(
sqe, fd_, iov_.data(), (unsigned int)iov_.size(), offset_);
::io_uring_sqe_set_data(sqe, this);
}
};
// Vectored write over all stored iovecs.
struct WritevIoSqe : public ReadWriteIoSqe {
using ReadWriteIoSqe::ReadWriteIoSqe;
void processSubmit(struct io_uring_sqe* sqe) noexcept override {
::io_uring_prep_writev(
sqe, fd_, iov_.data(), (unsigned int)iov_.size(), offset_);
::io_uring_sqe_set_data(sqe, this);
}
};
// Selects between full fsync and fdatasync semantics for queueFsync/
// queueFdatasync.
enum class FSyncFlags {
FLAGS_FSYNC = 0,
FLAGS_FDATASYNC = 1,
};
struct FSyncIoSqe : public FileOpIoSqe {
FSyncIoSqe(
IoUringBackend* backend, int fd, FSyncFlags flags, FileOpCallback&& cb)
: FileOpIoSqe(backend, fd, std::move(cb)), flags_(flags) {}
void processSubmit(struct io_uring_sqe* sqe) noexcept override {
// Map our enum onto the io_uring fsync flag bits.
unsigned int fsyncFlags = 0;
switch (flags_) {
case FSyncFlags::FLAGS_FSYNC:
fsyncFlags = 0;
break;
case FSyncFlags::FLAGS_FDATASYNC:
fsyncFlags = IORING_FSYNC_DATASYNC;
break;
}
::io_uring_prep_fsync(sqe, fd_, fsyncFlags);
::io_uring_sqe_set_data(sqe, this);
}
FSyncFlags flags_;
};
// openat(2) via io_uring; fd_ holds the directory fd, path is copied.
struct FOpenAtIoSqe : public FileOpIoSqe {
FOpenAtIoSqe(
IoUringBackend* backend,
int dfd,
const char* path,
int flags,
mode_t mode,
FileOpCallback&& cb)
: FileOpIoSqe(backend, dfd, std::move(cb)),
path_(path),
flags_(flags),
mode_(mode) {}
void processSubmit(struct io_uring_sqe* sqe) noexcept override {
::io_uring_prep_openat(sqe, fd_, path_.c_str(), flags_, mode_);
::io_uring_sqe_set_data(sqe, this);
}
std::string path_;
int flags_;
mode_t mode_;
};
// openat2(2) via io_uring; the open_how struct is copied at construction.
struct FOpenAt2IoSqe : public FileOpIoSqe {
FOpenAt2IoSqe(
IoUringBackend* backend,
int dfd,
const char* path,
struct open_how* how,
FileOpCallback&& cb)
: FileOpIoSqe(backend, dfd, std::move(cb)), path_(path), how_(*how) {}
void processSubmit(struct io_uring_sqe* sqe) noexcept override {
::io_uring_prep_openat2(sqe, fd_, path_.c_str(), &how_);
::io_uring_sqe_set_data(sqe, this);
}
std::string path_;
struct open_how how_;
};
// close(2) via io_uring.
struct FCloseIoSqe : public FileOpIoSqe {
using FileOpIoSqe::FileOpIoSqe;
void processSubmit(struct io_uring_sqe* sqe) noexcept override {
::io_uring_prep_close(sqe, fd_);
::io_uring_sqe_set_data(sqe, this);
}
};
// statx(2) via io_uring. NOTE(review): unlike the openat sqes, path_ is a
// raw pointer — the caller's pathname and statxbuf must outlive the
// operation; confirm queueStatx's contract.
struct FStatxIoSqe : public FileOpIoSqe {
FStatxIoSqe(
IoUringBackend* backend,
int dfd,
const char* pathname,
int flags,
unsigned int mask,
struct statx* statxbuf,
FileOpCallback&& cb)
: FileOpIoSqe(backend, dfd, std::move(cb)),
path_(pathname),
flags_(flags),
mask_(mask),
statxbuf_(statxbuf) {}
void processSubmit(struct io_uring_sqe* sqe) noexcept override {
::io_uring_prep_statx(sqe, fd_, path_, flags_, mask_, statxbuf_);
::io_uring_sqe_set_data(sqe, this);
}
const char* path_;
int flags_;
unsigned int mask_;
struct statx* statxbuf_;
};
// fallocate(2) via io_uring.
struct FAllocateIoSqe : public FileOpIoSqe {
FAllocateIoSqe(
IoUringBackend* backend,
int fd,
int mode,
off_t offset,
off_t len,
FileOpCallback&& cb)
: FileOpIoSqe(backend, fd, std::move(cb)),
mode_(mode),
offset_(offset),
len_(len) {}
void processSubmit(struct io_uring_sqe* sqe) noexcept override {
::io_uring_prep_fallocate(sqe, fd_, mode_, offset_, len_);
::io_uring_sqe_set_data(sqe, this);
}
int mode_;
off_t offset_;
off_t len_;
};
// sendmsg(2) via io_uring. The msghdr is borrowed, not copied — the caller
// must keep it (and its buffers) alive until completion.
struct SendmsgIoSqe : public FileOpIoSqe {
SendmsgIoSqe(
IoUringBackend* backend,
int fd,
const struct msghdr* msg,
unsigned int flags,
FileOpCallback&& cb)
: FileOpIoSqe(backend, fd, std::move(cb)), msg_(msg), flags_(flags) {}
void processSubmit(struct io_uring_sqe* sqe) noexcept override {
::io_uring_prep_sendmsg(sqe, fd_, msg_, flags_);
::io_uring_sqe_set_data(sqe, this);
}
const struct msghdr* msg_;
unsigned int flags_;
};
// recvmsg(2) via io_uring; same borrowed-msghdr lifetime caveat as above.
struct RecvmsgIoSqe : public FileOpIoSqe {
RecvmsgIoSqe(
IoUringBackend* backend,
int fd,
struct msghdr* msg,
unsigned int flags,
FileOpCallback&& cb)
: FileOpIoSqe(backend, fd, std::move(cb)), msg_(msg), flags_(flags) {}
void processSubmit(struct io_uring_sqe* sqe) noexcept override {
::io_uring_prep_recvmsg(sqe, fd_, msg_, flags_);
::io_uring_sqe_set_data(sqe, this);
}
struct msghdr* msg_;
unsigned int flags_;
};
size_t getActiveEvents(WaitForEventsMode waitForEvents);
size_t prepList(IoSqeBaseList& ioSqes);
int submitOne();
int cancelOne(IoSqe* ioSqe);
int submitBusyCheck(int num, WaitForEventsMode waitForEvents) noexcept;
int submitEager();
void queueFsync(int fd, FSyncFlags flags, FileOpCallback&& cb);
void processFileOp(IoSqe* ioSqe, int res) noexcept;
// Completion trampoline installed on file-op sqes. (The static_cast is a
// no-op — backend is already an IoUringBackend* — kept for byte parity.)
static void processFileOpCB(
IoUringBackend* backend, IoSqe* ioSqe, int res, uint32_t) {
static_cast<IoUringBackend*>(backend)->processFileOp(ioSqe, res);
}
// Heap-allocates a poll sqe; marks it pool-allocated while fewer than
// numEntries_ pooled sqes are outstanding, so release() can recycle it
// into freeList_ — confirm release() in .cpp.
IoUringBackend::IoSqe* allocNewIoSqe(const EventCallback& ) {
auto* ret = new IoSqe(this, numPooledIoSqeInUse_ < numEntries_);
++numPooledIoSqeInUse_;
ret->backendCb_ = IoUringBackend::processPollIoSqe;
return ret;
}
void cleanup();
// getUntrackedSqe: raw SQE not associated with an IoSqe record.
struct io_uring_sqe* getUntrackedSqe();
struct io_uring_sqe* getSqe();
void delayedInit();
void initSubmissionLinked();
Options options_;
size_t numEntries_; // ring size actually in use
std::unique_ptr<IoSqe> timerEntry_; // sqe reading the timerfd
std::unique_ptr<IoSqe> signalReadEntry_; // sqe reading the signal socketpair
IoSqeList freeList_; // recycled pool-allocated sqes
bool usingDeferTaskrun_{false};
// Timer state: timerfd plus the user timers keyed by deadline.
int timerFd_{-1};
bool timerChanged_{false};
bool timerSet_{false};
std::multimap<std::chrono::steady_clock::time_point, Event*> timers_;
// Signal state: wakeup socketpair plus events keyed by signal number.
SocketPair signalFds_;
std::map<int, std::set<Event*>> signals_;
IoSqeBaseList submitList_; // sqes queued for the next submit pass
uint16_t bufferProviderGidNext_{0};
IoUringBufferProviderBase::UniquePtr bufferProvider_;
// Loop bookkeeping.
bool loopBreak_{false};
bool shuttingDown_{false};
bool processTimers_{false};
bool processSignals_{false};
IoSqeList activeEvents_; // completed sqes awaiting processActiveEvents()
size_t waitingToSubmit_{0}; // prepared SQEs not yet submitted to the kernel
size_t numInsertedEvents_{0};
size_t numInternalEvents_{0};
size_t numSendEvents_{0};
size_t numPooledIoSqeInUse_{0};
struct io_uring_params params_;
struct io_uring ioRing_;
FdRegistry fdRegistry_;
CQPollLoopCallback cqPollLoopCallback_;
bool needsDelayedInit_{true}; // first-use initialization pending
// Debug bookkeeping: thread affinity and reentrancy tracking.
folly::Optional<std::thread::id> submitTid_;
int isSubmitting_{0};
bool gettingEvents_{false};
// Debug check that submission happens on a consistent thread.
void dCheckSubmitTid();
// Submission depth counter (submit calls can nest).
void setSubmitting() noexcept { isSubmitting_++; }
void doneSubmitting() noexcept { isSubmitting_--; }
// Debug-only reentrancy guard for the get-active-events path: throws if
// entered again before doneGetActiveEvents().
// Bug fix: the `gettingEvents_ = true;` statement was previously inside the
// guard branch *after* the throw — unreachable — so the flag was never
// armed and the guard could never trip. Arm it on every entry instead.
void setGetActiveEvents() {
if (kIsDebug && gettingEvents_) {
throw std::runtime_error("getting events is not reentrant");
}
gettingEvents_ = true;
}
// Clears the reentrancy flag set by setGetActiveEvents().
void doneGetActiveEvents() noexcept { gettingEvents_ = false; }
bool isSubmitting() const noexcept { return isSubmitting_; }
};
using PollIoBackend = IoUringBackend;
}
#endif