#include <folly/compression/QuotientMultiSet.h>
#include <cmath>
#include <folly/Math.h>
#if FOLLY_QUOTIENT_MULTI_SET_SUPPORTED
namespace folly {
/**
 * Sizes the builder for a key space of `keyBits` bits.
 *
 * @param keyBits           Width of the keys; maxKey_ is derived from it.
 * @param expectedElements  Anticipated number of keys (0 is treated as 1).
 * @param loadFactor        Ratio of expected elements to slots; the slot
 *                          count is ceil(expectedElements / loadFactor).
 */
QuotientMultiSetBuilder::QuotientMultiSetBuilder(
    size_t keyBits, size_t expectedElements, double loadFactor)
    : keyBits_(keyBits), maxKey_(qms_detail::maxValue(keyBits_)) {
  // Widest remainder accepted by this builder.
  constexpr auto kMaxRemainderBits = 56;

  // Size for at least one element so the slot count is never computed from 0.
  const size_t elementCount = expectedElements > 0 ? expectedElements : 1;
  const uint64_t slotCount = to_integral(std::ceil(elementCount / loadFactor));

  // The divisor splits each key into a quotient and a remainder; the
  // remainder width is derived from the largest possible remainder
  // (divisor_ - 1).
  divisor_ = divCeil(maxKey_, slotCount);
  remainderBits_ = findLastSet(divisor_ - 1);

  // When the computed width exceeds the cap, clamp it and switch to the
  // matching power-of-two divisor.
  if (remainderBits_ > kMaxRemainderBits) {
    remainderBits_ = kMaxRemainderBits;
    divisor_ = uint64_t(1) << remainderBits_;
  }

  blockSize_ = Block::blockSize(remainderBits_);
  fraction_ = qms_detail::getInverse(divisor_);
}
// The destructor is compiler-generated, but defined out of line in this
// translation unit rather than in the class declaration.
QuotientMultiSetBuilder::~QuotientMultiSetBuilder() = default;
/**
 * Appends freshly made blocks until block index `limitIndex` exists.
 *
 * @param limitIndex  Highest block index that must be allocated on return.
 * @return true if at least one block was appended by this call.
 */
bool QuotientMultiSetBuilder::maybeAllocateBlocks(size_t limitIndex) {
  const auto blocksBefore = numBlocks_;
  while (numBlocks_ <= limitIndex) {
    // Each new block is tagged with its global index, which equals the
    // number of blocks allocated so far.
    blocks_.emplace_back(Block::make(remainderBits_), numBlocks_);
    ++numBlocks_;
  }
  return numBlocks_ != blocksBefore;
}
bool QuotientMultiSetBuilder::insert(uint64_t key) {
FOLLY_SAFE_CHECK(key <= maxKey_, "Invalid key");
FOLLY_SAFE_CHECK(
key >= prevKey_, "Keys need to be inserted in nondecreasing order");
const auto qr = qms_detail::getQuotientAndRemainder(key, divisor_, fraction_);
const auto& quotient = qr.first;
const auto& remainder = qr.second;
const size_t blockIndex = quotient / kBlockSize;
const size_t offsetInBlock = quotient % kBlockSize;
bool newBlockAllocated = false;
newBlockAllocated |= maybeAllocateBlocks(
std::max<uint64_t>(blockIndex, nextSlot_ / kBlockSize));
auto block = getBlock(nextSlot_ / kBlockSize).block.get();
if (prevOccupiedQuotient_ != quotient) {
closePreviousRun();
if (blockIndex > nextSlot_ / kBlockSize) {
nextSlot_ = (blockIndex * kBlockSize);
newBlockAllocated |= maybeAllocateBlocks(blockIndex);
block = getBlock(blockIndex).block.get();
}
prevRunStart_ = nextSlot_;
prevOccupiedQuotient_ = quotient;
}
block->setRemainder(nextSlot_ % kBlockSize, remainderBits_, remainder);
block = getBlock(blockIndex).block.get();
block->setOccupied(offsetInBlock);
nextSlot_++;
prevKey_ = key;
numKeys_++;
return newBlockAllocated;
}
/**
 * Stores `payload` in the most recently allocated block.
 *
 * At least one block must already exist (DCHECK-enforced).
 */
void QuotientMultiSetBuilder::setBlockPayload(uint64_t payload) {
  DCHECK(!blocks_.empty());
  auto& newest = blocks_.back();
  newest.block->payload = payload;
}
/**
 * Finalizes the run that ends at the slot just before nextSlot_.
 *
 * Sets the run-end bit for that slot, bumps the run counter, possibly
 * records the block offset, and marks as ready every pending block whose
 * index is below (prevOccupiedQuotient_ + 1) / kBlockSize. Does nothing if
 * no slot has been written yet.
 */
void QuotientMultiSetBuilder::closePreviousRun() {
  if (FOLLY_UNLIKELY(nextSlot_ == 0)) {
    // Nothing has been inserted, so there is no run to close.
    return;
  }

  const auto lastSlot = nextSlot_ - 1;
  auto runEndBlock = getBlock(lastSlot / kBlockSize).block.get();
  runEndBlock->setRunend(lastSlot % kBlockSize);
  ++numRuns_;

  // The offset is written only while the occupied mask of the run's home
  // block passes isPowTwo(), i.e. presumably while this run is the only
  // one recorded for that block so far.
  auto homeBlock = getBlock(prevOccupiedQuotient_ / kBlockSize).block.get();
  if (isPowTwo(homeBlock->occupieds)) {
    homeBlock->offset = lastSlot;
  }

  // Advance the ready prefix: pending blocks strictly below the limit index
  // are flagged ready, stopping at the first one that is not.
  const size_t readyLimit = (prevOccupiedQuotient_ + 1) / kBlockSize;
  size_t pos = readyBlocks_;
  while (pos < blocks_.size() && blocks_[pos].index < readyLimit) {
    blocks_[pos].ready = true;
    ++readyBlocks_;
    ++pos;
  }
}
/**
 * Transfers every leading ready block from blocks_ into `buff`.
 *
 * Each block's memory is released from its owning pointer and handed to an
 * IOBuf of blockSize_ bytes; the entry is then removed from the front of
 * blocks_. Stops at the first block that is not ready.
 */
void QuotientMultiSetBuilder::moveReadyBlocks(IOBufQueue& buff) {
  while (!blocks_.empty() && blocks_.front().ready) {
    auto& head = blocks_.front();
    buff.append(IOBuf::takeOwnership(head.block.release(), blockSize_));
    blocks_.pop_front();
  }
}
// Moves all blocks currently marked ready into `buff`.
void QuotientMultiSetBuilder::flush(IOBufQueue& buff) {
moveReadyBlocks(buff);
// Every leading ready block has been popped from blocks_, so the index
// closePreviousRun() resumes scanning from is reset to the front.
readyBlocks_ = 0;
}
/**
 * Finishes construction: closes the last run, emits all remaining blocks
 * into `buff`, and appends a zero-initialized Metadata trailer describing
 * the set (block count, key count, divisor, key bits, remainder bits).
 *
 * Aborts via FOLLY_SAFE_CHECK if the metadata trailer cannot be allocated.
 *
 * @param buff  Queue that receives the remaining blocks and the trailer.
 */
void QuotientMultiSetBuilder::close(IOBufQueue& buff) {
  closePreviousRun();

  // Walk from the back and flag blocks ready until one that is already
  // ready is found; the blocks before it are assumed to be ready already.
  for (auto iter = blocks_.rbegin(); iter != blocks_.rend() && !iter->ready;
       ++iter) {
    iter->ready = true;
  }

  moveReadyBlocks(buff);

  // NOTE(review): the assertion message suggests getRemainder() relies on
  // the trailer being at least 8 bytes long - confirm against the header.
  static_assert(sizeof(Metadata) > 7, "getRemainder() is not safe");

  // calloc() returns nullptr on failure; check before writing through it.
  auto* metadata = static_cast<Metadata*>(calloc(1, sizeof(Metadata)));
  FOLLY_SAFE_CHECK(metadata != nullptr, "Failed to allocate metadata");

  metadata->numBlocks = numBlocks_;
  metadata->numKeys = numKeys_;
  metadata->divisor = divisor_;
  metadata->keyBits = keyBits_;
  metadata->remainderBits = remainderBits_;
  VLOG(2) << "Metadata: " << metadata->debugString();

  // Ownership of the malloc-family allocation passes to the IOBuf.
  buff.append(IOBuf::takeOwnership(metadata, sizeof(Metadata)));
}
}
#endif