folly/folly/test/MPMCQueueTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/MPMCQueue.h>

#include <cstdlib>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <utility>

#include <boost/intrusive_ptr.hpp>
#include <boost/thread/barrier.hpp>

#include <folly/Format.h>
#include <folly/Memory.h>
#include <folly/portability/GTest.h>
#include <folly/portability/SysResource.h>
#include <folly/portability/SysTime.h>
#include <folly/portability/Unistd.h>
#include <folly/stop_watch.h>
#include <folly/test/DeterministicSchedule.h>

FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr)

using namespace folly;
using namespace detail;
using namespace test;
using std::string;
using std::unique_ptr;
using std::vector;
using std::chrono::milliseconds;
using std::chrono::seconds;
using std::chrono::steady_clock;

// Short alias for the deterministic scheduler used to spawn/join test threads.
using DSched = DeterministicSchedule;

template <template <typename> class Atom>
void run_mt_sequencer_thread(
    int numThreads,
    int numOps,
    uint32_t init,
    TurnSequencer<Atom>& seq,
    Atom<uint32_t>& spinThreshold,
    int& prev,
    int i) {
  // Worker `i` owns ops i, i + numThreads, i + 2 * numThreads, ...; op `k`
  // runs on turn init + k, so `prev` must always hold the preceding op.
  int op = i;
  while (op < numOps) {
    const uint32_t turn = init + op;
    // Every 32nd op passes `true` as waitForTurn's third argument.
    const bool refreshSpin = (op % 32) == 0;
    seq.waitForTurn(turn, spinThreshold, refreshSpin);
    EXPECT_EQ(prev, op - 1);
    prev = op;
    seq.completeTurn(turn);
    op += numThreads;
  }
}

template <template <typename> class Atom>
void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
  TurnSequencer<Atom> seq(init);
  Atom<uint32_t> spinThreshold(0);

  int prev = -1;
  vector<std::thread> threads(numThreads);
  for (int i = 0; i < numThreads; ++i) {
    threads[i] = DSched::thread(std::bind(
        run_mt_sequencer_thread<Atom>,
        numThreads,
        numOps,
        init,
        std::ref(seq),
        std::ref(spinThreshold),
        std::ref(prev),
        i));
  }

  for (auto& thr : threads) {
    DSched::join(thr);
  }

  EXPECT_EQ(prev, numOps - 1);
}

// TurnSequencer tests over three atomic flavours. Negative starting turns are
// converted to uint32_t, so those runs start just below the 32-bit wrap.
TEST(MPMCQueue, sequencer) {
  const uint32_t nearWrap = static_cast<uint32_t>(-100);
  run_mt_sequencer_test<std::atomic>(1, 100, 0);
  run_mt_sequencer_test<std::atomic>(2, 100000, nearWrap);
  run_mt_sequencer_test<std::atomic>(100, 10000, nearWrap);
}

TEST(MPMCQueue, sequencerEmulatedFutex) {
  const uint32_t nearWrap = static_cast<uint32_t>(-100);
  run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
  run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, nearWrap);
  run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, nearWrap);
}

TEST(MPMCQueue, sequencerDeterministic) {
  DSched sched(DSched::uniform(0));
  run_mt_sequencer_test<DeterministicAtomic>(
      1, 100, static_cast<uint32_t>(-50));
  run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
  run_mt_sequencer_test<DeterministicAtomic>(
      10, 1000, static_cast<uint32_t>(-100));
}

// Round-trips `src` through a capacity-10 queue using blockingWrite/Read,
// write/read, and tryWriteUntil with both a system_clock and a steady_clock
// deadline. T must be default-constructible (see `dest`).
template <bool Dynamic = false, typename T>
void runElementTypeTest(T&& src) {
  MPMCQueue<T, std::atomic, Dynamic> cq(10);
  cq.blockingWrite(std::forward<T>(src));

  T dest;
  cq.blockingRead(dest);

  EXPECT_TRUE(cq.write(std::move(dest)));
  EXPECT_TRUE(cq.read(dest));

  const auto soon1 =
      std::chrono::system_clock::now() + std::chrono::seconds(1);
  EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
  EXPECT_TRUE(cq.read(dest));

  const auto soon2 =
      std::chrono::steady_clock::now() + std::chrono::seconds(1);
  EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
}

// Element type for boost::intrusive_ptr tests that tracks how many instances
// are alive on the current thread.
struct RefCounted {
  // Live instances on this thread: incremented by the ctor, decremented by
  // the dtor.
  static thread_local int active_instances;

  // Intrusive reference count; mutable so the hooks can adjust it through a
  // pointer-to-const.
  mutable std::atomic<int> rc{0};

  RefCounted() {
    ++active_instances;
  }

  ~RefCounted() {
    --active_instances;
  }
};
thread_local int RefCounted::active_instances;

// Reference-counting hooks used by boost::intrusive_ptr<RefCounted>.
void intrusive_ptr_add_ref(RefCounted const* p) {
  p->rc.fetch_add(1);
}

// Drops one reference and deletes the object when it was the last one
// (fetch_sub returns the value *before* the decrement).
void intrusive_ptr_release(RefCounted const* p) {
  const int before = p->rc.fetch_sub(1);
  if (before == 1) {
    delete p;
  }
}

