folly/folly/test/ProducerConsumerQueueBenchmark.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/ProducerConsumerQueue.h>

#include <cstdio>
#include <iostream>
#include <thread>

#include <glog/logging.h>

#include <folly/Benchmark.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/PThread.h>
#include <folly/stats/Histogram.h>

namespace {

using namespace folly;

// Element type and queue type exercised by the throughput benchmarks.
using ThroughputType = unsigned int;
using ThroughputQueueType = ProducerConsumerQueue<ThroughputType>;

// Element type and queue type exercised by the latency benchmark; the
// producer enqueues the nanosecond field of a timestamp.
using LatencyType = unsigned long;
using LatencyQueueType = ProducerConsumerQueue<LatencyType>;

template <class QueueType>
struct ThroughputTest {
  explicit ThroughputTest(size_t size, int iters, int cpu0, int cpu1)
      : queue_(size), done_(false), iters_(iters), cpu0_(cpu0), cpu1_(cpu1) {}

  void producer() {
    if (cpu0_ > -1) {
      cpu_set_t cpuset;
      CPU_ZERO(&cpuset);
      CPU_SET(cpu0_, &cpuset);
      /* TODO(cavalcanti): enable this bench on OSX and use
       * pthread_set_qos_self_np for handling big.LITTLE macs.
       */
      pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    }
    for (int i = 0; i < iters_; ++i) {
      ThroughputType item = i;
      while (!queue_.write((ThroughputType)item)) {
      }
    }
  }

  void consumer() {
    if (cpu1_ > -1) {
      cpu_set_t cpuset;
      CPU_ZERO(&cpuset);
      CPU_SET(cpu1_, &cpuset);
      pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    }
    for (int i = 0; i < iters_; ++i) {
      ThroughputType item = 0;
      while (!queue_.read(item)) {
      }
      doNotOptimizeAway(item);
    }
  }

  QueueType queue_;
  std::atomic<bool> done_;
  const int iters_;
  int cpu0_;
  int cpu1_;
};

template <class QueueType>
struct LatencyTest {
  explicit LatencyTest(size_t size, int iters, int cpu0, int cpu1)
      : queue_(size),
        done_(false),
        iters_(iters),
        cpu0_(cpu0),
        cpu1_(cpu1),
        hist_(1, 0, 30) {
    computeTimeCost();
  }

  static uint64_t timespecDiff(timespec end, timespec start) {
    if (end.tv_sec == start.tv_sec) {
      assert(end.tv_nsec >= start.tv_nsec);
      return uint64_t(end.tv_nsec - start.tv_nsec);
    }
    assert(end.tv_sec > start.tv_sec);
    auto diff = uint64_t(end.tv_sec - start.tv_sec);
    assert(diff < std::numeric_limits<uint64_t>::max() / 1000000000ULL);
    return diff * 1000000000ULL + end.tv_nsec - start.tv_nsec;
  }

  void computeTimeCost() {
    timespec start, end;
    clock_gettime(CLOCK_REALTIME, &start);
    for (int i = 0; i < iters_; ++i) {
      timespec tv;
      clock_gettime(CLOCK_REALTIME, &tv);
    }
    clock_gettime(CLOCK_REALTIME, &end);
    time_cost_ = 2 * timespecDiff(end, start) / iters_;
  }

  void producer() {
    if (cpu0_ > -1) {
      cpu_set_t cpuset;
      CPU_ZERO(&cpuset);
      CPU_SET(cpu0_, &cpuset);
      pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    }
    for (int i = 0; i < iters_; ++i) {
      timespec sleeptime, sleepstart;
      clock_gettime(CLOCK_REALTIME, &sleepstart);
      do {
        clock_gettime(CLOCK_REALTIME, &sleeptime);
      } while (timespecDiff(sleeptime, sleepstart) < 1000000);

      timespec tv;
      clock_gettime(CLOCK_REALTIME, &tv);
      while (!queue_.write((LatencyType)tv.tv_nsec)) {
      }
    }
  }

  void consumer() {
    if (cpu1_ > -1) {
      cpu_set_t cpuset;
      CPU_ZERO(&cpuset);
      CPU_SET(cpu1_, &cpuset);
      pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    }
    for (int i = 0; i < iters_; ++i) {
      unsigned long enqueue_nsec;
      while (!queue_.read(enqueue_nsec)) {
      }

      timespec tv;
      clock_gettime(CLOCK_REALTIME, &tv);
      int diff = tv.tv_nsec - enqueue_nsec - time_cost_;
      if (diff < 0) {
        continue;
      }

      // Naive log-scale bucketing.
      int bucket;
      for (bucket = 0; bucket <= 30 && (1 << bucket) <= diff; ++bucket) {
      }
      hist_.addValue(bucket - 1);
    }
  }

