
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/logging/AsyncFileWriter.h>

#ifndef _WIN32
#include <sys/wait.h>

#include <thread>

#include <folly/Conv.h>
#include <folly/Exception.h>
#include <folly/File.h>
#include <folly/FileUtil.h>
#include <folly/String.h>
#include <folly/Synchronized.h>
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
#include <folly/init/Init.h>
#include <folly/lang/SafeAssert.h>
#include <folly/logging/Init.h>
#include <folly/logging/LoggerDB.h>
#include <folly/logging/xlog.h>
#include <folly/portability/Config.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>
#include <folly/portability/Unistd.h>
#include <folly/system/ThreadId.h>
#include <folly/system/ThreadName.h>
#include <folly/test/TestUtils.h>
#include <folly/testing/TestUtil.h>

    // Help strings for the command-line flags that tune the
    // AsyncFileWriter.discard test.  Code below reads them as
    // FLAGS_async_discard_num_normal_writers, FLAGS_async_discard_read_sleep_usec,
    // FLAGS_async_discard_timeout_msec and FLAGS_async_discard_num_events
    // (plus, presumably, a count of non-discarding writers).
    // NOTE(review): the DEFINE_*(name, default, ...) line that should open each
    // help string, and the final fragment of the last one, are missing from
    // this copy of the file -- restore them before building.
    "number of threads to use to generate normal log messages during "
    "the AsyncFileWriter.discard test");
    "number of threads to use to generate non-discardable log messages during "
    "the AsyncFileWriter.discard test");
    "how long the read thread should sleep between reads in "
    "the AsyncFileWriter.discard test");
    "A timeout for the AsyncFileWriter.discard test if it cannot generate "
    "enough discards");
    "The number of discard events to wait for in the AsyncFileWriter.discard "

using namespace folly;
using namespace std::literals::chrono_literals;
using folly::test::TemporaryFile;
using std::chrono::milliseconds;
using std::chrono::steady_clock;
using testing::ContainsRegex;

TEST(AsyncFileWriter, noMessages) {
  TemporaryFile tmpFile{"logging_test"};

  // Test the simple construction and destruction of an AsyncFileWriter
  // without ever writing any messages.  This still exercises the I/O
  // thread start-up and shutdown code.
  AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
}

/**
 * Write ten messages through an AsyncFileWriter, destroy it, then check that
 * the file contains exactly those messages, in order.
 */
TEST(AsyncFileWriter, simpleMessages) {
  TemporaryFile tmpFile{"logging_test"};

  {
    // The writer lives in its own scope so that its destructor runs before the
    // file is read back below; the expectation relies on every queued message
    // having been written out by then.
    AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
    for (int n = 0; n < 10; ++n) {
      writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
    }
  }

  std::string data;
  auto ret = folly::readFile(tmpFile.path().string().c_str(), data);
  ASSERT_TRUE(ret) << "failed to read log file";

  std::string expected =
      "message 0\n"
      "message 1\n"
      "message 2\n"
      "message 3\n"
      "message 4\n"
      "message 5\n"
      "message 6\n"
      "message 7\n"
      "message 8\n"
      "message 9\n";
  EXPECT_EQ(expected, data);
}

namespace {
// Destination for warnings captured by handleLoggingError().  The ioError test
// points this at a local vector before provoking write errors.
std::vector<std::string>* internalWarnings = nullptr;

/**
 * LoggerDB internal-warning handler used by the ioError test.
 *
 * Appends each warning message to *internalWarnings so the test can inspect
 * it afterwards; the source file and line number of the warning are ignored.
 * internalWarnings must point at a live vector whenever this handler can run.
 */
void handleLoggingError(
    StringPiece /* file */, int /* lineNumber */, std::string&& msg) {
  internalWarnings->emplace_back(std::move(msg));
}
} // namespace

/**
 * Verify that write failures are reported through the LoggerDB internal
 * warning handler: messages are written to a pipe whose read end is closed,
 * and the test expects at least one and at most numMessages recorded
 * warnings (the writer batches writes, so usually far fewer).
 */
