folly/folly/concurrency/container/test/FlatCombiningPriorityQueueTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/concurrency/container/FlatCombiningPriorityQueue.h>

#include <condition_variable>
#include <iomanip>
#include <mutex>
#include <queue>

#include <folly/Benchmark.h>
#include <folly/portability/GTest.h>

#include <glog/logging.h>

// Command-line knobs for this test binary.
// --bench: FCPriQueue.bench returns immediately unless this is set.
DEFINE_bool(bench, false, "run benchmark");
// --reps: how many times test() repeats each benchmark configuration.
DEFINE_int32(reps, 10, "number of reps");
// --ops: loop bound for the benchmark workers in test(); also the divisor used
// when test() reports per-operation times.
DEFINE_int32(ops, 100000, "number of operations per rep");
// --size: number of elements run_once() pushes before starting the workers.
DEFINE_int32(size, 64, "initial size of the priority queue");
// --work: argument passed to doWork() between queue operations in test().
DEFINE_int32(work, 1000, "amount of unrelated work per operation");

/// Burns CPU time outside the queue to model per-operation work unrelated to
/// the data structure under test. Non-positive `work` does nothing.
void doWork(int work) {
  uint64_t acc = 0;
  while (work > 0) {
    acc += static_cast<uint64_t>(work);
    --work;
  }
  // Keep the loop's result observable so it is not discarded as dead code.
  folly::doNotOptimizeAway(acc);
}

/// Baseline implementation: a conventional single-lock priority queue used as
/// the point of comparison for FlatCombiningPriorityQueue.
///
/// Every operation takes the same mutex. The class owns two condition
/// variables and signals them after a successful push/pop, so its cost
/// resembles a queue with blocking support. It does not itself expose any
/// blocking operations.
///
/// @tparam T             element type.
/// @tparam PriorityQueue underlying container; `top()` yields the element
///                       that try_pop()/try_peek() return.
/// @tparam Mutex         lock type guarding the container.
template <
    typename T,
    typename PriorityQueue = std::priority_queue<T>,
    typename Mutex = std::mutex>
class BaselinePQ {
 public:
  /// @param maxSize capacity bound; 0 means unbounded.
  /// @param args    passed to the PriorityQueue constructor. The defaulted
  ///                template parameter makes this constructor participate in
  ///                overload resolution only when PriorityQueue is
  ///                constructible from `args`.
  template <
      typename... PQArgs,
      typename = decltype(PriorityQueue(std::declval<PQArgs>()...))>
  explicit BaselinePQ(size_t maxSize = 0, PQArgs... args)
      // `args` are owned by-value copies, so moving from them is safe.
      : maxSize_(maxSize), pq_(std::move(args)...) {}

  /// Returns true if the queue holds no elements.
  bool empty() const {
    std::lock_guard<Mutex> g(m_);
    return pq_.empty();
  }

  /// Returns the current number of elements.
  size_t size() const {
    std::lock_guard<Mutex> g(m_);
    return pq_.size();
  }

  /// Inserts a copy of `val`.
  /// @return false if the queue is bounded and full, or if the container
  ///         throws std::bad_alloc; true otherwise.
  bool try_push(const T& val) {
    std::lock_guard<Mutex> g(m_);
    if (maxSize_ > 0 && pq_.size() == maxSize_) {
      return false;
    }
    DCHECK(maxSize_ == 0 || pq_.size() < maxSize_);
    try {
      pq_.push(val);
      notempty_.notify_one();
      return true;
    } catch (const std::bad_alloc&) {
      return false;
    }
  }

  /// Removes the top element into `val`.
  /// @return false (leaving `val` untouched) if the queue is empty.
  bool try_pop(T& val) {
    std::lock_guard<Mutex> g(m_);
    if (!pq_.empty()) {
      val = pq_.top();
      pq_.pop();
      notfull_.notify_one();
      return true;
    }
    return false;
  }

