#include <folly/io/async/AsyncIO.h>
#include <cerrno>
#include <ostream>
#include <stdexcept>
#include <string>
#include <boost/intrusive/parent_from_member.hpp>
#include <fmt/ostream.h>
#include <glog/logging.h>
#include <folly/Exception.h>
#include <folly/Likely.h>
#include <folly/String.h>
#include <folly/portability/Unistd.h>
#include <folly/small_vector.h>
#if __has_include(<sys/eventfd.h>)
#include <sys/eventfd.h>
#endif
#if __has_include(<libaio.h>)
namespace {
#define X …
// Returns a printable name for a libaio opcode, for debug output only.
// Callers pass iocb::aio_lio_opcode. Each X(...) line below expands through the
// X macro defined just before this function. Presumably that expansion is
// `case <cmd>: return "<cmd>"` -- NOTE(review): confirm against the macro.
// Any value that matches none of the listed cases falls out of the switch and
// yields the "<INVALID io_iocb_cmd>" sentinel.
const char* iocbCmdToString(short int cmd_short) {
// Convert to the libaio enum so the switch is written against io_iocb_cmd.
auto cmd = static_cast<io_iocb_cmd>(cmd_short);
switch (cmd) {
X(IO_CMD_PREAD);
X(IO_CMD_PWRITE);
X(IO_CMD_FSYNC);
X(IO_CMD_FDSYNC);
X(IO_CMD_POLL);
X(IO_CMD_NOOP);
X(IO_CMD_PREADV);
X(IO_CMD_PWRITEV);
}
// No default label: unknown values land here.
return "<INVALID io_iocb_cmd>";
}
#undef X
// Appends a human-readable summary of a libaio control block to `os`.
// The common header fields (user data, key, opcode, priority, fd and the fd's
// name) are always written. Buffer, offset and length are decoded only for
// plain IO_CMD_PREAD / IO_CMD_PWRITE requests. Every other opcode gets a
// "[TODO: ...]" placeholder. Each fragment ends with a separator, so the caller
// can keep appending to the same stream.
void toStream(std::ostream& os, const iocb& cb) {
  const auto opcode = cb.aio_lio_opcode;
  const char* const opcodeName = iocbCmdToString(opcode);

  fmt::print(
      os,
      "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
      cb.data,
      cb.key,
      opcodeName,
      cb.aio_reqprio,
      cb.aio_fildes,
      folly::AsyncBaseOp::fd2name(cb.aio_fildes));

  const bool isPlainReadOrWrite =
      opcode == IO_CMD_PREAD || opcode == IO_CMD_PWRITE;
  if (!isPlainReadOrWrite) {
    // No decoder exists yet for vectored, sync, poll or noop requests.
    os << "[TODO: write debug string for " << opcodeName << "] ";
    return;
  }

  // Plain read/write requests keep their buffer description in u.c.
  fmt::print(
      os,
      "buf={}, offset={}, nbytes={}, ",
      cb.u.c.buf,
      cb.u.c.offset,
      cb.u.c.nbytes);
}
}
namespace folly {
// Constructs an op that owns `cb` as its completion callback. The embedded
// libaio control block starts out all-zero bytes.
AsyncIOOp::AsyncIOOp(NotificationCallback cb) : AsyncBaseOp(std::move(cb)) {
  memset(&iocb_, 0, sizeof(iocb_));
}

// Recycles this op for a new request: installs `cb` as the callback, returns
// the op to UNINITIALIZED, sets the result to -EINVAL and zeroes the control
// block. It must not be called while the op is PENDING. Presumably a pending
// op's iocb_ has been handed to io_submit -- NOTE(review): confirm against
// AsyncBase's state handling.
void AsyncIOOp::reset(NotificationCallback cb) {
  CHECK_NE(state_, State::PENDING);
  cb_ = std::move(cb);
  memset(&iocb_, 0, sizeof(iocb_));
  result_ = -EINVAL;
  state_ = State::UNINITIALIZED;
}

// Nothing beyond member/base destruction is required.
AsyncIOOp::~AsyncIOOp() = default;
void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
init();
io_prep_pread(&iocb_, fd, buf, size, start);
}
void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
init();
io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
}
void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
init();
io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
}
void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
init();
io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
}
// Writes a debug summary of this op of the form
//   "{<state>, [<iocb fields>, ][result=<n>[ (<errno text>)], ]}".
// The control block is described only once the op is no longer UNINITIALIZED.
// The result appears only after completion. A negative result is treated as
// -errno and decoded via errnoStr.
void AsyncIOOp::toStream(std::ostream& os) const {
  const bool hasControlBlock = state_ != AsyncBaseOp::State::UNINITIALIZED;
  const bool hasResult = state_ == AsyncBaseOp::State::COMPLETED;

  os << "{" << state_ << ", ";
  if (hasControlBlock) {
    // File-local formatter for the raw libaio iocb.
    ::toStream(os, iocb_);
  }
  if (hasResult) {
    os << "result=" << result_;
    const bool isError = result_ < 0;
    if (isError) {
      os << " (" << errnoStr(-result_) << ')';
    }
    os << ", ";
  }
  os << "}";
}

