folly/folly/concurrency/container/RelaxedConcurrentPriorityQueue.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <algorithm>
#include <atomic>
#include <climits>
#include <cmath>
#include <iomanip>
#include <iostream>
#include <mutex>

#include <folly/Random.h>
#include <folly/SpinLock.h>
#include <folly/ThreadLocal.h>
#include <folly/detail/Futex.h>
#include <folly/lang/Align.h>
#include <folly/synchronization/Hazptr.h>
#include <folly/synchronization/WaitOptions.h>
#include <folly/synchronization/detail/Spin.h>

/// ------ Concurrent Priority Queue Implementation ------
// The concurrent priority queue implementation is based on the
// Mound data structure (Mounds: Array-Based Concurrent Priority Queues
// by Yujie Liu and Michael Spear, ICPP 2012)
//
/// --- Overview ---
// This relaxed implementation extends the Mound algorithm, and provides
// following features:
// - Arbitrary priorities.
// - Unbounded size.
// - Push, pop, empty, size functions. [TODO: Non-waiting and timed wait pop]
// - Supports blocking.
// - Fast and Scalable.
//
/// --- Mound ---
// A Mound is a heap where each element is a sorted linked list.
// First nodes in the lists maintain the heap property. Push randomly
// selects a leaf at the bottom level, then uses binary search to find
// a place to insert the new node to the head of the list. Pop gets
// the node from the head of the list at the root, then swap the
// list down until the heap feature holds. To use Mound in our
// implementation, we need to solve the following problems:
// - 1. Lack of general relaxed implementations. Mound is appealing
// for relaxed priority queue implementation because pop the whole
// list from the root is straightforward. One thread pops the list
// and following threads can pop from the list until its empty.
// Those pops only trigger one swap done operation. Thus reduce
// the latency for pop and reduce the contention for Mound.
// The difficulty is to provide a scalable and fast mechanism
// to let threads concurrently get elements from the list.
// - 2. Lack of control of list length. The length for every
// lists is critical for the performance. Mound suffers from not
// only the extreme cases(Push with increasing priorities, Mound
// becomes a sorted linked list; Push with decreasing priorities,
// Mound becomes to a regular heap), but also the common case(for
// random generated priorities, Mound degrades to the regular heap
// after millions of push/pop operations). The difficulty is to
// stabilize the list length without losing the accuracy and performance.
// - 3. Does not support blocking. Blocking is an important feature.
// Mound paper does not mention it. Designing the new algorithm for
// efficient blocking is challenging.
// - 4. Memory management. Mound allows optimistic reads. We need to
// protect the node from been reclaimed.
//
/// --- Design ---
// Our implementation extends Mound algorithm to support
// efficient relaxed pop. We employ a shared buffer algorithm to
// share the popped list. Our algorithm makes popping from shared
// buffer as fast as fetch_and_add. We improve the performance
// and compact the heap structure by stabilizing the size of each list.
// The implementation exposes the template parameter to set the
// preferred list length. Under the hood, we provide algorithms for
// fast inserting, pruning, and merging. The blocking algorithm is
// tricky. It allows one producer only wakes one consumer at a time.
// It also does not block the producer. For optimistic read, we use
// hazard pointer to protect the node from been reclaimed. We optimize the
// check-lock-check pattern by using test-test-and-set spin lock.

/// --- Template Parameters: ---
// 1. PopBatch could be 0 or a positive integer.
// If it is 0, only pop one node at a time.
// This is the strict implementation. It guarantees the return
// priority is alway the highest.  If it is > 0, we keep
// up to that number of nodes in a shared buffer to be consumed by
// subsequent pop operations.
//
// 2. ListTargetSize represents the minimal length for the list. It
// solves the problem when inserting to Mound with
// decreasing priority order (degrade to a heap).  Moreover,
// it maintains the Mound structure stable after trillions of
// operations, which causes unbalanced problem in the original
// Mound algorithm. We set the prunning length and merging lengtyh
// based on this parameter.
//
/// --- Interface ---
//  void push(const T& val)
//  void pop(T& val)
//  size_t size()
//  bool empty()

namespace folly {

template <
    typename T,
    bool MayBlock = false,
    bool SupportsSize = false,
    size_t PopBatch = 16,
    size_t ListTargetSize = 25,
    typename Mutex = folly::SpinLock,
    template <typename> class Atom = std::atomic>
class RelaxedConcurrentPriorityQueue {
  // Max height of the tree
  static constexpr uint32_t MAX_LEVELS = 32;
  // The default minimum value
  static constexpr T MIN_VALUE = std::numeric_limits<T>::min();

