#include <folly/io/async/EventBasePoller.h>
#include <atomic>
#include <stdexcept>
#include <boost/polymorphic_cast.hpp>
#include <fmt/format.h>
#include <glog/logging.h>
#include <folly/FileUtil.h>
#include <folly/String.h>
#include <folly/experimental/io/Epoll.h>
#include <folly/experimental/io/Liburing.h>
#include <folly/lang/Align.h>
#include <folly/portability/GFlags.h>
#include <folly/synchronization/Baton.h>
#include <folly/system/ThreadName.h>
#if FOLLY_HAS_EPOLL
#include <poll.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#endif
#if FOLLY_HAS_LIBURING
#include <liburing.h>
#endif
FOLLY_GFLAGS_DEFINE_string(…);
FOLLY_GFLAGS_DEFINE_uint64(…);
FOLLY_GFLAGS_DEFINE_uint64(…);
FOLLY_GFLAGS_DEFINE_uint64(…);
FOLLY_GFLAGS_DEFINE_bool(…);
FOLLY_GFLAGS_DEFINE_uint64(…);
namespace folly::detail {
namespace {
template <class T>
class Queue { … };
#if FOLLY_HAS_EPOLL
class EventBasePollerImpl : public EventBasePoller {
class FdGroupImpl;
public:
explicit EventBasePollerImpl(bool rearmInline)
: rearmInline_(rearmInline),
notificationEv_(
Event::NotificationFd{}, ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)) {
PCHECK(notificationEv_.fd >= 0);
}
~EventBasePollerImpl() override {
CHECK_EQ(numGroups_.load(), 0)
<< "All groups must be destroyed before EventBasePoller";
::close(notificationEv_.fd);
}
std::unique_ptr<FdGroup> makeFdGroup(ReadyCallback readyCallback) override;
protected:
void startLoop() {
Baton<> started;
loopThread_ =
std::make_unique<std::thread>([this, &started]() { loop(started); });
started.wait();
}
void stopLoop() {
stop_ = true;
notifyEvfd();
loopThread_->join();
}
struct Event final : public Handle {
struct NotificationFd {};
Event(FdGroupImpl& group_, int fd_, void* userData)
: Handle(userData), group(&group_), fd(fd_) {}
Event(NotificationFd, int fd_) : Handle(nullptr), group(nullptr), fd(fd_) {}
~Event() override {
CHECK(isNotificationFd() || joined_.ready())
<< "Handle must be reclaimed before destruction";
}
bool isNotificationFd() const { return group == nullptr; }
FOLLY_ALWAYS_INLINE void markReady() {
#ifdef FOLLY_SANITIZE_THREAD
CHECK(!ready_.exchange(true, std::memory_order_acq_rel));
#endif
}
FOLLY_ALWAYS_INLINE void markProcessed() {
#ifdef FOLLY_SANITIZE_THREAD
CHECK(ready_.exchange(false, std::memory_order_release));
#endif
}
void handoff(bool done) override;
void handleHandoff();
void join();
FdGroupImpl* group;
const int fd;
bool registered = false;
Event* next{nullptr};
private:
bool joining_ = false;
Baton<> joined_;
#ifdef FOLLY_SANITIZE_THREAD
std::atomic<bool> ready_{true};
#endif
};
private:
virtual void setup() = 0;
virtual void teardown() = 0;
virtual void addEvent(Event* event) = 0;
virtual void delEvent(Event* event) = 0;
virtual bool waitForEvents(
std::chrono::steady_clock::time_point loopStart) = 0;
void notifyEvfd();
void returnEvent(Event* event);
void handleNotification();
void handleReadyEvents();
void loop(folly::Baton<>& started);
protected:
const bool rearmInline_;
std::vector<Event*> readyEvents_;
private:
std::atomic<size_t> numGroups_{0};
Event notificationEv_;
std::vector<std::unique_ptr<Event>> events_;
std::unique_ptr<std::thread> loopThread_;
std::vector<Handle*> readyHandles_;
std::atomic<bool> stop_{false};
Queue<Event> returnQueue_;
};
class EventBasePollerImpl::FdGroupImpl final : public FdGroup {
public:
FdGroupImpl(EventBasePollerImpl& parent_, ReadyCallback readyCallback_)
: parent(parent_), readyCallback(std::move(readyCallback_)) {
++parent.numGroups_;
}
~FdGroupImpl() override {
CHECK_EQ(numHandles_, 0)
<< "All EventBases must be reclaimed before group destruction";
--parent.numGroups_;
}
std::unique_ptr<Handle> add(int fd, void* userData) override {
auto handle = std::make_unique<Event>(*this, fd, userData);
++numHandles_;
Handle* handlePtr = handle.get();
readyCallback({&handlePtr, 1});
return handle;
}
void reclaim(std::unique_ptr<Handle> handle) override {
boost::polymorphic_downcast<Event*>(handle.get())->join();
--numHandles_;
}
EventBasePollerImpl& parent;
const ReadyCallback readyCallback;
private:
size_t numHandles_ = 0;
};
void EventBasePollerImpl::Event::handoff(bool done) {
DCHECK(!isNotificationFd());
CHECK(!joining_);
joining_ = done;
if (group->parent.rearmInline_) {
handleHandoff();
} else {
group->parent.returnEvent(this);
}
}
void EventBasePollerImpl::Event::handleHandoff() {
DCHECK(!isNotificationFd());
if (joining_) {
group->parent.delEvent(this);
joined_.post();
return;
}
markProcessed();
group->parent.addEvent(this);
}
void EventBasePollerImpl::Event::join() {
DCHECK(!isNotificationFd());
joined_.wait();
}
void EventBasePollerImpl::notifyEvfd() {
uint64_t val = 1;
auto ret = writeNoInt(notificationEv_.fd, &val, sizeof(val));
PCHECK(ret == sizeof(val)) << ret;
}
void EventBasePollerImpl::returnEvent(Event* event) {
if (returnQueue_.insert(event)) {
notifyEvfd();
}
}
std::unique_ptr<EventBasePoller::FdGroup> EventBasePollerImpl::makeFdGroup(
ReadyCallback readyCallback) {
return std::make_unique<FdGroupImpl>(*this, std::move(readyCallback));
}
void EventBasePollerImpl::handleNotification() {
while (auto* event = returnQueue_.arm()) {
while (event) {
auto* next = std::exchange(event->next, nullptr);
event->handleHandoff();
event = next;
}
}
notificationEv_.markProcessed();
addEvent(¬ificationEv_);
}
void EventBasePollerImpl::handleReadyEvents() {
CHECK(!readyEvents_.empty());
std::sort(
readyEvents_.begin(),
readyEvents_.end(),
[](const Event* lhs, const Event* rhs) {
return std::greater<>{}(lhs->group, rhs->group);
});
auto it = readyEvents_.begin();
FdGroupImpl* curGroup = (*it)->group;
while (true) {
if (it == readyEvents_.end() || (*it)->group != curGroup) {
if (curGroup == nullptr) {
CHECK_EQ(readyHandles_.size(), 1);
CHECK_EQ(readyHandles_[0], ¬ificationEv_);
handleNotification();
} else {
curGroup->readyCallback(folly::range(readyHandles_));
}
readyHandles_.clear();
if (it == readyEvents_.end()) {
break;
}
curGroup = (*it)->group;
}
(*it)->markReady();
readyHandles_.push_back(*it++);
}
readyEvents_.clear();
}
void EventBasePollerImpl::loop(folly::Baton<>& started) {
setThreadName("EventBasePoller");
setup();
handleNotification();
started.post();
auto lastLoopTs = std::chrono::steady_clock::now();
while (!stop_.load()) {
if (!waitForEvents(lastLoopTs)) {
continue;
}
auto waitEndTs = std::chrono::steady_clock::now();
handleReadyEvents();
auto busyEndTs = std::chrono::steady_clock::now();
stats_.wlock()->update(
readyEvents_.size(), waitEndTs - lastLoopTs, busyEndTs - waitEndTs);
lastLoopTs = busyEndTs;
}
}
class EventBasePollerEpoll final : public EventBasePollerImpl {
public:
EventBasePollerEpoll()
: EventBasePollerImpl(FLAGS_folly_event_base_poller_epoll_rearm_inline) {
startLoop();
}
~EventBasePollerEpoll() override { stopLoop(); }
void setup() override {
epFd_ = ::epoll_create1(EPOLL_CLOEXEC);
PCHECK(epFd_ > 0);
}
void teardown() override { ::close(epFd_); }
void addEvent(Event* event) override {
if (event->isNotificationFd() && event->registered) {
return;
}
epoll_event ev = {};
ev.data.ptr = event;
ev.events = EPOLLIN;
int op = EPOLL_CTL_ADD;
if (!event->isNotificationFd()) {
ev.events |= EPOLLONESHOT;
if (FOLLY_UNLIKELY(!event->registered)) {
event->registered = true;
} else {
op = EPOLL_CTL_MOD;
}
} else {
ev.events |= EPOLLET;
event->registered = true;
}
auto ret = ::epoll_ctl(epFd_, op, event->fd, &ev);
PCHECK(ret == 0);
}
void delEvent(Event* event) override {
CHECK(!event->isNotificationFd());
if (!event->registered) {
return;
}
auto ret = ::epoll_ctl(epFd_, EPOLL_CTL_DEL, event->fd, nullptr);
PCHECK(ret == 0);
}
bool waitForEvents(std::chrono::steady_clock::time_point loopStart) override {
int ret;
auto spinUntil = loopStart +
std::chrono::microseconds{FLAGS_folly_event_base_poller_spin_us};
do {
ret = ::epoll_wait(epFd_, epollEvents_.data(), kMaxEvents, 0);
} while (ret <= 0 && std::chrono::steady_clock::now() < spinUntil);
if (ret <= 0) {
if (auto sleepUs = FLAGS_folly_event_base_poller_sleep_us) {
std::this_thread::sleep_for(std::chrono::microseconds{sleepUs});
}
ret = ::epoll_wait(epFd_, epollEvents_.data(), kMaxEvents, -1);
}
if (ret <= 0) {
return false;
}
for (int i = 0; i < ret; ++i) {
readyEvents_.push_back(
CHECK_NOTNULL(reinterpret_cast<Event*>(epollEvents_[i].data.ptr)));
}
return true;
}
private:
const size_t kMaxEvents = FLAGS_folly_event_base_poller_epoll_max_events;
int epFd_{-1};
std::vector<struct epoll_event> epollEvents_{kMaxEvents};
};
#if FOLLY_HAS_LIBURING
namespace {
void enableFlagsIfSupported(
struct io_uring_params& params, uint32_t desiredFlags, const char* msg) {
struct io_uring_params tmpParams;
::memset(¶ms, 0, sizeof(tmpParams));
tmpParams.flags = desiredFlags;
int fd = ::io_uring_setup(1, &tmpParams);
if (fd >= 0) {
::close(fd);
VLOG(1) << "io_uring flags " << msg << " supported";
params.flags |= desiredFlags;
} else if (fd == -EINVAL) {
VLOG(1) << "io_uring flags " << msg << " NOT supported";
} else {
LOG(ERROR) << "Unexpected error " << folly::errnoStr(-fd)
<< " while probing supported io_uring flags";
}
}
#define ENABLE_FLAGS_IF_SUPPORTED …
}
class EventBasePollerIoUring final : public EventBasePollerImpl {
public:
EventBasePollerIoUring()
: EventBasePollerImpl( false) {
startLoop();
}
~EventBasePollerIoUring() override { stopLoop(); }
void setup() override {
::memset(&ring_, 0, sizeof(ring_));
struct io_uring_params params;
::memset(¶ms, 0, sizeof(params));
ENABLE_FLAGS_IF_SUPPORTED(
params, IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN);
ENABLE_FLAGS_IF_SUPPORTED(
params, IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG);
int ret = ::io_uring_queue_init_params(
FLAGS_folly_event_base_poller_io_uring_sq_entries, &ring_, ¶ms);
CHECK(ret == 0) << "Error creating io_uring: " << folly::errnoStr(-ret);
}
void teardown() override { ::io_uring_queue_exit(&ring_); }
void addEvent(Event* event) override {
auto* sqe = ::io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
submitPendingSqes();
sqe = ::io_uring_get_sqe(&ring_);
CHECK(sqe != nullptr);
}
++numPendingSqes_;
::io_uring_sqe_set_data(sqe, event);
if (event->isNotificationFd()) {
::io_uring_prep_read(
sqe, event->fd, &eventFdBuf_, sizeof(eventFdBuf_), 0);
} else {
::io_uring_prep_poll_add(sqe, event->fd, POLLIN);
}
}
void delEvent(Event* ) override {
}
bool waitForEvents(std::chrono::steady_clock::time_point loopStart) override {
if (numPendingSqes_ > 0) {
submitPendingSqes();
}
int ret;
struct io_uring_cqe* cqe = nullptr;
auto spinUntil = loopStart +
std::chrono::microseconds{FLAGS_folly_event_base_poller_spin_us};
do {
ret = ::io_uring_peek_cqe(&ring_, &cqe);
} while (ret != 0 && std::chrono::steady_clock::now() < spinUntil);
if (auto sleepUs = FLAGS_folly_event_base_poller_sleep_us;
ret != 0 && sleepUs > 0) {
struct __kernel_timespec timeout;
timeout.tv_sec = sleepUs / 1'000'000;
timeout.tv_nsec = (sleepUs % 1'000'000) * 1'000;
ret = ::io_uring_wait_cqes(
&ring_,
&cqe,
std::numeric_limits<unsigned>::max(),
&timeout,
nullptr);
}
if (ret != 0 || cqe == nullptr) {
ret = ::io_uring_wait_cqe(&ring_, &cqe);
}
if (ret != 0 || cqe == nullptr) {
return false;
}
DCHECK_EQ(readyEvents_.size(), 0);
unsigned head;
io_uring_for_each_cqe(&ring_, head, cqe) {
auto* event =
CHECK_NOTNULL(static_cast<Event*>(io_uring_cqe_get_data(cqe)));
if (event->isNotificationFd()) {
CHECK_EQ(cqe->res, sizeof(eventFdBuf_)) << errnoStr(-cqe->res);
} else {
CHECK_GE(cqe->res, 0) << errnoStr(-cqe->res);
}
readyEvents_.push_back(event);
}
::io_uring_cq_advance(&ring_, readyEvents_.size());
return true;
}
private:
void submitPendingSqes() {
auto ret = ::io_uring_submit(&ring_);
CHECK_EQ(ret, numPendingSqes_);
numPendingSqes_ = 0;
}
struct io_uring ring_;
size_t numPendingSqes_ = 0;
alignas(cacheline_align_v) uint64_t eventFdBuf_;
};
#endif
#endif
}
void EventBasePoller::Stats::update(
int numEvents, Duration wait, Duration busy) { … }
EventBasePoller::Handle::~Handle() = default;
EventBasePoller::FdGroup::~FdGroup() = default;
EventBasePoller::~EventBasePoller() = default;
EventBasePoller& EventBasePoller::get() { … }
}