// Stream insertion; delegates to AsyncIOOp::toStream and returns `os` so calls
// can be chained.
std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
  op.toStream(os);
  std::ostream& chained = os;
  return chained;
}
// Creates an AsyncIO instance. In POLLABLE mode it also creates a non-blocking
// eventfd (pollFd_) that completions are later routed to (see submitOne /
// submitRange). The libaio context itself is created lazily by
// initializeContext().
AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
    : AsyncBase(capacity, pollMode) {
  if (pollMode_ != POLLABLE) {
    return;
  }
#if __has_include(<sys/eventfd.h>)
  // EFD_NONBLOCK lets drainPollFd() report "nothing ready" (EAGAIN) as 0
  // instead of blocking.
  pollFd_ = eventfd(0, EFD_NONBLOCK);
  checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
#else
  // Without eventfd support, a POLLABLE request leaves pollFd_ untouched.
  // NOTE(review): presumably it stays at its default (-1, judging by the
  // destructor's check), so the instance behaves as not pollable.
#endif
}
// Tears down the AsyncIO instance. It is a fatal error to destroy it with
// requests still pending. The libaio context is released if one was created,
// and the eventfd is closed if one was opened.
AsyncIO::~AsyncIO() {
  CHECK_EQ(pending_, 0);

  if (ctx_) {
    // io_queue_release reports failure as a negative errno.
    const int releaseRc = io_queue_release(ctx_);
    CHECK_EQ(releaseRc, 0) << "io_queue_release: " << errnoStr(-releaseRc);
  }

  const bool ownsPollFd = pollFd_ != -1;
  if (ownsPollFd) {
    CHECK_ERR(close(pollFd_));
    pollFd_ = -1;
  }
}
void AsyncIO::initializeContext() {
if (!init_.load(std::memory_order_acquire)) {
std::lock_guard<std::mutex> lock(initMutex_);
if (!init_.load(std::memory_order_relaxed)) {
int rc = io_queue_init(capacity_, &ctx_);
if (rc == -EAGAIN) {
long aio_nr, aio_max;
std::unique_ptr<FILE, int (*)(FILE*)> fp(
fopen("/proc/sys/fs/aio-nr", "r"), fclose);
PCHECK(fp);
CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
std::unique_ptr<FILE, int (*)(FILE*)> aio_max_fp(
fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
PCHECK(aio_max_fp);
CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
LOG(ERROR) << "No resources for requested capacity of " << capacity_;
LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
}
checkKernelError(rc, "AsyncIO: io_queue_init failed");
DCHECK(ctx_);
init_.store(true, std::memory_order_release);
}
}
}
int AsyncIO::drainPollFd() {
uint64_t numEvents;
ssize_t rc;
do {
rc = ::read(pollFd_, &numEvents, 8);
} while (rc == -1 && errno == EINTR);
if (FOLLY_UNLIKELY(rc == -1 && errno == EAGAIN)) {
return 0;
}
checkUnixError(rc, "AsyncIO: read from event fd failed");
DCHECK_EQ(rc, 8);
DCHECK_GT(numEvents, 0);
return static_cast<int>(numEvents);
}
// Submits a single op to the kernel. Returns io_submit's result, or -1 if
// `op` is not an AsyncIOOp (and so has no libaio control block). When an
// eventfd exists, completion is also signalled on pollFd_.
int AsyncIO::submitOne(AsyncBase::Op* op) {
  if (AsyncIOOp* aop = op->getAsyncIOOp()) {
    iocb* control = &aop->iocb_;
    control->data = nullptr;
    if (pollFd_ != -1) {
      io_set_eventfd(control, pollFd_);
    }
    return io_submit(ctx_, 1, &control);
  }
  return -1;
}
// Submits every AsyncIOOp in `ops` in one io_submit call. Ops that are not
// AsyncIOOps are skipped. Returns io_submit's result, or -1 if nothing was
// eligible for submission.
int AsyncIO::submitRange(Range<AsyncBase::Op**> ops) {
  std::vector<iocb*> batch;
  batch.reserve(ops.size());

  for (AsyncBase::Op* op : ops) {
    AsyncIOOp* aop = op->getAsyncIOOp();
    if (aop == nullptr) {
      continue;
    }
    iocb* control = &aop->iocb_;
    control->data = nullptr;
    if (pollFd_ != -1) {
      io_set_eventfd(control, pollFd_);
    }
    batch.push_back(control);
  }

  if (batch.empty()) {
    return -1;
  }
  return io_submit(ctx_, batch.size(), batch.data());
}
// Reaps completed requests from the libaio context.
// io_getevents is called at least once, and then repeatedly until at least
// `minRequests` events (and never more than `maxRequests`) have been
// collected. Interrupted calls are retried; any other failure is fatal
// (CHECK). Each reaped op has the pending count decremented and is passed to
// complete() with the kernel's result (WaitType::COMPLETE) or to cancel()
// (WaitType::CANCEL). The ops are stored in `result`, whose previous contents
// are discarded, and a range over it is returned.
Range<AsyncBase::Op**> AsyncIO::doWait(
    WaitType type,
    size_t minRequests,
    size_t maxRequests,
    std::vector<AsyncBase::Op*>& result) {
  constexpr size_t kNumInlineRequests = 16;
  folly::small_vector<io_event, kNumInlineRequests> events(maxRequests);

  // io_getevents wrapper that transparently retries on EINTR.
  auto getEventsRetryingEintr =
      [this](size_t minNr, size_t maxNr, io_event* out) {
        int rc;
        do {
          rc = io_getevents(ctx_, minNr, maxNr, out, nullptr);
        } while (rc == -EINTR);
        return rc;
      };

  size_t received = 0;
  // A do-while (not while) so that minRequests == 0 still polls once.
  do {
    const int got = getEventsRetryingEintr(
        minRequests - received,
        maxRequests - received,
        events.data() + received);
    CHECK_GE(got, 0) << "AsyncIO: io_getevents failed with error "
                     << errnoStr(-got);
    received += got;
  } while (received < minRequests);
  DCHECK_LE(received, maxRequests);

  result.clear();
  for (size_t idx = 0; idx < received; ++idx) {
    const io_event& ev = events[idx];
    CHECK(ev.obj);
    // ev.obj points at the iocb_ member embedded in the owning AsyncIOOp.
    Op* op = boost::intrusive::get_parent_from_member(
        ev.obj, &AsyncIOOp::iocb_);
    decrementPending();
    if (type == WaitType::COMPLETE) {
      complete(op, ev.res);
    } else if (type == WaitType::CANCEL) {
      cancel(op);
    }
    result.push_back(op);
  }
  return range(result);
}
}
#endif