  // Align size for the shared buffer node
  static constexpr size_t Align = 1u << 7;
  static constexpr int LevelForForceInsert = 3;
  static constexpr int LevelForTraverseParent = 7;

  static_assert(PopBatch <= 256, "PopBatch must be <= 256");
  static_assert(
      ListTargetSize >= 1 && ListTargetSize <= 256,
      "TargetSize must be in the range [1, 256]");

  // The maximal length for the list
  static constexpr size_t PruningSize = ListTargetSize * 2;
  // When pop from Mound, tree elements near the leaf
  // level are likely be very small (the length of the list). When
  // swapping down after pop a list, we check the size of the
  // children to decide whether to merge them to their parent.
  static constexpr size_t MergingSize = ListTargetSize;

  /// List Node structure
  struct Node : public folly::hazptr_obj_base<Node, Atom> {
    Node* next;
    T val;
  };

  /// Mound Element (Tree node), head points to a linked list
  struct MoundElement {
    // Reading (head, size) without acquiring the lock
    Atom<Node*> head;
    Atom<size_t> size;
    alignas(Align) Mutex lock;
    MoundElement() { // initializer
      head.store(nullptr, std::memory_order_relaxed);
      size.store(0, std::memory_order_relaxed);
    }
  };

  /// The pos strcture simplify the implementation
  struct Position {
    uint32_t level;
    uint32_t index;
  };

  /// Node for shared buffer should be aligned
  struct BufferNode {
    alignas(Align) Atom<Node*> pnode;
  };

  /// Data members

  // Mound structure -> 2D array to represent a tree
  MoundElement* levels_[MAX_LEVELS];
  // Record the current leaf level (root is 0)
  Atom<uint32_t> bottom_;
  // It is used when expanding the tree
  Atom<uint32_t> guard_;

  // Mound with shared buffer
  // Following two members are accessed by consumers
  std::unique_ptr<BufferNode[]> shared_buffer_;
  alignas(Align) Atom<int> top_loc_;

  /// Blocking algorithm
  // Numbers of futexs in the array
  static constexpr size_t NumFutex = 128;
  // The index gap for accessing futex in the array
  static constexpr size_t Stride = 33;
  std::unique_ptr<folly::detail::Futex<Atom>[]> futex_array_;
  alignas(Align) Atom<uint32_t> cticket_;
  alignas(Align) Atom<uint32_t> pticket_;

  // Two counters to calculate size of the queue
  alignas(Align) Atom<size_t> counter_p_;
  alignas(Align) Atom<size_t> counter_c_;

 public:
  /// Constructor
  RelaxedConcurrentPriorityQueue()
      : cticket_(1), pticket_(1), counter_p_(0), counter_c_(0) {
    if (MayBlock) {
      futex_array_.reset(new folly::detail::Futex<Atom>[NumFutex]);
    }

    if (PopBatch > 0) {
      top_loc_ = -1;
      shared_buffer_.reset(new BufferNode[PopBatch]);
      for (size_t i = 0; i < PopBatch; i++) {
        shared_buffer_[i].pnode = nullptr;
      }
    }
    bottom_.store(0, std::memory_order_relaxed);
    guard_.store(0, std::memory_order_relaxed);
    // allocate the root MoundElement and initialize Mound
    levels_[0] = new MoundElement[1]; // default MM for MoundElement
    for (uint32_t i = 1; i < MAX_LEVELS; i++) {
      levels_[i] = nullptr;
    }
  }

  ~RelaxedConcurrentPriorityQueue() {
    if (PopBatch > 0) {
      deleteSharedBuffer();
    }
    if (MayBlock) {
      futex_array_.reset();
    }
    Position pos;
    pos.level = pos.index = 0;
    deleteAllNodes(pos);
    // default MM for MoundElement
    for (int i = getBottomLevel(); i >= 0; i--) {
      delete[] levels_[i];
    }
  }

  void push(const T& val) {
    moundPush(val);
    if (SupportsSize) {
      counter_p_.fetch_add(1, std::memory_order_relaxed);
    }
  }

  void pop(T& val) {
    moundPop(val);
    if (SupportsSize) {
      counter_c_.fetch_add(1, std::memory_order_relaxed);
    }
  }

  /// Note: size() and empty() are guaranteed to be accurate only if
  ///       the queue is not changed concurrently.
  /// Returns an estimate of the size of the queue
  size_t size() {
    DCHECK(SupportsSize);
    size_t p = counter_p_.load(std::memory_order_acquire);
    size_t c = counter_c_.load(std::memory_order_acquire);
    return (p > c) ? p - c : 0;
  }

  /// Returns true only if the queue was empty during the call.
  bool empty() { return isEmpty(); }

