folly/folly/io/async/test/AsyncBaseTestLib.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>

#include <cstdio>
#include <cstdlib>
#include <memory>
#include <random>
#include <thread>
#include <vector>

#include <glog/logging.h>

#include <folly/ScopeGuard.h>
#include <folly/String.h>
#include <folly/experimental/io/FsUtil.h>
#include <folly/portability/GTest.h>
#include <folly/portability/Sockets.h>
#include <folly/test/TestUtils.h>

#include <folly/experimental/io/AsyncBase.h>
#include <folly/experimental/io/test/IoTestTempFileUtil.h>

namespace folly {
namespace test {
namespace async_base_test_lib_detail {

constexpr size_t kODirectAlign = 4096; // align reads to 4096 B (for O_DIRECT)
constexpr size_t kDefaultFileSize = 6 << 20; // 6MiB

constexpr size_t kBatchNumEntries = 1024;
constexpr size_t kBatchSize = 192;
constexpr size_t kBatchBlockSize = 4096;

struct TestSpec {
  off_t start;
  size_t size;
};

struct TestUtil {
  static void waitUntilReadable(int fd);
  static folly::Range<folly::AsyncBase::Op**> readerWait(
      folly::AsyncBase* reader);
  using ManagedBuffer = std::unique_ptr<char, void (*)(void*)>;
  static ManagedBuffer allocateAligned(size_t size);
};

template <typename TAsync>
std::unique_ptr<TAsync> getAIO(size_t capacity, AsyncBase::PollMode pollMode) {
  try {
    auto ret = std::make_unique<TAsync>(capacity, pollMode);
    ret->initializeContext();

    return ret;
  } catch (const std::runtime_error&) {
  }

  return nullptr;
}

template <typename TAsync>
void testReadsSerially(
    const std::vector<TestSpec>& specs, folly::AsyncBase::PollMode pollMode) {
  auto aioReader = getAIO<TAsync>(1, pollMode);
  SKIP_IF(!aioReader) << "TAsync not available";

  typename TAsync::Op op;
  auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize);
  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
  if (fd == -1)
    fd = ::open(tempFile.path().c_str(), O_RDONLY);
  SKIP_IF(fd == -1) << "Tempfile can't be opened: " << folly::errnoStr(errno);
  SCOPE_EXIT {
    ::close(fd);
  };

  for (size_t i = 0; i < specs.size(); i++) {
    auto buf = TestUtil::allocateAligned(specs[i].size);
    op.pread(fd, buf.get(), specs[i].size, specs[i].start);
    aioReader->submit(&op);
    EXPECT_EQ((i + 1), aioReader->totalSubmits());
    EXPECT_EQ(aioReader->pending(), 1);
    auto ops =
        test::async_base_test_lib_detail::TestUtil::readerWait(aioReader.get());
    EXPECT_EQ(1, ops.size());
    EXPECT_TRUE(ops[0] == &op);
    EXPECT_EQ(aioReader->pending(), 0);
    ssize_t res = op.result();
    EXPECT_LE(0, res) << folly::errnoStr(-res);
    EXPECT_EQ(specs[i].size, res);
    op.reset();
  }
}

template <typename TAsync>
void testReadsParallel(
    const std::vector<TestSpec>& specs,
    folly::AsyncBase::PollMode pollMode,
    bool multithreaded) {
  auto aioReader = getAIO<TAsync>(specs.size(), pollMode);
  SKIP_IF(!aioReader) << "TAsync not available";

  std::unique_ptr<typename TAsync::Op[]> ops(new
                                             typename TAsync::Op[specs.size()]);
  uintptr_t sizeOf = sizeof(typename TAsync::Op);
  std::vector<TestUtil::ManagedBuffer> bufs;
  bufs.reserve(specs.size());

  auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize);
  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
  if (fd == -1)
    fd = ::open(tempFile.path().c_str(), O_RDONLY);
  SKIP_IF(fd == -1) << "Tempfile can't be opened: " << folly::errnoStr(errno);
  SCOPE_EXIT {
    ::close(fd);
  };

