folly/folly/executors/test/FunctionSchedulerTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <algorithm>
#include <atomic>
#include <cassert>
#include <random>

#include <boost/thread.hpp>
#include <glog/logging.h>

#include <folly/Random.h>
#include <folly/executors/FunctionScheduler.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>

using namespace folly;
using std::atomic;
using std::chrono::duration_cast;
using std::chrono::microseconds;
using std::chrono::milliseconds;
using std::chrono::steady_clock;

namespace {

/*
 * Helper functions for controlling how long this test takes.
 *
 * Using larger intervals here will make the tests less flaky when run on
 * heavily loaded systems.  However, this will also make the tests take longer
 * to run.
 */
static const auto timeFactor = std::chrono::milliseconds(400);
std::chrono::milliseconds testInterval(int n) {
  return n * timeFactor;
}
int getTicksWithinRange(int n, int min, int max) {
  assert(min <= max);
  n = std::max(min, n);
  n = std::min(max, n);
  return n;
}
void delay(float n) {
  microseconds usec(static_cast<microseconds::rep>(
      duration_cast<microseconds>(timeFactor).count() * n));
  usleep(usec.count());
}

} // namespace

TEST(FunctionScheduler, StartAndShutdown) {
  FunctionScheduler fs;
  EXPECT_TRUE(fs.start());
  EXPECT_FALSE(fs.start());
  EXPECT_TRUE(fs.shutdown());
  EXPECT_FALSE(fs.shutdown());
  // start again
  EXPECT_TRUE(fs.start());
  EXPECT_FALSE(fs.start());
  EXPECT_TRUE(fs.shutdown());
  EXPECT_FALSE(fs.shutdown());
}

TEST(FunctionScheduler, SimpleAdd) {
  atomic<int> total{0};
  FunctionScheduler fs;
  fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  fs.start();
  delay(1);
  EXPECT_EQ(2, total);
  fs.shutdown();
  delay(2);
  EXPECT_EQ(2, total);
}

TEST(FunctionScheduler, AddCancel) {
  atomic<int> total{0};
  FunctionScheduler fs;
  fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  fs.start();
  delay(1);
  EXPECT_EQ(2, total);
  delay(2);
  EXPECT_EQ(4, total);
  EXPECT_TRUE(fs.cancelFunction("add2"));
  EXPECT_FALSE(fs.cancelFunction("NO SUCH FUNC"));
  delay(2);
  EXPECT_EQ(4, total);
  fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
  delay(1);
  EXPECT_EQ(5, total);
  delay(2);
  EXPECT_EQ(6, total);
  fs.shutdown();
}

TEST(FunctionScheduler, AddCancel2) {
  atomic<int> total{0};
  FunctionScheduler fs;

  // Test adds and cancels while the scheduler is stopped
  EXPECT_FALSE(fs.cancelFunction("add2"));
  fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
  EXPECT_TRUE(fs.cancelFunction("add2"));
  EXPECT_FALSE(fs.cancelFunction("add2"));
  fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  fs.addFunction([&] { total += 3; }, testInterval(3), "add3");

  EXPECT_EQ(0, total);
  fs.start();
  delay(1);
  EXPECT_EQ(5, total);

  // Cancel add2 while the scheduler is running
  EXPECT_TRUE(fs.cancelFunction("add2"));
  EXPECT_FALSE(fs.cancelFunction("add2"));
  EXPECT_FALSE(fs.cancelFunction("bogus"));

  delay(3);
  EXPECT_EQ(8, total);
  EXPECT_TRUE(fs.cancelFunction("add3"));

  // Test a function that cancels itself
  atomic<int> selfCancelCount{0};
  fs.addFunction(
      [&] {
        ++selfCancelCount;
        if (selfCancelCount > 2) {
          fs.cancelFunction("selfCancel");
        }
      },
      testInterval(1),
      "selfCancel",
      testInterval(1));
  delay(4);
  EXPECT_EQ(3, selfCancelCount);
  EXPECT_FALSE(fs.cancelFunction("selfCancel"));

  // Test a function that schedules another function
  atomic<int> adderCount{0};
  atomic<int> fn2Count = 0;
  auto fn2 = [&] { ++fn2Count; };
  auto fnAdder = [&] {
    ++adderCount;
    if (adderCount == 2) {
      fs.addFunction(fn2, testInterval(3), "fn2", testInterval(2));
    }
  };
  fs.addFunction(fnAdder, testInterval(4), "adder");
  // t0: adder fires
  delay(1); // t1
  EXPECT_EQ(1, adderCount);
  EXPECT_EQ(0, fn2Count);
  // t4: adder fires, schedules fn2
  delay(4); // t5
  EXPECT_EQ(2, adderCount);
  EXPECT_EQ(0, fn2Count);
  // t6: fn2 fires
  delay(2); // t7
  EXPECT_EQ(2, adderCount);
  EXPECT_EQ(1, fn2Count);
  // t8: adder fires
  // t9: fn2 fires
  delay(3); // t10
  EXPECT_EQ(3, adderCount);
  EXPECT_EQ(2, fn2Count);
  EXPECT_TRUE(fs.cancelFunction("fn2"));
  EXPECT_TRUE(fs.cancelFunction("adder"));
  delay(5); // t10
  EXPECT_EQ(3, adderCount);
  EXPECT_EQ(2, fn2Count);

  EXPECT_EQ(8, total);
  EXPECT_EQ(3, selfCancelCount);
}