// Round-trips a variety of element types (trivial, allocating, move-only,
// ref-counted) through fixed-size and dynamic queues. Afterwards every
// RefCounted pushed through the queue must have been released.
TEST(MPMCQueue, lotsOfElementTypes) {
  runElementTypeTest</* Dynamic = */ false>(10);
  runElementTypeTest</* Dynamic = */ false>(string("abc"));
  runElementTypeTest</* Dynamic = */ false>(std::make_pair(10, string("def")));
  runElementTypeTest</* Dynamic = */ false>(vector<string>{{"abc"}});
  runElementTypeTest</* Dynamic = */ false>(std::make_shared<char>('a'));
  runElementTypeTest</* Dynamic = */ false>(std::make_unique<char>('a'));
  runElementTypeTest</* Dynamic = */ false>(
      boost::intrusive_ptr<RefCounted>(new RefCounted));
  EXPECT_EQ(RefCounted::active_instances, 0);
}

TEST(MPMCQueue, lotsOfElementTypesDynamic) {
  runElementTypeTest</* Dynamic = */ true>(10);
  runElementTypeTest</* Dynamic = */ true>(string("abc"));
  runElementTypeTest</* Dynamic = */ true>(std::make_pair(10, string("def")));
  runElementTypeTest</* Dynamic = */ true>(vector<string>{{"abc"}});
  runElementTypeTest</* Dynamic = */ true>(std::make_shared<char>('a'));
  runElementTypeTest</* Dynamic = */ true>(std::make_unique<char>('a'));
  runElementTypeTest</* Dynamic = */ true>(
      boost::intrusive_ptr<RefCounted>(new RefCounted));
  EXPECT_EQ(RefCounted::active_instances, 0);
}

TEST(MPMCQueue, singleThreadEnqdeq) {
  // Fixed-size queue only: a dynamic queue may temporarily hold more than its
  // nominal capacity, so the "write to a full queue fails" check would not
  // hold there.
  MPMCQueue<int> cq(10);

  for (int pass = 0; pass < 10; ++pass) {
    // Fill to capacity; one extra non-blocking write must be rejected.
    for (int i = 0; i < 10; ++i) {
      EXPECT_TRUE(cq.write(i));
    }
    EXPECT_FALSE(cq.write(-1));
    EXPECT_FALSE(cq.isEmpty());
    EXPECT_EQ(cq.size(), 10);

    // Drain in FIFO order: the first half with read(), the rest with
    // blockingRead().
    for (int i = 0; i < 10; ++i) {
      int dest = -1;
      if (i < 5) {
        EXPECT_TRUE(cq.read(dest));
      } else {
        cq.blockingRead(dest);
      }
      EXPECT_EQ(dest, i);
    }

    // Reading from the empty queue fails and leaves `dest` untouched.
    int dest = -1;
    EXPECT_FALSE(cq.read(dest));
    EXPECT_EQ(dest, -1);

    EXPECT_TRUE(cq.isEmpty());
    EXPECT_EQ(cq.size(), 0);
  }
}

TEST(MPMCQueue, tryenqCapacityTest) {
  // Fixed-size queue only: a dynamic queue's capacity can temporarily exceed
  // the requested one, which would make the final check fail spuriously.
  for (size_t cap = 1; cap < 100; ++cap) {
    MPMCQueue<int> cq(cap);
    size_t i = 0;
    while (i < cap) {
      EXPECT_TRUE(cq.write(i));
      ++i;
    }
    // Exactly `cap` elements fit; the next non-blocking write fails.
    EXPECT_FALSE(cq.write(100));
  }
}

TEST(MPMCQueue, enqCapacityTest) {
  // Fixed-size queue only: a dynamic queue could accept the extra write by
  // growing, so the writer below would not block.
  for (auto cap : {1, 100, 10000}) {
    MPMCQueue<int> cq(cap);
    for (int i = 0; i < cap; ++i) {
      cq.blockingWrite(i);
    }

    // The extra writer must stay blocked until a slot is freed. `t` is set
    // to 1 just before the main thread reads, so the writer should observe 1
    // once its write completes. NOTE(review): ordering between `t = 1` and
    // `when = t` presumably comes from the queue handoff itself.
    int t = 0;
    int when;
    std::thread thr([&] {
      cq.blockingWrite(100);
      when = t;
    });
    usleep(2000);
    t = 1;
    int dummy;
    cq.blockingRead(dummy);
    thr.join();
    EXPECT_EQ(when, 1);
  }
}

template <template <typename> class Atom, bool Dynamic = false>
void runTryEnqDeqThread(
    int numThreads,
    int n, /*numOps*/
    MPMCQueue<int, Atom, Dynamic>& cq,
    std::atomic<uint64_t>& sum,
    int t) {
  // Worker `t` writes values t, t + numThreads, ... and performs the same
  // number of successful reads. `received` is only a counter (it does not
  // track the values read); starting it at `t` makes the termination
  // rounding match the writes when numThreads doesn't divide n evenly.
  uint64_t threadSum = 0;
  int src = t;
  int received = t;
  while (true) {
    const bool sending = src < n;
    const bool receiving = received < n;
    if (!sending && !receiving) {
      break;
    }

    if (sending && cq.write(src)) {
      src += numThreads;
    }

    int dst;
    if (receiving && cq.read(dst)) {
      received += numThreads;
      threadSum += dst;
    }
  }
  sum += threadSum;
}

