/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Memory.h>
namespace folly {
namespace fibers {
// Constructs the controller; the only work done here is initializing
// callback_ with a reference to this controller instance.
inline EventBaseLoopController::EventBaseLoopController() : callback_(*this) {}
// Tears the controller down: first cancels the loop callback, then resets the
// keep-alive token held on the event base.
inline EventBaseLoopController::~EventBaseLoopController() {
// NOTE(review): presumably cancelling first ensures a callback queued by
// schedule() cannot fire against a destroyed controller -- confirm.
callback_.cancelLoopCallback();
eventBaseKeepAlive_.reset();
}
// Convenience overload: attaches the controller to the virtual event base
// obtained from `eventBase` by delegating to the VirtualEventBase overload.
inline void EventBaseLoopController::attachEventBase(EventBase& eventBase) {
  auto& virtualBase = eventBase.getVirtualEventBase();
  attachEventBase(virtualBase);
}
// Attaches the controller to `eventBase`.
//
// Only the first call takes effect; any later call logs an error and leaves
// the existing attachment untouched. On a successful attach this also sets up
// a cancellation token that is signalled from a callback registered with the
// event base's runOnDestruction(), and flushes any schedule() request that
// was made before an event base was available.
inline void EventBaseLoopController::attachEventBase(
    VirtualEventBase& eventBase) {
  const bool alreadyAttached = eventBaseAttached_.exchange(true);
  if (alreadyAttached) {
    LOG(ERROR) << "Attempt to reattach EventBase to LoopController";
    return;
  }

  eventBase_ = &eventBase;

  // The source is moved into the destruction callback; the matching token is
  // retained so timer() can check whether that callback has already run.
  CancellationSource shutdownSource;
  eventBaseShutdownToken_ = shutdownSource.getToken();
  eventBase_->runOnDestruction([shutdownSource = std::move(shutdownSource)] {
    shutdownSource.requestCancellation();
  });

  // schedule() was called while detached: perform the deferred scheduling now.
  if (awaitingScheduling_) {
    schedule();
  }
}
// Records the FiberManager that this controller drives. The pointer is stored
// as-is; no ownership is taken here.
inline void EventBaseLoopController::setFiberManager(FiberManager* fm) {
  this->fm_ = fm;
}
// Requests a run of the fiber loop on the attached event base.
//
// If no event base is attached yet, the request is only remembered (via
// awaitingScheduling_) and is replayed by attachEventBase().
inline void EventBaseLoopController::schedule() {
  if (eventBase_ == nullptr) {
    // Nothing to schedule on yet; postpone until an event base is attached.
    awaitingScheduling_ = true;
    return;
  }

  // Hold a keep-alive token while there is pending work.
  if (!eventBaseKeepAlive_) {
    eventBaseKeepAlive_ = getKeepAliveToken(eventBase_);
  }

  // Queue the loop callback so that it runs in the current loop iteration.
  eventBase_->getEventBase().runInLoop(&callback_, true);
  awaitingScheduling_ = false;
}
inline void EventBaseLoopController::runLoop() {
if (!eventBaseKeepAlive_) {
// runLoop can be called twice if both schedule() and scheduleThreadSafe()
// were called.
if (!fm_->hasTasks()) {
return;
}
eventBaseKeepAlive_ = getKeepAliveToken(eventBase_);
}
if (loopRunner_) {
if (fm_->hasReadyTasks()) {
loopRunner_->run([&] { fm_->loopUntilNoReadyImpl(); });
}
} else {
fm_->loopUntilNoReadyImpl();
}
if (!fm_->hasTasks()) {
eventBaseKeepAlive_.reset();
}
}
// Runs `fiber` immediately through the fiber manager, going through
// loopRunner_ when one is set.
//
// A keep-alive token on the event base is acquired first if none is held, and
// released afterwards when the fiber manager reports no remaining tasks.
inline void EventBaseLoopController::runEagerFiber(Fiber* fiber) {
  if (!eventBaseKeepAlive_) {
    eventBaseKeepAlive_ = getKeepAliveToken(eventBase_);
  }

  if (!loopRunner_) {
    fm_->runEagerFiberImpl(fiber);
  } else {
    loopRunner_->run([this, &fiber] { fm_->runEagerFiberImpl(fiber); });
  }

  const bool stillHasTasks = fm_->hasTasks();
  if (!stillHasTasks) {
    eventBaseKeepAlive_.reset();
  }
}
// Requests a run of the fiber loop from a thread other than the event base
// thread by posting a task to the event base thread.
inline void EventBaseLoopController::scheduleThreadSafe() {
  // We can only get here if:
  //   1) The fiber thread created a fiber that awaits (so we must already be
  //      attached; otherwise the fiber thread would not be running).
  //   2) The promise was moved to another thread (that move is a memory
  //      fence).
  //   3) The promise is being fulfilled from that other thread.
  assert(eventBaseAttached_);

  // The token captured by the posted task is a separate keep-alive on the
  // event base, distinct from the member eventBaseKeepAlive_ that the task
  // may reset.
  eventBase_->runInEventBaseThread(
      [this, eventBaseKeepAlive = getKeepAliveToken(eventBase_)]() {
        if (!fm_->shouldRunLoopRemote()) {
          // No loop run is needed; release the member token when idle.
          if (!fm_->hasTasks()) {
            eventBaseKeepAlive_.reset();
          }
          return;
        }
        runLoop();
      });
}
// Returns the attached event base's timer, or nullptr once cancellation has
// been requested on the shutdown token (which attachEventBase() arranges to
// happen from the event base's on-destruction callback).
//
// Must only be called after an event base has been attached.
inline HHWheelTimer* EventBaseLoopController::timer() {
  assert(eventBaseAttached_);

  const bool shutdownRequested =
      eventBaseShutdownToken_.isCancellationRequested();
  return FOLLY_UNLIKELY(shutdownRequested) ? nullptr : &eventBase_->timer();
}
} // namespace fibers
} // namespace folly