  /// Copies the top element into `val` without removing it.
  /// @return false (leaving `val` untouched) if the queue is empty.
  bool try_peek(T& val) {
    std::lock_guard<Mutex> g(m_);
    if (!pq_.empty()) {
      val = pq_.top();
      return true;
    }
    return false;
  }

 private:
  // `mutable` so the const observers empty() and size() can lock it;
  // std::lock_guard requires a non-const lockable.
  mutable Mutex m_;
  size_t maxSize_; // 0 means unbounded
  PriorityQueue pq_;
  // Signalled on push/pop only; nothing in this class waits on them.
  std::condition_variable notempty_;
  std::condition_variable notfull_;
};

// Queue types under test: the single-lock baseline and the flat-combining
// implementation, both holding ints.
using Baseline = BaselinePQ<int>;
using FCPQ = folly::FlatCombiningPriorityQueue<int>;

// Thread counts swept by FCPriQueue.pushPop and FCPriQueue.bench. Builds with
// FOLLY_SANITIZE_THREAD set stop at 16 threads.
static std::vector<int> nthr =
#if FOLLY_SANITIZE_THREAD
    {1, 2, 3, 4, 6, 8, 12, 16};
#else
    {1, 2, 3, 4, 6, 8, 12, 16, 24, 32, 48, 64};
#endif

// Number of workers run_once() spawns; assigned by its callers before each run.
static uint32_t nthreads = 0;

template <typename PriorityQueue, typename Func>
static uint64_t run_once(PriorityQueue& pq, const Func& fn) {
  int ops = FLAGS_ops;
  int size = FLAGS_size;
  std::atomic<bool> start{false};
  std::atomic<uint32_t> started{0};

  for (int i = 0; i < size; ++i) {
    CHECK(pq.try_push(i * (ops / size)));
  }

  std::vector<std::thread> threads(nthreads);
  for (uint32_t tid = 0; tid < nthreads; ++tid) {
    threads[tid] = std::thread([&, tid] {
      started.fetch_add(1);
      while (!start.load()) {
        /* nothing */;
      }
      fn(tid);
    });
  }

  while (started.load() < nthreads) {
    /* nothing */;
  }
  auto tbegin = std::chrono::steady_clock::now();

  // begin time measurement
  start.store(true);

  for (auto& t : threads) {
    t.join();
  }

  // end time measurement
  uint64_t duration = 0;
  auto tend = std::chrono::steady_clock::now();
  duration = std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
                 .count();
  return duration;
}

/// Single-threaded checks on an unbounded FCPQ:
/// - empty-queue behavior;
/// - priority order (larger value first);
/// - peek semantics;
/// - the out-parameter and Optional-returning forms of try_pop().
TEST(FCPriQueue, basic) {
  FCPQ q;
  int out;
  // Checks that empty() and size() both agree with `expected`.
  auto checkCount = [&q](size_t expected) {
    CHECK_EQ(q.empty(), expected == 0);
    CHECK_EQ(q.size(), expected);
  };

  // A fresh queue is empty; both try_pop() forms report failure.
  checkCount(0);
  CHECK(!q.try_pop(out));
  EXPECT_FALSE(bool(q.try_pop())); // Optional-returning overload

  CHECK(q.try_push(1));
  CHECK(q.try_push(2));
  checkCount(2);

  // The larger value has the higher priority; peeking must not remove it.
  q.peek(out);
  CHECK_EQ(out, 2);
  CHECK(q.try_peek(out));
  CHECK_EQ(out, 2);
  checkCount(2);

  // Out-parameter try_pop() drains in priority order.
  CHECK(q.try_pop(out));
  CHECK_EQ(out, 2);
  checkCount(1);
  CHECK(q.try_pop(out));
  CHECK_EQ(out, 1);
  checkCount(0);

  // Optional-returning try_pop() drains in the same order.
  CHECK(q.try_push(1));
  CHECK(q.try_push(2));
  EXPECT_EQ(*q.try_pop(), 2);
  checkCount(1);
  EXPECT_EQ(*q.try_pop(), 1);
  checkCount(0);
}