template <template <typename> class Atom, bool Dynamic = false>
void runTryEnqDeqTest(int numThreads, int numOps) {
  // write and read aren't linearizable, so we don't have
  // hard guarantees on their individual behavior.  We can still test
  // correctness in aggregate
  MPMCQueue<int, Atom, Dynamic> cq(numThreads);

  uint64_t n = numOps;
  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(
        runTryEnqDeqThread<Atom, Dynamic>,
        numThreads,
        n,
        std::ref(cq),
        std::ref(sum),
        t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
}

// Non-blocking enqueue/dequeue with 1, 3 and 100 threads, for each atomic
// flavour and for both fixed-size and dynamic queues.
TEST(MPMCQueue, mtTryEnqDeq) {
  int n = 100000;
  for (int nt : {1, 3, 100}) {
    runTryEnqDeqTest<std::atomic>(nt, n);
  }
}

TEST(MPMCQueue, mtTryEnqDeqDynamic) {
  int n = 100000;
  for (int nt : {1, 3, 100}) {
    runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
  }
}

TEST(MPMCQueue, mtTryEnqDeqEmulatedFutex) {
  int n = 100000;
  for (int nt : {1, 3, 100}) {
    runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
  }
}

TEST(MPMCQueue, mtTryEnqDeqEmulatedFutexDynamic) {
  int n = 100000;
  for (int nt : {1, 3, 100}) {
    runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
  }
}

TEST(MPMCQueue, mtTryEnqDeqDeterministic) {
  long seed = 0;
  LOG(INFO) << "using seed " << seed;

  int n = 1000;

  // Runs one configuration under a uniform scheduler, then under a
  // uniformSubset(…, 2) scheduler, each in its own DSched scope.
  // `dynamic` is std::false_type / std::true_type and selects the queue kind.
  auto runUnderBothSchedules = [&](int nt, auto dynamic) {
    constexpr bool kDynamic = decltype(dynamic)::value;
    {
      DSched sched(DSched::uniform(seed));
      runTryEnqDeqTest<DeterministicAtomic, kDynamic>(nt, n);
    }
    {
      DSched sched(DSched::uniformSubset(seed, 2));
      runTryEnqDeqTest<DeterministicAtomic, kDynamic>(nt, n);
    }
  };

  for (int nt : {3, 10}) {
    runUnderBothSchedules(nt, std::false_type{});
    runUnderBothSchedules(nt, std::true_type{});
  }
}

// Current wall-clock time from gettimeofday(), in microseconds since the
// Unix epoch.
uint64_t nowMicro() {
  timeval now;
  gettimeofday(&now, nullptr);
  const uint64_t wholeSeconds = static_cast<uint64_t>(now.tv_sec);
  return wholeSeconds * 1000000 + now.tv_usec;
}

// Strategy interface used by producerConsumerBench to push one element into
// a queue. callWrite returns false when the write was rejected (the bench
// retries and counts failures); methodName labels the bench output.
template <typename Q>
struct WriteMethodCaller {
  WriteMethodCaller() = default;
  virtual ~WriteMethodCaller() = default;
  virtual bool callWrite(Q& q, int i) = 0;
  virtual string methodName() = 0;
};

// Uses blockingWrite; always reports success.
template <typename Q>
struct BlockingWriteCaller : public WriteMethodCaller<Q> {
  bool callWrite(Q& queue, int value) override {
    queue.blockingWrite(value);
    return true;
  }
  string methodName() override {
    return "blockingWrite";
  }
};

// Uses writeIfNotFull; reports whatever the queue returns.
template <typename Q>
struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
  bool callWrite(Q& queue, int value) override {
    return queue.writeIfNotFull(value);
  }
  string methodName() override {
    return "writeIfNotFull";
  }
};

// Uses write; reports whatever the queue returns.
template <typename Q>
struct WriteCaller : public WriteMethodCaller<Q> {
  bool callWrite(Q& queue, int value) override {
    return queue.write(value);
  }
  string methodName() override {
    return "write";
  }
};

// WriteMethodCaller that uses tryWriteUntil with a deadline of
// Clock::now() + duration, computed afresh on every call.
template <
    typename Q,
    class Clock = steady_clock,
    class Duration = typename Clock::duration>
struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
  const Duration duration_;
  // Taken by value (previously Duration&&) so named duration variables are
  // accepted as well as temporaries; any duration convertible to Duration
  // (e.g. milliseconds, seconds) still works.
  explicit TryWriteUntilCaller(Duration duration) : duration_(duration) {}
  bool callWrite(Q& q, int i) override {
    auto then = Clock::now() + duration_;
    return q.tryWriteUntil(then, i);
  }
  string methodName() override {
    return folly::sformat(
        "tryWriteUntil({}ms)",
        std::chrono::duration_cast<milliseconds>(duration_).count());
  }
};

