folly/folly/test/DeterministicScheduleTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/test/DeterministicSchedule.h>

#include <folly/portability/GFlags.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/AtomicUtil.h>

using namespace folly::test;

// Compile-time check: folly::atomic_value_type_t applied to
// DeterministicAtomic<int> must name int.
static_assert(
    std::is_same_v<int, folly::atomic_value_type_t<DeterministicAtomic<int>>>);

/**
 * Draws 100000 values in [0, 10) from the scheduler function returned by
 * DeterministicSchedule::uniform(0) and checks that every bucket received
 * more than 9000 hits (a perfectly even split would give 10000 each).
 */
TEST(DeterministicSchedule, uniform) {
  auto p = DeterministicSchedule::uniform(0);
  int buckets[10] = {};
  for (int i = 0; i < 100000; ++i) {
    buckets[p(10)]++;
  }
  for (int i = 0; i < 10; ++i) {
    // EXPECT_GT prints the observed count on failure, and the streamed
    // message identifies which bucket was under-filled.
    EXPECT_GT(buckets[i], 9000) << "bucket " << i;
  }
}

/**
 * Draws 100000 values in [0, 10) from
 * DeterministicSchedule::uniformSubset(0, 3, 100) and checks two things:
 *   - within every window of 100 consecutive draws exactly 3 distinct
 *     values are produced (never more than 3 at any point in the window);
 *   - over the whole run every one of the 10 buckets gets more than 9000
 *     hits.
 */
TEST(DeterministicSchedule, uniformSubset) {
  auto ps = DeterministicSchedule::uniformSubset(0, 3, 100);
  int buckets[10] = {};
  std::set<int> seen;
  for (int i = 0; i < 100000; ++i) {
    if (i > 0 && (i % 100) == 0) {
      // A window of 100 draws has just ended: validate it, then start over.
      EXPECT_EQ(seen.size(), 3u) << "window ending before draw " << i;
      seen.clear();
    }
    int x = ps(10);
    seen.insert(x);
    EXPECT_LE(seen.size(), 3u) << "draw " << i;
    buckets[x]++;
  }
  // The loop only validates a window when the next one begins, so the last
  // window (draws 99900..99999) has to be checked explicitly.
  EXPECT_EQ(seen.size(), 3u) << "final window";
  for (int i = 0; i < 10; ++i) {
    EXPECT_GT(buckets[i], 9000) << "bucket " << i;
  }
}

/**
 * Ten DeterministicSchedule threads each increment `test` once under a
 * DeterministicMutex. The scenario runs twice: first with the whole
 * read-modify-write inside one critical section, then with the mutex released
 * and re-acquired between the read and the write. Only the first run asserts
 * that no increment was lost; the second merely logs whether the lost update
 * showed up.
 */
TEST(DeterministicSchedule, buggyAdd) {
  for (bool bug : {false, true}) {
    DeterministicSchedule sched(DeterministicSchedule::uniform(0));
    if (bug) {
      FOLLY_TEST_DSCHED_VLOG("Test with race condition");
    } else {
      FOLLY_TEST_DSCHED_VLOG("Test without race condition");
    }
    DeterministicMutex mutex;
    // DeterministicAtomic is not strictly required for these counters, but
    // it makes the sequence of events easier to follow in the logs.
    DeterministicAtomic<int> test{0};
    DeterministicAtomic<int> baseline{0};
    const int threadCount = 10;
    std::vector<std::thread> threads;
    threads.reserve(threadCount);
    for (int t = 0; t < threadCount; ++t) {
      threads.push_back(DeterministicSchedule::thread([&, t] {
        baseline.fetch_add(1);
        // Increment `test` under the mutex; retry until the lock is held.
        for (;;) {
          // Even-numbered threads block in lock(); odd-numbered threads
          // spin on try_lock().
          const bool blocking = (t % 2) == 0;
          if (blocking) {
            mutex.lock();
          } else if (!mutex.try_lock()) {
            continue;
          }
          const int next = test.load() + 1;
          if (bug) {
            // Split the critical section so that the increment is no
            // longer atomic.
            mutex.unlock();
            mutex.lock();
          }
          test.store(next);
          mutex.unlock();
          break;
        }
      })); // thread lambda
    } // for t
    DeterministicSchedule::joinAll(threads);
    if (bug) {
      if (test.load() != baseline.load()) {
        FOLLY_TEST_DSCHED_VLOG("Caught the bug");
      } else {
        FOLLY_TEST_DSCHED_VLOG("Didn't catch the bug");
      }
    } else {
      EXPECT_EQ(test.load(), baseline.load());
    }
  } // for bug
}