  std::vector<std::thread> threads;
  if (multithreaded) {
    threads.reserve(specs.size());
  }
  for (size_t i = 0; i < specs.size(); i++) {
    bufs.push_back(TestUtil::allocateAligned(specs[i].size));
  }
  auto submit = [&](size_t i) {
    ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
    aioReader->submit(&ops[i]);
  };
  for (size_t i = 0; i < specs.size(); i++) {
    if (multithreaded) {
      threads.emplace_back([&submit, i] { submit(i); });
    } else {
      submit(i);
    }
  }
  for (auto& t : threads) {
    t.join();
  }
  std::vector<bool> pending(specs.size(), true);

  size_t remaining = specs.size();
  while (remaining != 0) {
    EXPECT_EQ(remaining, aioReader->pending());
    auto completed =
        test::async_base_test_lib_detail::TestUtil::readerWait(aioReader.get());
    size_t nrRead = completed.size();
    remaining -= nrRead;

    for (size_t i = 0; i < nrRead; i++) {
      int id = (reinterpret_cast<uintptr_t>(completed[i]) -
                reinterpret_cast<uintptr_t>(ops.get())) /
          sizeOf;
      EXPECT_GE(id, 0);
      EXPECT_LT(id, specs.size());
      EXPECT_TRUE(pending[id]);
      pending[id] = false;
      ssize_t res = ops[id].result();
      EXPECT_LE(0, res) << folly::errnoStr(-res);
      EXPECT_EQ(specs[id].size, res);
    }
  }
  EXPECT_EQ(specs.size(), aioReader->totalSubmits());

  EXPECT_EQ(aioReader->pending(), 0);
  for (size_t i = 0; i < pending.size(); i++) {
    EXPECT_FALSE(pending[i]);
  }
}

template <typename TAsync>
void testReadsQueued(
    const std::vector<TestSpec>& specs, folly::AsyncBase::PollMode pollMode) {
  size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
  auto aioReader = getAIO<TAsync>(readerCapacity, pollMode);
  SKIP_IF(!aioReader) << "TAsync not available";
  folly::AsyncBaseQueue aioQueue(aioReader.get());
  std::unique_ptr<typename TAsync::Op[]> ops(new
                                             typename TAsync::Op[specs.size()]);
  uintptr_t sizeOf = sizeof(typename TAsync::Op);
  std::vector<TestUtil::ManagedBuffer> bufs;

  auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize);
  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
  if (fd == -1)
    fd = ::open(tempFile.path().c_str(), O_RDONLY);
  SKIP_IF(fd == -1) << "Tempfile can't be opened: " << folly::errnoStr(errno);
  SCOPE_EXIT {
    ::close(fd);
  };
  for (size_t i = 0; i < specs.size(); i++) {
    bufs.push_back(TestUtil::allocateAligned(specs[i].size));
    ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
    aioQueue.submit(&ops[i]);
  }
  std::vector<bool> pending(specs.size(), true);

  size_t remaining = specs.size();
  while (remaining != 0) {
    if (remaining >= readerCapacity) {
      EXPECT_EQ(readerCapacity, aioReader->pending());
      EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
    } else {
      EXPECT_EQ(remaining, aioReader->pending());
      EXPECT_EQ(0, aioQueue.queued());
    }
    auto completed =
        test::async_base_test_lib_detail::TestUtil::readerWait(aioReader.get());
    size_t nrRead = completed.size();
    remaining -= nrRead;

    for (size_t i = 0; i < nrRead; i++) {
      int id = (reinterpret_cast<uintptr_t>(completed[i]) -
                reinterpret_cast<uintptr_t>(ops.get())) /
          sizeOf;
      EXPECT_GE(id, 0);
      EXPECT_LT(id, specs.size());
      EXPECT_TRUE(pending[id]);
      pending[id] = false;
      ssize_t res = ops[id].result();
      EXPECT_LE(0, res) << folly::errnoStr(-res);
      EXPECT_EQ(specs[id].size, res);
    }
  }
  EXPECT_EQ(specs.size(), aioReader->totalSubmits());
  EXPECT_EQ(aioReader->pending(), 0);
  EXPECT_EQ(aioQueue.queued(), 0);
  for (size_t i = 0; i < pending.size(); i++) {
    EXPECT_FALSE(pending[i]);
  }
}

