/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <atomic>
#include <memory>
#include <new>
#include <type_traits>
#include <utility>

#include <glog/logging.h>

#include <folly/ExceptionString.h>
#include <folly/ScopeGuard.h>
#include <folly/io/async/Request.h>
#include <folly/synchronization/RelaxedAtomic.h>
namespace folly::detail {

// Function object that runs the executor's worker loop on the parent
// executor. If the parent destroys it without ever invoking it, the
// destructor drains the queue, since no other worker can be running.
template <template <typename> typename Queue>
class SerialExecutorImpl<Queue>::Worker {
 public:
  explicit Worker(KeepAlive<SerialExecutorImpl> ka) : ka_(std::move(ka)) {}

  ~Worker() {
    if (ka_) {
      ka_->drain(); // We own the queue but we did not run.
    }
  }

  Worker(Worker&& other) : ka_(std::exchange(other.ka_, {})) {}

  Worker(const Worker&) = delete;
  Worker& operator=(const Worker&) = delete;
  Worker& operator=(Worker&&) = delete;

  void operator()() { std::exchange(ka_, {})->worker(); }

 private:
  KeepAlive<SerialExecutorImpl> ka_;
};

template <template <typename> typename Queue>
SerialExecutorImpl<Queue>::SerialExecutorImpl(KeepAlive<Executor> parent)
    : parent_(std::move(parent)) {}

template <template <typename> typename Queue>
SerialExecutorImpl<Queue>::~SerialExecutorImpl() {
  DCHECK(!keepAliveCounter_);
}

template <template <typename> typename Queue>
Executor::KeepAlive<SerialExecutorImpl<Queue>>
SerialExecutorImpl<Queue>::create(KeepAlive<Executor> parent) {
  return makeKeepAlive<SerialExecutorImpl<Queue>>(
      new SerialExecutorImpl<Queue>(std::move(parent)));
}

template <template <typename> typename Queue>
typename SerialExecutorImpl<Queue>::UniquePtr
SerialExecutorImpl<Queue>::createUnique(std::shared_ptr<Executor> parent) {
  auto executor =
      new SerialExecutorImpl<Queue>(getKeepAliveToken(parent.get()));
  return {executor, Deleter{std::move(parent)}};
}
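
// Illustrative usage sketch (not from the original source): `SerialExecutor`
// stands in for whatever concrete alias instantiates this template, and
// `getParentExecutor()` is a hypothetical source of the parent executor.
// Tasks added to the returned executor never run concurrently, though they
// may run on different threads of the parent:
//
//   auto se = SerialExecutor::create(getParentExecutor());
//   se->add([] { stepOne(); });
//   se->add([] { stepTwo(); }); // runs only after stepOne() returns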

template <template <typename> typename Queue>
bool SerialExecutorImpl<Queue>::keepAliveAcquire() noexcept {
  auto keepAliveCounter =
      keepAliveCounter_.fetch_add(1, std::memory_order_relaxed);
  // We should never increment from 0, as the object would already be deleted.
  DCHECK(keepAliveCounter > 0);
  return true;
}

template <template <typename> typename Queue>
void SerialExecutorImpl<Queue>::keepAliveRelease() noexcept {
  auto keepAliveCounter =
      keepAliveCounter_.fetch_sub(1, std::memory_order_acq_rel);
  DCHECK(keepAliveCounter > 0);
  if (keepAliveCounter == 1) {
    delete this;
  }
}
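
// Sketch of the resulting lifetime protocol (illustrative; assumes
// keepAliveCounter_ is initialized to 1 in the class definition, which is
// not shown in this file):
//
//   auto ka = SerialExecutorImpl<Queue>::create(parent); // counter == 1
//   {
//     auto ka2 = ka.copy(); // keepAliveAcquire(): counter == 2
//   } // ka2 released: keepAliveRelease(): counter == 1
//   ka.reset(); // keepAliveRelease(): counter == 0, object deletes itself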

template <template <typename> typename Queue>
void SerialExecutorImpl<Queue>::add(Func func) {
  if (scheduleTask(std::move(func))) {
    parent_->add(Worker{getKeepAliveToken(this)});
  }
}

template <template <typename> typename Queue>
void SerialExecutorImpl<Queue>::addWithPriority(Func func, int8_t priority) {
  if (scheduleTask(std::move(func))) {
    parent_->addWithPriority(Worker{getKeepAliveToken(this)}, priority);
  }
}

template <template <typename> typename Queue>
bool SerialExecutorImpl<Queue>::scheduleTask(Func&& func) {
  queue_.enqueue(Task{std::move(func), RequestContext::saveContext()});
  // If this thread is the first to mark the queue as non-empty, schedule the
  // worker.
  return scheduled_.fetch_add(1, std::memory_order_acq_rel) == 0;
}
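
// Illustrative trace of the scheduled_ protocol (not from the original
// source), with two threads calling add() concurrently:
//
//   T1: enqueue(a); scheduled_: 0 -> 1, returns true  -> schedules a Worker
//   T2: enqueue(b); scheduled_: 1 -> 2, returns false -> no second Worker
//   Worker: runs a, then b; scheduled_: 2 -> 0; exits.
//
// The release half of the fetch_add pairs with the acquire load in worker(),
// so the Worker observes the enqueue that scheduled it before dequeuing.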

template <template <typename> typename Queue>
void SerialExecutorImpl<Queue>::worker() {
  std::size_t queueSize = scheduled_.load(std::memory_order_acquire);
  DCHECK_NE(queueSize, 0);

  std::size_t processed = 0;
  RequestContextSaverScopeGuard ctxGuard;
  while (true) {
    Task task;
    // This dequeue happens under the request context of the previous task, so
    // that we can avoid switching context if the next task shares the same
    // context. dequeue() is cheap, non-blocking, and doesn't run application
    // logic, so it is fine to run it under the previous context.
    queue_.dequeue(task);
    RequestContext::setContext(std::move(task.ctx));
    invokeCatchingExns("SerialExecutor: func", std::exchange(task.func, {}));

    if (++processed == queueSize) {
      // NOTE: scheduled_ must be decremented after the task has been
      // processed, or add() may concurrently start another worker.
      queueSize = scheduled_.fetch_sub(queueSize, std::memory_order_acq_rel) -
          queueSize;
      if (queueSize == 0) {
        // Queue is now empty
        return;
      }
      processed = 0;
    }
  }
}
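
// Worked example of the batching above (illustrative): suppose the worker
// starts with scheduled_ == 2, and 3 more tasks are added while it runs:
//
//   load(acquire): queueSize = 2; process 2 tasks
//   fetch_sub(2): scheduled_ 5 -> 3, so queueSize = 3; process 3 more tasks
//   fetch_sub(3): scheduled_ 3 -> 0, so queueSize = 0 -> return
//
// Tasks enqueued during a batch are thus picked up by the same worker rather
// than by a newly scheduled one.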

template <template <typename> typename Queue>
void SerialExecutorImpl<Queue>::drain() {
  auto queueSize = scheduled_.load(std::memory_order_acquire);
  if (queueSize == 0) {
    return;
  }

  RequestContextSaverScopeGuard ctxGuard;
  while (queueSize != 0) {
    Task task;
    queue_.dequeue(task);
    RequestContext::setContext(std::move(task.ctx));
    // Destroy the function without running it, under the task's request
    // context in case the destructor depends on it.
    task.func = {};
    queueSize = scheduled_.fetch_sub(1, std::memory_order_acq_rel) - 1;
  }
}
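
// Illustrative scenario for drain() (not from the original source): if the
// parent executor drops the Worker without ever invoking it, ~Worker() calls
// drain(), which discards all pending tasks without running them:
//
//   auto se = SerialExecutor::create(std::move(parentKa));
//   se->add([] { neverRuns(); });
//   // parent shuts down and destroys the queued Worker -> drain()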

// Mutex that performs no synchronization, for use when producers are already
// serialized externally. In debug builds it checks that critical sections do
// not actually overlap.
class NoopMutex {
 public:
  template <class F>
  auto lock_combine(F&& f) {
#ifndef NDEBUG
    CHECK(!locked_.exchange(true, std::memory_order_acq_rel));
    auto&& g = folly::makeGuard(
        [this] { locked_.store(false, std::memory_order_release); });
#endif
    return f();
  }

 private:
#ifndef NDEBUG
  std::atomic<bool> locked_{false};
#endif
};
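
// The Mutex parameter of SerialExecutorMPSCQueue below only requires this
// lock_combine() interface. For illustration (this pairing is an assumption,
// not taken from this file): folly::DistributedMutex provides lock_combine()
// for the multi-producer case, while NoopMutex suffices when producers are
// externally serialized:
//
//   SerialExecutorMPSCQueue<Task, folly::DistributedMutex> mpQueue;
//   SerialExecutorMPSCQueue<Task, NoopMutex> spQueue;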

/**
 * MPSC queue with the additional requirement that the queue must be non-empty
 * on dequeue(), and the enqueue() that makes the queue non-empty must complete
 * before the corresponding dequeue(). This is guaranteed in SerialExecutor by
 * release/acquire synchronization on scheduled_.
 *
 * Producers are internally synchronized using a mutex, while the consumer
 * relies entirely on external synchronization.
 */
template <class Task, class Mutex>
class SerialExecutorMPSCQueue {
  static_assert(std::is_nothrow_move_constructible_v<Task>);

 public:
  ~SerialExecutorMPSCQueue() {
    // Queue must be consumed completely at destruction.
    CHECK_EQ(head_, tail_);
    CHECK_EQ(head_->readIdx.load(), head_->writeIdx.load());
    CHECK(head_->next == nullptr);
    deleteSegment(head_);
    deleteSegment(segmentCache_.load(std::memory_order_acquire));
  }

  void enqueue(Task&& task) {
    mutex_.lock_combine([&] {
      // dequeue() will not delete a segment or try to read head_->next until
      // the enqueue of the next task (which sets next) has completed, so
      // mutating tail_ and tail_->next here is safe.
      if (tail_->writeIdx.load() == kSegmentSize) {
        auto* segment =
            segmentCache_.exchange(nullptr, std::memory_order_acquire);
        if (segment == nullptr) {
          segment = new Segment;
        } else {
          std::destroy_at(segment);
          new (segment) Segment;
        }
        tail_->next = segment;
        tail_ = segment;
      }
      auto idx = tail_->writeIdx.load();
      new (&tail_->tasks[idx]) Task(std::move(task));
      tail_->writeIdx.store(idx + 1);
    });
  }
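
  // Worked example of the overflow path above (illustrative), with
  // kSegmentSize == 16: the first 16 enqueues fill inlineSegment_; the 17th
  // finds writeIdx == kSegmentSize, takes a segment from segmentCache_ (or
  // allocates a fresh one), links it via next, and writes task 17 at index 0
  // of the new segment.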

  void dequeue(Task& task) {
    auto idx = head_->readIdx.load();
    DCHECK_LE(idx, kSegmentSize);
    if (idx == kSegmentSize) {
      DCHECK(head_->next != nullptr);
      auto* oldSegment = std::exchange(head_, head_->next);
      // If there is already a segment in cache, replace it with the latest
      // one, as it is more likely to still be warm in cache for the producer.
      deleteSegment(
          segmentCache_.exchange(oldSegment, std::memory_order_release));
      DCHECK_EQ(head_->readIdx.load(), 0);
      idx = 0;
    }

    DCHECK_LT(idx, head_->writeIdx.load());
    // Move the task out, then destroy the Task object that lives in the raw
    // storage slot (destroying the storage itself would leak the Task's
    // members).
    auto* taskPtr = reinterpret_cast<Task*>(&head_->tasks[idx]);
    task = std::move(*taskPtr);
    std::destroy_at(taskPtr);
    head_->readIdx.store(idx + 1);
  }

 private:
  static constexpr size_t kSegmentSize = 16;

  struct Segment {
    // Neither writeIdx nor readIdx needs to be atomic, since each is
    // exclusively owned by the producer and the consumer respectively, but we
    // need atomicity for the assertions.
    // We avoid any padding to minimize memory usage, but at least we can
    // separate the write and read indices by interposing the payload array.
    relaxed_atomic<size_t> writeIdx = 0;
    std::aligned_storage_t<sizeof(Task), alignof(Task)> tasks[kSegmentSize];
    relaxed_atomic<size_t> readIdx = 0;
    Segment* next = nullptr;
  };
  static_assert(std::is_trivially_destructible_v<Segment>);

  void deleteSegment(Segment* segment) {
    if (segment == &inlineSegment_) {
      return;
    }
    delete segment;
  }

  FOLLY_ATTR_NO_UNIQUE_ADDRESS Mutex mutex_;
  Segment* tail_ = &inlineSegment_;
  Segment* head_ = tail_;
  // Cache the allocation for exactly one segment, so that in the common case
  // where the consumer keeps up with the producer no allocations are needed.
  std::atomic<Segment*> segmentCache_{nullptr};
  // Store the first segment inline. If this is a short-lived SerialExecutor
  // which enqueues fewer than kSegmentSize tasks, this will save an
  // allocation.
  Segment inlineSegment_;
};
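
// Illustrative use of the queue contract (not from the original source),
// using int as a stand-in task type; every dequeue() must be preceded by a
// matching enqueue() that is ordered before it, which SerialExecutor
// guarantees via scheduled_:
//
//   SerialExecutorMPSCQueue<int, NoopMutex> q;
//   q.enqueue(1);
//   q.enqueue(2);
//   int x;
//   q.dequeue(x); // x == 1
//   q.dequeue(x); // x == 2
//   // q must be fully drained before destruction, as checked above.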
} // namespace folly::detail