folly/folly/compression/test/CompressionTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/compression/Compression.h>

#include <algorithm>
#include <random>
#include <set>
#include <thread>
#include <unordered_map>
#include <utility>

#include <glog/logging.h>

#include <folly/Random.h>
#include <folly/Varint.h>
#include <folly/hash/Hash.h>
#include <folly/io/IOBufQueue.h>
#include <folly/portability/GTest.h>

#if FOLLY_HAVE_LIBZSTD
#include <zstd.h>

#include <folly/compression/Zstd.h>
#endif

#if FOLLY_HAVE_LIBZ
#include <folly/compression/Zlib.h>

namespace zlib = folly::compression::zlib;
#endif

namespace folly {
namespace compression {
namespace test {

// Base class for the fixed-size byte buffers that the tests compress.
// Owns `size_` bytes, hands out prefixes of them, and caches one FNV-64 hash
// per requested prefix length. Non-copyable; only subclasses can construct it,
// and they are expected to fill `data_`.
class DataHolder {
 public:
  DataHolder(const DataHolder&) = delete;
  DataHolder& operator=(const DataHolder&) = delete;
  // FNV-64 hash of the first `size` bytes; CHECK-fails if size > size_.
  uint64_t hash(size_t size) const;
  // View over the first `size` bytes; CHECK-fails if size > size_.
  ByteRange data(size_t size) const;

 protected:
  // Allocates 2^sizeLog2 bytes without initializing them.
  explicit DataHolder(size_t sizeLog2);
  // Declared before data_ so it is initialized first (data_ uses it).
  const size_t size_;
  std::unique_ptr<uint8_t[]> data_;
  // Prefix length -> hash. Mutable so that hash() can remain const.
  mutable std::unordered_map<uint64_t, uint64_t> hashCache_;
};

// Allocates a buffer of 2^sizeLog2 bytes. `new uint8_t[n]` default-initializes,
// so the contents are indeterminate until a subclass fills them.
DataHolder::DataHolder(size_t sizeLog2)
    : size_(size_t(1) << sizeLog2), data_(new uint8_t[size_]) {}

// Returns the FNV-64 hash of the first `size` bytes of the buffer.
// The result is memoized per `size`, so repeated queries for the same prefix
// length do not rehash the data. CHECK-fails when `size` exceeds the buffer.
uint64_t DataHolder::hash(size_t size) const {
  CHECK_LE(size, size_);

  const auto cached = hashCache_.find(size);
  if (cached == hashCache_.end()) {
    // First request for this prefix length: compute and remember it.
    const uint64_t digest = folly::hash::fnv64_buf(data_.get(), size);
    hashCache_.emplace(size, digest);
    return digest;
  }
  return cached->second;
}

// Returns a non-owning view of the first `size` bytes of the buffer.
// CHECK-fails when `size` exceeds the buffer.
ByteRange DataHolder::data(size_t size) const {
  CHECK_LE(size, size_);
  const uint8_t* const begin = data_.get();
  return ByteRange(begin, size);
}

// Computes the FNV-64 hash of all bytes in an IOBuf chain by threading the
// running hash through each buffer in iteration order, starting from
// fnv64_hash_start.
uint64_t hashIOBuf(const IOBuf* buf) {
  uint64_t running = folly::hash::fnv64_hash_start;
  for (const auto& segment : *buf) {
    running = folly::hash::fnv64_buf(segment.data(), segment.size(), running);
  }
  return running;
}

// DataHolder whose whole buffer is filled with pseudo-random bytes.
class RandomDataHolder : public DataHolder {
 public:
  explicit RandomDataHolder(size_t sizeLog2);
};

// Fills all 2^sizeLog2 bytes with std::mt19937 output, splitting the buffer
// into `numThreads` contiguous slices that are generated in parallel.
//
// Each worker seeds its own generator with `seed + t`, so the slices differ
// from each other. The last worker also takes any remainder, which keeps the
// fill complete (and free of shift underflow) when the buffer holds fewer
// than `numThreads` bytes.
RandomDataHolder::RandomDataHolder(size_t sizeLog2) : DataHolder(sizeLog2) {
  static constexpr size_t numThreadsLog2 = 3;
  static constexpr size_t numThreads = size_t(1) << numThreadsLog2;

  const uint32_t seed = randomNumberSeed();

  // Bytes per worker; zero when the buffer is smaller than numThreads bytes.
  const size_t sliceSize = size_ >> numThreadsLog2;

  std::vector<std::thread> threads;
  threads.reserve(numThreads);
  for (size_t t = 0; t < numThreads; ++t) {
    const size_t start = t * sliceSize;
    const size_t end = (t + 1 == numThreads) ? size_ : start + sliceSize;
    // Slices are disjoint, so the workers never write the same byte.
    threads.emplace_back([this, seed, t, start, end] {
      std::mt19937 rng(seed + t);
      for (size_t i = start; i < end; ++i) {
        this->data_[i] = static_cast<uint8_t>(rng());
      }
    });
  }

  for (auto& t : threads) {
    t.join();
  }
}

// DataHolder whose buffer consists entirely of the byte 'a'.
class ConstantDataHolder : public DataHolder {
 public:
  explicit ConstantDataHolder(size_t sizeLog2);
};

// Allocates 2^sizeLog2 bytes and sets every one of them to 'a'.
ConstantDataHolder::ConstantDataHolder(size_t sizeLog2) : DataHolder(sizeLog2) {
  std::fill_n(data_.get(), size_, static_cast<uint8_t>('a'));
}

// Shared test inputs defined at namespace scope. Each holder owns
// 2^dataSizeLog2 bytes; tests read prefixes through data(size) and compare
// against hash(size).
constexpr size_t dataSizeLog2 = 27; // 128MiB
RandomDataHolder randomDataHolder(dataSizeLog2);
ConstantDataHolder constantDataHolder(dataSizeLog2);

// The intersection of the provided codecs & those that are compiled in.
static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
  std::vector<CodecType> supported;

  std::copy_if(
      std::begin(v), std::end(v), std::back_inserter(supported), hasCodec);

  return supported;
}

// All compiled-in compression codecs.
static std::vector<CodecType> availableCodecs() {
  std::vector<CodecType> codecs;

  for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
    auto type = static_cast<CodecType>(i);
    if (hasCodec(type)) {
      codecs.push_back(type);
    }
  }

  return codecs;
}

static std::vector<CodecType> availableStreamCodecs() {
  std::vector<CodecType> codecs;

  for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
    auto type = static_cast<CodecType>(i);
    if (hasStreamCodec(type)) {
      codecs.push_back(type);
    }
  }

  return codecs;
}

// Pins the value of Codec::needsUncompressedLength() for each known codec.
// Codecs that are not compiled in are skipped.
TEST(CompressionTestNeedsUncompressedLength, Simple) {
  struct Expectation {
    CodecType type;
    bool needsUncompressedLength;
  };
  static const Expectation kExpectations[] = {
      {CodecType::NO_COMPRESSION, false},
      {CodecType::LZ4, true},
      {CodecType::SNAPPY, false},
      {CodecType::ZLIB, false},
      {CodecType::LZ4_VARINT_SIZE, false},
      {CodecType::LZMA2, false},
      {CodecType::LZMA2_VARINT_SIZE, false},
      {CodecType::ZSTD, false},
      {CodecType::GZIP, false},
      {CodecType::LZ4_FRAME, false},
      {CodecType::BZIP2, false},
  };

  for (const Expectation& expected : kExpectations) {
    if (!hasCodec(expected.type)) {
      continue;
    }
    EXPECT_EQ(
        getCodec(expected.type)->needsUncompressedLength(),
        expected.needsUncompressedLength);
  }
}

// Fixture parameterized on (log2 of the input length, number of IOBuf chunks
// to split buffers into, codec type). A negative log2 length selects an empty
// input.
class CompressionTest
    : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
 protected:
  void SetUp() override {
    const auto params = GetParam();
    const int lengthLog = std::get<0>(params);
    // A negative exponent is the sentinel for "zero bytes of input".
    if (lengthLog < 0) {
      uncompressedLength_ = 0;
    } else {
      uncompressedLength_ = uint64_t(1) << lengthLog;
    }
    chunks_ = std::get<1>(params);
    codec_ = getCodec(std::get<2>(params));
  }

  // Round-trips a prefix of `dh` through the IOBuf-based codec API.
  void runSimpleIOBufTest(const DataHolder& dh);

  // Round-trips a prefix of `dh` through the std::string-based codec API.
  void runSimpleStringTest(const DataHolder& dh);

 private:
  // Re-chunks `data` into chunks_ buffers at random offsets.
  std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;

  uint64_t uncompressedLength_;
  size_t chunks_;
  std::unique_ptr<Codec> codec_;
};

