folly/folly/futures/test/ViaTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <thread>

#include <folly/MPMCQueue.h>
#include <folly/executors/DrivableExecutor.h>
#include <folly/executors/InlineExecutor.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/futures/Future.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>

using namespace folly;

struct ManualWaiter : public DrivableExecutor {
  explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex_) : ex(ex_) {}

  void add(Func f) override { ex->add(std::move(f)); }

  void drive() override {
    ex->wait();
    ex->run();
  }

  std::shared_ptr<ManualExecutor> ex;
};

struct ViaFixture : public testing::Test {
  ViaFixture()
      : westExecutor(new ManualExecutor),
        eastExecutor(new ManualExecutor),
        waiter(new ManualWaiter(westExecutor)),
        done(false) {
    th = std::thread([=] {
      ManualWaiter eastWaiter(eastExecutor);
      while (!done) {
        eastWaiter.drive();
      }
    });
  }

  ~ViaFixture() override {
    done = true;
    eastExecutor->add([=]() {});
    th.join();
  }

  void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
    eastExecutor->add([=]() { cob(a + b); });
  }

  std::shared_ptr<ManualExecutor> westExecutor;
  std::shared_ptr<ManualExecutor> eastExecutor;
  std::shared_ptr<ManualWaiter> waiter;
  InlineExecutor inlineExecutor;
  std::atomic<bool> done;
  std::thread th;
};

TEST(Via, exceptionOnLaunch) {
  auto future = makeFuture<int>(std::runtime_error("E"));
  EXPECT_THROW(future.value(), std::runtime_error);
}

TEST(Via, thenValue) {
  auto future = makeFuture(std::move(1)).thenTry([](Try<int>&& t) {
    return t.value() == 1;
  });

  EXPECT_TRUE(future.value());
}

TEST(Via, thenFuture) {
  auto future = makeFuture(1).thenTry(
      [](Try<int>&& t) { return makeFuture(t.value() == 1); });
  EXPECT_TRUE(future.value());
}

static Future<std::string> doWorkStatic(Try<std::string>&& t) {
  return makeFuture(t.value() + ";static");
}

TEST(Via, thenFunction) {
  struct Worker {
    Future<std::string> doWork(Try<std::string>&& t) {
      return makeFuture(t.value() + ";class");
    }
    static Future<std::string> doWorkStatic(Try<std::string>&& t) {
      return makeFuture(t.value() + ";class-static");
    }
  } w;

  auto f = makeFuture(std::string("start"))
               .thenTry(doWorkStatic)
               .thenTry(Worker::doWorkStatic)
               .then(&Worker::doWork, &w);

  EXPECT_EQ(f.value(), "start;static;class-static;class");
}

TEST_F(ViaFixture, threadHops) {
  auto westThreadId = std::this_thread::get_id();
  auto f = via(eastExecutor.get())
               .thenTry([=](Try<Unit>&& /* t */) {
                 EXPECT_NE(std::this_thread::get_id(), westThreadId);
                 return makeFuture<int>(1);
               })
               .via(westExecutor.get())
               .thenTry([=](Try<int>&& t) {
                 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
                 return t.value();
               });
  EXPECT_EQ(std::move(f).getVia(waiter.get()), 1);
}

TEST_F(ViaFixture, chainVias) {
  auto westThreadId = std::this_thread::get_id();
  auto f = via(eastExecutor.get())
               .thenValue([=](auto&&) {
                 EXPECT_NE(std::this_thread::get_id(), westThreadId);
                 return 1;
               })
               .thenValue([=](int val) {
                 return makeFuture(val)
                     .via(westExecutor.get())
                     .thenValue([=](int v) mutable {
                       EXPECT_EQ(std::this_thread::get_id(), westThreadId);
                       return v + 1;
                     });
               })
               .thenValue([=](int val) {
                 // even though ultimately the future that triggers this one
                 // executed in the west thread, this thenValue() inherited the
                 // executor from its predecessor, ie the eastExecutor.
                 EXPECT_NE(std::this_thread::get_id(), westThreadId);
                 return val + 1;
               })
               .via(westExecutor.get())
               .thenValue([=](int val) {
                 // go back to west, so we can wait on it
                 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
                 return val + 1;
               });

  EXPECT_EQ(std::move(f).getVia(waiter.get()), 4);
}

