folly/folly/io/async/test/NotificationQueueTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/io/async/NotificationQueue.h>

#include <sys/types.h>

#include <iostream>
#include <list>
#include <thread>

#include <folly/io/async/ScopedEventBaseThread.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>

#ifndef _WIN32
#include <sys/wait.h>
#endif

using namespace std;
using namespace folly;

typedef NotificationQueue<int> IntQueue;

class QueueConsumer : public IntQueue::Consumer {
 public:
  QueueConsumer() {}

  void messageAvailable(int&& value) noexcept override {
    messages.push_back(value);
    if (fn) {
      fn(value);
    }
  }

  std::function<void(int)> fn;
  std::deque<int> messages;
};

class QueueTest {
 public:
  explicit QueueTest(uint32_t maxSize, IntQueue::FdType type)
      : queue(maxSize, type), terminationQueue(maxSize, type) {}

  void sendOne();
  void putMessages();
  void multiConsumer();
  void maxQueueSize();
  void maxReadAtOnce();
  void destroyCallback();
  void useAfterFork();

  IntQueue queue;
  IntQueue terminationQueue;
};

void QueueTest::sendOne() {
  // Create a notification queue and a callback in this thread
  EventBase eventBase;

  QueueConsumer consumer;
  consumer.fn = [&](int) {
    // Stop consuming after we receive 1 message
    consumer.stopConsuming();
  };
  consumer.startConsuming(&eventBase, &queue);

  // Start a new EventBase thread to put a message on our queue
  ScopedEventBaseThread t1;
  t1.getEventBase()->runInEventBaseThread([&] { this->queue.putMessage(5); });

  // Loop until we receive the message
  eventBase.loop();

  const auto& messages = consumer.messages;
  EXPECT_EQ(1, messages.size());
  EXPECT_EQ(5, messages.at(0));
}

void QueueTest::putMessages() {
  EventBase eventBase;

  QueueConsumer consumer;
  QueueConsumer consumer2;
  consumer.fn = [&](int msg) {
    // Stop consuming after we receive a message with value 0, and start
    // consumer2
    if (msg == 0) {
      consumer.stopConsuming();
      consumer2.startConsuming(&eventBase, &queue);
    }
  };
  consumer2.fn = [&](int msg) {
    // Stop consuming after we receive a message with value 0
    if (msg == 0) {
      consumer2.stopConsuming();
    }
  };
  consumer.startConsuming(&eventBase, &queue);

  list<int> msgList = {1, 2, 3, 4};
  vector<int> msgVector = {5, 0, 9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 6, 10, 2, 0};
  // Call putMessages() several times to add messages to the queue
  queue.putMessages(msgList.begin(), msgList.end());
  queue.putMessages(msgVector.begin() + 2, msgVector.begin() + 4);
  // Test sending 17 messages, the pipe-based queue calls write in 16 byte
  // chunks
  queue.putMessages(msgVector.begin(), msgVector.end());

  // Loop until the consumer has stopped
  eventBase.loop();

  vector<int> expectedMessages = {1, 2, 3, 4, 9, 8, 7, 5, 0};
  vector<int> expectedMessages2 = {9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 10, 2, 0};
  EXPECT_EQ(expectedMessages.size(), consumer.messages.size());
  for (unsigned int idx = 0; idx < expectedMessages.size(); ++idx) {
    EXPECT_EQ(expectedMessages[idx], consumer.messages.at(idx));
  }
  EXPECT_EQ(expectedMessages2.size(), consumer2.messages.size());
  for (unsigned int idx = 0; idx < expectedMessages2.size(); ++idx) {
    EXPECT_EQ(expectedMessages2[idx], consumer2.messages.at(idx));
  }
}

