cordyceps/
mpsc_queue.rs

1//! A multi-producer, single-consumer (MPSC) queue, implemented using a
2//! lock-free [intrusive] singly-linked list.
3//!
4//! See the documentation for the [`MpscQueue`] type for details.
5//!
6//! Based on [Dmitry Vyukov's intrusive MPSC][vyukov].
7//!
8//! [vyukov]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
9//! [intrusive]: crate#intrusive-data-structures
10use crate::{
11    loom::{
12        cell::UnsafeCell,
13        sync::atomic::{AtomicBool, AtomicPtr, Ordering::*},
14    },
15    util::{Backoff, CachePadded},
16    Linked,
17};
18use core::{
19    fmt,
20    marker::PhantomPinned,
21    ptr::{self, NonNull},
22};
23
24macro_rules! feature {
25    (
26        #![$meta:meta]
27        $($item:item)*
28    ) => {
29        $(
30            #[cfg($meta)]
31            $item
32        )*
33    }
34}
35
36/// A multi-producer, single-consumer (MPSC) queue, implemented using a
37/// lock-free [intrusive] singly-linked list.
38///
39/// Based on [Dmitry Vyukov's intrusive MPSC][vyukov].
40///
41/// In order to be part of a `MpscQueue`, a type `T` must implement [`Linked`] for
42/// [`mpsc_queue::Links<T>`].
43///
44/// [`mpsc_queue::Links<T>`]: crate::mpsc_queue::Links
45///
46/// # Examples
47///
48/// ```
49/// use cordyceps::{
50///     Linked,
51///     mpsc_queue::{self, MpscQueue},
52/// };
53///
54/// // This example uses the Rust standard library for convenience, but
55/// // the MPSC queue itself does not require std.
56/// use std::{pin::Pin, ptr::{self, NonNull}, thread, sync::Arc};
57///
58/// /// A simple queue entry that stores an `i32`.
59/// #[derive(Debug, Default)]
60/// struct Entry {
61///    links: mpsc_queue::Links<Entry>,
62///    val: i32,
63/// }
64///
65/// // Implement the `Linked` trait for our entry type so that it can be used
66/// // as a queue entry.
67/// unsafe impl Linked<mpsc_queue::Links<Entry>> for Entry {
68///     // In this example, our entries will be "owned" by a `Box`, but any
69///     // heap-allocated type that owns an element may be used.
70///     //
71///     // An element *must not* move while part of an intrusive data
72///     // structure. In many cases, `Pin` may be used to enforce this.
73///     type Handle = Pin<Box<Self>>;
74///
75///     /// Convert an owned `Handle` into a raw pointer
76///     fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
77///        unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
78///     }
79///
80///     /// Convert a raw pointer back into an owned `Handle`.
81///     unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
82///         // Safety: if this function is only called by the linked list
83///         // implementation (and it is not intended for external use), we can
84///         // expect that the `NonNull` was constructed from a reference which
85///         // was pinned.
86///         //
87///         // If other callers besides `MpscQueue`'s internals were to call this on
88///         // some random `NonNull<Entry>`, this would not be the case, and
89///         // this could be constructing an erroneous `Pin` from a referent
90///         // that may not be pinned!
91///         Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
92///     }
93///
94///     /// Access an element's `Links`.
95///     unsafe fn links(target: NonNull<Entry>) -> NonNull<mpsc_queue::Links<Entry>> {
96///         // Using `ptr::addr_of_mut!` permits us to avoid creating a temporary
97///         // reference without using layout-dependent casts.
98///         let links = ptr::addr_of_mut!((*target.as_ptr()).links);
99///
100///         // `NonNull::new_unchecked` is safe to use here, because the pointer that
101///         // we offset was not null, implying that the pointer produced by offsetting
102///         // it will also not be null.
103///         NonNull::new_unchecked(links)
104///     }
105/// }
106///
107/// impl Entry {
108///     fn new(val: i32) -> Self {
109///         Self {
110///             val,
111///             ..Self::default()
112///         }
113///     }
114/// }
115///
116/// // Once we have a `Linked` implementation for our element type, we can construct
117/// // a queue.
118///
119/// // Because `Pin<Box<...>>` doesn't have a `Default` impl, we have to manually
120/// // construct the stub node.
121/// let stub = Box::pin(Entry::default());
122/// let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));
123///
124/// // Spawn some producer threads.
125/// thread::spawn({
126///     let q = q.clone();
127///     move || {
128///         // Enqueuing elements does not require waiting, and is not fallible.
129///         q.enqueue(Box::pin(Entry::new(1)));
130///         q.enqueue(Box::pin(Entry::new(2)));
131///     }
132/// });
133///
134/// thread::spawn({
135///     let q = q.clone();
136///     move || {
137///         q.enqueue(Box::pin(Entry::new(3)));
138///         q.enqueue(Box::pin(Entry::new(4)));
139///     }
140/// });
141///
142///
143/// // Dequeue elements until the producer threads have terminated.
144/// let mut seen = Vec::new();
145/// loop {
146///     // Make sure we run at least once, in case the producer is already done.
147///     let done = Arc::strong_count(&q) == 1;
148///
149///     // Dequeue until the queue is empty.
150///     while let Some(entry) = q.dequeue() {
151///         seen.push(entry.as_ref().val);
152///     }
153///
154///     // If there are still producers, we may continue dequeuing.
155///     if done {
156///         break;
157///     }
158///
159///     thread::yield_now();
160/// }
161///
162/// // The elements may not have been received in order, so sort the
163/// // received values before making assertions about them.
164/// &mut seen[..].sort();
165///
166/// assert_eq!(&[1, 2, 3, 4], &seen[..]);
167/// ```
168///
169/// The [`Consumer`] type may be used to reserve the permission to consume
170/// multiple elements at a time:
171///
172/// ```
173/// # use cordyceps::{
174/// #     Linked,
175/// #     mpsc_queue::{self, MpscQueue},
176/// # };
177/// # use std::{pin::Pin, ptr::{self, NonNull}, thread, sync::Arc};
178/// #
179/// # #[derive(Debug, Default)]
180/// # struct Entry {
181/// #    links: mpsc_queue::Links<Entry>,
182/// #    val: i32,
183/// # }
184/// #
185/// # unsafe impl Linked<mpsc_queue::Links<Entry>> for Entry {
186/// #     type Handle = Pin<Box<Self>>;
187/// #
188/// #     fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
189/// #        unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
190/// #     }
191/// #
192/// #     unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
193/// #         Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
194/// #     }
195/// #
196/// #     unsafe fn links(target: NonNull<Entry>) -> NonNull<mpsc_queue::Links<Entry>> {
197/// #        let links = ptr::addr_of_mut!((*target.as_ptr()).links);
198/// #        NonNull::new_unchecked(links)
199/// #     }
200/// # }
201/// #
202/// # impl Entry {
203/// #     fn new(val: i32) -> Self {
204/// #         Self {
205/// #             val,
206/// #             ..Self::default()
207/// #         }
208/// #     }
209/// # }
210/// let stub = Box::pin(Entry::default());
211/// let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));
212///
213/// thread::spawn({
214///     let q = q.clone();
215///     move || {
216///         q.enqueue(Box::pin(Entry::new(1)));
217///         q.enqueue(Box::pin(Entry::new(2)));
218///     }
219/// });
220///
221/// // Reserve exclusive permission to consume elements
222/// let consumer = q.consume();
223///
224/// let mut seen = Vec::new();
225/// loop {
226///     // Make sure we run at least once, in case the producer is already done.
227///     let done = Arc::strong_count(&q) == 1;
228///
229///     // Dequeue until the queue is empty.
230///     while let Some(entry) = consumer.dequeue() {
231///         seen.push(entry.as_ref().val);
232///     }
233///
234///     if done {
235///         break;
236///     }
237///     thread::yield_now();
238/// }
239///
240/// assert_eq!(&[1, 2], &seen[..]);
241/// ```
242///
243/// The [`Consumer`] type also implements [`Iterator`]:
244///
245/// ```
246/// # use cordyceps::{
247/// #     Linked,
248/// #     mpsc_queue::{self, MpscQueue},
249/// # };
250/// # use std::{pin::Pin, ptr::{self, NonNull}, thread, sync::Arc};
251/// #
252/// # #[derive(Debug, Default)]
253/// # struct Entry {
254/// #    links: mpsc_queue::Links<Entry>,
255/// #    val: i32,
256/// # }
257/// #
258/// # unsafe impl Linked<mpsc_queue::Links<Entry>> for Entry {
259/// #     type Handle = Pin<Box<Self>>;
260/// #
261/// #     fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
262/// #        unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
263/// #     }
264/// #
265/// #     unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
266/// #         Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
267/// #     }
268/// #
269/// #     unsafe fn links(target: NonNull<Entry>) -> NonNull<mpsc_queue::Links<Entry>> {
270/// #        let links = ptr::addr_of_mut!((*target.as_ptr()).links);
271/// #        NonNull::new_unchecked(links)
272/// #     }
273/// # }
274/// #
275/// # impl Entry {
276/// #     fn new(val: i32) -> Self {
277/// #         Self {
278/// #             val,
279/// #             ..Self::default()
280/// #         }
281/// #     }
282/// # }
283/// let stub = Box::pin(Entry::default());
284/// let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));
285///
286/// thread::spawn({
287///     let q = q.clone();
288///     move || {
289///         for i in 1..5 {
290///             q.enqueue(Box::pin(Entry::new(i)));
291///         }
292///     }
293/// });
294///
295/// thread::spawn({
296///     let q = q.clone();
297///     move || {
298///         for i in 5..=10 {
299///             q.enqueue(Box::pin(Entry::new(i)));
300///         }
301///     }
302/// });
303///
304/// let mut seen = Vec::new();
305/// loop {
306///     // Make sure we run at least once, in case the producer is already done.
307///     let done = Arc::strong_count(&q) == 1;
308///
309///     // Append any elements currently in the queue to the `Vec`
310///     seen.extend(q.consume().map(|entry| entry.as_ref().val));
311///
312///     if done {
313///         break;
314///     }
315///
316///     thread::yield_now();
317/// }
318///
319/// &mut seen[..].sort();
320/// assert_eq!(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], &seen[..]);
321/// ```
322///
323/// # Implementation Details
324///
325/// This queue design is conceptually very simple, and has *extremely* fast and
326/// wait-free producers (the [`enqueue`] operation). Enqueuing an element always
327/// performs exactly one atomic swap and one atomic store, so producers need
328/// never wait.
329///
330/// The consumer (the [`dequeue`]) is *typically* wait-free in the common case,
331/// but must occasionally wait when the queue is in an inconsistent state.
332///
333/// ## Inconsistent States
334///
335/// As discussed in the [algorithm description on 1024cores.net][vyukov], it
336/// is possible for this queue design to enter an inconsistent state if the
337/// consumer tries to dequeue an element while a producer is in the middle
338/// of enqueueing a new element. This occurs when a producer is between the
339/// atomic swap with the `head` of the queue and the atomic store that sets the
340/// `next` pointer of the previous `head` element. When the queue is in an
341/// inconsistent state, the consumer must briefly wait before dequeueing an
342/// element.
343///
344/// The consumer's behavior in the inconsistent state depends on which API
345/// method is used. The [`MpscQueue::dequeue`] and [`Consumer::dequeue`] methods
346/// will wait by spinning (with an exponential backoff) when the queue is
347/// inconsistent. Alternatively, the [`MpscQueue::try_dequeue`] and
348/// [`Consumer::try_dequeue`] methods will instead return [an error] when the
349/// queue is in an inconsistent state.
350///
351/// [intrusive]: crate#intrusive-data-structures
352/// [vyukov]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
353/// [`dequeue`]: Self::dequeue
354/// [`enqueue`]: Self::enqueue
355/// [an error]: TryDequeueError
356pub struct MpscQueue<T: Linked<Links<T>>> {
357    /// The head of the queue. This is accessed in both `enqueue` and `dequeue`.
358    head: CachePadded<AtomicPtr<T>>,
359
360    /// The tail of the queue. This is accessed only when dequeueing.
361    tail: CachePadded<UnsafeCell<*mut T>>,
362
363    /// Does a consumer handle to the queue exist? If not, it is safe to create a
364    /// new consumer.
365    has_consumer: CachePadded<AtomicBool>,
366
367    /// If the stub node is in a `static`, we cannot drop it when the
368    /// queue is dropped.
369    stub_is_static: bool,
370
371    stub: NonNull<T>,
372}
373
374/// A handle that holds the right to dequeue elements from a [`MpscQueue`].
375///
376/// This can be used when one thread wishes to dequeue many elements at a time,
377/// to avoid the overhead of ensuring mutual exclusion on every [`dequeue`] or
378/// [`try_dequeue`] call.
379///
380/// This type is returned by the [`MpscQueue::consume`] and [`MpscQueue::try_consume`]
381/// methods.
382///
383/// If the right to dequeue elements needs to be reserved for longer than a
384/// single scope, an owned variant ([`OwnedConsumer`]) is also available, when
385/// the [`MpscQueue`] is stored in an [`Arc`]. Since the [`MpscQueue`] must be stored
386/// in an [`Arc`], the [`OwnedConsumer`] type requires the "alloc" feature flag.
387///
388/// [`Arc`]: alloc::sync::Arc
389/// [`dequeue`]: Consumer::dequeue
390/// [`try_dequeue`]: Consumer::try_dequeue
391pub struct Consumer<'q, T: Linked<Links<T>>> {
392    q: &'q MpscQueue<T>,
393}
394
395/// Links to other nodes in a [`MpscQueue`].
396///
397/// In order to be part of a [`MpscQueue`], a type must contain an instance of this
398/// type, and must implement the [`Linked`] trait for `Links<Self>`.
399pub struct Links<T> {
400    /// The next node in the queue.
401    next: AtomicPtr<T>,
402
403    /// Is this the stub node?
404    ///
405    /// Used for debug mode consistency checking only.
406    #[cfg(debug_assertions)]
407    is_stub: AtomicBool,
408
409    /// Linked list links must always be `!Unpin`, in order to ensure that they
410    /// never recieve LLVM `noalias` annotations; see also
411    /// <https://github.com/rust-lang/rust/issues/63818>.
412    _unpin: PhantomPinned,
413}
414
415/// Errors returned by [`MpscQueue::try_dequeue`] and [`Consumer::try_dequeue`].
416#[derive(Debug, Eq, PartialEq)]
417pub enum TryDequeueError {
418    /// No element was dequeued because the queue was empty.
419    Empty,
420
421    /// The queue is currently in an [inconsistent state].
422    ///
423    /// Since inconsistent states are very short-lived, the caller may want to
424    /// try dequeueing a second time.
425    ///
426    /// [inconsistent state]: MpscQueue#inconsistent-states
427    Inconsistent,
428
429    /// Another thread is currently calling [`MpscQueue::try_dequeue`]  or
430    /// [`MpscQueue::dequeue`], or owns a [`Consumer`] or [`OwnedConsumer`] handle.
431    ///
432    /// This is a multi-producer, *single-consumer* queue, so only a single
433    /// thread may dequeue elements at any given time.
434    Busy,
435}
436
437// === impl Queue ===
438
439impl<T: Linked<Links<T>>> MpscQueue<T> {
440    /// Returns a new `MpscQueue`.
441    ///
442    /// The [`Default`] implementation for `T::Handle` is used to produce a new
443    /// node used as the list's stub.
444    #[must_use]
445    pub fn new() -> Self
446    where
447        T::Handle: Default,
448    {
449        Self::new_with_stub(Default::default())
450    }
451
452    /// Returns a new `MpscQueue` with the provided stub node.
453    ///
454    /// If a `MpscQueue` must be constructed in a `const` context, such as a
455    /// `static` initializer, see [`MpscQueue::new_with_static_stub`].
456    #[must_use]
457    pub fn new_with_stub(stub: T::Handle) -> Self {
458        let stub = T::into_ptr(stub);
459
460        // In debug mode, set the stub flag for consistency checking.
461        #[cfg(debug_assertions)]
462        unsafe {
463            links(stub).is_stub.store(true, Release);
464        }
465        let ptr = stub.as_ptr();
466
467        Self {
468            head: CachePadded(AtomicPtr::new(ptr)),
469            tail: CachePadded(UnsafeCell::new(ptr)),
470            has_consumer: CachePadded(AtomicBool::new(false)),
471            stub_is_static: false,
472            stub,
473        }
474    }
475
476    /// Returns a new `MpscQueue` with a static "stub" entity
477    ///
478    /// This is primarily used for creating an `MpscQueue` as a `static` variable.
479    ///
480    /// # Usage notes
481    ///
482    /// Unlike [`MpscQueue::new`] or [`MpscQueue::new_with_stub`], the `stub`
483    /// item will NOT be dropped when the `MpscQueue` is dropped. This is fine
484    /// if you are ALSO statically creating the `stub`. However, if it is
485    /// necessary to recover that memory after the `MpscQueue` has been dropped,
486    /// that will need to be done by the user manually.
487    ///
488    /// # Safety
489    ///
490    /// The `stub` provided must ONLY EVER be used for a single `MpscQueue`
491    /// instance. Re-using the stub for multiple queues may lead to undefined
492    /// behavior.
493    ///
494    /// ## Example usage
495    ///
496    /// ```rust
497    /// # use cordyceps::{
498    /// #     Linked,
499    /// #     mpsc_queue::{self, MpscQueue},
500    /// # };
501    /// # use std::{pin::Pin, ptr::{self, NonNull}, thread, sync::Arc};
502    /// #
503    /// #
504    ///
505    /// // This is our same `Entry` from the parent examples. It has implemented
506    /// // the `Links` trait as above.
507    /// #[derive(Debug, Default)]
508    /// struct Entry {
509    ///    links: mpsc_queue::Links<Entry>,
510    ///    val: i32,
511    /// }
512    ///
513    /// #
514    /// # unsafe impl Linked<mpsc_queue::Links<Entry>> for Entry {
515    /// #     type Handle = Pin<Box<Self>>;
516    /// #
517    /// #     fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
518    /// #        unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
519    /// #     }
520    /// #
521    /// #     unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
522    /// #         Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
523    /// #     }
524    /// #
525    /// #     unsafe fn links(target: NonNull<Entry>) -> NonNull<mpsc_queue::Links<Entry>> {
526    /// #        let links = ptr::addr_of_mut!((*target.as_ptr()).links);
527    /// #        NonNull::new_unchecked(links)
528    /// #     }
529    /// # }
530    /// #
531    /// # impl Entry {
532    /// #     fn new(val: i32) -> Self {
533    /// #         Self {
534    /// #             val,
535    /// #             ..Self::default()
536    /// #         }
537    /// #     }
538    /// # }
539    ///
540    ///
541    /// static MPSC: MpscQueue<Entry> = {
542    ///     static STUB_ENTRY: Entry = Entry {
543    ///         links: mpsc_queue::Links::<Entry>::new_stub(),
544    ///         val: 0
545    ///     };
546    ///
547    ///     // SAFETY: The stub may not be used by another MPSC queue.
548    ///     // Here, this is ensured because the `STUB_ENTRY` static is defined
549    ///     // inside of the initializer for the `MPSC` static, so it cannot be referenced
550    ///     // elsewhere.
551    ///     unsafe { MpscQueue::new_with_static_stub(&STUB_ENTRY) }
552    /// };
553    /// ```
554    ///
555    #[cfg(not(loom))]
556    #[must_use]
557    pub const unsafe fn new_with_static_stub(stub: &'static T) -> Self {
558        let ptr = stub as *const T as *mut T;
559        Self {
560            head: CachePadded(AtomicPtr::new(ptr)),
561            tail: CachePadded(UnsafeCell::new(ptr)),
562            has_consumer: CachePadded(AtomicBool::new(false)),
563            stub_is_static: true,
564            stub: NonNull::new_unchecked(ptr),
565        }
566    }
567
568    /// Enqueue a new element at the end of the queue.
569    ///
570    /// This takes ownership of a [`Handle`] that owns the element, and
571    /// (conceptually) assigns ownership of the element to the queue while it
572    /// remains enqueued.
573    ///
574    /// This method will never wait.
575    ///
576    /// [`Handle`]: crate::Linked::Handle
577    pub fn enqueue(&self, element: T::Handle) {
578        let ptr = T::into_ptr(element);
579
580        #[cfg(debug_assertions)]
581        debug_assert!(!unsafe { T::links(ptr).as_ref() }.is_stub());
582
583        self.enqueue_inner(ptr)
584    }
585
586    #[inline]
587    fn enqueue_inner(&self, ptr: NonNull<T>) {
588        unsafe { links(ptr).next.store(ptr::null_mut(), Relaxed) };
589
590        let ptr = ptr.as_ptr();
591        let prev = self.head.swap(ptr, AcqRel);
592        unsafe {
593            // Safety: in release mode, we don't null check `prev`. This is
594            // because no pointer in the list should ever be a null pointer, due
595            // to the presence of the stub node.
596            links(non_null(prev)).next.store(ptr, Release);
597        }
598    }
599
600    /// Try to dequeue an element from the queue, without waiting if the queue
601    /// is in an [inconsistent state], or until there is no other consumer trying
602    /// to read from the queue.
603    ///
604    /// Because this is a multi-producer, _single-consumer_ queue,
605    /// only one thread may be dequeueing at a time. If another thread is
606    /// dequeueing, this method returns [`TryDequeueError::Busy`].
607    ///
608    /// The [`MpscQueue::dequeue`] method will instead wait (by spinning with an
609    /// exponential backoff) when the queue is in an inconsistent state or busy.
610    ///
611    /// The unsafe [`MpscQueue::try_dequeue_unchecked`] method will not check if the
612    /// queue is busy before dequeueing an element. This can be used when the
613    /// user code guarantees that no other threads will dequeue from the queue
614    /// concurrently, but this cannot be enforced by the compiler.
615    ///
616    /// This method will never wait.
617    ///
618    /// # Returns
619    ///
620    /// - `Ok`([`T::Handle`]`)` if an element was successfully dequeued
621    /// - `Err(`[`TryDequeueError::Empty`]`)` if there are no elements in the queue
622    /// - `Err(`[`TryDequeueError::Inconsistent`]`)` if the queue is currently in an
623    ///   inconsistent state
624    /// - `Err(`[`TryDequeueError::Busy`]`)` if another thread is currently trying to
625    ///   dequeue a message.
626    ///
627    /// [inconsistent state]: Self#inconsistent-states
628    /// [`T::Handle`]: crate::Linked::Handle
629    pub fn try_dequeue(&self) -> Result<T::Handle, TryDequeueError> {
630        if self
631            .has_consumer
632            .compare_exchange(false, true, AcqRel, Acquire)
633            .is_err()
634        {
635            return Err(TryDequeueError::Busy);
636        }
637
638        let res = unsafe {
639            // Safety: the `has_consumer` flag ensures mutual exclusion of
640            // consumers.
641            self.try_dequeue_unchecked()
642        };
643
644        self.has_consumer.store(false, Release);
645        res
646    }
647
648    /// Dequeue an element from the queue.
649    ///
650    /// This method will wait by spinning with an exponential backoff if the
651    /// queue is in an [inconsistent state].
652    ///
653    /// Additionally, because this is a multi-producer, _single-consumer_ queue,
654    /// only one thread may be dequeueing at a time. If another thread is
655    /// dequeueing, this method will spin until the queue is no longer busy.
656    ///
657    /// The [`MpscQueue::try_dequeue`] will return an error rather than waiting when
658    /// the queue is in an inconsistent state or busy.
659    ///
660    /// The unsafe [`MpscQueue::dequeue_unchecked`] method will not check if the
661    /// queue is busy before dequeueing an element. This can be used when the
662    /// user code guarantees that no other threads will dequeue from the queue
663    /// concurrently, but this cannot be enforced by the compiler.
664    ///
665    /// # Returns
666    ///
667    /// - `Some(`[`T::Handle`]`)` if an element was successfully dequeued
668    /// - `None` if the queue is empty or another thread is dequeueing
669    ///
670    /// [inconsistent state]: Self#inconsistent-states
671    /// [`T::Handle`]: crate::Linked::Handle
672    pub fn dequeue(&self) -> Option<T::Handle> {
673        let mut boff = Backoff::new();
674        loop {
675            match self.try_dequeue() {
676                Ok(val) => return Some(val),
677                Err(TryDequeueError::Empty) => return None,
678                Err(_) => boff.spin(),
679            }
680        }
681    }
682
683    /// Returns a [`Consumer`] handle that reserves the exclusive right to dequeue
684    /// elements from the queue until it is dropped.
685    ///
686    /// If another thread is dequeueing, this method spins until there is no
687    /// other thread dequeueing.
688    pub fn consume(&self) -> Consumer<'_, T> {
689        self.lock_consumer();
690        Consumer { q: self }
691    }
692
693    /// Attempts to reserve a [`Consumer`] handle that holds the exclusive right
694    /// to dequeue  elements from the queue until it is dropped.
695    ///
696    /// If another thread is dequeueing, this returns `None` instead.
697    pub fn try_consume(&self) -> Option<Consumer<'_, T>> {
698        // lock the consumer-side of the queue.
699        self.try_lock_consumer().map(|_| Consumer { q: self })
700    }
701
702    /// Try to dequeue an element from the queue, without waiting if the queue
703    /// is in an inconsistent state, and without checking if another consumer
704    /// exists.
705    ///
706    /// This method returns [`TryDequeueError::Inconsistent`] when the queue is
707    /// in an [inconsistent state].
708    ///
709    /// The [`MpscQueue::dequeue_unchecked`] method will instead wait (by
710    /// spinning with an  exponential backoff) when the queue is in an
711    /// inconsistent state.
712    ///
713    /// This method will never wait.
714    ///
715    /// # Returns
716    ///
717    /// - `Ok`([`T::Handle`]`)` if an element was successfully dequeued
718    /// - `Err(`[`TryDequeueError::Empty`]`)` if there are no elements in the queue
719    /// - `Err(`[`TryDequeueError::Inconsistent`]`)` if the queue is currently in an
720    ///   inconsistent state
721    ///
722    /// This method will **never** return [`TryDequeueError::Busy`].
723    ///
724    /// # Safety
725    ///
726    /// This is a multi-producer, *single-consumer* queue. Only one thread/core
727    /// may call `try_dequeue_unchecked` at a time!
728    ///
729    /// [inconsistent state]: Self#inconsistent-states
730    /// [`T::Handle`]: crate::Linked::Handle
731    pub unsafe fn try_dequeue_unchecked(&self) -> Result<T::Handle, TryDequeueError> {
732        self.tail.with_mut(|tail| {
733            let mut tail_node = NonNull::new(*tail).ok_or(TryDequeueError::Empty)?;
734            let mut next = links(tail_node).next.load(Acquire);
735
736            if tail_node == self.stub {
737                #[cfg(debug_assertions)]
738                debug_assert!(links(tail_node).is_stub());
739                let next_node = NonNull::new(next).ok_or(TryDequeueError::Empty)?;
740
741                *tail = next;
742                tail_node = next_node;
743                next = links(next_node).next.load(Acquire);
744            }
745
746            if !next.is_null() {
747                *tail = next;
748                return Ok(T::from_ptr(tail_node));
749            }
750
751            let head = self.head.load(Acquire);
752
753            if tail_node.as_ptr() != head {
754                return Err(TryDequeueError::Inconsistent);
755            }
756
757            self.enqueue_inner(self.stub);
758
759            next = links(tail_node).next.load(Acquire);
760            if next.is_null() {
761                return Err(TryDequeueError::Empty);
762            }
763
764            *tail = next;
765
766            #[cfg(debug_assertions)]
767            debug_assert!(!links(tail_node).is_stub());
768
769            Ok(T::from_ptr(tail_node))
770        })
771    }
772
773    /// Dequeue an element from the queue, without checking whether another
774    /// consumer exists.
775    ///
776    /// This method will wait by spinning with an exponential backoff if the
777    /// queue is in an [inconsistent state].
778    ///
779    /// The [`MpscQueue::try_dequeue`] will return an error rather than waiting
780    /// when the queue is in an inconsistent state.
781    ///
782    /// # Returns
783    ///
784    /// - `Some(`[`T::Handle`]`)` if an element was successfully dequeued
785    /// - `None` if the queue is empty
786    ///
787    /// # Safety
788    ///
789    /// This is a multi-producer, *single-consumer* queue. Only one thread/core
790    /// may call `dequeue` at a time!
791    ///
792    /// [inconsistent state]: Self#inconsistent-states
793    /// [`T::Handle`]: crate::Linked::Handle
794    pub unsafe fn dequeue_unchecked(&self) -> Option<T::Handle> {
795        let mut boff = Backoff::new();
796        loop {
797            match self.try_dequeue_unchecked() {
798                Ok(val) => return Some(val),
799                Err(TryDequeueError::Empty) => return None,
800                Err(TryDequeueError::Inconsistent) => boff.spin(),
801                Err(TryDequeueError::Busy) => {
802                    unreachable!("try_dequeue_unchecked never returns `Busy`!")
803                }
804            }
805        }
806    }
807
808    #[inline]
809    fn lock_consumer(&self) {
810        let mut boff = Backoff::new();
811        while self
812            .has_consumer
813            .compare_exchange(false, true, AcqRel, Acquire)
814            .is_err()
815        {
816            while self.has_consumer.load(Relaxed) {
817                boff.spin();
818            }
819        }
820    }
821
822    #[inline]
823    fn try_lock_consumer(&self) -> Option<()> {
824        self.has_consumer
825            .compare_exchange(false, true, AcqRel, Acquire)
826            .map(|_| ())
827            .ok()
828    }
829}
830
831impl<T: Linked<Links<T>>> Drop for MpscQueue<T> {
832    fn drop(&mut self) {
833        let mut current = self.tail.with_mut(|tail| unsafe {
834            // Safety: because `Drop` is called with `&mut self`, we have
835            // exclusive ownership over the queue, so it's always okay to touch
836            // the tail cell.
837            *tail
838        });
839        while let Some(node) = NonNull::new(current) {
840            unsafe {
841                let links = links(node);
842                let next = links.next.load(Relaxed);
843
844                // Skip dropping the stub node; it is owned by the queue and
845                // will be dropped when the queue is dropped. If we dropped it
846                // here, that would cause a double free!
847                if node != self.stub {
848                    // Convert the pointer to the owning handle and drop it.
849                    #[cfg(debug_assertions)]
850                    debug_assert!(!links.is_stub(), "stub: {:p}, node: {node:p}", self.stub);
851                    drop(T::from_ptr(node));
852                } else {
853                    #[cfg(debug_assertions)]
854                    debug_assert!(links.is_stub());
855                }
856
857                current = next;
858            }
859        }
860
861        unsafe {
862            // If the stub is static, don't drop it. It lives 5eva
863            // (that's one more than 4eva)
864            if !self.stub_is_static {
865                drop(T::from_ptr(self.stub));
866            }
867        }
868    }
869}
870
871impl<T> fmt::Debug for MpscQueue<T>
872where
873    T: Linked<Links<T>>,
874{
875    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
876        let Self {
877            head,
878            tail: _,
879            has_consumer,
880            stub,
881            stub_is_static,
882        } = self;
883        f.debug_struct("MpscQueue")
884            .field("head", &format_args!("{:p}", head.load(Acquire)))
885            // only the consumer can load the tail; trying to print it here
886            // could be racy.
887            // XXX(eliza): we could replace the `UnsafeCell` with an atomic,
888            // and then it would be okay to print the tail...but then we would
889            // lose loom checking for tail accesses...
890            .field("tail", &format_args!("..."))
891            .field("has_consumer", &has_consumer.load(Acquire))
892            .field("stub", stub)
893            .field("stub_is_static", stub_is_static)
894            .finish()
895    }
896}
897
898impl<T> Default for MpscQueue<T>
899where
900    T: Linked<Links<T>>,
901    T::Handle: Default,
902{
903    fn default() -> Self {
904        Self::new()
905    }
906}
907
908unsafe impl<T> Send for MpscQueue<T>
909where
910    T: Send + Linked<Links<T>>,
911    T::Handle: Send,
912{
913}
914unsafe impl<T: Send + Linked<Links<T>>> Sync for MpscQueue<T> {}
915
916// === impl Consumer ===
917
918impl<T: Send + Linked<Links<T>>> Consumer<'_, T> {
919    /// Dequeue an element from the queue.
920    ///
921    /// This method will wait by spinning with an exponential backoff if the
922    /// queue is in an [inconsistent state].
923    ///
924    /// The [`Consumer::try_dequeue`] will return an error rather than waiting when
925    /// the queue is in an inconsistent state.
926    ///
927    /// # Returns
928    ///
929    /// - `Some(`[`T::Handle`]`)` if an element was successfully dequeued
930    /// - `None` if the queue is empty
931    ///
932    /// [inconsistent state]: Self#inconsistent-states
933    /// [`T::Handle`]: crate::Linked::Handle
934    #[inline]
935    pub fn dequeue(&self) -> Option<T::Handle> {
936        debug_assert!(self.q.has_consumer.load(Acquire));
937        unsafe {
938            // Safety: we have reserved exclusive access to the queue.
939            self.q.dequeue_unchecked()
940        }
941    }
942
943    /// Try to dequeue an element from the queue, without waiting if the queue
944    /// is in an inconsistent state.
945    ///
946    /// As discussed in the [algorithm description on 1024cores.net][vyukov], it
947    /// is possible for this queue design to enter an inconsistent state if the
948    /// consumer tries to dequeue an element while a producer is in the middle
949    /// of enqueueing a new element. If this occurs, the consumer must briefly
950    /// wait before dequeueing an element. This method returns
951    /// [`TryDequeueError::Inconsistent`] when the queue is in an inconsistent
952    /// state.
953    ///
954    /// The [`Consumer::dequeue`] method will instead wait (by spinning with an
955    /// exponential backoff) when the queue is in an inconsistent state.
956    ///
957    /// # Returns
958    ///
959    /// - `T::Handle` if an element was successfully dequeued
960    /// - [`TryDequeueError::Empty`] if there are no elements in the queue
961    /// - [`TryDequeueError::Inconsistent`] if the queue is currently in an
962    ///   inconsistent state
963    ///
964    ///
965    /// # Returns
966    ///
967    /// - `Some(T::Handle)` if an element was successfully dequeued
968    /// - `None` if the queue is empty
969    ///
970    /// [vyukov]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
971    #[inline]
972    pub fn try_dequeue(&self) -> Result<T::Handle, TryDequeueError> {
973        debug_assert!(self.q.has_consumer.load(Acquire));
974        unsafe {
975            // Safety: we have reserved exclusive access to the queue.
976            self.q.try_dequeue_unchecked()
977        }
978    }
979}
980
981impl<T: Linked<Links<T>>> Drop for Consumer<'_, T> {
982    fn drop(&mut self) {
983        self.q.has_consumer.store(false, Release);
984    }
985}
986
987impl<T> fmt::Debug for Consumer<'_, T>
988where
989    T: Linked<Links<T>>,
990{
991    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
992        let Self { q } = self;
993        let tail = q.tail.with(|tail| unsafe {
994            // Safety: it's okay for the consumer to access the tail cell, since
995            // we have exclusive access to it.
996            *tail
997        });
998        f.debug_struct("Consumer")
999            .field("q", &q)
1000            .field("tail", &tail)
1001            .finish()
1002    }
1003}
1004
1005impl<T> Iterator for Consumer<'_, T>
1006where
1007    T: Send + Linked<Links<T>>,
1008{
1009    type Item = T::Handle;
1010
1011    fn next(&mut self) -> Option<Self::Item> {
1012        self.dequeue()
1013    }
1014}
1015
1016// === impl Links ===
1017
1018impl<T> Links<T> {
1019    /// Returns a new set of `Links` for a [`MpscQueue`].
1020    #[cfg(not(loom))]
1021    #[must_use]
1022    pub const fn new() -> Self {
1023        Self {
1024            next: AtomicPtr::new(ptr::null_mut()),
1025            _unpin: PhantomPinned,
1026            #[cfg(debug_assertions)]
1027            is_stub: AtomicBool::new(false),
1028        }
1029    }
1030
1031    /// Returns a new set of `Links` for the stub node in an [`MpscQueue`].
1032    #[cfg(not(loom))]
1033    #[must_use]
1034    pub const fn new_stub() -> Self {
1035        Self {
1036            next: AtomicPtr::new(ptr::null_mut()),
1037            _unpin: PhantomPinned,
1038            #[cfg(debug_assertions)]
1039            is_stub: AtomicBool::new(true),
1040        }
1041    }
1042
1043    /// Returns a new set of `Links` for a [`MpscQueue`].
1044    #[cfg(loom)]
1045    #[must_use]
1046    pub fn new() -> Self {
1047        Self {
1048            next: AtomicPtr::new(ptr::null_mut()),
1049            _unpin: PhantomPinned,
1050            #[cfg(debug_assertions)]
1051            is_stub: AtomicBool::new(false),
1052        }
1053    }
1054
1055    /// Returns a new set of `Links` for the stub node in an [`MpscQueue`].
1056    #[cfg(loom)]
1057    #[must_use]
1058    pub fn new_stub() -> Self {
1059        Self {
1060            next: AtomicPtr::new(ptr::null_mut()),
1061            _unpin: PhantomPinned,
1062            #[cfg(debug_assertions)]
1063            is_stub: AtomicBool::new(true),
1064        }
1065    }
1066
1067    #[cfg(debug_assertions)]
1068    fn is_stub(&self) -> bool {
1069        self.is_stub.load(Acquire)
1070    }
1071}
1072
1073impl<T> Default for Links<T> {
1074    fn default() -> Self {
1075        Self::new()
1076    }
1077}
1078
1079impl<T> fmt::Debug for Links<T> {
1080    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1081        let mut s = f.debug_struct("Links");
1082        s.field("next", &self.next.load(Acquire));
1083        #[cfg(debug_assertions)]
1084        s.field("is_stub", &self.is_stub.load(Acquire));
1085        s.finish()
1086    }
1087}
1088
1089feature! {
1090    #![feature = "alloc"]
1091
1092    use alloc::sync::Arc;
1093
1094    /// An owned handle that holds the right to dequeue elements from the queue.
1095    ///
1096    /// This can be used when one thread wishes to dequeue many elements at a time,
1097    /// to avoid the overhead of ensuring mutual exclusion on every [`dequeue`] or
1098    /// [`try_dequeue`] call.
1099    ///
1100    /// This type is returned by the [`MpscQueue::consume_owned`] and
1101    /// [`MpscQueue::try_consume_owned`] methods.
1102    ///
1103    /// This is similar to the [`Consumer`] type, but the queue is stored in an
1104    /// [`Arc`] rather than borrowed. This allows a single `OwnedConsumer`
1105    /// instance to be stored in a struct and used indefinitely.
1106    ///
1107    /// Since the queue is stored in an [`Arc`], this requires the `alloc`
1108    /// feature flag to be enabled.
1109    ///
1110    /// [`dequeue`]: OwnedConsumer::dequeue
1111    /// [`try_dequeue`]: OwnedConsumer::try_dequeue
1112    /// [`Arc`]: alloc::sync::Arc
1113    pub struct OwnedConsumer<T: Linked<Links<T>>> {
1114        q: Arc<MpscQueue<T>>
1115    }
1116
1117    // === impl Consumer ===
1118
1119    impl<T: Linked<Links<T>>> OwnedConsumer<T> {
1120        /// Dequeue an element from the queue.
1121        ///
1122        /// As discussed in the [algorithm description on 1024cores.net][vyukov], it
1123        /// is possible for this queue design to enter an inconsistent state if the
1124        /// consumer tries to dequeue an element while a producer is in the middle
1125        /// of enqueueing a new element. If this occurs, the consumer must briefly
1126        /// wait before dequeueing an element. This method will wait by spinning
1127        /// with an exponential backoff if the queue is in an inconsistent state.
1128        ///
1129        /// The [`Consumer::try_dequeue`] will return an error rather than waiting when
1130        /// the queue is in an inconsistent state.
1131        ///
1132        /// # Returns
1133        ///
1134        /// - `Some(T::Handle)` if an element was successfully dequeued
1135        /// - `None` if the queue is empty
1136        ///
1137        /// [vyukov]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
1138        #[inline]
1139        pub fn dequeue(&self) -> Option<T::Handle> {
1140            debug_assert!(self.q.has_consumer.load(Acquire));
1141            unsafe {
1142                // Safety: we have reserved exclusive access to the queue.
1143                self.q.dequeue_unchecked()
1144            }
1145        }
1146
1147        /// Try to dequeue an element from the queue, without waiting if the queue
1148        /// is in an inconsistent state.
1149        ///
1150        /// As discussed in the [algorithm description on 1024cores.net][vyukov], it
1151        /// is possible for this queue design to enter an inconsistent state if the
1152        /// consumer tries to dequeue an element while a producer is in the middle
1153        /// of enqueueing a new element. If this occurs, the consumer must briefly
1154        /// wait before dequeueing an element. This method returns
1155        /// [`TryDequeueError::Inconsistent`] when the queue is in an inconsistent
1156        /// state.
1157        ///
1158        /// The [`Consumer::dequeue`] method will instead wait (by spinning with an
1159        /// exponential backoff) when the queue is in an inconsistent state.
1160        ///
1161        /// # Returns
1162        ///
1163        /// - `T::Handle` if an element was successfully dequeued
1164        /// - [`TryDequeueError::Empty`] if there are no elements in the queue
1165        /// - [`TryDequeueError::Inconsistent`] if the queue is currently in an
1166        ///   inconsistent state
1167        ///
1168        ///
1169        /// # Returns
1170        ///
1171        /// - `Some(T::Handle)` if an element was successfully dequeued
1172        /// - `None` if the queue is empty
1173        ///
1174        /// [vyukov]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
1175        #[inline]
1176        pub fn try_dequeue(&self) -> Result<T::Handle, TryDequeueError> {
1177            debug_assert!(self.q.has_consumer.load(Acquire));
1178            unsafe {
1179                // Safety: we have reserved exclusive access to the queue.
1180                self.q.try_dequeue_unchecked()
1181            }
1182        }
1183
1184        /// Returns `true` if any producers exist for this queue.
1185        pub fn has_producers(&self) -> bool {
1186            Arc::strong_count(&self.q) > 1
1187        }
1188    }
1189
1190    impl<T: Linked<Links<T>>> Drop for OwnedConsumer<T> {
1191        fn drop(&mut self) {
1192            self.q.has_consumer.store(false, Release);
1193        }
1194    }
1195
1196    impl<T: Linked<Links<T>>> fmt::Debug for OwnedConsumer<T> {
1197        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1198            let Self { q } = self;
1199            let tail = q.tail.with(|tail| unsafe {
1200                // Safety: it's okay for the consumer to access the tail cell, since
1201                // we have exclusive access to it.
1202                *tail
1203            });
1204            f.debug_struct("OwnedConsumer")
1205                .field("q", &q)
1206                .field("tail", &tail)
1207                .finish()
1208        }
1209    }
1210
1211    // === impl Queue ===
1212
1213    impl<T: Linked<Links<T>>> MpscQueue<T> {
1214        /// Returns a [`OwnedConsumer`] handle that reserves the exclusive right to dequeue
1215        /// elements from the queue until it is dropped.
1216        ///
1217        /// If another thread is dequeueing, this method spins until there is no
1218        /// other thread dequeueing.
1219        pub fn consume_owned(self: Arc<Self>) -> OwnedConsumer<T> {
1220            self.lock_consumer();
1221            OwnedConsumer { q: self }
1222        }
1223
1224        /// Attempts to reserve an [`OwnedConsumer`] handle that holds the exclusive right
1225        /// to dequeue  elements from the queue until it is dropped.
1226        ///
1227        /// If another thread is dequeueing, this returns `None` instead.
1228        pub fn try_consume_owned(self: Arc<Self>) -> Option<OwnedConsumer<T>> {
1229            self.try_lock_consumer().map(|_| OwnedConsumer { q: self })
1230        }
1231    }
1232}
1233
1234/// Just a little helper so we don't have to add `.as_ref()` noise everywhere...
1235#[inline(always)]
1236unsafe fn links<'a, T: Linked<Links<T>>>(ptr: NonNull<T>) -> &'a Links<T> {
1237    T::links(ptr).as_ref()
1238}
1239
1240/// Helper to construct a `NonNull<T>` from a raw pointer to `T`, with null
1241/// checks elided in release mode.
1242#[cfg(debug_assertions)]
1243#[track_caller]
1244#[inline(always)]
1245unsafe fn non_null<T>(ptr: *mut T) -> NonNull<T> {
1246    NonNull::new(ptr).expect(
1247        "/!\\ constructed a `NonNull` from a null pointer! /!\\ \n\
1248        in release mode, this would have called `NonNull::new_unchecked`, \
1249        violating the `NonNull` invariant! this is a bug in `cordyceps!`.",
1250    )
1251}
1252
1253/// Helper to construct a `NonNull<T>` from a raw pointer to `T`, with null
1254/// checks elided in release mode.
1255///
1256/// This is the release mode version.
1257#[cfg(not(debug_assertions))]
1258#[inline(always)]
1259unsafe fn non_null<T>(ptr: *mut T) -> NonNull<T> {
1260    NonNull::new_unchecked(ptr)
1261}
1262
1263#[cfg(all(loom, test))]
1264mod loom {
1265    use super::*;
1266    use crate::loom::{self, sync::Arc, thread};
1267    use test_util::*;
1268
1269    #[test]
1270    fn basically_works_loom() {
1271        const THREADS: i32 = 2;
1272        const MSGS: i32 = THREADS;
1273        const TOTAL_MSGS: i32 = THREADS * MSGS;
1274        basically_works_test(THREADS, MSGS, TOTAL_MSGS);
1275    }
1276
1277    #[test]
1278    fn doesnt_leak() {
1279        // Test that dropping the queue drops any messages that haven't been
1280        // consumed by the consumer.
1281        const THREADS: i32 = 2;
1282        const MSGS: i32 = THREADS;
1283        // Only consume half as many messages as are sent, to ensure dropping
1284        // the queue does not leak.
1285        const TOTAL_MSGS: i32 = (THREADS * MSGS) / 2;
1286        basically_works_test(THREADS, MSGS, TOTAL_MSGS);
1287    }
1288
1289    fn basically_works_test(threads: i32, msgs: i32, total_msgs: i32) {
1290        loom::model(move || {
1291            let stub = entry(666);
1292            let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));
1293
1294            let threads: Vec<_> = (0..threads)
1295                .map(|thread| thread::spawn(do_tx(thread, msgs, &q)))
1296                .collect();
1297
1298            let mut i = 0;
1299            while i < total_msgs {
1300                match q.try_dequeue() {
1301                    Ok(val) => {
1302                        i += 1;
1303                        tracing::info!(?val, "dequeue {}/{}", i, total_msgs);
1304                    }
1305                    Err(TryDequeueError::Busy) => panic!(
1306                        "the queue should never be busy, as there is only a single consumer!"
1307                    ),
1308                    Err(err) => {
1309                        tracing::info!(?err, "dequeue error");
1310                        thread::yield_now();
1311                    }
1312                }
1313            }
1314
1315            for thread in threads {
1316                thread.join().unwrap();
1317            }
1318        })
1319    }
1320
1321    fn do_tx(thread: i32, msgs: i32, q: &Arc<MpscQueue<Entry>>) -> impl FnOnce() + Send + Sync {
1322        let q = q.clone();
1323        move || {
1324            for i in 0..msgs {
1325                q.enqueue(entry(i + (thread * 10)));
1326                tracing::info!(thread, "enqueue msg {}/{}", i, msgs);
1327            }
1328        }
1329    }
1330
1331    #[test]
1332    fn mpmc() {
1333        // Tests multiple consumers competing for access to the consume side of
1334        // the queue.
1335        const THREADS: i32 = 2;
1336        const MSGS: i32 = THREADS;
1337
1338        fn do_rx(thread: i32, q: Arc<MpscQueue<Entry>>) {
1339            let mut i = 0;
1340            while let Some(val) = q.dequeue() {
1341                tracing::info!(?val, ?thread, "dequeue {}/{}", i, THREADS * MSGS);
1342                i += 1;
1343            }
1344        }
1345
1346        loom::model(|| {
1347            let stub = entry(666);
1348            let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));
1349
1350            let mut threads: Vec<_> = (0..THREADS)
1351                .map(|thread| thread::spawn(do_tx(thread, MSGS, &q)))
1352                .collect();
1353
1354            threads.push(thread::spawn({
1355                let q = q.clone();
1356                move || do_rx(THREADS + 1, q)
1357            }));
1358            do_rx(THREADS + 2, q);
1359
1360            for thread in threads {
1361                thread.join().unwrap();
1362            }
1363        })
1364    }
1365
1366    #[test]
1367    fn crosses_queues() {
1368        loom::model(|| {
1369            let stub1 = entry(666);
1370            let q1 = Arc::new(MpscQueue::<Entry>::new_with_stub(stub1));
1371
1372            let thread = thread::spawn({
1373                let q1 = q1.clone();
1374                move || {
1375                    let stub2 = entry(420);
1376                    let q2 = Arc::new(MpscQueue::<Entry>::new_with_stub(stub2));
1377                    // let mut dequeued = false;
1378                    for entry in q1.consume() {
1379                        tracing::info!("dequeued");
1380                        q2.enqueue(entry);
1381                        q2.try_dequeue().unwrap();
1382                    }
1383                    tracing::info!("consumer done\nq1={q1:#?}\nq2={q2:#?}");
1384                }
1385            });
1386
1387            q1.enqueue(entry(1));
1388            drop(q1);
1389
1390            thread.join().unwrap();
1391        })
1392    }
1393}
1394
1395#[cfg(all(test, not(loom)))]
1396mod tests {
1397    use super::*;
1398    use test_util::*;
1399
1400    use std::{ops::Deref, println, sync::Arc, thread};
1401
1402    #[test]
1403    fn dequeue_empty() {
1404        let stub = entry(666);
1405        let q = MpscQueue::<Entry>::new_with_stub(stub);
1406        assert_eq!(q.dequeue(), None)
1407    }
1408
1409    #[test]
1410    fn try_dequeue_empty() {
1411        let stub = entry(666);
1412        let q = MpscQueue::<Entry>::new_with_stub(stub);
1413        assert_eq!(q.try_dequeue(), Err(TryDequeueError::Empty))
1414    }
1415
1416    #[test]
1417    fn try_dequeue_busy() {
1418        let stub = entry(666);
1419        let q = MpscQueue::<Entry>::new_with_stub(stub);
1420
1421        let consumer = q.try_consume().expect("must acquire consumer");
1422        assert_eq!(consumer.try_dequeue(), Err(TryDequeueError::Empty));
1423
1424        q.enqueue(entry(1));
1425
1426        assert_eq!(q.try_dequeue(), Err(TryDequeueError::Busy));
1427
1428        assert_eq!(consumer.try_dequeue(), Ok(entry(1)),);
1429
1430        assert_eq!(q.try_dequeue(), Err(TryDequeueError::Busy));
1431
1432        assert_eq!(consumer.try_dequeue(), Err(TryDequeueError::Empty));
1433
1434        drop(consumer);
1435        assert_eq!(q.try_dequeue(), Err(TryDequeueError::Empty));
1436    }
1437
1438    #[test]
1439    fn enqueue_dequeue() {
1440        let stub = entry(666);
1441        let e = entry(1);
1442        let q = MpscQueue::<Entry>::new_with_stub(stub);
1443        q.enqueue(e);
1444        assert_eq!(q.dequeue(), Some(entry(1)));
1445        assert_eq!(q.dequeue(), None)
1446    }
1447
1448    #[test]
1449    fn basically_works() {
1450        let stub = entry(666);
1451        let q = MpscQueue::<Entry>::new_with_stub(stub);
1452
1453        let q = Arc::new(q);
1454        test_basically_works(q);
1455    }
1456
1457    #[test]
1458    fn basically_works_all_const() {
1459        static STUB_ENTRY: Entry = const_stub_entry(666);
1460        static MPSC: MpscQueue<Entry> =
1461            unsafe { MpscQueue::<Entry>::new_with_static_stub(&STUB_ENTRY) };
1462        test_basically_works(&MPSC);
1463    }
1464
1465    #[test]
1466    fn basically_works_mixed_const() {
1467        static STUB_ENTRY: Entry = const_stub_entry(666);
1468        let q = unsafe { MpscQueue::<Entry>::new_with_static_stub(&STUB_ENTRY) };
1469
1470        let q = Arc::new(q);
1471        test_basically_works(q)
1472    }
1473
1474    fn test_basically_works<Q>(q: Q)
1475    where
1476        Q: Deref<Target = MpscQueue<Entry>> + Clone,
1477        Q: Send + 'static,
1478    {
1479        const THREADS: i32 = if_miri(3, 8);
1480        const MSGS: i32 = if_miri(10, 1000);
1481
1482        assert_eq!(q.dequeue(), None);
1483
1484        let threads: Vec<_> = (0..THREADS)
1485            .map(|thread| {
1486                let q = q.clone();
1487                thread::spawn(move || {
1488                    for i in 0..MSGS {
1489                        q.enqueue(entry(i));
1490                        println!("thread {thread}; msg {i}/{MSGS}");
1491                    }
1492                })
1493            })
1494            .collect();
1495
1496        let mut i = 0;
1497        while i < THREADS * MSGS {
1498            match q.try_dequeue() {
1499                Ok(msg) => {
1500                    i += 1;
1501                    println!("recv {msg:?} ({i}/{})", THREADS * MSGS);
1502                }
1503                Err(TryDequeueError::Busy) => {
1504                    panic!("the queue should never be busy, as there is only one consumer")
1505                }
1506                Err(e) => {
1507                    println!("recv error {e:?}");
1508                    thread::yield_now();
1509                }
1510            }
1511        }
1512
1513        for thread in threads {
1514            thread.join().unwrap();
1515        }
1516    }
1517
1518    const fn if_miri(miri: i32, not_miri: i32) -> i32 {
1519        if cfg!(miri) {
1520            miri
1521        } else {
1522            not_miri
1523        }
1524    }
1525}
1526
1527#[cfg(test)]
1528mod test_util {
1529    use super::*;
1530    use crate::loom::alloc;
1531    pub use std::{boxed::Box, pin::Pin, ptr, vec::Vec};
1532
1533    pub(super) struct Entry {
1534        links: Links<Entry>,
1535        pub(super) val: i32,
1536        // participate in loom leak checking
1537        _track: alloc::Track<()>,
1538    }
1539
1540    impl std::cmp::PartialEq for Entry {
1541        fn eq(&self, other: &Self) -> bool {
1542            self.val == other.val
1543        }
1544    }
1545
1546    unsafe impl Linked<Links<Self>> for Entry {
1547        type Handle = Pin<Box<Entry>>;
1548
1549        fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
1550            unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
1551        }
1552
1553        unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
1554            // Safety: if this function is only called by the linked list
1555            // implementation (and it is not intended for external use), we can
1556            // expect that the `NonNull` was constructed from a reference which
1557            // was pinned.
1558            //
1559            // If other callers besides `List`'s internals were to call this on
1560            // some random `NonNull<Entry>`, this would not be the case, and
1561            // this could be constructing an erroneous `Pin` from a referent
1562            // that may not be pinned!
1563            Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
1564        }
1565
1566        unsafe fn links(target: NonNull<Entry>) -> NonNull<Links<Entry>> {
1567            let links = ptr::addr_of_mut!((*target.as_ptr()).links);
1568            NonNull::new_unchecked(links)
1569        }
1570    }
1571
1572    impl fmt::Debug for Entry {
1573        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1574            let Self { links, val, _track } = self;
1575            f.debug_struct("Entry")
1576                .field("links", links)
1577                .field("val", val)
1578                .field("_track", _track)
1579                .finish()
1580        }
1581    }
1582
1583    #[cfg(not(loom))]
1584    pub(super) const fn const_stub_entry(val: i32) -> Entry {
1585        Entry {
1586            links: Links::new_stub(),
1587            val,
1588            _track: alloc::Track::new_const(()),
1589        }
1590    }
1591
1592    pub(super) fn entry(val: i32) -> Pin<Box<Entry>> {
1593        Box::pin(Entry {
1594            links: Links::new(),
1595            val,
1596            _track: alloc::Track::new(()),
1597        })
1598    }
1599}