folly/folly/fibers/test/FibersTest.cpp

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <array>
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>

#include <folly/Conv.h>
#include <folly/Memory.h>
#include <folly/Random.h>
#include <folly/coro/GtestHelpers.h>
#include <folly/coro/Timeout.h>
#include <folly/coro/WithCancellation.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/fibers/AddTasks.h>
#include <folly/fibers/AtomicBatchDispatcher.h>
#include <folly/fibers/BatchDispatcher.h>
#include <folly/fibers/BatchSemaphore.h>
#include <folly/fibers/EventBaseLoopController.h>
#include <folly/fibers/ExecutorLoopController.h>
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerMap.h>
#include <folly/fibers/GenericBaton.h>
#include <folly/fibers/Semaphore.h>
#include <folly/fibers/SimpleLoopController.h>
#include <folly/fibers/TimedMutex.h>
#include <folly/fibers/WhenN.h>
#include <folly/futures/Future.h>
#include <folly/io/async/ScopedEventBaseThread.h>
#include <folly/portability/GTest.h>
#include <folly/tracing/AsyncStack.h>

using namespace folly::fibers;

using folly::Try;

TEST(FiberManager, batonTimedWaitTimeout) {
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());
  std::chrono::steady_clock::time_point start;

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        Baton baton;

        start = std::chrono::steady_clock::now();
        constexpr auto kTimeout = std::chrono::milliseconds(230);
        auto res = baton.try_wait_for(kTimeout);
        auto elapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - start);

        EXPECT_FALSE(res);
        EXPECT_LE(kTimeout, elapsedMs);

        loopController.stop();
      });
      taskAdded = true;
    } else {
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, batonTimedWaitPost) {
  bool taskAdded = false;
  size_t iterations = 0;
  Baton* baton_ptr;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        Baton baton;
        baton_ptr = &baton;

        auto res = baton.try_wait_for(std::chrono::milliseconds(130));

        EXPECT_TRUE(res);
        EXPECT_EQ(2, iterations);

        loopController.stop();
      });
      taskAdded = true;
    } else {
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
      iterations++;
      if (iterations == 2) {
        baton_ptr->post();
      }
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, batonTimedWaitTimeoutEvb) {
  size_t tasksComplete = 0;

  folly::EventBase evb;

  FiberManager manager(std::make_unique<EventBaseLoopController>());
  dynamic_cast<EventBaseLoopController&>(manager.loopController())
      .attachEventBase(evb);

  auto task = [&](size_t timeout_ms) {
    Baton baton;

    auto start = EventBaseLoopController::Clock::now();
    auto res = baton.try_wait_for(std::chrono::milliseconds(timeout_ms));
    auto finish = EventBaseLoopController::Clock::now();

    EXPECT_FALSE(res);

    auto duration_ms =
        std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);

    EXPECT_GT(duration_ms.count(), timeout_ms - 50);
    EXPECT_LT(duration_ms.count(), timeout_ms + 50);

    if (++tasksComplete == 2) {
      evb.terminateLoopSoon();
    }
  };

  evb.runInEventBaseThread([&]() {
    manager.addTask([&]() { task(500); });
    manager.addTask([&]() { task(250); });
  });

  evb.loopForever();

  EXPECT_EQ(2, tasksComplete);
}

TEST(FiberManager, batonTimedWaitPostEvb) {
  size_t tasksComplete = 0;

  folly::EventBase evb;

  FiberManager manager(std::make_unique<EventBaseLoopController>());
  dynamic_cast<EventBaseLoopController&>(manager.loopController())
      .attachEventBase(evb);

  evb.runInEventBaseThread([&]() {
    manager.addTask([&]() {
      Baton baton;

      evb.tryRunAfterDelay([&]() { baton.post(); }, 100);

      auto start = EventBaseLoopController::Clock::now();
      auto res = baton.try_wait_for(std::chrono::milliseconds(130));
      auto finish = EventBaseLoopController::Clock::now();

      EXPECT_TRUE(res);

      auto duration_ms =
          std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);

      EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);

      if (++tasksComplete == 1) {
        evb.terminateLoopSoon();
      }
    });
  });

  evb.loopForever();

  EXPECT_EQ(1, tasksComplete);
}

TEST(FiberManager, fiberTimeLogged) {
  FiberManager manager(std::make_unique<SimpleLoopController>());
  TaskOptions tOpt;
  tOpt.logRunningTime = true;
  manager.addTask(
      [&]() {
        LOG(INFO) << "logging time.";
        EXPECT_LT(-1, manager.getCurrentTaskRunningTime()->count());
      },
      tOpt /*logRunningTime = true*/);
  EXPECT_FALSE(manager.getCurrentTaskRunningTime());
  tOpt.logRunningTime = false;
  manager.addTask(
      [&]() {
        LOG(INFO) << "Not logging time.";
        EXPECT_FALSE(manager.getCurrentTaskRunningTime());
      },
      tOpt /*logRunningTime = false*/);
  manager.loopUntilNoReady();
}

#ifdef __linux__

namespace {
void burnCpu(std::chrono::milliseconds duration) {
  auto expires = folly::fibers::thread_clock::now() + duration;
  while (folly::fibers::thread_clock::now() < expires) {
  }
}

void sleepFor(std::chrono::milliseconds duration) {
  /* sleep override */ std::this_thread::sleep_for(duration);
}
} // namespace

TEST(FiberManager, fiberTimeWhileRunning) {
  using namespace std::chrono;
  FiberManager fm(std::make_unique<SimpleLoopController>());
  TaskOptions tOpt;
  tOpt.logRunningTime = true;
  fm.addTask(
      [&]() {
        burnCpu(200ms);
        sleepFor(1s); // (not included in running time)
        fm.yield();
        burnCpu(100ms);
        auto dur = fm.getCurrentTaskRunningTime();
        ASSERT_TRUE(dur); // ~300ms
        EXPECT_GE(*dur, 250ms);
        EXPECT_LE(*dur, 350ms);
      },
      tOpt);
  fm.addTask(
      [&]() {
        burnCpu(300ms);
        sleepFor(1s); // (not included in running time)
        fm.yield();
        burnCpu(200ms);
        auto dur = fm.getCurrentTaskRunningTime();
        ASSERT_TRUE(dur); // ~500ms
        EXPECT_GE(*dur, 450ms);
        EXPECT_LE(*dur, 550ms);
      },
      tOpt);
  while (fm.hasTasks()) {
    fm.loopUntilNoReady();
  }
}

TEST(FiberManager, fiberTimeWhileAwaiting) {
  using namespace std::chrono;
  FiberManager fm(std::make_unique<SimpleLoopController>());
  TaskOptions tOpt;
  tOpt.logRunningTime = true;
  fm.addTask(
      [&]() {
        burnCpu(200ms);
        sleepFor(1s); // (not included in running time)
        fm.yield();
        burnCpu(100ms);
        runInMainContext([&] {
          burnCpu(200ms); // (not included in running time)
          ASSERT_TRUE(fm.currentFiber());
          auto dur = fm.getCurrentTaskRunningTime();
          ASSERT_TRUE(dur); // ~300ms
          EXPECT_GE(*dur, 250ms);
          EXPECT_LE(*dur, 350ms);
        });
      },
      tOpt);
  while (fm.hasTasks()) {
    fm.loopUntilNoReady();
  }
}

#endif // __linux__

TEST(FiberManager, batonTryWait) {
  FiberManager manager(std::make_unique<SimpleLoopController>());

  // Check if try_wait and post work as expected
  Baton b;

  manager.addTask([&]() {
    while (!b.try_wait()) {
    }
  });
  auto thr = std::thread([&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(300));
    b.post();
  });

  manager.loopUntilNoReady();
  thr.join();

  Baton c;

  // Check try_wait without post
  manager.addTask([&]() {
    int cnt = 100;
    while (cnt && !c.try_wait()) {
      cnt--;
    }
    EXPECT_TRUE(!c.try_wait()); // must still hold
    EXPECT_EQ(cnt, 0);
  });

  manager.loopUntilNoReady();
}

TEST(FiberManager, batonTryWaitConsistent) {
  folly::EventBase evb;
  struct ExpectedException {};
  auto& fm = getFiberManager(evb);

  // Check if try_wait and post have a consistent behavior on a timed out
  // baton
  Baton b;

  b.try_wait_for(std::chrono::milliseconds(1));
  b.try_wait(); // returns false

  b.post();
  EXPECT_TRUE(b.try_wait());
  b.reset();

  fm.addTask([&]() {
    b.try_wait_for(std::chrono::milliseconds(1));
    b.try_wait(); // returns false

    b.post();
    EXPECT_TRUE(b.try_wait());
  });

  while (fm.hasTasks()) {
    evb.loopOnce();
  }
}

TEST(FiberManager, genericBatonFiberWait) {
  FiberManager manager(std::make_unique<SimpleLoopController>());

  GenericBaton b;
  bool fiberRunning = false;

  manager.addTask([&]() {
    EXPECT_EQ(manager.hasActiveFiber(), true);
    fiberRunning = true;
    b.wait();
    fiberRunning = false;
  });

  EXPECT_FALSE(fiberRunning);
  manager.loopUntilNoReady();
  EXPECT_TRUE(fiberRunning); // ensure fiber still active

  auto thr = std::thread([&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(300));
    b.post();
  });

  while (fiberRunning) {
    manager.loopUntilNoReady();
  }

  thr.join();
}

TEST(FiberManager, genericBatonThreadWait) {
  FiberManager manager(std::make_unique<SimpleLoopController>());
  GenericBaton b;
  std::atomic<bool> threadWaiting(false);

  auto thr = std::thread([&]() {
    threadWaiting = true;
    b.wait();
    threadWaiting = false;
  });

  while (!threadWaiting) {
  }
  std::this_thread::sleep_for(std::chrono::milliseconds(300));

  manager.addTask([&]() {
    EXPECT_EQ(manager.hasActiveFiber(), true);
    EXPECT_TRUE(threadWaiting);
    b.post();
    while (threadWaiting) {
    }
  });

  manager.loopUntilNoReady();
  thr.join();
}