 private:
  uint32_t getBottomLevel() { return bottom_.load(std::memory_order_acquire); }

  /// This function is only called by the destructor
  void deleteSharedBuffer() {
    DCHECK(PopBatch > 0);
    // delete nodes in the buffer
    int loc = top_loc_.load(std::memory_order_relaxed);
    while (loc >= 0) {
      Node* n = shared_buffer_[loc--].pnode.load(std::memory_order_relaxed);
      delete n;
    }
    // delete buffer
    shared_buffer_.reset();
  }

  /// This function is only called by the destructor
  void deleteAllNodes(const Position& pos) {
    if (getElementSize(pos) == 0) {
      // current list is empty, do not need to check
      // its children again.
      return;
    }

    Node* curList = getList(pos);
    setTreeNode(pos, nullptr);
    while (curList != nullptr) { // reclaim nodes
      Node* n = curList;
      curList = curList->next;
      delete n;
    }

    if (!isLeaf(pos)) {
      deleteAllNodes(leftOf(pos));
      deleteAllNodes(rightOf(pos));
    }
  }

  /// Check the first node in TreeElement keeps the heap structure.
  bool isHeap(const Position& pos) {
    if (isLeaf(pos)) {
      return true;
    }
    Position lchild = leftOf(pos);
    Position rchild = rightOf(pos);
    return isHeap(lchild) && isHeap(rchild) &&
        readValue(pos) >= readValue(lchild) &&
        readValue(pos) >= readValue(rchild);
  }

  /// Current position is leaf?
  FOLLY_ALWAYS_INLINE bool isLeaf(const Position& pos) {
    return pos.level == getBottomLevel();
  }

  /// Current element is the root?
  FOLLY_ALWAYS_INLINE bool isRoot(const Position& pos) {
    return pos.level == 0;
  }

  /// Locate the parent node
  FOLLY_ALWAYS_INLINE Position parentOf(const Position& pos) {
    Position res;
    res.level = pos.level - 1;
    res.index = pos.index / 2;
    return res;
  }

  /// Locate the left child
  FOLLY_ALWAYS_INLINE Position leftOf(const Position& pos) {
    Position res;
    res.level = pos.level + 1;
    res.index = pos.index * 2;
    return res;
  }

  /// Locate the right child
  FOLLY_ALWAYS_INLINE Position rightOf(const Position& pos) {
    Position res;
    res.level = pos.level + 1;
    res.index = pos.index * 2 + 1;
    return res;
  }

  /// get the list size in current MoundElement
  FOLLY_ALWAYS_INLINE size_t getElementSize(const Position& pos) {
    return levels_[pos.level][pos.index].size.load(std::memory_order_relaxed);
  }

  /// Set the size of current MoundElement
  FOLLY_ALWAYS_INLINE void setElementSize(
      const Position& pos, const uint32_t& v) {
    levels_[pos.level][pos.index].size.store(v, std::memory_order_relaxed);
  }

  /// Extend the tree level
  void grow(uint32_t btm) {
    while (true) {
      if (guard_.fetch_add(1, std::memory_order_acq_rel) == 0) {
        break;
      }
      // someone already expanded the tree
      if (btm != getBottomLevel()) {
        return;
      }
      std::this_thread::yield();
    }
    // double check the bottom has not changed yet
    if (btm != getBottomLevel()) {
      guard_.store(0, std::memory_order_release);
      return;
    }
    // create and initialize the new level
    uint32_t tmp_btm = getBottomLevel();
    uint32_t size = 1 << (tmp_btm + 1);
    MoundElement* new_level = new MoundElement[size]; // MM
    levels_[tmp_btm + 1] = new_level;
    bottom_.store(tmp_btm + 1, std::memory_order_release);
    guard_.store(0, std::memory_order_release);
  }

  /// TODO: optimization
  // This function is important, it selects a position to insert the
  // node, there are two execution paths when this function returns.
  // 1. It returns a position with head node has lower priority than the target.
  // Thus it could be potentially used as the starting element to do the binary
  // search to find the fit position.  (slow path)
  // 2. It returns a position, which is not the best fit.
  // But it prevents aggressively grow the Mound. (fast path)
  Position selectPosition(
      const T& val,
      bool& path,
      uint32_t& seed,
      folly::hazptr_holder<Atom>& hptr) {
    while (true) {
      uint32_t b = getBottomLevel();
      int bound = 1 << b; // number of elements in this level
      int steps = 1 + b * b; // probe the length
      ++seed;
      uint32_t index = seed % bound;

      for (int i = 0; i < steps; i++) {
        int loc = (index + i) % bound;
        Position pos;
        pos.level = b;
        pos.index = loc;
        // the first round, we do the quick check
        if (optimisticReadValue(pos, hptr) <= val) {
          path = false;
          seed = ++loc;
          return pos;
        } else if (
            b > LevelForForceInsert && getElementSize(pos) < ListTargetSize) {
          // [fast path] conservative implementation
          // it makes sure every tree element should
          // have more than the given number of nodes.
          seed = ++loc;
          path = true;
          return pos;
        }
        if (b != getBottomLevel()) {
          break;
        }
      }
      // failed too many times grow
      if (b == getBottomLevel()) {
        grow(b);
      }
    }
  }