/*
 * Test DSched support for auxiliary data and global invariants
 *
 * How to use DSched support for auxiliary data and global invariants
 * (Let Foo<T, Atom> be the template to be tested):
 *   1. Add friend AnnotatedFoo<T> to Foo<T,Atom> (Typically, in Foo.h).
 *   2. Define a class AuxData for whatever auxiliary data is needed
 *      to maintain global knowledge of shared and private state.
 *   3. Define:
 *        static AuxData* aux_;
 *        static thread_local uint32_t tid_;
 *   4. (Optional) Define gflags for command line options. E.g.:
 *        DEFINE_int64(seed, 0, "Seed for random number generators");
 *   5. (Optional) Define macros for management of auxiliary data. E.g.,
 *        #define AUX_THR(x)    (aux_->t_[tid_]->x)
 *   6. (Optional) Define macro for creating auxiliary actions. E.g.,
 *        #define AUX_ACT(act)                                       \
 *          {                                                        \
 *            AUX_THR(func_) = __func__;                             \
 *            AUX_THR(line_) = __LINE__;                             \
 *            AuxAct auxact([&](bool success) { if (success); act}); \
 *            DeterministicSchedule::setAuxAct(auxact);              \
 *          }
 *      [Note: Auxiliary actions must not contain any standard shared
 *      accesses, or else deadlock will occur. Use the load_direct()
 *      member function of DeterministicAtomic instead.]
 *   7. Define AnnotatedFoo<T> derived from Foo<T,DeterministicAtomic>.
 *   8. Define member functions in AnnotatedFoo to manage DSched::auxChk.
 *   9. Define member functions for logging and checking global invariants.
 *  10. Define member functions for direct access to data members of Foo.
 *  11. (Optional) Add a member function dummyStep() to update
 *      auxiliary data race-free when the next step is unknown or
 *      not conveniently accessible (e.g., in a different
 *      library). The function adds a dummy shared step to force
 *      DSched to invoke the auxiliary action at a known point. This
 *      is needed for now because DSched allows threads to run in
 *      parallel between shared accesses. Hence, concurrent updates
 *      of shared auxiliary data can be racy if executed outside
 *      auxiliary actions. This may be obviated in the future if
 *      DSched supports fully serialized execution.
 *        void dummyStep() {
 *          DeterministicSchedule::beforeSharedAccess();
 *          DeterministicSchedule::afterSharedAccess(true);
 *        }
 *  12. Override member functions of Foo as needed in order to
 *      annotate the code with auxiliary actions. [Note: There may be
 *      a lot of duplication of Foo's code. Alternatively, Foo can be
 *      annotated directly.]
 *  13. Define TEST using instances of AuxData and AnnotatedFoo.
 *  14. For debugging, iteratively add (as needed) auxiliary data,
 *      global invariants, logging details, command line flags as
 *      needed and selectively generate relevant logs to detect the
 *      race condition shortly after it occurs.
 *
 * In the following example Foo = AtomicCounter
 */

/** Short alias for DeterministicSchedule, used by the globalInvariants test. */
using DSched = DeterministicSchedule;

