folly/folly/concurrency/container/test/LockFreeRingBufferTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <iostream>
#include <thread>

#include <folly/concurrency/container/LockFreeRingBuffer.h>
#include <folly/portability/GTest.h>
#include <folly/test/DeterministicSchedule.h>

namespace folly {

TEST(LockFreeRingBuffer, writeReadSequentially) {
  const int capacity = 256;
  const int turns = 4;

  LockFreeRingBuffer<int> rb(capacity);
  LockFreeRingBuffer<int>::Cursor cur = rb.currentHead();
  for (unsigned int turn = 0; turn < turns; turn++) {
    for (unsigned int write = 0; write < capacity; write++) {
      int val = turn * capacity + write;
      rb.write(val);
    }

    for (unsigned int write = 0; write < capacity; write++) {
      int dest = 0;
      ASSERT_TRUE(rb.tryRead(dest, cur));
      ASSERT_EQ(turn * capacity + write, dest);
      cur.moveForward();
    }
  }
}

TEST(LockFreeRingBuffer, writeReadSequentiallyBackward) {
  const int capacity = 256;
  const int turns = 4;

  LockFreeRingBuffer<int> rb(capacity);
  for (unsigned int turn = 0; turn < turns; turn++) {
    for (unsigned int write = 0; write < capacity; write++) {
      int val = turn * capacity + write;
      rb.write(val);
    }

    LockFreeRingBuffer<int>::Cursor cur = rb.currentHead();
    cur.moveBackward(1); /// last write
    for (int write = capacity - 1; write >= 0; write--) {
      int foo = 0;
      ASSERT_TRUE(rb.tryRead(foo, cur));
      ASSERT_EQ(turn * capacity + write, foo);
      cur.moveBackward();
    }
  }
}

TEST(LockFreeRingBuffer, readsCanBlock) {
  // Start a reader thread, confirm that reading can block
  std::atomic<bool> readerHasRun(false);
  LockFreeRingBuffer<int> rb(1);
  auto cursor = rb.currentHead();
  cursor.moveForward(3); // wait for the 4th write

  const int sentinel = 0xfaceb00c;

  auto reader = std::thread([&]() {
    int val = 0;
    EXPECT_TRUE(rb.waitAndTryRead(val, cursor));
    readerHasRun = true;
    EXPECT_EQ(sentinel, val);
  });

  for (int i = 0; i < 4; i++) {
    EXPECT_FALSE(readerHasRun);
    int val = sentinel;
    rb.write(val);
  }
  reader.join();
  EXPECT_TRUE(readerHasRun);
}

// expose the cursor raw value via a wrapper type
template <typename T, template <typename> class Atom>
uint64_t value(const typename LockFreeRingBuffer<T, Atom>::Cursor& rbcursor) {
  typedef typename LockFreeRingBuffer<T, Atom>::Cursor RBCursor;

  struct ExposedCursor : RBCursor {
    ExposedCursor(const RBCursor& cursor) : RBCursor(cursor) {}
    uint64_t value() { return this->ticket; }
  };
  return ExposedCursor(rbcursor).value();
}

template <template <typename> class Atom>
void runReader(
    LockFreeRingBuffer<int, Atom>& rb, std::atomic<int32_t>& writes) {
  int32_t idx;
  while ((idx = writes--) > 0) {
    rb.write(idx);
  }
}

template <template <typename> class Atom>
void runWritesNeverFail(int capacity, int writes, int writers) {
  using folly::test::DeterministicSchedule;

  DeterministicSchedule sched(DeterministicSchedule::uniform(0));
  LockFreeRingBuffer<int, Atom> rb(capacity);

  std::atomic<int32_t> writes_remaining(writes);
  std::vector<std::thread> threads(writers);

  for (int i = 0; i < writers; i++) {
    threads[i] = DeterministicSchedule::thread(
        std::bind(runReader<Atom>, std::ref(rb), std::ref(writes_remaining)));
  }

  for (auto& thread : threads) {
    DeterministicSchedule::join(thread);
  }

  EXPECT_EQ(writes, (value<int, Atom>)(rb.currentHead()));
}

TEST(LockFreeRingBuffer, writesNeverFail) {
  using folly::detail::EmulatedFutexAtomic;
  using folly::test::DeterministicAtomic;

  runWritesNeverFail<DeterministicAtomic>(1, 100, 4);
  runWritesNeverFail<DeterministicAtomic>(10, 100, 4);
  runWritesNeverFail<DeterministicAtomic>(100, 1000, 8);
  runWritesNeverFail<DeterministicAtomic>(1000, 10000, 16);

  runWritesNeverFail<std::atomic>(1, 100, 4);
  runWritesNeverFail<std::atomic>(10, 100, 4);
  runWritesNeverFail<std::atomic>(100, 1000, 8);
  runWritesNeverFail<std::atomic>(1000, 10000, 16);

  runWritesNeverFail<EmulatedFutexAtomic>(1, 100, 4);
  runWritesNeverFail<EmulatedFutexAtomic>(10, 100, 4);
  runWritesNeverFail<EmulatedFutexAtomic>(100, 1000, 8);
  runWritesNeverFail<EmulatedFutexAtomic>(1000, 10000, 16);
}

TEST(LockFreeRingBuffer, readerCanDetectSkips) {
  const int capacity = 4;
  const int rounds = 4;

  LockFreeRingBuffer<int> rb(capacity);
  auto cursor = rb.currentHead();
  cursor.moveForward(1);

  for (int round = 0; round < rounds; round++) {
    for (int i = 0; i < capacity; i++) {
      int val = round * capacity + i;
      rb.write(val);
    }
  }

  int result = -1;
  EXPECT_FALSE(rb.tryRead(result, cursor));
  EXPECT_FALSE(rb.waitAndTryRead(result, cursor));
  EXPECT_EQ(-1, result);

  cursor = rb.currentTail();
  EXPECT_TRUE(rb.tryRead(result, cursor));
  EXPECT_EQ(capacity * (rounds - 1), result);
}

TEST(LockFreeRingBuffer, cursorFromWrites) {
  const int capacity = 3;
  LockFreeRingBuffer<int> rb(capacity);

  // Workaround for template deduction failure
  auto (&cursorValue)(value<int, std::atomic>);

  int val = 0xfaceb00c;
  EXPECT_EQ(0, cursorValue(rb.writeAndGetCursor(val)));
  EXPECT_EQ(1, cursorValue(rb.writeAndGetCursor(val)));
  EXPECT_EQ(2, cursorValue(rb.writeAndGetCursor(val)));

  // Check that rb is giving out actual cursors and not just
  // pointing to the current slot.
  EXPECT_EQ(3, cursorValue(rb.writeAndGetCursor(val)));
}

TEST(LockFreeRingBuffer, moveBackwardsCanFail) {
  const int capacity = 3;
  LockFreeRingBuffer<int> rb(capacity);

  // Workaround for template deduction failure
  auto (&cursorValue)(value<int, std::atomic>);

  int val = 0xfaceb00c;
  rb.write(val);
  rb.write(val);

  auto cursor = rb.currentHead(); // points to 2
  EXPECT_EQ(2, cursorValue(cursor));
  EXPECT_TRUE(cursor.moveBackward());
  EXPECT_TRUE(cursor.moveBackward()); // now at 0
  EXPECT_FALSE(cursor.moveBackward()); // moving back does nothing
}

namespace {

struct S {
  int x;
  float y;
  char c;
};

} // namespace

TEST(LockFreeRingBuffer, contendedReadsAndWrites) {
  LockFreeRingBuffer<S> rb{2};
  std::atomic<bool> done{false};

  std::vector<std::thread> threads;
  for (int i = 0; i < 8; ++i) {
    threads.emplace_back([&] {
      while (!done.load(std::memory_order_relaxed)) {
        S value{10, -5.5, 100};
        rb.write(value);
      }
    });
  }
  for (int i = 0; i < 8; ++i) {
    threads.emplace_back([&] {
      S value;
      while (!done.load(std::memory_order_relaxed)) {
        if (rb.tryRead(value, rb.currentTail())) {
          EXPECT_EQ(10, value.x);
          EXPECT_EQ(-5.5, value.y);
          EXPECT_EQ(100, value.c);
        }
      }
    });
  }

  /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{1});
  done.store(true, std::memory_order_relaxed);
  for (auto& thread : threads) {
    thread.join();
  }
}

TEST(LockFreeRingBuffer, cursorComparison) {
  LockFreeRingBuffer<int> rb{2};
  rb.write(5);
  EXPECT_TRUE(rb.currentHead() == rb.currentHead());
  EXPECT_FALSE(rb.currentHead() == rb.currentTail());

  EXPECT_TRUE(rb.currentHead() != rb.currentTail());
  EXPECT_FALSE(rb.currentHead() != rb.currentHead());

  EXPECT_TRUE(rb.currentHead() > rb.currentTail());
  EXPECT_FALSE(rb.currentTail() > rb.currentHead());

  EXPECT_TRUE(rb.currentTail() < rb.currentHead());
  EXPECT_FALSE(rb.currentHead() < rb.currentTail());
}

} // namespace folly