/// A capacity-1 queue refuses a second push until its element is popped.
TEST(FCPriQueue, bounded) {
  FCPQ q(1);
  CHECK(q.try_push(1));

  // At capacity: the push is refused and the contents are unchanged.
  CHECK(!q.try_push(1));
  CHECK_EQ(q.size(), 1);
  CHECK(!q.empty());

  int out;
  CHECK(q.try_pop(out));
  CHECK_EQ(out, 1);
  CHECK_EQ(q.size(), 0);
  CHECK(q.empty());
}

/// Exercises the timed APIs on a capacity-1 queue. Pop/peek on an empty queue
/// and push on a full queue must report failure; the matching operations on a
/// non-empty / non-full queue must succeed.
TEST(FCPriQueue, timeout) {
  FCPQ q(1);
  auto wait = std::chrono::microseconds(1000);
  // Absolute deadline `wait` from now, recomputed at every call site.
  auto deadline = [&wait] { return std::chrono::steady_clock::now() + wait; };
  int out;

  // Untimed non-blocking calls on an empty, then a full, queue.
  CHECK(!q.try_peek(out));
  CHECK(!q.try_pop(out));
  q.push(10);
  CHECK(!q.try_push(20));
  EXPECT_EQ(*q.try_pop(), 10);
  CHECK(q.empty());

  // Relative timeouts: try_*_for.
  EXPECT_FALSE(bool(q.try_pop_for(wait)));
  EXPECT_FALSE(bool(q.try_peek_for(wait)));
  CHECK(q.try_push_for(10, wait));
  CHECK(!q.try_push_for(20, wait));
  EXPECT_EQ(*q.try_peek_for(wait), 10);
  EXPECT_EQ(*q.try_pop_for(wait), 10);
  CHECK(q.empty());

  // Absolute deadlines: try_*_until.
  EXPECT_FALSE(bool(q.try_pop_until(deadline())));
  EXPECT_FALSE(bool(q.try_peek_until(deadline())));
  CHECK(q.try_push_until(10, deadline()));
  CHECK(!q.try_push_until(20, deadline()));
  EXPECT_EQ(*q.try_peek_until(deadline()), 10);
  EXPECT_EQ(*q.try_pop_until(deadline()), 10);
  CHECK(q.empty());
}

/// Multi-threaded smoke test, run once per thread count in `nthr`.
///
/// Each loop iteration of every worker pushes three values (try_push,
/// try_push_until, push) and then pops three (try_pop, try_pop_until, pop).
/// run_once() pre-fills the queue, and each worker pops only after pushing,
/// so every pop is expected to find an element.
TEST(FCPriQueue, pushPop) {
  const int ops = 1000;
  const int work = 0;
  const std::chrono::steady_clock::time_point when =
      std::chrono::steady_clock::now() + std::chrono::hours(24);

  for (int threadCount : nthr) {
    nthreads = threadCount;
    FCPQ q(10000);
    run_once(q, [&](uint32_t tid) {
      for (int i = tid; i < ops; i += nthreads) {
        // One push through each API...
        CHECK(q.try_push(i));
        CHECK(q.try_push_until(i, when));
        q.push(i);
        doWork(work);
        // ...then one pop through each API.
        int out;
        CHECK(q.try_pop(out));
        EXPECT_NE(q.try_pop_until(when), folly::none);
        q.pop(out);
        doWork(work);
      }
    });
  }
}

/// Benchmark configurations selectable in test().
enum Exp {
  NoFC = 0, ///< BaselinePQ driven with try_push / try_pop.
  FCNonBlock = 1, ///< FCPQ driven with try_push / try_pop.
  FCBlock = 2, ///< FCPQ driven with push / pop.
  FCTimed = 3, ///< FCPQ driven with try_push_until / try_pop_until.
};