void QueueTest::multiConsumer() {
  uint32_t numConsumers = 8;
  uint32_t numMessages = 10000;

  // Create several consumers each running in their own EventBase thread
  vector<QueueConsumer> consumers(numConsumers);
  vector<ScopedEventBaseThread> threads(numConsumers);

  for (uint32_t consumerIdx = 0; consumerIdx < numConsumers; ++consumerIdx) {
    QueueConsumer* consumer = &consumers[consumerIdx];

    consumer->fn = [consumer, consumerIdx, this](int value) {
      // Treat 0 as a signal to stop.
      if (value == 0) {
        consumer->stopConsuming();
        // Put a message on the terminationQueue to indicate we have stopped
        terminationQueue.putMessage(consumerIdx);
      }
    };

    EventBase* eventBase = threads[consumerIdx].getEventBase();
    eventBase->runInEventBaseThread([eventBase, consumer, this] {
      consumer->startConsuming(eventBase, &queue);
    });
  }

  // Now add a number of messages from this thread
  // Start at 1 rather than 0, since 0 is the signal to stop.
  for (uint32_t n = 1; n < numMessages; ++n) {
    queue.putMessage(n);
  }
  // Now add a 0 for each consumer, to signal them to stop
  for (uint32_t n = 0; n < numConsumers; ++n) {
    queue.putMessage(0);
  }

  // Wait until we get notified that all of the consumers have stopped
  // We use a separate notification queue for this.
  QueueConsumer terminationConsumer;
  vector<uint32_t> consumersStopped(numConsumers, 0);
  uint32_t consumersRemaining = numConsumers;
  terminationConsumer.fn = [&](int consumerIdx) {
    --consumersRemaining;
    if (consumersRemaining == 0) {
      terminationConsumer.stopConsuming();
    }

    EXPECT_GE(consumerIdx, 0);
    EXPECT_LT(consumerIdx, numConsumers);
    ++consumersStopped[consumerIdx];
  };
  EventBase eventBase;
  terminationConsumer.startConsuming(&eventBase, &terminationQueue);
  eventBase.loop();

  // Verify that we saw exactly 1 stop message for each consumer
  for (uint32_t n = 0; n < numConsumers; ++n) {
    EXPECT_EQ(1, consumersStopped[n]);
  }

  // Validate that every message sent to the main queue was received exactly
  // once.
  vector<int> messageCount(numMessages, 0);
  for (uint32_t n = 0; n < numConsumers; ++n) {
    for (int msg : consumers[n].messages) {
      EXPECT_GE(msg, 0);
      EXPECT_LT(msg, numMessages);
      ++messageCount[msg];
    }
  }

  // 0 is the signal to stop, and should have been received once by each
  // consumer
  EXPECT_EQ(numConsumers, messageCount[0]);
  // All other messages should have been received exactly once
  for (uint32_t n = 1; n < numMessages; ++n) {
    EXPECT_EQ(1, messageCount[n]);
  }
}

void QueueTest::maxQueueSize() {
  // Create a queue with a maximum size of 5, and fill it up

  for (int n = 0; n < 5; ++n) {
    queue.tryPutMessage(n);
  }

  // Calling tryPutMessage() now should fail
  EXPECT_THROW(queue.tryPutMessage(5), std::overflow_error);

  EXPECT_FALSE(queue.tryPutMessageNoThrow(5));
  int val = 5;
  EXPECT_FALSE(queue.tryPutMessageNoThrow(std::move(val)));

  // Pop a message from the queue
  int result = -1;
  EXPECT_TRUE(queue.tryConsume(result));
  EXPECT_EQ(0, result);

  // We should be able to write another message now that we popped one off.
  queue.tryPutMessage(5);
  // But now we are full again.
  EXPECT_THROW(queue.tryPutMessage(6), std::overflow_error);
  // putMessage() should let us exceed the maximum
  queue.putMessage(6);

  // Pull another mesage off
  EXPECT_TRUE(queue.tryConsume(result));
  EXPECT_EQ(1, result);

  // tryPutMessage() should still fail since putMessage() actually put us over
  // the max.
  EXPECT_THROW(queue.tryPutMessage(7), std::overflow_error);

  // Pull another message off and try again
  EXPECT_TRUE(queue.tryConsume(result));
  EXPECT_EQ(2, result);
  queue.tryPutMessage(7);

  // Now pull all the remaining messages off
  EXPECT_TRUE(queue.tryConsume(result));
  EXPECT_EQ(3, result);
  EXPECT_TRUE(queue.tryConsume(result));
  EXPECT_EQ(4, result);
  EXPECT_TRUE(queue.tryConsume(result));
  EXPECT_EQ(5, result);
  EXPECT_TRUE(queue.tryConsume(result));
  EXPECT_EQ(6, result);
  EXPECT_TRUE(queue.tryConsume(result));
  EXPECT_EQ(7, result);

  // There should be no messages left
  result = -1;
  EXPECT_TRUE(!queue.tryConsume(result));
  EXPECT_EQ(-1, result);
}