template <typename TAsync>
void testReads(
    const std::vector<TestSpec>& specs, folly::AsyncBase::PollMode pollMode) {
  testReadsSerially<TAsync>(specs, pollMode);
  testReadsParallel<TAsync>(specs, pollMode, false);
  testReadsParallel<TAsync>(specs, pollMode, true);
  testReadsQueued<TAsync>(specs, pollMode);
}

template <typename T>
class AsyncTest : public ::testing::Test {};
TYPED_TEST_SUITE_P(AsyncTest);

TYPED_TEST_P(AsyncTest, ZeroAsyncDataNotPollable) {
  test::async_base_test_lib_detail::testReads<TypeParam>(
      {{0, 0}}, folly::AsyncBase::NOT_POLLABLE);
}

TYPED_TEST_P(AsyncTest, ZeroAsyncDataPollable) {
  test::async_base_test_lib_detail::testReads<TypeParam>(
      {{0, 0}}, folly::AsyncBase::POLLABLE);
}

TYPED_TEST_P(AsyncTest, SingleAsyncDataNotPollable) {
  test::async_base_test_lib_detail::testReads<TypeParam>(
      {{0, test::async_base_test_lib_detail::kODirectAlign}},
      folly::AsyncBase::NOT_POLLABLE);
  test::async_base_test_lib_detail::testReads<TypeParam>(
      {{0, test::async_base_test_lib_detail::kODirectAlign}},
      folly::AsyncBase::NOT_POLLABLE);
}

TYPED_TEST_P(AsyncTest, SingleAsyncDataPollable) {
  test::async_base_test_lib_detail::testReads<TypeParam>(
      {{0, test::async_base_test_lib_detail::kODirectAlign}},
      folly::AsyncBase::POLLABLE);
  test::async_base_test_lib_detail::testReads<TypeParam>(
      {{0, test::async_base_test_lib_detail::kODirectAlign}},
      folly::AsyncBase::POLLABLE);
}

TYPED_TEST_P(AsyncTest, MultipleAsyncDataNotPollable) {
  test::async_base_test_lib_detail::testReads<TypeParam>(
      {{test::async_base_test_lib_detail::kODirectAlign,
        2 * test::async_base_test_lib_detail::kODirectAlign},
       {test::async_base_test_lib_detail::kODirectAlign,
        2 * test::async_base_test_lib_detail::kODirectAlign},
       {test::async_base_test_lib_detail::kODirectAlign,
        4 * test::async_base_test_lib_detail::kODirectAlign}},
      folly::AsyncBase::NOT_POLLABLE);
  test::async_base_test_lib_detail::testReads<TypeParam>(
      {{test::async_base_test_lib_detail::kODirectAlign,
        2 * test::async_base_test_lib_detail::kODirectAlign},
       {test::async_base_test_lib_detail::kODirectAlign,
        2 * test::async_base_test_lib_detail::kODirectAlign},
       {test::async_base_test_lib_detail::kODirectAlign,
        4 * test::async_base_test_lib_detail::kODirectAlign}},
      folly::AsyncBase::NOT_POLLABLE);

  test::async_base_test_lib_detail::testReads<TypeParam>(
      {{0, 5 * 1024 * 1024},
       {test::async_base_test_lib_detail::kODirectAlign, 5 * 1024 * 1024}},
      folly::AsyncBase::NOT_POLLABLE);

  test::async_base_test_lib_detail::testReads<TypeParam>(
      {
          {test::async_base_test_lib_detail::kODirectAlign, 0},
          {test::async_base_test_lib_detail::kODirectAlign,
           test::async_base_test_lib_detail::kODirectAlign},
          {test::async_base_test_lib_detail::kODirectAlign,
           2 * test::async_base_test_lib_detail::kODirectAlign},
          {test::async_base_test_lib_detail::kODirectAlign,
           20 * test::async_base_test_lib_detail::kODirectAlign},
          {test::async_base_test_lib_detail::kODirectAlign, 1024 * 1024},
      },
      folly::AsyncBase::NOT_POLLABLE);
}

