/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <functional>
#include <utility>
#include <vector>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventBaseAtomicNotificationQueue.h>
#include <folly/portability/GTest.h>
using namespace folly;
using namespace std;
template <typename Task>
struct AtomicNotificationQueueConsumer {
explicit AtomicNotificationQueueConsumer(vector<Task>& tasks)
: tasks(tasks) {}
void operator()(Task&& value) noexcept {
tasks.push_back(value);
if (fn) {
fn(std::move(value));
}
}
function<void(Task&&)> fn;
vector<Task>& tasks;
};
TEST(AtomicNotificationQueueTest, TryPutMessage) {
vector<int> data;
AtomicNotificationQueueConsumer<int> consumer{data};
EventBaseAtomicNotificationQueue<int, decltype(consumer)> queue{
std::move(consumer)};
constexpr uint32_t kMaxSize = 10;
for (auto i = 1; i <= 9; ++i) {
queue.putMessage(std::move(i));
}
EXPECT_TRUE(queue.tryPutMessage(10, kMaxSize));
EXPECT_EQ(queue.size(), 10);
EXPECT_FALSE(queue.tryPutMessage(11, kMaxSize));
EXPECT_EQ(queue.size(), 10);
queue.putMessage(11);
EXPECT_EQ(queue.size(), 11);
EXPECT_FALSE(queue.tryPutMessage(12, kMaxSize));
queue.drain();
EXPECT_TRUE(queue.tryPutMessage(0, kMaxSize));
EXPECT_EQ(queue.size(), 1);
}
TEST(AtomicNotificationQueueTest, DiscardDequeuedTasks) {
struct TaskWithExpiry {
int datum;
bool isExpired;
};
struct Consumer {
explicit Consumer(std::vector<int>& data) : data(data) {}
AtomicNotificationQueueTaskStatus operator()(
TaskWithExpiry&& task) noexcept {
if (task.isExpired) {
return AtomicNotificationQueueTaskStatus::DISCARD;
}
data.push_back(task.datum);
return AtomicNotificationQueueTaskStatus::CONSUMED;
}
vector<int>& data;
};
vector<int> data;
Consumer consumer{data};
EventBaseAtomicNotificationQueue<TaskWithExpiry, Consumer> queue{
std::move(consumer)};
queue.setMaxReadAtOnce(10);
vector<TaskWithExpiry> tasks = {
{0, false},
{1, true},
{2, true},
{3, false},
{4, false},
{5, false},
{6, false},
{7, true},
{8, false},
{9, false},
{10, false},
{11, false},
{12, true},
{13, false},
{14, true},
{15, false},
};
EventBase eventBase;
queue.startConsuming(&eventBase);
for (auto& t : tasks) {
queue.putMessage(t);
}
eventBase.loopOnce();
vector<int> expectedMessages = {0, 3, 4, 5, 6, 8, 9, 10, 11, 13};
EXPECT_EQ(data.size(), expectedMessages.size());
for (unsigned i = 0; i < expectedMessages.size(); ++i) {
EXPECT_EQ(data.at(i), expectedMessages[i]);
}
data.clear();
eventBase.loopOnce();
EXPECT_EQ(data.size(), 1);
EXPECT_EQ(data.at(0), 15);
}
TEST(AtomicNotificationQueueTest, PutMessage) {
struct Data {
int datum;
bool isExpired;
explicit Data(int datum, bool isExpired)
: datum(datum), isExpired(isExpired) {}
bool operator==(const Data& data) const {
return datum == data.datum && isExpired == data.isExpired;
}
};
struct Consumer {
explicit Consumer(vector<Data>& data) : data(data) {}
void operator()(Data&& task) noexcept { data.push_back(task); }
vector<Data>& data;
};
vector<Data> expected =
{Data(10, false),
Data(20, true),
Data(-8, true),
Data(0, false)},
actual;
Consumer consumer{actual};
EventBaseAtomicNotificationQueue<Data, decltype(consumer)> queue{
std::move(consumer)};
queue.setMaxReadAtOnce(0);
EventBase eventBase;
queue.startConsuming(&eventBase);
for (auto& t : expected) {
queue.putMessage(t.datum, t.isExpired);
}
eventBase.loopOnce();
EXPECT_EQ(expected.size(), actual.size());
for (unsigned i = 0; i < expected.size(); ++i) {
EXPECT_EQ(expected[i], actual[i]);
}
}
TEST(AtomicNotificationQueueTest, ConsumeStop) {
struct Consumer {
size_t* consumed;
auto operator()(bool&& stop) noexcept {
++*consumed;
return stop ? AtomicNotificationQueueTaskStatus::CONSUMED_STOP
: AtomicNotificationQueueTaskStatus::CONSUMED;
}
};
size_t consumed = 0;
Consumer consumer{&consumed};
EventBaseAtomicNotificationQueue<bool, decltype(consumer)> queue{
std::move(consumer)};
queue.setMaxReadAtOnce(3);
EventBase eventBase;
queue.startConsuming(&eventBase);
for (auto t : {false, true, false, false, false, false}) {
queue.putMessage(t);
}
ASSERT_EQ(consumed, 0);
eventBase.loopOnce(EVLOOP_NONBLOCK);
// Second message stopped consuming.
ASSERT_EQ(std::exchange(consumed, 0), 2);
eventBase.loopOnce(EVLOOP_NONBLOCK);
// setMaxReadAtOnce() still honored.
ASSERT_EQ(std::exchange(consumed, 0), 3);
eventBase.loopOnce(EVLOOP_NONBLOCK);
ASSERT_EQ(std::exchange(consumed, 0), 1);
for (auto t : {true, true, true}) {
queue.putMessage(t);
}
eventBase.loopOnce(EVLOOP_NONBLOCK);
ASSERT_EQ(std::exchange(consumed, 0), 1);
// Local queue is still non-empty, add a message to the atomic queue.
queue.putMessage(true);
// All messages, including the one just added, should be consumed.
for (size_t i = 0; i < 3; ++i) {
eventBase.loopOnce(EVLOOP_NONBLOCK);
ASSERT_EQ(std::exchange(consumed, 0), 1);
}
}
// drive() should not process messages that were put during its execution.
TEST(AtomicNotificationQueueTest, DriveSnapshot) {
struct Consumer {
size_t* consumed;
auto operator()(folly::Func f) noexcept {
f();
++*consumed;
return AtomicNotificationQueueTaskStatus::CONSUMED;
}
};
size_t consumed = 0;
Consumer consumer{&consumed};
EventBaseAtomicNotificationQueue<folly::Func, decltype(consumer)> queue{
std::move(consumer)};
EventBase eventBase;
queue.startConsuming(&eventBase);
queue.setMaxReadAtOnce(0);
queue.putMessage([&] { queue.putMessage([] {}); });
ASSERT_EQ(consumed, 0);
// Atomic queue was empty when the second putMessage happened(), so the second
// message is not processed.
eventBase.loopOnce(EVLOOP_NONBLOCK);
ASSERT_EQ(std::exchange(consumed, 0), 1);
eventBase.loopOnce(EVLOOP_NONBLOCK);
ASSERT_EQ(std::exchange(consumed, 0), 1);
bool lastRan = false;
queue.putMessage([&] {});
queue.putMessage([&] { queue.putMessage([&] { lastRan = true; }); });
// Leave the second message in the local queue.
queue.setMaxReadAtOnce(1);
eventBase.loopOnce(EVLOOP_NONBLOCK);
ASSERT_EQ(std::exchange(consumed, 0), 1);
// This will be in the atomic queue.
queue.putMessage([] {});
queue.setMaxReadAtOnce(0);
// We'll consume the local queue and part of the atomic queue, but stop at the
// message that was put in the callback.
eventBase.loopOnce(EVLOOP_NONBLOCK);
ASSERT_EQ(std::exchange(consumed, 0), 2);
EXPECT_FALSE(lastRan);
eventBase.loopOnce(EVLOOP_NONBLOCK);
ASSERT_EQ(std::exchange(consumed, 0), 1);
EXPECT_TRUE(lastRan);
}