folly/folly/concurrency/container/FlatCombiningPriorityQueue.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <atomic>
#include <chrono>
#include <memory>
#include <mutex>
#include <queue>

#include <glog/logging.h>

#include <folly/Optional.h>
#include <folly/detail/Futex.h>
#include <folly/synchronization/FlatCombining.h>

namespace folly {

/// Thread-safe priority queue based on flat combining. If the
/// constructor parameter maxSize is greater than 0 (default = 0),
/// then the queue is bounded. This template provides blocking,
/// non-blocking, and timed variants of each of push(), pop(), and
/// peek() operations. The empty() and size() functions are inherently
/// non-blocking.
///
/// PriorityQueue must support the interface of std::priority_queue,
/// specifically empty(), size(), push(), top(), and pop().  Mutex
/// must meet the standard Lockable requirements.
///
/// By default FlatCombining uses a dedicated combiner thread, which
/// yields better latency and throughput under high contention but
/// higher overheads under low contention. If the constructor
/// parameter dedicated is false, then there will be no dedicated
/// combiner thread and any requester may do combining of operations
/// requested by other threads. For more details see the comments for
/// FlatCombining.
///
/// Usage examples:
/// @code
///   FlatCombiningPriorityQueue<int> pq(1);
///   CHECK(pq.empty());
///   CHECK(pq.size() == 0);
///   int v;
///   CHECK(!try_pop(v));
///   CHECK(!try_pop_until(v, now() + seconds(1)));
///   CHECK(!try_peek(v));
///   CHECK(!try_peek_until(v, now() + seconds(1)));
///   pq.push(10);
///   CHECK(!pq.empty());
///   CHECK(pq.size() == 1);
///   CHECK(!pq.try_push(20));
///   CHECK(!pq.try_push_until(20), now() + seconds(1)));
///   peek(v);
///   CHECK_EQ(v, 10);
///   CHECK(pq.size() == 1);
///   pop(v);
///   CHECK_EQ(v, 10);
///   CHECK(pq.empty());
/// @encode

template <
    typename T,
    typename PriorityQueue = std::priority_queue<T>,
    typename Mutex = std::mutex,
    template <typename> class Atom = std::atomic>
class FlatCombiningPriorityQueue
    : public folly::FlatCombining<
          FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>,
          Mutex,
          Atom> {
  using FCPQ = FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>;
  using FC = folly::FlatCombining<FCPQ, Mutex, Atom>;

 public:
  template <
      typename... PQArgs,
      typename = decltype(PriorityQueue(std::declval<PQArgs>()...))>
  explicit FlatCombiningPriorityQueue(
      // Concurrent priority queue parameter
      const size_t maxSize = 0,
      // Flat combining parameters
      const bool dedicated = true,
      const uint32_t numRecs = 0,
      const uint32_t maxOps = 0,
      // (Sequential) PriorityQueue Parameters
      PQArgs... args)
      : FC(dedicated, numRecs, maxOps),
        maxSize_(maxSize),
        pq_(std::forward<PQArgs>(args)...) {}

  /// Returns true iff the priority queue is empty
  bool empty() const {
    bool res;
    auto fn = [&] { res = pq_.empty(); };
    const_cast<FCPQ*>(this)->requestFC(fn);
    return res;
  }

  /// Returns the number of items in the priority queue
  size_t size() const {
    size_t res;
    auto fn = [&] { res = pq_.size(); };
    const_cast<FCPQ*>(this)->requestFC(fn);
    return res;
  }

  /// Non-blocking push. Succeeds if there is space in the priority
  /// queue to insert the new item. Tries once if no time point is
  /// provided or until the provided time_point is reached. If
  /// successful, inserts the provided item in the priority queue
  /// according to its priority.
  bool try_push(const T& val) {
    return try_push_impl(
        val, std::chrono::time_point<std::chrono::steady_clock>::min());
  }

  /// Non-blocking pop. Succeeds if the priority queue is
  /// nonempty. Tries once if no time point is provided or until the
  /// provided time_point is reached.  If successful, copies the
  /// highest priority item and removes it from the priority queue.
  bool try_pop(T& val) {
    return try_pop_impl(
        val, std::chrono::time_point<std::chrono::steady_clock>::min());
  }

  /// Non-blocking peek. Succeeds if the priority queue is
  /// nonempty. Tries once if no time point is provided or until the
  /// provided time_point is reached.  If successful, copies the
  /// highest priority item without removing it.
  bool try_peek(T& val) {
    return try_peek_impl(
        val, std::chrono::time_point<std::chrono::steady_clock>::min());
  }

  /// Blocking push. Inserts the provided item in the priority
  /// queue. If it is full, this function blocks until there is space
  /// for the new item.
  void push(const T& val) {
    try_push_impl(
        val, std::chrono::time_point<std::chrono::steady_clock>::max());
  }

  /// Blocking pop. Copies the highest priority item and removes
  /// it. If the priority queue is empty, this function blocks until
  /// it is nonempty.
  void pop(T& val) {
    try_pop_impl(
        val, std::chrono::time_point<std::chrono::steady_clock>::max());
  }

  /// Blocking peek. Copies the highest priority item without
  /// removing it. If the priority queue is empty, this function
  /// blocks until it is nonempty.
  void peek(T& val) {
    try_peek_impl(
        val, std::chrono::time_point<std::chrono::steady_clock>::max());
  }

  folly::Optional<T> try_pop() {
    T val;
    if (try_pop(val)) {
      return std::move(val);
    }
    return folly::none;
  }

  folly::Optional<T> try_peek() {
    T val;
    if (try_peek(val)) {
      return std::move(val);
    }
    return folly::none;
  }

  template <typename Rep, typename Period>
  folly::Optional<T> try_pop_for(
      const std::chrono::duration<Rep, Period>& timeout) {
    T val;
    if (try_pop(val) ||
        try_pop_impl(val, std::chrono::steady_clock::now() + timeout)) {
      return std::move(val);
    }
    return folly::none;
  }

  template <typename Rep, typename Period>
  bool try_push_for(
      const T& val, const std::chrono::duration<Rep, Period>& timeout) {
    return (
        try_push(val) ||
        try_push_impl(val, std::chrono::steady_clock::now() + timeout));
  }

  template <typename Rep, typename Period>
  folly::Optional<T> try_peek_for(
      const std::chrono::duration<Rep, Period>& timeout) {
    T val;
    if (try_peek(val) ||
        try_peek_impl(val, std::chrono::steady_clock::now() + timeout)) {
      return std::move(val);
    }
    return folly::none;
  }

  template <typename Clock, typename Duration>
  folly::Optional<T> try_pop_until(
      const std::chrono::time_point<Clock, Duration>& deadline) {
    T val;
    if (try_pop_impl(val, deadline)) {
      return std::move(val);
    }
    return folly::none;
  }

  template <typename Clock, typename Duration>
  bool try_push_until(
      const T& val, const std::chrono::time_point<Clock, Duration>& deadline) {
    return try_push_impl(val, deadline);
  }

  template <typename Clock, typename Duration>
  folly::Optional<T> try_peek_until(
      const std::chrono::time_point<Clock, Duration>& deadline) {
    T val;
    if (try_peek_impl(val, deadline)) {
      return std::move(val);
    }
    return folly::none;
  }

 private:
  size_t maxSize_;
  PriorityQueue pq_;
  detail::Futex<Atom> empty_{};
  detail::Futex<Atom> full_{};

  bool isTrue(detail::Futex<Atom>& futex) {
    return futex.load(std::memory_order_relaxed) != 0;
  }

  void setFutex(detail::Futex<Atom>& futex, uint32_t val) {
    futex.store(val, std::memory_order_relaxed);
  }

  bool futexSignal(detail::Futex<Atom>& futex) {
    if (isTrue(futex)) {
      setFutex(futex, 0);
      return true;
    } else {
      return false;
    }
  }

  template <typename Clock, typename Duration>
  bool try_push_impl(
      const T& val, const std::chrono::time_point<Clock, Duration>& when);

  template <typename Clock, typename Duration>
  bool try_pop_impl(
      T& val, const std::chrono::time_point<Clock, Duration>& when);

  template <typename Clock, typename Duration>
  bool try_peek_impl(
      T& val, const std::chrono::time_point<Clock, Duration>& when);
};

/// Implementation

template <
    typename T,
    typename PriorityQueue,
    typename Mutex,
    template <typename>
    class Atom>
template <typename Clock, typename Duration>
inline bool
FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_push_impl(
    const T& val, const std::chrono::time_point<Clock, Duration>& when) {
  while (true) {
    bool res;
    bool wake;

    auto fn = [&] {
      if (maxSize_ > 0 && pq_.size() == maxSize_) {
        setFutex(full_, 1);
        res = false;
        return;
      }
      DCHECK(maxSize_ == 0 || pq_.size() < maxSize_);
      try {
        pq_.push(val);
        wake = futexSignal(empty_);
        res = true;
        return;
      } catch (const std::bad_alloc&) {
        setFutex(full_, 1);
        res = false;
        return;
      }
    };
    this->requestFC(fn);

    if (res) {
      if (wake) {
        detail::futexWake(&empty_);
      }
      return true;
    }
    if (when == std::chrono::time_point<Clock>::min()) {
      return false;
    }
    while (isTrue(full_)) {
      if (when == std::chrono::time_point<Clock>::max()) {
        detail::futexWait(&full_, 1);
      } else {
        if (Clock::now() > when) {
          return false;
        } else {
          detail::futexWaitUntil(&full_, 1, when);
        }
      }
    } // inner while loop
  } // outer while loop
}

template <
    typename T,
    typename PriorityQueue,
    typename Mutex,
    template <typename>
    class Atom>
template <typename Clock, typename Duration>
inline bool
FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_pop_impl(
    T& val, const std::chrono::time_point<Clock, Duration>& when) {
  while (true) {
    bool res;
    bool wake;

    auto fn = [&] {
      res = !pq_.empty();
      if (res) {
        val = pq_.top();
        pq_.pop();
        wake = futexSignal(full_);
      } else {
        setFutex(empty_, 1);
      }
    };
    this->requestFC(fn);

    if (res) {
      if (wake) {
        detail::futexWake(&full_);
      }
      return true;
    }
    while (isTrue(empty_)) {
      if (when == std::chrono::time_point<Clock>::max()) {
        detail::futexWait(&empty_, 1);
      } else {
        if (Clock::now() > when) {
          return false;
        } else {
          detail::futexWaitUntil(&empty_, 1, when);
        }
      }
    } // inner while loop
  } // outer while loop
}

template <
    typename T,
    typename PriorityQueue,
    typename Mutex,
    template <typename>
    class Atom>
template <typename Clock, typename Duration>
inline bool
FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_peek_impl(
    T& val, const std::chrono::time_point<Clock, Duration>& when) {
  while (true) {
    bool res;

    auto fn = [&] {
      res = !pq_.empty();
      if (res) {
        val = pq_.top();
      } else {
        setFutex(empty_, 1);
      }
    };
    this->requestFC(fn);

    if (res) {
      return true;
    }
    while (isTrue(empty_)) {
      if (when == std::chrono::time_point<Clock>::max()) {
        detail::futexWait(&empty_, 1);
      } else {
        if (Clock::now() > when) {
          return false;
        } else {
          detail::futexWaitUntil(&empty_, 1, when);
        }
      }
    } // inner while loop
  } // outer while loop
}

} // namespace folly