void QueueTest::maxReadAtOnce() {
  // Add 100 messages to the queue
  for (int n = 0; n < 100; ++n) {
    queue.putMessage(n);
  }

  EventBase eventBase;

  // Record how many messages were processed each loop iteration.
  uint32_t messagesThisLoop = 0;
  std::vector<uint32_t> messagesPerLoop;
  std::function<void()> loopFinished = [&] {
    // Record the current number of messages read this loop
    messagesPerLoop.push_back(messagesThisLoop);
    // Reset messagesThisLoop to 0 for the next loop
    messagesThisLoop = 0;

    // To prevent use-after-free bugs when eventBase destructs,
    // prevent calling runInLoop any more after the test is finished.
    // 55 == number of times loop should run.
    if (messagesPerLoop.size() != 55) {
      // Reschedule ourself to run at the end of the next loop
      eventBase.runInLoop(loopFinished);
    }
  };
  // Schedule the first call to loopFinished
  eventBase.runInLoop(loopFinished);

  QueueConsumer consumer;
  // Read the first 50 messages 10 at a time.
  consumer.setMaxReadAtOnce(10);
  consumer.fn = [&](int value) {
    ++messagesThisLoop;
    // After 50 messages, drop to reading only 1 message at a time.
    if (value == 50) {
      consumer.setMaxReadAtOnce(1);
    }
    // Terminate the loop when we reach the end of the messages.
    if (value == 99) {
      eventBase.terminateLoopSoon();
    }
  };
  consumer.startConsuming(&eventBase, &queue);

  // Run the event loop until the consumer terminates it
  eventBase.loop();

  // The consumer should have read all 100 messages in order
  EXPECT_EQ(100, consumer.messages.size());
  for (int n = 0; n < 100; ++n) {
    EXPECT_EQ(n, consumer.messages.at(n));
  }

  // Currently EventBase happens to still run the loop callbacks even after
  // terminateLoopSoon() is called.  However, we don't really want to depend on
  // this behavior.  In case this ever changes in the future, add
  // messagesThisLoop to messagesPerLoop in loop callback isn't invoked for the
  // last loop iteration.
  if (messagesThisLoop > 0) {
    messagesPerLoop.push_back(messagesThisLoop);
    messagesThisLoop = 0;
  }

  // For the first 5 loops it should have read 10 messages each time.
  // After that it should have read 1 messages per loop for the next 50 loops.
  EXPECT_EQ(55, messagesPerLoop.size());
  for (int n = 0; n < 5; ++n) {
    EXPECT_EQ(10, messagesPerLoop.at(n));
  }
  for (int n = 5; n < 55; ++n) {
    EXPECT_EQ(1, messagesPerLoop.at(n));
  }
}

void QueueTest::destroyCallback() {
  // Rather than using QueueConsumer, define a separate class for the destroy
  // test.  The DestroyTestConsumer will delete itself inside the
  // messageAvailable() callback.  With a regular QueueConsumer this would
  // destroy the std::function object while the function is running, which we
  // should probably avoid doing.  This uses a pointer to a std::function to
  // avoid destroying the function object.
  class DestroyTestConsumer : public IntQueue::Consumer {
   public:
    void messageAvailable(int&& value) noexcept override {
      DestructorGuard g(this);
      if (fn && *fn) {
        (*fn)(value);
      }
    }

    std::function<void(int)>* fn;

   protected:
    ~DestroyTestConsumer() override = default;
  };

  EventBase eventBase;
  // Create a queue and add 2 messages to it
  queue.putMessage(1);
  queue.putMessage(2);

  // Create two QueueConsumers allocated on the heap.
  // Have whichever one gets called first destroy both of the QueueConsumers.
  // This way one consumer will be destroyed from inside its messageAvailable()
  // callback, and one consume will be destroyed when it isn't inside
  // messageAvailable().
  auto consumer1 = makeDelayedDestructionUniquePtr<DestroyTestConsumer>();
  auto consumer2 = makeDelayedDestructionUniquePtr<DestroyTestConsumer>();
  std::function<void(int)> fn = [&](int) {
    consumer1 = nullptr;
    consumer2 = nullptr;
  };
  consumer1->fn = &fn;
  consumer2->fn = &fn;

  consumer1->startConsuming(&eventBase, &queue);
  consumer2->startConsuming(&eventBase, &queue);

  // Run the event loop.
  eventBase.loop();

  // One of the consumers should have fired, received the message,
  // then destroyed both consumers.
  EXPECT_TRUE(!consumer1);
  EXPECT_TRUE(!consumer2);
  // One message should be left in the queue
  int result = 1;
  EXPECT_TRUE(queue.tryConsume(result));
  EXPECT_EQ(2, result);
}

