/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/MPMCQueue.h>
namespace folly {
template <class T, class... Stages>
class MPMCPipeline;
template <class T, size_t Amp>
class MPMCPipelineStage {
public:
typedef T value_type;
static constexpr size_t kAmplification = Amp;
};
namespace detail {
/**
* Helper template to determine value type and amplification whether or not
* we use MPMCPipelineStage<>
*/
template <class T>
struct PipelineStageInfo {
static constexpr size_t kAmplification = 1;
typedef T value_type;
};
template <class T, size_t Amp>
struct PipelineStageInfo<MPMCPipelineStage<T, Amp>> {
static constexpr size_t kAmplification = Amp;
typedef T value_type;
};
/**
* Wrapper around MPMCQueue (friend) that keeps track of tickets.
*/
template <class T>
class MPMCPipelineStageImpl {
public:
typedef T value_type;
template <class U, class... Stages>
friend class MPMCPipeline;
// Implicit so that MPMCPipeline construction works
/* implicit */ MPMCPipelineStageImpl(size_t capacity) : queue_(capacity) {}
MPMCPipelineStageImpl() {}
// only use on first stage, uses queue_.pushTicket_ instead of existing
// ticket
template <class... Args>
void blockingWrite(Args&&... args) noexcept {
queue_.blockingWrite(std::forward<Args>(args)...);
}
template <class... Args>
bool write(Args&&... args) noexcept {
return queue_.write(std::forward<Args>(args)...);
}
template <class... Args>
void blockingWriteWithTicket(uint64_t ticket, Args&&... args) noexcept {
queue_.enqueueWithTicket(ticket, std::forward<Args>(args)...);
}
uint64_t blockingRead(T& elem) noexcept {
uint64_t ticket;
queue_.blockingReadWithTicket(ticket, elem);
return ticket;
}
bool read(T& elem) noexcept { // only use on last stage, won't track ticket
return queue_.read(elem);
}
template <class... Args>
bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
return queue_.readAndGetTicket(ticket, elem);
}
// See MPMCQueue<T>::writeCount; only works for the first stage
uint64_t writeCount() const noexcept { return queue_.writeCount(); }
uint64_t readCount() const noexcept { return queue_.readCount(); }
private:
MPMCQueue<T> queue_;
};
// Product of amplifications of a tuple of PipelineStageInfo<X>
template <class Tuple>
struct AmplificationProduct;
template <>
struct AmplificationProduct<std::tuple<>> {
static constexpr size_t value = 1;
};
template <class T, class... Ts>
struct AmplificationProduct<std::tuple<T, Ts...>> {
static constexpr size_t value =
T::kAmplification * AmplificationProduct<std::tuple<Ts...>>::value;
};
} // namespace detail
} // namespace folly