// Runs numProducers writer threads and numConsumers blockingRead threads
// over `queue`, moving the values 0..numOps-1 through it. Unless
// ignoreContents is set, it checks that the consumed values sum to
// n*(n-1)/2. Returns a one-line summary: time per handoff, context switches
// (from getrusage), rejected writes, and the queue's allocatedCapacity().
template <typename Q>
string producerConsumerBench(
    Q&& queue,
    string qName,
    int numProducers,
    int numConsumers,
    int numOps,
    WriteMethodCaller<Q>& writer,
    bool ignoreContents = false) {
  Q& q = queue;

  struct rusage beginUsage;
  getrusage(RUSAGE_SELF, &beginUsage);

  auto beginMicro = nowMicro();

  uint64_t n = numOps;
  std::atomic<uint64_t> sum(0);
  std::atomic<uint64_t> failed(0);

  // Producer t writes t, t + numProducers, ...; every rejected write is
  // retried and counted in `failed`.
  vector<std::thread> producers(numProducers);
  for (int t = 0; t < numProducers; ++t) {
    producers[t] = DSched::thread([&, t] {
      for (int i = t; i < numOps; i += numProducers) {
        while (!writer.callWrite(q, i)) {
          ++failed;
        }
      }
    });
  }

  // Consumer t performs its share of blocking reads and accumulates them.
  vector<std::thread> consumers(numConsumers);
  for (int t = 0; t < numConsumers; ++t) {
    consumers[t] = DSched::thread([&, t] {
      uint64_t localSum = 0;
      for (int i = t; i < numOps; i += numConsumers) {
        int dest = -1;
        q.blockingRead(dest);
        EXPECT_FALSE(dest == -1);
        localSum += dest;
      }
      sum += localSum;
    });
  }

  for (auto& t : producers) {
    DSched::join(t);
  }
  for (auto& t : consumers) {
    DSched::join(t);
  }
  if (!ignoreContents) {
    EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
  }

  auto endMicro = nowMicro();

  struct rusage endUsage;
  getrusage(RUSAGE_SELF, &endUsage);

  // Callers may take numOps from the NUM_OPS environment variable, so it can
  // be 0; report 0 ns/handoff rather than dividing by zero.
  uint64_t nanosPer = n == 0 ? 0 : (1000 * (endMicro - beginMicro)) / n;
  long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
      (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
  uint64_t failures = failed;
  size_t allocated = q.allocatedCapacity();

  return folly::sformat(
      "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
      "handoff, {} failures, {} allocated",
      qName,
      numProducers,
      writer.methodName(),
      numConsumers,
      nanosPer,
      csw,
      n,
      failures,
      allocated);
}

template <bool Dynamic = false>
void runMtProdConsDeterministic(long seed) {
  // we use the Bench method, but perf results are meaningless under DSched
  DSched sched(DSched::uniform(seed));

  using QueueType = MPMCQueue<int, DeterministicAtomic, Dynamic>;

  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));

  // (capacity, producers, consumers) per run, in execution order. The first
  // two configurations are deliberately run twice.
  struct BenchConfig {
    size_t cap;
    int producers;
    int consumers;
  };
  const BenchConfig configs[] = {
      {10, 1, 1},
      {100, 10, 10},
      {10, 1, 1},
      {100, 10, 10},
      {1, 10, 10},
  };

  for (const auto& caller : callers) {
    for (const auto& cfg : configs) {
      LOG(INFO) << producerConsumerBench(
          QueueType(cfg.cap),
          "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
              folly::to<std::string>(cfg.cap) + ")",
          cfg.producers,
          cfg.consumers,
          1000,
          *caller);
    }
  }
}

void runMtProdConsDeterministicDynamic(
    long seed,
    uint32_t prods,
    uint32_t cons,
    uint32_t numOps,
    size_t cap,
    size_t minCap,
    size_t mult) {
  // Timings are meaningless under DSched; this drives the bench harness
  // through deterministic interleavings of a dynamic queue.
  DSched sched(DSched::uniform(seed));

  using QueueType = MPMCQueue<int, DeterministicAtomic, true>;

  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));

  // The label is the same for every writer strategy, so build it once.
  const std::string label = "MPMCQueue<int, DeterministicAtomic, true>(" +
      folly::to<std::string>(cap) + ", " + folly::to<std::string>(minCap) +
      ", " + folly::to<std::string>(mult) + ")";

  for (const auto& caller : callers) {
    // A fresh queue per strategy.
    LOG(INFO) << producerConsumerBench(
        QueueType(cap, minCap, mult), label, prods, cons, numOps, *caller);
  }
}

// This is a benchmark, not a test. gtest only skips tests whose name starts
// with "DISABLED_" (with the underscore); run it explicitly with
// --gtest_also_run_disabled_tests.
TEST(MPMCQueue, DISABLED_MtProdConsDeterministic) {
  runMtProdConsDeterministic(0);
}

// This is a benchmark, not a test (disabled by default, see above).
TEST(MPMCQueue, DISABLED_MtProdConsDeterministicDynamic) {
  runMtProdConsDeterministic<true>(0);
}

// Overwrites `var` with the integer value of environment variable `envvar`
// when it is set; leaves `var` untouched otherwise. Non-numeric text parses
// as 0, as it did with atoi.
template <typename T>
void setFromEnv(T& var, const char* envvar) {
  const char* str = std::getenv(envvar);
  if (str) {
    // strtoll instead of atoi: atoi only yields an int (truncating values
    // meant for `long`/`size_t` targets) and has undefined behaviour on
    // overflow, whereas strtoll covers 64 bits and clamps on overflow.
    var = static_cast<T>(std::strtoll(str, nullptr, 10));
  }
}