void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) {
  const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
  const auto compressed = split(codec_->compress(original.get()));
  EXPECT_LE(
      compressed->computeChainDataLength(),
      codec_->maxCompressedLength(uncompressedLength_));
  if (!codec_->needsUncompressedLength()) {
    auto uncompressed = codec_->uncompress(compressed.get());
    EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
    EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
  }
  {
    auto uncompressed =
        codec_->uncompress(compressed.get(), uncompressedLength_);
    EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
    EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
  }
}

void CompressionTest::runSimpleStringTest(const DataHolder& dh) {
  const auto original = std::string(
      reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
      uncompressedLength_);
  const auto compressed = codec_->compress(original);
  EXPECT_LE(
      compressed.length(), codec_->maxCompressedLength(uncompressedLength_));

  if (!codec_->needsUncompressedLength()) {
    auto uncompressed = codec_->uncompress(compressed);
    EXPECT_EQ(uncompressedLength_, uncompressed.length());
    EXPECT_EQ(uncompressed, original);
  }
  {
    auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
    EXPECT_EQ(uncompressedLength_, uncompressed.length());
    EXPECT_EQ(uncompressed, original);
  }
}

// Uniformly split data into (potentially empty) chunks.
std::unique_ptr<IOBuf> CompressionTest::split(
    std::unique_ptr<IOBuf> data) const {
  if (data->isChained()) {
    data->coalesce();
  }

  const size_t size = data->computeChainDataLength();

  std::multiset<size_t> splits;
  for (size_t i = 1; i < chunks_; ++i) {
    splits.insert(Random::rand64(size));
  }

  folly::IOBufQueue result;

  size_t offset = 0;
  for (size_t split : splits) {
    result.append(IOBuf::copyBuffer(data->data() + offset, split - offset));
    offset = split;
  }
  result.append(IOBuf::copyBuffer(data->data() + offset, size - offset));

  return result.move();
}

// IOBuf round trip over the random-data holder.
TEST_P(CompressionTest, RandomData) {
  runSimpleIOBufTest(randomDataHolder);
}

// IOBuf round trip over the constant ('a'-filled) holder.
TEST_P(CompressionTest, ConstantData) {
  runSimpleIOBufTest(constantDataHolder);
}

// std::string round trip over the random-data holder.
TEST_P(CompressionTest, RandomDataString) {
  runSimpleStringTest(randomDataHolder);
}

// std::string round trip over the constant ('a'-filled) holder.
TEST_P(CompressionTest, ConstantDataString) {
  runSimpleStringTest(constantDataHolder);
}

// Parameters: log2 of the input length (-1 selects an empty input, see
// CompressionTest::SetUp) x number of chunks x every compiled-in codec.
INSTANTIATE_TEST_SUITE_P(
    CompressionTest,
    CompressionTest,
    testing::Combine(
        testing::Values(-1, 0, 1, 12, 22, 25, 27),
        testing::Values(1, 2, 3, 8, 65),
        testing::ValuesIn(availableCodecs())));

// Fixture parameterized on (log2 of the input length, codec type), used for
// the codecs that are instantiated with a *_VARINT_SIZE type.
class CompressionVarintTest
    : public testing::TestWithParam<std::tuple<int, CodecType>> {
 protected:
  void SetUp() override {
    const auto params = GetParam();
    const int lengthLog2 = std::get<0>(params);
    uncompressedLength_ = uint64_t(1) << lengthLog2;
    codec_ = getCodec(std::get<1>(params));
  }

  // Round-trips a prefix of `dh` with the compressed buffer split in two.
  void runSimpleTest(const DataHolder& dh);

  uint64_t uncompressedLength_;
  std::unique_ptr<Codec> codec_;
};

// Returns the 1-based position of the most significant set bit of `number`,
// i.e. the number of bits needed to represent it; returns 0 for 0.
inline uint64_t oneBasedMsbPos(uint64_t number) {
  uint64_t pos = 0;
  while (number != 0) {
    number >>= 1;
    ++pos;
  }
  return pos;
}

// Compresses the first uncompressedLength_ bytes of `dh`, then re-packages
// the compressed bytes as a two-buffer chain whose first buffer holds only a
// few leading bytes, and checks that uncompress() (without an explicit
// length) still reproduces the original length and hash.
void CompressionVarintTest::runSimpleTest(const DataHolder& dh) {
  auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
  auto compressed = codec_->compress(original.get());
  // Random split offset of at least one byte.
  // NOTE(review): presumably chosen to land inside the leading varint length
  // so that it straddles two buffers - confirm against the codec format.
  auto breakPoint =
      1ULL +
      Random::rand64(
          std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9ULL);
  // Clamp once and use the same length for both the copy and the trim, so a
  // break point beyond the first buffer can never over-trim it.
  const size_t prefixLength =
      std::min<size_t>(compressed->length(), breakPoint);
  auto tinyBuf = IOBuf::copyBuffer(compressed->data(), prefixLength);
  compressed->trimStart(prefixLength);
  // Chain layout afterwards: [tiny prefix] -> [remainder of compressed data].
  tinyBuf->prependChain(std::move(compressed));
  compressed = std::move(tinyBuf);

  auto uncompressed = codec_->uncompress(compressed.get());

  EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
  EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
}

// Split-buffer round trip over the random-data holder.
TEST_P(CompressionVarintTest, RandomData) {
  runSimpleTest(randomDataHolder);
}

// Split-buffer round trip over the constant ('a'-filled) holder.
TEST_P(CompressionVarintTest, ConstantData) {
  runSimpleTest(constantDataHolder);
}

// Parameters: log2 of the input length x the varint-size codecs that are
// compiled in.
INSTANTIATE_TEST_SUITE_P(
    CompressionVarintTest,
    CompressionVarintTest,
    testing::Combine(
        testing::Values(0, 1, 12, 22, 25, 27),
        testing::ValuesIn(supportedCodecs({
            CodecType::LZ4_VARINT_SIZE,
            CodecType::LZMA2_VARINT_SIZE,
        }))));

// Feeds the LZMA2_VARINT_SIZE stream codec an input made solely of 0xff bytes
// (twice the maximum varint length) and expects uncompressStream() to throw
// std::runtime_error. Does nothing when the codec is not compiled in.
TEST(LZMATest, UncompressBadVarint) {
  if (!hasStreamCodec(CodecType::LZMA2_VARINT_SIZE)) {
    return;
  }

  std::string const garbage(kMaxVarintLength64 * 2, '\xff');
  ByteRange input((folly::StringPiece(garbage)));

  auto scratch = IOBuf::create(16);
  scratch->append(scratch->capacity());
  MutableByteRange output{scratch->writableData(), scratch->length()};

  auto codec = getStreamCodec(CodecType::LZMA2_VARINT_SIZE);
  EXPECT_THROW(codec->uncompressStream(input, output), std::runtime_error);
}

// Fixture parameterized on a codec type; checks that damaged compressed data
// is rejected with an exception.
class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
 protected:
  void SetUp() override { codec_ = getCodec(GetParam()); }

  // Round-trips 42 bytes of `dh`, then verifies that a wrong length, a
  // truncated buffer and a modified first byte all make uncompress() throw.
  void runSimpleTest(const DataHolder& dh);

  std::unique_ptr<Codec> codec_;
};

void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) {
  constexpr uint64_t uncompressedLength = 42;
  auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
  auto compressed = codec_->compress(original.get());

  if (!codec_->needsUncompressedLength()) {
    auto uncompressed = codec_->uncompress(compressed.get());
    EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
    EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
  }
  {
    auto uncompressed =
        codec_->uncompress(compressed.get(), uncompressedLength);
    EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
    EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
  }

  EXPECT_THROW(
      codec_->uncompress(compressed.get(), uncompressedLength + 1),
      std::runtime_error);

  auto corrupted = compressed->clone();
  corrupted->unshare();
  // Truncate the last character
  corrupted->prev()->trimEnd(1);
  if (!codec_->needsUncompressedLength()) {
    EXPECT_THROW(codec_->uncompress(corrupted.get()), std::runtime_error);
  }

  EXPECT_THROW(
      codec_->uncompress(corrupted.get(), uncompressedLength),
      std::runtime_error);

  corrupted = compressed->clone();
  corrupted->unshare();
  // Corrupt the first character
  ++(corrupted->writableData()[0]);

  if (!codec_->needsUncompressedLength()) {
    EXPECT_THROW(codec_->uncompress(corrupted.get()), std::runtime_error);
  }

  EXPECT_THROW(
      codec_->uncompress(corrupted.get(), uncompressedLength),
      std::runtime_error);
}