  /// Swap two Tree Elements (head, size)
  void swapList(const Position& a, const Position& b) {
    Node* tmp = getList(a);
    setTreeNode(a, getList(b));
    setTreeNode(b, tmp);

    // need to swap the tree node meta-data
    uint32_t sa = getElementSize(a);
    uint32_t sb = getElementSize(b);
    setElementSize(a, sb);
    setElementSize(b, sa);
  }

  FOLLY_ALWAYS_INLINE void lockNode(const Position& pos) {
    levels_[pos.level][pos.index].lock.lock();
  }

  FOLLY_ALWAYS_INLINE void unlockNode(const Position& pos) {
    levels_[pos.level][pos.index].lock.unlock();
  }

  FOLLY_ALWAYS_INLINE bool trylockNode(const Position& pos) {
    return levels_[pos.level][pos.index].lock.try_lock();
  }

  FOLLY_ALWAYS_INLINE T
  optimisticReadValue(const Position& pos, folly::hazptr_holder<Atom>& hptr) {
    Node* tmp = hptr.protect(levels_[pos.level][pos.index].head);
    return (tmp == nullptr) ? MIN_VALUE : tmp->val;
  }

  // Get the value from the head of the list as the elementvalue
  FOLLY_ALWAYS_INLINE T readValue(const Position& pos) {
    Node* tmp = getList(pos);
    return (tmp == nullptr) ? MIN_VALUE : tmp->val;
  }

  FOLLY_ALWAYS_INLINE Node* getList(const Position& pos) {
    return levels_[pos.level][pos.index].head.load(std::memory_order_acquire);
  }

  FOLLY_ALWAYS_INLINE void setTreeNode(const Position& pos, Node* t) {
    levels_[pos.level][pos.index].head.store(t, std::memory_order_release);
  }

  // Merge two sorted lists
  Node* mergeList(Node* base, Node* source) {
    if (base == nullptr) {
      return source;
    } else if (source == nullptr) {
      return base;
    }

    Node *res, *p;
    // choose the head node
    if (base->val >= source->val) {
      res = base;
      base = base->next;
      p = res;
    } else {
      res = source;
      source = source->next;
      p = res;
    }

    while (base != nullptr && source != nullptr) {
      if (base->val >= source->val) {
        p->next = base;
        base = base->next;
      } else {
        p->next = source;
        source = source->next;
      }
      p = p->next;
    }
    if (base == nullptr) {
      p->next = source;
    } else {
      p->next = base;
    }
    return res;
  }

  /// Merge list t to the Element Position
  void mergeListTo(const Position& pos, Node* t, const size_t& list_length) {
    Node* head = getList(pos);
    setTreeNode(pos, mergeList(head, t));
    uint32_t ns = getElementSize(pos) + list_length;
    setElementSize(pos, ns);
  }

  bool pruningLeaf(const Position& pos) {
    if (getElementSize(pos) <= PruningSize) {
      unlockNode(pos);
      return true;
    }

    int b = getBottomLevel();
    int leaves = 1 << b;
    int cnodes = 0;
    for (int i = 0; i < leaves; i++) {
      Position tmp;
      tmp.level = b;
      tmp.index = i;
      if (getElementSize(tmp) != 0) {
        cnodes++;
      }
      if (cnodes > leaves * 2 / 3) {
        break;
      }
    }

    if (cnodes <= leaves * 2 / 3) {
      unlockNode(pos);
      return true;
    }
    return false;
  }