TEST(FunctionScheduler, AddMultiple) {
  atomic<int> total{0};
  FunctionScheduler fs;
  fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
  EXPECT_THROW(
      fs.addFunction([&] { total += 2; }, testInterval(2), "add2"),
      std::invalid_argument); // function name already exists

  fs.start();
  delay(1);
  EXPECT_EQ(5, total);
  delay(4);
  EXPECT_EQ(12, total);
  EXPECT_TRUE(fs.cancelFunction("add2"));
  delay(2);
  EXPECT_EQ(15, total);
  fs.shutdown();
  delay(3);
  EXPECT_EQ(15, total);
  fs.shutdown();
}

TEST(FunctionScheduler, AddAfterStart) {
  atomic<int> total{0};
  FunctionScheduler fs;
  fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  fs.addFunction([&] { total += 3; }, testInterval(2), "add3");
  fs.start();
  delay(3);
  EXPECT_EQ(10, total);
  fs.addFunction([&] { total += 2; }, testInterval(3), "add22");
  delay(2);
  EXPECT_EQ(17, total);
}

TEST(FunctionScheduler, ShutdownStart) {
  atomic<int> total{0};
  FunctionScheduler fs;
  fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  fs.start();
  delay(1);
  fs.shutdown();
  fs.start();
  delay(1);
  EXPECT_EQ(4, total);
  EXPECT_FALSE(fs.cancelFunction("add3")); // non existing
  delay(2);
  EXPECT_EQ(6, total);
}

TEST(FunctionScheduler, ResetFunc) {
  atomic<int> total{0};
  FunctionScheduler fs;
  fs.addFunction([&] { total += 2; }, testInterval(3), "add2");
  fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
  fs.start();
  delay(1);
  EXPECT_EQ(5, total);
  EXPECT_FALSE(fs.resetFunctionTimer("NON_EXISTING"));
  EXPECT_TRUE(fs.resetFunctionTimer("add2"));
  delay(1);
  // t2: after the reset, add2 should have been invoked immediately
  EXPECT_EQ(7, total);
  delay(1.5);
  // t3.5: add3 should have been invoked. add2 should not
  EXPECT_EQ(10, total);
  delay(1);
  // t4.5: add2 should have been invoked once more (it was reset at t1)
  EXPECT_EQ(12, total);
}

TEST(FunctionScheduler, ResetFunc2) {
  atomic<int> total{0};
  FunctionScheduler fs;
  fs.addFunctionOnce([&] { total += 2; }, "add2", testInterval(1));
  fs.addFunctionOnce([&] { total += 3; }, "add3", testInterval(1));
  fs.start();
  delay(2);
  fs.addFunctionOnce([&] { total += 3; }, "add4", testInterval(2));
  EXPECT_TRUE(fs.resetFunctionTimer("add4"));
  fs.addFunctionOnce([&] { total += 3; }, "add6", testInterval(2));
  delay(1);
  EXPECT_TRUE(fs.resetFunctionTimer("add4"));
  delay(3);
  EXPECT_FALSE(fs.resetFunctionTimer("add3"));
  fs.addFunctionOnce([&] { total += 3; }, "add4", testInterval(1));
}

