/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <array>
#include <atomic>
#include <cstdint>
#include <limits>
#include <new>
#include <stdexcept>
#include <thread>
#include <utility>

#include <glog/logging.h>

#include <folly/ConstexprMath.h>
#include <folly/Likely.h>
#include <folly/Portability.h>
#include <folly/ScopeGuard.h>
#include <folly/Utility.h>
#include <folly/chrono/Hardware.h>
#include <folly/detail/Futex.h>
#include <folly/functional/Invoke.h>
#include <folly/lang/Align.h>
#include <folly/lang/Bits.h>
#include <folly/lang/Exception.h>
#include <folly/portability/Asm.h>
#include <folly/synchronization/AtomicNotification.h>
#include <folly/synchronization/AtomicUtil.h>
#include <folly/synchronization/Lock.h>
#include <folly/synchronization/detail/InlineFunctionRef.h>
#include <folly/synchronization/detail/Sleeper.h>

namespace folly {
namespace detail {
namespace distributed_mutex {

// kUnlocked is used to show the unlocked state
//
// When locking threads encounter kUnlocked in the underlying storage, they
// can just acquire the lock without any further effort
constexpr auto kUnlocked = …;

// kLocked is used to show that the mutex is currently locked, and future
// attempts to lock the mutex should enqueue on the central storage
//
// Locking threads find this on central storage only when there is a
// contention chain that is undergoing wakeups; in every other case, a locker
// will either find kUnlocked or an arbitrary address with the kLocked bit set
constexpr auto kLocked = …;

// kTimedWaiter is set when there is at least one timed waiter on the mutex
//
// Timed waiters do not follow the sleeping strategy employed by regular,
// non-timed threads. They sleep on the central mutex atomic through an
// extended futex() interface that allows sleeping with the same semantics for
// non-standard integer widths
//
// When a regular non-timed thread unlocks or enqueues on the mutex and sees
// a timed waiter, it takes ownership of all the timed waiters. The thread
// that has taken ownership of the timed waiters releases them when it gets a
// chance at the critical section, at which point it issues a wakeup to a
// single timed waiter; timed waiters always issue wake() calls to other
// timed waiters
constexpr auto kTimedWaiter = …;
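
// Illustrative sketch (not part of the implementation) of the uncontended
// fast path implied by the constants above. The concrete constant values are
// elided in this outline and state_ is a hypothetical name for the central
// atomic word; the sketch only shows that a locker observing kUnlocked can
// acquire the mutex with a single compare-exchange, and must enqueue on the
// contention chain otherwise
//
//   auto state = state_.load(std::memory_order_acquire);
//   if (state == kUnlocked &&
//       state_.compare_exchange_strong(state, kLocked)) {
//     // acquired without any further effort
//   } else {
//     // the word holds an address with the kLocked bit set (and possibly
//     // kTimedWaiter), so this thread enqueues a wait node and spins/sleeps
//   }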

// kUninitialized means that the thread has just enqueued, and has not yet
// gotten to initializing itself with the address of its successor
//
// This becomes significant for threads that are trying to wake up the
// uninitialized thread: if they see that the thread is not yet initialized,
// they can do nothing but spin and wait for the thread to get initialized
//
// This also plays a role in the functioning of flat combining as implemented
// in DistributedMutex, when a thread owning the lock goes through the
// contention chain to either unlock the mutex or combine critical sections
// from the other end. The presence of kUninitialized means that the
// combining thread is not able to make progress past that point, so we
// transfer the lock
constexpr auto kUninitialized = …;

// kWaiting will be set in waiters' futex structs while they spin waiting for
// the mutex
constexpr auto kWaiting = …;

// kWake will be set by threads that are waking up waiters that have enqueued
constexpr auto kWake = …;

// kSkipped will be set by a waker when they see that a waiter has been
// preempted away by the kernel; in this case the thread that got skipped will
// have to wake up and put itself back on the queue
constexpr auto kSkipped = …;

// kAboutToWait will be set by a waiter that enqueues itself with the purpose
// of waiting on a futex
constexpr auto kAboutToWait = …;

// kSleeping will be set by a waiter right before enqueueing on a futex. When
// a thread wants to wake up a waiter that has enqueued on a futex, it should
// set the futex to contain kWake
//
// A thread that is unlocking and wants to skip over a sleeping thread also
// calls futex_.exchange(kSleeping) on the sleeping thread's futex word. It
// does this to 1. detect whether the sleeping thread had actually gone to
// sleep on the futex word so it can skip it, and 2. synchronize with other
// non-atomic writes in the sleeping thread's context (such as the write to
// track the next waiting thread)
//
// We reuse kSleeping instead of, say, using another constant kEarlyDelivery
// to avoid situations where a thread has to enter kernel mode due to calling
// futexWait() twice because of the presence of a waking thread. This
// situation can arise when an unlocking thread goes to skip over a sleeping
// thread, sees that the thread has slept and moves on, but the sleeping
// thread had not yet entered futex(). This interleaving causes the thread
// calling futex() to return spuriously, as the futex word is not what it
// should be
constexpr auto kSleeping = …;

// kCombined is set by the lock holder to let the waiter thread know that its
// combine request was successfully completed by the lock holder. A
// successful combine means that the thread requesting the combine operation
// does not need to unlock the mutex; in fact, doing so would be an error
constexpr auto kCombined = …;

// kCombineUninitialized is like kUninitialized but is set by a thread when it
// enqueues in hopes of getting its critical section combined with the lock
// holder
constexpr auto kCombineUninitialized = …;

// kCombineWaiting is set by a thread when it is ready to have its combine
// record fulfilled by the lock holder. In particular, this signals to the
// lock holder that the thread has set its next_ pointer in the contention
// chain
constexpr auto kCombineWaiting = …;

// kExceptionOccurred is set on the waiter futex when the remote task throws
// an exception. It is the caller's responsibility to retrieve the exception
// and rethrow it in their own context. Note that when the caller uses a
// noexcept function as their critical section, they can avoid checking for
// this value
//
// This allows us to avoid all cost of exceptions in the memory layout of the
// fast path (no errors), as exceptions are stored as an std::exception_ptr in
// the same union that stores the return value of the critical section. We
// also avoid all CPU overhead because the combiner uses a try-catch block
// without any additional branching to handle exceptions
constexpr auto kExceptionOccurred = …;
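
// Illustrative sketch (not the implementation) of how a waiter might react to
// the signals above once they show up in its futex word; the futex_ member
// name and the switch structure are assumptions made purely for the example
//
//   switch (futex_.load(std::memory_order_acquire)) {
//     case kWake:
//       // woken up by the previous lock holder, this thread now owns the lock
//       break;
//     case kSkipped:
//       // the waker saw this thread preempted and skipped it, so it must put
//       // itself back on the queue
//       break;
//     case kCombined:
//       // the lock holder ran this thread's critical section, do not unlock
//       break;
//     case kExceptionOccurred:
//       // the critical section threw; retrieve and rethrow the exception
//       break;
//     default:
//       // still kWaiting, keep spinning (or go to sleep via kAboutToWait)
//       break;
//   }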

// Alias for processor's time-stamp counter value to help distinguish it from
// other integers
CpuTicks;

// The number of spins that we are allowed to do before we resort to marking a
// thread as having slept
//
// This is just a magic number from benchmarks
constexpr auto kScheduledAwaySpinThreshold = …;

// The maximum time to spin before a thread starts yielding its processor
// in hopes of getting skipped
constexpr auto kMaxSpinTime = …;

// The maximum number of contention chains we can resolve with flat combining.
// After this number of contention chains, the mutex falls back to regular
// two-phased mutual exclusion to ensure that we don't starve the combiner
// thread
constexpr auto kMaxCombineIterations = …;

/**
 * Write-only data that is available to the thread that is waking up another.
 * Only the waking thread is allowed to write to this; the thread to be woken
 * is allowed to read from this after a wakeup has been issued
 */
template <template <typename> class Atomic>
class WakerMetadata { … };

/**
 * Type of the type-erased callable that is used for combining from the lock
 * holder's end. This has 48 bytes of inline storage that can be used to
 * minimize cache misses when combining
 */
CombineFunction;

/**
 * Waiter encapsulates the state required for waiting on the mutex; this
 * contains potentially heavy state and is intended to be allocated on the
 * stack as part of a lock() function call
 *
 * To ensure that synchronization does not cause unintended side effects on
 * the rest of the thread stack (eg. metadata in lockImplementation(), or any
 * other data in the user's thread), we aggressively pad this struct and use
 * custom alignment internally to ensure that the relevant data fits within a
 * single cacheline. The added alignment here also gives us some room to
 * wiggle in the bottom few bits of the mutex, where we store extra metadata
 */
template <template <typename> class Atomic>
class Waiter { … };

/**
 * A template that helps us differentiate between the different ways to return
 * a value from a combined critical section. A return value of type void
 * cannot be stored anywhere, so we use specializations and pick the right one
 * via std::conditional_t
 *
 * This is then used by CoalescedTask and its family of functions to implement
 * efficient return value transfers to the waiting threads
 */
template <typename Func>
class RequestWithReturn { … };

template <typename Func>
class RequestWithoutReturn { … };

// we need to use std::integral_constant::value here as opposed to
// std::integral_constant::operator T() because MSVC errors out with the
// implicit conversion
Request;
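
// A sketch of the kind of selection the Request alias above performs; the
// actual definition is elided in this outline, so treat this only as an
// illustration of the std::conditional_t dispatch described in the comment:
// callables returning void are routed to RequestWithoutReturn, everything
// else to RequestWithReturn
//
//   template <typename Func>
//   using Request = std::conditional_t<
//       std::is_void<folly::invoke_result_t<const Func&>>::value,
//       RequestWithoutReturn<Func>,
//       RequestWithReturn<Func>>;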

/**
 * A template that helps us to transform a callable returning a value to one
 * that returns void so it can be type erased and passed on to the waker. If
 * the return value is small enough, it gets coalesced into the wait struct
 * for optimal data transfer. When it's not small enough to fit in the waiter
 * storage buffer, we place it on its own cacheline with isolation to prevent
 * false-sharing with the on-stack metadata of the waiter thread
 *
 * This helps a combined critical section feel more normal in the case where
 * the user wants to return a value, for example
 *
 *   auto value = mutex_.lock_combine([&]() {
 *     return data_.value();
 *   });
 *
 * Without this, the user would typically create a dummy object that they
 * would then assign to from within the lambda. With return value chaining,
 * this pattern feels more natural
 *
 * Note that it is important to copy the entire callable into this class.
 * Storing something like a reference instead is not desirable because it does
 * not allow InlineFunctionRef to use inline storage to represent the user's
 * callable without extra indirections
 *
 * We use std::conditional_t and switch to the right type of task with the
 * CoalescedTask type alias
 */
template <typename Func, typename Waiter>
class TaskWithCoalesce { … };

template <typename Func, typename Waiter>
class TaskWithoutCoalesce { … };

template <typename Func, typename Waiter>
class TaskWithBigReturnValue { … };

template <typename T, bool>
struct Sizeof_;
Sizeof_<T, false>;
Sizeof_<T, true>;

template <typename T>
struct Sizeof : Sizeof_<T, std::is_void<T>::value> { … };

// we need to use std::integral_constant::value here as opposed to
// std::integral_constant::operator T() because MSVC errors out with the
// implicit conversion
CoalescedTask;
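
// A sketch of the dispatch the CoalescedTask alias above performs; the actual
// definition is elided in this outline and kMaxInlineReturnSize is a
// hypothetical name for the elided size threshold. Void returns need no
// coalescing, small return values are coalesced into the Waiter's storage,
// and anything larger is transferred through TaskWithBigReturnValue on its
// own isolated cacheline
//
//   template <typename Func, typename Waiter>
//   using CoalescedTask = std::conditional_t<
//       std::is_void<folly::invoke_result_t<const Func&>>::value,
//       TaskWithoutCoalesce<Func, Waiter>,
//       std::conditional_t<
//           (Sizeof<folly::invoke_result_t<const Func&>>::value <=
//            kMaxInlineReturnSize),
//           TaskWithCoalesce<Func, Waiter>,
//           TaskWithBigReturnValue<Func, Waiter>>>;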

/**
 * Given a request and a wait node, coalesce them into a CoalescedTask that
 * coalesces the return value into the wait node when invoked from a remote
 * thread
 *
 * When given a null request through nullptr_t, coalesce() returns null as
 * well
 */
template <typename Waiter>
std::nullptr_t coalesce(std::nullptr_t&, Waiter&) { … }

template <
    typename Request,
    typename Waiter,
    typename Func = typename Request::F>
CoalescedTask<Func, Waiter> coalesce(Request& request, Waiter& waiter) { … }

/**
 * Given a task, create storage for the return value. When we get a type of
 * CoalescedTask, this returns an instance of CoalescedTask::StorageType,
 * std::nullptr_t otherwise
 */
inline std::nullptr_t makeReturnValueStorageFor(std::nullptr_t&) { … }

template <
    typename CoalescedTask,
    typename StorageType = typename CoalescedTask::StorageType>
StorageType makeReturnValueStorageFor(CoalescedTask&) { … }

/**
 * Given a task and storage, attach them together if needed. This only helps
 * when we have a task that returns a value bigger than can be coalesced. In
 * that case, we need to attach the storage with the task so the return value
 * can be transferred to this thread from the remote thread
 */
template <typename Task, typename Storage>
void attach(Task&, Storage&) { … }

template <
    typename R,
    typename W,
    typename StorageType = typename TaskWithBigReturnValue<R, W>::StorageType>
void attach(TaskWithBigReturnValue<R, W>& task, StorageType& storage) { … }

template <typename Request, typename Waiter>
void throwIfExceptionOccurred(Request&, Waiter& waiter, bool exception) { … }

/**
 * Given a CoalescedTask, a wait node and a request, detach the return value
 * into the request from the wait node and task
 */
template <typename Waiter>
void detach(std::nullptr_t&, Waiter&, bool exception, std::nullptr_t&) { … }

template <typename Waiter, typename F>
void detach(
    RequestWithoutReturn<F>& request,
    Waiter& waiter,
    bool exception,
    folly::Unit&) { … }

template <typename Waiter, typename F>
void detach(
    RequestWithReturn<F>& request,
    Waiter& waiter,
    bool exception,
    folly::Unit&) { … }

template <typename Waiter, typename F, typename Storage>
void detach(
    RequestWithReturn<F>& request,
    Waiter& waiter,
    bool exception,
    Storage& storage) { … }

/**
 * Get the time since epoch in CPU cycles
 *
 * This is faster than std::chrono::steady_clock because it avoids a VDSO
 * access to get the timestamp counter
 *
 * Note that the hardware timestamp counter on x86, like
 * std::chrono::steady_clock, is guaranteed to be monotonically increasing -
 * https://c9x.me/x86/html/file_module_x86_id_278.html
 */
inline CpuTicks time() { … }

/**
 * Zero out the other bits used by the implementation and return just an
 * address from a uintptr_t
 */
template <typename Type>
Type* extractPtr(std::uintptr_t from) { … }

/**
 * Strips the given CPU timestamp into only the least significant 56 bits by
 * moving the least significant 56 bits over by 8, zeroing out the bottom 8
 * bits to be used as a medium of information transfer for the thread wait
 * nodes
 */
inline std::uint64_t strip(CpuTicks time) { … }

/**
 * Recover the timestamp value from an integer that has the timestamp encoded
 * in it
 */
inline std::uint64_t recover(std::uint64_t from) { … }

template <template <typename> class Atomic, bool TimePublishing>
class DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy { … };

template <template <typename> class Atomic, bool TimePublishing>
DistributedMutex<Atomic, TimePublishing>::DistributedMutex() : … { … }

template <typename Waiter>
std::uint64_t publish(
    std::uint64_t spins,
    CpuTicks current,
    CpuTicks previous,
    CpuTicks elapsed,
    bool& shouldPublish,
    Waiter& waiter,
    std::uint32_t waitMode) { … }

template <typename Waiter>
bool spin(Waiter& waiter, std::uint32_t& sig, std::uint32_t mode) { … }

template <typename Waiter>
void doFutexWake(Waiter* waiter) { … }

template <typename Waiter>
bool doFutexWait(Waiter* waiter, Waiter*& next) { … }

template <typename Waiter>
bool wait(Waiter* waiter, std::uint32_t mode, Waiter*& next, uint32_t& signal) { … }

inline void recordTimedWaiterAndClearTimedBit(
    bool& timedWaiter, std::uintptr_t& previous) { … }

template <typename Atomic>
void wakeTimedWaiters(Atomic* state, bool timedWaiters) { … }

template <template <typename> class Atomic, bool TimePublishing>
template <typename Func>
auto DistributedMutex<Atomic, TimePublishing>::lock_combine(Func func)
    -> folly::invoke_result_t<const Func&> { … }

template <template <typename> class Atomic, bool TimePublishing>
typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
DistributedMutex<Atomic, TimePublishing>::lock() { … }

template <template <typename> class Atomic, bool TimePublishing>
template <typename Rep, typename Period, typename Func>
folly::Optional<invoke_result_t<Func&>>
DistributedMutex<Atomic, TimePublishing>::try_lock_combine_for(
    const std::chrono::duration<Rep, Period>& duration, Func func) { … }

template <template <typename> class Atomic, bool TimePublishing>
template <typename Clock, typename Duration, typename Func>
folly::Optional<invoke_result_t<Func&>>
DistributedMutex<Atomic, TimePublishing>::try_lock_combine_until(
    const std::chrono::time_point<Clock, Duration>& deadline, Func func) { … }
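
// Usage sketch for the timed combine interface declared above; mutex_ and
// data_ are hypothetical names used only for the example. The returned
// optional is empty when the deadline expires before the critical section
// could be run
//
//   folly::Optional<int> result = mutex_.try_lock_combine_for(
//       std::chrono::milliseconds{10}, [&]() { return data_; });
//   if (result) {
//     // the critical section ran, possibly combined on the lock holder
//   }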

template <typename Atomic, template <typename> class A, bool T>
auto tryLockNoLoad(Atomic& atomic, DistributedMutex<A, T>&) { … }

template <template <typename> class Atomic, bool TimePublishing>
typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
DistributedMutex<Atomic, TimePublishing>::try_lock() { … }

template <
    template <typename> class Atomic,
    bool TimePublishing,
    typename State,
    typename Request>
typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
lockImplementation(
    DistributedMutex<Atomic, TimePublishing>& mutex,
    State& atomic,
    Request& request) { … }

inline bool preempted(std::uint64_t value, CpuTicks now) { … }

inline bool isSleeper(std::uintptr_t value) { … }

inline bool isInitialized(std::uintptr_t value) { … }

inline bool isCombiner(std::uintptr_t value) { … }

inline bool isWaitingCombiner(std::uintptr_t value) { … }

template <typename Waiter>
CombineFunction loadTask(Waiter* current, std::uintptr_t value) { … }

template <typename Waiter>
[[FOLLY_ATTR_GNU_COLD]] void transferCurrentException(Waiter* waiter) { … }

template <template <typename> class Atomic>
FOLLY_ALWAYS_INLINE std::uintptr_t tryCombine(
    Waiter<Atomic>* waiter,
    std::uintptr_t value,
    std::uintptr_t next,
    std::uint64_t iteration,
    CpuTicks now,
    CombineFunction task) { … }

template <typename Waiter>
FOLLY_ALWAYS_INLINE std::uintptr_t tryWake(
    bool publishing,
    Waiter* waiter,
    std::uintptr_t value,
    std::uintptr_t next,
    std::uintptr_t waker,
    Waiter*& sleepers,
    std::uint64_t iteration,
    CombineFunction task) { … }

template <typename Waiter>
bool wake(
    bool publishing,
    Waiter& waiter,
    std::uintptr_t waker,
    Waiter*& sleepers,
    std::uint64_t iter) { … }

template <typename Atomic, typename Proxy, typename Sleepers>
bool tryUnlockClean(Atomic& state, Proxy& proxy, Sleepers sleepers) { … }

template <template <typename> class Atomic, bool Publish>
void DistributedMutex<Atomic, Publish>::unlock(
    DistributedMutex::DistributedMutexStateProxy const& proxy_) { … }

template <typename Atomic, typename Deadline, typename MakeProxy>
auto timedLock(Atomic& state, Deadline deadline, MakeProxy proxy) { … }

template <template <typename> class Atomic, bool TimePublishing>
template <typename Clock, typename Duration>
typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
DistributedMutex<Atomic, TimePublishing>::try_lock_until(
    const std::chrono::time_point<Clock, Duration>& deadline) { … }

template <template <typename> class Atomic, bool TimePublishing>
template <typename Rep, typename Period>
typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy
DistributedMutex<Atomic, TimePublishing>::try_lock_for(
    const std::chrono::duration<Rep, Period>& duration) { … }

} // namespace distributed_mutex
} // namespace detail
} // namespace folly

namespace std {

unique_lock<
    ::folly::detail::distributed_mutex::DistributedMutex<Atom, TimePublishing>>;
lock_guard<
    ::folly::detail::distributed_mutex::DistributedMutex<Atom, TimePublishing>>;

} // namespace std
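
// Usage sketch for the mutex defined in this file, assuming the public
// folly::DistributedMutex alias for the default template arguments. lock()
// hands back a DistributedMutexStateProxy that must be given back to
// unlock(), and the std::unique_lock / std::lock_guard specializations above
// are what make the usual RAII idioms work despite that proxy-based interface
//
//   folly::DistributedMutex mutex;
//
//   // proxy-based locking
//   auto proxy = mutex.lock();
//   // ... critical section ...
//   mutex.unlock(std::move(proxy));
//
//   // RAII through the std specializations declared above
//   std::unique_lock<folly::DistributedMutex> guard{mutex};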