TEST(FiberManager, addTasksNoncopyable) {
  std::vector<Promise<int>> pendingFibers;
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        std::vector<std::function<std::unique_ptr<int>()>> funcs;
        for (int i = 0; i < 3; ++i) {
          funcs.push_back([i, &pendingFibers]() {
            await_async([&pendingFibers](Promise<int> promise) {
              pendingFibers.push_back(std::move(promise));
            });
            return std::make_unique<int>(i * 2 + 1);
          });
        }

        auto iter = addTasks(funcs.begin(), funcs.end());

        size_t n = 0;
        while (iter.hasNext()) {
          auto result = iter.awaitNext();
          EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
          EXPECT_GE(2 - n, pendingFibers.size());
          ++n;
        }
        EXPECT_EQ(3, n);
      });
      taskAdded = true;
    } else if (pendingFibers.size()) {
      pendingFibers.back().setValue(0);
      pendingFibers.pop_back();
    } else {
      loopController.stop();
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, awaitThrow) {
  folly::EventBase evb;
  struct ExpectedException {};
  getFiberManager(evb)
      .addTaskFuture([&] {
        EXPECT_THROW(
            await_async([](Promise<int> p) {
              p.setValue(42);
              throw ExpectedException();
            }),
            ExpectedException);

        EXPECT_THROW(
            await_async([&](Promise<int> p) {
              evb.runInEventBaseThread(
                  [p = std::move(p)]() mutable { p.setValue(42); });
              throw ExpectedException();
            }),
            ExpectedException);
      })
      .waitVia(&evb);
}

TEST(FiberManager, addTasksThrow) {
  std::vector<Promise<int>> pendingFibers;
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        std::vector<std::function<int()>> funcs;
        for (size_t i = 0; i < 3; ++i) {
          funcs.push_back([i, &pendingFibers]() {
            await_async([&pendingFibers](Promise<int> promise) {
              pendingFibers.push_back(std::move(promise));
            });
            if (i % 2 == 0) {
              throw std::runtime_error("Runtime");
            }
            return i * 2 + 1;
          });
        }

        auto iter = addTasks(funcs.begin(), funcs.end());

        size_t n = 0;
        while (iter.hasNext()) {
          try {
            int result = iter.awaitNext();
            EXPECT_EQ(1, iter.getTaskID() % 2);
            EXPECT_EQ(2 * iter.getTaskID() + 1, result);
          } catch (...) {
            EXPECT_EQ(0, iter.getTaskID() % 2);
          }
          EXPECT_GE(2 - n, pendingFibers.size());
          ++n;
        }
        EXPECT_EQ(3, n);
      });
      taskAdded = true;
    } else if (pendingFibers.size()) {
      pendingFibers.back().setValue(0);
      pendingFibers.pop_back();
    } else {
      loopController.stop();
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, addTasksVoid) {
  std::vector<Promise<int>> pendingFibers;
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        std::vector<std::function<void()>> funcs;
        for (size_t i = 0; i < 3; ++i) {
          funcs.push_back([&pendingFibers]() {
            await_async([&pendingFibers](Promise<int> promise) {
              pendingFibers.push_back(std::move(promise));
            });
          });
        }

        auto iter = addTasks(funcs.begin(), funcs.end());

        size_t n = 0;
        while (iter.hasNext()) {
          iter.awaitNext();
          EXPECT_GE(2 - n, pendingFibers.size());
          ++n;
        }
        EXPECT_EQ(3, n);
      });
      taskAdded = true;
    } else if (pendingFibers.size()) {
      pendingFibers.back().setValue(0);
      pendingFibers.pop_back();
    } else {
      loopController.stop();
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, addTasksVoidThrow) {
  std::vector<Promise<int>> pendingFibers;
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        std::vector<std::function<void()>> funcs;
        for (size_t i = 0; i < 3; ++i) {
          funcs.push_back([i, &pendingFibers]() {
            await_async([&pendingFibers](Promise<int> promise) {
              pendingFibers.push_back(std::move(promise));
            });
            if (i % 2 == 0) {
              throw std::runtime_error("");
            }
          });
        }

        auto iter = addTasks(funcs.begin(), funcs.end());

        size_t n = 0;
        while (iter.hasNext()) {
          try {
            iter.awaitNext();
            EXPECT_EQ(1, iter.getTaskID() % 2);
          } catch (...) {
            EXPECT_EQ(0, iter.getTaskID() % 2);
          }
          EXPECT_GE(2 - n, pendingFibers.size());
          ++n;
        }
        EXPECT_EQ(3, n);
      });
      taskAdded = true;
    } else if (pendingFibers.size()) {
      pendingFibers.back().setValue(0);
      pendingFibers.pop_back();
    } else {
      loopController.stop();
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, addTasksReserve) {
  std::vector<Promise<int>> pendingFibers;
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        std::vector<std::function<void()>> funcs;
        for (size_t i = 0; i < 3; ++i) {
          funcs.push_back([&pendingFibers]() {
            await_async([&pendingFibers](Promise<int> promise) {
              pendingFibers.push_back(std::move(promise));
            });
          });
        }

        auto iter = addTasks(funcs.begin(), funcs.end());

        iter.reserve(2);
        EXPECT_TRUE(iter.hasCompleted());
        EXPECT_TRUE(iter.hasPending());
        EXPECT_TRUE(iter.hasNext());

        iter.awaitNext();
        EXPECT_TRUE(iter.hasCompleted());
        EXPECT_TRUE(iter.hasPending());
        EXPECT_TRUE(iter.hasNext());

        iter.awaitNext();
        EXPECT_FALSE(iter.hasCompleted());
        EXPECT_TRUE(iter.hasPending());
        EXPECT_TRUE(iter.hasNext());

        iter.awaitNext();
        EXPECT_FALSE(iter.hasCompleted());
        EXPECT_FALSE(iter.hasPending());
        EXPECT_FALSE(iter.hasNext());
      });
      taskAdded = true;
    } else if (pendingFibers.size()) {
      pendingFibers.back().setValue(0);
      pendingFibers.pop_back();
    } else {
      loopController.stop();
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, addTaskDynamic) {
  folly::EventBase evb;

  Baton batons[3];

  auto makeTask = [&](size_t taskId) {
    return [&, taskId]() -> size_t {
      batons[taskId].wait();
      return taskId;
    };
  };

  getFiberManager(evb)
      .addTaskFuture([&]() {
        TaskIterator<size_t> iterator;

        iterator.addTask(makeTask(0));
        iterator.addTask(makeTask(1));

        batons[1].post();

        EXPECT_EQ(1, iterator.awaitNext());

        iterator.addTask(makeTask(2));

        batons[2].post();

        EXPECT_EQ(2, iterator.awaitNext());

        batons[0].post();

        EXPECT_EQ(0, iterator.awaitNext());
      })
      .waitVia(&evb);
}

TEST(FiberManager, forEach) {
  std::vector<Promise<int>> pendingFibers;
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        std::vector<std::function<int()>> funcs;
        for (size_t i = 0; i < 3; ++i) {
          funcs.push_back([i, &pendingFibers]() {
            await_async([&pendingFibers](Promise<int> promise) {
              pendingFibers.push_back(std::move(promise));
            });
            return i * 2 + 1;
          });
        }

        std::vector<std::pair<size_t, int>> results;
        forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
          results.emplace_back(id, result);
        });
        EXPECT_EQ(3, results.size());
        EXPECT_TRUE(pendingFibers.empty());
        for (size_t i = 0; i < 3; ++i) {
          EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
        }
      });
      taskAdded = true;
    } else if (pendingFibers.size()) {
      pendingFibers.back().setValue(0);
      pendingFibers.pop_back();
    } else {
      loopController.stop();
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, collectN) {
  std::vector<Promise<int>> pendingFibers;
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        std::vector<std::function<int()>> funcs;
        for (size_t i = 0; i < 3; ++i) {
          funcs.push_back([i, &pendingFibers]() {
            await_async([&pendingFibers](Promise<int> promise) {
              pendingFibers.push_back(std::move(promise));
            });
            return i * 2 + 1;
          });
        }

        auto results = collectN(funcs.begin(), funcs.end(), 2);
        EXPECT_EQ(2, results.size());
        EXPECT_EQ(1, pendingFibers.size());
        for (size_t i = 0; i < 2; ++i) {
          EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
        }
      });
      taskAdded = true;
    } else if (pendingFibers.size()) {
      pendingFibers.back().setValue(0);
      pendingFibers.pop_back();
    } else {
      loopController.stop();
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, collectNThrow) {
  std::vector<Promise<int>> pendingFibers;
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        std::vector<std::function<int()>> funcs;
        for (size_t i = 0; i < 3; ++i) {
          funcs.push_back([&pendingFibers]() -> size_t {
            await_async([&pendingFibers](Promise<int> promise) {
              pendingFibers.push_back(std::move(promise));
            });
            throw std::runtime_error("Runtime");
          });
        }

        try {
          collectN(funcs.begin(), funcs.end(), 2);
        } catch (...) {
          EXPECT_EQ(1, pendingFibers.size());
        }
      });
      taskAdded = true;
    } else if (pendingFibers.size()) {
      pendingFibers.back().setValue(0);
      pendingFibers.pop_back();
    } else {
      loopController.stop();
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, collectNVoid) {
  std::vector<Promise<int>> pendingFibers;
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        std::vector<std::function<void()>> funcs;
        for (size_t i = 0; i < 3; ++i) {
          funcs.push_back([&pendingFibers]() {
            await_async([&pendingFibers](Promise<int> promise) {
              pendingFibers.push_back(std::move(promise));
            });
          });
        }

        auto results = collectN(funcs.begin(), funcs.end(), 2);
        EXPECT_EQ(2, results.size());
        EXPECT_EQ(1, pendingFibers.size());
      });
      taskAdded = true;
    } else if (pendingFibers.size()) {
      pendingFibers.back().setValue(0);
      pendingFibers.pop_back();
    } else {
      loopController.stop();
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, collectNVoidThrow) {
  std::vector<Promise<int>> pendingFibers;
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        std::vector<std::function<void()>> funcs;
        for (size_t i = 0; i < 3; ++i) {
          funcs.push_back([&pendingFibers]() {
            await_async([&pendingFibers](Promise<int> promise) {
              pendingFibers.push_back(std::move(promise));
            });
            throw std::runtime_error("Runtime");
          });
        }

        try {
          collectN(funcs.begin(), funcs.end(), 2);
        } catch (...) {
          EXPECT_EQ(1, pendingFibers.size());
        }
      });
      taskAdded = true;
    } else if (pendingFibers.size()) {
      pendingFibers.back().setValue(0);
      pendingFibers.pop_back();
    } else {
      loopController.stop();
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, collectAll) {
  std::vector<Promise<int>> pendingFibers;
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        std::vector<std::function<int()>> funcs;
        for (size_t i = 0; i < 3; ++i) {
          funcs.push_back([i, &pendingFibers]() {
            await_async([&pendingFibers](Promise<int> promise) {
              pendingFibers.push_back(std::move(promise));
            });
            return i * 2 + 1;
          });
        }

        auto results = collectAll(funcs.begin(), funcs.end());
        EXPECT_TRUE(pendingFibers.empty());
        for (size_t i = 0; i < 3; ++i) {
          EXPECT_EQ(i * 2 + 1, results[i]);
        }
      });
      taskAdded = true;
    } else if (pendingFibers.size()) {
      pendingFibers.back().setValue(0);
      pendingFibers.pop_back();
    } else {
      loopController.stop();
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, collectAllVoid) {
  std::vector<Promise<int>> pendingFibers;
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        std::vector<std::function<void()>> funcs;
        for (size_t i = 0; i < 3; ++i) {
          funcs.push_back([&pendingFibers]() {
            await_async([&pendingFibers](Promise<int> promise) {
              pendingFibers.push_back(std::move(promise));
            });
          });
        }

        collectAll(funcs.begin(), funcs.end());
        EXPECT_TRUE(pendingFibers.empty());
      });
      taskAdded = true;
    } else if (pendingFibers.size()) {
      pendingFibers.back().setValue(0);
      pendingFibers.pop_back();
    } else {
      loopController.stop();
    }
  };

  loopController.loop(std::move(loopFunc));
}