TEST(AsyncFileWriter, ioError) {
  // Set the LoggerDB internal warning handler so we can record the messages
  std::vector<std::string> logErrors;
  internalWarnings = &logErrors;
  // NOTE(review): only the capture vector is set up here; the call that
  // installs handleLoggingError as the internal warning handler (and any
  // restore at the end of the test) is not visible in this copy.

  // Create an AsyncFileWriter that refers to a pipe whose read end is closed
  std::array<int, 2> fds;
  // NOTE(review): pipe() is missing its argument (presumably fds.data()), and
  // no visible line closes the read end fds[0].
  auto rc = pipe(;
  folly::checkUnixError(rc, "failed to create pipe");
#ifndef _WIN32
  // Ignore SIGPIPE so that writing to the broken pipe produces an error
  // instead of terminating the process.
  // NOTE(review): the #endif matching this #ifndef is not visible.
  signal(SIGPIPE, SIG_IGN);

  // Log a bunch of messages to the writer
  size_t numMessages = 100;
  // NOTE(review): the opening brace of the scope that bounds the writer's
  // lifetime, and the closing braces of the loop and of that scope, are
  // missing.
    AsyncFileWriter writer{folly::File{fds[1], true}};
    for (size_t n = 0; n < numMessages; ++n) {
      writer.writeMessage(folly::to<std::string>("message ", n, "\n"));


  // AsyncFileWriter should have some internal warning messages about the
  // log failures.  This will generally be many fewer than the number of
  // messages we wrote, though, since it performs write batching.
  // GTest on Windows doesn't support alternation in the regex syntax -_-....
  const std::string kExpectedErrorMessage =
#ifdef _WIN32
      // The `pipe` call above is actually implemented via sockets, so we get
      // a different error message.
      "An established connection was aborted by the software in your host machine\\.";
      // NOTE(review): an #else belongs before the next line and an #endif
      // after it; as written the next literal is a stray statement.
      "Broken pipe";

  for (const auto& msg : logErrors) {
    // NOTE(review): the head of the per-message expectation (matching msg
    // against the regex below + kExpectedErrorMessage) and the loop's closing
    // brace are missing.
            "error writing to log file .* in AsyncFileWriter.*: " +
  EXPECT_GT(logErrors.size(), 0);
  EXPECT_LE(logErrors.size(), numMessages);

namespace {
/**
 * Write data into the pipe @p fd until it is completely full.
 *
 * The descriptor is switched to non-blocking mode so that a full pipe reports
 * EAGAIN/EWOULDBLOCK instead of blocking. It is put into blocking mode before
 * returning.
 *
 * @param fd  write end of a pipe.
 * @return    the total number of bytes written into the pipe.
 * @throws    via folly::checkUnixError() if fcntl() fails, or via
 *            throwSystemError() if a write fails with any error other than
 *            EAGAIN/EWOULDBLOCK.
 */
size_t fillUpPipe(int fd) {
  int flags = fcntl(fd, F_GETFL);
  folly::checkUnixError(flags, "failed get file descriptor flags");
  auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
  folly::checkUnixError(rc, "failed to put pipe in non-blocking mode");

  // Scratch buffer for the writes. It must be non-empty: with a zero-length
  // buffer every write succeeds with 0 bytes and the loop below never ends.
  // The exact size does not matter, because the write size is halved down to
  // a single byte once the pipe is full.
  std::vector<char> data(4096, 'x');
  size_t totalBytes = 0;
  size_t bytesToWrite = data.size();
  while (true) {
    auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite);
    if (bytesWritten < 0) {
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        // We blocked.  Keep trying smaller writes, until we get down to a
        // single byte, just to make sure the logging code really won't be able
        // to write anything to the pipe.
        if (bytesToWrite <= 1) {
          break;
        }
        bytesToWrite /= 2;
      } else {
        throwSystemError("error writing to pipe");
      }
    } else {
      totalBytes += static_cast<size_t>(bytesWritten);
    }
  }
  XLOG(DBG1, "pipe filled up after ", totalBytes, " bytes");

  rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
  folly::checkUnixError(rc, "failed to put pipe back in blocking mode");

  return totalBytes;
}
} // namespace

/**
 * Verify that flush() blocks while the pipe is full and completes once the
 * reader drains it.
 *
 * Steps: pre-fill the pipe with fillUpPipe(), write one more message, start
 * flush() on another thread, then read the padding back out of the pipe.
 */
TEST(AsyncFileWriter, flush) {
  // Set up a pipe(), then write data to the write endpoint until it fills up
  // and starts blocking.
  std::array<int, 2> fds;
  // NOTE(review): pipe() is missing its argument here (presumably fds.data()).
  auto rc = pipe(;
  folly::checkUnixError(rc, "failed to create pipe");
  File readPipe{fds[0], true};
  File writePipe{fds[1], true};

  auto paddingSize = fillUpPipe(writePipe.fd());

  // Now set up an AsyncFileWriter pointing at the write end of the pipe
  AsyncFileWriter writer{std::move(writePipe)};

  // Write a message
  writer.writeMessage("test message: " + std::string(200, 'x'));

  // Call flush().  Use a separate thread, since this should block until we
  // consume data from the pipe.
  Promise<Unit> promise;
  auto future = promise.getFuture();
  auto flushFunction = [&] { writer.flush(); };
  std::thread flushThread{
      [&]() { promise.setTry(makeTryWith(flushFunction)); }};
  // Detach the flush thread now rather than joining it at the end of the
  // function.  This way if something goes wrong during the test we will fail
  // with the real error, rather than crashing due to the std::thread
  // destructor running on a still-joinable thread.
  // NOTE(review): the flushThread.detach() call described above is not
  // visible.

  // Sleep briefly, and make sure flush() still hasn't completed.
  // If it has completed this doesn't necessarily indicate a bug in
  // AsyncFileWriter, but instead indicates that our test code failed to
  // successfully cause a blocking write.
  /* sleep override */
  // NOTE(review): the sleep itself and the check that `future` is not yet
  // ready are missing after the marker above.

  // Now read from the pipe
  std::vector<char> buf;
  // NOTE(review): buf is never resized (so buf.size() is 0) and readFull() is
  // missing its buffer-pointer argument; buf presumably should hold
  // paddingSize bytes, to match the EXPECT_EQ below.
  auto bytesRead = readFull(readPipe.fd(),, buf.size());
  EXPECT_EQ(bytesRead, paddingSize);

  // Make sure flush completes successfully now
  // NOTE(review): the wait on `future` and the closing brace of this test
  // are missing.

// A large-ish message suffix, just to consume space and help fill up
// log buffers faster.
// writeThread() appends it to every message, and ReadStats::parseMessage()
// rejects any message that does not end with it.
// NOTE(review): the initializer for kMsgSuffix (and its closing `};`) is
// missing from this copy of the file.
static constexpr StringPiece kMsgSuffix{

namespace {
// Running total of the discarded-message counts passed to discardCallback().
// The counter is atomic so that it can be updated safely from whichever thread
// invokes the callback.
std::atomic<size_t> totalDiscarded{0};

/**
 * Discard callback for the AsyncFileWriter.discard test.
 *
 * Adds @p n, the number of messages reported as discarded, to totalDiscarded.
 */
void discardCallback(size_t n) {
  totalDiscarded += n;
}
} // namespace

/**
 * Shared bookkeeping for the AsyncFileWriter.discard test.
 *
 * Writer side:
 * - Writer threads poll shouldWriterStop().
 * - They report their final counts via writerFinished().
 *
 * Reader side:
 * - readThread() reports leftover unterminated bytes via trailingData().
 * - messageReceived() parses one received line and updates the reader-side
 *   statistics. It is presumably called by readThread() for each complete
 *   line, but that call is not visible here.
 *
 * check() compares the writer-side and reader-side records at the end of the
 * test.
 *
 * NOTE(review): this copy of the class is missing several lines:
 * - the constructor signature and access specifiers;
 * - many closing braces and counter-increment/return statements;
 * - the opening/closing delimiters of the member documentation comments
 *   further down.
 * Gaps that affect behaviour are flagged inline.
 */
class ReadStats {
      : deadline_{steady_clock::now() + milliseconds{FLAGS_async_discard_timeout_msec}},
      // NOTE(review): the head of this member initializer (presumably
      // readSleepUS_) is missing.
      // Also, std::min(0, flag) yields 0 for any non-negative flag value, so
      // the reader's initial sleep would be 0. The readSleepUS_ documentation
      // below says it starts at the non-zero flag value, which suggests
      // std::max was intended -- confirm.
            std::min(int64_t{0}, FLAGS_async_discard_read_sleep_usec))} {}

  // Intended to be called once enough discards have been produced, so the
  // reader drains the remaining backlog quickly.
  // NOTE(review): the body has lost its statement. Per the readSleepUS_
  // documentation it should reset readSleepUS_ to 0.
  void clearSleepDuration() {; }
  // Current delay the reader thread should sleep between reads.
  std::chrono::microseconds getSleepUS() const {
    return std::chrono::microseconds{readSleepUS_.load()};

  // Returns true once writer threads should stop producing messages: either
  // enough discard events have been seen or the deadline has passed.
  bool shouldWriterStop() const {
    // Stop after we have seen the required number of separate discard events.
    // We stop based on discardEventsSeen_ rather than numDiscarded_ since this
    // ensures the async writer blocks and then makes progress again multiple
    // times.
    if (FLAGS_async_discard_num_events > 0 &&
        discardEventsSeen_.load() >
            static_cast<uint64_t>(FLAGS_async_discard_num_events)) {
      return true;

    // Stop after a timeout, even if we don't hit the number of requested
    // discards.
    return steady_clock::now() > deadline_;
  // Records the final message count and flags of writer thread `threadID`.
  // Each ID may be recorded only once.
  void writerFinished(size_t threadID, size_t messagesWritten, uint32_t flags) {
    auto map = perThreadWriteData_.wlock();
    // NOTE(review): the assertion macro that should wrap the next two lines
    // is missing.
        map->find(threadID) == map->end(),
        "multiple writer threads with same ID");
    auto& data = (*map)[threadID];
    data.numMessagesWritten = messagesWritten;
    data.flags = flags;

  // Verifies at the end of the test that:
  // - the reader saw only complete, parseable, in-order messages;
  // - messages written == messages read + messages reported discarded;
  // - at least one discard occurred;
  // - nDiscarded matches the discard counts the reader saw.
  void check(size_t nDiscarded) {
    auto writeDataMap = perThreadWriteData_.wlock();

    EXPECT_EQ("", trailingData_);
    EXPECT_EQ(0, numUnableToParse_);
    EXPECT_EQ(0, numOutOfOrder_);

    // Check messages received from each writer thread
    size_t readerStatsChecked = 0;
    size_t totalMessagesWritten = 0;
    size_t totalMessagesRead = 0;
    for (const auto& writeEntry : *writeDataMap) {
      const auto& writeInfo = writeEntry.second;
      totalMessagesWritten += writeInfo.numMessagesWritten;

      auto iter = perThreadReadData_.find(writeEntry.first);
      if (iter == perThreadReadData_.end()) {
        // We never received any messages from this writer thread.
        // This is okay as long as this is not a NEVER_DISCARD writer.
        EXPECT_EQ(0, writeInfo.flags);
      // NOTE(review): a `continue;` and the closing brace of this `if` appear
      // to be missing; as written, iter->second below is read even when iter
      // is end().
      // readerStatsChecked is also never incremented in the visible code,
      // although it is compared against perThreadReadData_.size() below.
      const auto& readInfo = iter->second;
      totalMessagesRead += readInfo.numMessagesRead;
      if (writeInfo.flags & LogWriter::NEVER_DISCARD) {
        // Non-discarding threads should never discard anything
        EXPECT_EQ(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
        EXPECT_EQ(readInfo.lastId, writeInfo.numMessagesWritten);
      } else {
        // Other threads may have discarded some messages
        EXPECT_LE(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
        EXPECT_LE(readInfo.lastId, writeInfo.numMessagesWritten);

    EXPECT_EQ(totalMessagesWritten, totalMessagesRead + numDiscarded_);
    EXPECT_EQ(readerStatsChecked, perThreadReadData_.size());

    // This test is intended to check the discard behavior.
    // Fail the test if we didn't actually trigger any discards before we timed
    // out.
    EXPECT_GT(numDiscarded_, 0);
    EXPECT_EQ(nDiscarded, numDiscarded_);

    XLOG(DBG1) << totalMessagesWritten << " messages written, "
               << totalMessagesRead << " messages read, " << numDiscarded_
               << " messages discarded";

  // Handles one complete line read from the pipe. It is either:
  // - a discard notification, whose count is added to numDiscarded_; or
  // - a writer message, whose per-thread index must strictly increase.
  void messageReceived(StringPiece msg) {
    if (msg.endsWith(" log messages discarded: "
                     "logging faster than we can write")) {
      auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
      XLOG(DBG3, "received discard notification: ", discardCount);
      numDiscarded_ += discardCount;
      // NOTE(review): the visible code neither increments discardEventsSeen_
      // (which shouldWriterStop() relies on) nor returns here. As written,
      // the notification would also be fed to parseMessage() below.

    size_t threadID = 0;
    size_t messageIndex = 0;
    try {
      parseMessage(msg, &threadID, &messageIndex);
    } catch (const std::exception&) {
      XLOG(ERR, "unable to parse log message: ", msg);
      // NOTE(review): numUnableToParse_ is never incremented and there is no
      // early return after a parse failure in the visible code.

    auto& data = perThreadReadData_[threadID];
    // NOTE(review): data.numMessagesRead is never incremented anywhere
    // visible, although check() relies on it.
    if (messageIndex > data.lastId) {
      data.lastId = messageIndex;
    } else {
      // NOTE(review): numOutOfOrder_ is never incremented in the visible code.
      XLOG(ERR) << "received out-of-order messages from writer " << threadID
                << ": " << messageIndex << " received after " << data.lastId;

  // Records bytes left over after the last newline. check() expects none.
  void trailingData(StringPiece data) { trailingData_ = data.str(); }

  // Per-writer statistics gathered by the reader thread.
  struct ReaderData {
    size_t numMessagesRead{0};
    size_t lastId{0};
  // Per-writer totals reported by the writer threads themselves.
  struct WriterData {
    size_t numMessagesWritten{0};
    int flags{0};

  // Parses a writer message of the form
  // "thread <id> message <index>" followed by kMsgSuffix into *threadID and
  // *messageIndex. Throws on malformed input.
  void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
    // Validate and strip off the message prefix and suffix
    constexpr StringPiece prefix{"thread "};
    if (!msg.startsWith(prefix)) {
      throw std::runtime_error("bad message prefix");
    if (!msg.endsWith(kMsgSuffix)) {
      throw std::runtime_error("bad message suffix");
    // NOTE(review): the comments in this function describe stripping the
    // prefix, suffix, thread ID and middle from msg, but no such calls are
    // visible. As written, msg.find(' ') below would locate the space inside
    // the "thread " prefix.

    // Parse then strip off the thread index
    auto threadIDEnd = msg.find(' ');
    if (threadIDEnd == StringPiece::npos) {
      throw std::runtime_error("no middle found");
    *threadID = folly::to<size_t>(msg.subpiece(0, threadIDEnd));

    // Validate that the middle of the message is what we expect,
    // then strip it off
    constexpr StringPiece middle{" message "};
    if (!msg.startsWith(middle)) {
      throw std::runtime_error("bad message middle");

    // Parse the message index
    *messageIndex = folly::to<size_t>(msg);

   * Data about each writer thread, as recorded by the reader thread.
   * At the end of the test we will compare perThreadReadData_ (recorded by the
   * reader) with perThreadWriteData_ (recorded by the writers) to make sure
   * the data matches up.
   * This is a map from writer_thread_id to ReaderData.
   * The writer_thread_id is extracted from the received messages.
   * This field does not need locking as it is only updated by the single
   * reader thread.
  std::unordered_map<size_t, ReaderData> perThreadReadData_;

   * Additional information recorded by the reader thread.
  std::string trailingData_;
  size_t numUnableToParse_{0};
  size_t numOutOfOrder_{0};
  size_t numDiscarded_{0};

   * deadline_ is a maximum end time for the test.
   * The writer threads quit if the deadline is reached even if they have not
   * produced the desired number of discard events yet.
  const std::chrono::steady_clock::time_point deadline_;

   * How long the reader thread should sleep between each read event.
   * This is initially set to a non-zero value (read from the
   * FLAGS_async_discard_read_sleep_usec flag) so that the reader thread reads
   * slowly, which will fill up the pipe buffer and cause discard events.
   * Once we have produce enough discards and are ready to finish the test the
   * main thread reduces readSleepUS_ to 0, so the reader will finish the
   * remaining message backlog quickly.
  std::atomic<uint64_t> readSleepUS_{0};

   * A count of how many discard events have been seen so far.
   * The reader increments discardEventsSeen_ each time it sees a discard
   * notification message.  A "discard event" basically corresponds to a single
   * group of dropped messages.  Once the reader pulls some messages off out of
   * the pipe the writers should be able to send more data, but the buffer will
   * eventually fill up again, producing another discard event.
  std::atomic<uint64_t> discardEventsSeen_{0};

   * Data about each writer thread, as recorded by the writers.
   * When each writer thread finishes it records how many messages it wrote,
   * plus the flags it used to write the messages.
  // NOTE(review): the member name and the closing `};` of the class are
  // missing after the next line.
  folly::Synchronized<std::unordered_map<size_t, WriterData>>

/**
 * readThread() reads messages slowly from a pipe.  This helps test the
 * AsyncFileWriter behavior when I/O is slow.
 */
// Parameters:
// - `file`: the read end of the pipe.
// - `stats`: collects what was read.
// Logs and (presumably) stops on a read error or EOF.
void readThread(folly::File&& file, ReadStats* stats) {
  // NOTE(review): `buffer` is never sized in the visible code, so
  // buffer.size() is 0. It presumably needs a fixed capacity before the loop.
  std::vector<char> buffer;

  // Number of bytes of an incomplete message carried over from the previous
  // read. They are kept at the front of `buffer`.
  size_t bufferIdx = 0;
  while (true) {
    // NOTE(review): the sleep (presumably for stats->getSleepUS()) that this
    // marker annotates is not visible.
    /* sleep override */

    // NOTE(review): the buffer pointer (presumably buffer.data()) is missing
    // from the readNoInt() call below, and from the StringPiece/memmove
    // calls further down.
    auto readResult = folly::readNoInt(
        file.fd(), + bufferIdx, buffer.size() - bufferIdx);
    if (readResult < 0) {
      XLOG(ERR, "error reading from pipe: ", errno);
    // NOTE(review): the `break;` and closing braces of the error and EOF
    // checks are missing.
    if (readResult == 0) {
      XLOG(DBG2, "read EOF");

    auto logDataLen = bufferIdx + readResult;
    StringPiece logData{, logDataLen};
    auto idx = 0;
    // Split the buffered data into newline-terminated messages. Any
    // unterminated tail is moved to the front of the buffer for the next read.
    while (true) {
      auto end = logData.find('\n', idx);
      if (end == StringPiece::npos) {
        bufferIdx = logDataLen - idx;
        memmove(, + idx, bufferIdx);

      StringPiece logMsg{ + idx, end - idx};
      // NOTE(review): logMsg is never passed to stats->messageReceived() in
      // the visible code.
      idx = end + 1;

  // Report any bytes that never saw a terminating newline.
  if (bufferIdx != 0) {
    stats->trailingData(StringPiece{, bufferIdx});

/**
 * writeThread() writes a series of messages to the AsyncFileWriter until
 * ReadStats::shouldWriterStop() reports that it should stop.
 */
// Parameters:
// - `writer`: receives the messages.
// - `id`: identifies this writer in the message text and in ReadStats.
// - `flags`: reported to writerFinished(). Presumably also passed to
//   writeMessage(), but that part of the call is not visible.
// - `readStats`: decides when to stop and records this thread's totals.
void writeThread(
    AsyncFileWriter* writer, size_t id, uint32_t flags, ReadStats* readStats) {
  size_t msgID = 0;
  while (true) {
    // NOTE(review): the following are missing from the visible code:
    // - the head of the writer->writeMessage(folly::to<std::string>(...)) call
    //   that the next line belongs to;
    // - its trailing argument(s);
    // - the increment of msgID.
            "thread ", id, " message ", msgID, kMsgSuffix, '\n'),

    // Break out once the reader has seen enough discards
    // (shouldWriterStop() is only consulted once every 256 messages).
    if (((msgID & 0xff) == 0) && readStats->shouldWriterStop()) {
      readStats->writerFinished(id, msgID, flags);

/**
 * The discard test spawns a number of threads that each write a large number
 * of messages quickly.  The AsyncFileWriter writes to a pipe, and a separate
 * thread reads from it slowly, causing a backlog to build up.
 * The test then checks that:
 * - The read thread always receives full messages (no partial log messages)
 * - Messages that are received are received in order
 * - The number of messages received plus the number reported in discard
 *   notifications matches the number of messages sent.
 */

// Runs one reader thread against a pipe fed by a shared AsyncFileWriter. The
// writers are FLAGS_async_discard_num_normal_writers normal writers plus
// (presumably) a number of NEVER_DISCARD writers.
TEST(AsyncFileWriter, discard) {
  std::array<int, 2> fds;
  // NOTE(review): pipe() is missing its argument here (presumably fds.data()).
  auto pipeResult = pipe(;
  folly::checkUnixError(pipeResult, "pipe failed");
  folly::File readPipe{fds[0], true};
  folly::File writePipe{fds[1], true};


  ReadStats readStats;
  std::thread reader(readThread, std::move(readPipe), &readStats);
  // NOTE(review): two things are not visible here: the opening brace of the
  // scope bounding the writer's lifetime, and any registration of
  // discardCallback on the writer.
    AsyncFileWriter writer{std::move(writePipe)};

    std::vector<std::thread> writeThreads;
    // NOTE(review): the second addend of this expression (presumably the
    // non-discarding writer count flag) is missing.
    size_t numThreads = FLAGS_async_discard_num_normal_writers +

    for (size_t n = 0; n < numThreads; ++n) {
      // Threads whose index is >= the normal-writer count use NEVER_DISCARD.
      uint32_t flags = 0;
      if (n >= static_cast<size_t>(FLAGS_async_discard_num_normal_writers)) {
        flags = LogWriter::NEVER_DISCARD;
      XLOGF(DBG4, "writer {:4d} flags {:#02x}", n, flags);

      writeThreads.emplace_back(writeThread, &writer, n, flags, &readStats);

    // NOTE(review): the join() calls and several closing braces in this part
    // of the test are missing.
    for (auto& t : writeThreads) {
    XLOG(DBG2, "writers done");
  // Clear the read sleep duration so the reader will finish quickly now
  // NOTE(review): the statements that should follow are not visible:
  // clearing the sleep duration, joining the reader, and (presumably) calling
  // readStats.check().


#ifndef _WIN32
/**
 * Test that AsyncFileWriter operates correctly after a fork() in both the
 * parent and child processes.
 */
// NOTE(review): several pieces of this test are missing:
// - the preprocessor guard around the test body (presumably a check for
//   pthread_atfork() support, given the SKIP() message at the end);
// - its #else/#endif;
// - many closing braces.
TEST(AsyncFileWriter, fork) {
  SKIP_IF(folly::kIsSanitizeThread) << "Not supported for TSAN";

  TemporaryFile tmpFile{"logging_test"};

  // The number of messages to send before the fork and from each process
  constexpr size_t numMessages = 10;
  constexpr size_t numBgThreads = 2;

  // This can be increased to add some delay in the parent and child messages
  // so that they are likely to be interleaved in the log rather than grouped
  // together.  This doesn't really affect the test behavior or correctness
  // otherwise, though.
  // NOTE(review): no visible code sleeps for sleepDuration.
  constexpr milliseconds sleepDuration(0);

    AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
    writer.writeMessage(folly::to<std::string>("parent pid=", getpid(), "\n"));

    // Start some background threads just to exercise the behavior
    // when other threads are also logging to the writer when the fork occurs
    std::vector<std::thread> bgThreads;
    std::atomic<bool> stop{false};
    for (size_t n = 0; n < numBgThreads; ++n) {
      bgThreads.emplace_back([&] {
        size_t iter = 0;
        while (!stop) {
          // NOTE(review): the writer.writeMessage( call head for the next line
          // and the increment of `iter` are missing.
              folly::to<std::string>("bgthread_", getpid(), "_", iter, "\n"));

    for (size_t n = 0; n < numMessages; ++n) {
      writer.writeMessage(folly::to<std::string>("prefork", n, "\n"));

    auto pid = fork();
    folly::checkUnixError(pid, "failed to fork");
    if (pid == 0) {
      // Child process: log its own messages, then (per the comments below)
      // flush and _exit().
      writer.writeMessage(folly::to<std::string>("child pid=", getpid(), "\n"));
      for (size_t n = 0; n < numMessages; ++n) {
        writer.writeMessage(folly::to<std::string>("child", n, "\n"));

      // Use _exit() rather than exit() in the child, purely to prevent
      // ASAN from complaining that we leak memory for the background threads.
      // (These threads don't actually exist in the child, so it is difficult
      // to clean up their allocated state entirely.)
      // Explicitly flush the writer since exiting with _exit() won't do this
      // automatically.
      // NOTE(review): the writer.flush() and _exit() calls described above are
      // missing. As written, the child would fall through into the parent's
      // code.

    for (size_t n = 0; n < numMessages; ++n) {
      writer.writeMessage(folly::to<std::string>("parent", n, "\n"));

    // Stop the background threads.
    stop = true;
    // NOTE(review): the join() in this loop is missing.
    for (auto& t : bgThreads) {

    int status;
    auto waited = waitpid(pid, &status, 0);
    folly::checkUnixError(waited, "failed to wait on child");
    ASSERT_EQ(waited, pid);
    // NOTE(review): `status` is never inspected, so a child that exits
    // abnormally is not detected. Consider checking WIFEXITED/WEXITSTATUS.

  // Read back the logged messages
  std::string data;
  auto ret = folly::readFile(tmpFile.path().string().c_str(), data);
  ASSERT_TRUE(ret) << "failed to read log file";

  XLOG(DBG1) << "log contents:\n" << data;

  // The log file should contain all of the messages we wrote, from both the
  // parent and child processes.
  for (size_t n = 0; n < numMessages; ++n) {
    // NOTE(review): the EXPECT_THAT( head of the next expectation is missing.
        data, ContainsRegex(folly::to<std::string>("prefork", n, "\n")));
    EXPECT_THAT(data, ContainsRegex(folly::to<std::string>("parent", n, "\n")));
    EXPECT_THAT(data, ContainsRegex(folly::to<std::string>("child", n, "\n")));
  SKIP() << "pthread_atfork() is not supported on this platform";

/**
 * Have several threads concurrently performing fork() calls while several
 * other threads continuously create and destroy AsyncFileWriter objects.
 * This exercises the synchronization around registration of the AtFork
 * handlers and the creation/destruction of the AsyncFileWriter I/O thread.
 */
// NOTE(review): several pieces of this test are missing:
// - the preprocessor guard around the test body (presumably a check for
//   pthread_atfork() support, given the SKIP() message at the end);
// - its #else/#endif;
// - the inner scope braces;
// - many closing braces.
TEST(AsyncFileWriter, crazyForks) {
  SKIP_IF(folly::kIsSanitizeThread) << "Not supported for TSAN";

  constexpr size_t numAsyncWriterThreads = 10;
  constexpr size_t numForkThreads = 5;
  constexpr size_t numForkIterations = 20;
  std::atomic<bool> stop{false};

  // Spawn several threads that continuously create and destroy
  // AsyncFileWriter objects.
  std::vector<std::thread> asyncWriterThreads;
  for (size_t n = 0; n < numAsyncWriterThreads; ++n) {
    asyncWriterThreads.emplace_back([n, &stop] {
      folly::setThreadName(folly::to<std::string>("async_", n));

      TemporaryFile tmpFile{"logging_test"};
      while (!stop) {
        // Create an AsyncFileWriter, write a message to it, then destroy it.
        AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
        // NOTE(review): the writer.writeMessage(folly::to<std::string>( call
        // head for the next line is missing.
            "async thread ", folly::getOSThreadID(), "\n"));

  // Spawn several threads that repeatedly fork.
  std::vector<std::thread> forkThreads;
  std::mutex forkStartMutex;
  std::condition_variable forkStartCV;
  bool forkStart = false;
  for (size_t n = 0; n < numForkThreads; ++n) {
    forkThreads.emplace_back([n, &forkStartMutex, &forkStartCV, &forkStart] {
      folly::setThreadName(folly::to<std::string>("fork_", n));

      // Wait until forkStart is set just to have a better chance of all the
      // fork threads running simultaneously.
        std::unique_lock<std::mutex> l(forkStartMutex);
        forkStartCV.wait(l, [&forkStart] { return forkStart; });

      for (size_t i = 0; i < numForkIterations; ++i) {
        XLOG(DBG3) << "fork " << n << ":" << i;
        auto pid = fork();
        folly::checkUnixError(pid, "forkFailed");
        if (pid == 0) {
          XLOG(DBG3) << "child " << getpid();
          // NOTE(review): the child's exit call and the closing brace are
          // missing. As written, the child keeps running this loop.

        // parent
        int status;
        auto waited = waitpid(pid, &status, 0);
        folly::checkUnixError(waited, "failed to wait on child");
        EXPECT_EQ(waited, pid);

  // Kick off the fork threads
    std::unique_lock<std::mutex> l(forkStartMutex);
    forkStart = true;
    // NOTE(review): no forkStartCV.notify_all() is visible. Without it, fork
    // threads already blocked in wait() are never woken.

  // Wait for the fork threads to finish
  // NOTE(review): the join() calls in this loop and the next one are missing.
  for (auto& t : forkThreads) {

  // Stop and wait for the AsyncFileWriter threads
  stop = true;
  for (auto& t : asyncWriterThreads) {
  SKIP() << "pthread_atfork() is not supported on this platform";
#endif // !_WIN32