// Corruption checks over the random-data holder.
TEST_P(CompressionCorruptionTest, RandomData) {
  runSimpleTest(randomDataHolder);
}

// Corruption checks over the constant ('a'-filled) holder.
TEST_P(CompressionCorruptionTest, ConstantData) {
  runSimpleTest(constantDataHolder);
}

// Runs only against the listed codecs that are compiled in; the comments
// below explain why NO_COMPRESSION and LZ4 are left out.
INSTANTIATE_TEST_SUITE_P(
    CompressionCorruptionTest,
    CompressionCorruptionTest,
    testing::ValuesIn(
        // NO_COMPRESSION can't detect corruption
        // LZ4 can't detect corruption reliably (sigh)
        supportedCodecs({
            CodecType::SNAPPY,
            CodecType::ZLIB,
            CodecType::LZMA2,
            CodecType::ZSTD,
            CodecType::LZ4_FRAME,
            CodecType::BZIP2,
        })));

// Tells the streaming tests whether to exercise FlushOp::FLUSH for `type`.
// Every codec type except BZIP2 is treated as flush-capable.
static bool codecHasFlush(CodecType type) {
  const bool isBzip2 = (type == CodecType::BZIP2);
  return !isBzip2;
}

// Fixture parameterized on a codec type; holds the matching StreamCodec.
class StreamingUnitTest : public testing::TestWithParam<CodecType> {
 protected:
  void SetUp() override { codec_ = getStreamCodec(GetParam()); }

  // Whether FlushOp::FLUSH should be exercised for the codec under test.
  bool hasFlush() const { return codecHasFlush(GetParam()); }

  std::unique_ptr<StreamCodec> codec_;
};

// Pins the value of StreamCodec::needsDataLength() for each listed codec.
// Codecs without a compiled-in stream implementation are skipped.
TEST(StreamingUnitTest, needsDataLength) {
  struct Expectation {
    CodecType type;
    bool needsDataLength;
  };
  static const Expectation kExpectations[] = {
      {CodecType::ZLIB, false},
      {CodecType::GZIP, false},
      {CodecType::LZMA2, false},
      {CodecType::LZMA2_VARINT_SIZE, true},
      {CodecType::ZSTD, false},
  };

  for (const Expectation& expected : kExpectations) {
    if (!hasStreamCodec(expected.type)) {
      continue;
    }
    EXPECT_EQ(
        getStreamCodec(expected.type)->needsDataLength(),
        expected.needsDataLength);
  }
}

// maxCompressedLength(n) must never be smaller than n for a range of input
// sizes spanning several orders of magnitude.
TEST_P(StreamingUnitTest, maxCompressedLength) {
  static constexpr uint64_t kLengths[] = {
      1, 10, 100, 1000, 10000, 100000, 1000000};
  for (const uint64_t length : kLengths) {
    EXPECT_GE(codec_->maxCompressedLength(length), length);
  }
}

// Checks getUncompressedLength() through both the IOBuf and string overloads:
//   * empty input reports length 0, and contradicting that with an expected
//     length of 1 throws;
//   * for a real compressed frame of 100 bytes, any length the codec reports
//     must be 100, passing the correct expected length returns it, and - if
//     the frame itself stores the length - a wrong expected length throws.
TEST_P(StreamingUnitTest, getUncompressedLength) {
  auto const empty = IOBuf::create(0);
  EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(""));
  EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
  EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength("", 0));
  EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
  EXPECT_ANY_THROW(codec_->getUncompressedLength(empty.get(), 1));

  auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
  auto const compressed = codec_->compress(data.get());
  // Coalesce so the string view covers the whole compressed frame; the IOBuf
  // and the view then refer to the same contiguous bytes.
  auto const compressedString = StringPiece(compressed->coalesce());

  // The queries below must inspect the compressed frame, not the raw input.
  if (auto const length = codec_->getUncompressedLength(compressed.get())) {
    EXPECT_EQ(100, *length);
  }
  if (auto const length = codec_->getUncompressedLength(compressedString)) {
    EXPECT_EQ(100, *length);
  }
  EXPECT_EQ(
      uint64_t(100), codec_->getUncompressedLength(compressed.get(), 100));
  EXPECT_EQ(
      uint64_t(100), codec_->getUncompressedLength(compressedString, 100));
  // If the uncompressed length is stored in the frame, then make sure it throws
  // when it is given the wrong length.
  if (codec_->getUncompressedLength(compressed.get()) == uint64_t(100)) {
    EXPECT_ANY_THROW(codec_->getUncompressedLength(compressed.get(), 200));
  }
  if (codec_->getUncompressedLength(compressedString) == uint64_t(100)) {
    EXPECT_ANY_THROW(codec_->getUncompressedLength(compressedString, 200));
  }
}

// Exercises the streaming API with zero-length input: compressing nothing in
// one call and in several calls, uncompressing the resulting frame, and
// uncompressing completely empty input under every flush operation.
TEST_P(StreamingUnitTest, emptyData) {
  ByteRange input{};
  // Output scratch space sized for the largest possible "empty" frame.
  auto buffer = IOBuf::create(codec_->maxCompressedLength(0));
  buffer->append(buffer->capacity());
  MutableByteRange output;

  // Test compressing empty data in one pass
  // (first without announcing a length, which is only attempted for codecs
  // that do not require one, then after resetStream(0)).
  if (!codec_->needsDataLength()) {
    output = {buffer->writableData(), buffer->length()};
    EXPECT_TRUE(
        codec_->compressStream(input, output, StreamCodec::FlushOp::END));
  }
  codec_->resetStream(0);
  output = {buffer->writableData(), buffer->length()};
  EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));

  // Test uncompressing the compressed empty data is equivalent to the empty
  // string
  {
    // `output` now covers the unused tail, so the difference is the number of
    // compressed bytes that were written.
    size_t compressedSize = buffer->length() - output.size();
    auto const compressed =
        IOBuf::copyBuffer(buffer->writableData(), compressedSize);
    auto inputRange = compressed->coalesce();
    codec_->resetStream(0);
    output = {buffer->writableData(), buffer->length()};
    EXPECT_TRUE(codec_->uncompressStream(
        inputRange, output, StreamCodec::FlushOp::END));
    // Nothing may have been written to the output.
    EXPECT_EQ(output.size(), buffer->length());
  }

  // Test compressing empty data with multiple calls to compressStream()
  {
    auto largeBuffer = IOBuf::create(codec_->maxCompressedLength(0) * 2);
    largeBuffer->append(largeBuffer->capacity());
    codec_->resetStream(0);
    output = {largeBuffer->writableData(), largeBuffer->length()};
    EXPECT_FALSE(codec_->compressStream(input, output));
    if (hasFlush()) {
      EXPECT_TRUE(
          codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
    }
    EXPECT_TRUE(
        codec_->compressStream(input, output, StreamCodec::FlushOp::END));
  }

  // Test uncompressing empty data
  // (each flush operation is tried on a freshly reset stream, first with no
  // announced length and then with an announced length of 0).
  output = {};
  codec_->resetStream();
  EXPECT_TRUE(codec_->uncompressStream(input, output));
  if (hasFlush()) {
    codec_->resetStream();
    EXPECT_TRUE(
        codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
  }
  codec_->resetStream();
  EXPECT_TRUE(
      codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
  codec_->resetStream(0);
  EXPECT_TRUE(codec_->uncompressStream(input, output));
  if (hasFlush()) {
    codec_->resetStream(0);
    EXPECT_TRUE(
        codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
  }
  codec_->resetStream(0);
  EXPECT_TRUE(
      codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
}

// Verifies the "no forward progress" rule of the streaming API: after some
// real work, one call with empty input and empty output returns false, and a
// second consecutive such call throws std::runtime_error. Checked for every
// flush operation, for both compression and uncompression.
TEST_P(StreamingUnitTest, noForwardProgress) {
  // Two-byte input "aa" and its one-shot compressed form.
  auto inBuffer = IOBuf::create(2);
  inBuffer->writableData()[0] = 'a';
  inBuffer->writableData()[1] = 'a';
  inBuffer->append(2);
  const auto compressed = codec_->compress(inBuffer.get());
  auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));

  ByteRange emptyInput;
  MutableByteRange emptyOutput;

  const std::array<StreamCodec::FlushOp, 3> flushOps = {{
      StreamCodec::FlushOp::NONE,
      StreamCodec::FlushOp::FLUSH,
      StreamCodec::FlushOp::END,
  }};

  // No progress is not okay twice in a row for all flush operations when
  // compressing
  for (const auto flushOp : flushOps) {
    if (flushOp == StreamCodec::FlushOp::FLUSH && !hasFlush()) {
      continue;
    }
    // Codecs that need the data length must be told it up front.
    if (codec_->needsDataLength()) {
      codec_->resetStream(inBuffer->computeChainDataLength());
    } else {
      codec_->resetStream();
    }
    auto input = inBuffer->coalesce();
    MutableByteRange output = {
        outBuffer->writableTail(), outBuffer->tailroom()};
    // Compress some data to avoid empty data special casing
    while (!input.empty()) {
      codec_->compressStream(input, output);
    }
    EXPECT_FALSE(codec_->compressStream(emptyInput, emptyOutput, flushOp));
    EXPECT_THROW(
        codec_->compressStream(emptyInput, emptyOutput, flushOp),
        std::runtime_error);
  }

  // No progress is not okay twice in a row for all flush operations when
  // uncompressing
  for (const auto flushOp : flushOps) {
    if (flushOp == StreamCodec::FlushOp::FLUSH && !hasFlush()) {
      continue;
    }
    codec_->resetStream();
    auto input = compressed->coalesce();
    // Remove the last byte so the operation is incomplete
    input.uncheckedSubtract(1);
    // inBuffer's two bytes are reused as the uncompression output area.
    MutableByteRange output = {inBuffer->writableData(), inBuffer->length()};
    // Uncompress some data to avoid empty data special casing
    while (!input.empty()) {
      EXPECT_FALSE(codec_->uncompressStream(input, output));
    }
    EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput, flushOp));
    EXPECT_THROW(
        codec_->uncompressStream(emptyInput, emptyOutput, flushOp),
        std::runtime_error);
  }
}