TEST_F(ViaFixture, bareViaAssignment) {
  auto f = via(eastExecutor.get());
}
TEST_F(ViaFixture, viaAssignment) {
  // via()&&
  auto f = makeFuture().via(eastExecutor.get());
  // via()&
  auto f2 = f.via(eastExecutor.get());
}

struct PriorityExecutor : public Executor {
  void add(Func /* f */) override { count1++; }

  void addWithPriority(Func f, int8_t priority) override {
    int mid = getNumPriorities() / 2;
    int p = priority < 0 ? std::max(0, mid + priority)
                         : std::min(getNumPriorities() - 1, mid + priority);
    EXPECT_LT(p, 3);
    EXPECT_GE(p, 0);
    if (p == 0) {
      count0++;
    } else if (p == 1) {
      count1++;
    } else if (p == 2) {
      count2++;
    }
    f();
  }

  uint8_t getNumPriorities() const override { return 3; }

  int count0{0};
  int count1{0};
  int count2{0};
};

TEST(Via, priority) {
  PriorityExecutor exe;
  via(&exe, -1).thenValue([](auto&&) {});
  via(&exe, 0).thenValue([](auto&&) {});
  via(&exe, 1).thenValue([](auto&&) {});
  via(&exe, 42).thenValue([](auto&&) {}); // overflow should go to max priority
  via(&exe, -42).thenValue(
      [](auto&&) {}); // underflow should go to min priority
  via(&exe).thenValue([](auto&&) {}); // default to mid priority
  via(&exe, Executor::LO_PRI).thenValue([](auto&&) {});
  via(&exe, Executor::HI_PRI).thenValue([](auto&&) {});
  EXPECT_EQ(3, exe.count0);
  EXPECT_EQ(2, exe.count1);
  EXPECT_EQ(3, exe.count2);
}

TEST(Via, then2) {
  ManualExecutor x1, x2;
  bool a = false, b = false, c = false;
  via(&x1)
      .thenValue([&](auto&&) { a = true; })
      .thenValue([&](auto&&) { b = true; })
      .thenValueInline(folly::makeAsyncTask(&x2, [&](auto&&) { c = true; }));

  EXPECT_FALSE(a);
  EXPECT_FALSE(b);

  x1.run();
  EXPECT_TRUE(a);
  EXPECT_FALSE(b);
  EXPECT_FALSE(c);

  x1.run();
  EXPECT_TRUE(b);
  EXPECT_FALSE(c);

  x2.run();
  EXPECT_TRUE(c);
}