/** Forward declaration of the annotated wrapper so it can be befriended. */
template <typename T>
struct AnnotatedAtomicCounter;

/**
 * Original template under test: a counter stored in an Atom<T>.
 *
 * Atom defaults to std::atomic; the test substitutes DeterministicAtomic.
 */
template <typename T, template <typename> class Atom = std::atomic>
class AtomicCounter {
  /** The annotated wrapper needs direct access to counter_. */
  friend struct AnnotatedAtomicCounter<T>;

 public:
  /** Starts the counter at val. */
  explicit AtomicCounter(T val) : counter_(val) {}

  /** Increments with a single fetch_add. */
  void inc() { counter_.fetch_add(1); }

  /**
   * Deliberately racy increment: a separate load followed by a store, so a
   * concurrent update between the two can be overwritten.
   */
  void incBug() {
    const T observed = counter_.load();
    counter_.store(observed + 1);
  }

  /** Returns the current counter value. */
  T load() { return counter_.load(); }

 private:
  Atom<T> counter_{0};
};

/**
 * Auxiliary data: test-only bookkeeping kept alongside the shared object.
 *
 * Holds the most recent step number, the step of the last recorded update,
 * and one PerThread record per participating thread.
 */
struct AuxData {
  using T = int;

  /* General */
  uint64_t step_ = {0};
  uint64_t lastUpdate_ = {0};

  struct PerThread {
    /* General */
    std::string func_;
    // Initialized like its siblings so that a PerThread never carries an
    // indeterminate line number, however it is constructed.
    int line_ = {0};
    /* Custom */
    T count_ = {0};
  };

  std::vector<PerThread> t_;

  /** Creates nthr per-thread records, all in their initial state. */
  explicit AuxData(int nthr) : t_(nthr) {}
};

/** Auxiliary data currently in use; dereferenced by AUX_THR and AUX_UPDATE. */
static AuxData* aux_;
/** Index of the calling thread's record in aux_->t_ (see AUX_THR). */
static thread_local uint32_t tid_;

/*
 * Command line flags
 *
 * Where each flag is read in this file:
 *   seed, num_reps, num_ops, num_threads, bug -- the globalInvariants test
 *   max_steps, log_begin, log_length, log_freq -- AnnotatedAtomicCounter::auxLog
 *   liveness_thresh -- AnnotatedAtomicCounter::auxCheck
 */
DEFINE_int64(seed, 0, "Seed for random number generators");
DEFINE_int64(max_steps, 1000000, "Max. number of shared steps for the test");
DEFINE_int64(num_reps, 1, "Number of test repetitions");
DEFINE_int64(num_ops, 1000, "Number of increments per repetition");
DEFINE_int64(liveness_thresh, 1000000, "Liveness threshold");
DEFINE_int64(log_begin, 0, "Step number to start logging. No logging if <= 0");
DEFINE_int64(log_length, 1000, "Length of step by step log (if log_begin > 0)");
DEFINE_int64(log_freq, 100000, "Log every so many steps");
DEFINE_int32(num_threads, 1, "Number of producers");
DEFINE_bool(bug, false, "Introduce bug");

/**
 * Aux macros
 *
 * AUX_THR(x)   -- member x of the calling thread's PerThread record, i.e.
 *                 aux_->t_[tid_].x. Requires aux_ to point at a live AuxData
 *                 and tid_ to be a valid index into aux_->t_.
 * AUX_UPDATE() -- sets aux_->lastUpdate_ to aux_->step_ + 1. lastUpdate_ is
 *                 the value the liveness check in auxCheck() compares
 *                 against.
 */
#define AUX_THR(x) (aux_->t_[tid_].x)
#define AUX_UPDATE() (aux_->lastUpdate_ = aux_->step_ + 1)