  void printHistogram() { hist_.toTSV(std::cout); }

  QueueType queue_;
  std::atomic<bool> done_;
  int time_cost_;
  const int iters_;
  int cpu0_;
  int cpu1_;
  Histogram<int> hist_;
};

/**
 * Throughput benchmark with no CPU pinning: one producer thread and one
 * consumer thread exchange `iters` items through a queue built with `size`.
 *
 * @param iters number of items to push through the queue.
 * @param size  value passed to the queue constructor; must be positive.
 */
void BM_ProducerConsumer(int iters, int size) {
  BenchmarkSuspender susp;
  CHECK_GT(size, 0);
  // Automatic storage instead of new/delete: the harness is released even if
  // thread creation throws.
  ThroughputTest<ThroughputQueueType> test(size, iters, -1, -1);
  susp.dismiss();

  std::thread producer([&test] { test.producer(); });
  std::thread consumer([&test] { test.consumer(); });

  producer.join();
  test.done_ = true;
  consumer.join();
}

/**
 * Throughput benchmark with CPU pinning: the producer is asked to run on CPU 0
 * and the consumer on CPU 1 while they exchange `iters` items through a queue
 * built with `size`.
 *
 * @param iters number of items to push through the queue.
 * @param size  value passed to the queue constructor; must be positive.
 */
void BM_ProducerConsumerAffinity(int iters, int size) {
  BenchmarkSuspender susp;
  CHECK_GT(size, 0);
  // Automatic storage instead of new/delete: the harness is released even if
  // thread creation throws.
  ThroughputTest<ThroughputQueueType> test(size, iters, 0, 1);
  susp.dismiss();

  std::thread producer([&test] { test.producer(); });
  std::thread consumer([&test] { test.consumer(); });

  producer.join();
  test.done_ = true;
  consumer.join();
}

/**
 * Latency benchmark: runs a fixed 100000 samples (the framework-supplied
 * iteration count is ignored) with the producer on CPU 0 and the consumer on
 * CPU 1, then prints the latency histogram.
 *
 * @param size value passed to the queue constructor; must be positive.
 */
void BM_ProducerConsumerLatency(int /* iters */, int size) {
  BenchmarkSuspender susp;
  CHECK_GT(size, 0);
  // Automatic storage instead of new/delete: the harness is released even if
  // thread creation throws.
  LatencyTest<LatencyQueueType> test(size, 100000, 0, 1);
  susp.dismiss();

  std::thread producer([&test] { test.producer(); });
  std::thread consumer([&test] { test.consumer(); });

  producer.join();
  test.done_ = true;
  consumer.join();
  test.printHistogram();
}

// Separator line in the benchmark report ahead of the entries below.
BENCHMARK_DRAW_LINE();

// Register each benchmark with the parameter 1048574, which the BM_* functions
// receive as their `size` argument and pass to the queue constructor.
BENCHMARK_PARAM(BM_ProducerConsumer, 1048574)
BENCHMARK_PARAM(BM_ProducerConsumerAffinity, 1048574)
BENCHMARK_PARAM(BM_ProducerConsumerLatency, 1048574)

} // namespace

/**
 * Program entry point: sets up logging, parses command-line flags and then
 * runs the registered benchmarks.
 */
int main(int argc, char** argv) {
  const char* const programName = argv[0];
  google::InitGoogleLogging(programName);

  // Passed as the third argument of ParseCommandLineFlags.
  const bool removeFlags = true;
  gflags::ParseCommandLineFlags(&argc, &argv, removeFlags);

  runBenchmarks();

  return 0;
}

#if 0
/*
Benchmark

$ lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                24
On-line CPU(s) list:   0-23
Thread(s) per core:    1
Core(s) per socket:    1
Socket(s):             24
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 60
Model name:            Intel Core Processor (Haswell, no TSX)
Stepping:              1
CPU MHz:               2494.244
BogoMIPS:              4988.48
Hypervisor vendor:     KVM
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              4096K
NUMA node0 CPU(s):     0-23

$ ../buck-out/gen/folly/test/producer_consumer_queue_benchmark
5       6       1       5
6       7       1893    11358
7       8       39671   277697
8       9       34921   279368
9       10      17799   160191
10      11      3685    36850
11      12      1075    11825
12      13      456     5472
13      14      422     5486
14      15      64      896
15      16      7       105
16      17      3       48
17      18      3       51
============================================================================
folly/test/ProducerConsumerQueueBenchmark.cpp   relative  time/iter  iters/s
============================================================================
----------------------------------------------------------------------------
BM_ProducerConsumer(1048574)                                 5.82ns  171.75M
BM_ProducerConsumerAffinity(1048574)                         7.36ns  135.83M
BM_ProducerConsumerLatency(1048574)                         1.67min    9.99m
============================================================================
*/
#endif