static uint64_t test(std::string name, Exp exp, uint64_t base) {
  int ops = FLAGS_ops;
  int work = FLAGS_work;

  uint64_t min = UINTMAX_MAX;
  uint64_t max = 0;
  uint64_t sum = 0;

  for (int r = 0; r < FLAGS_reps; ++r) {
    uint64_t dur;
    switch (exp) {
      case NoFC: {
        Baseline pq;
        auto fn = [&](uint32_t tid) {
          for (int i = tid; i < ops; i += nthreads) {
            CHECK(pq.try_push(i));
            doWork(work);
            int v;
            CHECK(pq.try_pop(v));
            doWork(work);
          }
        };
        dur = run_once(pq, fn);
        break;
      }
      case FCNonBlock: {
        FCPQ pq;
        auto fn = [&](uint32_t tid) {
          for (int i = tid; i < ops; i += nthreads) {
            CHECK(pq.try_push(i));
            doWork(work);
            int v;
            CHECK(pq.try_pop(v));
            doWork(work);
          }
        };
        dur = run_once(pq, fn);
        break;
      }
      case FCBlock: {
        FCPQ pq;
        auto fn = [&](uint32_t tid) {
          for (int i = tid; i < ops; i += nthreads) {
            pq.push(i);
            doWork(work);
            int v;
            pq.pop(v);
            doWork(work);
          }
        };
        dur = run_once(pq, fn);
        break;
      }
      case FCTimed: {
        FCPQ pq;
        auto fn = [&](uint32_t tid) {
          std::chrono::steady_clock::time_point when =
              std::chrono::steady_clock::now() + std::chrono::hours(24);
          for (int i = tid; i < ops; i += nthreads) {
            CHECK(pq.try_push_until(i, when));
            doWork(work);
            EXPECT_NE(pq.try_pop_until(when), folly::none);
            doWork(work);
          }
        };
        dur = run_once(pq, fn);
        break;
      }
      default:
        CHECK(false);
    }

    sum += dur;
    min = std::min(min, dur);
    max = std::max(max, dur);
  }

  uint64_t avg = sum / FLAGS_reps;
  uint64_t res = min;
  std::cout << name;
  std::cout << "   " << std::setw(4) << max / FLAGS_ops << " ns";
  std::cout << "   " << std::setw(4) << avg / FLAGS_ops << " ns";
  std::cout << "   " << std::setw(4) << res / FLAGS_ops << " ns";
  if (base) {
    std::cout << " " << std::setw(3) << 100 * base / res << "%";
  }
  std::cout << std::endl;
  return res;
}

/// Benchmark driver; returns immediately unless --bench is given.
///
/// For each thread count in `nthr`, it times the baseline queue twice and then
/// each flat-combining configuration twice. Every row after the first is
/// reported relative to the first baseline's min time.
TEST(FCPriQueue, bench) {
  if (!FLAGS_bench) {
    return;
  }

  // Flat-combining rows, each configuration listed twice ("- dup").
  struct Run {
    const char* label;
    Exp exp;
  };
  const Run fcRuns[] = {
      {"fc non-blocking             ", FCNonBlock},
      {"fc non-blocking - dup       ", FCNonBlock},
      {"fc timed                    ", FCTimed},
      {"fc timed - dup              ", FCTimed},
      {"fc blocking                 ", FCBlock},
      {"fc blocking - dup           ", FCBlock},
  };

  std::cout << "Test_name, Max time, Avg time, Min time, % base min / min"
            << std::endl;
  for (int threadCount : nthr) {
    nthreads = threadCount;
    std::cout << "\n------------------------------------ Number of threads = "
              << threadCount << std::endl;
    const uint64_t base = test("baseline                    ", NoFC, 0);
    test("baseline - dup              ", NoFC, base);
    std::cout << "---- fc -------------------------------" << std::endl;
    for (const Run& run : fcRuns) {
      test(run.label, run.exp, base);
    }
  }
}