  /// Split the current list into two lists,
  /// then split the tail list and merge to two children.
  void startPruning(const Position& pos) {
    if (isLeaf(pos) && pruningLeaf(pos)) {
      return;
    }

    // split the list, record the tail
    Node* pruning_head = getList(pos);
    int steps = ListTargetSize; // keep in the original list
    for (int i = 0; i < steps - 1; i++) {
      pruning_head = pruning_head->next;
    }
    Node* t = pruning_head;
    pruning_head = pruning_head->next;
    t->next = nullptr;
    int tail_length = getElementSize(pos) - steps;
    setElementSize(pos, steps);

    // split the tail list into two lists
    // evenly merge to two children
    if (pos.level != getBottomLevel()) {
      // split the rest into two lists
      int left_length = (tail_length + 1) / 2;
      int right_length = tail_length - left_length;
      Node *to_right, *to_left = pruning_head;
      for (int i = 0; i < left_length - 1; i++) {
        pruning_head = pruning_head->next;
      }
      to_right = pruning_head->next;
      pruning_head->next = nullptr;

      Position lchild = leftOf(pos);
      Position rchild = rightOf(pos);
      if (left_length != 0) {
        lockNode(lchild);
        mergeListTo(lchild, to_left, left_length);
      }
      if (right_length != 0) {
        lockNode(rchild);
        mergeListTo(rchild, to_right, right_length);
      }
      unlockNode(pos);
      if (left_length != 0 && getElementSize(lchild) > PruningSize) {
        startPruning(lchild);
      } else if (left_length != 0) {
        unlockNode(lchild);
      }
      if (right_length != 0 && getElementSize(rchild) > PruningSize) {
        startPruning(rchild);
      } else if (right_length != 0) {
        unlockNode(rchild);
      }
    } else { // time to grow the Mound
      grow(pos.level);
      // randomly choose a child to insert
      if (steps % 2 == 1) {
        Position rchild = rightOf(pos);
        lockNode(rchild);
        mergeListTo(rchild, pruning_head, tail_length);
        unlockNode(pos);
        unlockNode(rchild);
      } else {
        Position lchild = leftOf(pos);
        lockNode(lchild);
        mergeListTo(lchild, pruning_head, tail_length);
        unlockNode(pos);
        unlockNode(lchild);
      }
    }
  }

  // This function insert the new node (always) at the head of the
  // current list. It needs to lock the parent & current
  // This function may cause the list becoming tooooo long, so we
  // provide pruning algorithm.
  bool regularInsert(const Position& pos, const T& val, Node* newNode) {
    // insert to the root node
    if (isRoot(pos)) {
      lockNode(pos);
      T nv = readValue(pos);
      if (FOLLY_LIKELY(nv <= val)) {
        newNode->next = getList(pos);
        setTreeNode(pos, newNode);
        uint32_t sz = getElementSize(pos);
        setElementSize(pos, sz + 1);
        if (FOLLY_UNLIKELY(sz > PruningSize)) {
          startPruning(pos);
        } else {
          unlockNode(pos);
        }
        return true;
      }
      unlockNode(pos);
      return false;
    }

    // insert to an inner node
    Position parent = parentOf(pos);
    if (!trylockNode(parent)) {
      return false;
    }
    if (!trylockNode(pos)) {
      unlockNode(parent);
      return false;
    }
    T pv = readValue(parent);
    T nv = readValue(pos);
    if (FOLLY_LIKELY(pv > val && nv <= val)) {
      // improve the accuracy by getting the node(R) with less priority than the
      // new value from parent level, insert the new node to the parent list
      // and insert R to the current list.
      // It only happens at >= LevelForTraverseParent for reducing contention
      uint32_t sz = getElementSize(pos);
      if (pos.level >= LevelForTraverseParent) {
        Node* start = getList(parent);
        while (start->next != nullptr && start->next->val >= val) {
          start = start->next;
        }
        if (start->next != nullptr) {
          newNode->next = start->next;
          start->next = newNode;
          while (start->next->next != nullptr) {
            start = start->next;
          }
          newNode = start->next;
          start->next = nullptr;
        }
        unlockNode(parent);

        Node* curList = getList(pos);
        if (curList == nullptr) {
          newNode->next = nullptr;
          setTreeNode(pos, newNode);
        } else {
          Node* p = curList;
          if (p->val <= newNode->val) {
            newNode->next = curList;
            setTreeNode(pos, newNode);
          } else {
            while (p->next != nullptr && p->next->val >= newNode->val) {
              p = p->next;
            }
            newNode->next = p->next;
            p->next = newNode;
          }
        }
        setElementSize(pos, sz + 1);
      } else {
        unlockNode(parent);
        newNode->next = getList(pos);
        setTreeNode(pos, newNode);
        setElementSize(pos, sz + 1);
      }
      if (FOLLY_UNLIKELY(sz > PruningSize)) {
        startPruning(pos);
      } else {
        unlockNode(pos);
      }
      return true;
    }
    unlockNode(parent);
    unlockNode(pos);
    return false;
  }

