folly/folly/executors/test/EDFThreadPoolExecutorBenchmark.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <mutex>
#include <thread>

#include <folly/Benchmark.h>
#include <folly/BenchmarkUtil.h>
#include <folly/MPMCQueue.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/EDFThreadPoolExecutor.h>
#include <folly/executors/SoftRealTimeExecutor.h>
#include <folly/executors/ThreadPoolExecutor.h>

using namespace folly;

// Use 19 threads because it's common to use 0.8 * numCores, and 24 is a common
// number of cores.
static constexpr size_t kNumThreads = 19;

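// Measures raw submission throughput: enqueue n no-op tasks and wait for the
// pool to drain them.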
void throughput(uint32_t n, std::unique_ptr<ThreadPoolExecutor> ex) {
  while (n--) {
    ex->add([]() {});
  }
  ex->join();
}

BENCHMARK_NAMED_PARAM(
    throughput, CPUEx, std::make_unique<CPUThreadPoolExecutor>(kNumThreads))
BENCHMARK_RELATIVE_NAMED_PARAM(
    throughput, EDFEx, std::make_unique<EDFThreadPoolExecutor>(kNumThreads))

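// Keeps the pool saturated with up to numTasks live tasks: the producer spins
// until a slot frees up before each submission, and each task spins until all
// numTasks slots are occupied, then releases one. The finish flag lets tasks
// that are still spinning after the last submission exit before join().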
void saturated(
    uint32_t n, std::unique_ptr<ThreadPoolExecutor> ex, size_t numTasks) {
  std::atomic<size_t> numAlive{0};
  std::atomic<bool> finish{false};

  size_t expected = 0;
  while (n--) {
    while (numAlive.load(std::memory_order_relaxed) == numTasks) {
      // spin
    }

    while (!numAlive.compare_exchange_weak(
        expected, expected + 1, std::memory_order_relaxed)) {
      // try again
    }

    ex->add([&numAlive, &finish, numTasks]() {
      size_t expectedNumTasks = numTasks;
      const size_t wanted = numTasks - 1;
      while (!numAlive.compare_exchange_weak(
          expectedNumTasks, wanted, std::memory_order_relaxed)) {
        if (finish.load(std::memory_order_acquire)) {
          break;
        }
        expectedNumTasks = numTasks;
      }
    });
  }

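  // Submission is done; let any tasks still spinning in their CAS loops exit.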
  finish.store(true, std::memory_order_release);

  ex->join();
}

BENCHMARK_NAMED_PARAM(
    saturated, CPUEx_1, std::make_unique<CPUThreadPoolExecutor>(kNumThreads), 1)
BENCHMARK_RELATIVE_NAMED_PARAM(
    saturated, EDFEx_1, std::make_unique<EDFThreadPoolExecutor>(kNumThreads), 1)

BENCHMARK_NAMED_PARAM(
    saturated,
    CPUEx_10,
    std::make_unique<CPUThreadPoolExecutor>(kNumThreads),
    10)
BENCHMARK_RELATIVE_NAMED_PARAM(
    saturated,
    EDFEx_10,
    std::make_unique<EDFThreadPoolExecutor>(kNumThreads),
    10)

BENCHMARK_NAMED_PARAM(
    saturated,
    CPUEx_100,
    std::make_unique<CPUThreadPoolExecutor>(kNumThreads),
    100)
BENCHMARK_RELATIVE_NAMED_PARAM(
    saturated,
    EDFEx_100,
    std::make_unique<EDFThreadPoolExecutor>(kNumThreads),
    100)

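// Submits tasks from three producer threads driving a malloc -> memset -> free
// pipeline over a fixed pool of work items. Each stage enqueues kNumTasks
// small tasks per item, and an item advances to the next stage (via an MPMC
// queue) once all of the current stage's tasks have completed.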
void multiThreaded(uint32_t n, std::unique_ptr<ThreadPoolExecutor> ex) {
  static constexpr size_t kMallocSize = 128;
  static constexpr size_t kNumTasks = 8;

  struct WorkItem {
    std::array<std::atomic<char*>, kNumTasks> ptrs{};
    std::atomic<int> tasksDone{0};
  };

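  // Work items circulate through the pipeline; each queue feeds the thread
  // that drives the corresponding stage.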
  std::array<WorkItem, 1024> workItems{};
  folly::MPMCQueue<WorkItem*> mallocQueue{workItems.size()};
  folly::MPMCQueue<WorkItem*> memsetQueue{workItems.size()};
  folly::MPMCQueue<WorkItem*> freeQueue{workItems.size()};

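  // Seed the pipeline: every work item starts in the malloc stage.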
  for (size_t i = 0; i < std::min(static_cast<size_t>(n), workItems.size());
       i++) {
    mallocQueue.write(&workItems[i]);
  }

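  // Once all kNumTasks tasks of the current stage have run for a work item,
  // reset its counter and hand it to the next stage's queue.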
  auto maybeFinishState = [](WorkItem* workItem,
                             folly::MPMCQueue<WorkItem*>& nextQueue) {
    int tasksDone = workItem->tasksDone.fetch_add(1);
    if (tasksDone + 1 == static_cast<int>(kNumTasks)) {
      workItem->tasksDone.store(0);
      nextQueue.write(workItem);
    }
  };

  auto mallocThread = std::thread([&]() {
    for (uint32_t i = 0; i < n; i++) {
      WorkItem* workItem;
      mallocQueue.blockingRead(workItem);
      for (size_t taskIndex = 0; taskIndex < kNumTasks; taskIndex++) {
        ex->add([workItem, taskIndex, &maybeFinishState, &memsetQueue]() {
          workItem->ptrs[taskIndex].store(
              static_cast<char*>(malloc(kMallocSize)));
          maybeFinishState(workItem, /*nextQueue=*/memsetQueue);
        });
      }
    }
  });

  auto memsetThread = std::thread([&]() {
    for (uint32_t i = 0; i < n; i++) {
      WorkItem* workItem;
      memsetQueue.blockingRead(workItem);
      for (size_t taskIndex = 0; taskIndex < kNumTasks; taskIndex++) {
        int ch = static_cast<int>(i);
        ex->add([workItem, taskIndex, ch, &maybeFinishState, &freeQueue]() {
          memset(workItem->ptrs[taskIndex].load(), ch, kMallocSize);
          maybeFinishState(workItem, /*nextQueue=*/freeQueue);
        });
      }
    }
  });

  auto freeThread = std::thread([&]() {
    for (uint32_t i = 0; i < n; i++) {
      WorkItem* workItem;
      freeQueue.blockingRead(workItem);
      for (size_t taskIndex = 0; taskIndex < kNumTasks; taskIndex++) {
        ex->add([workItem, taskIndex, &maybeFinishState, &mallocQueue]() {
          free(workItem->ptrs[taskIndex].load());
          workItem->ptrs[taskIndex].store(nullptr);
          maybeFinishState(workItem, /*nextQueue=*/mallocQueue);
        });
      }
    }
  });

  mallocThread.join();
  memsetThread.join();
  freeThread.join();
  ex->join();

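  // Items may finish the run still holding buffers from the malloc or memset
  // stage; release them here (free(nullptr) is a no-op for items that ended
  // in the free stage).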
  for (auto& workItem : workItems) {
    for (size_t i = 0; i < kNumTasks; i++) {
      free(workItem.ptrs[i].load());
    }
  }
}

BENCHMARK_NAMED_PARAM(
    multiThreaded, CPUEx, std::make_unique<CPUThreadPoolExecutor>(kNumThreads))
BENCHMARK_RELATIVE_NAMED_PARAM(
    multiThreaded, EDFEx, std::make_unique<EDFThreadPoolExecutor>(kNumThreads))

int main(int argc, char* argv[]) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  folly::runBenchmarks();

  return 0;
}