TEST(FunctionScheduler, ResetFuncWhileRunning) {
  struct State {
    boost::barrier barrier_a{2};
    boost::barrier barrier_b{2};
    boost::barrier barrier_c{2};
    boost::barrier barrier_d{2};
    atomic<bool> set = false;
    atomic<size_t> count = 0;
  };

  State state; // held by ref
  auto mv = std::make_shared<size_t>(); // gets moved

  FunctionScheduler fs;
  fs.addFunction(
      [&, mv /* ref + shared_ptr fit in in-situ storage */] {
        if (!state.set) { // first invocation
          state.barrier_a.wait();
          // ensure that resetFunctionTimer is called in this critical section
          state.barrier_b.wait();
          ++state.count;
          EXPECT_TRUE(bool(mv)) << "bug repro: mv was moved-out";
          state.barrier_c.wait();
          // main thread checks count here
          state.barrier_d.wait();
        } else { // subsequent invocations
          ++state.count;
        }
      },
      testInterval(3),
      "nada");
  fs.start();

  state.barrier_a.wait();
  state.set = true;
  fs.resetFunctionTimer("nada");
  EXPECT_EQ(0, state.count) << "sanity check";
  state.barrier_b.wait();
  // fn thread increments count and checks mv here
  state.barrier_c.wait();
  EXPECT_EQ(1, state.count) << "sanity check";
  state.barrier_d.wait();
  delay(1);
  EXPECT_EQ(2, state.count) << "sanity check";
}

TEST(FunctionScheduler, AddInvalid) {
  atomic<int> total{0};
  FunctionScheduler fs;
  // interval may not be negative
  EXPECT_THROW(
      fs.addFunction([&] { total += 2; }, testInterval(-1), "add2"),
      std::invalid_argument);

  EXPECT_FALSE(fs.cancelFunction("addNoFunc"));
}

TEST(FunctionScheduler, NoFunctions) {
  FunctionScheduler fs;
  EXPECT_TRUE(fs.start());
  fs.shutdown();
  FunctionScheduler fs2;
  fs2.shutdown();
}

TEST(FunctionScheduler, AddWhileRunning) {
  atomic<int> total{0};
  FunctionScheduler fs;
  fs.start();
  delay(1);
  fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  // The function should be invoked nearly immediately when we add it
  // and the FunctionScheduler is already running
  delay(0.5);
  auto t = total.load();
  EXPECT_EQ(2, t);
  delay(2);
  t = total.load();
  EXPECT_EQ(4, t);
}

TEST(FunctionScheduler, NoShutdown) {
  atomic<int> total{0};
  {
    FunctionScheduler fs;
    fs.addFunction([&] { total += 2; }, testInterval(1), "add2");
    fs.start();
    delay(0.5);
    EXPECT_EQ(2, total);
  }
  // Destroyed the FunctionScheduler without calling shutdown.
  // Everything should have been cleaned up, and the function will no longer
  // get called.
  delay(2);
  EXPECT_EQ(2, total);
}

TEST(FunctionScheduler, StartDelay) {
  atomic<int> total{0};
  FunctionScheduler fs;
  fs.addFunction([&] { total += 2; }, testInterval(2), "add2", testInterval(2));
  fs.addFunction([&] { total += 3; }, testInterval(3), "add3", testInterval(2));
  EXPECT_THROW(
      fs.addFunction(
          [&] { total += 2; }, testInterval(3), "addX", testInterval(-1)),
      std::invalid_argument);
  fs.start();
  delay(1); // t1
  EXPECT_EQ(0, total);
  // t2 : add2 total=2
  // t2 : add3 total=5
  delay(2); // t3
  EXPECT_EQ(5, total);
  // t4 : add2: total=7
  // t5 : add3: total=10
  // t6 : add2: total=12
  delay(4); // t7
  EXPECT_EQ(12, total);
  fs.cancelFunction("add2");
  // t8 : add3: total=15
  delay(2); // t9
  EXPECT_EQ(15, total);
  fs.shutdown();
  delay(3);
  EXPECT_EQ(15, total);
  fs.shutdown();
}

TEST(FunctionScheduler, NoSteadyCatchup) {
  std::atomic<int> ticks(0);
  FunctionScheduler fs;
  // fs.setSteady(false); is the default
  fs.addFunction(
      [&ticks] {
        if (++ticks == 2) {
          std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }
      },
      milliseconds(5));
  fs.start();
  std::this_thread::sleep_for(std::chrono::milliseconds(500));

  // no steady catch up means we'd tick once for 200ms, then remaining
  // 300ms / 5 = 60 times
  EXPECT_LE(ticks.load(), 61);
}