TEST(Via, allowInline) {
  ManualExecutor x1, x2;
  bool a = false, b = false, c = false, d = false, e = false, f = false,
       g = false, h = false, i = false, j = false, k = false, l = false,
       m = false, n = false, o = false, p = false, q = false, r = false;
  via(&x1)
      .thenValue([&](auto&&) { a = true; })
      .thenTryInline([&](auto&&) { b = true; })
      .thenTry([&](auto&&) { c = true; })
      .via(&x2)
      .thenTryInline([&](auto&&) { d = true; })
      .thenValue([&](auto&&) {
        e = true;
        return via(&x2).thenValue([&](auto&&) { f = true; });
      })
      .thenErrorInline(tag_t<std::exception>{}, [](auto&&) {})
      .thenValueInline([&](auto&&) { g = true; })
      .thenValue([&](auto&&) {
        h = true;
        return via(&x1).thenValue([&](auto&&) { i = true; });
      })
      .thenValueInline([&](auto&&) { j = true; })
      .semi()
      .deferValue([&](auto&&) { k = true; })
      .via(&x2)
      .thenValueInline([&](auto&&) { l = true; })
      .semi()
      .deferValue([&](auto&&) { m = true; })
      .via(&x1)
      .thenValue([&](auto&&) { n = true; })
      .semi()
      .deferValue([&](auto&&) { o = true; })
      .deferValue([&](auto&&) { p = true; })
      .via(&x1)
      .semi()
      .deferValue([&](auto&&) { q = true; })
      .deferValue([&](auto&&) { r = true; })
      .via(&x2);

  EXPECT_FALSE(a);
  EXPECT_FALSE(b);

  // Expect b to be satisfied inline with the task x1
  x1.run();
  EXPECT_TRUE(a);
  EXPECT_TRUE(b);
  EXPECT_FALSE(c);

  x1.run();
  EXPECT_TRUE(c);
  EXPECT_FALSE(d);

  // Demonstrate that the executor transition did not allow inline execution
  x2.run();
  EXPECT_TRUE(d);
  EXPECT_FALSE(e);

  x2.run();
  EXPECT_TRUE(e);
  EXPECT_FALSE(f);
  EXPECT_FALSE(g);

  // Completing nested continuation should satisfy inline continuation
  x2.run();
  EXPECT_TRUE(f);
  EXPECT_TRUE(g);
  EXPECT_FALSE(h);

  x2.run();
  EXPECT_TRUE(h);
  EXPECT_FALSE(i);
  EXPECT_FALSE(j);

  // Nested continuation on different executor should not complete next entry
  // inline
  x1.run();
  EXPECT_TRUE(i);
  EXPECT_FALSE(j);

  // Defer should run on x1 and therefore not inline
  // Subsequent deferred work is run on x1 and hence not inlined.
  x2.run();
  EXPECT_TRUE(j);
  EXPECT_TRUE(k);
  EXPECT_TRUE(l);
  EXPECT_FALSE(m);

  // Complete the deferred task
  x1.run();
  EXPECT_TRUE(m);
  EXPECT_FALSE(n);

  // Here defer and the above thenValue are both on x1, defer should be
  // inline
  x1.run();
  EXPECT_TRUE(n);
  EXPECT_TRUE(o);
  EXPECT_TRUE(p);
  EXPECT_FALSE(q);

  // Change of executor in deferred executor so now run x2 to complete
  x2.run();
  EXPECT_TRUE(q);
  EXPECT_TRUE(r);
}

#ifndef __APPLE__ // TODO #7372389
/// Simple executor that does work in another thread
class ThreadExecutor : public Executor {
  folly::MPMCQueue<Func> funcs;
  std::atomic<bool> done{false};
  std::thread worker;
  folly::Baton<> baton;

  void work() {
    baton.post();
    Func fn;
    while (!done) {
      while (!funcs.isEmpty()) {
        funcs.blockingRead(fn);
        fn();
      }
    }
  }

 public:
  explicit ThreadExecutor(size_t n = 1024) : funcs(n) {
    worker = std::thread(std::bind(&ThreadExecutor::work, this));
  }

  ~ThreadExecutor() override {
    done = true;
    funcs.write([] {});
    worker.join();
  }

  void add(Func fn) override { funcs.blockingWrite(std::move(fn)); }

  void waitForStartup() { baton.wait(); }
};

TEST(Via, viaThenGetWasRacy) {
  ThreadExecutor x;
  std::unique_ptr<int> val =
      folly::via(&x)
          .thenValue([](auto&&) { return std::make_unique<int>(42); })
          .get();
  ASSERT_TRUE(!!val);
  EXPECT_EQ(42, *val);
}

TEST(Via, callbackRace) {
  ThreadExecutor x;

  auto fn = [&x] {
    auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
    std::vector<Future<Unit>> futures;

    for (auto& p : *promises) {
      futures.emplace_back(p.getFuture().via(&x).thenTry([](Try<Unit>&&) {}));
    }

    x.waitForStartup();
    x.add([promises] {
      for (auto& p : *promises) {
        p.setValue();
      }
    });

    return collectAll(futures);
  };

  fn().wait();
}
#endif

class DummyDrivableExecutor : public DrivableExecutor {
 public:
  void add(Func /* f */) override {}
  void drive() override { ran = true; }
  bool ran{false};
};