/**
 * Macro for inline definition of auxiliary actions.
 *
 * Records the enclosing function name (__func__) and the line of the macro
 * invocation (__LINE__) in the calling thread's PerThread record, wraps `act`
 * in an AuxAct callable that takes a `bool success`, and passes that callable
 * to DeterministicSchedule::setAuxAct().
 *
 * `act` is pasted inside `if (true) { ... }`, so it is executed regardless of
 * the value of `success`. The empty `if (success) {}` only mentions the
 * parameter (presumably to avoid an unused-parameter warning -- confirm).
 * The lambda captures by reference, so `act` may use locals and `this` from
 * the call site.
 */
#define AUX_ACT(act)                         \
  do {                                       \
    AUX_THR(func_) = __func__;               \
    AUX_THR(line_) = __LINE__;               \
    AuxAct auxfn([&](bool success) {         \
      if (success) {                         \
      }                                      \
      if (true) {                            \
        act                                  \
      }                                      \
    });                                      \
    DeterministicSchedule::setAuxAct(auxfn); \
  } while (0)

/**
 * Alias for the original class, instantiated with DeterministicAtomic in
 * place of the default std::atomic.
 */
template <typename T>
using Base = AtomicCounter<T, DeterministicAtomic>;

/**
 * Annotated shared class.
 *
 * Derives from Base<T> (AtomicCounter over DeterministicAtomic) and shadows
 * inc() and incBug() so that an auxiliary action is installed via AUX_ACT
 * before each shared access. It also supplies the auxChk callback (logging
 * followed by liveness and safety checks) that is registered with
 * DeterministicSchedule.
 *
 * All members rely on the file-level aux_ pointing at a live AuxData.
 */
template <typename T>
struct AnnotatedAtomicCounter : public Base<T> {
  /** Manage DSched auxChk: register a callback that logs, then checks. */
  void setAuxChk() {
    AuxChk auxfn([&](uint64_t step) {
      auxLog(step);
      auxCheck();
    });
    DeterministicSchedule::setAuxChk(auxfn);
  }

  /** Unregister the callback installed by setAuxChk(). */
  void clearAuxChk() { DeterministicSchedule::clearAuxChk(); }

  /**
   * Aux log function.
   *
   * Records `step` in aux_, terminates the process with status 0 once
   * FLAGS_max_steps is exceeded, and emits a log line when `step` falls in
   * the [log_begin, log_begin + log_length] window (only if log_begin > 0)
   * or is a multiple of a positive FLAGS_log_freq.
   */
  void auxLog(uint64_t step) {
    if (aux_->step_ == 0) {
      aux_->lastUpdate_ = step;
    }
    aux_->step_ = step;
    if (step > static_cast<uint64_t>(FLAGS_max_steps)) {
      exit(0);
    }
    const auto logBegin = static_cast<uint64_t>(FLAGS_log_begin);
    const bool inWindow = (FLAGS_log_begin > 0) && (step >= logBegin) &&
        (step <= logBegin + static_cast<uint64_t>(FLAGS_log_length));
    // Guard the modulo: --log_freq=0 would otherwise divide by zero. A
    // non-positive frequency simply disables periodic logging.
    const bool periodic = (FLAGS_log_freq > 0) &&
        ((step % static_cast<uint64_t>(FLAGS_log_freq)) == 0);
    if (inWindow || periodic) {
      doAuxLog(step);
    }
  }

  /**
   * Writes one line to std::cerr with the step, the last update step, the
   * counter value (read with load_direct()), and the calling thread's
   * recorded function, line and count.
   */
  void doAuxLog(uint64_t step) {
    std::stringstream ss;
    /* General */
    ss << step << " - " << aux_->lastUpdate_ << " --";
    /* Shared */
    ss << " counter =" << this->counter_.load_direct();
    /* Thread */
    ss << " -- t" << tid_ << " " << AUX_THR(func_) << ":" << AUX_THR(line_);
    ss << " count[" << tid_ << "] = " << AUX_THR(count_);
    /* Output */
    std::cerr << ss.str() << std::endl;
  }

