cordyceps/mpsc_queue.rs
//! A multi-producer, single-consumer (MPSC) queue, implemented using a
//! lock-free [intrusive] singly-linked list.
//!
//! See the documentation for the [`MpscQueue`] type for details.
//!
//! Based on [Dmitry Vyukov's intrusive MPSC][vyukov].
//!
//! [vyukov]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
//! [intrusive]: crate#intrusive-data-structures
use crate::{
    loom::{
        cell::UnsafeCell,
        sync::atomic::{AtomicBool, AtomicPtr, Ordering::*},
    },
    util::{Backoff, CachePadded},
    Linked,
};
use core::{
    fmt,
    marker::PhantomPinned,
    ptr::{self, NonNull},
};

// Helper macro for conditionally compiling items gated on a Cargo feature flag.
macro_rules! feature {
    (
        #![$meta:meta]
        $($item:item)*
    ) => {
        $(
            #[cfg($meta)]
            $item
        )*
    }
}
35
36/// A multi-producer, single-consumer (MPSC) queue, implemented using a
37/// lock-free [intrusive] singly-linked list.
38///
39/// Based on [Dmitry Vyukov's intrusive MPSC][vyukov].
40///
41/// In order to be part of a `MpscQueue`, a type `T` must implement [`Linked`] for
42/// [`mpsc_queue::Links<T>`].
43///
44/// [`mpsc_queue::Links<T>`]: crate::mpsc_queue::Links
45///
46/// # Examples
47///
48/// ```
49/// use cordyceps::{
50/// Linked,
51/// mpsc_queue::{self, MpscQueue},
52/// };
53///
54/// // This example uses the Rust standard library for convenience, but
55/// // the MPSC queue itself does not require std.
56/// use std::{pin::Pin, ptr::{self, NonNull}, thread, sync::Arc};
57///
58/// /// A simple queue entry that stores an `i32`.
59/// #[derive(Debug, Default)]
60/// struct Entry {
61/// links: mpsc_queue::Links<Entry>,
62/// val: i32,
63/// }
64///
65/// // Implement the `Linked` trait for our entry type so that it can be used
66/// // as a queue entry.
67/// unsafe impl Linked<mpsc_queue::Links<Entry>> for Entry {
68/// // In this example, our entries will be "owned" by a `Box`, but any
69/// // heap-allocated type that owns an element may be used.
70/// //
71/// // An element *must not* move while part of an intrusive data
72/// // structure. In many cases, `Pin` may be used to enforce this.
73/// type Handle = Pin<Box<Self>>;
74///
75/// /// Convert an owned `Handle` into a raw pointer
76/// fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
77/// unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
78/// }
79///
80/// /// Convert a raw pointer back into an owned `Handle`.
81/// unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
82/// // Safety: if this function is only called by the linked list
83/// // implementation (and it is not intended for external use), we can
84/// // expect that the `NonNull` was constructed from a reference which
85/// // was pinned.
86/// //
87/// // If other callers besides `MpscQueue`'s internals were to call this on
88/// // some random `NonNull<Entry>`, this would not be the case, and
89/// // this could be constructing an erroneous `Pin` from a referent
90/// // that may not be pinned!
91/// Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
92/// }
93///
94/// /// Access an element's `Links`.
95/// unsafe fn links(target: NonNull<Entry>) -> NonNull<mpsc_queue::Links<Entry>> {
96/// // Using `ptr::addr_of_mut!` permits us to avoid creating a temporary
97/// // reference without using layout-dependent casts.
98/// let links = ptr::addr_of_mut!((*target.as_ptr()).links);
99///
100/// // `NonNull::new_unchecked` is safe to use here, because the pointer that
101/// // we offset was not null, implying that the pointer produced by offsetting
102/// // it will also not be null.
103/// NonNull::new_unchecked(links)
104/// }
105/// }
106///
107/// impl Entry {
108/// fn new(val: i32) -> Self {
109/// Self {
110/// val,
111/// ..Self::default()
112/// }
113/// }
114/// }
115///
116/// // Once we have a `Linked` implementation for our element type, we can construct
117/// // a queue.
118///
119/// // Because `Pin<Box<...>>` doesn't have a `Default` impl, we have to manually
120/// // construct the stub node.
121/// let stub = Box::pin(Entry::default());
122/// let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));
123///
124/// // Spawn some producer threads.
125/// thread::spawn({
126/// let q = q.clone();
127/// move || {
128/// // Enqueuing elements does not require waiting, and is not fallible.
129/// q.enqueue(Box::pin(Entry::new(1)));
130/// q.enqueue(Box::pin(Entry::new(2)));
131/// }
132/// });
133///
134/// thread::spawn({
135/// let q = q.clone();
136/// move || {
137/// q.enqueue(Box::pin(Entry::new(3)));
138/// q.enqueue(Box::pin(Entry::new(4)));
139/// }
140/// });
141///
142///
143/// // Dequeue elements until the producer threads have terminated.
144/// let mut seen = Vec::new();
145/// loop {
146/// // Make sure we run at least once, in case the producer is already done.
147/// let done = Arc::strong_count(&q) == 1;
148///
149/// // Dequeue until the queue is empty.
150/// while let Some(entry) = q.dequeue() {
151/// seen.push(entry.as_ref().val);
152/// }
153///
154/// // If there are still producers, we may continue dequeuing.
155/// if done {
156/// break;
157/// }
158///
159/// thread::yield_now();
160/// }
161///
162/// // The elements may not have been received in order, so sort the
163/// // received values before making assertions about them.
164/// &mut seen[..].sort();
165///
166/// assert_eq!(&[1, 2, 3, 4], &seen[..]);
167/// ```
168///
169/// The [`Consumer`] type may be used to reserve the permission to consume
170/// multiple elements at a time:
171///
172/// ```
173/// # use cordyceps::{
174/// # Linked,
175/// # mpsc_queue::{self, MpscQueue},
176/// # };
177/// # use std::{pin::Pin, ptr::{self, NonNull}, thread, sync::Arc};
178/// #
179/// # #[derive(Debug, Default)]
180/// # struct Entry {
181/// # links: mpsc_queue::Links<Entry>,
182/// # val: i32,
183/// # }
184/// #
185/// # unsafe impl Linked<mpsc_queue::Links<Entry>> for Entry {
186/// # type Handle = Pin<Box<Self>>;
187/// #
188/// # fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
189/// # unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
190/// # }
191/// #
192/// # unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
193/// # Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
194/// # }
195/// #
196/// # unsafe fn links(target: NonNull<Entry>) -> NonNull<mpsc_queue::Links<Entry>> {
197/// # let links = ptr::addr_of_mut!((*target.as_ptr()).links);
198/// # NonNull::new_unchecked(links)
199/// # }
200/// # }
201/// #
202/// # impl Entry {
203/// # fn new(val: i32) -> Self {
204/// # Self {
205/// # val,
206/// # ..Self::default()
207/// # }
208/// # }
209/// # }
210/// let stub = Box::pin(Entry::default());
211/// let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));
212///
213/// thread::spawn({
214/// let q = q.clone();
215/// move || {
216/// q.enqueue(Box::pin(Entry::new(1)));
217/// q.enqueue(Box::pin(Entry::new(2)));
218/// }
219/// });
220///
221/// // Reserve exclusive permission to consume elements
222/// let consumer = q.consume();
223///
224/// let mut seen = Vec::new();
225/// loop {
226/// // Make sure we run at least once, in case the producer is already done.
227/// let done = Arc::strong_count(&q) == 1;
228///
229/// // Dequeue until the queue is empty.
230/// while let Some(entry) = consumer.dequeue() {
231/// seen.push(entry.as_ref().val);
232/// }
233///
234/// if done {
235/// break;
236/// }
237/// thread::yield_now();
238/// }
239///
240/// assert_eq!(&[1, 2], &seen[..]);
241/// ```
242///
243/// The [`Consumer`] type also implements [`Iterator`]:
244///
245/// ```
246/// # use cordyceps::{
247/// # Linked,
248/// # mpsc_queue::{self, MpscQueue},
249/// # };
250/// # use std::{pin::Pin, ptr::{self, NonNull}, thread, sync::Arc};
251/// #
252/// # #[derive(Debug, Default)]
253/// # struct Entry {
254/// # links: mpsc_queue::Links<Entry>,
255/// # val: i32,
256/// # }
257/// #
258/// # unsafe impl Linked<mpsc_queue::Links<Entry>> for Entry {
259/// # type Handle = Pin<Box<Self>>;
260/// #
261/// # fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
262/// # unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
263/// # }
264/// #
265/// # unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
266/// # Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
267/// # }
268/// #
269/// # unsafe fn links(target: NonNull<Entry>) -> NonNull<mpsc_queue::Links<Entry>> {
270/// # let links = ptr::addr_of_mut!((*target.as_ptr()).links);
271/// # NonNull::new_unchecked(links)
272/// # }
273/// # }
274/// #
275/// # impl Entry {
276/// # fn new(val: i32) -> Self {
277/// # Self {
278/// # val,
279/// # ..Self::default()
280/// # }
281/// # }
282/// # }
283/// let stub = Box::pin(Entry::default());
284/// let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));
285///
286/// thread::spawn({
287/// let q = q.clone();
288/// move || {
289/// for i in 1..5 {
290/// q.enqueue(Box::pin(Entry::new(i)));
291/// }
292/// }
293/// });
294///
295/// thread::spawn({
296/// let q = q.clone();
297/// move || {
298/// for i in 5..=10 {
299/// q.enqueue(Box::pin(Entry::new(i)));
300/// }
301/// }
302/// });
303///
304/// let mut seen = Vec::new();
305/// loop {
306/// // Make sure we run at least once, in case the producer is already done.
307/// let done = Arc::strong_count(&q) == 1;
308///
309/// // Append any elements currently in the queue to the `Vec`
310/// seen.extend(q.consume().map(|entry| entry.as_ref().val));
311///
312/// if done {
313/// break;
314/// }
315///
316/// thread::yield_now();
317/// }
318///
319/// &mut seen[..].sort();
320/// assert_eq!(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], &seen[..]);
321/// ```
322///
323/// # Implementation Details
324///
325/// This queue design is conceptually very simple, and has *extremely* fast and
326/// wait-free producers (the [`enqueue`] operation). Enqueuing an element always
327/// performs exactly one atomic swap and one atomic store, so producers need
328/// never wait.
329///
330/// The consumer (the [`dequeue`]) is *typically* wait-free in the common case,
331/// but must occasionally wait when the queue is in an inconsistent state.
332///
333/// ## Inconsistent States
334///
335/// As discussed in the [algorithm description on 1024cores.net][vyukov], it
336/// is possible for this queue design to enter an inconsistent state if the
337/// consumer tries to dequeue an element while a producer is in the middle
338/// of enqueueing a new element. This occurs when a producer is between the
339/// atomic swap with the `head` of the queue and the atomic store that sets the
340/// `next` pointer of the previous `head` element. When the queue is in an
341/// inconsistent state, the consumer must briefly wait before dequeueing an
342/// element.
343///
344/// The consumer's behavior in the inconsistent state depends on which API
345/// method is used. The [`MpscQueue::dequeue`] and [`Consumer::dequeue`] methods
346/// will wait by spinning (with an exponential backoff) when the queue is
347/// inconsistent. Alternatively, the [`MpscQueue::try_dequeue`] and
348/// [`Consumer::try_dequeue`] methods will instead return [an error] when the
349/// queue is in an inconsistent state.
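///
/// For example, a caller that cannot block might treat
/// [`TryDequeueError::Inconsistent`] as a hint to retry. This is a minimal
/// sketch (reusing the `Entry` type from the examples above), not a
/// prescribed pattern:
///
/// ```
/// # use cordyceps::{
/// #     Linked,
/// #     mpsc_queue::{self, MpscQueue, TryDequeueError},
/// # };
/// # use std::{pin::Pin, ptr::{self, NonNull}};
/// # #[derive(Debug, Default)]
/// # struct Entry {
/// #     links: mpsc_queue::Links<Entry>,
/// #     val: i32,
/// # }
/// # unsafe impl Linked<mpsc_queue::Links<Entry>> for Entry {
/// #     type Handle = Pin<Box<Self>>;
/// #     fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
/// #         unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
/// #     }
/// #     unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
/// #         Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
/// #     }
/// #     unsafe fn links(target: NonNull<Entry>) -> NonNull<mpsc_queue::Links<Entry>> {
/// #         NonNull::new_unchecked(ptr::addr_of_mut!((*target.as_ptr()).links))
/// #     }
/// # }
/// let q = MpscQueue::<Entry>::new_with_stub(Box::pin(Entry::default()));
/// q.enqueue(Box::pin(Entry::default()));
///
/// loop {
///     match q.try_dequeue() {
///         Ok(_entry) => break,
///         // A producer is mid-enqueue; the queue will become consistent
///         // again shortly, so it's reasonable to retry (or to go do other
///         // work and come back later).
///         Err(TryDequeueError::Inconsistent) => std::hint::spin_loop(),
///         Err(err) => panic!("unexpected error: {err:?}"),
///     }
/// }
/// ```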
///
/// [intrusive]: crate#intrusive-data-structures
/// [vyukov]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
/// [`dequeue`]: Self::dequeue
/// [`enqueue`]: Self::enqueue
/// [an error]: TryDequeueError
pub struct MpscQueue<T: Linked<Links<T>>> {
    /// The head of the queue. This is accessed in both `enqueue` and `dequeue`.
    head: CachePadded<AtomicPtr<T>>,

    /// The tail of the queue. This is accessed only when dequeueing.
    tail: CachePadded<UnsafeCell<*mut T>>,

    /// Does a consumer handle to the queue exist? If not, it is safe to create a
    /// new consumer.
    has_consumer: CachePadded<AtomicBool>,

    /// If the stub node is in a `static`, we cannot drop it when the
    /// queue is dropped.
    stub_is_static: bool,

    stub: NonNull<T>,
}

/// A handle that holds the right to dequeue elements from a [`MpscQueue`].
///
/// This can be used when one thread wishes to dequeue many elements at a time,
/// to avoid the overhead of ensuring mutual exclusion on every [`dequeue`] or
/// [`try_dequeue`] call.
///
/// This type is returned by the [`MpscQueue::consume`] and [`MpscQueue::try_consume`]
/// methods.
///
/// If the right to dequeue elements needs to be reserved for longer than a
/// single scope, an owned variant ([`OwnedConsumer`]) is also available when
/// the [`MpscQueue`] is stored in an [`Arc`]. Since the [`MpscQueue`] must be stored
/// in an [`Arc`], the [`OwnedConsumer`] type requires the "alloc" feature flag.
///
/// [`Arc`]: alloc::sync::Arc
/// [`dequeue`]: Consumer::dequeue
/// [`try_dequeue`]: Consumer::try_dequeue
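///
/// # Examples
///
/// A minimal sketch of batched consumption, reusing the `Entry` type from the
/// [`MpscQueue`] examples:
///
/// ```
/// # use cordyceps::{
/// #     Linked,
/// #     mpsc_queue::{self, MpscQueue},
/// # };
/// # use std::{pin::Pin, ptr::{self, NonNull}};
/// # #[derive(Debug, Default)]
/// # struct Entry {
/// #     links: mpsc_queue::Links<Entry>,
/// #     val: i32,
/// # }
/// # unsafe impl Linked<mpsc_queue::Links<Entry>> for Entry {
/// #     type Handle = Pin<Box<Self>>;
/// #     fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
/// #         unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
/// #     }
/// #     unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
/// #         Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
/// #     }
/// #     unsafe fn links(target: NonNull<Entry>) -> NonNull<mpsc_queue::Links<Entry>> {
/// #         NonNull::new_unchecked(ptr::addr_of_mut!((*target.as_ptr()).links))
/// #     }
/// # }
/// let q = MpscQueue::<Entry>::new_with_stub(Box::pin(Entry::default()));
/// q.enqueue(Box::pin(Entry::default()));
/// q.enqueue(Box::pin(Entry::default()));
///
/// // Acquire the exclusive right to dequeue *once*, rather than on each
/// // `dequeue` call...
/// let consumer = q.consume();
///
/// // ...and then drain the queue in a batch.
/// let mut drained = 0;
/// while let Some(_entry) = consumer.dequeue() {
///     drained += 1;
/// }
/// assert_eq!(drained, 2);
/// ```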
pub struct Consumer<'q, T: Linked<Links<T>>> {
    q: &'q MpscQueue<T>,
}

/// Links to other nodes in a [`MpscQueue`].
///
/// In order to be part of a [`MpscQueue`], a type must contain an instance of this
/// type, and must implement the [`Linked`] trait for `Links<Self>`.
pub struct Links<T> {
    /// The next node in the queue.
    next: AtomicPtr<T>,

    /// Is this the stub node?
    ///
    /// Used for debug mode consistency checking only.
    #[cfg(debug_assertions)]
    is_stub: AtomicBool,

    /// Linked list links must always be `!Unpin`, in order to ensure that they
    /// never receive LLVM `noalias` annotations; see also
    /// <https://github.com/rust-lang/rust/issues/63818>.
    _unpin: PhantomPinned,
}

/// Errors returned by [`MpscQueue::try_dequeue`] and [`Consumer::try_dequeue`].
#[derive(Debug, Eq, PartialEq)]
pub enum TryDequeueError {
    /// No element was dequeued because the queue was empty.
    Empty,

    /// The queue is currently in an [inconsistent state].
    ///
    /// Since inconsistent states are very short-lived, the caller may want to
    /// try dequeueing a second time.
    ///
    /// [inconsistent state]: MpscQueue#inconsistent-states
    Inconsistent,

    /// Another thread is currently calling [`MpscQueue::try_dequeue`] or
    /// [`MpscQueue::dequeue`], or owns a [`Consumer`] or [`OwnedConsumer`] handle.
    ///
    /// This is a multi-producer, *single-consumer* queue, so only a single
    /// thread may dequeue elements at any given time.
    Busy,
}

// === impl Queue ===

impl<T: Linked<Links<T>>> MpscQueue<T> {
    /// Returns a new `MpscQueue`.
    ///
    /// The [`Default`] implementation for `T::Handle` is used to produce a new
    /// node used as the list's stub.
    #[must_use]
    pub fn new() -> Self
    where
        T::Handle: Default,
    {
        Self::new_with_stub(Default::default())
    }

    /// Returns a new `MpscQueue` with the provided stub node.
    ///
    /// If a `MpscQueue` must be constructed in a `const` context, such as a
    /// `static` initializer, see [`MpscQueue::new_with_static_stub`].
    #[must_use]
    pub fn new_with_stub(stub: T::Handle) -> Self {
        let stub = T::into_ptr(stub);

        // In debug mode, set the stub flag for consistency checking.
        #[cfg(debug_assertions)]
        unsafe {
            links(stub).is_stub.store(true, Release);
        }
        let ptr = stub.as_ptr();

        Self {
            head: CachePadded(AtomicPtr::new(ptr)),
            tail: CachePadded(UnsafeCell::new(ptr)),
            has_consumer: CachePadded(AtomicBool::new(false)),
            stub_is_static: false,
            stub,
        }
    }

    /// Returns a new `MpscQueue` with a static "stub" entity.
    ///
    /// This is primarily used for creating an `MpscQueue` as a `static` variable.
    ///
    /// # Usage notes
    ///
    /// Unlike [`MpscQueue::new`] or [`MpscQueue::new_with_stub`], the `stub`
    /// item will NOT be dropped when the `MpscQueue` is dropped. This is fine
    /// if you are ALSO statically creating the `stub`. However, if it is
    /// necessary to recover that memory after the `MpscQueue` has been dropped,
    /// that will need to be done by the user manually.
    ///
    /// # Safety
    ///
    /// The `stub` provided must ONLY EVER be used for a single `MpscQueue`
    /// instance. Re-using the stub for multiple queues may lead to undefined
    /// behavior.
    ///
    /// ## Example usage
    ///
    /// ```rust
    /// # use cordyceps::{
    /// #     Linked,
    /// #     mpsc_queue::{self, MpscQueue},
    /// # };
    /// # use std::{pin::Pin, ptr::{self, NonNull}, thread, sync::Arc};
    /// #
    /// // This is the same `Entry` type as in the parent examples, with the
    /// // same `Linked` implementation.
    /// #[derive(Debug, Default)]
    /// struct Entry {
    ///     links: mpsc_queue::Links<Entry>,
    ///     val: i32,
    /// }
    /// #
    /// # unsafe impl Linked<mpsc_queue::Links<Entry>> for Entry {
    /// #     type Handle = Pin<Box<Self>>;
    /// #
    /// #     fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
    /// #         unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
    /// #     }
    /// #
    /// #     unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
    /// #         Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
    /// #     }
    /// #
    /// #     unsafe fn links(target: NonNull<Entry>) -> NonNull<mpsc_queue::Links<Entry>> {
    /// #         let links = ptr::addr_of_mut!((*target.as_ptr()).links);
    /// #         NonNull::new_unchecked(links)
    /// #     }
    /// # }
    ///
    /// static MPSC: MpscQueue<Entry> = {
    ///     static STUB_ENTRY: Entry = Entry {
    ///         links: mpsc_queue::Links::<Entry>::new_stub(),
    ///         val: 0
    ///     };
    ///
    ///     // SAFETY: The stub may not be used by another MPSC queue.
    ///     // Here, this is ensured because the `STUB_ENTRY` static is defined
    ///     // inside of the initializer for the `MPSC` static, so it cannot be referenced
    ///     // elsewhere.
    ///     unsafe { MpscQueue::new_with_static_stub(&STUB_ENTRY) }
    /// };
    /// ```
    #[cfg(not(loom))]
    #[must_use]
    pub const unsafe fn new_with_static_stub(stub: &'static T) -> Self {
        let ptr = stub as *const T as *mut T;
        Self {
            head: CachePadded(AtomicPtr::new(ptr)),
            tail: CachePadded(UnsafeCell::new(ptr)),
            has_consumer: CachePadded(AtomicBool::new(false)),
            stub_is_static: true,
            stub: NonNull::new_unchecked(ptr),
        }
    }

    /// Enqueue a new element at the end of the queue.
    ///
    /// This takes ownership of a [`Handle`] that owns the element, and
    /// (conceptually) assigns ownership of the element to the queue while it
    /// remains enqueued.
    ///
    /// This method will never wait.
    ///
    /// [`Handle`]: crate::Linked::Handle
    pub fn enqueue(&self, element: T::Handle) {
        let ptr = T::into_ptr(element);

        #[cfg(debug_assertions)]
        debug_assert!(!unsafe { T::links(ptr).as_ref() }.is_stub());

        self.enqueue_inner(ptr)
    }

    #[inline]
    fn enqueue_inner(&self, ptr: NonNull<T>) {
        // The new node will be the last in the queue, so clear its `next`
        // pointer before linking it in.
        unsafe { links(ptr).next.store(ptr::null_mut(), Relaxed) };

        let ptr = ptr.as_ptr();
        // Atomically swap the new node into place as the queue's `head`...
        let prev = self.head.swap(ptr, AcqRel);
        unsafe {
            // ...and then link the previous head to the new node. A consumer
            // that observes the queue between the swap above and this store
            // sees an "inconsistent" queue, and must wait for the store.
            //
            // Safety: in release mode, we don't null check `prev`. This is
            // because no pointer in the list should ever be a null pointer, due
            // to the presence of the stub node.
            links(non_null(prev)).next.store(ptr, Release);
        }
    }

    /// Try to dequeue an element from the queue, without waiting if the queue
    /// is in an [inconsistent state], or if another consumer is currently
    /// reading from the queue.
    ///
    /// Because this is a multi-producer, _single-consumer_ queue,
    /// only one thread may be dequeueing at a time. If another thread is
    /// dequeueing, this method returns [`TryDequeueError::Busy`].
    ///
    /// The [`MpscQueue::dequeue`] method will instead wait (by spinning with an
    /// exponential backoff) when the queue is in an inconsistent state or busy.
    ///
    /// The unsafe [`MpscQueue::try_dequeue_unchecked`] method will not check if the
    /// queue is busy before dequeueing an element. This can be used when the
    /// user code guarantees that no other threads will dequeue from the queue
    /// concurrently, but this cannot be enforced by the compiler.
    ///
    /// This method will never wait.
    ///
    /// # Returns
    ///
    /// - `Ok(`[`T::Handle`]`)` if an element was successfully dequeued
    /// - `Err(`[`TryDequeueError::Empty`]`)` if there are no elements in the queue
    /// - `Err(`[`TryDequeueError::Inconsistent`]`)` if the queue is currently in an
    ///   inconsistent state
    /// - `Err(`[`TryDequeueError::Busy`]`)` if another thread is currently trying to
    ///   dequeue a message.
    ///
    /// [inconsistent state]: Self#inconsistent-states
    /// [`T::Handle`]: crate::Linked::Handle
    pub fn try_dequeue(&self) -> Result<T::Handle, TryDequeueError> {
        // Attempt to claim the exclusive right to consume, returning `Busy` if
        // another consumer currently holds it.
        if self
            .has_consumer
            .compare_exchange(false, true, AcqRel, Acquire)
            .is_err()
        {
            return Err(TryDequeueError::Busy);
        }

        let res = unsafe {
            // Safety: the `has_consumer` flag ensures mutual exclusion of
            // consumers.
            self.try_dequeue_unchecked()
        };

        // Release the consumer flag so other threads may dequeue.
        self.has_consumer.store(false, Release);
        res
    }

    /// Dequeue an element from the queue.
    ///
    /// This method will wait by spinning with an exponential backoff if the
    /// queue is in an [inconsistent state].
    ///
    /// Additionally, because this is a multi-producer, _single-consumer_ queue,
    /// only one thread may be dequeueing at a time. If another thread is
    /// dequeueing, this method will spin until the queue is no longer busy.
    ///
    /// The [`MpscQueue::try_dequeue`] method will return an error rather than
    /// waiting when the queue is in an inconsistent state or busy.
    ///
    /// The unsafe [`MpscQueue::dequeue_unchecked`] method will not check if the
    /// queue is busy before dequeueing an element. This can be used when the
    /// user code guarantees that no other threads will dequeue from the queue
    /// concurrently, but this cannot be enforced by the compiler.
    ///
    /// # Returns
    ///
    /// - `Some(`[`T::Handle`]`)` if an element was successfully dequeued
    /// - `None` if the queue is empty
    ///
    /// [inconsistent state]: Self#inconsistent-states
    /// [`T::Handle`]: crate::Linked::Handle
    pub fn dequeue(&self) -> Option<T::Handle> {
        let mut boff = Backoff::new();
        loop {
            match self.try_dequeue() {
                Ok(val) => return Some(val),
                Err(TryDequeueError::Empty) => return None,
                // Spin on `Inconsistent` or `Busy` errors, then retry.
                Err(_) => boff.spin(),
            }
        }
    }

    /// Returns a [`Consumer`] handle that reserves the exclusive right to dequeue
    /// elements from the queue until it is dropped.
    ///
    /// If another thread is dequeueing, this method spins until there is no
    /// other thread dequeueing.
    pub fn consume(&self) -> Consumer<'_, T> {
        self.lock_consumer();
        Consumer { q: self }
    }

    /// Attempts to reserve a [`Consumer`] handle that holds the exclusive right
    /// to dequeue elements from the queue until it is dropped.
    ///
    /// If another thread is dequeueing, this returns `None` instead.
    pub fn try_consume(&self) -> Option<Consumer<'_, T>> {
        // Lock the consumer side of the queue.
        self.try_lock_consumer().map(|_| Consumer { q: self })
    }

    /// Try to dequeue an element from the queue, without waiting if the queue
    /// is in an inconsistent state, and without checking if another consumer
    /// exists.
    ///
    /// This method returns [`TryDequeueError::Inconsistent`] when the queue is
    /// in an [inconsistent state].
    ///
    /// The [`MpscQueue::dequeue_unchecked`] method will instead wait (by
    /// spinning with an exponential backoff) when the queue is in an
    /// inconsistent state.
    ///
    /// This method will never wait.
    ///
    /// # Returns
    ///
    /// - `Ok(`[`T::Handle`]`)` if an element was successfully dequeued
    /// - `Err(`[`TryDequeueError::Empty`]`)` if there are no elements in the queue
    /// - `Err(`[`TryDequeueError::Inconsistent`]`)` if the queue is currently in an
    ///   inconsistent state
    ///
    /// This method will **never** return [`TryDequeueError::Busy`].
    ///
    /// # Safety
    ///
    /// This is a multi-producer, *single-consumer* queue. Only one thread/core
    /// may call `try_dequeue_unchecked` at a time!
    ///
    /// [inconsistent state]: Self#inconsistent-states
    /// [`T::Handle`]: crate::Linked::Handle
    pub unsafe fn try_dequeue_unchecked(&self) -> Result<T::Handle, TryDequeueError> {
        self.tail.with_mut(|tail| {
            let mut tail_node = NonNull::new(*tail).ok_or(TryDequeueError::Empty)?;
            let mut next = links(tail_node).next.load(Acquire);

            // If the current tail is the stub node, it cannot be returned to
            // the caller; skip past it to its successor, if one exists.
            if tail_node == self.stub {
                #[cfg(debug_assertions)]
                debug_assert!(links(tail_node).is_stub());
                let next_node = NonNull::new(next).ok_or(TryDequeueError::Empty)?;

                *tail = next;
                tail_node = next_node;
                next = links(next_node).next.load(Acquire);
            }

            // The tail node has a successor, so we can simply advance the tail
            // and return the current tail node.
            if !next.is_null() {
                *tail = next;
                return Ok(T::from_ptr(tail_node));
            }

            let head = self.head.load(Acquire);

            // The tail node has no successor, but it is not the queue's head:
            // a producer must be between its swap of the head and its store of
            // the `next` pointer, so the queue is in an inconsistent state.
            if tail_node.as_ptr() != head {
                return Err(TryDequeueError::Inconsistent);
            }

            // The tail node is the last element in the queue. Re-enqueue the
            // stub node so the queue is never completely empty, then dequeue
            // the tail node.
            self.enqueue_inner(self.stub);

            next = links(tail_node).next.load(Acquire);
            if next.is_null() {
                return Err(TryDequeueError::Empty);
            }

            *tail = next;

            #[cfg(debug_assertions)]
            debug_assert!(!links(tail_node).is_stub());

            Ok(T::from_ptr(tail_node))
        })
    }

    /// Dequeue an element from the queue, without checking whether another
    /// consumer exists.
    ///
    /// This method will wait by spinning with an exponential backoff if the
    /// queue is in an [inconsistent state].
    ///
    /// The [`MpscQueue::try_dequeue_unchecked`] method will return an error
    /// rather than waiting when the queue is in an inconsistent state.
    ///
    /// # Returns
    ///
    /// - `Some(`[`T::Handle`]`)` if an element was successfully dequeued
    /// - `None` if the queue is empty
    ///
    /// # Safety
    ///
    /// This is a multi-producer, *single-consumer* queue. Only one thread/core
    /// may call `dequeue_unchecked` at a time!
    ///
    /// [inconsistent state]: Self#inconsistent-states
    /// [`T::Handle`]: crate::Linked::Handle
    pub unsafe fn dequeue_unchecked(&self) -> Option<T::Handle> {
        let mut boff = Backoff::new();
        loop {
            match self.try_dequeue_unchecked() {
                Ok(val) => return Some(val),
                Err(TryDequeueError::Empty) => return None,
                Err(TryDequeueError::Inconsistent) => boff.spin(),
                Err(TryDequeueError::Busy) => {
                    unreachable!("try_dequeue_unchecked never returns `Busy`!")
                }
            }
        }
    }

    #[inline]
    fn lock_consumer(&self) {
        let mut boff = Backoff::new();
        // Try to claim the consumer flag. If another consumer holds it, spin
        // (reading the flag, rather than retrying the compare-exchange, to
        // avoid contended writes) until it is released, then try again.
        while self
            .has_consumer
            .compare_exchange(false, true, AcqRel, Acquire)
            .is_err()
        {
            while self.has_consumer.load(Relaxed) {
                boff.spin();
            }
        }
    }

    #[inline]
    fn try_lock_consumer(&self) -> Option<()> {
        self.has_consumer
            .compare_exchange(false, true, AcqRel, Acquire)
            .map(|_| ())
            .ok()
    }
}

impl<T: Linked<Links<T>>> Drop for MpscQueue<T> {
    fn drop(&mut self) {
        let mut current = self.tail.with_mut(|tail| unsafe {
            // Safety: because `Drop` is called with `&mut self`, we have
            // exclusive ownership over the queue, so it's always okay to touch
            // the tail cell.
            *tail
        });
        while let Some(node) = NonNull::new(current) {
            unsafe {
                let links = links(node);
                let next = links.next.load(Relaxed);

                // Skip dropping the stub node; it is owned by the queue and
                // will be dropped when the queue is dropped. If we dropped it
                // here, that would cause a double free!
                if node != self.stub {
                    // Convert the pointer to the owning handle and drop it.
                    #[cfg(debug_assertions)]
                    debug_assert!(!links.is_stub(), "stub: {:p}, node: {node:p}", self.stub);
                    drop(T::from_ptr(node));
                } else {
                    #[cfg(debug_assertions)]
                    debug_assert!(links.is_stub());
                }

                current = next;
            }
        }

        unsafe {
            // If the stub is static, don't drop it. It lives 5eva
            // (that's one more than 4eva)
            if !self.stub_is_static {
                drop(T::from_ptr(self.stub));
            }
        }
    }
}

impl<T> fmt::Debug for MpscQueue<T>
where
    T: Linked<Links<T>>,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            head,
            tail: _,
            has_consumer,
            stub,
            stub_is_static,
        } = self;
        f.debug_struct("MpscQueue")
            .field("head", &format_args!("{:p}", head.load(Acquire)))
            // Only the consumer can load the tail; trying to print it here
            // could be racy.
            // XXX(eliza): we could replace the `UnsafeCell` with an atomic,
            // and then it would be okay to print the tail...but then we would
            // lose loom checking for tail accesses...
            .field("tail", &format_args!("..."))
            .field("has_consumer", &has_consumer.load(Acquire))
            .field("stub", stub)
            .field("stub_is_static", stub_is_static)
            .finish()
    }
}

impl<T> Default for MpscQueue<T>
where
    T: Linked<Links<T>>,
    T::Handle: Default,
{
    fn default() -> Self {
        Self::new()
    }
}

unsafe impl<T> Send for MpscQueue<T>
where
    T: Send + Linked<Links<T>>,
    T::Handle: Send,
{
}
unsafe impl<T: Send + Linked<Links<T>>> Sync for MpscQueue<T> {}

// === impl Consumer ===

impl<T: Send + Linked<Links<T>>> Consumer<'_, T> {
    /// Dequeue an element from the queue.
    ///
    /// This method will wait by spinning with an exponential backoff if the
    /// queue is in an [inconsistent state].
    ///
    /// The [`Consumer::try_dequeue`] method will return an error rather than
    /// waiting when the queue is in an inconsistent state.
    ///
    /// # Returns
    ///
    /// - `Some(`[`T::Handle`]`)` if an element was successfully dequeued
    /// - `None` if the queue is empty
    ///
    /// [inconsistent state]: MpscQueue#inconsistent-states
    /// [`T::Handle`]: crate::Linked::Handle
    #[inline]
    pub fn dequeue(&self) -> Option<T::Handle> {
        debug_assert!(self.q.has_consumer.load(Acquire));
        unsafe {
            // Safety: we have reserved exclusive access to the queue.
            self.q.dequeue_unchecked()
        }
    }

    /// Try to dequeue an element from the queue, without waiting if the queue
    /// is in an inconsistent state.
    ///
    /// As discussed in the [algorithm description on 1024cores.net][vyukov], it
    /// is possible for this queue design to enter an inconsistent state if the
    /// consumer tries to dequeue an element while a producer is in the middle
    /// of enqueueing a new element. If this occurs, the consumer must briefly
    /// wait before dequeueing an element. This method returns
    /// [`TryDequeueError::Inconsistent`] when the queue is in an inconsistent
    /// state.
    ///
    /// The [`Consumer::dequeue`] method will instead wait (by spinning with an
    /// exponential backoff) when the queue is in an inconsistent state.
    ///
    /// # Returns
    ///
    /// - `Ok(`[`T::Handle`]`)` if an element was successfully dequeued
    /// - `Err(`[`TryDequeueError::Empty`]`)` if there are no elements in the queue
    /// - `Err(`[`TryDequeueError::Inconsistent`]`)` if the queue is currently in an
    ///   inconsistent state
    ///
    /// [vyukov]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
    /// [`T::Handle`]: crate::Linked::Handle
    #[inline]
    pub fn try_dequeue(&self) -> Result<T::Handle, TryDequeueError> {
        debug_assert!(self.q.has_consumer.load(Acquire));
        unsafe {
            // Safety: we have reserved exclusive access to the queue.
            self.q.try_dequeue_unchecked()
        }
    }
}

impl<T: Linked<Links<T>>> Drop for Consumer<'_, T> {
    fn drop(&mut self) {
        self.q.has_consumer.store(false, Release);
    }
}

impl<T> fmt::Debug for Consumer<'_, T>
where
    T: Linked<Links<T>>,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { q } = self;
        let tail = q.tail.with(|tail| unsafe {
            // Safety: it's okay for the consumer to access the tail cell, since
            // we have exclusive access to it.
            *tail
        });
        f.debug_struct("Consumer")
            .field("q", &q)
            .field("tail", &tail)
            .finish()
    }
}

impl<T> Iterator for Consumer<'_, T>
where
    T: Send + Linked<Links<T>>,
{
    type Item = T::Handle;

    fn next(&mut self) -> Option<Self::Item> {
        self.dequeue()
    }
}

// === impl Links ===

impl<T> Links<T> {
    /// Returns a new set of `Links` for a [`MpscQueue`].
    #[cfg(not(loom))]
    #[must_use]
    pub const fn new() -> Self {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            _unpin: PhantomPinned,
            #[cfg(debug_assertions)]
            is_stub: AtomicBool::new(false),
        }
    }

    /// Returns a new set of `Links` for the stub node in an [`MpscQueue`].
    #[cfg(not(loom))]
    #[must_use]
    pub const fn new_stub() -> Self {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            _unpin: PhantomPinned,
            #[cfg(debug_assertions)]
            is_stub: AtomicBool::new(true),
        }
    }

    /// Returns a new set of `Links` for a [`MpscQueue`].
    #[cfg(loom)]
    #[must_use]
    pub fn new() -> Self {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            _unpin: PhantomPinned,
            #[cfg(debug_assertions)]
            is_stub: AtomicBool::new(false),
        }
    }

    /// Returns a new set of `Links` for the stub node in an [`MpscQueue`].
    #[cfg(loom)]
    #[must_use]
    pub fn new_stub() -> Self {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            _unpin: PhantomPinned,
            #[cfg(debug_assertions)]
            is_stub: AtomicBool::new(true),
        }
    }

    #[cfg(debug_assertions)]
    fn is_stub(&self) -> bool {
        self.is_stub.load(Acquire)
    }
}

impl<T> Default for Links<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T> fmt::Debug for Links<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut s = f.debug_struct("Links");
        s.field("next", &self.next.load(Acquire));
        #[cfg(debug_assertions)]
        s.field("is_stub", &self.is_stub.load(Acquire));
        s.finish()
    }
}

feature! {
    #![feature = "alloc"]

    use alloc::sync::Arc;

    /// An owned handle that holds the right to dequeue elements from the queue.
    ///
    /// This can be used when one thread wishes to dequeue many elements at a time,
    /// to avoid the overhead of ensuring mutual exclusion on every [`dequeue`] or
    /// [`try_dequeue`] call.
    ///
    /// This type is returned by the [`MpscQueue::consume_owned`] and
    /// [`MpscQueue::try_consume_owned`] methods.
    ///
    /// This is similar to the [`Consumer`] type, but the queue is stored in an
    /// [`Arc`] rather than borrowed. This allows a single `OwnedConsumer`
    /// instance to be stored in a struct and used indefinitely.
    ///
    /// Since the queue is stored in an [`Arc`], this requires the `alloc`
    /// feature flag to be enabled.
    ///
    /// [`dequeue`]: OwnedConsumer::dequeue
    /// [`try_dequeue`]: OwnedConsumer::try_dequeue
    /// [`Arc`]: alloc::sync::Arc
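    ///
    /// For example, a long-lived worker type might hold an `OwnedConsumer`
    /// alongside its other state. This is a minimal sketch (where `Entry` is a
    /// hypothetical element type implementing [`Linked`], as in the
    /// [`MpscQueue`] examples):
    ///
    /// ```ignore
    /// struct Worker {
    ///     // The consumer may be held indefinitely, since it owns a
    ///     // reference to the queue.
    ///     consumer: OwnedConsumer<Entry>,
    /// }
    ///
    /// impl Worker {
    ///     fn new(q: Arc<MpscQueue<Entry>>) -> Self {
    ///         // Claim the exclusive right to dequeue, waiting if another
    ///         // consumer currently exists.
    ///         Self { consumer: q.consume_owned() }
    ///     }
    ///
    ///     /// Process every element currently in the queue.
    ///     fn drain(&self) {
    ///         while let Some(entry) = self.consumer.dequeue() {
    ///             // ...do something with `entry`...
    ///         }
    ///     }
    /// }
    /// ```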
    pub struct OwnedConsumer<T: Linked<Links<T>>> {
        q: Arc<MpscQueue<T>>
    }

    // === impl OwnedConsumer ===

    impl<T: Linked<Links<T>>> OwnedConsumer<T> {
        /// Dequeue an element from the queue.
        ///
        /// As discussed in the [algorithm description on 1024cores.net][vyukov], it
        /// is possible for this queue design to enter an inconsistent state if the
        /// consumer tries to dequeue an element while a producer is in the middle
        /// of enqueueing a new element. If this occurs, the consumer must briefly
        /// wait before dequeueing an element. This method will wait by spinning
        /// with an exponential backoff if the queue is in an inconsistent state.
        ///
        /// The [`OwnedConsumer::try_dequeue`] method will return an error rather
        /// than waiting when the queue is in an inconsistent state.
        ///
        /// # Returns
        ///
        /// - `Some(T::Handle)` if an element was successfully dequeued
        /// - `None` if the queue is empty
        ///
        /// [vyukov]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
        #[inline]
        pub fn dequeue(&self) -> Option<T::Handle> {
            debug_assert!(self.q.has_consumer.load(Acquire));
            unsafe {
                // Safety: we have reserved exclusive access to the queue.
                self.q.dequeue_unchecked()
            }
        }

        /// Try to dequeue an element from the queue, without waiting if the queue
        /// is in an inconsistent state.
        ///
        /// As discussed in the [algorithm description on 1024cores.net][vyukov], it
        /// is possible for this queue design to enter an inconsistent state if the
        /// consumer tries to dequeue an element while a producer is in the middle
        /// of enqueueing a new element. If this occurs, the consumer must briefly
        /// wait before dequeueing an element. This method returns
        /// [`TryDequeueError::Inconsistent`] when the queue is in an inconsistent
        /// state.
        ///
        /// The [`OwnedConsumer::dequeue`] method will instead wait (by spinning
        /// with an exponential backoff) when the queue is in an inconsistent state.
        ///
        /// # Returns
        ///
        /// - `Ok(T::Handle)` if an element was successfully dequeued
        /// - `Err(`[`TryDequeueError::Empty`]`)` if there are no elements in the queue
        /// - `Err(`[`TryDequeueError::Inconsistent`]`)` if the queue is currently in
        ///   an inconsistent state
        ///
        /// [vyukov]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
        #[inline]
        pub fn try_dequeue(&self) -> Result<T::Handle, TryDequeueError> {
            debug_assert!(self.q.has_consumer.load(Acquire));
            unsafe {
                // Safety: we have reserved exclusive access to the queue.
                self.q.try_dequeue_unchecked()
            }
        }

        /// Returns `true` if any producers exist for this queue.
        pub fn has_producers(&self) -> bool {
            Arc::strong_count(&self.q) > 1
        }
    }

    impl<T: Linked<Links<T>>> Drop for OwnedConsumer<T> {
        fn drop(&mut self) {
            self.q.has_consumer.store(false, Release);
        }
    }

    impl<T: Linked<Links<T>>> fmt::Debug for OwnedConsumer<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let Self { q } = self;
            let tail = q.tail.with(|tail| unsafe {
                // Safety: it's okay for the consumer to access the tail cell, since
                // we have exclusive access to it.
                *tail
            });
            f.debug_struct("OwnedConsumer")
                .field("q", &q)
                .field("tail", &tail)
                .finish()
        }
    }

    // === impl Queue ===

    impl<T: Linked<Links<T>>> MpscQueue<T> {
        /// Returns an [`OwnedConsumer`] handle that reserves the exclusive right
        /// to dequeue elements from the queue until it is dropped.
        ///
        /// If another thread is dequeueing, this method spins until there is no
        /// other thread dequeueing.
        pub fn consume_owned(self: Arc<Self>) -> OwnedConsumer<T> {
            self.lock_consumer();
            OwnedConsumer { q: self }
        }

        /// Attempts to reserve an [`OwnedConsumer`] handle that holds the exclusive
        /// right to dequeue elements from the queue until it is dropped.
        ///
        /// If another thread is dequeueing, this returns `None` instead.
        pub fn try_consume_owned(self: Arc<Self>) -> Option<OwnedConsumer<T>> {
            self.try_lock_consumer().map(|_| OwnedConsumer { q: self })
        }
    }
}

/// Just a little helper so we don't have to add `.as_ref()` noise everywhere...
#[inline(always)]
unsafe fn links<'a, T: Linked<Links<T>>>(ptr: NonNull<T>) -> &'a Links<T> {
    T::links(ptr).as_ref()
}

/// Helper to construct a `NonNull<T>` from a raw pointer to `T`, with null
/// checks elided in release mode.
///
/// This is the debug mode version, which panics on a null pointer.
#[cfg(debug_assertions)]
#[track_caller]
#[inline(always)]
unsafe fn non_null<T>(ptr: *mut T) -> NonNull<T> {
    NonNull::new(ptr).expect(
        "/!\\ constructed a `NonNull` from a null pointer! /!\\ \n\
        in release mode, this would have called `NonNull::new_unchecked`, \
        violating the `NonNull` invariant! this is a bug in `cordyceps`!",
    )
}

/// Helper to construct a `NonNull<T>` from a raw pointer to `T`, with null
/// checks elided in release mode.
///
/// This is the release mode version.
#[cfg(not(debug_assertions))]
#[inline(always)]
unsafe fn non_null<T>(ptr: *mut T) -> NonNull<T> {
    NonNull::new_unchecked(ptr)
}

#[cfg(all(loom, test))]
mod loom {
    use super::*;
    use crate::loom::{self, sync::Arc, thread};
    use test_util::*;

    #[test]
    fn basically_works_loom() {
        const THREADS: i32 = 2;
        const MSGS: i32 = THREADS;
        const TOTAL_MSGS: i32 = THREADS * MSGS;
        basically_works_test(THREADS, MSGS, TOTAL_MSGS);
    }

    #[test]
    fn doesnt_leak() {
        // Test that dropping the queue drops any messages that haven't been
        // consumed by the consumer.
        const THREADS: i32 = 2;
        const MSGS: i32 = THREADS;
        // Only consume half as many messages as are sent, to ensure dropping
        // the queue does not leak.
        const TOTAL_MSGS: i32 = (THREADS * MSGS) / 2;
        basically_works_test(THREADS, MSGS, TOTAL_MSGS);
    }

    fn basically_works_test(threads: i32, msgs: i32, total_msgs: i32) {
        loom::model(move || {
            let stub = entry(666);
            let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));

            let threads: Vec<_> = (0..threads)
                .map(|thread| thread::spawn(do_tx(thread, msgs, &q)))
                .collect();

            let mut i = 0;
            while i < total_msgs {
                match q.try_dequeue() {
                    Ok(val) => {
                        i += 1;
                        tracing::info!(?val, "dequeue {}/{}", i, total_msgs);
                    }
                    Err(TryDequeueError::Busy) => panic!(
                        "the queue should never be busy, as there is only a single consumer!"
                    ),
                    Err(err) => {
                        tracing::info!(?err, "dequeue error");
                        thread::yield_now();
                    }
                }
            }

            for thread in threads {
                thread.join().unwrap();
            }
        })
    }

    fn do_tx(thread: i32, msgs: i32, q: &Arc<MpscQueue<Entry>>) -> impl FnOnce() + Send + Sync {
        let q = q.clone();
        move || {
            for i in 0..msgs {
                q.enqueue(entry(i + (thread * 10)));
                tracing::info!(thread, "enqueue msg {}/{}", i, msgs);
            }
        }
    }

    #[test]
    fn mpmc() {
        // Tests multiple consumers competing for access to the consume side of
        // the queue.
        const THREADS: i32 = 2;
        const MSGS: i32 = THREADS;

        fn do_rx(thread: i32, q: Arc<MpscQueue<Entry>>) {
            let mut i = 0;
            while let Some(val) = q.dequeue() {
                tracing::info!(?val, ?thread, "dequeue {}/{}", i, THREADS * MSGS);
                i += 1;
            }
        }

        loom::model(|| {
            let stub = entry(666);
            let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));

            let mut threads: Vec<_> = (0..THREADS)
                .map(|thread| thread::spawn(do_tx(thread, MSGS, &q)))
                .collect();

            threads.push(thread::spawn({
                let q = q.clone();
                move || do_rx(THREADS + 1, q)
            }));
            do_rx(THREADS + 2, q);

            for thread in threads {
                thread.join().unwrap();
            }
        })
    }

    #[test]
    fn crosses_queues() {
        loom::model(|| {
            let stub1 = entry(666);
            let q1 = Arc::new(MpscQueue::<Entry>::new_with_stub(stub1));

            let thread = thread::spawn({
                let q1 = q1.clone();
                move || {
                    let stub2 = entry(420);
                    let q2 = Arc::new(MpscQueue::<Entry>::new_with_stub(stub2));
                    for entry in q1.consume() {
                        tracing::info!("dequeued");
                        q2.enqueue(entry);
                        q2.try_dequeue().unwrap();
                    }
                    tracing::info!("consumer done\nq1={q1:#?}\nq2={q2:#?}");
                }
            });

            q1.enqueue(entry(1));
            drop(q1);

            thread.join().unwrap();
        })
    }
}

#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use test_util::*;

    use std::{ops::Deref, println, sync::Arc, thread};

    #[test]
    fn dequeue_empty() {
        let stub = entry(666);
        let q = MpscQueue::<Entry>::new_with_stub(stub);
        assert_eq!(q.dequeue(), None)
    }

    #[test]
    fn try_dequeue_empty() {
        let stub = entry(666);
        let q = MpscQueue::<Entry>::new_with_stub(stub);
        assert_eq!(q.try_dequeue(), Err(TryDequeueError::Empty))
    }

    #[test]
    fn try_dequeue_busy() {
        let stub = entry(666);
        let q = MpscQueue::<Entry>::new_with_stub(stub);

        let consumer = q.try_consume().expect("must acquire consumer");
        assert_eq!(consumer.try_dequeue(), Err(TryDequeueError::Empty));

        q.enqueue(entry(1));

        // While the `Consumer` handle exists, dequeueing via the queue itself
        // must fail with a `Busy` error.
        assert_eq!(q.try_dequeue(), Err(TryDequeueError::Busy));

        assert_eq!(consumer.try_dequeue(), Ok(entry(1)));

        assert_eq!(q.try_dequeue(), Err(TryDequeueError::Busy));

        assert_eq!(consumer.try_dequeue(), Err(TryDequeueError::Empty));

        // Dropping the consumer releases its exclusive claim on the queue.
        drop(consumer);
        assert_eq!(q.try_dequeue(), Err(TryDequeueError::Empty));
    }

    #[test]
    fn enqueue_dequeue() {
        let stub = entry(666);
        let e = entry(1);
        let q = MpscQueue::<Entry>::new_with_stub(stub);
        q.enqueue(e);
        assert_eq!(q.dequeue(), Some(entry(1)));
        assert_eq!(q.dequeue(), None)
    }

    #[test]
    fn basically_works() {
        let stub = entry(666);
        let q = MpscQueue::<Entry>::new_with_stub(stub);

        let q = Arc::new(q);
        test_basically_works(q);
    }

    #[test]
    fn basically_works_all_const() {
        static STUB_ENTRY: Entry = const_stub_entry(666);
        static MPSC: MpscQueue<Entry> =
            unsafe { MpscQueue::<Entry>::new_with_static_stub(&STUB_ENTRY) };
        test_basically_works(&MPSC);
    }

    #[test]
    fn basically_works_mixed_const() {
        static STUB_ENTRY: Entry = const_stub_entry(666);
        let q = unsafe { MpscQueue::<Entry>::new_with_static_stub(&STUB_ENTRY) };

        let q = Arc::new(q);
        test_basically_works(q)
    }

    fn test_basically_works<Q>(q: Q)
    where
        Q: Deref<Target = MpscQueue<Entry>> + Clone,
        Q: Send + 'static,
    {
        const THREADS: i32 = if_miri(3, 8);
        const MSGS: i32 = if_miri(10, 1000);

        assert_eq!(q.dequeue(), None);

        let threads: Vec<_> = (0..THREADS)
            .map(|thread| {
                let q = q.clone();
                thread::spawn(move || {
                    for i in 0..MSGS {
                        q.enqueue(entry(i));
                        println!("thread {thread}; msg {i}/{MSGS}");
                    }
                })
            })
            .collect();

        let mut i = 0;
        while i < THREADS * MSGS {
            match q.try_dequeue() {
                Ok(msg) => {
                    i += 1;
                    println!("recv {msg:?} ({i}/{})", THREADS * MSGS);
                }
                Err(TryDequeueError::Busy) => {
                    panic!("the queue should never be busy, as there is only one consumer")
                }
                Err(e) => {
                    println!("recv error {e:?}");
                    thread::yield_now();
                }
            }
        }

        for thread in threads {
            thread.join().unwrap();
        }
    }

    const fn if_miri(miri: i32, not_miri: i32) -> i32 {
        if cfg!(miri) {
            miri
        } else {
            not_miri
        }
    }
}

#[cfg(test)]
mod test_util {
    use super::*;
    use crate::loom::alloc;
    pub use std::{boxed::Box, pin::Pin, ptr, vec::Vec};

    pub(super) struct Entry {
        links: Links<Entry>,
        pub(super) val: i32,
        // Participate in loom leak checking.
        _track: alloc::Track<()>,
    }

    impl std::cmp::PartialEq for Entry {
        fn eq(&self, other: &Self) -> bool {
            self.val == other.val
        }
    }

    unsafe impl Linked<Links<Self>> for Entry {
        type Handle = Pin<Box<Entry>>;

        fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
            unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
        }

        unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
            // Safety: if this function is only called by the linked list
            // implementation (and it is not intended for external use), we can
            // expect that the `NonNull` was constructed from a reference which
            // was pinned.
            //
            // If other callers besides `List`'s internals were to call this on
            // some random `NonNull<Entry>`, this would not be the case, and
            // this could be constructing an erroneous `Pin` from a referent
            // that may not be pinned!
            Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
        }

        unsafe fn links(target: NonNull<Entry>) -> NonNull<Links<Entry>> {
            let links = ptr::addr_of_mut!((*target.as_ptr()).links);
            NonNull::new_unchecked(links)
        }
    }

    impl fmt::Debug for Entry {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let Self { links, val, _track } = self;
            f.debug_struct("Entry")
                .field("links", links)
                .field("val", val)
                .field("_track", _track)
                .finish()
        }
    }

    #[cfg(not(loom))]
    pub(super) const fn const_stub_entry(val: i32) -> Entry {
        Entry {
            links: Links::new_stub(),
            val,
            _track: alloc::Track::new_const(()),
        }
    }

    pub(super) fn entry(val: i32) -> Pin<Box<Entry>> {
        Box::pin(Entry {
            links: Links::new(),
            val,
            _track: alloc::Track::new(()),
        })
    }
}
1599}