// This is a benchmark, not a test. The "DISABLED_" prefix (with underscore)
// is what gtest uses to skip it by default; run it with
// --gtest_also_run_disabled_tests. Every parameter can be overridden via the
// SEED, PRODS, CONS, NUM_OPS, CAP, MIN_CAP and MULT environment variables.
TEST(MPMCQueue, DISABLED_MtProdConsDeterministicDynamicWithArguments) {
  long seed = 0;
  uint32_t prods = 10;
  uint32_t cons = 10;
  uint32_t numOps = 1000;
  size_t cap = 10000;
  size_t minCap = 9;
  size_t mult = 3;
  setFromEnv(seed, "SEED");
  setFromEnv(prods, "PRODS");
  setFromEnv(cons, "CONS");
  setFromEnv(numOps, "NUM_OPS");
  setFromEnv(cap, "CAP");
  setFromEnv(minCap, "MIN_CAP");
  setFromEnv(mult, "MULT");
  runMtProdConsDeterministicDynamic(
      seed, prods, cons, numOps, cap, minCap, mult);
}

// Calls producerConsumerBench, labelling the result with the source text of
// the queue expression (stringized via #queueExpr).
#define PC_BENCH(queueExpr, numProds, numCons, ...) \
  producerConsumerBench(                             \
      queueExpr, #queueExpr, (numProds), (numCons), __VA_ARGS__)

template <bool Dynamic = false>
void runMtProdCons() {
  using QueueType = MPMCQueue<int, std::atomic, Dynamic>;

  int n = 100000;
  setFromEnv(n, "NUM_OPS");
  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
  for (const auto& caller : callers) {
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
  }
}

// This is a benchmark, not a test. gtest only skips tests whose name starts
// with "DISABLED_" (with the underscore); run it explicitly with
// --gtest_also_run_disabled_tests.
TEST(MPMCQueue, DISABLED_MtProdCons) {
  runMtProdCons();
}

// This is a benchmark, not a test (disabled by default, see above).
TEST(MPMCQueue, DISABLED_MtProdConsDynamic) {
  runMtProdCons</* Dynamic = */ true>();
}

template <bool Dynamic = false>
void runMtProdConsEmulatedFutex() {
  using QueueType = MPMCQueue<int, EmulatedFutexAtomic, Dynamic>;

  const int n = 100000 / (folly::kIsSanitizeThread ? 10 : 1);
  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
  for (const auto& caller : callers) {
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
  }
}

// Producer/consumer runs on EmulatedFutexAtomic, fixed-size then dynamic.
TEST(MPMCQueue, mtProdConsEmulatedFutex) {
  runMtProdConsEmulatedFutex</* Dynamic = */ false>();
}

TEST(MPMCQueue, mtProdConsEmulatedFutexDynamic) {
  runMtProdConsEmulatedFutex</* Dynamic = */ true>();
}

template <template <typename> class Atom, bool Dynamic = false>
void runNeverFailThread(
    int numThreads,
    int n, /*numOps*/
    MPMCQueue<int, Atom, Dynamic>& cq,
    std::atomic<uint64_t>& sum,
    int t) {
  // Each iteration writes one value and immediately reads one back. With a
  // queue of capacity numThreads, neither the writeIfNotFull nor the
  // readIfNotEmpty below is expected to fail.
  uint64_t threadSum = 0;
  int i = t;
  while (i < n) {
    EXPECT_TRUE(cq.writeIfNotFull(i));

    int dest = -1;
    EXPECT_TRUE(cq.readIfNotEmpty(dest));
    EXPECT_TRUE(dest >= 0);
    threadSum += dest;

    i += numThreads;
  }
  sum += threadSum;
}

template <template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
  // always #enq >= #deq
  MPMCQueue<int, Atom, Dynamic> cq(numThreads);

  uint64_t n = numOps;
  auto beginMicro = nowMicro();

  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(
        runNeverFailThread<Atom, Dynamic>,
        numThreads,
        n,
        std::ref(cq),
        std::ref(sum),
        t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);

  return nowMicro() - beginMicro;
}