TEST(FiberManager, collectAny) {
  std::vector<Promise<int>> pendingFibers;
  bool taskAdded = false;

  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  auto loopFunc = [&]() {
    if (!taskAdded) {
      manager.addTask([&]() {
        std::vector<std::function<int()>> funcs;
        for (size_t i = 0; i < 3; ++i) {
          funcs.push_back([i, &pendingFibers]() {
            await_async([&pendingFibers](Promise<int> promise) {
              pendingFibers.push_back(std::move(promise));
            });
            if (i == 1) {
              throw std::runtime_error("This exception will be ignored");
            }
            return i * 2 + 1;
          });
        }

        auto result = collectAny(funcs.begin(), funcs.end());
        EXPECT_EQ(2, pendingFibers.size());
        EXPECT_EQ(2, result.first);
        EXPECT_EQ(2 * 2 + 1, result.second);
      });
      taskAdded = true;
    } else if (pendingFibers.size()) {
      pendingFibers.back().setValue(0);
      pendingFibers.pop_back();
    } else {
      loopController.stop();
    }
  };

  loopController.loop(std::move(loopFunc));
}

namespace {
/* Checks that this function was run from a main context,
   by comparing an address on a stack to a known main stack address
   and a known related fiber stack address.  The assumption
   is that fiber stack and main stack will be far enough apart,
   while any two values on the same stack will be close. */
void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
  int here;
  /* 2 pages is a good guess */
  constexpr auto const kHereToFiberMaxDist = 0x2000 / sizeof(int);

  // With ASAN's detect_stack_use_after_return=1, this must be much larger
  // I measured 410028 on x86_64, so allow for quadruple that, just in case.
  constexpr auto const kHereToMainMaxDist =
      folly::kIsSanitizeAddress ? 4 * 410028 : kHereToFiberMaxDist;

  if (fiberLocation) {
    EXPECT_GT(std::abs(&here - fiberLocation), kHereToFiberMaxDist);
  }
  if (mainLocation) {
    EXPECT_LT(std::abs(&here - mainLocation), kHereToMainMaxDist);
  }

  EXPECT_FALSE(ran);
  ran = true;
}
} // namespace

