folly/folly/coro/Mutex.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/Executor.h>
#include <folly/experimental/coro/Coroutine.h>
#include <folly/experimental/coro/ViaIfAsync.h>

#include <atomic>
#include <mutex>
#include <type_traits>

#if FOLLY_HAS_COROUTINES

namespace folly {
namespace coro {

/// A mutex that can be locked asynchronously using 'co_await'.
///
/// Ownership of the mutex is not tied to any particular thread.
/// This allows the coroutine owning the lock to transition from one thread
/// to another while holding the lock and then perform the unlock() operation
/// on another thread.
///
/// This mutex guarantees a FIFO scheduling algorithm - coroutines acquire the
/// lock in the order that they execute the 'co_await mutex.co_lock()'
/// operation.
///
/// Note that you cannot use std::scoped_lock/std::lock_guard to acquire the
/// lock as the lock must be acquired with use of 'co_await' which cannot be
/// used in a constructor.
///
/// You can still use the std::scoped_lock/std::lock_guard in conjunction with
/// std::adopt_lock to automatically unlock the mutex when the current scope
/// exits after having locked the mutex using either 'co_await m.co_lock()'
/// or 'm.try_lock()'.
///
/// You can also attempt to acquire the lock using std::unique_lock in
/// conjunction with std::try_to_lock.
///
/// For example:
///   folly::coro::Mutex m;
///   folly::Executor& executor;
///
///   folly::coro::Task<> asyncScopedLockExample()
///   {
///     std::unique_lock<folly::coro::Mutex> lock = co_await m.co_scoped_lock();
///     ...
///   }
///
///   folly::coro::Task<> asyncManualLockAndUnlock()
///   {
///     co_await m.co_lock(executor);
///     ...
///     m.unlock();
///   }
///
///   void nonAsyncTryLock()
///   {
///     if (m.try_lock())
///     {
///       // Once the lock is acquired you can pass ownership of the lock to
///       // a std::lock_guard object.
///       std::lock_guard<folly::coro::Mutex> lock{m, std::adopt_lock};
///       ...
///     }
///   }
///
///   void nonAsyncScopedTryLock()
///   {
///     std::unique_lock<folly::coro::Mutex> lock{m, std::try_to_lock};
///     if (lock)
///     {
///       ...
///     }
///   }
class Mutex {
  class ScopedLockAwaiter;
  class LockAwaiter;
  template <typename Awaiter>
  class LockOperation;

 public:
  /// Construct a new async mutex that is initially unlocked.
  Mutex() noexcept : state_(unlockedState()), waiters_(nullptr) {}

  Mutex(const Mutex&) = delete;
  Mutex(Mutex&&) = delete;
  Mutex& operator=(const Mutex&) = delete;
  Mutex& operator=(Mutex&&) = delete;

  ~Mutex();

  /// Try to lock the mutex synchronously.
  ///
  /// Returns true if the lock was able to be acquired synchronously, false
  /// if the lock could not be acquired because it was already locked.
  ///
  /// If this method returns true then the caller is responsible for ensuring
  /// that unlock() is called to release the lock.
  bool try_lock() noexcept {
    void* oldValue = unlockedState();
    return state_.compare_exchange_strong(
        oldValue,
        nullptr,
        std::memory_order_acquire,
        std::memory_order_relaxed);
  }

  /// Lock the mutex asynchronously, returning an RAII object that will release
  /// the lock at the end of the scope.
  ///
  /// You must co_await the return value to wait until the lock is acquired.
  ///
  /// Chain a call to .viaIfAsync() to specify the executor to resume on when
  /// the lock is eventually acquired in the case that the lock could not be
  /// acquired synchronously. Note that the executor will be passed implicitly
  /// if awaiting from a Task or AsyncGenerator coroutine. The awaiting
  /// coroutine will continue without suspending if the lock could be acquired
  /// synchronously.
  [[nodiscard]] LockOperation<ScopedLockAwaiter> co_scoped_lock() noexcept;

  /// Lock the mutex asynchronously.
  ///
  /// You must co_await the return value to wait until the lock is acquired.
  ///
  /// Chain a call to .viaIfAsync() to specify the executor to resume on when
  /// the lock is eventually acquired in the case that the lock could not be
  /// acquired synchronously. The awaiting coroutine will continue without
  /// suspending if the lock could be acquired synchronously.
  ///
  /// Once the 'co_await m.co_lock()' operation completes, the awaiting
  /// coroutine is responsible for ensuring that .unlock() is called to release
  /// the lock.
  ///
  /// Consider using co_scoped_lock() instead to obtain a std::scoped_lock
  /// that handles releasing the lock at the end of the scope.
  [[nodiscard]] LockOperation<LockAwaiter> co_lock() noexcept;

  /// Unlock the mutex.
  ///
  /// If there are other coroutines waiting to lock the mutex then this will
  /// schedule the resumption of the next coroutine in the queue.
  void unlock() noexcept;

 private:
  using folly_coro_aware_mutex = std::true_type;

  class LockAwaiter {
   public:
    explicit LockAwaiter(Mutex& mutex) noexcept : mutex_(mutex) {}

    bool await_ready() noexcept { return mutex_.try_lock(); }

    bool await_suspend(coroutine_handle<> awaitingCoroutine) noexcept {
      awaitingCoroutine_ = awaitingCoroutine;
      return mutex_.lockAsyncImpl(this);
    }

    void await_resume() noexcept {}

   protected:
    Mutex& mutex_;

   private:
    friend Mutex;

    coroutine_handle<> awaitingCoroutine_;
    LockAwaiter* next_;
  };

  class ScopedLockAwaiter : public LockAwaiter {
   public:
    using LockAwaiter::LockAwaiter;

    std::unique_lock<Mutex> await_resume() noexcept {
      return std::unique_lock<Mutex>{mutex_, std::adopt_lock};
    }
  };

  template <typename Awaiter>
  class LockOperation {
   public:
    explicit LockOperation(Mutex& mutex) noexcept : mutex_(mutex) {}

    auto viaIfAsync(folly::Executor::KeepAlive<> executor) const {
      return folly::coro::co_viaIfAsync(std::move(executor), Awaiter{mutex_});
    }

   private:
    Mutex& mutex_;
  };

  // Special value for state_ that indicates the mutex is not locked.
  void* unlockedState() noexcept { return this; }

  // Try to lock the mutex.
  //
  // Returns true if the lock could not be acquired synchronously and awaiting
  // coroutine should suspend. In this case the coroutine will be resumed later
  // once it acquires the mutex. Returns false if the lock was acquired
  // synchronously and the awaiting coroutine should continue without
  // suspending.
  bool lockAsyncImpl(LockAwaiter* awaiter);

  // This contains either:
  // - this    => Not locked
  // - nullptr => Locked, no newly queued waiters (ie. empty list of waiters)
  // - other   => Pointer to first LockAwaiter* in a linked-list of newly
  //              queued awaiters in LIFO order.
  std::atomic<void*> state_;

  // Linked-list of waiters in FIFO order.
  // Only the current lock holder is allowed to access this member.
  LockAwaiter* waiters_;
};

inline Mutex::LockOperation<Mutex::ScopedLockAwaiter>
Mutex::co_scoped_lock() noexcept {
  return LockOperation<ScopedLockAwaiter>{*this};
}

inline Mutex::LockOperation<Mutex::LockAwaiter> Mutex::co_lock() noexcept {
  return LockOperation<LockAwaiter>{*this};
}

} // namespace coro
} // namespace folly

#endif // FOLLY_HAS_COROUTINES