folly/folly/io/test/RecordIOTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/io/RecordIO.h>

#include <sys/types.h>

#include <random>

#include <glog/logging.h>

#include <folly/Conv.h>
#include <folly/FBString.h>
#include <folly/Random.h>
#include <folly/io/IOBufQueue.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/GTest.h>
#include <folly/portability/Unistd.h>
#include <folly/testing/TestUtil.h>

// Seed for the std::mt19937 generators used by the randomized tests in this
// file. Defaults to folly::randomNumberSeed(); pass --random_seed=<n> to
// replay a specific run.
DEFINE_int32(random_seed, folly::randomNumberSeed(), "random seed");

namespace folly {
namespace test {

namespace {
// Converts a byte range into a StringPiece so that record payloads can be
// compared directly against string literals in the assertions below.
StringPiece sp(ByteRange br) {
  StringPiece view(br);
  return view;
}

template <class T>
std::unique_ptr<IOBuf> iobufs(std::initializer_list<T> ranges) {
  IOBufQueue queue;
  for (auto& range : ranges) {
    StringPiece r(range);
    queue.append(IOBuf::wrapBuffer(r.data(), r.size()));
  }
  return queue.move();
}

} // namespace

// Round-trips records through a writer/reader pair, then opens a second
// writer on the same file and checks that its records follow the first batch
// instead of replacing it.
TEST(RecordIOTest, Simple) {
  TemporaryFile file;

  // Reads the file back and requires exactly `expected`, in order.
  const auto expectContents =
      [&file](std::initializer_list<StringPiece> expected) {
        RecordIOReader reader(File(file.fd()));
        auto it = reader.begin();
        for (const auto& want : expected) {
          ASSERT_TRUE(it != reader.end());
          EXPECT_EQ(want, sp(it->first));
          ++it;
        }
        EXPECT_TRUE(it == reader.end());
      };

  {
    RecordIOWriter firstWriter(File(file.fd()));
    // A record supplied as a two-buffer chain is read back as one payload.
    firstWriter.write(iobufs({"hello ", "world"}));
    firstWriter.write(iobufs({"goodbye"}));
  }
  // A fatal failure inside the checker aborts the whole test, as an inline
  // ASSERT would.
  ASSERT_NO_FATAL_FAILURE(expectContents({"hello world", "goodbye"}));

  {
    RecordIOWriter secondWriter(File(file.fd()));
    secondWriter.write(iobufs({"meow"}));
    secondWriter.write(iobufs({"woof"}));
  }
  ASSERT_NO_FATAL_FAILURE(
      expectContents({"hello world", "goodbye", "meow", "woof"}));
}

// Writes records of every length from 0 to kMaxLen - 1 and checks that the
// reader returns lengths 1 to kMaxLen - 1; the empty record must not show up.
TEST(RecordIOTest, SmallRecords) {
  constexpr size_t kMaxLen = 10;
  char payload[kMaxLen];
  memset(payload, 'x', sizeof(payload));

  TemporaryFile file;
  {
    RecordIOWriter writer(File(file.fd()));
    // record of size 0 should be ignored
    for (size_t len = 0; len != kMaxLen; ++len) {
      writer.write(IOBuf::wrapBuffer(payload, len));
    }
  }
  {
    RecordIOReader reader(File(file.fd()));
    auto it = reader.begin();
    // Start at 1: the zero-length write is expected to have produced nothing.
    for (size_t len = 1; len != kMaxLen; ++len) {
      ASSERT_TRUE(it != reader.end());
      EXPECT_EQ(StringPiece(payload, len), sp(it->first));
      ++it;
    }
    EXPECT_TRUE(it == reader.end());
  }
}

// Interleaves records written under file ids 1 and 2 in one file and checks
// that a reader sees only the records matching the id it was opened with,
// with id 0 returning every record.
TEST(RecordIOTest, MultipleFileIds) {
  TemporaryFile file;

  // On-disk order: "hello" (id 1), "world" (id 2), "goodbye" (id 1).
  {
    RecordIOWriter writer(File(file.fd()), 1);
    writer.write(iobufs({"hello"}));
  }
  {
    RecordIOWriter writer(File(file.fd()), 2);
    writer.write(iobufs({"world"}));
  }
  {
    RecordIOWriter writer(File(file.fd()), 1);
    writer.write(iobufs({"goodbye"}));
  }

  // Opens a reader for `fileId` and requires exactly `expected`, in order.
  const auto expectForId = [&file](
                               uint32_t fileId,
                               std::initializer_list<StringPiece> expected) {
    RecordIOReader reader(File(file.fd()), fileId);
    auto it = reader.begin();
    for (const auto& want : expected) {
      ASSERT_TRUE(it != reader.end());
      EXPECT_EQ(want, sp(it->first));
      ++it;
    }
    EXPECT_TRUE(it == reader.end());
  };

  // return all
  ASSERT_NO_FATAL_FAILURE(expectForId(0, {"hello", "world", "goodbye"}));
  ASSERT_NO_FATAL_FAILURE(expectForId(1, {"hello", "goodbye"}));
  ASSERT_NO_FATAL_FAILURE(expectForId(2, {"world"}));
  // No record was written with id 3, so the reader must be empty.
  ASSERT_NO_FATAL_FAILURE(expectForId(3, {}));
}

// Appends a stray magic number followed by a byte-for-byte copy of the first
// record, and checks that the reader returns both the original record and the
// copy, with nothing extra.
TEST(RecordIOTest, ExtraMagic) {
  TemporaryFile file;
  {
    RecordIOWriter writer(File(file.fd()));
    writer.write(iobufs({"hello"}));
  }

  // Read the whole on-disk record: header plus the 5 bytes of "hello". The
  // read starts at offset 0 and advances the file offset past those bytes, so
  // the writes below land directly after them.
  uint8_t rawRecord[recordio_helpers::headerSize() + 5];
  EXPECT_EQ(0, lseek(file.fd(), 0, SEEK_SET));
  EXPECT_EQ(sizeof(rawRecord), read(file.fd(), rawRecord, sizeof(rawRecord)));

  // Append an extra magic
  const uint32_t magic = recordio_helpers::recordio_detail::Header::kMagic;
  EXPECT_EQ(sizeof(magic), write(file.fd(), &magic, sizeof(magic)));
  // and an extra record
  EXPECT_EQ(sizeof(rawRecord), write(file.fd(), rawRecord, sizeof(rawRecord)));

  {
    RecordIOReader reader(File(file.fd()));
    auto it = reader.begin();
    // Exactly two identical records are expected: the original and the copy.
    for (int copy = 0; copy < 2; ++copy) {
      ASSERT_TRUE(it != reader.end());
      EXPECT_EQ("hello", sp(it->first));
      ++it;
    }
    EXPECT_TRUE(it == reader.end());
  }
}

namespace {
// Damages the file behind `fd` by incrementing (with wrap-around) the single
// byte at absolute offset `pos`. Uses pread/pwrite with an explicit offset
// rather than seeking. A short read or write is reported as a non-fatal test
// failure.
void corrupt(int fd, off_t pos) {
  uint8_t byte = 0;
  EXPECT_EQ(1, pread(fd, &byte, sizeof(byte), pos));
  byte = static_cast<uint8_t>(byte + 1);
  EXPECT_EQ(1, pwrite(fd, &byte, sizeof(byte), pos));
}
} // namespace

// Writes randomly sized records through several successive writers, damages
// roughly one in five of them on disk, and checks that the reader yields
// exactly the undamaged records, in order, each at the file position it was
// written to. Re-run with --random_seed=<n> to reproduce a failure.
TEST(RecordIOTest, Randomized) {
  SCOPED_TRACE(to<std::string>("Random seed is ", FLAGS_random_seed));
  std::mt19937 rnd(FLAGS_random_seed);

  // Number of times the writer is recreated over the same file.
  constexpr size_t kWriterPasses = 3;

  // Records written per pass.
  size_t recordCount = std::uniform_int_distribution<uint32_t>(30, 300)(rnd);

  std::uniform_int_distribution<uint32_t> recordSizeDist(1, 3 << 16);
  std::uniform_int_distribution<uint32_t> charDist(0, 255);
  // corrupt 1/5 of all records
  std::uniform_int_distribution<uint32_t> corruptDist(0, 4);

  // Records expected to survive, paired with the offset they were written at.
  std::vector<std::pair<fbstring, off_t>> records;
  // Absolute file offsets of the bytes to damage once all writing is done.
  std::vector<off_t> corruptPositions;
  // Upper bound: every pass writes recordCount records.
  records.reserve(kWriterPasses * recordCount);
  TemporaryFile file;

  fbstring record;
  // Recreate the writer multiple times so we test that we create a
  // continuous stream
  for (size_t i = 0; i < kWriterPasses; ++i) {
    RecordIOWriter writer(File(file.fd()));
    for (size_t j = 0; j < recordCount; ++j) {
      off_t beginPos = writer.filePos();
      // `record` may have been moved from on the previous iteration; clear()
      // puts it back into a known-empty state before it is refilled.
      record.clear();
      size_t recordSize = recordSizeDist(rnd);
      record.reserve(recordSize);
      for (size_t k = 0; k < recordSize; ++k) {
        record.push_back(charDist(rnd));
      }
      writer.write(iobufs({record}));

      const bool shouldCorrupt = (corruptDist(rnd) == 0);
      if (shouldCorrupt) {
        // Corrupt one random byte in the record (including header)
        std::uniform_int_distribution<uint32_t> corruptByteDist(
            0, recordSize + recordio_helpers::headerSize() - 1);
        off_t corruptRel = corruptByteDist(rnd);
        VLOG(1) << "n=" << records.size() << " bpos=" << beginPos
                << " rsize=" << record.size() << " corrupt rel=" << corruptRel
                << " abs=" << beginPos + corruptRel;
        // Only remember the position here; the byte is damaged after all
        // writers have finished.
        corruptPositions.push_back(beginPos + corruptRel);
      } else {
        VLOG(2) << "n=" << records.size() << " bpos=" << beginPos
                << " rsize=" << record.size() << " good";
        records.emplace_back(std::move(record), beginPos);
      }
    }
    VLOG(1) << "n=" << records.size() << " close abs=" << writer.filePos();
  }

  for (auto& pos : corruptPositions) {
    corrupt(file.fd(), pos);
  }

  {
    size_t i = 0;
    RecordIOReader reader(File(file.fd()));
    for (auto& r : reader) {
      SCOPED_TRACE(i);
      // Fatal on overflow so records[i] below is never out of range.
      ASSERT_LT(i, records.size());
      EXPECT_EQ(records[i].first, sp(r.first));
      EXPECT_EQ(records[i].second, r.second);
      ++i;
    }
    EXPECT_EQ(records.size(), i);
  }
}

// Exercises the recordio_helpers validation functions directly on in-memory
// buffers whose sizes straddle the header size. Buffers no larger than the
// header must fail header validation; larger ones must validate, and must
// then fail data / header validation once a data / header byte is damaged.
TEST(RecordIOTest, validateRecordAPI) {
  uint32_t hdrSize = recordio_helpers::headerSize();
  std::vector<uint32_t> testSizes = {
      0, 1, hdrSize - 1, hdrSize, hdrSize + 1, 2 * hdrSize, 1024};
  constexpr uint32_t kTestFileId = 100;
  std::mt19937 rnd(FLAGS_random_seed);

  for (auto& testSize : testSizes) {
    char testChar = testSize % 26 + 'A';
    // create a IOBuf of size = testSize
    auto buf = folly::IOBuf::create(testSize);
    // Check the pointer before it is dereferenced; fatal, since nothing
    // below can run without a buffer.
    ASSERT_NE(buf, nullptr);
    buf->append(testSize);
    auto wData = buf->writableData();
    if (testSize >= hdrSize) {
      // if the testSize is greater or equal than header size, populate buffer
      // bytes with testChar
      buf->trimStart(hdrSize);
      // wData was captured before trimStart(), so the fill starts at the
      // pre-trim data pointer, not the trimmed one.
      // NOTE(review): relies on the allocation spanning capacity() bytes from
      // that pointer -- confirm against IOBuf::create.
      memset(wData, testChar, buf->capacity());
      recordio_helpers::prependHeader(buf, kTestFileId);
      buf->unshare();
      buf->coalesce();
      // Re-fetch the pointer after the calls above have modified the buffer.
      wData = buf->writableData();
    }
    // validate header
    auto res = recordio_helpers::validateRecordHeader(
        folly::Range<unsigned char*>(wData, buf->length()), kTestFileId);
    if (testSize > hdrSize) {
      // validation should succeed for buffers larger than header size
      EXPECT_TRUE(res);
      auto range = folly::Range<unsigned char*>(wData, buf->length());
      // make sure data validation also succeeds
      auto dataRes = recordio_helpers::validateRecordData(range);
      EXPECT_NE(dataRes.fileId, 0);

      // do the entire record validation again
      auto recRes = recordio_helpers::validateRecord(range, kTestFileId);
      EXPECT_NE(recRes.fileId, 0);

      std::uniform_int_distribution<uint32_t> dataDist(
          0, testSize - hdrSize - 1);
      size_t idx = dataDist(rnd);
      /* Now corrupt a random byte and expect data validation to fail */
      // Every data byte is testChar, so testChar + 1 is a guaranteed change.
      wData[hdrSize + idx] = testChar + 1;
      auto newDataRes = recordio_helpers::validateRecordData(range);
      EXPECT_EQ(newDataRes.fileId, 0);

      /* header validation should still pass */
      auto newRes = recordio_helpers::validateRecordHeader(range, kTestFileId);
      EXPECT_TRUE(newRes);

      /* corrupt a random header byte */
      std::uniform_int_distribution<uint32_t> hdrDist(0, hdrSize - 1);
      idx = hdrDist(rnd);
      // Increment rather than assign a fixed value: an assignment is a no-op
      // whenever the chosen header byte already holds that value, which would
      // leave the header intact and make the check below fail spuriously.
      ++wData[idx];

      /* header validation should fail now */
      newRes = recordio_helpers::validateRecordHeader(range, kTestFileId);
      EXPECT_FALSE(newRes);
    } else {
      EXPECT_FALSE(res);
    }
  }
}

} // namespace test
} // namespace folly

// Test entry point: GoogleTest sees the command line first, then gflags
// parses it (with flag removal enabled) before the suite runs.
int main(int argc, char** argv) {
  testing::InitGoogleTest(&argc, argv);
  gflags::ParseCommandLineFlags(&argc, &argv, /*remove_flags=*/true);
  const int status = RUN_ALL_TESTS();
  return status;
}