TEST(FiberManager, runInMainContext) {
  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  bool checkRan = false;

  int mainLocation;
  manager.runInMainContext(
      [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
  EXPECT_TRUE(checkRan);

  checkRan = false;

  struct A {
    explicit A(int value_) : value(value_) {}
    A(const A&) = delete;
    A(A&&) = default;

    int value;
  };

  manager.addTask([&]() {
    int stackLocation;
    auto ret = runInMainContext([&]() {
      expectMainContext(checkRan, &mainLocation, &stackLocation);
      return A(42);
    });
    EXPECT_TRUE(checkRan);
    EXPECT_EQ(42, ret.value);
  });

  loopController.loop([&]() { loopController.stop(); });

  EXPECT_TRUE(checkRan);
}

namespace {

FOLLY_NOINLINE int runHugeStackInMainContext(bool& checkRan) {
  auto ret = runInMainContext([&checkRan]() {
    std::array<unsigned char, 1 << 20> buf;
    buf.fill(42);
    checkRan = true;
    return buf[time(nullptr) % buf.size()];
  });
  EXPECT_TRUE(checkRan);
  EXPECT_EQ(42, ret);
  return ret;
}

} // namespace

TEST(FiberManager, runInMainContextNoInline) {
  FiberManager::Options opts;
  opts.recordStackEvery = 1;
  FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  bool checkRan = false;
  manager.addTask([&] { return runHugeStackInMainContext(checkRan); });

  EXPECT_TRUE(manager.stackHighWatermark() < 100000);

  loopController.loop([&]() { loopController.stop(); });

  EXPECT_TRUE(checkRan);
}

TEST(FiberManager, addTaskFinally) {
  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  bool checkRan = false;

  int mainLocation;

  manager.addTaskFinally(
      [&]() { return 1234; },
      [&](Try<int>&& result) {
        EXPECT_EQ(result.value(), 1234);

        expectMainContext(checkRan, &mainLocation, nullptr);
      });

  EXPECT_FALSE(checkRan);

  loopController.loop([&]() { loopController.stop(); });

  EXPECT_TRUE(checkRan);
}

TEST(FiberManager, fibersPoolWithinLimit) {
  FiberManager::Options opts;
  opts.maxFibersPoolSize = 5;

  FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  size_t fibersRun = 0;

  for (size_t i = 0; i < 5; ++i) {
    manager.addTask([&]() { ++fibersRun; });
  }
  loopController.loop([&]() { loopController.stop(); });

  EXPECT_EQ(5, fibersRun);
  EXPECT_EQ(5, manager.fibersAllocated());
  EXPECT_EQ(5, manager.fibersPoolSize());

  for (size_t i = 0; i < 5; ++i) {
    manager.addTask([&]() { ++fibersRun; });
  }
  loopController.loop([&]() { loopController.stop(); });

  EXPECT_EQ(10, fibersRun);
  EXPECT_EQ(5, manager.fibersAllocated());
  EXPECT_EQ(5, manager.fibersPoolSize());
}

TEST(FiberManager, fibersPoolOverLimit) {
  FiberManager::Options opts;
  opts.maxFibersPoolSize = 5;

  FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  size_t fibersRun = 0;

  for (size_t i = 0; i < 10; ++i) {
    manager.addTask([&]() { ++fibersRun; });
  }

  EXPECT_EQ(0, fibersRun);
  EXPECT_EQ(10, manager.fibersAllocated());
  EXPECT_EQ(0, manager.fibersPoolSize());

  loopController.loop([&]() { loopController.stop(); });

  EXPECT_EQ(10, fibersRun);
  EXPECT_EQ(5, manager.fibersAllocated());
  EXPECT_EQ(5, manager.fibersPoolSize());
}

TEST(FiberManager, remoteFiberBasic) {
  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  int result[2];
  result[0] = result[1] = 0;
  folly::Optional<Promise<int>> savedPromise[2];
  manager.addTask([&]() {
    result[0] = await_async(
        [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
  });
  manager.addTask([&]() {
    result[1] = await_async(
        [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
  });

  manager.loopUntilNoReady();

  EXPECT_TRUE(savedPromise[0].has_value());
  EXPECT_TRUE(savedPromise[1].has_value());
  EXPECT_EQ(0, result[0]);
  EXPECT_EQ(0, result[1]);

  std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
  std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
  remoteThread0.join();
  remoteThread1.join();
  EXPECT_EQ(0, result[0]);
  EXPECT_EQ(0, result[1]);
  /* Should only have scheduled once */
  EXPECT_EQ(1, loopController.remoteScheduleCalled());

  manager.loopUntilNoReady();
  EXPECT_EQ(42, result[0]);
  EXPECT_EQ(43, result[1]);
}

TEST(FiberManager, addTaskRemoteBasic) {
  FiberManager manager(std::make_unique<SimpleLoopController>());

  int result[2];
  result[0] = result[1] = 0;
  folly::Optional<Promise<int>> savedPromise[2];

  std::thread remoteThread0{[&]() {
    manager.addTaskRemote([&]() {
      result[0] = await_async(
          [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
    });
  }};
  std::thread remoteThread1{[&]() {
    manager.addTaskRemote([&]() {
      result[1] = await_async(
          [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
    });
  }};
  remoteThread0.join();
  remoteThread1.join();

  manager.loopUntilNoReady();

  EXPECT_TRUE(savedPromise[0].has_value());
  EXPECT_TRUE(savedPromise[1].has_value());
  EXPECT_EQ(0, result[0]);
  EXPECT_EQ(0, result[1]);

  savedPromise[0]->setValue(42);
  savedPromise[1]->setValue(43);

  EXPECT_EQ(0, result[0]);
  EXPECT_EQ(0, result[1]);

  manager.loopUntilNoReady();
  EXPECT_EQ(42, result[0]);
  EXPECT_EQ(43, result[1]);
}

TEST(FiberManager, remoteHasTasks) {
  size_t counter = 0;
  FiberManager fm(std::make_unique<SimpleLoopController>());
  std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });

  remote.join();

  while (fm.hasTasks()) {
    fm.loopUntilNoReady();
  }

  EXPECT_FALSE(fm.hasTasks());
  EXPECT_EQ(counter, 1);
}

TEST(FiberManager, remoteHasReadyTasks) {
  int result = 0;
  folly::Optional<Promise<int>> savedPromise;
  FiberManager fm(std::make_unique<SimpleLoopController>());
  std::thread remote([&]() {
    fm.addTaskRemote([&]() {
      result = await_async(
          [&](Promise<int> promise) { savedPromise = std::move(promise); });
      EXPECT_TRUE(fm.hasTasks());
    });
  });

  remote.join();
  EXPECT_TRUE(fm.hasTasks());

  fm.loopUntilNoReady();
  EXPECT_TRUE(fm.hasTasks());

  std::thread remote2([&]() { savedPromise->setValue(47); });
  remote2.join();
  EXPECT_TRUE(fm.hasTasks());

  fm.loopUntilNoReady();
  EXPECT_FALSE(fm.hasTasks());

  EXPECT_EQ(result, 47);
}

template <typename Data>
void testFiberLocal() {
  FiberManager fm(LocalType<Data>(), std::make_unique<SimpleLoopController>());

  fm.addTask([]() {
    EXPECT_EQ(42, local<Data>().value);

    local<Data>().value = 43;

    addTask([]() {
      EXPECT_EQ(43, local<Data>().value);

      local<Data>().value = 44;

      addTask([]() { EXPECT_EQ(44, local<Data>().value); });
    });
  });

  fm.addTask([&]() {
    EXPECT_EQ(42, local<Data>().value);

    local<Data>().value = 43;

    fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
  });

  fm.addTask([]() {
    EXPECT_EQ(42, local<Data>().value);
    local<Data>().value = 43;

    auto task = []() {
      EXPECT_EQ(43, local<Data>().value);
      local<Data>().value = 44;
    };
    std::vector<std::function<void()>> tasks{task};
    collectAny(tasks.begin(), tasks.end());

    EXPECT_EQ(43, local<Data>().value);
  });

  fm.loopUntilNoReady();
  EXPECT_FALSE(fm.hasTasks());
}

TEST(FiberManager, fiberLocal) {
  struct SimpleData {
    int value{42};
  };

  testFiberLocal<SimpleData>();
}

TEST(FiberManager, fiberLocalHeap) {
  struct LargeData {
    char _[1024 * 1024];
    int value{42};
  };

  testFiberLocal<LargeData>();
}

TEST(FiberManager, fiberLocalDestructor) {
  struct CrazyData {
    size_t data{42};

    ~CrazyData() {
      if (data == 41) {
        addTask([]() {
          EXPECT_EQ(42, local<CrazyData>().data);
          // Make sure we don't have infinite loop
          local<CrazyData>().data = 0;
        });
      }
    }
  };

  FiberManager fm(
      LocalType<CrazyData>(), std::make_unique<SimpleLoopController>());

  fm.addTask([]() { local<CrazyData>().data = 41; });

  fm.loopUntilNoReady();
  EXPECT_FALSE(fm.hasTasks());
}

TEST(FiberManager, yieldTest) {
  FiberManager manager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(manager.loopController());

  bool checkRan = false;

  manager.addTask([&]() {
    manager.yield();
    checkRan = true;
  });

  loopController.loop([&]() {
    if (checkRan) {
      loopController.stop();
    }
  });

  EXPECT_TRUE(checkRan);
}

TEST(FiberManager, RequestContext) {
  FiberManager fm(std::make_unique<SimpleLoopController>());

  bool checkRun1 = false;
  bool checkRun2 = false;
  bool checkRun3 = false;
  bool checkRun4 = false;
  folly::fibers::Baton baton1;
  folly::fibers::Baton baton2;
  folly::fibers::Baton baton3;
  folly::fibers::Baton baton4;

  {
    folly::RequestContextScopeGuard rctx;
    auto rcontext1 = folly::RequestContext::get();
    fm.addTask([&, rcontext1]() {
      EXPECT_EQ(rcontext1, folly::RequestContext::get());
      baton1.wait(
          [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
      EXPECT_EQ(rcontext1, folly::RequestContext::get());
      runInMainContext(
          [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
      checkRun1 = true;
    });
  }
  {
    folly::RequestContextScopeGuard rctx;
    auto rcontext2 = folly::RequestContext::get();
    fm.addTaskRemote([&, rcontext2]() {
      EXPECT_EQ(rcontext2, folly::RequestContext::get());
      baton2.wait();
      EXPECT_EQ(rcontext2, folly::RequestContext::get());
      checkRun2 = true;
    });
  }
  {
    folly::RequestContextScopeGuard rctx;
    auto rcontext3 = folly::RequestContext::get();
    fm.addTaskFinally(
        [&, rcontext3]() {
          EXPECT_EQ(rcontext3, folly::RequestContext::get());
          baton3.wait();
          EXPECT_EQ(rcontext3, folly::RequestContext::get());

          return folly::Unit();
        },
        [&, rcontext3](Try<folly::Unit>&& /* t */) {
          EXPECT_EQ(rcontext3, folly::RequestContext::get());
          checkRun3 = true;
        });
  }
  {
    folly::RequestContextScopeGuard rctx;
    fm.addTask([&]() {
      folly::RequestContextScopeGuard rctx2;
      auto rcontext4 = folly::RequestContext::get();
      baton4.wait();
      EXPECT_EQ(rcontext4, folly::RequestContext::get());
      checkRun4 = true;
    });
  }
  {
    folly::RequestContextScopeGuard rctx;
    auto rcontext = folly::RequestContext::get();

    fm.loopUntilNoReady();
    EXPECT_EQ(rcontext, folly::RequestContext::get());

    baton1.post();
    EXPECT_EQ(rcontext, folly::RequestContext::get());
    fm.loopUntilNoReady();
    EXPECT_TRUE(checkRun1);
    EXPECT_EQ(rcontext, folly::RequestContext::get());

    baton2.post();
    EXPECT_EQ(rcontext, folly::RequestContext::get());
    fm.loopUntilNoReady();
    EXPECT_TRUE(checkRun2);
    EXPECT_EQ(rcontext, folly::RequestContext::get());

    baton3.post();
    EXPECT_EQ(rcontext, folly::RequestContext::get());
    fm.loopUntilNoReady();
    EXPECT_TRUE(checkRun3);
    EXPECT_EQ(rcontext, folly::RequestContext::get());

    baton4.post();
    EXPECT_EQ(rcontext, folly::RequestContext::get());
    fm.loopUntilNoReady();
    EXPECT_TRUE(checkRun4);
    EXPECT_EQ(rcontext, folly::RequestContext::get());
  }
}

TEST(FiberManager, resizePeriodically) {
  FiberManager::Options opts;
  opts.fibersPoolResizePeriodMs = 300;
  opts.maxFibersPoolSize = 5;

  FiberManager manager(std::make_unique<EventBaseLoopController>(), opts);

  folly::EventBase evb;
  dynamic_cast<EventBaseLoopController&>(manager.loopController())
      .attachEventBase(evb);

  std::vector<Baton> batons(10);

  size_t tasksRun = 0;
  for (size_t i = 0; i < 30; ++i) {
    manager.addTask([i, &batons, &tasksRun]() {
      ++tasksRun;
      // Keep some fibers active indefinitely
      if (i < batons.size()) {
        batons[i].wait();
      }
    });
  }

  EXPECT_EQ(0, tasksRun);
  EXPECT_EQ(30, manager.fibersAllocated());
  EXPECT_EQ(0, manager.fibersPoolSize());

  evb.loopOnce();
  EXPECT_EQ(30, tasksRun);
  EXPECT_EQ(30, manager.fibersAllocated());
  // Can go over maxFibersPoolSize, 10 of 30 fibers still active
  EXPECT_EQ(20, manager.fibersPoolSize());

  std::this_thread::sleep_for(std::chrono::milliseconds(400));
  evb.loopOnce(); // no fibers active in this period
  EXPECT_EQ(30, manager.fibersAllocated());
  EXPECT_EQ(20, manager.fibersPoolSize());

  std::this_thread::sleep_for(std::chrono::milliseconds(400));
  evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
  EXPECT_EQ(15, manager.fibersAllocated());
  EXPECT_EQ(5, manager.fibersPoolSize());

  for (size_t i = 0; i < batons.size(); ++i) {
    batons[i].post();
  }
  evb.loopOnce();
  EXPECT_EQ(15, manager.fibersAllocated());
  EXPECT_EQ(15, manager.fibersPoolSize());

  std::this_thread::sleep_for(std::chrono::milliseconds(400));
  evb.loopOnce(); // 10 fibers active in last period
  EXPECT_EQ(10, manager.fibersAllocated());
  EXPECT_EQ(10, manager.fibersPoolSize());

  std::this_thread::sleep_for(std::chrono::milliseconds(400));
  evb.loopOnce();
  EXPECT_EQ(5, manager.fibersAllocated());
  EXPECT_EQ(5, manager.fibersPoolSize());

  // Sleep again before destruction to force the case where the
  // resize timer fires during destruction of the EventBase.
  std::this_thread::sleep_for(std::chrono::milliseconds(400));
}

TEST(FiberManager, batonWaitTimeoutHandler) {
  FiberManager manager(std::make_unique<EventBaseLoopController>());

  folly::EventBase evb;
  dynamic_cast<EventBaseLoopController&>(manager.loopController())
      .attachEventBase(evb);

  size_t fibersRun = 0;
  Baton baton;
  Baton::TimeoutHandler timeoutHandler;

  manager.addTask([&]() {
    baton.wait(timeoutHandler);
    ++fibersRun;
  });
  manager.loopUntilNoReady();

  EXPECT_FALSE(baton.try_wait());
  EXPECT_EQ(0, fibersRun);

  timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
  std::this_thread::sleep_for(std::chrono::milliseconds(500));

  EXPECT_FALSE(baton.try_wait());
  EXPECT_EQ(0, fibersRun);

  evb.loopOnce();
  manager.loopUntilNoReady();

  EXPECT_EQ(1, fibersRun);
}

TEST(FiberManager, batonWaitTimeoutHandlerExecutor) {
  Baton baton2;
  folly::CPUThreadPoolExecutor executor(1);
  FiberManager manager(std::make_unique<ExecutorLoopController>(&executor));
  auto task = [&](size_t timeout_ms) {
    Baton baton;
    auto start = std::chrono::steady_clock::now();
    auto res = baton.try_wait_for(std::chrono::milliseconds(timeout_ms));
    auto finish = std::chrono::steady_clock::now();
    EXPECT_FALSE(res);
    auto duration_ms =
        std::chrono::duration_cast<std::chrono::milliseconds>(finish - start)
            .count();

    EXPECT_GT(duration_ms, timeout_ms - 10);
    EXPECT_LT(duration_ms, timeout_ms + 100);
    baton2.post();
  };
  manager.addTask([&]() { task(300); });
  baton2.wait();
  executor.join();
}

TEST(FiberManager, batonWaitTimeoutMany) {
  FiberManager manager(std::make_unique<EventBaseLoopController>());

  folly::EventBase evb;
  dynamic_cast<EventBaseLoopController&>(manager.loopController())
      .attachEventBase(evb);

  // TODO(T71050527): It appears that TSAN does not yet maintain the shadow
  // stack correctly upon fiber switches, resulting in a shadow stack overflow
  // if we push too many tasks here. Cap it in the meantime.
  constexpr size_t kNumTimeoutTasks = folly::kIsSanitizeThread ? 1000 : 10000;
  size_t tasksCount = kNumTimeoutTasks;

  // We add many tasks to hit timeout queue deallocation logic.
  for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
    manager.addTask([&]() {
      Baton baton;
      Baton::TimeoutHandler timeoutHandler;

      folly::fibers::addTask([&] {
        timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
      });

      baton.wait(timeoutHandler);
      if (--tasksCount == 0) {
        evb.terminateLoopSoon();
      }
    });
  }

  evb.loopForever();
}

TEST(FiberManager, remoteFutureTest) {
  FiberManager fiberManager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(fiberManager.loopController());

  int testValue1 = 5;
  int testValue2 = 7;
  auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
  auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
  loopController.loop([&]() { loopController.stop(); });
  auto v1 = std::move(f1).get();
  auto v2 = std::move(f2).get();

  EXPECT_EQ(v1, testValue1);
  EXPECT_EQ(v2, testValue2);
}

// Test that a void function produes a Future<Unit>.
TEST(FiberManager, remoteFutureVoidUnitTest) {
  FiberManager fiberManager(std::make_unique<SimpleLoopController>());
  auto& loopController =
      dynamic_cast<SimpleLoopController&>(fiberManager.loopController());

  bool ranLocal = false;
  folly::Future<folly::Unit> futureLocal =
      fiberManager.addTaskFuture([&]() { ranLocal = true; });

  bool ranRemote = false;
  folly::Future<folly::Unit> futureRemote =
      fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });

  loopController.loop([&]() { loopController.stop(); });

  futureLocal.wait();
  ASSERT_TRUE(ranLocal);

  futureRemote.wait();
  ASSERT_TRUE(ranRemote);
}

TEST(FiberManager, nestedFiberManagers) {
  folly::EventBase outerEvb;
  folly::EventBase innerEvb;

  getFiberManager(outerEvb).addTask([&]() {
    EXPECT_EQ(
        &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());

    runInMainContext([&]() {
      getFiberManager(innerEvb).addTask([&]() {
        EXPECT_EQ(
            &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());

        innerEvb.terminateLoopSoon();
      });

      innerEvb.loopForever();
    });

    EXPECT_EQ(
        &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());

    outerEvb.terminateLoopSoon();
  });

  outerEvb.loopForever();
}

TEST(FiberManager, nestedFiberManagersSameEvb) {
  folly::EventBase evb;
  auto& fm1 = getFiberManager(evb);
  EXPECT_EQ(&fm1, &getFiberManager(evb));

  // Always return the same fm by default
  FiberManager::Options unused;
  unused.stackSize = 1024;
  EXPECT_EQ(&fm1, &getFiberManager(evb, unused));

  // Use frozen options
  FiberManager::Options used;
  used.stackSize = 1024;
  FiberManager::FrozenOptions options{used};
  auto& fm2 = getFiberManager(evb, options);
  EXPECT_NE(&fm1, &fm2);

  // Same option
  EXPECT_EQ(&fm2, &getFiberManager(evb, options));
  EXPECT_EQ(&fm2, &getFiberManager(evb, FiberManager::FrozenOptions{used}));
  FiberManager::Options same;
  same.stackSize = 1024;
  EXPECT_EQ(&fm2, &getFiberManager(evb, FiberManager::FrozenOptions{same}));

  // Different option
  FiberManager::Options differ;
  differ.stackSize = 2048;
  auto& fm3 = getFiberManager(evb, FiberManager::FrozenOptions{differ});
  EXPECT_NE(&fm1, &fm3);
  EXPECT_NE(&fm2, &fm3);

  // Nested usage
  getFiberManager(evb)
      .addTaskFuture([&] {
        EXPECT_EQ(&fm1, FiberManager::getFiberManagerUnsafe());

        getFiberManager(evb, options)
            .addTaskFuture(
                [&] { EXPECT_EQ(&fm2, FiberManager::getFiberManagerUnsafe()); })
            .wait();
      })
      .waitVia(&evb);
}

TEST(FiberManager, virtualEvbFiberManager) {
  folly::EventBase evb;
  auto& vevb = evb.getVirtualEventBase();
  // Eventbase vs VirtualEventBase used for multiplexing
  auto& fm1 = getFiberManager(evb);
  auto& fm2 = getFiberManager(vevb);
  EXPECT_NE(&fm1, &fm2);

  FiberManager::Options opt;
  opt.stackSize = 1024;

  // Option is not used in multiplexing
  auto& fm3 = getFiberManager(evb, opt);
  auto& fm4 = getFiberManager(vevb, opt);
  EXPECT_EQ(&fm1, &fm3);
  EXPECT_EQ(&fm2, &fm4);

  // Frozen option used in multiplexing
  FiberManager::FrozenOptions fopt(opt);
  auto& fm5 = getFiberManager(evb, fopt);
  auto& fm6 = getFiberManager(vevb, fopt);
  EXPECT_NE(&fm1, &fm5);
  EXPECT_NE(&fm2, &fm6);
}

TEST(FiberManager, semaphore) {
  static constexpr size_t kTasks = 10;
  static constexpr size_t kIterations = 10000;
  static constexpr size_t kNumTokens = 10;
  static constexpr size_t kNumThreads = 16;

  Semaphore sem(kNumTokens);
  EXPECT_EQ(sem.getCapacity(), kNumTokens);
  EXPECT_EQ(sem.getAvailableTokens(), kNumTokens);

  sem.wait();
  EXPECT_EQ(sem.getAvailableTokens(), kNumTokens - 1);
  sem.wait();
  EXPECT_EQ(sem.getAvailableTokens(), kNumTokens - 2);

  sem.signal();
  EXPECT_EQ(sem.getAvailableTokens(), kNumTokens - 1);
  sem.signal();
  EXPECT_EQ(sem.getAvailableTokens(), kNumTokens);

  struct Worker {
    explicit Worker(Semaphore& s) : sem(s), t([&] { run(); }) {}

    void run() {
      FiberManager manager(std::make_unique<EventBaseLoopController>());
      folly::EventBase evb;
      dynamic_cast<EventBaseLoopController&>(manager.loopController())
          .attachEventBase(evb);

      {
        std::shared_ptr<folly::EventBase> completionCounter(
            &evb, [](folly::EventBase* evb_) { evb_->terminateLoopSoon(); });

        for (size_t i = 0; i < kTasks; ++i) {
          manager.addTask([&, completionCounter]() {
            for (size_t j = 0; j < kIterations; ++j) {
              switch (j % 4) {
                case 0:
                  sem.wait();
                  break;
                case 1:
                  sem.future_wait().get();
                  break;
                case 2: {
                  Semaphore::Waiter waiter;
                  bool acquired = sem.try_wait(waiter);
                  if (!acquired) {
                    waiter.baton.wait();
                  }
                  break;
                }
                case 3:
                  if (!sem.try_wait()) {
                    sem.wait();
                  }
                  break;
              }
              ++counter;
              sem.signal();
              --counter;

              EXPECT_LT(counter, kNumTokens);
              EXPECT_GE(counter, 0);
            }
          });
        }
      }
      evb.loopForever();
    }

    Semaphore& sem;
    int counter{0};
    std::thread t;
  };

  std::vector<Worker> workers;
  workers.reserve(kNumThreads);
  for (size_t i = 0; i < kNumThreads; ++i) {
    workers.emplace_back(sem);
  }

  for (auto& worker : workers) {
    worker.t.join();
  }

  for (auto& worker : workers) {
    EXPECT_EQ(0, worker.counter);
  }

  EXPECT_EQ(sem.getCapacity(), kNumTokens);
  EXPECT_EQ(sem.getAvailableTokens(), kNumTokens);
}

#if FOLLY_HAS_COROUTINES

CO_TEST(FiberManager, SemaphoreCoTryWaitForCanCancelOrTimeout) {
  using namespace std::chrono_literals;

  {
    // Pre-cancel
    Semaphore sem{0};
    folly::CancellationSource cancelSource;
    cancelSource.requestCancellation();
    EXPECT_THROW(
        co_await folly::coro::co_withCancellation(
            cancelSource.getToken(), sem.co_try_wait_for(1h)),
        folly::OperationCancelled);
  }
  {
    // Cancel
    Semaphore sem{0};
    folly::CancellationSource cancelSource;
    auto t = std::thread([&]() { cancelSource.requestCancellation(); });
    EXPECT_THROW(
        co_await folly::coro::co_withCancellation(
            cancelSource.getToken(), sem.co_try_wait_for(1h)),
        folly::OperationCancelled);
    t.join();
  }
  {
    // Timeout
    Semaphore sem{0};
    folly::CancellationSource cancelSource;
    EXPECT_THROW(
        co_await folly::coro::co_withCancellation(
            cancelSource.getToken(), sem.co_try_wait_for(1ms)),
        folly::OperationCancelled);
  }
}

CO_TEST(FiberManager, StressTestSemaphoreCoTryWaitFor) {
  static constexpr size_t kIterations = 10000;

  Semaphore readyToRead{0};
  Semaphore sem{0};

  auto t = std::thread([&]() {
    for (size_t i = 0; i < kIterations; ++i) {
      readyToRead.wait();
      sem.signal();
    }
  });

  auto waitForSignal = [&](size_t i) -> folly::coro::Task<void> {
    std::chrono::milliseconds timeout{0};
    auto increaseTimeout = [&]() {
      if (timeout.count() == 0) {
        timeout = std::chrono::milliseconds{1};
      } else if (timeout < std::chrono::milliseconds{10000}) {
        timeout *= 2;
      }
    };
    readyToRead.signal();
    while (true) {
      try {
        co_await sem.co_try_wait_for(timeout);
        co_return;
      } catch (folly::OperationCancelled&) {
        increaseTimeout();
      }
    }
  };

  for (size_t i = 0; i < kIterations; ++i) {
    co_await waitForSignal(i);
  }

  t.join();
}

#endif

TEST(FiberManager, batchSemaphore) {
  static constexpr size_t kTasks = 10;
  static constexpr size_t kIterations = 10000;
  static constexpr size_t kNumTokens = 60;
  static constexpr size_t kNumThreads = 16;

  BatchSemaphore sem(kNumTokens);
  EXPECT_EQ(sem.getCapacity(), kNumTokens);
  EXPECT_EQ(sem.getAvailableTokens(), kNumTokens);

  sem.wait(20);
  EXPECT_EQ(sem.getAvailableTokens(), kNumTokens - 20);
  sem.wait(30);
  EXPECT_EQ(sem.getAvailableTokens(), kNumTokens - 20 - 30);

  sem.signal(30);
  EXPECT_EQ(sem.getAvailableTokens(), kNumTokens - 20);
  sem.signal(20);
  EXPECT_EQ(sem.getAvailableTokens(), kNumTokens);

  struct Worker {
    explicit Worker(BatchSemaphore& s) : sem(s), t([&] { run(); }) {}

    void run() {
      FiberManager manager(std::make_unique<EventBaseLoopController>());
      folly::EventBase evb;
      dynamic_cast<EventBaseLoopController&>(manager.loopController())
          .attachEventBase(evb);

      {
        std::shared_ptr<folly::EventBase> completionCounter(
            &evb, [](folly::EventBase* evb_) { evb_->terminateLoopSoon(); });

        for (size_t i = 0; i < kTasks; ++i) {
          manager.addTask([&, completionCounter]() {
            for (size_t j = 0; j < kIterations; ++j) {
              int tokens = j % 5 + 1;
              switch (j % 5) {
                case 0:
                  sem.wait(tokens);
                  break;
                case 1:
                  sem.future_wait(tokens).get();
                  break;
                case 2: {
                  BatchSemaphore::Waiter waiter{tokens};
                  bool acquired = sem.try_wait(waiter, tokens);
                  if (!acquired) {
                    waiter.baton.wait();
                  }
                  break;
                }
                case 3:
                  folly::coro::blockingWait(sem.co_wait(tokens));
                  break;
                case 4: {
                  if (!sem.try_wait(tokens)) {
                    sem.wait(tokens);
                  }
                  break;
                }
              }
              counter += tokens;
              sem.signal(tokens);
              counter -= tokens;

              EXPECT_LT(counter, kNumTokens);
              EXPECT_GE(counter, 0);
            }
          });
        }
      }
      evb.loopForever();
    }

    BatchSemaphore& sem;
    int counter{0};
    std::thread t;
  };

  std::vector<Worker> workers;
  workers.reserve(kNumThreads);
  for (size_t i = 0; i < kNumThreads; ++i) {
    workers.emplace_back(sem);
  }

  for (auto& worker : workers) {
    worker.t.join();
  }

  for (auto& worker : workers) {
    EXPECT_EQ(0, worker.counter);
  }

  EXPECT_EQ(sem.getCapacity(), kNumTokens);
  EXPECT_EQ(sem.getAvailableTokens(), kNumTokens);
}

/**
 * Verify that BatchSemaphore signals all waiters or fail by timeout.
 * Overall idea is to linearize waiters in the semaphore's list,
 * requesting incremental number of token. For example, [1, 2, 3, 4, 5] for a
 * total semaphore capacity of 5 tokens. When releasing all 5 tokens at once an
 * expected behavior is:
 *  - Return 5 tokens: notify [1, 2] and block [3, 4, 5] - 2 tokens left
 *  - Return 1 token: notify [3] and block [4, 5] - 0 tokens left
 *  - Return 2 tokens: and block [4, 5] - 2 tokens left
 *  - Return 3 tokens: notify [4] and block [5] - 1 token left
 *  - Return 4 tokens: notify [5] - 0 tokens left
 *  - Return 5 tokens: done - 5 tokens left
 */
TEST(FiberManager, batchSemaphoreSignalAll) {
  static constexpr size_t kNumWaiters = 5;

  BatchSemaphore sem(kNumWaiters);
  sem.wait(kNumWaiters);

  folly::EventBase evb;
  auto& fm = getFiberManager(evb);
  for (size_t task = 0; task < kNumWaiters; task++) {
    fm.addTask([&, tokens = int64_t(task) + 1] {
      // Wait for semaphore and fail if not notified
      BatchSemaphore::Waiter waiter{tokens};
      bool acquired = sem.try_wait(waiter, tokens);
      if (!acquired &&
          !waiter.baton.try_wait_for(std::chrono::milliseconds(1000))) {
        FAIL() << "BatchSemaphore::Waiter has never been notified";
      }

      // Rotate to the next task
      Baton b;
      b.try_wait_for(std::chrono::milliseconds(1));

      sem.signal(tokens);
    });
  }

  fm.addTask([&] {
    // Release all tokens and notify waiters
    sem.signal(kNumWaiters);
  });

  evb.loop();
  EXPECT_FALSE(fm.hasTasks());
}

template <typename ExecutorT>
void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
  thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
      executor, [batchSize](std::vector<int>&& batch) {
        EXPECT_EQ(batchSize, batch.size());
        std::vector<std::string> results;
        for (auto& it : batch) {
          results.push_back(folly::to<std::string>(it));
        }
        return results;
      });

  auto indexCopy = index;
  auto result = batchDispatcher.add(std::move(indexCopy));
  EXPECT_EQ(folly::to<std::string>(index), std::move(result).get());
}

TEST(FiberManager, batchDispatchTest) {
  folly::EventBase evb;
  auto& executor = getFiberManager(evb);

  // Launch multiple fibers with a single id.
  executor.add([&]() {
    int batchSize = 10;
    for (int i = 0; i < batchSize; i++) {
      executor.add(
          [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
    }
  });
  evb.loop();

  // Reuse the same BatchDispatcher to batch once again.
  executor.add([&]() {
    int batchSize = 10;
    for (int i = 0; i < batchSize; i++) {
      executor.add(
          [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
    }
  });
  evb.loop();
}

template <typename ExecutorT>
folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
    ExecutorT& executor, int totalNumberOfElements, std::vector<int> input) {
  thread_local BatchDispatcher<
      std::vector<int>,
      std::vector<std::string>,
      ExecutorT>
      batchDispatcher(
          executor,
          [totalNumberOfElements](std::vector<std::vector<int>>&& batch) {
            std::vector<std::vector<std::string>> results;
            int numberOfElements = 0;
            for (auto& unit : batch) {
              numberOfElements += unit.size();
              std::vector<std::string> result;
              for (auto& element : unit) {
                result.push_back(folly::to<std::string>(element));
              }
              results.push_back(std::move(result));
            }
            EXPECT_EQ(totalNumberOfElements, numberOfElements);
            return results;
          });

  return batchDispatcher.add(std::move(input));
}

/**
 * Batch values in groups of 5, and then call inner dispatch.
 */
template <typename ExecutorT>
void doubleBatchOuterDispatch(
    ExecutorT& executor, int totalNumberOfElements, int index) {
  thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
      executor, [=, &executor](std::vector<int>&& batch) {
        EXPECT_EQ(totalNumberOfElements, batch.size());
        std::vector<std::string> results;
        std::vector<folly::Future<std::vector<std::string>>>
            innerDispatchResultFutures;

        std::vector<int> group;
        for (auto unit : batch) {
          group.push_back(unit);
          if (group.size() == 5) {
            auto localGroup = group;
            group.clear();

            innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
                executor, totalNumberOfElements, localGroup));
          }
        }

        folly::collectAll(
            innerDispatchResultFutures.begin(),
            innerDispatchResultFutures.end())
            .deferValue([&](std::vector<Try<std::vector<std::string>>>
                                innerDispatchResults) {
              for (auto& unit : innerDispatchResults) {
                for (auto& element : unit.value()) {
                  results.push_back(element);
                }
              }
            })
            .get();
        return results;
      });

  auto indexCopy = index;
  auto result = batchDispatcher.add(std::move(indexCopy));
  EXPECT_EQ(folly::to<std::string>(index), std::move(result).get());
}

TEST(FiberManager, doubleBatchDispatchTest) {
  folly::EventBase evb;
  auto& executor = getFiberManager(evb);

  // Launch multiple fibers with a single id.
  executor.add([&]() {
    int totalNumberOfElements = 20;
    for (int i = 0; i < totalNumberOfElements; i++) {
      executor.add([=, &executor]() {
        doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
      });
    }
  });
  evb.loop();
}

template <typename ExecutorT>
void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
  thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
      executor, [](std::vector<int>&&) -> std::vector<int> {
        throw std::runtime_error("Surprise!!");
      });

  EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
}

TEST(FiberManager, batchDispatchExceptionHandlingTest) {
  folly::EventBase evb;
  auto& executor = getFiberManager(evb);

  // Launch multiple fibers with a single id.
  executor.add([&]() {
    int totalNumberOfElements = 5;
    for (int i = 0; i < totalNumberOfElements; i++) {
      executor.add(
          [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
    }
  });
  evb.loop();
}

namespace AtomicBatchDispatcherTesting {

using ValueT = size_t;
using ResultT = std::string;
using DispatchFunctionT =
    folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;

#define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
#if ENABLE_TRACE_IN_TEST
#define OUTPUT_TRACE std::cerr
#else // ENABLE_TRACE_IN_TEST
struct DevNullPiper {
  template <typename T>
  DevNullPiper& operator<<(const T&) {
    return *this;
  }

  DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) { return *this; }
} devNullPiper;
#define OUTPUT_TRACE devNullPiper
#endif // ENABLE_TRACE_IN_TEST

struct Job {
  AtomicBatchDispatcher<ValueT, ResultT>::Token token;
  ValueT input;

  void preprocess(FiberManager& executor, bool die) {
    // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
    clock_t msecToDoIO = folly::Random::rand32() % 10;
    double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
    double endAfter = start + msecToDoIO;
    while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
      executor.yield();
    }
    if (die) {
      throw std::logic_error("Simulating preprocessing failure");
    }
  }

  Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
      : token(std::move(t)), input(i) {}

  Job(Job&&) = default;
  Job& operator=(Job&&) = default;
};

ResultT processSingleInput(ValueT&& input) {
  return folly::to<ResultT>(std::move(input));
}

std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
  size_t expectedCount = inputs.size();
  std::vector<ResultT> results;
  results.reserve(expectedCount);
  for (size_t i = 0; i < expectedCount; ++i) {
    results.emplace_back(processSingleInput(std::move(inputs[i])));
  }
  return results;
}

void createJobs(
    AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
    std::vector<Job>& jobs,
    size_t count) {
  jobs.clear();
  for (size_t i = 0; i < count; ++i) {
    jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
  }
}

enum class DispatchProblem {
  None,
  PreprocessThrows,
  DuplicateDispatch,
};

void dispatchJobs(
    FiberManager& executor,
    std::vector<Job>& jobs,
    std::vector<folly::Optional<folly::Future<ResultT>>>& results,
    DispatchProblem dispatchProblem = DispatchProblem::None,
    size_t problemIndex = size_t(-1)) {
  EXPECT_TRUE(
      dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
  results.clear();
  results.resize(jobs.size());
  for (size_t i = 0; i < jobs.size(); ++i) {
    executor.add(
        [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
          try {
            Job job(std::move(jobs[i]));

            if (dispatchProblem == DispatchProblem::PreprocessThrows) {
              if (i == problemIndex) {
                EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
                return;
              }
            }

            job.preprocess(executor, false);
            OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
            results[i] = job.token.dispatch(job.input);
            OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;

            if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
              if (i == problemIndex) {
                EXPECT_THROW(job.token.dispatch(job.input), ABDUsageException);
              }
            }
          } catch (...) {
            OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
          }
        });
  }
}

void validateResult(
    std::vector<folly::Optional<folly::Future<ResultT>>>& results, size_t i) {
  try {
    OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
                 << std::endl;
  } catch (std::exception& e) {
    OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
    throw;
  }
}

template <typename TException>
void validateResults(
    std::vector<folly::Optional<folly::Future<ResultT>>>& results,
    size_t expectedNumResults) {
  size_t numResultsFilled = 0;
  for (size_t i = 0; i < results.size(); ++i) {
    if (!results[i]) {
      continue;
    }
    ++numResultsFilled;
    EXPECT_THROW(validateResult(results, i), TException);
  }
  EXPECT_EQ(numResultsFilled, expectedNumResults);
}

void validateResults(
    std::vector<folly::Optional<folly::Future<ResultT>>>& results,
    size_t expectedNumResults) {
  size_t numResultsFilled = 0;
  for (size_t i = 0; i < results.size(); ++i) {
    if (!results[i]) {
      continue;
    }
    ++numResultsFilled;
    EXPECT_NO_THROW(validateResult(results, i));
    ValueT expectedInput = i;
    EXPECT_EQ(
        results[i]->value(), processSingleInput(std::move(expectedInput)));
  }
  EXPECT_EQ(numResultsFilled, expectedNumResults);
}

} // namespace AtomicBatchDispatcherTesting

#define SET_UP_TEST_FUNC                                        \
  using namespace AtomicBatchDispatcherTesting;                 \
  folly::EventBase evb;                                         \
  auto& executor = getFiberManager(evb);                        \
  const size_t COUNT = 11;                                      \
  std::vector<Job> jobs;                                        \
  jobs.reserve(COUNT);                                          \
  std::vector<folly::Optional<folly::Future<ResultT>>> results; \
  results.reserve(COUNT);                                       \
  DispatchFunctionT dispatchFunc

TEST(FiberManager, ABDTest) {
  SET_UP_TEST_FUNC;

  //
  // Testing AtomicBatchDispatcher with explicit call to commit()
  //
  dispatchFunc = userDispatchFunc;
  auto atomicBatchDispatcher =
      createAtomicBatchDispatcher(std::move(dispatchFunc));
  createJobs(atomicBatchDispatcher, jobs, COUNT);
  dispatchJobs(executor, jobs, results);
  atomicBatchDispatcher.commit();
  evb.loop();
  validateResults(results, COUNT);
}

TEST(FiberManager, ABDDispatcherdestroyedbeforecallingcommit) {
  SET_UP_TEST_FUNC;

  //
  // Testing AtomicBatchDispatcher destroyed before calling commit.
  // Handles error cases for:
  // - User might have forgotten to add the call to commit() in the code
  // - An unexpected exception got thrown in user code before commit() is called
  //
  try {
    dispatchFunc = userDispatchFunc;
    auto atomicBatchDispatcher =
        createAtomicBatchDispatcher(std::move(dispatchFunc));
    createJobs(atomicBatchDispatcher, jobs, COUNT);
    dispatchJobs(executor, jobs, results);
    throw std::runtime_error(
        "Unexpected exception in user code before commit called");
    // atomicBatchDispatcher.commit();
  } catch (...) {
    /* User code handles the exception and does not exit process */
  }
  evb.loop();
  validateResults<ABDCommitNotCalledException>(results, COUNT);
}

TEST(FiberManager, ABDPreprocessingfailuretest) {
  SET_UP_TEST_FUNC;

  //
  // Testing preprocessing failure on a job throws
  //
  dispatchFunc = userDispatchFunc;
  auto atomicBatchDispatcher =
      createAtomicBatchDispatcher(std::move(dispatchFunc));
  createJobs(atomicBatchDispatcher, jobs, COUNT);
  dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
  atomicBatchDispatcher.commit();
  evb.loop();
  validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
}

TEST(FiberManager, ABDMultipledispatchonsametokenerrortest) {
  SET_UP_TEST_FUNC;

  //
  // Testing that calling dispatch more than once on the same token throws
  //
  dispatchFunc = userDispatchFunc;
  auto atomicBatchDispatcher =
      createAtomicBatchDispatcher(std::move(dispatchFunc));
  createJobs(atomicBatchDispatcher, jobs, COUNT);
  dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
  atomicBatchDispatcher.commit();
  evb.loop();
}

TEST(FiberManager, ABDGettokencalledaftercommittest) {
  SET_UP_TEST_FUNC;

  //
  // Testing that exception set on attempt to call getToken after commit called
  //
  dispatchFunc = userDispatchFunc;
  auto atomicBatchDispatcher =
      createAtomicBatchDispatcher(std::move(dispatchFunc));
  createJobs(atomicBatchDispatcher, jobs, COUNT);
  atomicBatchDispatcher.commit();
  EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
  dispatchJobs(executor, jobs, results);
  EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
  evb.loop();
  validateResults(results, COUNT);
  EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
}

TEST(FiberManager, ABDUserprovidedbatchdispatchthrowstest) {
  SET_UP_TEST_FUNC;

  //
  // Testing that exception is set if user provided batch dispatch throws
  //
  dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
    (void)userDispatchFunc(std::move(inputs));
    throw std::runtime_error("Unexpected exception in user dispatch function");
  };
  auto atomicBatchDispatcher =
      createAtomicBatchDispatcher(std::move(dispatchFunc));
  createJobs(atomicBatchDispatcher, jobs, COUNT);
  dispatchJobs(executor, jobs, results);
  atomicBatchDispatcher.commit();
  evb.loop();
  validateResults<std::runtime_error>(results, COUNT);
}

TEST(FiberManager, VirtualEventBase) {
  bool done1{false};
  bool done2{false};
  {
    folly::ScopedEventBaseThread thread;

    auto evb1 =
        std::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
    auto& evb2 = thread.getEventBase()->getVirtualEventBase();

    getFiberManager(*evb1).addTaskRemote([&] {
      Baton baton;
      baton.try_wait_for(std::chrono::milliseconds{100});

      done1 = true;
    });

    getFiberManager(evb2).addTaskRemote([&] {
      Baton baton;
      baton.try_wait_for(std::chrono::milliseconds{200});

      done2 = true;
    });

    EXPECT_FALSE(done1);
    EXPECT_FALSE(done2);

    evb1.reset();
    EXPECT_TRUE(done1);
    EXPECT_FALSE(done2);
  }
  EXPECT_TRUE(done2);
}

TEST(TimedMutex, ThreadsAndFibersDontDeadlock) {
  folly::EventBase evb;
  auto& fm = getFiberManager(evb);
  TimedMutex mutex;
  std::thread testThread([&] {
    for (int i = 0; i < 100; i++) {
      mutex.lock();
      mutex.unlock();
      {
        Baton b;
        b.try_wait_for(std::chrono::milliseconds(1));
      }
    }
  });

  for (int numFibers = 0; numFibers < 100; numFibers++) {
    fm.addTask([&] {
      for (int i = 0; i < 20; i++) {
        mutex.lock();
        {
          Baton b;
          b.try_wait_for(std::chrono::milliseconds(1));
        }
        mutex.unlock();
        {
          Baton b;
          b.try_wait_for(std::chrono::milliseconds(1));
        }
      }
    });
  }

  evb.loop();
  EXPECT_EQ(0, fm.hasTasks());
  testThread.join();
}

TEST(TimedMutex, ThreadFiberDeadlockOrder) {
  folly::EventBase evb;
  auto& fm = getFiberManager(evb);
  TimedMutex mutex;

  mutex.lock();
  std::thread unlockThread([&] {
    /* sleep override */ std::this_thread::sleep_for(
        std::chrono::milliseconds{100});
    mutex.unlock();
  });

  fm.addTask([&] { std::lock_guard<TimedMutex> lg(mutex); });
  fm.addTask([&] {
    runInMainContext([&] {
      auto locked = mutex.try_lock_for(std::chrono::seconds{1});
      EXPECT_TRUE(locked);
      if (locked) {
        mutex.unlock();
      }
    });
  });

  evb.loopOnce();
  EXPECT_EQ(0, fm.hasTasks());

  unlockThread.join();
}

TEST(TimedMutex, ThreadFiberDeadlockRace) {
  folly::EventBase evb;
  auto& fm = getFiberManager(evb);
  TimedMutex mutex;

  mutex.lock();

  fm.addTask([&] {
    auto locked = mutex.try_lock_for(std::chrono::seconds{1});
    EXPECT_TRUE(locked);
    if (locked) {
      mutex.unlock();
    }
  });
  fm.addTask([&] {
    mutex.unlock();
    runInMainContext([&] {
      auto locked = mutex.try_lock_for(std::chrono::seconds{1});
      EXPECT_TRUE(locked);
      if (locked) {
        mutex.unlock();
      }
    });
  });

  evb.loopOnce();
  EXPECT_EQ(0, fm.hasTasks());
}

namespace {
// Checks whether stackHighWatermark is set for non-ASAN builds,
// and not set for ASAN builds.
#ifndef FOLLY_SANITIZE_ADDRESS
void expectStackHighWatermark(size_t minStackSize, size_t stackHighWatermark) {
  // Check that we properly accounted fiber stack usage
  EXPECT_NE(0, stackHighWatermark);
  EXPECT_LT(minStackSize, stackHighWatermark);
}
#else
void expectStackHighWatermark(size_t, size_t stackHighWatermark) {
  // For ASAN, stackHighWatermark is not tracked.
  EXPECT_EQ(0, stackHighWatermark);
}
#endif
} // namespace

/**
 * Test that we can properly track fiber stack usage, via recordStackEvery
 * For ASAN builds, it is not recorded.
 */

TEST(FiberManager, highWaterMarkViaRecordStackEvery) {
  auto f = [] {
    folly::fibers::FiberManager::Options opts;
    opts.recordStackEvery = 1;

    FiberManager fm(std::make_unique<SimpleLoopController>(), opts);
    auto& loopController =
        dynamic_cast<SimpleLoopController&>(fm.loopController());

    static constexpr size_t n = 1000;
    int s = 0;
    fm.addTask([&]() {
      int b[n] = {0};
      for (size_t i = 0; i < n; ++i) {
        b[i] = i;
      }
      for (size_t i = 0; i + 1 < n; ++i) {
        s += b[i] * b[i + 1];
      }
    });

    (void)s;

    loopController.loop([&]() { loopController.stop(); });
    expectStackHighWatermark(n * sizeof(int), fm.stackHighWatermark());
  };
  std::thread(f).join();
}

/**
 * Test that we can properly track fiber stack usage,
 * via current position estimate. For ASAN builds, it is not recorded.
 */
TEST(FiberManager, highWaterMarkViaRecordCurrentPosition) {
  auto f = [] {
    FiberManager fm(std::make_unique<SimpleLoopController>());
    auto& loopController =
        dynamic_cast<SimpleLoopController&>(fm.loopController());

    static constexpr size_t n = 1000;
    int s = 0;
    fm.addTask([&]() {
      int b[n] = {0};
      for (size_t i = 0; i < n; ++i) {
        b[i] = i;
      }
      for (size_t i = 0; i + 1 < n; ++i) {
        s += b[i] * b[i + 1];
      }
      // Calls preempt, which calls recordStackPosition.
      fm.runInMainContext([]() {});
    });

    (void)s;

    loopController.loop([&]() { loopController.stop(); });
    expectStackHighWatermark(n * sizeof(int), fm.stackHighWatermark());
  };
  std::thread(f).join();
}

TEST(FiberManager, addTaskEager) {
  folly::EventBase evb;
  auto& fm = getFiberManager(evb);

  bool eagerTaskStarted{false};
  bool eagerTaskDone{false};
  bool firstTaskDone{false};

  fm.addTask([&] { firstTaskDone = true; });

  fm.addTaskEager([&] {
    EXPECT_FALSE(firstTaskDone);
    eagerTaskStarted = true;
    fm.yield();
    EXPECT_TRUE(firstTaskDone);
    eagerTaskDone = true;
  });

  EXPECT_TRUE(eagerTaskStarted);

  evb.loop();

  EXPECT_TRUE(eagerTaskDone);
  EXPECT_TRUE(firstTaskDone);
}

TEST(FiberManager, addTaskEagerFuture) {
  folly::EventBase evb;
  auto& fm = getFiberManager(evb);

  bool eagerTaskStarted{false};
  bool eagerTaskDone{false};

  EXPECT_TRUE(fm.addTaskEagerFuture([&] {}).isReady());

  auto f = fm.addTaskEagerFuture([&] {
    eagerTaskStarted = true;
    fm.yield();
    eagerTaskDone = true;
  });

  EXPECT_TRUE(eagerTaskStarted);

  evb.loop();

  EXPECT_TRUE(f.isReady());

  EXPECT_TRUE(eagerTaskDone);
}

TEST(FiberManager, addTaskEagerNested) {
  folly::EventBase evb;
  auto& fm = getFiberManager(evb);

  bool eagerTaskStarted{false};
  bool eagerTaskDone{false};
  bool firstTaskDone{false};
  bool secondTaskDone{false};

  fm.addTask([&] {
    fm.addTaskEager([&] {
      EXPECT_FALSE(firstTaskDone);
      EXPECT_FALSE(secondTaskDone);
      fm.runInMainContext([&] { eagerTaskStarted = true; });
      fm.yield();
      EXPECT_TRUE(firstTaskDone);
      EXPECT_TRUE(secondTaskDone);
      eagerTaskDone = true;
    });
    EXPECT_TRUE(eagerTaskStarted);
    firstTaskDone = true;
  });

  fm.addTask([&] {
    EXPECT_TRUE(firstTaskDone);
    secondTaskDone = true;
  });

  evb.loop();

  EXPECT_TRUE(eagerTaskDone);
  EXPECT_TRUE(secondTaskDone);
}

TEST(FiberManager, addTaskEagerNestedFiberManager) {
  folly::EventBase evb;
  auto& fm = getFiberManager(evb);
  FiberManager::Options opts;
  opts.stackSize *= 2;
  auto& fm2 = getFiberManager(evb, FiberManager::FrozenOptions(opts));

  bool eagerTaskDone{false};

  fm.addTask([&] {
    fm2.addTaskEager([&] {
      EXPECT_FALSE(fm.hasActiveFiber());
      eagerTaskDone = true;
    });
  });

  evb.loop();

  EXPECT_TRUE(eagerTaskDone);
}

TEST(FiberManager, swapWithException) {
  folly::EventBase evb;
  FiberManager::Options opts;
  // ASSERT_DEATH takes a lot of stack space
  opts.stackSize = 65536;
  auto& fm = getFiberManager(evb, FiberManager::FrozenOptions{opts});
  bool done = false;

  fm.addTask([&] {
    try {
      throw std::logic_error("test");
    } catch (const std::exception&) {
      // Ok to call runInMainContext in exception unwinding
      runInMainContext([&] { done = true; });
    }
  });

  evb.loop();
  EXPECT_TRUE(done);

  fm.addTask([&] {
    try {
      throw std::logic_error("test");
    } catch (const std::exception&) {
      Baton b;
      // Can't block during exception unwinding
      ASSERT_DEATH(b.try_wait_for(std::chrono::milliseconds(1)), "");
    }
  });
  evb.loop();
}

TEST(FiberManager, loopInCatch) {
  folly::EventBase evb;
  auto& fm = getFiberManager(evb);
  bool started = false;
  folly::fibers::Baton baton;
  bool done = false;

  fm.addTask([&] {
    started = true;
    baton.wait();
    done = true;
  });

  try {
    throw std::logic_error("expected");
  } catch (...) {
    EXPECT_FALSE(started);
    evb.drive();
    EXPECT_TRUE(started);
    EXPECT_FALSE(done);
    baton.post();
    evb.drive();
    EXPECT_TRUE(done);
  }
}

TEST(FiberManager, loopInUnwind) {
  folly::EventBase evb;
  auto& fm = getFiberManager(evb);
  bool started = false;
  folly::fibers::Baton baton;
  bool done = false;

  fm.addTask([&] {
    started = true;
    baton.wait();
    done = true;
  });

  try {
    SCOPE_EXIT {
      EXPECT_FALSE(started);
      evb.drive();
      EXPECT_TRUE(started);
      EXPECT_FALSE(done);
      baton.post();
      evb.drive();
      EXPECT_TRUE(done);
    };
    throw std::logic_error("expected");
  } catch (...) {
  }
}

TEST(FiberManager, addTaskRemoteFutureTry) {
  folly::EventBase evb;
  auto& fm = getFiberManager(evb);

  EXPECT_EQ(
      42,
      fm.addTaskRemoteFuture(
            [&]() -> folly::Try<int> { return folly::Try<int>(42); })
          .getVia(&evb)
          .value());
}

TEST(FiberManager, addTaskEagerKeepAlive) {
  auto f = [&] {
    folly::EventBase evb;
    return getFiberManager(evb).addTaskEagerFuture([&] {
      folly::futures::sleep(std::chrono::milliseconds{100}).get();
      return 42;
    });
  }();

  EXPECT_TRUE(f.isReady());
  EXPECT_EQ(42, std::move(f).get());
}

TEST(FiberManager, fibersPreserveAsyncStackRoots) {
  folly::EventBase evb;
  auto& fm = getFiberManager(evb);

  {
    folly::detail::ScopedAsyncStackRoot root;

    auto f = [&] {
      // Should be launched with a no active AsyncStackRoot
      EXPECT_TRUE(folly::tryGetCurrentAsyncStackRoot() == nullptr);

      folly::detail::ScopedAsyncStackRoot scopedRoot1;

      auto* root1 = folly::tryGetCurrentAsyncStackRoot();
      EXPECT_TRUE(root1 != nullptr);

      fm.yield();

      EXPECT_EQ(root1, folly::tryGetCurrentAsyncStackRoot());

      {
        folly::detail::ScopedAsyncStackRoot scopedRoot2;

        auto* root2 = folly::tryGetCurrentAsyncStackRoot();

        folly::AsyncStackFrame frame1;

        folly::AsyncStackFrame frame2;
        frame2.setParentFrame(frame1);

        scopedRoot2.activateFrame(frame2);

        fm.yield();

        EXPECT_EQ(root2, folly::tryGetCurrentAsyncStackRoot());

        folly::deactivateAsyncStackFrame(frame2);
      }
    };

    auto* originalRoot = folly::tryGetCurrentAsyncStackRoot();

    auto task1 = fm.addTaskFuture(f);
    auto task2 = fm.addTaskFuture(f);

    EXPECT_EQ(originalRoot, folly::tryGetCurrentAsyncStackRoot());

    std::move(task1).getVia(&evb);

    EXPECT_EQ(originalRoot, folly::tryGetCurrentAsyncStackRoot());

    std::move(task2).getVia(&evb);

    EXPECT_EQ(originalRoot, folly::tryGetCurrentAsyncStackRoot());
  }
}

TEST(FiberManager, EventBaseMigratingThreads) {
  folly::EventBase evb;
  auto& fm = getFiberManager(evb);
  folly::fibers::Baton baton;
  auto f = fm.addTaskFuture([&baton] { baton.wait(); });
  evb.drive();

  std::thread anotherThread([&] { evb.drive(); });

  std::this_thread::sleep_for(std::chrono::milliseconds{100});
  baton.post();
  anotherThread.join();

  EXPECT_TRUE(f.isReady());
}