TEST(FunctionScheduler, SteadyCatchup) {
  std::atomic<int> ticks(0);
  FunctionScheduler fs;
  fs.setSteady(true);
  fs.addFunction(
      [&ticks] {
        if (++ticks == 2) {
          std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }
      },
      milliseconds(5));
  fs.start();

  std::this_thread::sleep_for(std::chrono::milliseconds(500));

  // tick every 5ms. Despite tick == 2 is slow, later ticks should be fast
  // enough to catch back up to schedule
  EXPECT_NEAR(100, ticks.load(), 10);
}

TEST(FunctionScheduler, UniformDistribution) {
  atomic<int> total{0};
  const int kTicks = 2;
  std::chrono::milliseconds minInterval =
      testInterval(kTicks) - (timeFactor / 5);
  std::chrono::milliseconds maxInterval =
      testInterval(kTicks) + (timeFactor / 5);
  FunctionScheduler fs;
  fs.addFunctionUniformDistribution(
      [&] { total += 2; },
      minInterval,
      maxInterval,
      "UniformDistribution",
      std::chrono::milliseconds(0));
  fs.start();
  delay(1);
  EXPECT_EQ(2, total);
  delay(kTicks);
  EXPECT_EQ(4, total);
  delay(kTicks);
  EXPECT_EQ(6, total);
  fs.shutdown();
  delay(2);
  EXPECT_EQ(6, total);
}

TEST(FunctionScheduler, ConsistentDelay) {
  std::atomic<int> ticks(0);
  FunctionScheduler fs;

  std::atomic<long long> epoch(0);
  epoch = duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
              .count();

  // We should have runs at t = 0, 600, 800, 1200, or 4 total.
  // If at const interval, it would be t = 0, 600, 1000, or 3 total.
  fs.addFunctionConsistentDelay(
      [&ticks, &epoch] {
        auto now =
            duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
                .count();
        int t = ++ticks;
        if (t != 2) {
          // Sensitive to delays above 100ms.
          EXPECT_NEAR((now - epoch) - (t - 1) * 400, 0, 100);
        }
        if (t == 1) {
          /* sleep override */
          std::this_thread::sleep_for(std::chrono::milliseconds(600));
        }
      },
      milliseconds(400),
      "ConsistentDelay");

  fs.start();

  /* sleep override */
  std::this_thread::sleep_for(std::chrono::milliseconds(1300));
  EXPECT_EQ(ticks.load(), 4);
}

TEST(FunctionScheduler, ExponentialBackoff) {
  atomic<int> total{0};
  atomic<int> expectedInterval{0};
  atomic<int> nextInterval{2};
  FunctionScheduler fs;
  fs.addFunctionGenericDistribution(
      [&] { total += 2; },
      [&expectedInterval, &nextInterval]() mutable {
        auto interval = nextInterval.load();
        expectedInterval = interval;
        nextInterval = interval * interval;
        return testInterval(interval);
      },
      "ExponentialBackoff",
      "2^n * 100ms",
      std::chrono::milliseconds(0));
  fs.start();
  delay(1);
  EXPECT_EQ(2, total);
  delay(expectedInterval);
  EXPECT_EQ(4, total);
  delay(expectedInterval);
  EXPECT_EQ(6, total);
  fs.shutdown();
  delay(2);
  EXPECT_EQ(6, total);
}

TEST(FunctionScheduler, GammaIntervalDistribution) {
  atomic<int> total{0};
  atomic<int> expectedInterval{0};
  FunctionScheduler fs;
  std::default_random_engine generator(folly::Random::rand32());
  // The alpha and beta arguments are selected, somewhat randomly, to be 2.0.
  // These values do not matter much in this test, as we are not testing the
  // std::gamma_distribution itself...
  std::gamma_distribution<double> gamma(2.0, 2.0);
  fs.addFunctionGenericDistribution(
      [&] { total += 2; },
      [&expectedInterval, generator, gamma]() mutable {
        expectedInterval =
            getTicksWithinRange(static_cast<int>(gamma(generator)), 2, 10);
        return testInterval(expectedInterval);
      },
      "GammaDistribution",
      "gamma(2.0,2.0)*100ms",
      std::chrono::milliseconds(0));
  fs.start();
  delay(1);
  EXPECT_EQ(2, total);
  delay(expectedInterval);
  EXPECT_EQ(4, total);
  delay(expectedInterval);
  EXPECT_EQ(6, total);
  fs.shutdown();
  delay(2);
  EXPECT_EQ(6, total);
}