  bool forceInsertToRoot(Node* newNode) {
    Position pos;
    pos.level = pos.index = 0;
    std::unique_lock<Mutex> lck(
        levels_[pos.level][pos.index].lock, std::try_to_lock);
    if (!lck.owns_lock()) {
      return false;
    }
    uint32_t sz = getElementSize(pos);
    if (sz >= ListTargetSize) {
      return false;
    }

    Node* curList = getList(pos);
    if (curList == nullptr) {
      newNode->next = nullptr;
      setTreeNode(pos, newNode);
    } else {
      Node* p = curList;
      if (p->val <= newNode->val) {
        newNode->next = curList;
        setTreeNode(pos, newNode);
      } else {
        while (p->next != nullptr && p->next->val >= newNode->val) {
          p = p->next;
        }
        newNode->next = p->next;
        p->next = newNode;
      }
    }
    setElementSize(pos, sz + 1);
    return true;
  }

  // This function forces the new node inserting to the current position
  // if the element does not hold the enough nodes. It is safe to
  // lock just one position to insert, because it won't be the first
  // node to sustain the heap structure.
  bool forceInsert(const Position& pos, const T& val, Node* newNode) {
    if (isRoot(pos)) {
      return forceInsertToRoot(newNode);
    }

    while (true) {
      std::unique_lock<Mutex> lck(
          levels_[pos.level][pos.index].lock, std::try_to_lock);
      if (!lck.owns_lock()) {
        if (getElementSize(pos) < ListTargetSize && readValue(pos) >= val) {
          continue;
        } else {
          return false;
        }
      }
      T nv = readValue(pos);
      uint32_t sz = getElementSize(pos);
      // do not allow the new node to be the first one
      // do not allow the list size tooooo big
      if (FOLLY_UNLIKELY(nv < val || sz >= ListTargetSize)) {
        return false;
      }

      Node* p = getList(pos);
      // find a place to insert the node
      while (p->next != nullptr && p->next->val > val) {
        p = p->next;
      }
      newNode->next = p->next;
      p->next = newNode;
      // do not forget to change the metadata
      setElementSize(pos, sz + 1);
      return true;
    }
  }

  void binarySearchPosition(
      Position& cur, const T& val, folly::hazptr_holder<Atom>& hptr) {
    Position parent, mid;
    if (cur.level == 0) {
      return;
    }
    // start from the root
    parent.level = parent.index = 0;

    while (true) { // binary search
      mid.level = (cur.level + parent.level) / 2;
      mid.index = cur.index >> (cur.level - mid.level);

      T mv = optimisticReadValue(mid, hptr);
      if (val < mv) {
        parent = mid;
      } else {
        cur = mid;
      }

      if (mid.level == 0 || // the root
          ((parent.level + 1 == cur.level) && parent.level != 0)) {
        return;
      }
    }
  }

  // The push keeps the length of each element stable
  void moundPush(const T& val) {
    Position cur;
    folly::hazptr_holder<Atom> hptr = folly::make_hazard_pointer<Atom>();
    Node* newNode = new Node;
    newNode->val = val;
    uint32_t seed = folly::Random::rand32() % (1 << 21);

    while (true) {
      // shell we go the fast path?
      bool go_fast_path = false;
      // chooice the right node to start
      cur = selectPosition(val, go_fast_path, seed, hptr);
      if (go_fast_path) {
        if (FOLLY_LIKELY(forceInsert(cur, val, newNode))) {
          if (MayBlock) {
            blockingPushImpl();
          }
          return;
        } else {
          continue;
        }
      }

      binarySearchPosition(cur, val, hptr);
      if (FOLLY_LIKELY(regularInsert(cur, val, newNode))) {
        if (MayBlock) {
          blockingPushImpl();
        }
        return;
      }
    }
  }

  int popToSharedBuffer(const uint32_t rsize, Node* head) {
    Position pos;
    pos.level = pos.index = 0;

    int num = std::min(rsize, (uint32_t)PopBatch);
    for (int i = num - 1; i >= 0; i--) {
      // wait until this block is empty
      while (shared_buffer_[i].pnode.load(std::memory_order_relaxed) != nullptr)
        ;
      shared_buffer_[i].pnode.store(head, std::memory_order_relaxed);
      head = head->next;
    }
    if (num > 0) {
      top_loc_.store(num - 1, std::memory_order_release);
    }
    setTreeNode(pos, head);
    return rsize - num;
  }

