// folly/folly/io/async/test/SimpleAsyncIOTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/experimental/io/SimpleAsyncIO.h>

#include <bitset>

#include <folly/File.h>
#include <folly/Random.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h>
#include <folly/io/IOBuf.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>

#include <glog/logging.h>

using namespace folly;

/**
 * Test fixture parameterized on the SimpleAsyncIO mode.
 *
 * SetUp() stores the test parameter into config_, so each test body can build
 * a SimpleAsyncIO from config_ and exercise the selected mode.
 */
class SimpleAsyncIOTest : public ::testing::TestWithParam<SimpleAsyncIO::Mode> {
 public:
  void SetUp() override { config_.setMode(GetParam()); }

  /**
   * Name generator passed to INSTANTIATE_TEST_SUITE_P: maps the mode
   * parameter to a short string used in the generated test name.
   */
  static std::string testTypeToString(
      testing::TestParamInfo<SimpleAsyncIO::Mode> const& setting) {
    switch (setting.param) {
      case SimpleAsyncIO::Mode::AIO:
        return "aio";
      case SimpleAsyncIO::Mode::IOURING:
        return "iouring";
    }
    // An enum object can hold a value that matches none of the cases above.
    // Return a placeholder instead of flowing off the end of a non-void
    // function, which is undefined behavior.
    return "unknown";
  }

 protected:
  // Configuration handed to each SimpleAsyncIO under test; mode set in SetUp().
  SimpleAsyncIO::Config config_;
};

// Writes a short string at offset 0 of a temporary file, reads it back through
// the same SimpleAsyncIO instance, and checks that the bytes round-trip.
TEST_P(SimpleAsyncIOTest, WriteAndReadBack) {
  auto tmpfile = File::temporary();

  // Everything that a completion callback or an in-flight operation refers to
  // is declared BEFORE `aio`, so it is destroyed AFTER `aio`. If one of the
  // ASSERTs below returns early (e.g. on timeout) while an operation is still
  // pending, the callback's captured references and the read buffer stay valid
  // for as long as the SimpleAsyncIO instance exists. The DestroyWithPendingIO
  // test in this file expects the destructor to wait for pending IO.
  Baton done;
  int result = 0;
  const std::string data("Green Room Rockers");
  // Zero-initialized so the memcmp below never inspects indeterminate bytes,
  // even if the read comes back short.
  std::array<uint8_t, 128> buffer{};

  SimpleAsyncIO aio(config_);

  aio.pwrite(
      tmpfile.fd(), data.data(), data.size(), 0, [&done, &result](int rc) {
        result = rc;
        done.post();
      });
  ASSERT_TRUE(done.try_wait_for(std::chrono::seconds(10)));
  EXPECT_EQ(result, data.size());

  // Reuse the same baton for the read phase.
  done.reset();
  aio.pread(
      tmpfile.fd(), buffer.data(), buffer.size(), 0, [&done, &result](int rc) {
        result = rc;
        done.post();
      });
  ASSERT_TRUE(done.try_wait_for(std::chrono::seconds(10)));
  // The buffer is larger than the file, so the read is expected to return
  // only the number of bytes that were written.
  EXPECT_EQ(result, data.size());
  EXPECT_EQ(memcmp(buffer.data(), data.data(), data.size()), 0);
}

std::string makeRandomBinaryString(size_t size) {
  std::string content;
  content.clear();
  while (content.size() < size) {
    content.append(std::bitset<8>(folly::Random::rand32()).to_string());
  }
  content.resize(size);
  return content;
}

// Issues numChunks writes of random data. Each write's completion callback
// chains a read of the same file region, and once every read has completed the
// test verifies that each chunk read back matches what was written.
TEST_P(SimpleAsyncIOTest, ChainedReads) {
  auto tmpfile = File::temporary();
  int fd = tmpfile.fd();
  Baton done;

  static const size_t chunkSize = 128;
  static const size_t numChunks = 1000;
  std::vector<std::unique_ptr<IOBuf>> writeChunks;
  std::vector<std::unique_ptr<IOBuf>> readChunks;
  std::atomic<uint32_t> completed = 0;

  writeChunks.reserve(numChunks);
  readChunks.reserve(numChunks);
  for (size_t i = 0; i < numChunks; ++i) {
    writeChunks.push_back(IOBuf::copyBuffer(makeRandomBinaryString(chunkSize)));
    readChunks.push_back(IOBuf::create(chunkSize));
  }

  // allow for one read and one write for each chunk to be outstanding.
  // `aio` is declared after everything the callbacks reference, so it is
  // destroyed before any of that state goes away.
  SimpleAsyncIO aio(config_.setMaxRequests(numChunks * 2));
  for (size_t i = 0; i < numChunks; ++i) {
    aio.pwrite(
        fd,
        writeChunks[i]->data(),
        chunkSize,
        i * chunkSize,
        [fd, i, &readChunks, &aio, &done, &completed](int rc) {
          // ASSERT_* inside a lambda only returns from the lambda; on failure
          // the chained read is simply not issued.
          ASSERT_EQ(rc, chunkSize);
          aio.pread(
              fd,
              readChunks[i]->writableData(),
              chunkSize,
              i * chunkSize,
              [&done, &completed](int readRc) {
                ASSERT_EQ(readRc, chunkSize);
                // The callback that observes the final count wakes the test.
                if (++completed == numChunks) {
                  done.post();
                }
              });
        });
  }

  ASSERT_TRUE(done.try_wait_for(std::chrono::seconds(60)));

  for (size_t i = 0; i < numChunks; ++i) {
    // EXPECT_EQ rather than glog's CHECK_EQ: a mismatch is reported as a test
    // failure instead of aborting the whole test binary.
    EXPECT_EQ(
        memcmp(writeChunks[i]->data(), readChunks[i]->data(), chunkSize), 0)
        << "chunk " << i << " read back differently than written";
  }
}