  /** Global invariants, enforced with CHECK_* macros. */
  void auxCheck() {
    /* Liveness: the current step must stay within the threshold of the
     * last recorded update. */
    CHECK_LT(aux_->step_, aux_->lastUpdate_ + FLAGS_liveness_thresh);
    /* Safety: the shared counter must equal the sum of per-thread counts. */
    int sum = {0};
    for (auto& t : aux_->t_) {
      sum += t.count_;
    }
    CHECK_EQ(this->counter_.load_direct(), sum);
  }

  /* Direct access without going through DSched */
  T loadDirect() { return this->counter_.load_direct(); }

  /* Constructor -- calls original constructor */
  explicit AnnotatedAtomicCounter(int val) : Base<T>(val) {}

  /* Overloads of original member functions (as needed) */

  /** fetch_add increment, with the per-thread count bumped in the aux action. */
  void inc() {
    AUX_ACT({ ++AUX_THR(count_); });
    this->counter_.fetch_add(1);
  }

  /**
   * Racy load-then-store increment. The load gets an empty aux action; the
   * per-thread count is bumped in the aux action installed before the store.
   */
  void incBug() {
    AUX_ACT({});
    T newval = this->counter_.load() + 1;
    AUX_ACT({ ++AUX_THR(count_); });
    this->counter_.store(newval);
  }
};

/** The int instantiation of the annotated counter, used by globalInvariants. */
using Annotated = AnnotatedAtomicCounter<int>;

/**
 * Runs FLAGS_num_reps repetitions in which FLAGS_num_threads DSched threads
 * share FLAGS_num_ops increments of an annotated counter (the racy incBug()
 * when FLAGS_bug is set, inc() otherwise). The auxChk callback registered by
 * setAuxChk() logs and checks the invariants while the threads run; once
 * they are joined, the final counter value is compared with FLAGS_num_ops.
 */
TEST(DeterministicSchedule, globalInvariants) {
  CHECK_GT(FLAGS_num_threads, 0);

  DSched sched(DSched::uniform(FLAGS_seed));
  for (int i = 0; i < FLAGS_num_reps; ++i) {
    // The per-repetition auxiliary data lives on the stack; the file-level
    // aux_ pointer only borrows it, and is reset before it goes out of scope.
    AuxData aux(FLAGS_num_threads);
    aux_ = &aux;
    Annotated annotated(0);
    annotated.setAuxChk();

    std::vector<std::thread> threads(FLAGS_num_threads);
    for (int tid = 0; tid < FLAGS_num_threads; ++tid) {
      threads[tid] = DSched::thread([&, tid]() {
        tid_ = tid;
        // Thread tid performs operations tid, tid + num_threads, ...
        for (int j = tid; j < FLAGS_num_ops; j += FLAGS_num_threads) {
          if (FLAGS_bug) {
            annotated.incBug();
          } else {
            annotated.inc();
          }
        }
      });
    }
    for (auto& t : threads) {
      DSched::join(t);
    }
    std::cerr << "====== rep " << i << " completed in step " << aux_->step_
              << std::endl;
    annotated.doAuxLog(aux_->step_);
    std::cerr << std::endl;
    EXPECT_EQ(annotated.loadDirect(), FLAGS_num_ops);
    annotated.clearAuxChk();
    // Do not leave a pointer to the about-to-be-destroyed AuxData behind.
    aux_ = nullptr;
  }
}

/**
 * Test helper: a DSchedTimestamp that can be built directly from a raw
 * size_t value by forwarding it to the DSchedTimestamp constructor.
 * (Presumably needed because that base constructor is not publicly
 * accessible -- confirm against DeterministicSchedule.h.)
 */
struct DSchedTimestampTest : public DSchedTimestamp {
  explicit DSchedTimestampTest(size_t v) : DSchedTimestamp(v) {}
};