TEST(NotificationQueueTest, ConsumeUntilDrained) {
  // Basic tests: make sure we
  // - drain all the messages
  // - ignore any maxReadAtOnce
  // - can't add messages during draining
  EventBase eventBase;
  IntQueue queue;
  QueueConsumer consumer;
  consumer.fn = [&](int i) {
    EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error);
    EXPECT_FALSE(queue.tryPutMessageNoThrow(i));
    EXPECT_THROW(queue.putMessage(i), std::runtime_error);
    std::vector<int> ints{1, 2, 3};
    EXPECT_THROW(
        queue.putMessages(ints.begin(), ints.end()), std::runtime_error);
  };
  consumer.setMaxReadAtOnce(10); // We should ignore this
  consumer.startConsuming(&eventBase, &queue);
  for (int i = 0; i < 20; i++) {
    queue.putMessage(i);
  }
  EXPECT_TRUE(consumer.consumeUntilDrained());
  EXPECT_EQ(20, consumer.messages.size());

  // Make sure there can only be one drainer at once
  folly::Baton<> callbackBaton, threadStartBaton;
  consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
  QueueConsumer competingConsumer;
  competingConsumer.startConsuming(&eventBase, &queue);
  queue.putMessage(1);
  atomic<bool> raceA{false};
  atomic<bool> raceB{false};
  size_t numConsA = 0;
  size_t numConsB = 0;
  auto thread = std::thread([&] {
    threadStartBaton.post();
    raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
  });
  threadStartBaton.wait();
  raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
  callbackBaton.post();
  thread.join();
  EXPECT_FALSE(raceA && raceB);
  EXPECT_TRUE(raceA || raceB);
  EXPECT_TRUE(raceA ^ raceB);
}

TEST(NotificationQueueTest, ConsumeUntilDrainedStress) {
  for (size_t i = 0; i < 1 << 8; ++i) {
    // Basic tests: make sure we
    // - drain all the messages
    // - ignore any maxReadAtOnce
    // - can't add messages during draining
    EventBase eventBase;
    IntQueue queue;
    QueueConsumer consumer;
    consumer.fn = [&](int j) {
      EXPECT_THROW(queue.tryPutMessage(j), std::runtime_error);
      EXPECT_FALSE(queue.tryPutMessageNoThrow(j));
      EXPECT_THROW(queue.putMessage(j), std::runtime_error);
      std::vector<int> ints{1, 2, 3};
      EXPECT_THROW(
          queue.putMessages(ints.begin(), ints.end()), std::runtime_error);
    };
    consumer.setMaxReadAtOnce(10); // We should ignore this
    consumer.startConsuming(&eventBase, &queue);
    for (int j = 0; j < 20; j++) {
      queue.putMessage(j);
    }
    EXPECT_TRUE(consumer.consumeUntilDrained());
    EXPECT_EQ(20, consumer.messages.size());

    // Make sure there can only be one drainer at once
    folly::Baton<> callbackBaton, threadStartBaton;
    consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
    QueueConsumer competingConsumer;
    competingConsumer.startConsuming(&eventBase, &queue);
    queue.putMessage(1);
    atomic<bool> raceA{false};
    atomic<bool> raceB{false};
    size_t numConsA = 0;
    size_t numConsB = 0;
    auto thread = std::thread([&] {
      threadStartBaton.post();
      raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
    });
    threadStartBaton.wait();
    raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
    callbackBaton.post();
    thread.join();
    EXPECT_FALSE(raceA && raceB);
    EXPECT_TRUE(raceA || raceB);
    EXPECT_TRUE(raceA ^ raceB);
  }
}

TEST(NotificationQueueTest, SendOneEventFD) {
  QueueTest qt(0, IntQueue::FdType::EVENTFD);
  qt.sendOne();
}

TEST(NotificationQueueTest, PutMessagesEventFD) {
  QueueTest qt(0, IntQueue::FdType::EVENTFD);
  qt.sendOne();
}

TEST(NotificationQueueTest, MultiConsumerEventFD) {
  QueueTest qt(0, IntQueue::FdType::EVENTFD);
  qt.multiConsumer();
}

TEST(NotificationQueueTest, MaxQueueSizeEventFD) {
  QueueTest qt(5, IntQueue::FdType::EVENTFD);
  qt.maxQueueSize();
}