// Walks the stream codec through sequences of compressStream() /
// uncompressStream() calls and checks which ones succeed, which report
// "not finished" (false), and which throw std::logic_error because the
// operation is not allowed after the preceding one. The order of the calls
// is the test; do not reorder.
TEST_P(StreamingUnitTest, stateTransitions) {
  auto inBuffer = IOBuf::create(2);
  inBuffer->writableData()[0] = 'a';
  inBuffer->writableData()[1] = 'a';
  inBuffer->append(2);
  auto compressed = codec_->compress(inBuffer.get());
  // `in` is the compressed frame of "aa". The helpers below feed these same
  // bytes to both compressStream() and uncompressStream().
  ByteRange const in = compressed->coalesce();
  auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
  MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};

  // One compressStream() call on a fresh copy of `in`; `empty` selects a
  // zero-sized output range instead of `out`.
  auto compress = [&](StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
                      bool empty = false) {
    auto input = in;
    auto output = empty ? MutableByteRange{} : out;
    return codec_->compressStream(input, output, flushOp);
  };
  // Calls compressStream() until a fresh copy of `in` is fully consumed,
  // expecting every call to return `expect`.
  auto compress_all = [&](bool expect,
                          StreamCodec::FlushOp flushOp =
                              StreamCodec::FlushOp::NONE,
                          bool empty = false) {
    auto input = in;
    auto output = empty ? MutableByteRange{} : out;
    while (!input.empty()) {
      if (expect) {
        EXPECT_TRUE(codec_->compressStream(input, output, flushOp));
      } else {
        EXPECT_FALSE(codec_->compressStream(input, output, flushOp));
      }
    }
  };
  // One uncompressStream() call on a fresh copy of `in`; `empty` selects a
  // zero-sized output range instead of `out`.
  auto uncompress = [&](StreamCodec::FlushOp flushOp =
                            StreamCodec::FlushOp::NONE,
                        bool empty = false) {
    auto input = in;
    auto output = empty ? MutableByteRange{} : out;
    return codec_->uncompressStream(input, output, flushOp);
  };

  // compression flow
  if (!codec_->needsDataLength()) {
    codec_->resetStream();
    EXPECT_FALSE(compress());
    EXPECT_FALSE(compress());
    if (hasFlush()) {
      EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
    }
    EXPECT_FALSE(compress());
    EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
  }
  // Same flow with the total length announced: `in` is fed five times below.
  codec_->resetStream(in.size() * 5);
  compress_all(false);
  compress_all(false);
  if (hasFlush()) {
    compress_all(true, StreamCodec::FlushOp::FLUSH);
  }
  compress_all(false);
  compress_all(true, StreamCodec::FlushOp::END);

  // uncompression flow
  codec_->resetStream();
  EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
  if (hasFlush()) {
    codec_->resetStream();
    EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
  }
  codec_->resetStream();
  EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
  codec_->resetStream();
  EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
  if (hasFlush()) {
    codec_->resetStream();
    EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
  }
  // compress -> uncompress
  codec_->resetStream(in.size());
  EXPECT_FALSE(compress());
  EXPECT_THROW(uncompress(), std::logic_error);
  // uncompress -> compress
  codec_->resetStream(inBuffer->computeChainDataLength());
  EXPECT_TRUE(uncompress(StreamCodec::FlushOp::NONE));
  EXPECT_THROW(compress(), std::logic_error);
  // end -> compress
  if (!codec_->needsDataLength()) {
    codec_->resetStream();
    EXPECT_FALSE(compress());
    EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
    EXPECT_THROW(compress(), std::logic_error);
  }
  codec_->resetStream(in.size() * 2);
  compress_all(false);
  compress_all(true, StreamCodec::FlushOp::END);
  EXPECT_THROW(compress(), std::logic_error);
  // end -> uncompress
  codec_->resetStream();
  EXPECT_TRUE(uncompress(StreamCodec::FlushOp::END));
  EXPECT_THROW(uncompress(), std::logic_error);
  // flush -> compress
  // (the flush is left unfinished by giving it no output space)
  if (hasFlush()) {
    codec_->resetStream(in.size());
    EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
    EXPECT_THROW(compress(), std::logic_error);
  }
  // flush -> end
  if (hasFlush()) {
    codec_->resetStream(in.size());
    EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
    EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
  }
  // undefined -> compress
  // (streaming without a resetStream() after a one-shot call must throw)
  codec_->compress(inBuffer.get());
  EXPECT_THROW(compress(), std::logic_error);
  codec_->uncompress(compressed.get(), inBuffer->computeChainDataLength());
  EXPECT_THROW(compress(), std::logic_error);
  // undefined -> undefined
  // (back-to-back one-shot calls need no reset in between)
  codec_->uncompress(compressed.get());
  codec_->compress(inBuffer.get());
}

// Runs the StreamingUnitTest cases once per compiled-in stream codec.
INSTANTIATE_TEST_SUITE_P(
    StreamingUnitTest,
    StreamingUnitTest,
    testing::ValuesIn(availableStreamCodecs()));

// Fixture parameterized on (log2 of the input length, log2 of the chunk /
// output-buffer size, codec type) for end-to-end streaming tests.
class StreamingCompressionTest
    : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
 protected:
  // Whether FlushOp::FLUSH should be exercised for the codec under test.
  bool hasFlush() const {
    const CodecType type = std::get<2>(GetParam());
    return codecHasFlush(type);
  }

  void SetUp() override {
    auto const params = GetParam();
    const int lengthLog2 = std::get<0>(params);
    const int chunkLog2 = std::get<1>(params);
    uncompressedLength_ = uint64_t(1) << lengthLog2;
    chunkSize_ = size_t(1) << chunkLog2;
    codec_ = getStreamCodec(std::get<2>(params));
  }

  void runResetStreamTest(DataHolder const& dh);
  void runCompressStreamTest(DataHolder const& dh);
  void runUncompressStreamTest(DataHolder const& dh);
  void runFlushTest(DataHolder const& dh);

 private:
  // Cuts `data` into consecutive pieces of at most chunkSize_ bytes.
  std::vector<ByteRange> split(ByteRange data) const;

  uint64_t uncompressedLength_;
  size_t chunkSize_;
  std::unique_ptr<StreamCodec> codec_;
};

// Cuts `data` into consecutive, non-overlapping views of chunkSize_ bytes
// each; the final view holds whatever remains. Empty input yields no views.
std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
  std::vector<ByteRange> chunks;
  // Room for every full chunk plus one trailing partial chunk.
  chunks.reserve(std::max<size_t>(1, data.size() / chunkSize_) + 1);

  for (ByteRange rest = data; !rest.empty();) {
    size_t const take = std::min(rest.size(), chunkSize_);
    chunks.push_back(rest.subpiece(0, take));
    rest.uncheckedAdvance(take);
  }
  return chunks;
}