/**
 * Walks ThreadTimestamps through setIfNotPresent(), advance() and sync(),
 * asserting the expected answers of atLeastAsRecentAs() and
 * atLeastAsRecentAsAny() after every mutation.
 */
TEST(DeterministicSchedule, threadTimestamps) {
  // Shorthand for a timestamp carrying the given raw value.
  const auto stamp = [](size_t v) { return DSchedTimestampTest(v); };

  ThreadTimestamps tss;
  DSchedThreadId tid0(0);
  DSchedThreadId tid1(1);

  // Nothing has been recorded for tid0 yet.
  ASSERT_FALSE(tss.atLeastAsRecentAs(tid0, stamp(1)));

  // Record timestamp 1 for tid0; tid1 is still unknown.
  tss.setIfNotPresent(tid0, stamp(1));
  ASSERT_TRUE(tss.atLeastAsRecentAs(tid0, stamp(1)));
  ASSERT_FALSE(tss.atLeastAsRecentAs(tid0, stamp(2)));
  ASSERT_FALSE(tss.atLeastAsRecentAs(tid1, stamp(1)));

  // tid0 already has an entry, so a second setIfNotPresent is expected to
  // leave it below 2.
  tss.setIfNotPresent(tid0, stamp(2));
  ASSERT_FALSE(tss.atLeastAsRecentAs(tid0, stamp(2)));

  // advance() is expected to return timestamp 2 for tid0 and to record it.
  auto ts = tss.advance(tid0);
  ASSERT_TRUE(ts.atLeastAsRecentAs(stamp(2)));
  ASSERT_FALSE(ts.atLeastAsRecentAs(stamp(3)));
  ASSERT_TRUE(tss.atLeastAsRecentAs(tid0, stamp(2)));
  ASSERT_FALSE(tss.atLeastAsRecentAs(tid1, stamp(1)));

  // A second set of timestamps that only knows tid1, at 3.
  ThreadTimestamps tss2;
  tss2.setIfNotPresent(tid1, stamp(3));
  ASSERT_FALSE(tss2.atLeastAsRecentAs(tid1, stamp(4)));
  ASSERT_TRUE(tss2.atLeastAsRecentAs(tid1, stamp(3)));

  // Before sync tss knows nothing of tid1; afterwards it holds tid1 at 3.
  ASSERT_FALSE(tss.atLeastAsRecentAsAny(tss2));
  tss.sync(tss2);
  ASSERT_TRUE(tss.atLeastAsRecentAs(tid1, stamp(3)));
  ASSERT_FALSE(tss.atLeastAsRecentAs(tid1, stamp(4)));

  // Same thread, newer timestamp: the relation holds in one direction only.
  ThreadTimestamps tss3;
  tss3.setIfNotPresent(tid1, stamp(4));
  ASSERT_TRUE(tss3.atLeastAsRecentAsAny(tss2));
  ASSERT_FALSE(tss2.atLeastAsRecentAsAny(tss3));

  // Equal timestamps on different threads: neither is as recent as the other.
  ThreadTimestamps tss4;
  ThreadTimestamps tss5;
  tss4.setIfNotPresent(DSchedThreadId(10), stamp(5));
  tss5.setIfNotPresent(DSchedThreadId(11), stamp(5));
  ASSERT_FALSE(tss4.atLeastAsRecentAsAny(tss5));
  ASSERT_FALSE(tss5.atLeastAsRecentAsAny(tss4));
}

/**
 * Test entry point: initializes GoogleTest, parses the gflags defined in this
 * file, then runs every registered test and returns the result of
 * RUN_ALL_TESTS() as the process exit status.
 */
int main(int argc, char** argv) {
  // GoogleTest is initialized first, before gflags sees the arguments.
  testing::InitGoogleTest(&argc, argv);
  // NOTE(review): the third argument is gflags' remove_flags; presumably
  // `true` strips the parsed flags from argv -- confirm in the gflags docs.
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  return RUN_ALL_TESTS();
}