folly/folly/fibers/ExecutorLoopController-inl.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once
#include <thread>
#include <folly/ScopeGuard.h>
#include <folly/futures/Future.h>

namespace folly {
namespace fibers {

inline ExecutorLoopController::ExecutorLoopController(folly::Executor* executor)
    : executor_(executor),
      timeoutManager_(executor_),
      timer_(HHWheelTimer::newTimer(&timeoutManager_)) {}

inline ExecutorLoopController::~ExecutorLoopController() {}

inline void ExecutorLoopController::setFiberManager(fibers::FiberManager* fm) {
  fm_ = fm;
}

inline void ExecutorLoopController::schedule() {
  // add() is thread-safe, so this isn't properly optimized for addTask()
  if (!executorKeepAlive_) {
    executorKeepAlive_ = getKeepAliveToken(executor_);
  }
  auto guard = localCallbackControlBlock_->trySchedule();
  if (!guard) {
    return;
  }
  executor_->add([this, guard = std::move(guard)]() {
    if (guard->isCancelled()) {
      return;
    }
    runLoop();
  });
}

inline void ExecutorLoopController::runLoop() {
  auto oldLoopThread = loopThread_.exchange(std::this_thread::get_id());
  DCHECK(oldLoopThread == std::thread::id{});
  SCOPE_EXIT {
    loopThread_ = std::thread::id{};
  };

  if (!executorKeepAlive_) {
    if (!fm_->hasTasks()) {
      return;
    }
    executorKeepAlive_ = getKeepAliveToken(executor_);
  }
  fm_->loopUntilNoReadyImpl();
  if (!fm_->hasTasks()) {
    executorKeepAlive_.reset();
  }
}

inline void ExecutorLoopController::runEagerFiber(Fiber* fiber) {
  DCHECK(loopThread_ == std::this_thread::get_id());
  if (!executorKeepAlive_) {
    executorKeepAlive_ = getKeepAliveToken(executor_);
  }
  fm_->runEagerFiberImpl(fiber);
  if (!fm_->hasTasks()) {
    executorKeepAlive_.reset();
  }
}

inline void ExecutorLoopController::scheduleThreadSafe() {
  executor_->add(
      [this, executorKeepAlive = getKeepAliveToken(executor_)]() mutable {
        if (fm_->shouldRunLoopRemote()) {
          return runLoop();
        }
      });
}

inline HHWheelTimer* ExecutorLoopController::timer() {
  return timer_.get();
}

} // namespace fibers
} // namespace folly