TYPED_TEST_P(AsyncTest, MultipleAsyncDataPollable) {
  test::async_base_test_lib_detail::testReads<TypeParam>(
      {{test::async_base_test_lib_detail::kODirectAlign,
        2 * test::async_base_test_lib_detail::kODirectAlign},
       {test::async_base_test_lib_detail::kODirectAlign,
        2 * test::async_base_test_lib_detail::kODirectAlign},
       {test::async_base_test_lib_detail::kODirectAlign,
        4 * test::async_base_test_lib_detail::kODirectAlign}},
      folly::AsyncBase::POLLABLE);
  test::async_base_test_lib_detail::testReads<TypeParam>(
      {{test::async_base_test_lib_detail::kODirectAlign,
        2 * test::async_base_test_lib_detail::kODirectAlign},
       {test::async_base_test_lib_detail::kODirectAlign,
        2 * test::async_base_test_lib_detail::kODirectAlign},
       {test::async_base_test_lib_detail::kODirectAlign,
        4 * test::async_base_test_lib_detail::kODirectAlign}},
      folly::AsyncBase::POLLABLE);

  test::async_base_test_lib_detail::testReads<TypeParam>(
      {{0, 5 * 1024 * 1024},
       {test::async_base_test_lib_detail::kODirectAlign, 5 * 1024 * 1024}},
      folly::AsyncBase::NOT_POLLABLE);

  test::async_base_test_lib_detail::testReads<TypeParam>(
      {
          {test::async_base_test_lib_detail::kODirectAlign, 0},
          {test::async_base_test_lib_detail::kODirectAlign,
           test::async_base_test_lib_detail::kODirectAlign},
          {test::async_base_test_lib_detail::kODirectAlign,
           2 * test::async_base_test_lib_detail::kODirectAlign},
          {test::async_base_test_lib_detail::kODirectAlign,
           20 * test::async_base_test_lib_detail::kODirectAlign},
          {test::async_base_test_lib_detail::kODirectAlign, 1024 * 1024},
      },
      folly::AsyncBase::NOT_POLLABLE);
}

TYPED_TEST_P(AsyncTest, ManyAsyncDataNotPollable) {
  {
    std::vector<test::async_base_test_lib_detail::TestSpec> v;
    for (int i = 0; i < 1000; i++) {
      v.push_back(
          {off_t(test::async_base_test_lib_detail::kODirectAlign * i),
           test::async_base_test_lib_detail::kODirectAlign});
    }
    test::async_base_test_lib_detail::testReads<TypeParam>(
        v, folly::AsyncBase::NOT_POLLABLE);
  }
}

TYPED_TEST_P(AsyncTest, ManyAsyncDataPollable) {
  {
    std::vector<test::async_base_test_lib_detail::TestSpec> v;
    for (int i = 0; i < 1000; i++) {
      v.push_back(
          {off_t(test::async_base_test_lib_detail::kODirectAlign * i),
           test::async_base_test_lib_detail::kODirectAlign});
    }
    test::async_base_test_lib_detail::testReads<TypeParam>(
        v, folly::AsyncBase::POLLABLE);
  }
}

TYPED_TEST_P(AsyncTest, NonBlockingWait) {
  TypeParam aioReader(1, folly::AsyncBase::NOT_POLLABLE);
  typename TypeParam::Op op;
  auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize);
  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
  if (fd == -1)
    fd = ::open(tempFile.path().c_str(), O_RDONLY);
  SKIP_IF(fd == -1) << "Tempfile can't be opened: " << folly::errnoStr(errno);
  SCOPE_EXIT {
    ::close(fd);
  };
  size_t size = 2 * test::async_base_test_lib_detail::kODirectAlign;
  auto buf = test::async_base_test_lib_detail::TestUtil::allocateAligned(size);
  op.pread(fd, buf.get(), size, 0);
  aioReader.submit(&op);
  EXPECT_EQ(aioReader.pending(), 1);

  folly::Range<folly::AsyncBase::Op**> completed;
  while (completed.empty()) {
    // poll without blocking until the read request completes.
    completed = aioReader.wait(0);
  }
  EXPECT_EQ(completed.size(), 1);

  EXPECT_TRUE(completed[0] == &op);
  ssize_t res = op.result();
  EXPECT_LE(0, res) << folly::errnoStr(-res);
  EXPECT_EQ(size, res);
  EXPECT_EQ(aioReader.pending(), 0);
}

