/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <atomic>
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>
#include <glog/logging.h>
#include <folly/Benchmark.h>
#include <folly/Random.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Rcu.h>
#include <folly/synchronization/RelaxedAtomic.h>
// Lets the tests below call the folly RCU API (rcu_retire, rcu_synchronize,
// rcu_default_domain, ...) without namespace qualification.
using namespace folly;
using rcu_domain = folly::rcu_domain;
// --iters: iteration budget. Stress runs FLAGS_iters / 100 read passes per
// reader thread; ThreadLocalList pushes FLAGS_iters nodes per worker thread.
DEFINE_int64(iters, 100000, "Number of iterations");
// --threads: number of threads spawned by Stress (readers and updaters each),
// Synchronize and ThreadLocalList.
DEFINE_uint64(threads, 32, "Number of threads");
// Smoke test: hands a heap-allocated int to rcu_retire() and returns without
// explicitly waiting for it to be reclaimed.
TEST(RcuTest, Basic) {
  int* const value = new int(2);
  rcu_retire(value);
}
// Test helper that records its own destruction: the destructor writes `true`
// through the bool pointer supplied at construction.
class des {
  bool* d_;  // Flag set by the destructor; not owned by this object.

 public:
  // explicit: a bare bool* must never convert silently into a des.
  explicit des(bool* d) : d_(d) {}
  ~des() { *d_ = true; }
};
// Opens and closes an empty read-side section, retires an object, and expects
// the object's destructor to have run once rcu_synchronize() returns.
TEST(RcuTest, Guard) {
  bool destroyed = false;
  des* const obj = new des(&destroyed);
  {
    // Empty read-side critical section on the default domain.
    std::scoped_lock<rcu_domain> reader(rcu_default_domain());
  }
  rcu_retire(obj);
  rcu_synchronize();
  EXPECT_TRUE(destroyed);
}
// A reader holds the read lock while a second thread calls rcu_synchronize();
// the reader then drops the lock and the synchronizing thread is joined.
TEST(RcuTest, SlowReader) {
  std::thread t;
  {
    std::scoped_lock<rcu_domain> lock(rcu_default_domain());
    t = std::thread([&]() { rcu_synchronize(); });
    // Wait for synchronize to start. Uses the standard-library sleep (like the
    // Tsan test) instead of POSIX usleep(), which requires <unistd.h>.
    std::this_thread::sleep_for(std::chrono::microseconds(100));
  }
  t.join();
}
// Retires `obj` on the default domain while holding a read lock, then hands
// the still-owned lock back to the caller.
static std::unique_lock<rcu_domain> try_retire(des* obj) {
  std::unique_lock<rcu_domain> reader{rcu_default_domain()};
  rcu_retire(obj);
  return reader;
}
// Retires an object through try_retire() and checks it is not destroyed while
// the returned lock is held, but is destroyed after rcu_barrier().
TEST(RcuTest, CopyGuard) {
  bool destroyed = false;
  des* const obj = new des(&destroyed);
  {
    // `held` keeps the read lock returned by try_retire() for this scope.
    auto held = try_retire(obj);
    EXPECT_FALSE(destroyed);
  }
  rcu_barrier();
  EXPECT_TRUE(destroyed);
}
// Disposes of an int that an updater has just swapped out, choosing one of two
// paths at random: deferred deletion through rcu_retire(), or rcu_synchronize()
// followed by an immediate delete. Either way the value is overwritten with a
// random number right before it is freed.
static void delete_or_retire_oldint(int* oldint) {
  const bool synchronous = folly::Random::rand32() % 2 != 0;
  if (synchronous) {
    rcu_synchronize();
    *oldint = folly::Random::rand32();
    delete oldint;
    return;
  }
  rcu_retire(oldint, [](int* victim) {
    *victim = folly::Random::rand32();
    delete victim;
  });
}
// Reader threads repeatedly snapshot and sum `sz` shared ints under a read
// lock while updater threads swap in fresh zero-valued ints and dispose of the
// old ones through delete_or_retire_oldint(). Every sum is expected to be 0.
TEST(RcuTest, Stress) {
  std::vector<std::thread> readers;
  constexpr uint32_t sz = 1000;
  std::atomic<int*> ints[sz];
  for (auto& slot : ints) {
    slot.store(new int(0), std::memory_order_release);
  }

  // One reader: FLAGS_iters / 100 passes, each inside its own read section.
  const auto read_loop = [&]() {
    for (int pass = 0; pass < FLAGS_iters / 100; pass++) {
      std::scoped_lock<rcu_domain> lock(rcu_default_domain());
      int total = 0;
      int* snapshot[sz];
      for (uint32_t k = 0; k < sz; k++) {
        snapshot[k] = ints[k].load(std::memory_order_acquire);
      }
      for (int* entry : snapshot) {
        total += *entry;
      }
      EXPECT_EQ(total, 0);
    }
  };
  for (unsigned th = 0; th < FLAGS_threads; th++) {
    readers.push_back(std::thread(read_loop));
  }

  folly::relaxed_atomic<bool> done{false};
  std::vector<std::thread> updaters;
  // One updater: replace a random slot until told to stop.
  const auto update_loop = [&]() {
    while (!done) {
      int* const fresh = new int(0);
      const auto slot = folly::Random::rand32() % sz;
      int* const stale = ints[slot].exchange(fresh, std::memory_order_acq_rel);
      delete_or_retire_oldint(stale);
    }
  };
  for (unsigned th = 0; th < FLAGS_threads; th++) {
    updaters.push_back(std::thread(update_loop));
  }

  // Updaters run for as long as the readers do.
  for (auto& reader : readers) {
    reader.join();
  }
  done = true;
  for (auto& updater : updaters) {
    updater.join();
  }

  // Cleanup for asan
  rcu_synchronize();
  for (auto& slot : ints) {
    delete slot.exchange(nullptr, std::memory_order_acq_rel);
  }
}
// Runs rcu_synchronize() ten times on each of FLAGS_threads concurrent
// threads.
TEST(RcuTest, Synchronize) {
  std::vector<std::thread> threads;
  const auto worker = []() {
    for (int round = 0; round < 10; round++) {
      rcu_synchronize();
    }
  };
  for (unsigned th = 0; th < FLAGS_threads; th++) {
    threads.push_back(std::thread(worker));
  }
  for (auto& worker_thread : threads) {
    worker_thread.join();
  }
}
// Constructs a stand-alone rcu_domain (with a null constructor argument) and
// synchronizes on it instead of on the default domain.
TEST(RcuTest, NewDomainTest) {
  rcu_domain domain(nullptr);
  rcu_synchronize(domain);
}
// Same flow as the Guard test, but against a locally constructed domain
// rather than the default one: lock/unlock, retire, synchronize, then expect
// the retired object's destructor to have run.
TEST(RcuTest, NewDomainGuardTest) {
  rcu_domain newdomain(nullptr);
  bool del = false;
  auto foo = new des(&del);
  // Empty read-side critical section on the local domain.
  { std::scoped_lock<rcu_domain> g(newdomain); }
  // `{}` passes a default-constructed second argument so the domain can be
  // given explicitly as the third.
  rcu_retire(foo, {}, newdomain);
  rcu_synchronize(newdomain);
  EXPECT_TRUE(del);
}
// Exercises moving a held read lock between std::unique_lock objects, by move
// construction and by move assignment, synchronizing after each case.
TEST(RcuTest, MovableReader) {
  {
    // Move construction from a lock that is currently held.
    std::unique_lock<rcu_domain> source(rcu_default_domain());
    std::unique_lock<rcu_domain> target(std::move(source));
  }
  rcu_synchronize();
  {
    // Move assignment into a guard created with std::defer_lock.
    std::unique_lock<rcu_domain> target(rcu_default_domain(), std::defer_lock);
    std::unique_lock<rcu_domain> source(rcu_default_domain());
    target = std::move(source);
  }
  rcu_synchronize();
}
// Passes the default domain's call() a callback that itself invokes
// rcu_synchronize(), then synchronizes from the test thread.
TEST(RcuTest, SynchronizeInCall) {
  auto& domain = rcu_default_domain();
  domain.call([]() { rcu_synchronize(); });
  rcu_synchronize();
}
// Forks after the default domain has been used once and calls
// rcu_synchronize() in both the parent and the child; the parent expects the
// child to be reaped with a wait status of 0.
TEST(RcuTest, SafeForkTest) {
  // Take and release the read lock once before forking.
  rcu_default_domain().lock();
  rcu_default_domain().unlock();
  const auto pid = fork();
  if (pid < 0) {
    // Skip the test if fork() fails.
    GTEST_SKIP();
  } else if (pid == 0) {
    // Child branch.
    rcu_synchronize();
    // Exit quickly to avoid spamming gtest output to console.
    exit(0);
  } else {
    // Parent branch -- wait for child to exit.
    rcu_synchronize();
    int status = -1;
    const auto reaped = waitpid(pid, &status, 0);
    EXPECT_EQ(pid, reaped);
    EXPECT_EQ(status, 0);
  }
}
// FLAGS_threads workers each push FLAGS_iters freshly allocated nodes onto a
// ThreadCachedLists while the test thread keeps collecting and deleting them
// until every worker has decremented `done`.
TEST(RcuTest, ThreadLocalList) {
  using Lists = folly::detail::ThreadCachedLists;
  Lists lists;
  std::vector<std::thread> threads{FLAGS_threads};
  folly::relaxed_atomic<unsigned long> done{FLAGS_threads};

  // Collects whatever has been pushed so far and frees each node.
  const auto drain = [&lists]() {
    Lists::ListHead collected{};
    lists.collect(collected);
    collected.forEach([](Lists::Node* node) { delete node; });
  };

  for (auto& worker : threads) {
    worker = std::thread([&]() {
      for (int i = 0; i < FLAGS_iters; i++) {
        lists.push(new folly::detail::ThreadCachedListsBase::Node);
      }
      --done;
    });
  }
  while (done > 0) {
    drain();
  }
  for (auto& worker : threads) {
    worker.join();
  }
  // Run cleanup pass one more time to make ASAN happy
  drain();
}
// Retires an object from a thread that exits right afterwards, then expects
// the object to have been destroyed once the test thread's rcu_synchronize()
// returns.
TEST(RcuTest, ThreadDeath) {
  bool destroyed = false;
  std::thread retirer([&destroyed] {
    des* const obj = new des(&destroyed);
    rcu_retire(obj);
  });
  retirer.join();
  rcu_synchronize();
  EXPECT_TRUE(destroyed);
}
// Retires an object through the retire() member it gets from rcu_obj_base and
// expects its destructor to have run after rcu_synchronize().
TEST(RcuTest, RcuObjBase) {
  // Sets *ret_ to true when destroyed.
  struct Tracked : rcu_obj_base<Tracked> {
    bool* ret_;
    Tracked(bool* ret) : ret_(ret) {}
    ~Tracked() { (*ret_) = true; }
  };
  bool retired = false;
  Tracked* const obj = new Tracked(&retired);
  obj->retire();
  rcu_synchronize();
  EXPECT_TRUE(retired);
}
// Two threads write `data` without any other synchronization: the first inside
// a read-side section, the second after synchronize(). The sleeps order the
// second write after the first, and the final value is expected to be 2.
TEST(RcuTest, Tsan) {
  using std::chrono::milliseconds;
  int data = 0;
  std::thread reader([&] {
    rcu_default_domain().lock();
    data = 1;
    rcu_default_domain().unlock();
    // Delay before exiting so the thread is still alive for TSAN detection.
    std::this_thread::sleep_for(milliseconds(200));
  });
  std::thread writer([&] {
    std::this_thread::sleep_for(milliseconds(100));
    // This should establish a happens-before relationship between the earlier
    // write (data = 1) and this write below (data = 2).
    rcu_default_domain().synchronize();
    data = 2;
  });
  reader.join();
  writer.join();
  EXPECT_EQ(data, 2);
}
// 32 reader threads each stack up 8192 simultaneously held read locks on the
// default domain, dereferencing the shared int after every acquisition, while
// one updater thread keeps replacing that int and disposing of the old one.
TEST(RcuTest, DeeplyNestedReaders) {
  constexpr unsigned kReaderThreads = 32;
  constexpr unsigned kNestingDepth = 8192;

  std::vector<std::thread> readers;
  std::atomic<int*> shared{nullptr};
  shared.store(new int(0), std::memory_order_release);

  for (unsigned th = 0; th < kReaderThreads; th++) {
    readers.push_back(std::thread([&]() {
      // Every lock stays held until this vector is destroyed at thread exit.
      std::vector<std::unique_lock<rcu_domain>> held;
      for (unsigned depth = 0; depth < kNestingDepth; depth++) {
        std::unique_lock<rcu_domain> guard(rcu_default_domain());
        held.push_back(std::move(guard));
        EXPECT_EQ(*(shared.load(std::memory_order_acquire)), 0);
      }
    }));
  }

  folly::relaxed_atomic<bool> done{false};
  std::thread updater([&]() {
    while (!done) {
      int* const fresh = new int(0);
      int* const stale = shared.exchange(fresh, std::memory_order_acq_rel);
      delete_or_retire_oldint(stale);
    }
  });

  for (auto& reader : readers) {
    reader.join();
  }
  done = true;
  updater.join();

  // Clean up to avoid ASAN complaining about a leak.
  rcu_synchronize();
  delete shared.exchange(nullptr, std::memory_order_acq_rel);
}