// Runs runNeverFailTest once per thread count in `nts` and logs the
// per-operation cost (each of the n values is enqueued and dequeued, hence
// n * 2 operations). `nts` is only read, so it is taken by const reference;
// this also accepts temporaries such as a braced list.
template <template <typename> class Atom, bool Dynamic = false>
void runMtNeverFail(const std::vector<int>& nts, int n) {
  for (int nt : nts) {
    uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}

// All the never_fail tests are for the non-dynamic version only.
// False positive for dynamic version. Some writeIfNotFull() and
// tryWriteUntil() operations may fail in transient conditions related
// to expansion.

// Fixed-size queue with 1, 3 and 100 threads, for two atomic flavours.
TEST(MPMCQueue, mtNeverFail) {
  int n = 100000;
  std::vector<int> nts{1, 3, 100};
  runMtNeverFail<std::atomic>(nts, n);
}

TEST(MPMCQueue, mtNeverFailEmulatedFutex) {
  int n = 100000;
  std::vector<int> nts{1, 3, 100};
  runMtNeverFail<EmulatedFutexAtomic>(nts, n);
}

// Runs runNeverFailTest with DeterministicAtomic for each thread count in
// `nts`, first under a uniform scheduler and then under uniformSubset(seed, 2).
// `nts` is only read, so it is taken by const reference.
template <bool Dynamic = false>
void runMtNeverFailDeterministic(
    const std::vector<int>& nts, int n, long seed) {
  LOG(INFO) << "using seed " << seed;
  for (int nt : nts) {
    {
      DSched sched(DSched::uniform(seed));
      runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
    }
    {
      DSched sched(DSched::uniformSubset(seed, 2));
      runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
    }
  }
}

TEST(MPMCQueue, mtNeverFailDeterministic) {
  // Fixed seed for reproducibility (nowMicro() % 10000 would randomize it).
  long seed = 0;
  int n = 1000;
  std::vector<int> nts{3, 10};
  runMtNeverFailDeterministic(nts, n, seed);
}

template <class Clock, template <typename> class Atom, bool Dynamic>
void runNeverFailUntilThread(
    int numThreads,
    int n, /*numOps*/
    MPMCQueue<int, Atom, Dynamic>& cq,
    std::atomic<uint64_t>& sum,
    int t) {
  // Like runNeverFailThread, but enqueues with tryWriteUntil using a
  // one-second deadline measured on `Clock`.
  uint64_t threadSum = 0;
  int i = t;
  while (i < n) {
    auto soon = Clock::now() + std::chrono::seconds(1);
    EXPECT_TRUE(cq.tryWriteUntil(soon, i));

    int dest = -1;
    EXPECT_TRUE(cq.readIfNotEmpty(dest));
    EXPECT_TRUE(dest >= 0);
    threadSum += dest;

    i += numThreads;
  }
  sum += threadSum;
}

template <class Clock, template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
  // always #enq >= #deq
  MPMCQueue<int, Atom, Dynamic> cq(numThreads);

  uint64_t n = numOps;
  auto beginMicro = nowMicro();

  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(
        runNeverFailUntilThread<Clock, Atom, Dynamic>,
        numThreads,
        n,
        std::ref(cq),
        std::ref(sum),
        t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);

  return nowMicro() - beginMicro;
}

// Runs the tryWriteUntil variant with system_clock deadlines for each thread
// count in `nts` and logs the per-op cost. `nts` is only read, so it is taken
// by const reference.
template <bool Dynamic = false>
void runMtNeverFailUntilSystem(const std::vector<int>& nts, int n) {
  for (int nt : nts) {
    uint64_t elapsed =
        runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(
            nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}

TEST(MPMCQueue, mtNeverFailUntilSystem) {
  int n = 100000;
  std::vector<int> nts{1, 3, 100};
  runMtNeverFailUntilSystem(nts, n);
}

// Runs the tryWriteUntil variant with steady_clock deadlines for each thread
// count in `nts` and logs the per-op cost. `nts` is only read, so it is taken
// by const reference.
template <bool Dynamic = false>
void runMtNeverFailUntilSteady(const std::vector<int>& nts, int n) {
  for (int nt : nts) {
    uint64_t elapsed =
        runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(
            nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}

TEST(MPMCQueue, mtNeverFailUntilSteady) {
  int n = 100000;
  std::vector<int> nts{1, 3, 100};
  runMtNeverFailUntilSteady(nts, n);
}

// Special-member events counted by Lifecycle<R>; the values index lc_counts.
enum LifecycleEvent {
  NOTHING = -1,
  DEFAULT_CONSTRUCTOR = 0,
  COPY_CONSTRUCTOR = 1,
  MOVE_CONSTRUCTOR = 2,
  TWO_ARG_CONSTRUCTOR = 3,
  COPY_OPERATOR = 4,
  MOVE_OPERATOR = 5,
  DESTRUCTOR = 6,
  MAX_LIFECYCLE_EVENT = 7
};

// Per-thread event counters and the snapshot taken by lc_snap().
static thread_local int lc_counts[MAX_LIFECYCLE_EVENT];
static thread_local int lc_prev[MAX_LIFECYCLE_EVENT];

// Number of Lifecycle objects constructed (by any constructor) but not yet
// destroyed on this thread.
static int lc_outstanding() {
  const int constructed = lc_counts[DEFAULT_CONSTRUCTOR] +
      lc_counts[COPY_CONSTRUCTOR] + lc_counts[MOVE_CONSTRUCTOR] +
      lc_counts[TWO_ARG_CONSTRUCTOR];
  const int destroyed = lc_counts[DESTRUCTOR];
  return constructed - destroyed;
}

// Records the current counters so the next step can compare against them.
static void lc_snap() {
  int idx = 0;
  for (int count : lc_counts) {
    lc_prev[idx++] = count;
  }
}

// Expects that since the last snapshot exactly the events `what` and `what2`
// each happened once and no other event happened; failures report the
// calling line. Takes a new snapshot afterwards.
#define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)

static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
    const int observed = lc_counts[i] - lc_prev[i];
    const int delta = (i == what || i == what2) ? 1 : 0;
    EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
        << "lc_counts[" << i << "] - lc_prev[" << i << "] was " << observed
        << ", expected " << delta << ", from line " << lineno;
  }
  lc_snap();
}

// Element type that counts every special-member call in lc_counts.
template <typename R>
struct Lifecycle {
  // Relocatability marker; R is std::true_type or std::false_type in this
  // file. NOTE(review): presumably consumed by folly's IsRelocatable trait.
  using IsRelocatable = R;

  // True from construction until destruction; the destructor asserts it to
  // catch double destruction.
  bool constructed = true;

  Lifecycle() noexcept {
    ++lc_counts[DEFAULT_CONSTRUCTOR];
  }

  explicit Lifecycle(int /* n */, char const* /* s */) noexcept {
    ++lc_counts[TWO_ARG_CONSTRUCTOR];
  }

  Lifecycle(const Lifecycle& /* rhs */) noexcept {
    ++lc_counts[COPY_CONSTRUCTOR];
  }

  Lifecycle(Lifecycle&& /* rhs */) noexcept {
    ++lc_counts[MOVE_CONSTRUCTOR];
  }

  // Assignments only record the event; `constructed` is left as is.
  Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
    ++lc_counts[COPY_OPERATOR];
    return *this;
  }

  Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
    ++lc_counts[MOVE_OPERATOR];
    return *this;
  }

  ~Lifecycle() noexcept {
    ++lc_counts[DESTRUCTOR];
    assert(lc_outstanding() >= 0);
    assert(constructed);
    constructed = false;
  }
};

