/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <atomic>
#include <chrono>
#include <stdexcept>
#include <thread>
#include <utility>
#include <folly/Singleton.h>
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerMap.h>
#include <folly/observer/CoreCachedObserver.h>
#include <folly/observer/HazptrObserver.h>
#include <folly/observer/Observer.h>
#include <folly/observer/ReadMostlyTLObserver.h>
#include <folly/observer/SimpleObservable.h>
#include <folly/observer/WithJitter.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>
using namespace std::literals;
using namespace folly::observer;
namespace {
template <typename T>
struct AltAtomic : std::atomic<T> {};
} // namespace
TEST(Observer, Observable) {
SimpleObservable<int> observable(42);
auto observer = observable.getObserver();
EXPECT_EQ(42, **observer);
folly::Baton<> baton;
auto waitingObserver = makeObserver([observer, &baton]() {
*observer;
baton.post();
return folly::Unit();
});
baton.reset();
observable.setValue(24);
EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
EXPECT_EQ(24, **observer);
}
TEST(Observer, MakeObserver) {
SimpleObservable<int> observable(42);
auto observer = makeObserver(
[child = observable.getObserver()]() { return **child + 1; });
EXPECT_EQ(43, **observer);
folly::Baton<> baton;
auto waitingObserver = makeObserver([observer, &baton]() {
*observer;
baton.post();
return folly::Unit();
});
baton.reset();
observable.setValue(24);
EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
EXPECT_EQ(25, **observer);
}
TEST(Observer, MakeObserverDiamond) {
SimpleObservable<int> observable(42);
auto observer1 = makeObserver(
[child = observable.getObserver()]() { return **child + 1; });
auto observer2 = makeObserver([child = observable.getObserver()]() {
return std::make_shared<int>(**child + 2);
});
auto observer = makeObserver(
[observer1, observer2]() { return (**observer1) * (**observer2); });
EXPECT_EQ(43 * 44, *observer.getSnapshot());
folly::Baton<> baton;
auto waitingObserver = makeObserver([observer, &baton]() {
*observer;
baton.post();
return folly::Unit();
});
baton.reset();
observable.setValue(24);
EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
EXPECT_EQ(25 * 26, **observer);
}
TEST(Observer, CreateException) {
struct ExpectedException {};
EXPECT_THROW(
auto observer = makeObserver(
[]() -> std::shared_ptr<int> { throw ExpectedException(); }),
ExpectedException);
EXPECT_THROW(
auto observer =
makeObserver([]() -> std::shared_ptr<int> { return nullptr; }),
std::logic_error);
}
TEST(Observer, NullValue) {
SimpleObservable<int> observable(41);
auto oddObserver = makeObserver([innerObserver = observable.getObserver()]() {
auto value = **innerObserver;
if (value % 2 != 0) {
return value * 2;
}
throw std::logic_error("I prefer odd numbers");
});
folly::Baton<> baton;
auto waitingObserver = makeObserver([oddObserver, &baton]() {
*oddObserver;
baton.post();
return folly::Unit();
});
baton.reset();
EXPECT_EQ(82, **oddObserver);
observable.setValue(2);
// Waiting observer shouldn't be updated
EXPECT_FALSE(baton.try_wait_for(std::chrono::seconds{1}));
baton.reset();
EXPECT_EQ(82, **oddObserver);
observable.setValue(23);
EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
EXPECT_EQ(46, **oddObserver);
}
TEST(Observer, Cycle) {
if (!folly::kIsDebug) {
// Cycle detection is only available in debug builds
return;
}
EXPECT_DEATH(
[] {
SimpleObservable<bool> observable(false);
folly::Optional<Observer<int>> observerB;
auto observerA =
makeObserver([observer = observable.getObserver(), &observerB]() {
if (**observer) {
return ***observerB;
}
return 42;
});
observerB = makeObserver([observerA]() { return **observerA; });
EXPECT_EQ(42, **observerA);
EXPECT_EQ(42, ***observerB);
observable.setValue(true);
folly::observer_detail::ObserverManager::waitForAllUpdates();
}(),
"Observer cycle detected.");
}
TEST(Observer, Stress) {
SimpleObservable<int> observable(0);
auto values = std::make_shared<folly::Synchronized<std::vector<int>>>();
auto observer = makeObserver([child = observable.getObserver(), values]() {
auto value = **child * 10;
values->withWLock([&](std::vector<int>& vals) { vals.push_back(value); });
return value;
});
EXPECT_EQ(0, **observer);
values->withRLock([](const std::vector<int>& vals) {
EXPECT_EQ(1, vals.size());
EXPECT_EQ(0, vals.back());
});
constexpr size_t numIters = 10000;
for (size_t i = 1; i <= numIters; ++i) {
observable.setValue(i);
}
while (**observer != numIters * 10) {
std::this_thread::yield();
}
values->withRLock([numIters = numIters](const std::vector<int>& vals) {
EXPECT_EQ(numIters * 10, vals.back());
EXPECT_LT(vals.size(), numIters / 2);
EXPECT_EQ(0, vals[0]);
EXPECT_EQ(numIters * 10, vals.back());
for (auto value : vals) {
EXPECT_EQ(0, value % 10);
}
for (size_t i = 0; i < vals.size() - 1; ++i) {
EXPECT_LE(vals[i], vals[i + 1]);
}
});
}
TEST(Observer, StressMultipleUpdates) {
SimpleObservable<int> observable1(0);
SimpleObservable<int> observable2(0);
auto observer = makeObserver(
[o1 = observable1.getObserver(), o2 = observable2.getObserver()]() {
return (**o1) * (**o2);
});
EXPECT_EQ(0, **observer);
constexpr size_t numIters = 10000;
for (size_t i = 1; i <= numIters; ++i) {
observable1.setValue(i);
observable2.setValue(i);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(i * i, **observer);
}
}
TEST(Observer, TLObserver) {
auto createTLObserver = [](int value) {
return folly::observer::makeTLObserver([=] { return value; });
};
auto k =
std::make_unique<folly::observer::TLObserver<int>>(createTLObserver(42));
EXPECT_EQ(42, ***k);
k = std::make_unique<folly::observer::TLObserver<int>>(createTLObserver(41));
EXPECT_EQ(41, ***k);
k = std::make_unique<folly::observer::TLObserver<int>>( // copy-ctor
static_cast<folly::observer::TLObserver<int> const&>(
createTLObserver(40)));
EXPECT_EQ(40, ***k);
}
TEST(ReadMostlyTLObserver, ReadMostlyTLObserver) {
auto createReadMostlyTLObserver = [](int value) {
return folly::observer::makeReadMostlyTLObserver([=] { return value; });
};
auto k = std::make_unique<folly::observer::ReadMostlyTLObserver<int>>(
createReadMostlyTLObserver(42));
EXPECT_EQ(42, *k->getShared());
k = std::make_unique<folly::observer::ReadMostlyTLObserver<int>>(
createReadMostlyTLObserver(41));
EXPECT_EQ(41, *k->getShared());
}
TEST(ReadMostlyTLObserver, Update) {
SimpleObservable<int> observable(42);
auto observer = observable.getObserver();
ReadMostlyTLObserver readMostlyObserver(observer);
EXPECT_EQ(*readMostlyObserver.getShared(), 42);
observable.setValue(24);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(*readMostlyObserver.getShared(), 24);
}
TEST(Observer, SubscribeCallback) {
static auto mainThreadId = std::this_thread::get_id();
static std::function<void()> updatesCob;
static bool slowGet = false;
static std::atomic<size_t> getCallsStart{0};
static std::atomic<size_t> getCallsFinish{0};
struct Observable {
~Observable() { EXPECT_EQ(mainThreadId, std::this_thread::get_id()); }
};
struct Traits {
using element_type = int;
static std::shared_ptr<const int> get(Observable&) {
++getCallsStart;
if (slowGet) {
/* sleep override */ std::this_thread::sleep_for(
std::chrono::seconds{2});
}
++getCallsFinish;
return std::make_shared<const int>(42);
}
static void subscribe(Observable&, std::function<void()> cob) {
updatesCob = std::move(cob);
}
static void unsubscribe(Observable&) {}
};
std::thread cobThread;
{
auto observer =
folly::observer::ObserverCreator<Observable, Traits>().getObserver();
EXPECT_TRUE(updatesCob);
EXPECT_GE(2, getCallsStart);
EXPECT_GE(2, getCallsFinish);
updatesCob();
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(3, getCallsStart);
EXPECT_EQ(3, getCallsFinish);
slowGet = true;
cobThread = std::thread([] { updatesCob(); });
/* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{1});
EXPECT_EQ(4, getCallsStart);
EXPECT_EQ(3, getCallsFinish);
// Observer is destroyed here
}
// Make sure that destroying the observer actually joined the updates callback
EXPECT_EQ(4, getCallsStart);
EXPECT_EQ(4, getCallsFinish);
cobThread.join();
}
TEST(Observer, SetCallback) {
folly::observer::SimpleObservable<int> observable(42);
auto observer = observable.getObserver();
folly::Baton<> baton;
int callbackValue = 0;
size_t callbackCallsCount = 0;
auto callbackHandle =
observer.addCallback([&](folly::observer::Snapshot<int> snapshot) {
++callbackCallsCount;
callbackValue = *snapshot;
baton.post();
});
baton.wait();
baton.reset();
EXPECT_EQ(42, callbackValue);
EXPECT_EQ(1, callbackCallsCount);
observable.setValue(43);
baton.wait();
baton.reset();
EXPECT_EQ(43, callbackValue);
EXPECT_EQ(2, callbackCallsCount);
callbackHandle.cancel();
observable.setValue(44);
EXPECT_FALSE(baton.timed_wait(std::chrono::milliseconds{100}));
EXPECT_EQ(43, callbackValue);
EXPECT_EQ(2, callbackCallsCount);
}
TEST(Observer, CallbackCalledOncePerSnapshot) {
SimpleObservable<folly::Unit> observable(folly::unit);
auto observer = observable.getObserver();
int value = 1;
SimpleObservable<int> intObservable(value);
auto squareObserver =
makeObserver([o = intObservable.getObserver()] { return **o * **o; });
folly::Baton baton;
size_t callbackCallsCount = 0;
auto callbackHandle = observer.addCallback([&](auto) {
// The main point of this test is that the callback depends on
// `squareObserver`. A refresh of `squareObserver` should not trigger the
// callback, since the callback is associated only with `observer`.
//
// Note that we do not guarantee that **squareObserver necessarily reflects
// the latest update.
EXPECT_GE(value * value, **squareObserver);
++callbackCallsCount;
baton.post();
});
baton.wait();
baton.reset();
EXPECT_EQ(1, callbackCallsCount);
// Check that any second updates to squareObserver don't trigger the callback
// again
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(1, callbackCallsCount);
value = 2;
intObservable.setValue(value);
observable.setValue(folly::Unit{});
baton.wait();
baton.reset();
EXPECT_EQ(2, callbackCallsCount);
value = 3;
// Updating intObservable should not trigger the callback
intObservable.setValue(value);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(9, **squareObserver);
EXPECT_EQ(2, callbackCallsCount);
}
TEST(Observer, CallbackMemoryLeak) {
folly::observer::SimpleObservable<int> observable(42);
auto observer = observable.getObserver();
auto callbackHandle = observer.addCallback([](auto) {});
// should not leak
callbackHandle = observer.addCallback([](auto) {});
}
int makeObserverRecursion(int n) {
if (n == 0) {
return 0;
}
return **makeObserver([=] { return makeObserverRecursion(n - 1) + 1; });
}
TEST(Observer, NestedMakeObserver) {
EXPECT_EQ(32, makeObserverRecursion(32));
}
TEST(Observer, WaitForAllUpdates) {
folly::observer::SimpleObservable<int> observable{42};
auto observer = makeObserver([o = observable.getObserver()] {
std::this_thread::sleep_for(std::chrono::milliseconds{100});
return **o;
});
EXPECT_EQ(42, **observer);
observable.setValue(43);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(43, **observer);
folly::observer_detail::ObserverManager::waitForAllUpdates();
}
TEST(Observer, IgnoreUpdates) {
int callbackCalled = 0;
folly::observer::SimpleObservable<int> observable(42);
auto observer =
folly::observer::makeObserver([even = std::make_shared<bool>(true),
odd = std::make_shared<bool>(false),
observer = observable.getObserver()] {
if (**observer % 2 == 0) {
return even;
}
return odd;
});
auto callbackHandle = observer.addCallback([&](auto) { ++callbackCalled; });
EXPECT_EQ(1, callbackCalled);
observable.setValue(43);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(2, callbackCalled);
observable.setValue(45);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(2, callbackCalled);
observable.setValue(46);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(3, callbackCalled);
}
TEST(Observer, GetSnapshotOnManagerThread) {
auto observer42 = folly::observer::makeObserver([] { return 42; });
folly::observer::SimpleObservable<int> observable(1);
folly::Baton<> startBaton;
folly::Baton<> finishBaton;
folly::Baton<> destructorBaton;
{
finishBaton.post();
auto slowObserver = folly::observer::makeObserver(
[guard = folly::makeGuard([observer42, &destructorBaton]() {
// We expect this to be called on a ObserverManager thread, but
// outside of processing an observer updates.
observer42.getSnapshot();
destructorBaton.post();
}),
observer = observable.getObserver(),
&startBaton,
&finishBaton] {
startBaton.post();
finishBaton.wait();
finishBaton.reset();
return **observer;
});
EXPECT_EQ(1, **slowObserver);
startBaton.reset();
finishBaton.post();
observable.setValue(2);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(2, **slowObserver);
startBaton.reset();
observable.setValue(3);
startBaton.wait();
}
finishBaton.post();
destructorBaton.wait();
}
TEST(Observer, Shutdown) {
folly::SingletonVault::singleton()->destroyInstances();
auto observer = folly::observer::makeObserver([] { return 42; });
EXPECT_EQ(42, **observer);
}
TEST(Observer, MakeValueObserver) {
struct ValueStruct {
ValueStruct(int value, int id) : value_(value), id_(id) {}
bool operator==(const ValueStruct& other) const {
return value_ == other.value_;
}
const int value_;
const int id_;
};
SimpleObservable<ValueStruct> observable(ValueStruct(1, 1));
std::vector<int> observedIds;
std::vector<int> observedValues;
std::vector<int> observedValues2;
auto ch1 = observable.getObserver().addCallback(
[&](auto snapshot) { observedIds.push_back(snapshot->id_); });
auto ch2 = makeValueObserver(observable.getObserver())
.addCallback([&](auto snapshot) {
observedValues.push_back(snapshot->value_);
});
auto ch3 = makeValueObserver([observer = observable.getObserver()] {
return **observer;
}).addCallback([&](auto snapshot) {
observedValues2.push_back(snapshot->value_);
});
folly::observer_detail::ObserverManager::waitForAllUpdates();
observable.setValue(ValueStruct(1, 2));
folly::observer_detail::ObserverManager::waitForAllUpdates();
observable.setValue(ValueStruct(2, 3));
folly::observer_detail::ObserverManager::waitForAllUpdates();
observable.setValue(ValueStruct(2, 4));
folly::observer_detail::ObserverManager::waitForAllUpdates();
observable.setValue(ValueStruct(3, 5));
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(observedIds, std::vector<int>({1, 2, 3, 4, 5}));
EXPECT_EQ(observedValues, std::vector<int>({1, 2, 3}));
EXPECT_EQ(observedValues2, std::vector<int>({1, 2, 3}));
size_t creatorCalls = 0;
auto o = makeValueObserver([&] {
++creatorCalls;
return 42;
});
EXPECT_EQ(42, **o);
EXPECT_EQ(1, creatorCalls);
}
TEST(Observer, MakeStaticObserver) {
auto explicitStringObserver = makeStaticObserver<std::string>("hello");
EXPECT_EQ(**explicitStringObserver, "hello");
auto implicitIntObserver = makeStaticObserver(5);
EXPECT_EQ(**implicitIntObserver, 5);
auto explicitSharedPtrObserver =
makeStaticObserver<std::shared_ptr<int>>(std::make_shared<int>(5));
EXPECT_EQ(***explicitSharedPtrObserver, 5);
auto implicitSharedPtrObserver = makeStaticObserver(std::make_shared<int>(5));
EXPECT_EQ(**implicitSharedPtrObserver, 5);
}
TEST(Observer, AtomicObserver) {
SimpleObservable<int> observable{42};
SimpleObservable<int> observable2{12};
AtomicObserver<int> observer{observable.getObserver()};
AtomicObserver<int> observerCopy{observer};
EXPECT_EQ(*observer, 42);
EXPECT_EQ(*observerCopy, 42);
observable.setValue(24);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(*observer, 24);
EXPECT_EQ(*observerCopy, 24);
observer = observable2.getObserver();
EXPECT_EQ(*observer, 12);
EXPECT_EQ(*observerCopy, 24);
observable2.setValue(15);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(*observer, 15);
EXPECT_EQ(*observerCopy, 24);
observerCopy = observer;
EXPECT_EQ(*observerCopy, 15);
auto dependentObserver =
makeAtomicObserver([o = observer] { return *o + 1; });
EXPECT_EQ(*dependentObserver, 16);
observable2.setValue(20);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(*dependentObserver, 21);
}
TEST(Observer, ReadMostlyAtomicObserver) {
SimpleObservable<int> observable{42};
ReadMostlyAtomicObserver<int> observer{observable.getObserver()};
EXPECT_EQ(*observer, 42);
observable.setValue(24);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(*observer, 24);
auto dependentObserver = makeReadMostlyAtomicObserver(
[o = observer.getUnderlyingObserver()] { return **o + 1; });
EXPECT_EQ(*dependentObserver, 25);
observable.setValue(20);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(*dependentObserver, 21);
}
void runHazptrObserverTest(bool useLocalSnapshot) {
struct IntHolder {
explicit IntHolder(int val) : val_(val) {}
IntHolder(const IntHolder&) = delete;
IntHolder& operator=(const IntHolder&) = delete;
IntHolder(IntHolder&&) = default;
IntHolder& operator=(IntHolder&&) = delete;
int val_;
};
auto value = [=](const auto& observer) {
if (useLocalSnapshot) {
return observer->getSnapshot()->val_;
} else {
return observer->getLocalSnapshot()->val_;
}
};
SimpleObservable<IntHolder> observable{IntHolder{42}};
auto observer =
std::make_unique<HazptrObserver<IntHolder>>(observable.getObserver());
// Verify that copies get updated too.
auto observerCopy = std::make_unique<HazptrObserver<IntHolder>>(*observer);
EXPECT_EQ(value(observer), 42);
EXPECT_EQ(value(observerCopy), 42);
observable.setValue(IntHolder{24});
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(value(observer), 24);
EXPECT_EQ(value(observerCopy), 24);
auto dependentObserver = makeHazptrObserver([o = observable.getObserver()] {
return IntHolder{o.getSnapshot()->val_ + 1};
});
EXPECT_EQ(value(&dependentObserver), 25);
observable.setValue(IntHolder{20});
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(value(&dependentObserver), 21);
// And moves as well even if the originals disappear.
auto observerMove =
std::make_unique<HazptrObserver<IntHolder>>(std::move(*observer));
observer.reset();
observerCopy.reset();
observable.setValue(IntHolder{26});
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(value(observerMove), 26);
}
TEST(Observer, HazptrObserver) {
runHazptrObserverTest(/* useLocalSnapshot */ false);
}
TEST(Observer, HazptrObserverLocalSnapshot) {
runHazptrObserverTest(/* useLocalSnapshot */ true);
}
TEST(Observer, HazptrObserverExplicitDomain) {
SimpleObservable<int> observable{0};
folly::hazptr_domain<AltAtomic> domain;
HazptrObserver obs{observable.getObserver(), domain};
EXPECT_EQ(0, *obs.getSnapshot());
observable.setValue(1);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(1, *obs.getSnapshot());
}
TEST(Observer, HazptrObserverRecursiveReclamation) {
// in this scenario:
// * in scope is a domain instance with no executor
// * the hazptr-observer is itself owned by a hazptr-obj
// * tha hazptr-obj is retired with deferred reclamation enforced
// * the hazptr-obj and the hazptr-observer states are owned by the domain
// * many objects are retired, bringing the domain close to the threshold
// * one observable update cycle states within the callback
// * the state cycle will reach the threshold, forcing immediate reclamation
// * we expect forced reclamation within the callback to succeed
//
// regression test for bug:
// * forced reclamation within the callback would deadlock in the observer-
// manager thread
struct ObserverObj : folly::hazptr_obj_base<ObserverObj> {
HazptrObserver<int> inner;
std::atomic<bool>& reclaimed_;
ObserverObj(
Observer<int> observer,
folly::hazptr_domain<>& domain,
std::atomic<bool>& reclaimed)
: inner{observer, domain}, reclaimed_{reclaimed} {}
~ObserverObj() { reclaimed_ = true; }
};
struct EmptyObj : folly::hazptr_obj_base<EmptyObj> {};
SimpleObservable<int> observable{0};
std::atomic<bool> reclaimed{false}; // not a baton on purpose!
folly::hazptr_domain<> domain;
{
std::atomic<ObserverObj*> cell{};
// wire up the hazptr-observer to report its own reclamation to the test
cell.store(
new ObserverObj{observable.getObserver(), domain, reclaimed},
std::memory_order_release);
// retire the observer while it is protected: this way, retirement cannot be
// immediate because the observer is still protected, so retirement must be
// deferred; and since one retirement is deferred, continued retirements of
// the states objects will also be deferred, until the threshold is reached
auto hptr = folly::make_hazard_pointer(domain);
hptr.protect(cell);
cell.exchange(nullptr, std::memory_order_acquire)->retire(domain);
}
// enqueue maximally many retirements - but without forcing reclamation
auto thresh = folly::detail::hazptr_domain_rcount_threshold();
auto adjust = 2; // 1 for the retired HazptrObserver, 1 for the next setValue
for (int i = 0; i < thresh - adjust; ++i) {
(new EmptyObj())->retire(domain);
}
EXPECT_FALSE(reclaimed);
observable.setValue(1);
CHECK( // should take less than 1ms on an unloaded machine
folly::observer_detail::ObserverManager::tryWaitForAllUpdatesFor(1s));
EXPECT_TRUE(reclaimed); // verify that forced reclamation really did happen
}
TEST(Observer, CoreCachedObserver) {
SimpleObservable<int> observable(42);
auto observer = observable.getObserver();
auto ccObserver = std::make_unique<CoreCachedObserver<int>>(
makeObserver([observer] { return **observer; }));
EXPECT_EQ(***ccObserver, 42);
observable.setValue(41);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(***ccObserver, 41);
// Verify that copies get updated too.
auto ccObserverCopy = std::make_unique<CoreCachedObserver<int>>(*ccObserver);
observable.setValue(40);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(***ccObserver, 40);
EXPECT_EQ(***ccObserverCopy, 40);
// And moves as well even if the originals disappear.
auto ccObserverMove =
std::make_unique<CoreCachedObserver<int>>(std::move(*ccObserverCopy));
ccObserver.reset();
ccObserverCopy.reset();
observable.setValue(39);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(***ccObserverMove, 39);
}
TEST(Observer, Unwrap) {
SimpleObservable<bool> selectorObservable{true};
SimpleObservable<int> trueObservable{1};
SimpleObservable<int> falseObservable{2};
auto observer = makeObserver([selectorO = selectorObservable.getObserver(),
trueO = trueObservable.getObserver(),
falseO = falseObservable.getObserver()] {
if (**selectorO) {
return trueO;
}
return falseO;
});
EXPECT_EQ(**observer, 1);
selectorObservable.setValue(false);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(**observer, 2);
falseObservable.setValue(3);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(**observer, 3);
trueObservable.setValue(4);
selectorObservable.setValue(true);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(**observer, 4);
}
TEST(Observer, UnwrapSimpleObservable) {
SimpleObservable<int> a{1};
SimpleObservable<int> b{2};
SimpleObservable<Observer<int>> observable{a.getObserver()};
auto o = observable.getObserver();
EXPECT_EQ(1, **o);
a.setValue(3);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(3, **o);
observable.setValue(b.getObserver());
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(2, **o);
b.setValue(4);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(4, **o);
}
TEST(Observer, WithJitterMonotoneProgress) {
SimpleObservable<int> observable(0);
auto observer = observable.getObserver();
EXPECT_EQ(0, **observer);
auto laggingObserver = withJitter(
std::move(observer),
std::chrono::milliseconds{100},
std::chrono::milliseconds{100});
EXPECT_EQ(0, **laggingObserver);
// Updates should never propagate out of order. E.g., if update 1 arrives and
// is delayed by 100 milliseconds, followed immediately by the arrival of
// update 2 with 1 millisecond delay, then update 1 should never overwrite
// update 2.
for (int i = 1, lastSeen = 0; i <= 50; ++i) {
auto curr = **laggingObserver;
EXPECT_LE(lastSeen, curr);
lastSeen = curr;
observable.setValue(i);
/* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds{10});
}
/* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{2});
// The latest update is eventually propagated
EXPECT_EQ(50, **laggingObserver);
}
TEST(Observer, WithJitterActuallyInducesLag) {
SimpleObservable<int> observable(0);
auto observer = observable.getObserver();
EXPECT_EQ(0, **observer);
auto laggingObserver = withJitter(
observer, std::chrono::seconds{10}, std::chrono::milliseconds::zero());
EXPECT_EQ(0, **laggingObserver);
observable.setValue(42);
/* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{1});
EXPECT_EQ(0, **laggingObserver);
}
TEST(Observer, WithJitterNoEarlyRefresh) {
SimpleObservable<int> observable(0);
auto base = observable.getObserver();
auto copy = makeObserver([base] { return **base; });
auto laggingObserver = withJitter(
base, std::chrono::seconds{10}, std::chrono::milliseconds::zero());
auto delta = makeObserver(
[copy, laggingObserver] { return **copy - **laggingObserver; });
EXPECT_EQ(0, **base);
EXPECT_EQ(0, **copy);
EXPECT_EQ(0, **laggingObserver);
EXPECT_EQ(0, **delta);
observable.setValue(42);
/* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{1});
// Updates along the base -> copy -> delta path should not trigger an early
// refresh of laggingObserver
EXPECT_EQ(42, **base);
EXPECT_EQ(42, **copy);
EXPECT_EQ(0, **laggingObserver);
EXPECT_EQ(42, **delta);
}
TEST(SimpleObservable, DefaultConstructible) {
struct Data {
int i = 42;
};
static_assert(std::is_default_constructible<Data>::value);
static_assert(std::is_default_constructible<SimpleObservable<Data>>::value);
SimpleObservable<Data> observable;
EXPECT_EQ((**observable.getObserver()).i, 42);
}
TEST(Observer, MakeObserverUpdatesTracking) {
SimpleObservable<int> observable(0);
auto slowObserver = makeObserver([o = observable.getObserver()] {
std::this_thread::sleep_for(std::chrono::milliseconds{10});
return **o;
});
auto tlObserver = makeTLObserver(slowObserver);
auto rmtlObserver = makeReadMostlyTLObserver(slowObserver);
auto atomicObserver = makeAtomicObserver(slowObserver);
auto rmatomicObserver = makeReadMostlyAtomicObserver(slowObserver);
auto hazptrObserver = makeHazptrObserver(slowObserver);
EXPECT_EQ(0, **tlObserver);
EXPECT_EQ(0, *(rmtlObserver.getShared()));
EXPECT_EQ(0, *atomicObserver);
EXPECT_EQ(0, *rmatomicObserver);
EXPECT_EQ(0, *(hazptrObserver.getSnapshot()));
EXPECT_EQ(0, *(hazptrObserver.getLocalSnapshot()));
auto tlObserverCheck = makeObserver([&]() mutable { return **tlObserver; });
auto rmtlObserverCheck =
makeObserver([&]() mutable { return *(rmtlObserver.getShared()); });
auto atomicObserverCheck =
makeObserver([&]() mutable { return *atomicObserver; });
auto rmatomicObserverCheck =
makeObserver([&]() mutable { return *rmatomicObserver; });
auto hazptrObserverGetSnapshotCheck =
makeObserver([&]() mutable { return *(hazptrObserver.getSnapshot()); });
auto hazptrObserverGetLocalSnapshotCheck = makeObserver(
[&]() mutable { return *(hazptrObserver.getLocalSnapshot()); });
for (size_t i = 1; i <= 10; ++i) {
observable.setValue(i);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(i, **tlObserverCheck);
EXPECT_EQ(i, **rmtlObserverCheck);
EXPECT_EQ(i, **atomicObserverCheck);
EXPECT_EQ(i, **rmatomicObserverCheck);
EXPECT_EQ(i, **hazptrObserverGetSnapshotCheck);
EXPECT_EQ(i, **hazptrObserverGetLocalSnapshotCheck);
}
}
TEST(Observer, Fibers) {
folly::EventBase evb;
auto& fm = folly::fibers::getFiberManager(evb);
auto f1 = fm.addTaskFuture([] {
auto o = makeObserver([] {
folly::futures::sleep(std::chrono::milliseconds{10}).get();
return 1;
});
EXPECT_EQ(1, **o);
});
auto f2 = fm.addTaskFuture([] {
auto o = makeObserver([] {
folly::futures::sleep(std::chrono::milliseconds{20}).get();
return 2;
});
EXPECT_EQ(2, **o);
});
std::move(f1).getVia(&evb);
std::move(f2).getVia(&evb);
}
std::mutex lockingObservableLock;
std::atomic<size_t> lockingObservableValue{0};
folly::Function<void()> lockingObservableCallback;
TEST(Observer, ObservableLockInversion) {
struct LockingObservable {
using element_type = size_t;
std::shared_ptr<const size_t> get() {
std::lock_guard<std::mutex> lg(lockingObservableLock);
return std::make_shared<const size_t>(lockingObservableValue.load());
}
void subscribe(folly::Function<void()> cb) {
lockingObservableCallback = std::move(cb);
}
void unsubscribe() { lockingObservableCallback = nullptr; }
};
auto observer =
folly::observer::ObserverCreator<LockingObservable>().getObserver();
EXPECT_EQ(0, **observer);
constexpr size_t kNumIters = 1000;
std::thread updater([&] {
for (size_t i = 1; i <= kNumIters; ++i) {
lockingObservableValue = i;
lockingObservableCallback();
}
});
while (true) {
std::lock_guard<std::mutex> lg(lockingObservableLock);
if (**makeObserver([o = observer] { return **o; }) == kNumIters) {
break;
}
}
updater.join();
}
folly::Function<void()> throwingObservableCallback;
TEST(Observer, ObservableGetThrow) {
struct ThrowingObservable {
using element_type = size_t;
std::shared_ptr<const size_t> get() {
if (getCalled_.exchange(true)) {
throw std::logic_error("Transient error");
}
return std::make_shared<const size_t>(42);
}
void subscribe(folly::Function<void()> cb) {
throwingObservableCallback = std::move(cb);
}
void unsubscribe() { throwingObservableCallback = nullptr; }
private:
std::atomic<bool> getCalled_{false};
};
auto observer =
folly::observer::ObserverCreator<ThrowingObservable>().getObserver();
EXPECT_EQ(42, **observer);
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(42, **observer);
throwingObservableCallback();
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(42, **observer);
struct ExpectedException {};
struct AlwaysThrowingObservable {
using element_type = size_t;
std::shared_ptr<const size_t> get() { throw ExpectedException(); }
void subscribe(folly::Function<void()>) {}
void unsubscribe() {}
};
EXPECT_THROW(
folly::observer::ObserverCreator<AlwaysThrowingObservable>()
.getObserver(),
ExpectedException);
}
TEST(Observer, ReenableSingletons) {
folly::observer::SimpleObservable<size_t> observable(0);
constexpr size_t kMaxValue = 10000;
std::mutex forkMutex;
std::thread publishThread([&] {
for (size_t i = 1; i <= kMaxValue; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds{1});
{
std::lock_guard<std::mutex> lg(forkMutex);
observable.setValue(i);
}
}
});
auto observer = observable.getObserver();
while (**observer < kMaxValue) {
std::this_thread::sleep_for(std::chrono::milliseconds{10});
folly::SingletonVault::singleton()->destroyInstances();
{
std::lock_guard<std::mutex> lg(forkMutex);
folly::SingletonVault::singleton()->reenableInstances();
}
folly::observer_detail::ObserverManager::vivify();
}
publishThread.join();
}
TEST(Observer, ReenableSingletonWithPendingUpdate) {
folly::observer::SimpleObservable<size_t> observable(0);
auto observer = observable.getObserver();
EXPECT_EQ(0, **observer);
folly::SingletonVault::singleton()->destroyInstances();
observable.setValue(42);
folly::SingletonVault::singleton()->reenableInstances();
std::this_thread::sleep_for(std::chrono::milliseconds{100});
EXPECT_EQ(42, **observer);
}