/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Mutex.h>
#include <folly/experimental/coro/Task.h>
#include <folly/experimental/coro/detail/InlineTask.h>
#include <folly/portability/GTest.h>
#include <mutex>
#if FOLLY_HAS_COROUTINES
using namespace folly;
class MutexTest : public testing::Test {};
TEST_F(MutexTest, TryLock) {
coro::Mutex m;
CHECK(m.try_lock());
CHECK(!m.try_lock());
m.unlock();
CHECK(m.try_lock());
}
TEST_F(MutexTest, ScopedLock) {
coro::Mutex m;
{
std::unique_lock<coro::Mutex> lock{m, std::try_to_lock};
CHECK(lock.owns_lock());
{
std::unique_lock<coro::Mutex> lock2{m, std::try_to_lock};
CHECK(!lock2.owns_lock());
}
}
CHECK(m.try_lock());
m.unlock();
}
TEST_F(MutexTest, LockAsync) {
coro::Mutex m;
coro::Baton b1;
coro::Baton b2;
int value = 0;
auto makeTask = [&](coro::Baton& b) -> coro::Task<void> {
co_await m.co_lock();
++value;
co_await b;
++value;
m.unlock();
};
ManualExecutor executor;
auto f1 = makeTask(b1).scheduleOn(&executor).start();
executor.drain();
CHECK_EQ(1, value);
CHECK(!m.try_lock());
auto f2 = makeTask(b2).scheduleOn(&executor).start();
executor.drain();
CHECK_EQ(1, value);
// This will resume f1 coroutine and let it release the
// lock. This will in turn resume f2 which was suspended
// at co_await m.lockAsync() which will then increment the value
// before becoming blocked on
b1.post();
executor.drain();
CHECK_EQ(3, value);
CHECK(!m.try_lock());
b2.post();
executor.drain();
CHECK_EQ(4, value);
CHECK(m.try_lock());
}
TEST_F(MutexTest, ScopedLockAsync) {
coro::Mutex m;
coro::Baton b1;
coro::Baton b2;
int value = 0;
auto makeTask = [&](coro::Baton& b) -> coro::Task<void> {
auto lock = co_await m.co_scoped_lock();
++value;
co_await b;
++value;
};
ManualExecutor executor;
auto f1 = makeTask(b1).scheduleOn(&executor).start();
executor.drain();
CHECK_EQ(1, value);
CHECK(!m.try_lock());
auto f2 = makeTask(b2).scheduleOn(&executor).start();
executor.drain();
CHECK_EQ(1, value);
// This will resume f1 coroutine and let it release the
// lock. This will in turn resume f2 which was suspended
// at co_await m.lockAsync() which will then increment the value
// before becoming blocked on b2.
b1.post();
executor.drain();
CHECK_EQ(3, value);
CHECK(!m.try_lock());
b2.post();
executor.drain();
CHECK_EQ(4, value);
CHECK(m.try_lock());
}
TEST_F(MutexTest, ThreadSafety) {
CPUThreadPoolExecutor threadPool{
2, std::make_shared<NamedThreadFactory>("CPUThreadPool")};
int value = 0;
coro::Mutex mutex;
auto makeTask = [&]() -> coro::Task<void> {
for (int i = 0; i < 10'000; ++i) {
auto lock = co_await mutex.co_scoped_lock();
++value;
}
};
auto f1 = makeTask().scheduleOn(&threadPool).start();
auto f2 = makeTask().scheduleOn(&threadPool).start();
auto f3 = makeTask().scheduleOn(&threadPool).start();
std::move(f1).get();
std::move(f2).get();
std::move(f3).get();
CHECK_EQ(30'000, value);
}
#endif