// Verifies that MPMCQueue forwards constructor arguments and value categories
// to Lifecycle<R> without extra copies or moves, by checking the exact
// special-member events after every queue operation (see LIFECYCLE_STEP).
// R selects whether the element type is marked relocatable.
template <typename R>
void runPerfectForwardingTest() {
  lc_snap();
  EXPECT_EQ(lc_outstanding(), 0);

  {
    // Non-dynamic only. False positive for dynamic.
    MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
    LIFECYCLE_STEP(NOTHING);

    for (int pass = 0; pass < 10; ++pass) {
      // 5 writes per iteration x 10 iterations fills the 50-slot queue.
      for (int i = 0; i < 10; ++i) {
        // No arguments: the element is default-constructed in place.
        queue.blockingWrite();
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

        // Arguments are forwarded to the two-arg constructor in place.
        queue.blockingWrite(1, "one");
        LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);

        {
          Lifecycle<R> src;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
          // An rvalue produces exactly one move construction.
          queue.blockingWrite(std::move(src));
          LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
        }
        LIFECYCLE_STEP(DESTRUCTOR);

        {
          Lifecycle<R> src;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
          // An lvalue produces exactly one copy construction.
          queue.blockingWrite(src);
          LIFECYCLE_STEP(COPY_CONSTRUCTOR);
        }
        LIFECYCLE_STEP(DESTRUCTOR);

        EXPECT_TRUE(queue.write());
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
      }

      // A write to the full queue fails without constructing anything.
      EXPECT_EQ(queue.size(), 50);
      EXPECT_FALSE(queue.write(2, "two"));
      LIFECYCLE_STEP(NOTHING);

      for (int i = 0; i < 50; ++i) {
        {
          Lifecycle<R> node;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

          // Reading into an existing object: the expected events differ
          // depending on whether R marks the type relocatable.
          queue.blockingRead(node);
          if (R::value) {
            // relocatable, moved via memcpy
            LIFECYCLE_STEP(DESTRUCTOR);
          } else {
            LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
          }
        }
        LIFECYCLE_STEP(DESTRUCTOR);
      }

      EXPECT_EQ(queue.size(), 0);
    }

    // put one element back before destruction
    {
      Lifecycle<R> src(3, "three");
      LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
      queue.write(std::move(src));
      LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
    }
    LIFECYCLE_STEP(DESTRUCTOR); // destroy src
  }
  // The queue's destructor must destroy the one remaining element.
  LIFECYCLE_STEP(DESTRUCTOR); // destroy queue

  EXPECT_EQ(lc_outstanding(), 0);
}

// Perfect-forwarding checks for a non-relocatable and a relocatable element.
TEST(MPMCQueue, perfectForwarding) {
  using NotRelocatable = std::false_type;
  runPerfectForwardingTest<NotRelocatable>();
}

TEST(MPMCQueue, perfectForwardingRelocatable) {
  using Relocatable = std::true_type;
  runPerfectForwardingTest<Relocatable>();
}

// Checks that moving or swapping whole queues transfers their elements
// without constructing, copying or destroying any of them, and that the
// elements are destroyed exactly once when their final owner goes away.
template <bool Dynamic = false>
void run_queue_moving() {
  lc_snap();
  EXPECT_EQ(lc_outstanding(), 0);

  {
    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
    LIFECYCLE_STEP(NOTHING);

    a.blockingWrite();
    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

    // move constructor
    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b =
        std::move(a);
    LIFECYCLE_STEP(NOTHING);
    // The moved-from queue is expected to report zero capacity and size.
    EXPECT_EQ(a.capacity(), 0);
    EXPECT_EQ(a.size(), 0);
    EXPECT_EQ(b.capacity(), 50);
    EXPECT_EQ(b.size(), 1);

    b.blockingWrite();
    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

    // move operator
    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
    LIFECYCLE_STEP(NOTHING);
    c = std::move(b);
    LIFECYCLE_STEP(NOTHING);
    EXPECT_EQ(c.capacity(), 50);
    EXPECT_EQ(c.size(), 2);

    {
      Lifecycle<std::false_type> dst;
      LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
      // Reading into an existing (non-relocatable) object shows up as one
      // move assignment plus one destruction.
      c.blockingRead(dst);
      LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);

      {
        // swap
        MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
        LIFECYCLE_STEP(NOTHING);
        std::swap(c, d);
        LIFECYCLE_STEP(NOTHING);
        EXPECT_EQ(c.capacity(), 10);
        EXPECT_TRUE(c.isEmpty());
        EXPECT_EQ(d.capacity(), 50);
        EXPECT_EQ(d.size(), 1);

        d.blockingRead(dst);
        LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);

        // After this, c and d each hold one element.
        c.blockingWrite(dst);
        LIFECYCLE_STEP(COPY_CONSTRUCTOR);

        d.blockingWrite(std::move(dst));
        LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
      } // d goes out of scope
      LIFECYCLE_STEP(DESTRUCTOR);
    } // dst goes out of scope
    LIFECYCLE_STEP(DESTRUCTOR);
  } // c goes out of scope
  LIFECYCLE_STEP(DESTRUCTOR);
}