TYPED_TEST_P(AsyncTest, Cancel) {
  constexpr size_t kNumOpsBatch1 = 10;
  constexpr size_t kNumOpsBatch2 = 10;

  TypeParam aioReader(
      kNumOpsBatch1 + kNumOpsBatch2, folly::AsyncBase::NOT_POLLABLE);
  auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize);
  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
  if (fd == -1)
    fd = ::open(tempFile.path().c_str(), O_RDONLY);
  SKIP_IF(fd == -1) << "Tempfile can't be opened: " << folly::errnoStr(errno);
  SCOPE_EXIT {
    ::close(fd);
  };

  size_t completed = 0;

  std::vector<std::unique_ptr<folly::AsyncBase::Op>> ops;
  std::vector<test::async_base_test_lib_detail::TestUtil::ManagedBuffer> bufs;
  const auto schedule = [&](size_t n) {
    for (size_t i = 0; i < n; ++i) {
      const size_t size = 2 * test::async_base_test_lib_detail::kODirectAlign;
      bufs.push_back(
          test::async_base_test_lib_detail::TestUtil::allocateAligned(size));

      ops.push_back(std::make_unique<typename TypeParam::Op>());
      auto& op = *ops.back();

      op.setNotificationCallback([&](folly::AsyncBaseOp*) { ++completed; });
      op.pread(fd, bufs.back().get(), size, 0);
      aioReader.submit(&op);
    }
  };

  // Mix completed and canceled operations for this test.
  // In order to achieve that, schedule in two batches and do partial
  // wait() after the first one.

  schedule(kNumOpsBatch1);
  EXPECT_EQ(aioReader.pending(), kNumOpsBatch1);
  EXPECT_EQ(completed, 0);

  auto result = aioReader.wait(1);
  EXPECT_GE(result.size(), 1);
  EXPECT_EQ(completed, result.size());
  EXPECT_EQ(aioReader.pending(), kNumOpsBatch1 - result.size());

  schedule(kNumOpsBatch2);
  EXPECT_EQ(aioReader.pending(), ops.size() - result.size());
  EXPECT_EQ(completed, result.size());

  auto canceled = aioReader.cancel();
  EXPECT_EQ(canceled.size(), ops.size() - result.size());
  EXPECT_EQ(aioReader.pending(), 0);
  EXPECT_EQ(completed, result.size());

  size_t foundCompleted = 0;
  for (auto& op : ops) {
    if (op->state() == folly::AsyncBaseOp::State::COMPLETED) {
      ++foundCompleted;
    } else {
      EXPECT_TRUE(op->state() == folly::AsyncBaseOp::State::CANCELED) << *op;
    }
  }
  EXPECT_EQ(foundCompleted, completed);
}

// batch tests
template <typename T>
class AsyncBatchTest : public ::testing::Test {};
TYPED_TEST_SUITE_P(AsyncBatchTest);

TYPED_TEST_P(AsyncBatchTest, BatchRead) {
  TypeParam aioReader;
  auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize);
  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
  if (fd == -1)
    fd = ::open(tempFile.path().c_str(), O_RDONLY);
  SKIP_IF(fd == -1) << "Tempfile can't be opened: " << folly::errnoStr(errno);
  SCOPE_EXIT {
    ::close(fd);
  };

  using OpPtr = folly::AsyncBaseOp*;
  std::unique_ptr<typename TypeParam::Op[]> ops(
      new typename TypeParam::Op[kBatchNumEntries]);

  std::unique_ptr<OpPtr[]> opPtrs(new OpPtr[kBatchNumEntries]);
  std::vector<folly::test::async_base_test_lib_detail::TestUtil::ManagedBuffer>
      bufs;

  bufs.reserve(kBatchNumEntries);

  size_t completed = 0;

  for (size_t i = 0; i < kBatchNumEntries; i++) {
    bufs.push_back(
        folly::test::async_base_test_lib_detail::TestUtil::allocateAligned(
            kBatchBlockSize));
    auto& op = ops[i];
    opPtrs[i] = &op;

    op.setNotificationCallback([&](folly::AsyncBaseOp*) { ++completed; });
    op.pread(fd, bufs[i].get(), kBatchBlockSize, i * kBatchBlockSize);
  }
  aioReader.submit(
      Range<AsyncBase::Op**>(opPtrs.get(), opPtrs.get() + kBatchNumEntries));
  aioReader.wait(kBatchNumEntries);
  CHECK_EQ(completed, kBatchNumEntries);
}

} // namespace async_base_test_lib_detail
} // namespace test
} // namespace folly