TEST(Via, getVia) {
  {
    // non-void
    ManualExecutor x;
    auto f = via(&x).thenValue([](auto&&) { return true; });
    EXPECT_TRUE(std::move(f).getVia(&x));
  }

  {
    // void
    ManualExecutor x;
    auto f = via(&x).then();
    std::move(f).getVia(&x);
  }

  {
    DummyDrivableExecutor x;
    auto f = makeFuture(true);
    EXPECT_TRUE(std::move(f).getVia(&x));
    EXPECT_FALSE(x.ran);
  }
}

TEST(Via, SimpleTimedGetVia) {
  TimedDrivableExecutor e2;
  Promise<folly::Unit> p;
  auto f = p.getFuture();
  EXPECT_THROW(
      std::move(f).getVia(&e2, std::chrono::seconds(1)), FutureTimeout);
}

TEST(Via, getTryVia) {
  {
    // non-void
    ManualExecutor x;
    auto f = via(&x).thenValue([](auto&&) { return 23; });
    EXPECT_FALSE(f.isReady());
    EXPECT_EQ(23, std::move(f).getTryVia(&x).value());
  }

  {
    // void
    ManualExecutor x;
    auto f = via(&x).then();
    EXPECT_FALSE(f.isReady());
    auto t = std::move(f).getTryVia(&x);
    EXPECT_TRUE(t.hasValue());
  }

  {
    DummyDrivableExecutor x;
    auto f = makeFuture(23);
    EXPECT_EQ(23, std::move(f).getTryVia(&x).value());
    EXPECT_FALSE(x.ran);
  }
}

TEST(Via, SimpleTimedGetTryVia) {
  TimedDrivableExecutor e2;
  Promise<folly::Unit> p;
  auto f = p.getFuture();
  EXPECT_THROW(
      std::move(f).getTryVia(&e2, std::chrono::seconds(1)), FutureTimeout);
}

TEST(Via, waitVia) {
  {
    ManualExecutor x;
    auto f = via(&x).then();
    EXPECT_FALSE(f.isReady());
    f.waitVia(&x);
    EXPECT_TRUE(f.isReady());
  }

  {
    // try rvalue as well
    ManualExecutor x;
    auto f = via(&x).then().waitVia(&x);
    EXPECT_TRUE(f.isReady());
  }

  {
    DummyDrivableExecutor x;
    makeFuture(true).waitVia(&x);
    EXPECT_FALSE(x.ran);
  }
}