// Feeds `data` to codec->compressStream() using output buffers of
// `bufferSize` bytes and returns everything that was produced as one chain.
//
// Calls are repeated until the codec reports completion, or - when `flush`
// is FlushOp::NONE - until all input has been consumed. All input is expected
// to be consumed by the time the function returns.
static std::unique_ptr<IOBuf> compressSome(
    StreamCodec* codec,
    ByteRange data,
    uint64_t bufferSize,
    StreamCodec::FlushOp flush) {
  IOBufQueue produced;
  for (;;) {
    auto scratch = IOBuf::create(bufferSize);
    scratch->append(scratch->capacity());
    MutableByteRange window{scratch->writableData(), scratch->length()};

    const bool finished = codec->compressStream(data, window, flush);
    // `window` now covers the unused tail; drop it before queuing the buffer.
    scratch->trimEnd(window.size());
    produced.append(std::move(scratch));

    const bool inputDrained =
        flush == StreamCodec::FlushOp::NONE && data.empty();
    if (inputDrained || finished) {
      break;
    }
  }
  EXPECT_TRUE(data.empty());
  return produced.move();
}

// Feeds `data` to codec->uncompressStream() using output buffers of
// `bufferSize` bytes, advancing `data` past whatever the codec consumed.
//
// Another buffer is requested only while the codec has not finished and the
// queue reports no tailroom after the previous call. Returns the codec's
// last return value together with the chain of produced output.
static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
    StreamCodec* codec,
    ByteRange& data,
    uint64_t bufferSize,
    StreamCodec::FlushOp flush) {
  bool finished = false;
  IOBufQueue produced;
  for (;;) {
    auto scratch = IOBuf::create(bufferSize);
    scratch->append(scratch->capacity());
    MutableByteRange window{scratch->writableData(), scratch->length()};

    finished = codec->uncompressStream(data, window, flush);
    // `window` now covers the unused tail; drop it before queuing the buffer.
    scratch->trimEnd(window.size());
    produced.append(std::move(scratch));

    if (produced.tailroom() != 0 || finished) {
      break;
    }
  }
  return std::make_pair(finished, produced.move());
}

// Checks that resetStream() discards a partially completed compression:
// the input is first streamed without ending the frame, the stream is reset,
// and a complete second compression must uncompress to the original data.
void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
  auto const source = dh.data(uncompressedLength_);

  // Compress some but leave state unclean
  codec_->resetStream(uncompressedLength_);
  compressSome(codec_.get(), source, chunkSize_, StreamCodec::FlushOp::NONE);

  // Reset stream and compress all
  if (!codec_->needsDataLength()) {
    codec_->resetStream();
  } else {
    codec_->resetStream(uncompressedLength_);
  }
  auto frame =
      compressSome(codec_.get(), source, chunkSize_, StreamCodec::FlushOp::END);

  auto const restored = codec_->uncompress(frame.get(), source.size());
  EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(restored.get()));
}

// Runs the reset-stream check on constant data, then on random data.
TEST_P(StreamingCompressionTest, resetStream) {
  runResetStreamTest(constantDataHolder);
  runResetStreamTest(randomDataHolder);
}

// Compresses the input as a sequence of chunkSize_-sized pieces fed through
// one stream, ends the frame with an empty input, and checks that a one-shot
// uncompress() of the concatenated output reproduces the original hash.
void StreamingCompressionTest::runCompressStreamTest(
    const folly::compression::test::DataHolder& dh) {
  auto const pieces = split(dh.data(uncompressedLength_));

  IOBufQueue frame;
  codec_->resetStream(uncompressedLength_);
  // Compress many inputs in a row
  for (auto const piece : pieces) {
    auto partial = compressSome(
        codec_.get(), piece, chunkSize_, StreamCodec::FlushOp::NONE);
    frame.append(std::move(partial));
  }
  // Finish the operation with empty input.
  ByteRange nothing;
  auto trailer = compressSome(
      codec_.get(), nothing, chunkSize_, StreamCodec::FlushOp::END);
  frame.append(std::move(trailer));

  auto const restored = codec_->uncompress(frame.front());
  EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(restored.get()));
}

// Runs the chunked compressStream check on constant data, then random data.
TEST_P(StreamingCompressionTest, compressStream) {
  runCompressStreamTest(constantDataHolder);
  runCompressStreamTest(randomDataHolder);
}

// Concatenates three compressed frames of the same input into one buffer and
// uncompresses them one after another from that shared input range, resetting
// the stream before each frame. Every frame must complete and reproduce the
// original data, and the input must be fully consumed at the end.
void StreamingCompressionTest::runUncompressStreamTest(
    const folly::compression::test::DataHolder& dh) {
  StreamCodec::FlushOp flush = StreamCodec::FlushOp::NONE;
  if (hasFlush()) {
    flush = StreamCodec::FlushOp::FLUSH;
  }
  auto const plain = IOBuf::wrapBuffer(dh.data(uncompressedLength_));

  // Concatenate 3 compressed frames in a row
  auto frames = codec_->compress(plain.get());
  for (int extra = 0; extra < 2; ++extra) {
    frames->prependChain(codec_->compress(plain.get()));
  }
  // Pass all 3 compressed frames in one input buffer
  auto input = frames->coalesce();

  // Flush operation used for the first, second and third frame respectively.
  const StreamCodec::FlushOp perFrameFlush[] = {
      flush, StreamCodec::FlushOp::END, flush};
  for (size_t frame = 0; frame < 3; ++frame) {
    // Only the first frame is uncompressed with an announced length.
    if (frame == 0) {
      codec_->resetStream(plain->computeChainDataLength());
    } else {
      codec_->resetStream();
    }
    auto const result =
        uncompressSome(codec_.get(), input, chunkSize_, perFrameFlush[frame]);
    ASSERT_TRUE(result.first);
    ASSERT_EQ(hashIOBuf(plain.get()), hashIOBuf(result.second.get()));
  }
  EXPECT_TRUE(input.empty());
}

// Runs the multi-frame uncompressStream check on constant data, then random
// data.
TEST_P(StreamingCompressionTest, uncompressStream) {
  runUncompressStreamTest(constantDataHolder);
  runUncompressStreamTest(randomDataHolder);
}

// Checks that FlushOp::FLUSH makes all data compressed so far decodable:
// each input piece is compressed and flushed on one stream, and a second
// codec instance uncompressing just those flushed bytes must consume them
// all and return exactly that piece, without reporting the frame complete.
void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
  auto const pieces = split(dh.data(uncompressedLength_));
  // Separate codec instance that acts as the decoding side.
  auto decoder = getStreamCodec(codec_->type());

  if (!codec_->needsDataLength()) {
    codec_->resetStream();
  } else {
    codec_->resetStream(uncompressedLength_);
  }

  for (auto piece : pieces) {
    // Compress some data and flush the stream
    auto flushed = compressSome(
        codec_.get(), piece, chunkSize_, StreamCodec::FlushOp::FLUSH);
    auto flushedBytes = flushed->coalesce();

    // Uncompress the compressed data
    auto decoded = uncompressSome(
        decoder.get(), flushedBytes, chunkSize_, StreamCodec::FlushOp::FLUSH);

    // All compressed data should have been consumed
    EXPECT_TRUE(flushedBytes.empty());
    // The frame isn't complete
    EXPECT_FALSE(decoded.first);
    // The uncompressed data should be exactly the input data
    EXPECT_EQ(piece.size(), decoded.second->computeChainDataLength());
    auto const expected = IOBuf::wrapBuffer(piece);
    EXPECT_EQ(hashIOBuf(expected.get()), hashIOBuf(decoded.second.get()));
  }
}

// Runs the flush check on constant data, then random data. Codecs for which
// hasFlush() is false are not exercised.
TEST_P(StreamingCompressionTest, testFlush) {
  if (hasFlush()) {
    runFlushTest(constantDataHolder);
    runFlushTest(randomDataHolder);
  }
}

// Parameters: log2 of the input length x log2 of the chunk size x every
// compiled-in stream codec (see StreamingCompressionTest::SetUp).
INSTANTIATE_TEST_SUITE_P(
    StreamingCompressionTest,
    StreamingCompressionTest,
    testing::Combine(
        testing::Values(0, 1, 12, 22, 27),
        testing::Values(12, 17, 20),
        testing::ValuesIn(availableStreamCodecs())));