/*
$ numactl -N 1 folly/experimental/test/fc_pri_queue_test --bench

[ RUN      ] FCPriQueue.bench
Test_name, Max time, Avg time, Min time, % base min / min

------------------------------------ Number of threads = 1
baseline                        815 ns    793 ns    789 ns
baseline - dup                  886 ns    827 ns    789 ns  99%
---- fc -------------------------------
fc non-blocking                 881 ns    819 ns    789 ns  99%
fc non-blocking - dup           833 ns    801 ns    786 ns 100%
fc timed                        863 ns    801 ns    781 ns 100%
fc timed - dup                  830 ns    793 ns    782 ns 100%
fc blocking                    1043 ns    820 ns    789 ns  99%
fc blocking - dup               801 ns    793 ns    789 ns 100%

------------------------------------ Number of threads = 2
baseline                        579 ns    557 ns    540 ns
baseline - dup                  905 ns    621 ns    538 ns 100%
---- fc -------------------------------
fc non-blocking                 824 ns    642 ns    568 ns  95%
fc non-blocking - dup           737 ns    645 ns    591 ns  91%
fc timed                        654 ns    590 ns    542 ns  99%
fc timed - dup                  666 ns    586 ns    534 ns 101%
fc blocking                     622 ns    599 ns    575 ns  93%
fc blocking - dup               677 ns    618 ns    570 ns  94%

------------------------------------ Number of threads = 3
baseline                        740 ns    717 ns    699 ns
baseline - dup                  742 ns    716 ns    697 ns 100%
---- fc -------------------------------
fc non-blocking                 730 ns    689 ns    645 ns 108%
fc non-blocking - dup           719 ns    695 ns    639 ns 109%
fc timed                        695 ns    650 ns    597 ns 117%
fc timed - dup                  694 ns    654 ns    624 ns 112%
fc blocking                     711 ns    687 ns    669 ns 104%
fc blocking - dup               716 ns    695 ns    624 ns 112%

------------------------------------ Number of threads = 4
baseline                        777 ns    766 ns    750 ns
baseline - dup                  778 ns    752 ns    731 ns 102%
---- fc -------------------------------
fc non-blocking                 653 ns    615 ns    589 ns 127%
fc non-blocking - dup           611 ns    593 ns    563 ns 133%
fc timed                        597 ns    577 ns    569 ns 131%
fc timed - dup                  618 ns    575 ns    546 ns 137%
fc blocking                     603 ns    590 ns    552 ns 135%
fc blocking - dup               614 ns    590 ns    556 ns 134%

------------------------------------ Number of threads = 6
baseline                        925 ns    900 ns    869 ns
baseline - dup                  930 ns    895 ns    866 ns 100%
---- fc -------------------------------
fc non-blocking                 568 ns    530 ns    481 ns 180%
fc non-blocking - dup           557 ns    521 ns    488 ns 177%
fc timed                        516 ns    496 ns    463 ns 187%
fc timed - dup                  517 ns    500 ns    474 ns 183%
fc blocking                     559 ns    513 ns    450 ns 193%
fc blocking - dup               564 ns    528 ns    466 ns 186%

------------------------------------ Number of threads = 8
baseline                        999 ns    981 ns    962 ns
baseline - dup                  998 ns    984 ns    965 ns  99%
---- fc -------------------------------
fc non-blocking                 491 ns    386 ns    317 ns 303%
fc non-blocking - dup           433 ns    344 ns    298 ns 322%
fc timed                        445 ns    348 ns    294 ns 327%
fc timed - dup                  446 ns    357 ns    292 ns 328%
fc blocking                     505 ns    389 ns    318 ns 302%
fc blocking - dup               416 ns    333 ns    293 ns 328%

------------------------------------ Number of threads = 12
baseline                       1092 ns   1080 ns   1072 ns
baseline - dup                 1085 ns   1074 ns   1065 ns 100%
---- fc -------------------------------
fc non-blocking                 360 ns    283 ns    258 ns 415%
fc non-blocking - dup           340 ns    278 ns    250 ns 427%
fc timed                        271 ns    260 ns    249 ns 429%
fc timed - dup                  397 ns    283 ns    253 ns 423%
fc blocking                     331 ns    279 ns    258 ns 415%
fc blocking - dup               358 ns    280 ns    259 ns 412%

------------------------------------ Number of threads = 16
baseline                       1120 ns   1115 ns   1103 ns
baseline - dup                 1122 ns   1118 ns   1114 ns  99%
---- fc -------------------------------
fc non-blocking                 339 ns    297 ns    246 ns 448%
fc non-blocking - dup           353 ns    301 ns    264 ns 417%
fc timed                        326 ns    287 ns    247 ns 445%
fc timed - dup                  338 ns    294 ns    259 ns 425%
fc blocking                     329 ns    288 ns    247 ns 445%
fc blocking - dup               375 ns    308 ns    265 ns 415%

------------------------------------ Number of threads = 24
baseline                       1073 ns   1068 ns   1064 ns
baseline - dup                 1075 ns   1071 ns   1069 ns  99%
---- fc -------------------------------
fc non-blocking                 439 ns    342 ns    278 ns 382%
fc non-blocking - dup           389 ns    318 ns    291 ns 364%
fc timed                        368 ns    324 ns    266 ns 398%
fc timed - dup                  412 ns    328 ns    302 ns 352%
fc blocking                     425 ns    345 ns    275 ns 386%
fc blocking - dup               429 ns    340 ns    269 ns 395%

------------------------------------ Number of threads = 32
baseline                       1001 ns    990 ns    981 ns
baseline - dup                 1002 ns    992 ns    983 ns  99%
---- fc -------------------------------
fc non-blocking                 404 ns    342 ns    273 ns 359%
fc non-blocking - dup           395 ns    316 ns    259 ns 378%
fc timed                        379 ns    330 ns    258 ns 380%
fc timed - dup                  392 ns    335 ns    274 ns 357%
fc blocking                     423 ns    340 ns    277 ns 353%
fc blocking - dup               445 ns    359 ns    275 ns 356%

------------------------------------ Number of threads = 48
baseline                        978 ns    975 ns    971 ns
baseline - dup                  977 ns    974 ns    972 ns  99%
---- fc -------------------------------
fc non-blocking                 424 ns    327 ns    258 ns 375%
fc non-blocking - dup           378 ns    317 ns    256 ns 379%
fc timed                        368 ns    311 ns    277 ns 350%
fc timed - dup                  385 ns    310 ns    251 ns 385%
fc blocking                     422 ns    313 ns    255 ns 380%
fc blocking - dup               406 ns    314 ns    258 ns 376%

------------------------------------ Number of threads = 64
baseline                        993 ns    981 ns    974 ns
baseline - dup                  984 ns    979 ns    975 ns  99%
---- fc -------------------------------
fc non-blocking                 353 ns    301 ns    266 ns 365%
fc non-blocking - dup           339 ns    301 ns    271 ns 358%
fc timed                        399 ns    321 ns    259 ns 375%
fc timed - dup                  381 ns    300 ns    263 ns 369%
fc blocking                     390 ns    301 ns    251 ns 387%
fc blocking - dup               345 ns    289 ns    259 ns 374%
[       OK ] FCPriQueue.bench (112424 ms)

$ lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                32
On-line CPU(s) list:   0-31
Thread(s) per core:    2
Core(s) per socket:    8
Socket(s):             2
NUMA node(s):          2
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 45
Model name:            Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz
Stepping:              6
CPU MHz:               2200.000
CPU max MHz:           2200.0000
CPU min MHz:           1200.0000
BogoMIPS:              4399.87
Virtualization:        VT-x
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              20480K
NUMA node0 CPU(s):     0-7,16-23
NUMA node1 CPU(s):     8-15,24-31
Flags:                 fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca
                       cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht
                       tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc
                       arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc
                       aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl
                       vmx smx est tm2 ssse3 cx16 xtpr pdcm pcid dca sse4_1
                       sse4_2 x2apic popcnt tsc_deadline_timer aes xsave avx
                       lahf_lm epb tpr_shadow vnmi flexpriority ept vpid
                       xsaveopt dtherm arat pln pts

 */