TEST(Via, viaRaces) {
  ManualExecutor x;
  Promise<Unit> p;
  auto tid = std::this_thread::get_id();
  bool done = false;

  std::thread t1([&] {
    p.getFuture()
        .via(&x)
        .thenTry(
            [&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
        .thenTry(
            [&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
        .thenTry([&](Try<Unit>&&) { done = true; });
  });

  std::thread t2([&] { p.setValue(); });

  while (!done) {
    x.run();
  }
  t1.join();
  t2.join();
}

TEST(Via, viaDummyExecutorFutureSetValueFirst) {
  // The callback object will get destroyed when passed to the executor.

  // A promise will be captured by the callback lambda so we can observe that
  // it will be destroyed.
  Promise<Unit> captured_promise;
  auto captured_promise_future = captured_promise.getFuture();

  DummyDrivableExecutor x;
  auto future = makeFuture().via(&x).thenValue(
      [c = std::move(captured_promise)](auto&&) { return 42; });

  EXPECT_THROW(std::move(future).get(std::chrono::seconds(5)), BrokenPromise);
  EXPECT_THROW(
      std::move(captured_promise_future).get(std::chrono::seconds(5)),
      BrokenPromise);
}

TEST(Via, viaDummyExecutorFutureSetCallbackFirst) {
  // The callback object will get destroyed when passed to the executor.

  // A promise will be captured by the callback lambda so we can observe that
  // it will be destroyed.
  Promise<Unit> captured_promise;
  auto captured_promise_future = captured_promise.getFuture();

  DummyDrivableExecutor x;
  Promise<Unit> trigger;
  auto future = trigger.getFuture().via(&x).thenValue(
      [c = std::move(captured_promise)](auto&&) { return 42; });
  trigger.setValue();

  EXPECT_THROW(std::move(future).get(std::chrono::seconds(5)), BrokenPromise);
  EXPECT_THROW(
      std::move(captured_promise_future).get(std::chrono::seconds(5)),
      BrokenPromise);
}

TEST(Via, viaExecutorDiscardsTaskFutureSetValueFirst) {
  // The callback object will get destroyed when the ManualExecutor runs out
  // of scope.

  // A promise will be captured by the callback lambda so we can observe that
  // it will be destroyed.
  Promise<Unit> captured_promise;
  auto captured_promise_future = captured_promise.getFuture();

  Optional<SemiFuture<int>> future;
  {
    ManualExecutor x;
    future =
        makeFuture()
            .via(&x)
            .thenValue([c = std::move(captured_promise)](auto&&) { return 42; })
            .semi();
    x.clear();
  }

  EXPECT_THROW(std::move(*future).get(std::chrono::seconds(5)), BrokenPromise);
  EXPECT_THROW(
      std::move(captured_promise_future).get(std::chrono::seconds(5)),
      BrokenPromise);
}

TEST(Via, viaExecutorDiscardsTaskFutureSetCallbackFirst) {
  // The callback object will get destroyed when the ManualExecutor runs out
  // of scope.

  // A promise will be captured by the callback lambda so we can observe that
  // it will be destroyed.
  Promise<Unit> captured_promise;
  auto captured_promise_future = captured_promise.getFuture();

  Optional<SemiFuture<int>> future;
  {
    ManualExecutor x;
    Promise<Unit> trigger;
    future =
        trigger.getFuture()
            .via(&x)
            .thenValue([c = std::move(captured_promise)](auto&&) { return 42; })
            .semi();
    trigger.setValue();
    x.clear();
  }

  EXPECT_THROW(std::move(*future).get(std::chrono::seconds(5)), BrokenPromise);
  EXPECT_THROW(
      std::move(captured_promise_future).get(std::chrono::seconds(5)),
      BrokenPromise);
}

TEST(ViaFunc, liftsVoid) {
  ManualExecutor x;
  int count = 0;
  Future<Unit> f = via(&x, [&] { count++; });

  EXPECT_EQ(0, count);
  x.run();
  EXPECT_EQ(1, count);
}

TEST(ViaFunc, value) {
  ManualExecutor x;
  EXPECT_EQ(42, via(&x, [] { return 42; }).getVia(&x));
}

TEST(ViaFunc, exception) {
  ManualExecutor x;
  EXPECT_THROW(
      via(&x, []() -> int { throw std::runtime_error("expected"); }).getVia(&x),
      std::runtime_error);
}

TEST(ViaFunc, future) {
  ManualExecutor x;
  EXPECT_EQ(42, via(&x, [] { return makeFuture(42); }).getVia(&x));
}

TEST(ViaFunc, semiFuture) {
  ManualExecutor x;
  EXPECT_EQ(42, via(&x, [] { return makeSemiFuture(42); }).getVia(&x));
}

TEST(ViaFunc, voidFuture) {
  ManualExecutor x;
  int count = 0;
  via(&x, [&] { count++; }).getVia(&x);
  EXPECT_EQ(1, count);
}

TEST(ViaFunc, isSticky) {
  ManualExecutor x;
  int count = 0;

  auto f = via(&x, [&] { count++; });
  x.run();

  std::move(f).thenValue([&](auto&&) { count++; });
  EXPECT_EQ(1, count);
  x.run();
  EXPECT_EQ(2, count);
}

TEST(ViaFunc, moveOnly) {
  ManualExecutor x;
  auto intp = std::make_unique<int>(42);

  EXPECT_EQ(42, via(&x, [intp = std::move(intp)] { return *intp; }).getVia(&x));
}

TEST(ViaFunc, valueKeepAlive) {
  ManualExecutor x;
  EXPECT_EQ(42, via(getKeepAliveToken(&x), [] { return 42; }).getVia(&x));
}

TEST(ViaFunc, thenValueKeepAlive) {
  ManualExecutor x;
  EXPECT_EQ(
      42,
      via(getKeepAliveToken(&x))
          .thenValue([](auto&&) { return 42; })
          .getVia(&x));
}