folly/folly/MPMCQueue.h

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstring>
#include <limits>
#include <type_traits>

#include <folly/Traits.h>
#include <folly/concurrency/CacheLocality.h>
#include <folly/detail/TurnSequencer.h>
#include <folly/portability/Unistd.h>

namespace folly {

namespace detail {

template <typename T, template <typename> class Atom>
struct SingleElementQueue;

template <typename T>
class MPMCPipelineStageImpl;

/// MPMCQueue base CRTP template
template <typename>
class MPMCQueueBase;

} // namespace detail

/// MPMCQueue<T> is a high-performance bounded concurrent queue that
/// supports multiple producers, multiple consumers, and optional blocking.
/// The queue has a fixed capacity, for which all memory will be allocated
/// up front.  The bulk of the work of enqueuing and dequeuing can be
/// performed in parallel.
///
/// MPMCQueue is linearizable.  That means that if a call to write(A)
/// returns before a call to write(B) begins, then A will definitely end up
/// in the queue before B, and if a call to read(X) returns before a call
/// to read(Y) is started, then X will be something from earlier in the
/// queue than Y.  This also means that if a read call returns a value, you
/// can be sure that all previous elements of the queue have been assigned
/// a reader (that reader might not yet have returned, but it exists).
///
/// The underlying implementation uses a ticket dispenser for the head and
/// the tail, spreading accesses across N single-element queues to produce
/// a queue with capacity N.  The ticket dispensers use atomic increment,
/// which is more robust to contention than a CAS loop.  Each of the
/// single-element queues uses its own CAS to serialize access, with an
/// adaptive spin cutoff.  When spinning fails on a single-element queue,
/// it uses futex()'s FUTEX_WAIT_BITSET and FUTEX_WAKE_BITSET operations
/// to reduce unnecessary wakeups
/// even if multiple waiters are present on an individual queue (such as
/// when the MPMCQueue's capacity is smaller than the number of enqueuers
/// or dequeuers).
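/*
Illustrative sketch (hypothetical, not folly's code): the ticket scheme
above can be modeled with two atomic counters and one sequence number per
slot. Ticket t maps to slot t % capacity and turn t / capacity. A writer
waits for sequence 2 * turn and a reader for 2 * turn + 1, mirroring the
doubled turns that SingleElementQueue uses. The class name
TicketQueueSketch is invented; waiting uses std::this_thread::yield()
instead of futex(), and strides, spin cutoffs, and non-blocking
operations are left out. T must be default constructible in this sketch.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical bounded MPMC queue built from tickets and per-slot turns.
template <typename T>
class TicketQueueSketch {
 public:
  explicit TicketQueueSketch(std::size_t capacity)
      : capacity_(capacity), slots_(capacity) {
    assert(capacity > 0);
  }

  void blockingWrite(T value) {
    // One atomic increment hands out a unique push ticket (no CAS loop).
    uint64_t ticket = pushTicket_.fetch_add(1, std::memory_order_relaxed);
    Slot& slot = slots_[ticket % capacity_];
    uint64_t turn = ticket / capacity_;
    // Even sequence 2 * turn: the slot is empty and reserved for us.
    waitFor(slot, 2 * turn);
    slot.value = std::move(value);
    slot.seq.store(2 * turn + 1, std::memory_order_release);
  }

  T blockingRead() {
    uint64_t ticket = popTicket_.fetch_add(1, std::memory_order_relaxed);
    Slot& slot = slots_[ticket % capacity_];
    uint64_t turn = ticket / capacity_;
    // Odd sequence 2 * turn + 1: the slot holds the value for this reader.
    waitFor(slot, 2 * turn + 1);
    T value = std::move(slot.value);
    // Hand the slot to the writer holding the next turn.
    slot.seq.store(2 * turn + 2, std::memory_order_release);
    return value;
  }

 private:
  struct Slot {
    std::atomic<uint64_t> seq{0};
    T value{};
  };

  static void waitFor(const Slot& slot, uint64_t seq) {
    while (slot.seq.load(std::memory_order_acquire) != seq) {
      std::this_thread::yield();  // stand-in for futex-based blocking
    }
  }

  const std::size_t capacity_;
  std::vector<Slot> slots_;
  std::atomic<uint64_t> pushTicket_{0};
  std::atomic<uint64_t> popTicket_{0};
};
```

Calls may come from any number of threads at once. A single thread must
not write more than capacity elements ahead of its own reads, or it will
wait forever for a slot that only a reader can free.
*/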
///
/// In benchmarks (contained in tao/queues/ConcurrentQueueTests)
/// it handles 1 to 1, 1 to N, N to 1, and N to M thread counts better
/// than any of the alternatives present in fbcode, for both small (~10)
/// and large capacities.  In these benchmarks it is also faster than
/// tbb::concurrent_bounded_queue for all configurations.  When there are
/// many more threads than cores, MPMCQueue is _much_ faster than the tbb
/// queue because it uses futex() to block and unblock waiting threads,
/// rather than spinning with sched_yield.
///
/// NOEXCEPT INTERACTION: tl;dr: If it compiles you're fine.  Ticket-based
/// queues separate the assignment of queue positions from the actual
/// construction of the in-queue elements, which means that the T
/// constructor used during enqueue must not throw an exception.  This is
/// enforced at compile time using type traits, which requires that T be
/// adorned with accurate noexcept information.  If your type does not
/// use noexcept, you will have to wrap it in something that provides
/// the guarantee.  We provide an alternate safe implementation for types
/// that don't use noexcept but that are marked folly::IsRelocatable
/// and std::is_nothrow_constructible, which is common for folly types.
/// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE
/// then your type can be put in MPMCQueue.
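/*
A minimal sketch of the kind of compile-time check described above,
assuming the relevant condition is nothrow construction from an rvalue
(folly's exact condition, including its folly::IsRelocatable fallback,
is not reproduced). kTicketQueueSafe and Legacy are hypothetical names;
std::unique_ptr illustrates one way to wrap a type so that the queued
object has a noexcept move.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical trait approximating the check: can T be built from an
// rvalue T without throwing?
template <typename T>
constexpr bool kTicketQueueSafe = std::is_nothrow_constructible<T, T&&>::value;

// A type whose move constructor is not declared noexcept.
struct Legacy {
  std::string name;
  Legacy() = default;
  Legacy(Legacy&& other) : name(std::move(other.name)) {}  // no noexcept
};

// Moving a unique_ptr never throws, whatever the pointee's move does.
using WrappedLegacy = std::unique_ptr<Legacy>;

static_assert(!kTicketQueueSafe<Legacy>, "Legacy's move may throw");
static_assert(kTicketQueueSafe<WrappedLegacy>, "unique_ptr moves are noexcept");
static_assert(kTicketQueueSafe<int>, "scalars are nothrow constructible");
```

The wrapper costs one heap allocation per element, in exchange for an
element type that passes the check.
*/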
///
/// If you have a pool of N queue consumers that you want to shut down
/// after the queue has drained, one way is to enqueue N sentinel values
/// to the queue.  If the producer doesn't know how many consumers there
/// are, you can enqueue one sentinel and then have each consumer requeue
/// two sentinels after it receives one.  Requeuing two lets the shutdown
/// complete in O(log P) time, where P is the number of consumers, instead
/// of the O(P) needed when each consumer requeues a single sentinel.
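/*
A back-of-the-envelope model of the sentinel shutdown above, under the
idealized assumption that in each round every queued sentinel reaches a
different consumer that is still running. shutdownRounds is a
hypothetical helper. With fanout 2 the number of consumers reached per
round grows as 1, 2, 4, ..., so P consumers need ceil(log2(P + 1))
rounds, versus P rounds with fanout 1.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Rounds until all consumers have received a sentinel and exited, when
// each one requeues `fanout` sentinels before exiting.
int shutdownRounds(int64_t consumers, int64_t fanout) {
  assert(consumers >= 1 && fanout >= 1);
  int64_t pending = 1;  // the producer enqueues a single sentinel
  int64_t running = consumers;
  int rounds = 0;
  while (running > 0) {
    int64_t taken = std::min(pending, running);
    running -= taken;
    // Each taker removes one sentinel and adds `fanout` new ones.
    pending += taken * (fanout - 1);
    ++rounds;
  }
  return rounds;
}
```

Sentinels still queued after the last consumer exits are simply left
behind.
*/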
template <
    typename T,
    template <typename> class Atom = std::atomic,
    bool Dynamic = false>
class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>> {};

/// *** The dynamic version of MPMCQueue is deprecated. ***
/// Use UnboundedQueue instead.

/// The dynamic version of MPMCQueue allows dynamic expansion of queue
/// capacity, such that a queue may start with a smaller capacity than
/// specified and expand only if needed. Users may optionally specify
/// the initial capacity and the expansion multiplier.
///
/// The design uses a seqlock to enforce mutual exclusion among
/// expansion attempts. Regular operations read up-to-date queue
/// information (slots array, capacity, stride) inside read-only
/// seqlock sections, which are unimpeded when no expansion is in
/// progress.
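/*
A minimal seqlock of the kind described above, as a hypothetical sketch:
QueueInfoSketch is invented and guards only a capacity and a stride.
Readers retry unless they observe the same even counter before and after
reading. An expansion makes the counter odd while it updates the fields
and leaves it 2 higher, so half the unlocked value counts completed
expansions. The fence placement follows the standard C++ seqlock pattern
rather than folly's source.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

struct QueueInfoSketch {
  std::atomic<uint64_t> seqlock{0};
  // Protected fields are atomics so that racy reads are not data races.
  std::atomic<uint64_t> capacity{4};
  std::atomic<uint64_t> stride{1};

  // Read-only section: never blocks writers, retries if one intervened.
  void read(uint64_t& cap, uint64_t& str) const {
    for (;;) {
      uint64_t before = seqlock.load(std::memory_order_acquire);
      if (before & 1) continue;  // an expansion holds the lock
      cap = capacity.load(std::memory_order_relaxed);
      str = stride.load(std::memory_order_relaxed);
      std::atomic_thread_fence(std::memory_order_acquire);
      if (seqlock.load(std::memory_order_relaxed) == before) return;
    }
  }

  // Expansion: at most one writer wins the even-to-odd CAS.
  bool tryExpand(uint64_t newCap, uint64_t newStride) {
    uint64_t s = seqlock.load(std::memory_order_relaxed);
    if ((s & 1) != 0 ||
        !seqlock.compare_exchange_strong(s, s + 1, std::memory_order_acquire)) {
      return false;  // another expansion is in progress
    }
    std::atomic_thread_fence(std::memory_order_release);
    capacity.store(newCap, std::memory_order_relaxed);
    stride.store(newStride, std::memory_order_relaxed);
    seqlock.store(s + 2, std::memory_order_release);  // unlock
    return true;
  }
};
```
*/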
///
/// An expansion computes a new capacity, allocates a new slots array,
/// and updates stride. No information needs to be copied from the
/// current slots array to the new one. When this happens, new slots
/// will not have sequence numbers that match ticket numbers. The
/// expansion therefore computes a ticket offset so that operations
/// using the new array can adjust their slot-index and sequence-number
/// calculations to account for the new slots starting with sequence
/// numbers of zero. The current ticket offset is
/// packed with the seqlock in an atomic 64-bit integer. The initial
/// offset is zero.
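/*
A sketch of packing the ticket offset together with the seqlock and of
rebasing tickets onto a new array. The 6-bit seqlock field and every
function name here are assumptions for illustration, and stride-based
index scrambling is left out.

```cpp
#include <cassert>
#include <cstdint>

// Assumed layout: low kSeqlockBits bits = seqlock, high bits = offset.
constexpr int kSeqlockBits = 6;
constexpr uint64_t kSeqlockMask = (uint64_t{1} << kSeqlockBits) - 1;

constexpr uint64_t packState(uint64_t offset, uint64_t seqlock) {
  return (offset << kSeqlockBits) | (seqlock & kSeqlockMask);
}
constexpr uint64_t stateOffset(uint64_t state) { return state >> kSeqlockBits; }
constexpr uint64_t stateSeqlock(uint64_t state) { return state & kSeqlockMask; }

// A new array starts with all sequence numbers at zero, so tickets are
// rebased by the array's offset before the usual index and turn math.
constexpr uint64_t slotIndex(uint64_t ticket, uint64_t offset, uint64_t cap) {
  return (ticket - offset) % cap;
}
constexpr uint64_t slotTurn(uint64_t ticket, uint64_t offset, uint64_t cap) {
  return (ticket - offset) / cap;
}
```

With offset 1000 and capacity 16, ticket 1000 lands in slot 0 at turn 0,
exactly as ticket 0 would in a freshly created queue.
*/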
///
/// Lagging write and read operations with tickets lower than the
/// ticket offset of the current slots array (i.e., the minimum ticket
/// number that can be served by the current array) must use earlier
/// closed arrays instead of the current one. Information about closed
/// slots arrays (array address, capacity, stride, and offset) is
/// maintained in a logarithmic-sized structure. Each entry in that
/// structure never needs to be changed once set. The number of closed
/// arrays is half the value of the seqlock (when unlocked).
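/*
A sketch of how a lagging ticket could locate its closed array.
ClosedArraySketch, findClosedArray, and numClosedArrays are hypothetical.
Because each expansion appends an entry with a larger offset, a backward
scan finds the newest array whose offset does not exceed the ticket; the
scan stays short because the structure is logarithmic in size.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Reduced record for a closed slots array (address and stride omitted).
struct ClosedArraySketch {
  uint64_t offset;    // minimum ticket this array can serve
  uint64_t capacity;
};

// Precondition: numClosed >= 1 and closed[0].offset <= ticket.
std::size_t findClosedArray(const ClosedArraySketch* closed,
                            std::size_t numClosed, uint64_t ticket) {
  std::size_t i = numClosed;
  while (i > 0 && closed[i - 1].offset > ticket) --i;
  assert(i > 0);
  return i - 1;
}

// Each expansion advances an unlocked seqlock by 2.
constexpr std::size_t numClosedArrays(uint64_t seqlock) { return seqlock / 2; }
```
*/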
///
/// The acquisition of the seqlock to perform an expansion does not
/// prevent the issuing of new push and pop tickets concurrently. The
/// expansion must set the new ticket offset to a value that couldn't
/// have been issued to an operation that has already gone through a
/// seqlock read-only section (and hence obtained information for
/// older closed arrays).
///
/// Note that the total queue capacity can temporarily exceed the
/// specified capacity when there are lagging consumers that haven't
/// yet consumed all the elements in closed arrays. Users should not
/// rely on the capacity of dynamic queues for synchronization, e.g.,
/// they should not expect that a thread will definitely block on a
/// call to blockingWrite() when the queue size is known to be equal
/// to its capacity.
///
/// Note that some writeIfNotFull() and tryWriteUntil() operations may
/// fail even if the size of the queue is less than its maximum
/// capacity and despite the success of expansion, if the operation
/// happens to acquire a ticket that belongs to a closed array. This
/// is a transient condition. Typically, one or two ticket values may
/// be subject to such condition per expansion.
///
/// The dynamic version is a partial specialization of MPMCQueue with
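/// Dynamic == true.
template <typename T, template <typename> class Atom>
class MPMCQueue<T, Atom, true>
    : public detail::MPMCQueueBase<MPMCQueue<T, Atom, true>> {};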

namespace detail {

/// CRTP specialization of MPMCQueueBase
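template <
    template <typename, template <typename> class, bool> class Derived,
    typename T,
    template <typename> class Atom,
    bool Dynamic>
class MPMCQueueBase<Derived<T, Atom, Dynamic>> {};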

/// SingleElementQueue implements a blocking queue that holds at most one
/// item, and that requires its users to assign incrementing identifiers
/// (turns) to each enqueue and dequeue operation.  Note that the turns
/// used by SingleElementQueue are doubled inside the TurnSequencer: an
/// enqueue for turn t uses sequencer turn 2 * t, and the matching dequeue
/// uses 2 * t + 1.
template <typename T, template <typename> class Atom>
struct SingleElementQueue {};

} // namespace detail

} // namespace folly