namespace {

// Codec types included in the codec returned by getAutoUncompressionCodec() by
// default.
// AutomaticCodecTest consults this list to decide whether the codec under
// test must be supplied as an explicit terminal codec.
std::vector<CodecType> autoUncompressionCodecTypes = {{
    CodecType::LZ4_FRAME,
    CodecType::ZSTD,
    CodecType::ZLIB,
    CodecType::GZIP,
    CodecType::LZMA2,
    CodecType::BZIP2,
}};

} // namespace

// Fixture parameterized on a codec type. It holds the codec under test plus an
// automatic uncompression codec that has the codec under test as its terminal
// codec whenever that type is not listed in autoUncompressionCodecTypes.
class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
 protected:
  void SetUp() override {
    codecType_ = GetParam();
    codec_ = getCodec(codecType_);

    const auto& defaults = autoUncompressionCodecTypes;
    autoType_ = std::find(defaults.begin(), defaults.end(), codecType_) !=
        defaults.end();

    // getTerminalCodec() supplies the codec under test as the terminal codec
    // when its type is not among the defaults.
    auto_ = getAutoUncompressionCodec({}, getTerminalCodec());
  }

  void runSimpleTest(const DataHolder& dh);

  // Returns null for default types, otherwise a fresh codec of codecType_.
  std::unique_ptr<Codec> getTerminalCodec() {
    if (autoType_) {
      return nullptr;
    }
    return getCodec(codecType_);
  }

  std::unique_ptr<Codec> codec_;
  std::unique_ptr<Codec> auto_;
  CodecType codecType_;
  // Whether codecType_ appears in autoUncompressionCodecTypes.
  bool autoType_;
};

void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) {
  constexpr uint64_t uncompressedLength = 1000;
  auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
  auto compressed = codec_->compress(original.get());

  if (!codec_->needsUncompressedLength()) {
    auto uncompressed = auto_->uncompress(compressed.get());
    EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
    EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
  }
  {
    auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength);
    EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
    EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
  }
  ASSERT_GE(compressed->computeChainDataLength(), 8);
  for (size_t i = 0; i < 8; ++i) {
    auto split = compressed->clone();
    auto rest = compressed->clone();
    split->trimEnd(split->length() - i);
    rest->trimStart(i);
    split->insertAfterThisOne(std::move(rest));
    auto uncompressed = auto_->uncompress(split.get(), uncompressedLength);
    EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
    EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
  }
}

// Runs the simple automatic-uncompression round trip on the random data holder.
TEST_P(AutomaticCodecTest, RandomData) {
  runSimpleTest(randomDataHolder);
}

// Runs the simple automatic-uncompression round trip on the constant data
// holder.
TEST_P(AutomaticCodecTest, ConstantData) {
  runSimpleTest(constantDataHolder);
}

// Every prefix advertised by the codec must be non-empty and must be accepted
// by canUncompress() on both the codec itself and the automatic codec.
TEST_P(AutomaticCodecTest, ValidPrefixes) {
  for (const std::string& prefix : codec_->validPrefixes()) {
    EXPECT_FALSE(prefix.empty());
    // Ensure that all strings are at least 8 bytes for LZMA2.
    // The bytes after the prefix should be ignored by `canUncompress()`.
    IOBuf padded{IOBuf::COPY_BUFFER, prefix, 0, 8};
    padded.append(8);
    const IOBuf* const candidate = &padded;
    EXPECT_TRUE(codec_->canUncompress(candidate));
    EXPECT_TRUE(auto_->canUncompress(candidate));
  }
}

// If the codec under test needs the uncompressed length, the automatic codec
// must report that it needs it as well.
TEST_P(AutomaticCodecTest, NeedsUncompressedLength) {
  if (!codec_->needsUncompressedLength()) {
    return;
  }
  EXPECT_TRUE(auto_->needsUncompressedLength());
}

// The automatic codec's limit must be at least as large as the limit of the
// codec under test.
TEST_P(AutomaticCodecTest, maxUncompressedLength) {
  auto const codecLimit = codec_->maxUncompressedLength();
  auto const autoLimit = auto_->maxUncompressedLength();
  EXPECT_LE(codecLimit, autoLimit);
}

// Builds an automatic codec with an explicitly supplied ZSTD codec and checks
// that it still uncompresses data produced by the codec under test.
TEST_P(AutomaticCodecTest, DefaultCodec) {
  const uint64_t length = 42;

  std::vector<std::unique_ptr<Codec>> custom;
  custom.emplace_back(getCodec(CodecType::ZSTD));
  auto automatic =
      getAutoUncompressionCodec(std::move(custom), getTerminalCodec());

  auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
  auto compressed = codec_->compress(original.get());

  // Only pass the length hint when the automatic codec asks for it.
  std::unique_ptr<IOBuf> decompressed = automatic->needsUncompressedLength()
      ? automatic->uncompress(compressed.get(), length)
      : automatic->uncompress(compressed.get());

  EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
}

namespace {
class CustomCodec : public Codec {
 public:
  static std::unique_ptr<Codec> create(std::string prefix, CodecType type) {
    return std::make_unique<CustomCodec>(std::move(prefix), type);
  }
  explicit CustomCodec(std::string prefix, CodecType type)
      : Codec(CodecType::USER_DEFINED),
        prefix_(std::move(prefix)),
        codec_(getCodec(type)) {}

 private:
  std::vector<std::string> validPrefixes() const override { return {prefix_}; }

  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
    return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
  }

  bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
    auto clone = data->cloneCoalescedAsValue();
    if (clone.length() < prefix_.size()) {
      return false;
    }
    return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0;
  }

  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override {
    auto result = IOBuf::copyBuffer(prefix_);
    result->appendToChain(codec_->compress(data));
    EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength()));
    return result;
  }

  std::unique_ptr<IOBuf> doUncompress(
      const IOBuf* data, Optional<uint64_t> uncompressedLength) override {
    EXPECT_TRUE(canUncompress(data, uncompressedLength));
    auto clone = data->cloneCoalescedAsValue();
    clone.trimStart(prefix_.size());
    return codec_->uncompress(&clone, uncompressedLength);
  }

  std::string prefix_;
  std::unique_ptr<Codec> codec_;
};
} // namespace

// Registers a prefix-tagged custom codec with the automatic codec and checks
// that (a) data from the custom codec is recognized and decoded only by the
// automatic codec that knows about it, and (b) data from the codec under test
// still decodes.
TEST_P(AutomaticCodecTest, CustomCodec) {
  const uint64_t length = 42;
  auto ab = CustomCodec::create("ab", CodecType::ZSTD);
  std::vector<std::unique_ptr<Codec>> custom;
  custom.emplace_back(CustomCodec::create("ab", CodecType::ZSTD));
  auto automatic =
      getAutoUncompressionCodec(std::move(custom), getTerminalCodec());
  auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));

  // Uncompresses with `automatic`, passing the length only when it is needed.
  auto const decode = [&](const IOBuf* input) {
    return automatic->needsUncompressedLength()
        ? automatic->uncompress(input, length)
        : automatic->uncompress(input);
  };

  auto abCompressed = ab->compress(original.get());
  std::unique_ptr<IOBuf> abDecompressed = decode(abCompressed.get());
  EXPECT_TRUE(automatic->canUncompress(abCompressed.get()));
  EXPECT_FALSE(auto_->canUncompress(abCompressed.get()));
  EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get()));

  auto compressed = codec_->compress(original.get());
  std::unique_ptr<IOBuf> decompressed = decode(compressed.get());
  EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
}

// Same as the CustomCodec test, but the custom codec wraps NO_COMPRESSION and
// an LZ4_FRAME codec is supplied explicitly alongside it.
TEST_P(AutomaticCodecTest, CustomDefaultCodec) {
  const uint64_t length = 42;
  auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION);
  std::vector<std::unique_ptr<Codec>> custom;
  custom.emplace_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
  custom.emplace_back(getCodec(CodecType::LZ4_FRAME));
  auto automatic =
      getAutoUncompressionCodec(std::move(custom), getTerminalCodec());
  auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));

  // Uncompresses with `automatic`, passing the length only when it is needed.
  auto const decode = [&](const IOBuf* input) {
    return automatic->needsUncompressedLength()
        ? automatic->uncompress(input, length)
        : automatic->uncompress(input);
  };

  auto noneCompressed = none->compress(original.get());
  std::unique_ptr<IOBuf> noneDecompressed = decode(noneCompressed.get());
  EXPECT_TRUE(automatic->canUncompress(noneCompressed.get()));
  EXPECT_FALSE(auto_->canUncompress(noneCompressed.get()));
  EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get()));

  auto compressed = codec_->compress(original.get());
  std::unique_ptr<IOBuf> decompressed = decode(compressed.get());
  EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
}

