#pragma once
#include <folly/experimental/io/Liburing.h>
#include <folly/io/async/IoUringBase.h>
#include <folly/portability/SysMman.h>
#if FOLLY_HAS_LIBURING
#include <liburing.h>
namespace folly {
class IoUringProvidedBufferRing : public IoUringBufferProviderBase {
public:
class LibUringCallError : public std::runtime_error {
public:
using std::runtime_error::runtime_error;
};
IoUringProvidedBufferRing(
io_uring* ioRingPtr,
uint16_t gid,
int count,
int bufferShift,
int ringSizeShift);
void enobuf() noexcept override;
void unusedBuf(uint16_t i) noexcept override;
void destroy() noexcept override;
std::unique_ptr<IOBuf> getIoBuf(uint16_t i, size_t length) noexcept override;
uint32_t count() const noexcept override { return buffer_.bufferCount(); }
bool available() const noexcept override {
return !enobuf_.load(std::memory_order_relaxed);
}
private:
void initialRegister();
void returnBufferInShutdown() noexcept;
void returnBuffer(uint16_t i) noexcept;
std::atomic<uint16_t>* sharedTail() {
return reinterpret_cast<std::atomic<uint16_t>*>(&buffer_.ring()->tail);
}
bool tryPublish(uint16_t expected, uint16_t value) noexcept {
return sharedTail()->compare_exchange_strong(
expected, value, std::memory_order_release);
}
char const* getData(uint16_t i) { return buffer_.buffer(i); }
class ProvidedBuffersBuffer {
public:
ProvidedBuffersBuffer(
int count, int bufferShift, int ringCountShift, bool huge_pages);
~ProvidedBuffersBuffer() { ::munmap(buffer_, allSize_); }
static size_t calcBufferSize(int bufferShift) {
return 1LLU << std::max<int>(5, bufferShift);
}
struct io_uring_buf_ring* ring() const noexcept { return ringPtr_; }
struct io_uring_buf* ringBuf(int idx) const noexcept {
return &ringPtr_->bufs[idx & ringMask_];
}
uint32_t bufferCount() const noexcept { return bufferCount_; }
uint32_t ringCount() const noexcept { return 1 + ringMask_; }
char* buffer(uint16_t idx) {
size_t offset = (size_t)idx << bufferShift_;
return bufferBuffer_ + offset;
}
size_t sizePerBuffer() const { return sizePerBuffer_; }
private:
void* buffer_;
size_t allSize_;
size_t ringMemSize_;
struct io_uring_buf_ring* ringPtr_;
int ringMask_;
size_t bufferSize_;
size_t bufferShift_;
size_t sizePerBuffer_;
char* bufferBuffer_;
uint32_t bufferCount_;
static constexpr size_t kHugePageMask = (1LLU << 21) - 1;
static constexpr size_t kPageMask = (1LLU << 12) - 1;
static constexpr size_t kBufferAlignMask{31LLU};
};
io_uring* ioRingPtr_;
ProvidedBuffersBuffer buffer_;
std::atomic<bool> enobuf_{false};
std::vector<IoUringProvidedBufferRing*> ioBufCallbacks_;
uint64_t gottenBuffers_{0};
std::atomic<uint64_t> returnedBuffers_{0};
std::atomic<bool> wantsShutdown_{false};
std::atomic<uint32_t> shutdownReferences_;
std::mutex shutdownMutex_;
};
}
#endif