// Queue move/swap semantics for fixed-size and dynamic queues.
TEST(MPMCQueue, queueMoving) {
  run_queue_moving</* Dynamic = */ false>();
}

TEST(MPMCQueue, queueMovingDynamic) {
  run_queue_moving</* Dynamic = */ true>();
}

TEST(MPMCQueue, explicitZeroCapacityFail) {
  // Both queue flavours reject a requested capacity of zero.
  using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;

  ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
  ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
}

// Two readers race on a single value written to a capacity-1 queue, each
// with a 100 ms tryReadUntil deadline. A reader that gets the value must see
// 42, and the other reader must then have failed. At least the deadline must
// elapse overall.
template <bool Dynamic>
void testTryReadUntil() {
  MPMCQueue<int, std::atomic, Dynamic> q{1};

  const auto wait = std::chrono::milliseconds(100);
  stop_watch<> watch;
  bool rets[2];
  int vals[2];
  // Two readers plus this thread are released together.
  boost::barrier b{3};
  std::vector<std::thread> threads;
  for (int i = 0; i < 2; i++) {
    threads.emplace_back([&, i] {
      b.wait();
      rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
    });
  }

  b.wait();
  EXPECT_TRUE(q.write(42));

  for (auto& reader : threads) {
    reader.join();
  }

  for (int i = 0; i < 2; i++) {
    if (!rets[i]) {
      continue;
    }
    int other = 1 - i;
    EXPECT_EQ(42, vals[i]);
    EXPECT_FALSE(rets[other]);
  }

  EXPECT_TRUE(watch.elapsed(wait));
}

// Fills a capacity-1 queue, then lets two writers race with 100 ms
// tryWriteUntil deadlines while the main thread frees the single slot. A
// writer that succeeded must have stored its own index, and the other writer
// must then have failed.
template <bool Dynamic>
void testTryWriteUntil() {
  MPMCQueue<int, std::atomic, Dynamic> q{1};
  EXPECT_TRUE(q.write(42));

  const auto wait = std::chrono::milliseconds(100);
  stop_watch<> watch;
  bool rets[2];
  // Two writers plus this thread are released together.
  boost::barrier b{3};
  std::vector<std::thread> threads;
  for (int i = 0; i < 2; i++) {
    threads.emplace_back([&, i] {
      b.wait();
      rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
    });
  }

  b.wait();
  int x;
  EXPECT_TRUE(q.read(x));
  EXPECT_EQ(42, x);

  for (auto& writer : threads) {
    writer.join();
  }
  EXPECT_TRUE(q.read(x));

  for (int i = 0; i < 2; i++) {
    if (!rets[i]) {
      continue;
    }
    int other = 1 - i;
    EXPECT_EQ(i, x);
    EXPECT_FALSE(rets[other]);
  }

  EXPECT_TRUE(watch.elapsed(wait));
}

// Deadline-based read/write races, fixed-size and dynamic.
TEST(MPMCQueue, tryReadUntil) {
  testTryReadUntil</* Dynamic = */ false>();
}

TEST(MPMCQueue, tryReadUntilDynamic) {
  testTryReadUntil</* Dynamic = */ true>();
}

TEST(MPMCQueue, tryWriteUntil) {
  testTryWriteUntil</* Dynamic = */ false>();
}

TEST(MPMCQueue, tryWriteUntilDynamic) {
  testTryWriteUntil</* Dynamic = */ true>();
}

// Writes one element, then calls tryWriteUntil with a 10 ms system_clock
// deadline, which must return rather than block forever. When a fixed-size
// queue is already full after the first write, the timed write must also
// report failure. A dynamic queue may grow and accept the write, so no
// result is asserted for it.
template <bool Dynamic>
void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
  CHECK(q.write(1));
  // Single-threaded here, so size() is exact.
  const bool fixedAndFull =
      !Dynamic && static_cast<size_t>(q.size()) >= q.capacity();
  /* The following must not block forever */
  const bool written = q.tryWriteUntil(
      std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
  if (fixedAndFull) {
    EXPECT_FALSE(written);
  }
}

// tryWriteUntil on a full queue must time out rather than hang: a
// capacity-1 fixed queue, and a dynamic queue constructed with (200, 1, 2).
TEST(MPMCQueue, tryWriteUntilTimeout) {
  folly::MPMCQueue<int, std::atomic, false> queue(1);
  testTimeout(queue);
}

TEST(MPMCQueue, mustFailTryWriteUntilDynamic) {
  folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
  testTimeout(queue);
}