// Queues a batch of writes and lets the SimpleAsyncIO instance go out of scope
// immediately; every completion callback must have run by the time the
// destructor returns.
TEST_P(SimpleAsyncIOTest, DestroyWithPendingIO) {
  static const size_t bufferSize = 128;
  static const size_t numWrites = 100;

  auto tmpfile = File::temporary();
  int fd = tmpfile.fd();
  std::atomic<uint32_t> completed = 0;

  // All-zero payload shared by every write.
  std::array<uint8_t, bufferSize> buffer;
  buffer.fill(0);

  // Slam out 100 writes and then destroy the SimpleAsyncIO instance
  // without waiting for them to complete.
  {
    SimpleAsyncIO aio(config_);
    size_t offset = 0;
    for (size_t issued = 0; issued != numWrites; ++issued) {
      auto onComplete = [&completed](int rc) {
        ASSERT_EQ(rc, bufferSize);
        ++completed;
      };
      aio.pwrite(fd, buffer.data(), bufferSize, offset, onComplete);
      offset += bufferSize;
    }
  }

  // Destructor should have blocked until all IO was done.
  ASSERT_EQ(completed, numWrites);
}

#if FOLLY_HAS_COROUTINES
// Coroutine helper: starts `copies` writes of `data` laid out back to back
// from offset 0 of `fd`, awaits all of them, and expects each one to report
// data.length() bytes.
static folly::coro::Task<folly::Unit> doCoAsyncWrites(
    SimpleAsyncIO& aio, int fd, std::string const& data, int copies) {
  const auto len = data.length();
  std::vector<folly::coro::Task<int>> pending;

  for (int copy = 0; copy < copies; ++copy) {
    // Copy number `copy` lands at byte offset len * copy.
    pending.push_back(aio.co_pwrite(fd, data.data(), len, len * copy));
  }

  auto written = co_await folly::coro::collectAllRange(std::move(pending));

  for (int count : written) {
    EXPECT_EQ(count, len);
  }
  co_return folly::Unit{};
}

// Coroutine helper: starts `copies` reads of data.length() bytes each from
// consecutive offsets of `fd`, awaits all of them, and expects every read to
// return the full length with contents equal to `data`.
static folly::coro::Task<folly::Unit> doCoAsyncReads(
    SimpleAsyncIO& aio, int fd, std::string const& data, int copies) {
  const auto len = data.length();
  std::vector<std::unique_ptr<char[]>> scratch;
  std::vector<folly::coro::Task<int>> pending;

  for (int copy = 0; copy < copies; ++copy) {
    // One heap buffer per read; the unique_ptr keeps the storage address
    // stable even when the vector itself reallocates.
    scratch.push_back(std::make_unique<char[]>(len));
    char* target = scratch.back().get();

    pending.push_back(aio.co_pread(fd, target, len, len * copy));
  }

  auto counts = co_await folly::coro::collectAllRange(std::move(pending));

  for (int copy = 0; copy < copies; ++copy) {
    EXPECT_EQ(counts[copy], len);
    EXPECT_EQ(::memcmp(data.data(), scratch[copy].get(), len), 0);
  }
  co_return folly::Unit{};
}

// Drives the coroutine API: writes ten copies of a string to a temporary file,
// then reads them back and compares, blocking on each phase in turn.
TEST_P(SimpleAsyncIOTest, CoroutineReadWrite) {
  const int copies = 10;

  auto tmpfile = File::temporary();
  int fd = tmpfile.fd();
  SimpleAsyncIO aio(config_);
  std::string testStr = "Uncle Touchy goes to college";

  // The write phase must finish before the read phase starts.
  auto writePhase = doCoAsyncWrites(aio, fd, testStr, copies);
  folly::coro::blockingWait(std::move(writePhase));

  auto readPhase = doCoAsyncReads(aio, fd, testStr, copies);
  folly::coro::blockingWait(std::move(readPhase));
}
#endif // FOLLY_HAS_COROUTINES

// Instantiates every SimpleAsyncIOTest case once per mode listed in Values(),
// using SimpleAsyncIOTest::testTypeToString to name each instantiation.
// Only Mode::AIO is listed; Mode::IOURING is commented out below (the reason
// is not stated in this file).
INSTANTIATE_TEST_SUITE_P(
    SimpleAsyncIOTests,
    SimpleAsyncIOTest,
    ::testing::Values(
        SimpleAsyncIO::Mode::AIO /*, SimpleAsyncIO::Mode::IOURING */),
    SimpleAsyncIOTest::testTypeToString);