/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/logging/AsyncFileWriter.h>
#ifndef _WIN32
#include <fcntl.h>
#include <signal.h>
#include <sys/wait.h>
#endif
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <folly/Conv.h>
#include <folly/Exception.h>
#include <folly/File.h>
#include <folly/FileUtil.h>
#include <folly/String.h>
#include <folly/Synchronized.h>
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
#include <folly/init/Init.h>
#include <folly/lang/SafeAssert.h>
#include <folly/logging/Init.h>
#include <folly/logging/LoggerDB.h>
#include <folly/logging/xlog.h>
#include <folly/portability/Config.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/GMock.h>
#include <folly/portability/GTest.h>
#include <folly/portability/Unistd.h>
#include <folly/system/ThreadId.h>
#include <folly/system/ThreadName.h>
#include <folly/test/TestUtils.h>
#include <folly/testing/TestUtil.h>
DEFINE_int64(
async_discard_num_normal_writers,
30,
"number of threads to use to generate normal log messages during "
"the AsyncFileWriter.discard test");
DEFINE_int64(
async_discard_num_nodiscard_writers,
2,
"number of threads to use to generate non-discardable log messages during "
"the AsyncFileWriter.discard test");
DEFINE_int64(
async_discard_read_sleep_usec,
500,
"how long the read thread should sleep between reads in "
"the AsyncFileWriter.discard test");
DEFINE_int64(
async_discard_timeout_msec,
10000,
"A timeout for the AsyncFileWriter.discard test if it cannot generate "
"enough discards");
DEFINE_int64(
async_discard_num_events,
10,
"The number of discard events to wait for in the AsyncFileWriter.discard "
"test");
using namespace folly;
using namespace std::literals::chrono_literals;
using folly::test::TemporaryFile;
using std::chrono::milliseconds;
using std::chrono::steady_clock;
using testing::ContainsRegex;
TEST(AsyncFileWriter, noMessages) {
TemporaryFile tmpFile{"logging_test"};
// Test the simple construction and destruction of an AsyncFileWriter
// without ever writing any messages. This still exercises the I/O
// thread start-up and shutdown code.
AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
}
TEST(AsyncFileWriter, simpleMessages) {
TemporaryFile tmpFile{"logging_test"};
{
AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
for (int n = 0; n < 10; ++n) {
writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
std::this_thread::yield();
}
}
tmpFile.close();
std::string data;
auto ret = folly::readFile(tmpFile.path().string().c_str(), data);
ASSERT_TRUE(ret);
std::string expected =
"message 0\n"
"message 1\n"
"message 2\n"
"message 3\n"
"message 4\n"
"message 5\n"
"message 6\n"
"message 7\n"
"message 8\n"
"message 9\n";
EXPECT_EQ(expected, data);
}
namespace {
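// The ioError test installs handleLoggingError() as the LoggerDB internal
// warning handler; the messages it records are stored here.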
static std::vector<std::string>* internalWarnings;
void handleLoggingError(
StringPiece /* file */, int /* lineNumber */, std::string&& msg) {
internalWarnings->emplace_back(std::move(msg));
}
} // namespace
TEST(AsyncFileWriter, ioError) {
// Set the LoggerDB internal warning handler so we can record the messages
std::vector<std::string> logErrors;
internalWarnings = &logErrors;
LoggerDB::setInternalWarningHandler(handleLoggingError);
// Create an AsyncFileWriter that refers to a pipe whose read end is closed
std::array<int, 2> fds;
auto rc = pipe(fds.data());
folly::checkUnixError(rc, "failed to create pipe");
#ifndef _WIN32
signal(SIGPIPE, SIG_IGN);
#endif
::close(fds[0]);
// Log a bunch of messages to the writer
size_t numMessages = 100;
{
AsyncFileWriter writer{folly::File{fds[1], true}};
for (size_t n = 0; n < numMessages; ++n) {
writer.writeMessage(folly::to<std::string>("message ", n, "\n"));
std::this_thread::yield();
}
}
LoggerDB::setInternalWarningHandler(nullptr);
// AsyncFileWriter should have some internal warning messages about the
// log failures. This will generally be many fewer than the number of
// messages we wrote, though, since it performs write batching.
//
// GTest on Windows doesn't support alternation in the regex syntax -_-....
const std::string kExpectedErrorMessage =
#ifdef _WIN32
// The `pipe` call above is actually implemented via sockets, so we get
// a different error message.
"An established connection was aborted by the software in your host machine\\.";
#else
"Broken pipe";
#endif
for (const auto& msg : logErrors) {
EXPECT_THAT(
msg,
ContainsRegex(
"error writing to log file .* in AsyncFileWriter.*: " +
kExpectedErrorMessage));
}
EXPECT_GT(logErrors.size(), 0);
EXPECT_LE(logErrors.size(), numMessages);
}
namespace {
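/**
 * Write to the pipe until its buffer is completely full and further writes
 * would block.  Returns the total number of bytes written.
 */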
size_t fillUpPipe(int fd) {
int flags = fcntl(fd, F_GETFL);
  folly::checkUnixError(flags, "failed to get file descriptor flags");
auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
folly::checkUnixError(rc, "failed to put pipe in non-blocking mode");
std::vector<char> data;
data.resize(4000);
size_t totalBytes = 0;
size_t bytesToWrite = data.size();
while (true) {
auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite);
if (bytesWritten < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// We blocked. Keep trying smaller writes, until we get down to a
// single byte, just to make sure the logging code really won't be able
// to write anything to the pipe.
if (bytesToWrite <= 1) {
break;
} else {
bytesToWrite /= 2;
}
} else {
throwSystemError("error writing to pipe");
}
} else {
totalBytes += bytesWritten;
}
}
XLOG(DBG1, "pipe filled up after ", totalBytes, " bytes");
rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
return totalBytes;
}
} // namespace
TEST(AsyncFileWriter, flush) {
// Set up a pipe(), then write data to the write endpoint until it fills up
// and starts blocking.
std::array<int, 2> fds;
auto rc = pipe(fds.data());
folly::checkUnixError(rc, "failed to create pipe");
File readPipe{fds[0], true};
File writePipe{fds[1], true};
auto paddingSize = fillUpPipe(writePipe.fd());
// Now set up an AsyncFileWriter pointing at the write end of the pipe
AsyncFileWriter writer{std::move(writePipe)};
// Write a message
writer.writeMessage("test message: " + std::string(200, 'x'));
// Call flush(). Use a separate thread, since this should block until we
// consume data from the pipe.
Promise<Unit> promise;
auto future = promise.getFuture();
auto flushFunction = [&] { writer.flush(); };
std::thread flushThread{
[&]() { promise.setTry(makeTryWith(flushFunction)); }};
// Detach the flush thread now rather than joining it at the end of the
// function. This way if something goes wrong during the test we will fail
// with the real error, rather than crashing due to the std::thread
// destructor running on a still-joinable thread.
flushThread.detach();
// Sleep briefly, and make sure flush() still hasn't completed.
// If it has completed this doesn't necessarily indicate a bug in
// AsyncFileWriter, but instead indicates that our test code failed to
// successfully cause a blocking write.
/* sleep override */
std::this_thread::sleep_for(10ms);
EXPECT_FALSE(future.isReady());
// Now read from the pipe
std::vector<char> buf;
buf.resize(paddingSize);
auto bytesRead = readFull(readPipe.fd(), buf.data(), buf.size());
EXPECT_EQ(bytesRead, paddingSize);
// Make sure flush completes successfully now
std::move(future).get(50ms);
}
// A large-ish message suffix, just to consume space and help fill up
// log buffers faster.
static constexpr StringPiece kMsgSuffix{
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"};
namespace {
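// Running total of messages reported as discarded via the discard callback.
// The discard test passes this to ReadStats::check() to compare against the
// counts parsed from the discard notification messages.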
std::atomic<size_t> totalDiscarded;
void discardCallback(size_t n) {
totalDiscarded += n;
}
} // namespace
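/**
 * ReadStats tracks statistics gathered by the reader and writer threads in
 * the discard test, and verifies that they are consistent once the test
 * finishes.
 */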
class ReadStats {
public:
ReadStats()
      : deadline_{steady_clock::now() +
                  milliseconds{FLAGS_async_discard_timeout_msec}},
        readSleepUS_{static_cast<uint64_t>(
            std::max(int64_t{0}, FLAGS_async_discard_read_sleep_usec))} {}
void clearSleepDuration() { readSleepUS_.store(0); }
std::chrono::microseconds getSleepUS() const {
return std::chrono::microseconds{readSleepUS_.load()};
}
bool shouldWriterStop() const {
// Stop after we have seen the required number of separate discard events.
// We stop based on discardEventsSeen_ rather than numDiscarded_ since this
// ensures the async writer blocks and then makes progress again multiple
// times.
if (FLAGS_async_discard_num_events > 0 &&
discardEventsSeen_.load() >
static_cast<uint64_t>(FLAGS_async_discard_num_events)) {
return true;
}
// Stop after a timeout, even if we don't hit the number of requested
// discards.
return steady_clock::now() > deadline_;
}
void writerFinished(size_t threadID, size_t messagesWritten, uint32_t flags) {
auto map = perThreadWriteData_.wlock();
FOLLY_SAFE_CHECK(
map->find(threadID) == map->end(),
"multiple writer threads with same ID");
auto& data = (*map)[threadID];
data.numMessagesWritten = messagesWritten;
data.flags = flags;
}
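  /**
   * Called from the main thread once the reader and all writers have
   * finished, to verify that the data recorded by the reader matches what
   * the writers reported and that the discard counts add up.
   */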
void check(size_t nDiscarded) {
auto writeDataMap = perThreadWriteData_.wlock();
EXPECT_EQ("", trailingData_);
EXPECT_EQ(0, numUnableToParse_);
EXPECT_EQ(0, numOutOfOrder_);
// Check messages received from each writer thread
size_t readerStatsChecked = 0;
size_t totalMessagesWritten = 0;
size_t totalMessagesRead = 0;
for (const auto& writeEntry : *writeDataMap) {
const auto& writeInfo = writeEntry.second;
totalMessagesWritten += writeInfo.numMessagesWritten;
auto iter = perThreadReadData_.find(writeEntry.first);
if (iter == perThreadReadData_.end()) {
// We never received any messages from this writer thread.
// This is okay as long as this is not a NEVER_DISCARD writer.
EXPECT_EQ(0, writeInfo.flags);
continue;
}
const auto& readInfo = iter->second;
++readerStatsChecked;
totalMessagesRead += readInfo.numMessagesRead;
if (writeInfo.flags & LogWriter::NEVER_DISCARD) {
// Non-discarding threads should never discard anything
EXPECT_EQ(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
EXPECT_EQ(readInfo.lastId, writeInfo.numMessagesWritten);
} else {
// Other threads may have discarded some messages
EXPECT_LE(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
EXPECT_LE(readInfo.lastId, writeInfo.numMessagesWritten);
}
}
EXPECT_EQ(totalMessagesWritten, totalMessagesRead + numDiscarded_);
EXPECT_EQ(readerStatsChecked, perThreadReadData_.size());
// This test is intended to check the discard behavior.
// Fail the test if we didn't actually trigger any discards before we timed
// out.
EXPECT_GT(numDiscarded_, 0);
EXPECT_EQ(nDiscarded, numDiscarded_);
XLOG(DBG1) << totalMessagesWritten << " messages written, "
<< totalMessagesRead << " messages read, " << numDiscarded_
<< " messages discarded";
}
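  /**
   * Called by the reader thread for each complete log line it receives.
   * Discard notifications update the discard counters; normal messages are
   * parsed and tracked in per-writer-thread statistics.
   */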
void messageReceived(StringPiece msg) {
if (msg.endsWith(" log messages discarded: "
"logging faster than we can write")) {
auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
XLOG(DBG3, "received discard notification: ", discardCount);
numDiscarded_ += discardCount;
++discardEventsSeen_;
return;
}
size_t threadID = 0;
size_t messageIndex = 0;
try {
parseMessage(msg, &threadID, &messageIndex);
} catch (const std::exception&) {
++numUnableToParse_;
XLOG(ERR, "unable to parse log message: ", msg);
return;
}
auto& data = perThreadReadData_[threadID];
data.numMessagesRead++;
if (messageIndex > data.lastId) {
data.lastId = messageIndex;
} else {
++numOutOfOrder_;
XLOG(ERR) << "received out-of-order messages from writer " << threadID
<< ": " << messageIndex << " received after " << data.lastId;
}
}
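  /**
   * Called by the reader thread if any partial message (with no trailing
   * newline) is left in its buffer when it reaches EOF.
   */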
void trailingData(StringPiece data) { trailingData_ = data.str(); }
private:
struct ReaderData {
size_t numMessagesRead{0};
size_t lastId{0};
};
struct WriterData {
size_t numMessagesWritten{0};
int flags{0};
};
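  /**
   * Parse a message of the form "thread <id> message <index><suffix>",
   * as produced by writeThread(), extracting the writer thread ID and the
   * per-thread message index.
   */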
void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
// Validate and strip off the message prefix and suffix
constexpr StringPiece prefix{"thread "};
if (!msg.startsWith(prefix)) {
throw std::runtime_error("bad message prefix");
}
msg.advance(prefix.size());
if (!msg.endsWith(kMsgSuffix)) {
throw std::runtime_error("bad message suffix");
}
msg.subtract(kMsgSuffix.size());
// Parse then strip off the thread index
auto threadIDEnd = msg.find(' ');
if (threadIDEnd == StringPiece::npos) {
throw std::runtime_error("no middle found");
}
*threadID = folly::to<size_t>(msg.subpiece(0, threadIDEnd));
msg.advance(threadIDEnd);
// Validate that the middle of the message is what we expect,
// then strip it off
constexpr StringPiece middle{" message "};
if (!msg.startsWith(middle)) {
throw std::runtime_error("bad message middle");
}
msg.advance(middle.size());
// Parse the message index
*messageIndex = folly::to<size_t>(msg);
}
/**
* Data about each writer thread, as recorded by the reader thread.
*
* At the end of the test we will compare perThreadReadData_ (recorded by the
* reader) with perThreadWriteData_ (recorded by the writers) to make sure
* the data matches up.
*
* This is a map from writer_thread_id to ReaderData.
* The writer_thread_id is extracted from the received messages.
*
* This field does not need locking as it is only updated by the single
* reader thread.
*/
std::unordered_map<size_t, ReaderData> perThreadReadData_;
/*
* Additional information recorded by the reader thread.
*/
std::string trailingData_;
size_t numUnableToParse_{0};
size_t numOutOfOrder_{0};
size_t numDiscarded_{0};
/**
* deadline_ is a maximum end time for the test.
*
* The writer threads quit if the deadline is reached even if they have not
* produced the desired number of discard events yet.
*/
const std::chrono::steady_clock::time_point deadline_;
/**
* How long the reader thread should sleep between each read event.
*
* This is initially set to a non-zero value (read from the
* FLAGS_async_discard_read_sleep_usec flag) so that the reader thread reads
* slowly, which will fill up the pipe buffer and cause discard events.
*
   * Once we have produced enough discards and are ready to finish the test,
   * the main thread reduces readSleepUS_ to 0, so the reader will finish the
   * remaining message backlog quickly.
*/
std::atomic<uint64_t> readSleepUS_{0};
/**
* A count of how many discard events have been seen so far.
*
* The reader increments discardEventsSeen_ each time it sees a discard
* notification message. A "discard event" basically corresponds to a single
   * group of dropped messages. Once the reader pulls some messages out of the
   * pipe the writers should be able to send more data, but the buffer will
* eventually fill up again, producing another discard event.
*/
std::atomic<uint64_t> discardEventsSeen_{0};
/**
* Data about each writer thread, as recorded by the writers.
*
* When each writer thread finishes it records how many messages it wrote,
* plus the flags it used to write the messages.
*/
folly::Synchronized<std::unordered_map<size_t, WriterData>>
perThreadWriteData_;
};
/**
* readThread() reads messages slowly from a pipe. This helps test the
* AsyncFileWriter behavior when I/O is slow.
*/
void readThread(folly::File&& file, ReadStats* stats) {
std::vector<char> buffer;
buffer.resize(1024);
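  // bufferIdx tracks how many bytes of a partial message (not yet terminated
  // by a newline) are carried over in the buffer from the previous read.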
size_t bufferIdx = 0;
while (true) {
/* sleep override */
std::this_thread::sleep_for(stats->getSleepUS());
auto readResult = folly::readNoInt(
file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
if (readResult < 0) {
XLOG(ERR, "error reading from pipe: ", errno);
return;
}
if (readResult == 0) {
XLOG(DBG2, "read EOF");
break;
}
auto logDataLen = bufferIdx + readResult;
StringPiece logData{buffer.data(), logDataLen};
    size_t idx = 0;
while (true) {
auto end = logData.find('\n', idx);
if (end == StringPiece::npos) {
bufferIdx = logDataLen - idx;
memmove(buffer.data(), buffer.data() + idx, bufferIdx);
break;
}
StringPiece logMsg{logData.data() + idx, end - idx};
stats->messageReceived(logMsg);
idx = end + 1;
}
}
if (bufferIdx != 0) {
stats->trailingData(StringPiece{buffer.data(), bufferIdx});
}
}
/**
* writeThread() writes a series of messages to the AsyncFileWriter
*/
void writeThread(
AsyncFileWriter* writer, size_t id, uint32_t flags, ReadStats* readStats) {
size_t msgID = 0;
while (true) {
++msgID;
writer->writeMessage(
folly::to<std::string>(
"thread ", id, " message ", msgID, kMsgSuffix, '\n'),
flags);
    // Break out once the reader has seen enough discards.  Check
    // shouldWriterStop() only every 256 messages to keep the overhead of
    // this check low.
if (((msgID & 0xff) == 0) && readStats->shouldWriterStop()) {
readStats->writerFinished(id, msgID, flags);
break;
}
}
}
/*
* The discard test spawns a number of threads that each write a large number
 * of messages quickly. The AsyncFileWriter writes to a pipe, and a separate
* thread reads from it slowly, causing a backlog to build up.
*
* The test then checks that:
* - The read thread always receives full messages (no partial log messages)
* - Messages that are received are received in order
* - The number of messages received plus the number reported in discard
* notifications matches the number of messages sent.
*/
TEST(AsyncFileWriter, discard) {
std::array<int, 2> fds;
auto pipeResult = pipe(fds.data());
folly::checkUnixError(pipeResult, "pipe failed");
folly::File readPipe{fds[0], true};
folly::File writePipe{fds[1], true};
AsyncFileWriter::setDiscardCallback(discardCallback);
ReadStats readStats;
std::thread reader(readThread, std::move(readPipe), &readStats);
{
AsyncFileWriter writer{std::move(writePipe)};
std::vector<std::thread> writeThreads;
size_t numThreads = FLAGS_async_discard_num_normal_writers +
FLAGS_async_discard_num_nodiscard_writers;
for (size_t n = 0; n < numThreads; ++n) {
uint32_t flags = 0;
if (n >= static_cast<size_t>(FLAGS_async_discard_num_normal_writers)) {
flags = LogWriter::NEVER_DISCARD;
}
XLOGF(DBG4, "writer {:4d} flags {:#02x}", n, flags);
writeThreads.emplace_back(writeThread, &writer, n, flags, &readStats);
}
for (auto& t : writeThreads) {
t.join();
}
XLOG(DBG2, "writers done");
}
// Clear the read sleep duration so the reader will finish quickly now
readStats.clearSleepDuration();
reader.join();
readStats.check(totalDiscarded);
AsyncFileWriter::setDiscardCallback(nullptr);
}
#ifndef _WIN32
/**
* Test that AsyncFileWriter operates correctly after a fork() in both the
* parent and child processes.
*/
TEST(AsyncFileWriter, fork) {
#if FOLLY_HAVE_PTHREAD_ATFORK
SKIP_IF(folly::kIsSanitizeThread) << "Not supported for TSAN";
TemporaryFile tmpFile{"logging_test"};
// The number of messages to send before the fork and from each process
constexpr size_t numMessages = 10;
constexpr size_t numBgThreads = 2;
// This can be increased to add some delay in the parent and child messages
// so that they are likely to be interleaved in the log rather than grouped
// together. This doesn't really affect the test behavior or correctness
// otherwise, though.
constexpr milliseconds sleepDuration(0);
{
AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
writer.writeMessage(folly::to<std::string>("parent pid=", getpid(), "\n"));
// Start some background threads just to exercise the behavior
// when other threads are also logging to the writer when the fork occurs
std::vector<std::thread> bgThreads;
std::atomic<bool> stop{false};
for (size_t n = 0; n < numBgThreads; ++n) {
bgThreads.emplace_back([&] {
size_t iter = 0;
while (!stop) {
writer.writeMessage(
folly::to<std::string>("bgthread_", getpid(), "_", iter, "\n"));
++iter;
}
});
}
for (size_t n = 0; n < numMessages; ++n) {
writer.writeMessage(folly::to<std::string>("prefork", n, "\n"));
}
auto pid = fork();
folly::checkUnixError(pid, "failed to fork");
if (pid == 0) {
writer.writeMessage(folly::to<std::string>("child pid=", getpid(), "\n"));
for (size_t n = 0; n < numMessages; ++n) {
writer.writeMessage(folly::to<std::string>("child", n, "\n"));
std::this_thread::sleep_for(sleepDuration);
}
// Use _exit() rather than exit() in the child, purely to prevent
// ASAN from complaining that we leak memory for the background threads.
// (These threads don't actually exist in the child, so it is difficult
// to clean up their allocated state entirely.)
//
// Explicitly flush the writer since exiting with _exit() won't do this
// automatically.
writer.flush();
_exit(0);
}
for (size_t n = 0; n < numMessages; ++n) {
writer.writeMessage(folly::to<std::string>("parent", n, "\n"));
std::this_thread::sleep_for(sleepDuration);
}
// Stop the background threads.
stop = true;
for (auto& t : bgThreads) {
t.join();
}
int status;
auto waited = waitpid(pid, &status, 0);
folly::checkUnixError(waited, "failed to wait on child");
ASSERT_EQ(waited, pid);
}
// Read back the logged messages
tmpFile.close();
std::string data;
auto ret = folly::readFile(tmpFile.path().string().c_str(), data);
ASSERT_TRUE(ret) << "failed to read log file";
XLOG(DBG1) << "log contents:\n" << data;
// The log file should contain all of the messages we wrote, from both the
// parent and child processes.
for (size_t n = 0; n < numMessages; ++n) {
EXPECT_THAT(
data, ContainsRegex(folly::to<std::string>("prefork", n, "\n")));
EXPECT_THAT(data, ContainsRegex(folly::to<std::string>("parent", n, "\n")));
EXPECT_THAT(data, ContainsRegex(folly::to<std::string>("child", n, "\n")));
}
#else
SKIP() << "pthread_atfork() is not supported on this platform";
#endif // FOLLY_HAVE_PTHREAD_ATFORK
}
/**
* Have several threads concurrently performing fork() calls while several
* other threads continuously create and destroy AsyncFileWriter objects.
*
* This exercises the synchronization around registration of the AtFork
* handlers and the creation/destruction of the AsyncFileWriter I/O thread.
*/
TEST(AsyncFileWriter, crazyForks) {
#if FOLLY_HAVE_PTHREAD_ATFORK
SKIP_IF(folly::kIsSanitizeThread) << "Not supported for TSAN";
constexpr size_t numAsyncWriterThreads = 10;
constexpr size_t numForkThreads = 5;
constexpr size_t numForkIterations = 20;
std::atomic<bool> stop{false};
// Spawn several threads that continuously create and destroy
// AsyncFileWriter objects.
std::vector<std::thread> asyncWriterThreads;
for (size_t n = 0; n < numAsyncWriterThreads; ++n) {
asyncWriterThreads.emplace_back([n, &stop] {
folly::setThreadName(folly::to<std::string>("async_", n));
TemporaryFile tmpFile{"logging_test"};
while (!stop) {
// Create an AsyncFileWriter, write a message to it, then destroy it.
AsyncFileWriter writer{folly::File{tmpFile.fd(), false}};
writer.writeMessage(folly::to<std::string>(
"async thread ", folly::getOSThreadID(), "\n"));
}
});
}
// Spawn several threads that repeatedly fork.
std::vector<std::thread> forkThreads;
std::mutex forkStartMutex;
std::condition_variable forkStartCV;
bool forkStart = false;
for (size_t n = 0; n < numForkThreads; ++n) {
forkThreads.emplace_back([n, &forkStartMutex, &forkStartCV, &forkStart] {
folly::setThreadName(folly::to<std::string>("fork_", n));
// Wait until forkStart is set just to have a better chance of all the
// fork threads running simultaneously.
{
std::unique_lock<std::mutex> l(forkStartMutex);
forkStartCV.wait(l, [&forkStart] { return forkStart; });
}
for (size_t i = 0; i < numForkIterations; ++i) {
XLOG(DBG3) << "fork " << n << ":" << i;
auto pid = fork();
      folly::checkUnixError(pid, "fork failed");
if (pid == 0) {
XLOG(DBG3) << "child " << getpid();
_exit(0);
}
// parent
int status;
auto waited = waitpid(pid, &status, 0);
folly::checkUnixError(waited, "failed to wait on child");
EXPECT_EQ(waited, pid);
}
});
}
// Kick off the fork threads
{
std::unique_lock<std::mutex> l(forkStartMutex);
forkStart = true;
}
forkStartCV.notify_all();
// Wait for the fork threads to finish
for (auto& t : forkThreads) {
t.join();
}
// Stop and wait for the AsyncFileWriter threads
stop = true;
for (auto& t : asyncWriterThreads) {
t.join();
}
#else
SKIP() << "pthread_atfork() is not supported on this platform";
#endif // FOLLY_HAVE_PTHREAD_ATFORK
}
#endif // !_WIN32