TEST(FunctionScheduler, PoissonDistribution) {
  auto interval = std::chrono::hours(24 * 365 * 10);
  atomic<int> total{0};
  FunctionScheduler fs;
  fs.addFunction(
      [&] { total += 2; },
      interval,
      folly::FunctionScheduler::LatencyDistribution(true, interval),
      "PoissonDistribution",
      std::chrono::milliseconds(0));
  fs.start();
  delay(1);
  EXPECT_EQ(2, total);
}

TEST(FunctionScheduler, AddWithRunOnce) {
  atomic<int> total{0};
  FunctionScheduler fs;
  fs.addFunctionOnce([&] { total += 2; }, "add2");
  fs.start();
  delay(1);
  EXPECT_EQ(2, total);
  delay(2);
  EXPECT_EQ(2, total);

  fs.addFunctionOnce([&] { total += 2; }, "add2");
  delay(1);
  EXPECT_EQ(4, total);
  delay(2);
  EXPECT_EQ(4, total);

  fs.shutdown();
}

TEST(FunctionScheduler, cancelFunctionAndWait) {
  atomic<int> total{0};
  FunctionScheduler fs;
  fs.addFunction(
      [&] {
        delay(5);
        total += 2;
      },
      testInterval(100),
      "add2");

  fs.start();
  delay(1);
  EXPECT_EQ(0, total); // add2 is still sleeping

  EXPECT_TRUE(fs.cancelFunctionAndWait("add2"));
  EXPECT_EQ(2, total); // add2 should have completed

  EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
  fs.shutdown();
}

TEST(FunctionScheduler, cancelAllFunctionsAndWait) {
  atomic<int> total{0};
  FunctionScheduler fs;

  fs.addFunction(
      [&] {
        delay(5);
        total += 2;
      },
      testInterval(100),
      "add2");

  fs.start();
  delay(1);
  EXPECT_EQ(0, total); // add2 is still sleeping

  fs.cancelAllFunctionsAndWait();
  EXPECT_EQ(2, total);

  EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
  fs.shutdown();
}

TEST(FunctionScheduler, CancelAndWaitOnRunningFunc) {
  folly::Baton<> baton;
  std::thread th([&baton]() {
    FunctionScheduler fs;
    fs.addFunction([] { delay(10); }, testInterval(2), "func");
    fs.start();
    delay(1);
    EXPECT_TRUE(fs.cancelFunctionAndWait("func"));
    baton.post();
  });

  ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
  th.join();
}

TEST(FunctionScheduler, CancelAllAndWaitWithRunningFunc) {
  folly::Baton<> baton;
  std::thread th([&baton]() {
    FunctionScheduler fs;
    fs.addFunction([] { delay(10); }, testInterval(2), "func");
    fs.start();
    delay(1);
    fs.cancelAllFunctionsAndWait();
    baton.post();
  });

  ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
  th.join();
}

TEST(FunctionScheduler, CancelAllAndWaitWithOneRunningAndOneWaiting) {
  folly::Baton<> baton;
  std::thread th([&baton]() {
    std::atomic<int> nExecuted(0);
    FunctionScheduler fs;
    fs.addFunction(
        [&nExecuted] {
          nExecuted++;
          delay(10);
        },
        testInterval(2),
        "func0");
    fs.addFunction(
        [&nExecuted] {
          nExecuted++;
          delay(10);
        },
        testInterval(2),
        "func1",
        testInterval(5));
    fs.start();
    delay(1);
    fs.cancelAllFunctionsAndWait();
    EXPECT_EQ(nExecuted, 1);
    baton.post();
  });

  ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
  th.join();
}

TEST(FunctionScheduler, ConcurrentCancelFunctionAndWait) {
  FunctionScheduler fs;
  fs.addFunction([] { delay(10); }, testInterval(2), "func");

  fs.start();
  delay(1);
  std::thread th1([&fs] { EXPECT_TRUE(fs.cancelFunctionAndWait("func")); });
  delay(1);
  std::thread th2([&fs] { EXPECT_FALSE(fs.cancelFunctionAndWait("func")); });
  th1.join();
  th2.join();
}