TEST(NotificationQueueTest, MaxReadAtOnceEventFD) {
  QueueTest qt(0, IntQueue::FdType::EVENTFD);
  qt.maxReadAtOnce();
}

TEST(NotificationQueueTest, DestroyCallbackEventFD) {
  QueueTest qt(0, IntQueue::FdType::EVENTFD);
  qt.destroyCallback();
}

TEST(NotificationQueueTest, SendOnePipe) {
  QueueTest qt(0, IntQueue::FdType::PIPE);
  qt.sendOne();
}

TEST(NotificationQueueTest, PutMessagesPipe) {
  QueueTest qt(0, IntQueue::FdType::PIPE);
  qt.sendOne();
}

TEST(NotificationQueueTest, MultiConsumerPipe) {
  QueueTest qt(0, IntQueue::FdType::PIPE);
  qt.multiConsumer();
}

TEST(NotificationQueueTest, MaxQueueSizePipe) {
  QueueTest qt(5, IntQueue::FdType::PIPE);
  qt.maxQueueSize();
}

TEST(NotificationQueueTest, MaxReadAtOncePipe) {
  QueueTest qt(0, IntQueue::FdType::PIPE);
  qt.maxReadAtOnce();
}

TEST(NotificationQueueTest, DestroyCallbackPipe) {
  QueueTest qt(0, IntQueue::FdType::PIPE);
  qt.destroyCallback();
}

#ifndef _WIN32
/*
 * Test code that creates a NotificationQueue, then forks, and incorrectly
 * tries to send a message to the queue from the child process.
 *
 * The child process should crash in this scenario, since the child code has a
 * bug.  (Older versions of NotificationQueue didn't catch this in the child,
 * resulting in a crash in the parent process.)
 */
TEST(NotificationQueueTest, UseAfterFork) {
  IntQueue queue;
  int childStatus = 0;
  QueueConsumer consumer;

  // Boost sets a custom SIGCHLD handler, which fails the test if a child
  // process exits abnormally.  We don't want this.
  signal(SIGCHLD, SIG_DFL);

  // Log some info so users reading the test output aren't confused
  // by the child process' crash log messages.
  LOG(INFO) << "This test makes sure the child process crashes.  "
            << "Error log messagges and a backtrace are expected.";

  {
    // Start a separate thread consuming from the queue
    ScopedEventBaseThread t1;
    t1.getEventBase()->runInEventBaseThread(
        [&] { consumer.startConsuming(t1.getEventBase(), &queue); });

    // Send a message to it, just for sanity checking
    queue.putMessage(1234);

    // Fork
    pid_t pid = fork();
    if (pid == 0) {
      // The boost test framework installs signal handlers to catch errors.
      // We only want to catch in the parent.  In the child let SIGABRT crash
      // us normally.
      signal(SIGABRT, SIG_DFL);

      // Child.
      // We're horrible people, so we try to send a message to the queue
      // that is being consumed in the parent process.
      //
      // The putMessage() call should catch this error, and crash our process.
      queue.putMessage(9876);
      // We shouldn't reach here.
      _exit(0);
    }
    PCHECK(pid > 0);

    // Parent.  Wait for the child to exit.
    auto waited = waitpid(pid, &childStatus, 0);
    EXPECT_EQ(pid, waited);

    // Send another message to the queue before we terminate the thread.
    queue.putMessage(5678);
  }

  // The child process should have crashed when it tried to call putMessage()
  // on our NotificationQueue.
  EXPECT_TRUE(WIFSIGNALED(childStatus));
  EXPECT_EQ(SIGABRT, WTERMSIG(childStatus));

  // Make sure the parent saw the expected messages.
  // It should have gotten 1234 and 5678 from the parent process, but not
  // 9876 from the child.
  EXPECT_EQ(2, consumer.messages.size());
  EXPECT_EQ(1234, consumer.messages.front());
  consumer.messages.pop_front();
  EXPECT_EQ(5678, consumer.messages.front());
  consumer.messages.pop_front();
}
#endif

TEST(NotificationQueueConsumer, make) {
  int value = 0;
  EventBase evb;
  NotificationQueue<int> queue(32);

  auto consumer =
      decltype(queue)::Consumer::make([&](int&& msg) noexcept { value = msg; });

  consumer->startConsuming(&evb, &queue);

  int const newValue = 10;
  queue.tryPutMessage(newValue);

  evb.loopOnce();

  EXPECT_EQ(newValue, value);
}