/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstring>
#include <limits>
#include <type_traits>

#include <folly/Traits.h>
#include <folly/concurrency/CacheLocality.h>
#include <folly/detail/TurnSequencer.h>
#include <folly/portability/Unistd.h>

namespace folly {

namespace detail {

template <typename T, template <typename> class Atom>
struct SingleElementQueue;

template <typename T>
class MPMCPipelineStageImpl;

/// MPMCQueue base CRTP template
template <typename>
class MPMCQueueBase;

} // namespace detail

/// MPMCQueue<T> is a high-performance bounded concurrent queue that
/// supports multiple producers, multiple consumers, and optional blocking.
/// The queue has a fixed capacity, for which all memory will be allocated
/// up front. The bulk of the work of enqueuing and dequeuing can be
/// performed in parallel.
///
/// MPMCQueue is linearizable. That means that if a call to write(A)
/// returns before a call to write(B) begins, then A will definitely end up
/// in the queue before B, and if a call to read(X) returns before a call
/// to read(Y) is started, then X will be something from earlier in the
/// queue than Y. This also means that if a read call returns a value, you
/// can be sure that all previous elements of the queue have been assigned
/// a reader (that reader might not yet have returned, but it exists).
///
/// The underlying implementation uses a ticket dispenser for the head and
/// the tail, spreading accesses across N single-element queues to produce
/// a queue with capacity N. The ticket dispensers use atomic increment,
/// which is more robust to contention than a CAS loop. Each of the
/// single-element queues uses its own CAS to serialize access, with an
/// adaptive spin cutoff. When spinning fails on a single-element queue
/// it uses futex()'s _BITSET operations to reduce unnecessary wakeups
/// even if multiple waiters are present on an individual queue (such as
/// when the MPMCQueue's capacity is smaller than the number of enqueuers
/// or dequeuers).
///
/// In benchmarks (contained in tao/queues/ConcurrentQueueTests)
/// it handles 1 to 1, 1 to N, N to 1, and N to M thread counts better
/// than any of the alternatives present in fbcode, for both small (~10)
/// and large capacities. In these benchmarks it is also faster than
/// tbb::concurrent_bounded_queue for all configurations. When there are
/// many more threads than cores, MPMCQueue is _much_ faster than the tbb
/// queue because it uses futex() to block and unblock waiting threads,
/// rather than spinning with sched_yield.
///
/// NOEXCEPT INTERACTION: tl;dr: If it compiles, you're fine. Ticket-based
/// queues separate the assignment of queue positions from the actual
/// construction of the in-queue elements, which means that the T
/// constructor used during enqueue must not throw an exception. This is
/// enforced at compile time using type traits, which requires that T be
/// adorned with accurate noexcept information. If your type does not
/// use noexcept, you will have to wrap it in something that provides
/// the guarantee. We provide an alternate safe implementation for types
/// that don't use noexcept but that are marked folly::IsRelocatable
/// and std::is_nothrow_constructible, which is common for folly types.
/// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE
/// then your type can be put in MPMCQueue.
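///
/// For example (an illustrative sketch, not part of this header; Quiet and
/// Loud are hypothetical types): a type whose move constructor is noexcept
/// can be used as the element type directly, while a type that may throw on
/// move can be passed through the queue behind a std::unique_ptr, whose
/// move constructor is noexcept:
///
///   struct Quiet {
///     Quiet(Quiet&&) noexcept;        // accurate noexcept: usable directly
///   };
///   struct Loud {
///     Loud(Loud&&);                   // may throw: wrap it instead
///   };
///   folly::MPMCQueue<Quiet> direct(16);
///   folly::MPMCQueue<std::unique_ptr<Loud>> wrapped(16); // needs <memory>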
///
/// If you have a pool of N queue consumers that you want to shut down
/// after the queue has drained, one way is to enqueue N sentinel values
/// to the queue. If the producer doesn't know how many consumers there
/// are, you can enqueue one sentinel and then have each consumer requeue
/// two sentinels after it receives it (by requeuing 2 the shutdown can
/// complete in O(log P) time instead of O(P)).
template <
    typename T,
    template <typename> class Atom = std::atomic,
    bool Dynamic = false>
class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>> {
  …
};
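
// Example usage (an illustrative sketch, not part of this header). It shows
// the sentinel-based shutdown described above, with a single sentinel that
// the consumers fan out among themselves; kSentinel and consume() are
// hypothetical names.
//
//   #include <folly/MPMCQueue.h>
//
//   folly::MPMCQueue<int> queue(1024);  // capacity is fixed at construction
//   constexpr int kSentinel = -1;
//
//   // producer
//   for (int i = 0; i < 1000000; ++i) {
//     queue.blockingWrite(i);           // blocks while the queue is full
//   }
//   queue.blockingWrite(kSentinel);     // one sentinel starts the shutdown
//
//   // each of the P identical consumer threads
//   for (;;) {
//     int item;
//     queue.blockingRead(item);         // blocks while the queue is empty
//     if (item == kSentinel) {
//       // requeue two sentinels so the remaining consumers also stop; as
//       // noted above, this completes in O(log P) rounds instead of O(P)
//       queue.blockingWrite(kSentinel);
//       queue.blockingWrite(kSentinel);
//       break;
//     }
//     consume(item);                    // hypothetical application callback
//   }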

/// *** The dynamic version of MPMCQueue is deprecated. ***
/// Use UnboundedQueue instead.
///
/// The dynamic version of MPMCQueue allows dynamic expansion of queue
/// capacity, such that a queue may start with a smaller capacity than
/// specified and expand only if needed. Users may optionally specify
/// the initial capacity and the expansion multiplier.
///
/// The design uses a seqlock to enforce mutual exclusion among
/// expansion attempts. Regular operations read up-to-date queue
/// information (slots array, capacity, stride) inside read-only
/// seqlock sections, which are unimpeded when no expansion is in
/// progress.
///
/// An expansion computes a new capacity, allocates a new slots array,
/// and updates stride. No information needs to be copied from the
/// current slots array to the new one. When this happens, new slots
/// will not have sequence numbers that match ticket numbers. The
/// expansion needs to compute a ticket offset such that operations
/// that use new arrays can adjust the calculations of slot indexes
/// and sequence numbers to take into account that the new slots
/// start with sequence numbers of zero. The current ticket offset is
/// packed with the seqlock in an atomic 64-bit integer. The initial
/// offset is zero.
///
/// Lagging write and read operations with tickets lower than the
/// ticket offset of the current slots array (i.e., the minimum ticket
/// number that can be served by the current array) must use earlier
/// closed arrays instead of the current one. Information about closed
/// slots arrays (array address, capacity, stride, and offset) is
/// maintained in a logarithmic-sized structure. Each entry in that
/// structure never needs to be changed once set. The number of closed
/// arrays is half the value of the seqlock (when unlocked).
///
/// The acquisition of the seqlock to perform an expansion does not
/// prevent the issuing of new push and pop tickets concurrently. The
/// expansion must set the new ticket offset to a value that couldn't
/// have been issued to an operation that has already gone through a
/// seqlock read-only section (and hence obtained information for
/// older closed arrays).
///
/// Note that the total queue capacity can temporarily exceed the
/// specified capacity when there are lagging consumers that haven't
/// yet consumed all the elements in closed arrays. Users should not
/// rely on the capacity of dynamic queues for synchronization, e.g.,
/// they should not expect that a thread will definitely block on a
/// call to blockingWrite() when the queue size is known to be equal
/// to its capacity.
///
/// Note that some writeIfNotFull() and tryWriteUntil() operations may
/// fail even if the size of the queue is less than its maximum
/// capacity and despite the success of expansion, if the operation
/// happens to acquire a ticket that belongs to a closed array. This
/// is a transient condition. Typically, one or two ticket values may
/// be subject to such a condition per expansion.
///
/// The dynamic version is a partial specialization of MPMCQueue with
/// Dynamic == true
template <typename T, template <typename> class Atom>
class MPMCQueue<T, Atom, true>
    : public detail::MPMCQueueBase<MPMCQueue<T, Atom, true>> {
  …
};

namespace detail {

/// CRTP specialization of MPMCQueueBase
template <
    template <typename T, template <typename> class Atom, bool Dynamic>
    class Derived,
    typename T,
    template <typename> class Atom,
    bool Dynamic>
class MPMCQueueBase<Derived<T, Atom, Dynamic>> {
  …
};

/// SingleElementQueue implements a blocking queue that holds at most one
/// item, and that requires its users to assign incrementing identifiers
/// (turns) to each enqueue and dequeue operation. Note that the turns
/// used by SingleElementQueue are doubled inside the TurnSequencer.
template <typename T, template <typename> class Atom>
struct SingleElementQueue {
  …
};

} // namespace detail

} // namespace folly
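
// Illustrative sketch (assumed member names, not the contents of the elided
// class bodies above) of how a ticket from the dispenser maps onto the
// SingleElementQueue turns described above, for a fixed capacity N and
// ignoring the stride that the real implementation uses to spread
// neighboring tickets across the slots array:
//
//   uint64_t ticket = pushTicket_.fetch_add(1);  // one atomic increment
//   size_t slot = ticket % N;                    // which single-element queue
//   uint32_t turn = static_cast<uint32_t>(ticket / N); // that slot's turn
//   slots_[slot].enqueue(turn, std::move(value));
//
//   // A dequeue does the same with popTicket_, and inside SingleElementQueue
//   // the TurnSequencer waits for 2 * turn on the enqueue and 2 * turn + 1 on
//   // the matching dequeue -- the doubling noted in the comment above.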