// A one-byte input must be rejected by canUncompress(), with or without a
// length hint, by both the codec under test and the automatic codec.
TEST_P(AutomaticCodecTest, canUncompressOneBytes) {
  // No default codec can uncompress a single byte.
  IOBuf oneByte{IOBuf::CREATE, 1};
  oneByte.append(1);
  const IOBuf* const input = &oneByte;

  EXPECT_FALSE(codec_->canUncompress(input, 1));
  EXPECT_FALSE(codec_->canUncompress(input, folly::none));
  EXPECT_FALSE(auto_->canUncompress(input, 1));
  EXPECT_FALSE(auto_->canUncompress(input, folly::none));
}

// Instantiates every AutomaticCodecTest case once per codec type returned by
// availableCodecs().
INSTANTIATE_TEST_SUITE_P(
    AutomaticCodecTest,
    AutomaticCodecTest,
    testing::ValuesIn(availableCodecs()));

namespace {

// Codec that always "uncompresses" to the same string, regardless of input.
// It reports whatever CodecType it is constructed with and does not support
// compression.
class ConstantCodec : public Codec {
 public:
  // Creates a ConstantCodec of the given type whose uncompress() result is
  // always `uncompressed`.
  static std::unique_ptr<Codec> create(
      std::string uncompressed, CodecType type) {
    return std::make_unique<ConstantCodec>(std::move(uncompressed), type);
  }
  explicit ConstantCodec(std::string uncompressed, CodecType type)
      : Codec(type), uncompressed_(std::move(uncompressed)) {}

 private:
  // Returns its argument unchanged; compression is never performed.
  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
    return uncompressedLength;
  }

  // Always throws std::runtime_error: this codec only uncompresses.
  std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
    throw std::runtime_error("ConstantCodec error: compress() not supported.");
  }

  // Ignores the input and the length hint; returns a copy of the fixed string.
  std::unique_ptr<IOBuf> doUncompress(
      const IOBuf*, Optional<uint64_t>) override {
    return IOBuf::copyBuffer(uncompressed_);
  }

  // Note: the previously declared `codec_` member was never initialized or
  // read, so it has been dropped.
  std::string uncompressed_;
};

} // namespace

// Fixture parameterized on a codec type; holds a codec of that type and an
// automatic uncompression codec built with default arguments.
class TerminalCodecTest : public testing::TestWithParam<CodecType> {
 protected:
  void SetUp() override {
    CodecType const type = GetParam();
    codecType_ = type;
    codec_ = getCodec(type);
    auto_ = getAutoUncompressionCodec();
  }

  CodecType codecType_;
  std::unique_ptr<Codec> codec_;
  std::unique_ptr<Codec> auto_;
};

// Test that the terminal codec's uncompress() function is called when the
// automatically chosen default codec throws.
TEST_P(TerminalCodecTest, uncompressIfDefaultThrows) {
  std::string const input = "abc";
  auto const encoded = codec_->compress(input);

  // Sanity check: the automatic codec round-trips the untouched data.
  EXPECT_EQ(auto_->uncompress(encoded), input);

  // Drop the final byte so that decoding the data fails.
  auto const clipped = encoded.substr(0, encoded.size() - 1);
  auto const clippedBuf = IOBuf::wrapBuffer(clipped.data(), clipped.size());
  EXPECT_TRUE(auto_->canUncompress(clippedBuf.get()));
  EXPECT_ANY_THROW(auto_->uncompress(clipped));

  // With a terminal codec installed, the same truncated input is expected to
  // be handled by that terminal codec instead.
  std::unique_ptr<Codec> withTerminal = getAutoUncompressionCodec(
      {}, ConstantCodec::create("dummyString", CodecType::USER_DEFINED));
  EXPECT_TRUE(withTerminal->canUncompress(clippedBuf.get()));
  EXPECT_EQ(withTerminal->uncompress(clipped), "dummyString");
}

// If the terminal codec has one of the "default types" that the AutomaticCodec
// adds on its own, the default codec of that type must not be added: data of
// that type has to reach the terminal codec.
TEST_P(TerminalCodecTest, terminalOverridesDefaults) {
  std::unique_ptr<Codec> withTerminal = getAutoUncompressionCodec(
      {}, ConstantCodec::create("dummyString", codecType_));

  std::string const input = "abc";
  auto const encoded = codec_->compress(input);

  EXPECT_EQ(withTerminal->uncompress(encoded), "dummyString");
}

// Instantiates every TerminalCodecTest case once per entry of
// autoUncompressionCodecTypes.
INSTANTIATE_TEST_SUITE_P(
    TerminalCodecTest,
    TerminalCodecTest,
    testing::ValuesIn(autoUncompressionCodecTypes));

// The automatic codec must list the prefix of a custom codec it was given.
TEST(ValidPrefixesTest, CustomCodec) {
  std::vector<std::unique_ptr<Codec>> custom;
  custom.emplace_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
  const auto automatic = getAutoUncompressionCodec(std::move(custom));

  const auto advertised = automatic->validPrefixes();
  const bool found =
      std::find(advertised.begin(), advertised.end(), "none") !=
      advertised.end();
  EXPECT_TRUE(found);
}

// Expects `statement` to throw `expected_exception` when kIsDebug is true, and
// expects it to complete without throwing otherwise.
#define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \
  do {                                                       \
    if (kIsDebug) {                                          \
      EXPECT_THROW((statement), expected_exception);         \
    } else {                                                 \
      EXPECT_NO_THROW((statement));                          \
    }                                                        \
  } while (false)

// Two custom codecs where the second one's prefix ("ab") is a prefix of the
// first one's ("abc"): construction must throw std::invalid_argument in debug
// builds.
TEST(CheckCompatibleTest, SimplePrefixSecond) {
  std::vector<std::unique_ptr<Codec>> custom;
  custom.emplace_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
  custom.emplace_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
  EXPECT_THROW_IF_DEBUG(
      getAutoUncompressionCodec(std::move(custom)), std::invalid_argument);
}

// Two custom codecs where the first one's prefix ("ab") is a prefix of the
// second one's ("abc"): construction must throw std::invalid_argument in debug
// builds.
TEST(CheckCompatibleTest, SimplePrefixFirst) {
  std::vector<std::unique_ptr<Codec>> custom;
  custom.emplace_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
  custom.emplace_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
  EXPECT_THROW_IF_DEBUG(
      getAutoUncompressionCodec(std::move(custom)), std::invalid_argument);
}

// A custom codec with an empty prefix must be rejected with
// std::invalid_argument in debug builds.
TEST(CheckCompatibleTest, Empty) {
  std::vector<std::unique_ptr<Codec>> custom;
  custom.emplace_back(CustomCodec::create("", CodecType::NO_COMPRESSION));
  EXPECT_THROW_IF_DEBUG(
      getAutoUncompressionCodec(std::move(custom)), std::invalid_argument);
}

// A custom codec whose prefix is the first three bytes of the four-byte prefix
// used in the ZstdDuplicate test must be rejected with std::invalid_argument in
// debug builds.
// NOTE(review): presumably this collides with the built-in Zstd codec's prefix
// -- confirm against the Zstd codec's validPrefixes().
TEST(CheckCompatibleTest, ZstdPrefix) {
  std::vector<std::unique_ptr<Codec>> custom;
  custom.emplace_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD));
  EXPECT_THROW_IF_DEBUG(
      getAutoUncompressionCodec(std::move(custom)), std::invalid_argument);
}

// A custom codec using the four-byte prefix below must be rejected with
// std::invalid_argument in debug builds.
// NOTE(review): per the test name this duplicates the built-in Zstd codec's
// prefix -- confirm against the Zstd codec's validPrefixes().
TEST(CheckCompatibleTest, ZstdDuplicate) {
  std::vector<std::unique_ptr<Codec>> custom;
  custom.emplace_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD));
  EXPECT_THROW_IF_DEBUG(
      getAutoUncompressionCodec(std::move(custom)), std::invalid_argument);
}

// A custom codec using the prefix below must be rejected with
// std::invalid_argument in debug builds.
// NOTE(review): per the test name, a built-in zlib prefix is a prefix of this
// string -- confirm against the zlib codec's validPrefixes().
TEST(CheckCompatibleTest, ZlibIsPrefix) {
  std::vector<std::unique_ptr<Codec>> custom;
  custom.emplace_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD));
  EXPECT_THROW_IF_DEBUG(
      getAutoUncompressionCodec(std::move(custom)), std::invalid_argument);
}

#if FOLLY_HAVE_LIBZSTD