  void mergeDown(const Position& pos) {
    if (isLeaf(pos)) {
      unlockNode(pos);
      return;
    }

    // acquire locks for L and R and compare
    Position lchild = leftOf(pos);
    Position rchild = rightOf(pos);
    lockNode(lchild);
    lockNode(rchild);
    // read values
    T nv = readValue(pos);
    T lv = readValue(lchild);
    T rv = readValue(rchild);
    if (nv >= lv && nv >= rv) {
      unlockNode(pos);
      unlockNode(lchild);
      unlockNode(rchild);
      return;
    }

    // If two children contains nodes less than the
    // threshold, we merge two children to the parent
    // and do merge down on both of them.
    size_t sum =
        getElementSize(rchild) + getElementSize(lchild) + getElementSize(pos);
    if (sum <= MergingSize) {
      Node* l1 = mergeList(getList(rchild), getList(lchild));
      setTreeNode(pos, mergeList(l1, getList(pos)));
      setElementSize(pos, sum);
      setTreeNode(lchild, nullptr);
      setElementSize(lchild, 0);
      setTreeNode(rchild, nullptr);
      setElementSize(rchild, 0);
      unlockNode(pos);
      mergeDown(lchild);
      mergeDown(rchild);
      return;
    }
    // pull from right
    if (rv >= lv && rv > nv) {
      swapList(rchild, pos);
      unlockNode(pos);
      unlockNode(lchild);
      mergeDown(rchild);
    } else if (lv >= rv && lv > nv) {
      // pull from left
      swapList(lchild, pos);
      unlockNode(pos);
      unlockNode(rchild);
      mergeDown(lchild);
    }
  }

  bool deferSettingRootSize(Position& pos) {
    if (isLeaf(pos)) {
      setElementSize(pos, 0);
      unlockNode(pos);
      return true;
    }

    // acquire locks for L and R and compare
    Position lchild = leftOf(pos);
    Position rchild = rightOf(pos);
    lockNode(lchild);
    lockNode(rchild);
    if (getElementSize(lchild) == 0 && getElementSize(rchild) == 0) {
      setElementSize(pos, 0);
      unlockNode(pos);
      unlockNode(lchild);
      unlockNode(rchild);
      return true;
    } else {
      // read values
      T lv = readValue(lchild);
      T rv = readValue(rchild);
      if (lv >= rv) {
        swapList(lchild, pos);
        setElementSize(lchild, 0);
        unlockNode(pos);
        unlockNode(rchild);
        pos = lchild;
      } else {
        swapList(rchild, pos);
        setElementSize(rchild, 0);
        unlockNode(pos);
        unlockNode(lchild);
        pos = rchild;
      }
      return false;
    }
  }

  bool moundPopMany(T& val) {
    // pop from the root
    Position pos;
    pos.level = pos.index = 0;
    // the root is nullptr, return false
    Node* head = getList(pos);
    if (head == nullptr) {
      unlockNode(pos);
      return false;
    }

    // shared buffer already filled by other threads
    if (PopBatch > 0 && top_loc_.load(std::memory_order_acquire) >= 0) {
      unlockNode(pos);
      return false;
    }

    uint32_t sz = getElementSize(pos);
    // get the one node first
    val = head->val;
    Node* p = head;
    head = head->next;
    sz--;

    if (PopBatch > 0) {
      sz = popToSharedBuffer(sz, head);
    } else {
      setTreeNode(pos, head);
    }

    bool done = false;
    if (FOLLY_LIKELY(sz == 0)) {
      done = deferSettingRootSize(pos);
    } else {
      setElementSize(pos, sz);
    }

    if (FOLLY_LIKELY(!done)) {
      mergeDown(pos);
    }

    p->retire();
    return true;
  }

  void blockingPushImpl() {
    auto p = pticket_.fetch_add(1, std::memory_order_acq_rel);
    auto loc = getFutexArrayLoc(p);
    uint32_t curfutex = futex_array_[loc].load(std::memory_order_acquire);

    while (true) {
      uint32_t ready = p << 1; // get the lower 31 bits
      // avoid the situation that push has larger ticket already set the value
      if (FOLLY_UNLIKELY(
              ready + 1 < curfutex ||
              ((curfutex > ready) && (curfutex - ready > 0x40000000)))) {
        return;
      }

      if (futex_array_[loc].compare_exchange_strong(curfutex, ready)) {
        if (curfutex &
            1) { // One or more consumers may be blocked on this futex
          detail::futexWake(&futex_array_[loc]);
        }
        return;
      } else {
        curfutex = futex_array_[loc].load(std::memory_order_acquire);
      }
    }
  }

  // This could guarentee the Mound is empty
  FOLLY_ALWAYS_INLINE bool isMoundEmpty() {
    Position pos;
    pos.level = pos.index = 0;
    return getElementSize(pos) == 0;
  }

  // Return true if the shared buffer is empty
  FOLLY_ALWAYS_INLINE bool isSharedBufferEmpty() {
    return top_loc_.load(std::memory_order_acquire) < 0;
  }