// When the zstd header reports ZSTD_VERSION_NUMBER < 10308, map the ZSTD_c_*
// parameter names used by the tests below onto the ZSTD_p_* spellings.
#if ZSTD_VERSION_NUMBER < 10308
#define ZSTD_c_contentSizeFlag ZSTD_p_contentSizeFlag
#define ZSTD_c_checksumFlag ZSTD_p_checksumFlag
#define ZSTD_c_windowLog ZSTD_p_windowLog
#endif

// For a 1 MiB and a 100 MiB random input, the frame produced by the default
// ZSTD codec must report the input's size via ZSTD_getFrameContentSize().
TEST(ZstdTest, BackwardCompatible) {
  auto codec = getCodec(CodecType::ZSTD);
  for (size_t const size : {size_t(1) << 20, size_t(100) << 20}) {
    auto const data = IOBuf::wrapBuffer(randomDataHolder.data(size));
    auto compressed = codec->compress(data.get());
    // Make the compressed chain contiguous so zstd can read the frame header.
    compressed->coalesce();
    EXPECT_EQ(
        data->length(),
        ZSTD_getFrameContentSize(compressed->data(), compressed->length()));
  }
}

// Builds a ZSTD codec from explicit zstd::Options (level 1, checksum on,
// window log 23, content-size flag on or off) and checks that
//  - a 2^27-byte input round-trips,
//  - getUncompressedLength() reports the size only when the content-size flag
//    is set, and
//  - the frame header written by the codec reflects the requested options.
TEST(ZstdTest, CustomOptions) {
  auto test = [](const DataHolder& dh, unsigned contentSizeFlag) {
    unsigned const wlog = 23;
    zstd::Options options(1);
    options.set(ZSTD_c_contentSizeFlag, contentSizeFlag);
    options.set(ZSTD_c_checksumFlag, 1);
    options.set(ZSTD_c_windowLog, wlog);
    auto codec = zstd::getCodec(std::move(options));
    size_t const uncompressedLength = (size_t)1 << 27;
    auto const original = std::string(
        reinterpret_cast<const char*>(dh.data(uncompressedLength).data()),
        uncompressedLength);
    auto const compressed = codec->compress(original);
    auto const uncompressed = codec->uncompress(compressed);
    EXPECT_EQ(uncompressed, original);
    EXPECT_EQ(
        codec->getUncompressedLength(
            folly::IOBuf::wrapBuffer(compressed.data(), compressed.size())
                .get()),
        contentSizeFlag ? uncompressedLength : Optional<uint64_t>());
    {
      ZSTD_frameHeader zfh;
      // ZSTD_getFrameHeader() returns 0 only when `zfh` was filled in; bail
      // out instead of comparing against an uninitialized header otherwise.
      ASSERT_EQ(
          size_t(0),
          ZSTD_getFrameHeader(&zfh, compressed.data(), compressed.size()));
      EXPECT_EQ(zfh.checksumFlag, 1);
      EXPECT_EQ(zfh.windowSize, 1ULL << wlog);
      EXPECT_EQ(
          zfh.frameContentSize,
          contentSizeFlag ? uncompressedLength : ZSTD_CONTENTSIZE_UNKNOWN);
    }
  };
  // Exercise both settings of the content-size flag on both data sets.
  for (unsigned contentSizeFlag = 0; contentSizeFlag <= 1; ++contentSizeFlag) {
    test(constantDataHolder, contentSizeFlag);
    test(randomDataHolder, contentSizeFlag);
  }
}

// zstd::Options must preserve negative levels, a level -100 codec must produce
// larger output than a level 1 codec on the same random input, and the default
// ZSTD codec must be able to uncompress the level -100 output.
TEST(ZstdTest, NegativeLevels) {
  EXPECT_EQ(zstd::Options(1).level(), 1);
  EXPECT_EQ(zstd::Options(-1).level(), -1);

  size_t const length = 16348;
  auto const bytes = randomDataHolder.data(length);
  std::string const original(
      reinterpret_cast<const char*>(bytes.data()), length);

  auto const positive = zstd::getCodec(zstd::Options(1))->compress(original);
  auto const negative = zstd::getCodec(zstd::Options(-100))->compress(original);
  EXPECT_GT(negative.size(), positive.size());

  auto defaultCodec = getCodec(CodecType::ZSTD);
  auto const roundTripped = defaultCodec->uncompress(negative);
  EXPECT_EQ(original, roundTripped);
}

#endif

#if FOLLY_HAVE_LIBZ

// Shorthand for the format enum of zlib::Options used by the zlib tests below.
using ZlibFormat = zlib::Options::Format;

// A zlib codec configured with ZlibFormat::AUTO must uncompress data produced
// by both the ZLIB and the GZIP codecs.
TEST(ZlibTest, Auto) {
  size_t const length = size_t(1) << 15;
  auto const bytes = randomDataHolder.data(length);
  std::string const original(
      reinterpret_cast<const char*>(bytes.data()), length);
  auto autoCodec = zlib::getCodec(zlib::Options(ZlibFormat::AUTO));

  for (CodecType const type : {CodecType::ZLIB, CodecType::GZIP}) {
    auto producer = getCodec(type);
    auto const compressed = producer->compress(original);
    auto const uncompressed = autoCodec->uncompress(compressed);
    EXPECT_EQ(original, uncompressed);
  }
}

// Data compressed by a codec built from the default zlib/gzip options must be
// readable both by the stock codec of the matching type and by the
// options-built codec itself.
TEST(ZlibTest, DefaultOptions) {
  size_t const length = size_t(1) << 20;
  auto const bytes = randomDataHolder.data(length);
  std::string const original(
      reinterpret_cast<const char*>(bytes.data()), length);

  auto const check = [&](CodecType type, zlib::Options const& options) {
    auto stock = getCodec(type);
    auto fromOptions = zlib::getCodec(options);
    auto const compressed = fromOptions->compress(original);
    EXPECT_EQ(original, stock->uncompress(compressed));
    EXPECT_EQ(original, fromOptions->uncompress(compressed));
  };

  check(CodecType::ZLIB, zlib::defaultZlibOptions());
  check(CodecType::GZIP, zlib::defaultGzipOptions());
}

// Fixture parameterized on (format, windowSize, memLevel, strategy); builds a
// zlib stream codec from those options.
class ZlibOptionsTest
    : public testing::TestWithParam<std::tuple<ZlibFormat, int, int, int>> {
 protected:
  void SetUp() override {
    const auto& param = GetParam();
    options_.format = std::get<0>(param);
    options_.windowSize = std::get<1>(param);
    options_.memLevel = std::get<2>(param);
    options_.strategy = std::get<3>(param);
    codec_ = zlib::getStreamCodec(options_);
  }

  void runSimpleRoundTripTest(const DataHolder& dh);

 private:
  zlib::Options options_;
  std::unique_ptr<StreamCodec> codec_;
};

// Compresses and then uncompresses the first 2^16 bytes of `dh` with the
// fixture's codec and expects to get the same bytes back.
void ZlibOptionsTest::runSimpleRoundTripTest(const DataHolder& dh) {
  size_t const length = size_t(1) << 16;
  auto const bytes = dh.data(length);
  std::string const original(
      reinterpret_cast<const char*>(bytes.data()), length);

  auto const compressed = codec_->compress(original);
  auto const roundTripped = codec_->uncompress(compressed);
  EXPECT_EQ(roundTripped, original);
}

// Round-trips both the constant and the random data through the configured
// zlib stream codec.
TEST_P(ZlibOptionsTest, simpleRoundTripTest) {
  const DataHolder* const holders[] = {&constantDataHolder, &randomDataHolder};
  for (const DataHolder* holder : holders) {
    runSimpleRoundTripTest(*holder);
  }
}

// Instantiates ZlibOptionsTest over the cross product of four lists which the
// fixture's SetUp() assigns, in order, to the format, windowSize, memLevel and
// strategy fields of zlib::Options.
INSTANTIATE_TEST_SUITE_P(
    ZlibOptionsTest,
    ZlibOptionsTest,
    testing::Combine(
        testing::Values(
            ZlibFormat::ZLIB,
            ZlibFormat::GZIP,
            ZlibFormat::RAW,
            ZlibFormat::AUTO),
        testing::Values(9, 12, 15),
        testing::Values(1, 8, 9),
        testing::Values(
            Z_DEFAULT_STRATEGY, Z_FILTERED, Z_HUFFMAN_ONLY, Z_RLE, Z_FIXED)));

#endif // FOLLY_HAVE_LIBZ

} // namespace test
} // namespace compression
} // namespace folly