  FOLLY_ALWAYS_INLINE bool isEmpty() {
    if (PopBatch > 0) {
      return isMoundEmpty() && isSharedBufferEmpty();
    }
    return isMoundEmpty();
  }

  FOLLY_ALWAYS_INLINE bool futexIsReady(const size_t& curticket) {
    auto loc = getFutexArrayLoc(curticket);
    auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
    uint32_t short_cticket = curticket & 0x7FFFFFFF;
    uint32_t futex_ready = curfutex >> 1;
    // handle unsigned 31 bits overflow
    return futex_ready >= short_cticket ||
        short_cticket - futex_ready > 0x40000000;
  }

  template <typename Clock, typename Duration>
  FOLLY_NOINLINE bool trySpinBeforeBlock(
      const size_t& curticket,
      const std::chrono::time_point<Clock, Duration>& deadline,
      const folly::WaitOptions& opt = wait_options()) {
    return folly::detail::spin_pause_until(deadline, opt, [=] {
             return futexIsReady(curticket);
           }) == folly::detail::spin_result::success;
  }

  void tryBlockingPop(const size_t& curticket) {
    auto loc = getFutexArrayLoc(curticket);
    auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
    if (curfutex &
        1) { /// The last round consumers are still waiting, go to sleep
      detail::futexWait(&futex_array_[loc], curfutex);
    }
    if (trySpinBeforeBlock(
            curticket,
            std::chrono::time_point<std::chrono::steady_clock>::max())) {
      return; /// Spin until the push ticket is ready
    }
    while (true) {
      curfutex = futex_array_[loc].load(std::memory_order_acquire);
      if (curfutex &
          1) { /// The last round consumers are still waiting, go to sleep
        detail::futexWait(&futex_array_[loc], curfutex);
      } else if (!futexIsReady(curticket)) { // current ticket < pop ticket
        uint32_t blocking_futex = curfutex + 1;
        if (futex_array_[loc].compare_exchange_strong(
                curfutex, blocking_futex)) {
          detail::futexWait(&futex_array_[loc], blocking_futex);
        }
      } else {
        return;
      }
    }
  }

  void blockingPopImpl() {
    auto ct = cticket_.fetch_add(1, std::memory_order_acq_rel);
    // fast path check
    if (futexIsReady(ct)) {
      return;
    }
    // Blocking
    tryBlockingPop(ct);
  }

  bool tryPopFromMound(T& val) {
    if (isMoundEmpty()) {
      return false;
    }
    Position pos;
    pos.level = pos.index = 0;

    // lock the root
    if (trylockNode(pos)) {
      return moundPopMany(val);
    }
    return false;
  }

  FOLLY_ALWAYS_INLINE static folly::WaitOptions wait_options() { return {}; }

  template <typename Clock, typename Duration>
  FOLLY_NOINLINE bool tryWait(
      const std::chrono::time_point<Clock, Duration>& deadline,
      const folly::WaitOptions& opt = wait_options()) {
    // Fast path, by quick check the status
    switch (folly::detail::spin_pause_until(
        deadline, opt, [=] { return !isEmpty(); })) {
      case folly::detail::spin_result::success:
        return true;
      case folly::detail::spin_result::timeout:
        return false;
      case folly::detail::spin_result::advance:
        break;
    }

    // Spinning strategy
    while (true) {
      auto res =
          folly::detail::spin_yield_until(deadline, [=] { return !isEmpty(); });
      if (res == folly::detail::spin_result::success) {
        return true;
      } else if (res == folly::detail::spin_result::timeout) {
        return false;
      }
    }
    return true;
  }

  bool tryPopFromSharedBuffer(T& val) {
    int get_or = -1;
    if (!isSharedBufferEmpty()) {
      get_or = top_loc_.fetch_sub(1, std::memory_order_acq_rel);
      if (get_or >= 0) {
        Node* c = shared_buffer_[get_or].pnode.load(std::memory_order_relaxed);
        shared_buffer_[get_or].pnode.store(nullptr, std::memory_order_release);
        val = c->val;
        c->retire();
        return true;
      }
    }
    return false;
  }

  size_t getFutexArrayLoc(size_t s) {
    return ((s - 1) * Stride) & (NumFutex - 1);
  }

  void moundPop(T& val) {
    if (MayBlock) {
      blockingPopImpl();
    }

    if (PopBatch > 0) {
      if (tryPopFromSharedBuffer(val)) {
        return;
      }
    }

    while (true) {
      if (FOLLY_LIKELY(tryPopFromMound(val))) {
        return;
      }
      tryWait(std::chrono::time_point<std::chrono::steady_clock>::max());
      if (PopBatch > 0 && tryPopFromSharedBuffer(val)) {
        return;
      }
    }
  }
};

} // namespace folly