pub struct MpscQueue<T: Linked<Links<T>>> { /* private fields */ }
Expand description

A multi-producer, single-consumer (MPSC) queue, implemented using a lock-free intrusive singly-linked list.

Based on Dmitry Vyukov’s intrusive MPSC.

In order to be part of a MpscQueue, a type T must implement Linked for mpsc_queue::Links<T>.

Examples

use cordyceps::{
    Linked,
    mpsc_queue::{self, MpscQueue},
};

// This example uses the Rust standard library for convenience, but
// the MPSC queue itself does not require std.
use std::{pin::Pin, ptr::{self, NonNull}, thread, sync::Arc};

/// A simple queue entry that stores an `i32`.
#[derive(Debug, Default)]
struct Entry {
   links: mpsc_queue::Links<Entry>,
   val: i32,
}

// Implement the `Linked` trait for our entry type so that it can be used
// as a queue entry.
unsafe impl Linked<mpsc_queue::Links<Entry>> for Entry {
    // In this example, our entries will be "owned" by a `Box`, but any
    // heap-allocated type that owns an element may be used.
    //
    // An element *must not* move while part of an intrusive data
    // structure. In many cases, `Pin` may be used to enforce this.
    type Handle = Pin<Box<Self>>;

    /// Convert an owned `Handle` into a raw pointer
    fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
       unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
    }

    /// Convert a raw pointer back into an owned `Handle`.
    unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
        // Safety: if this function is only called by the linked list
        // implementation (and it is not intended for external use), we can
        // expect that the `NonNull` was constructed from a reference which
        // was pinned.
        //
        // If other callers besides `MpscQueue`'s internals were to call this on
        // some random `NonNull<Entry>`, this would not be the case, and
        // this could be constructing an erroneous `Pin` from a referent
        // that may not be pinned!
        Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
    }

    /// Access an element's `Links`.
    unsafe fn links(target: NonNull<Entry>) -> NonNull<mpsc_queue::Links<Entry>> {
        // Using `ptr::addr_of_mut!` permits us to avoid creating a temporary
        // reference without using layout-dependent casts.
        let links = ptr::addr_of_mut!((*target.as_ptr()).links);

        // `NonNull::new_unchecked` is safe to use here, because the pointer that
        // we offset was not null, implying that the pointer produced by offsetting
        // it will also not be null.
        NonNull::new_unchecked(links)
    }
}

impl Entry {
    fn new(val: i32) -> Self {
        Self {
            val,
            ..Self::default()
        }
    }
}

// Once we have a `Linked` implementation for our element type, we can construct
// a queue.

// Because `Pin<Box<...>>` doesn't have a `Default` impl, we have to manually
// construct the stub node.
let stub = Box::pin(Entry::default());
let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));

// Spawn some producer threads.
thread::spawn({
    let q = q.clone();
    move || {
        // Enqueuing elements does not require waiting, and is not fallible.
        q.enqueue(Box::pin(Entry::new(1)));
        q.enqueue(Box::pin(Entry::new(2)));
    }
});

thread::spawn({
    let q = q.clone();
    move || {
        q.enqueue(Box::pin(Entry::new(3)));
        q.enqueue(Box::pin(Entry::new(4)));
    }
});


// Dequeue elements until the producer threads have terminated.
let mut seen = Vec::new();
loop {
    // Make sure we run at least once, in case the producer is already done.
    let done = Arc::strong_count(&q) == 1;

    // Dequeue until the queue is empty.
    while let Some(entry) = q.dequeue() {
        seen.push(entry.as_ref().val);
    }

    // If there are still producers, we may continue dequeuing.
    if done {
        break;
    }

    thread::yield_now();
}

// The elements may not have been received in order, so sort the
// received values before making assertions about them.
&mut seen[..].sort();

assert_eq!(&[1, 2, 3, 4], &seen[..]);

The Consumer type may be used to reserve the permission to consume multiple elements at a time:

let stub = Box::pin(Entry::default());
let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));

thread::spawn({
    let q = q.clone();
    move || {
        q.enqueue(Box::pin(Entry::new(1)));
        q.enqueue(Box::pin(Entry::new(2)));
    }
});

// Reserve exclusive permission to consume elements
let consumer = q.consume();

let mut seen = Vec::new();
loop {
    // Make sure we run at least once, in case the producer is already done.
    let done = Arc::strong_count(&q) == 1;

    // Dequeue until the queue is empty.
    while let Some(entry) = consumer.dequeue() {
        seen.push(entry.as_ref().val);
    }

    if done {
        break;
    }
    thread::yield_now();
}

assert_eq!(&[1, 2], &seen[..]);

The Consumer type also implements Iterator:

let stub = Box::pin(Entry::default());
let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));

thread::spawn({
    let q = q.clone();
    move || {
        for i in 1..5 {
            q.enqueue(Box::pin(Entry::new(i)));
        }
    }
});

thread::spawn({
    let q = q.clone();
    move || {
        for i in 5..=10 {
            q.enqueue(Box::pin(Entry::new(i)));
        }
    }
});

let mut seen = Vec::new();
loop {
    // Make sure we run at least once, in case the producer is already done.
    let done = Arc::strong_count(&q) == 1;

    // Append any elements currently in the queue to the `Vec`
    seen.extend(q.consume().map(|entry| entry.as_ref().val));

    if done {
        break;
    }

    thread::yield_now();
}

&mut seen[..].sort();
assert_eq!(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], &seen[..]);

Implementation Details

This queue design is conceptually very simple, and has extremely fast and wait-free producers (the enqueue operation). Enqueuing an element always performs exactly one atomic swap and one atomic store, so producers need never wait.

The consumer (the dequeue) is typically wait-free in the common case, but must occasionally wait when the queue is in an inconsistent state.

Inconsistent States

As discussed in the algorithm description on 1024cores.net, it is possible for this queue design to enter an inconsistent state if the consumer tries to dequeue an element while a producer is in the middle of enqueueing a new element. This occurs when a producer is between the atomic swap with the head of the queue and the atomic store that sets the next pointer of the previous head element. When the queue is in an inconsistent state, the consumer must briefly wait before dequeueing an element.

The consumer’s behavior in the inconsistent state depends on which API method is used. The MpscQueue::dequeue and Consumer::dequeue methods will wait by spinning (with an exponential backoff) when the queue is inconsistent. Alternatively, the MpscQueue::try_dequeue and Consumer::try_dequeue methods will instead return an error when the queue is in an inconsistent state.

Implementations§

source§

impl<T: Linked<Links<T>>> MpscQueue<T>

source

pub fn new() -> Self
where T::Handle: Default,

Returns a new MpscQueue.

The Default implementation for T::Handle is used to produce a new node used as the list’s stub.

source

pub fn new_with_stub(stub: T::Handle) -> Self

Returns a new MpscQueue with the provided stub node.

If a MpscQueue must be constructed in a const context, such as a static initializer, see MpscQueue::new_with_static_stub.

source

pub const unsafe fn new_with_static_stub(stub: &'static T) -> Self

Available on non-loom only.

Returns a new MpscQueue with a static “stub” entity

This is primarily used for creating an MpscQueue as a static variable.

Usage notes

Unlike MpscQueue::new or MpscQueue::new_with_stub, the stub item will NOT be dropped when the MpscQueue is dropped. This is fine if you are ALSO statically creating the stub. However, if it is necessary to recover that memory after the MpscQueue has been dropped, that will need to be done by the user manually.

Safety

The stub provided must ONLY EVER be used for a single MpscQueue instance. Re-using the stub for multiple queues may lead to undefined behavior.

Example usage

// This is our same `Entry` from the parent examples. It has implemented
// the `Links` trait as above.
#[derive(Debug, Default)]
struct Entry {
   links: mpsc_queue::Links<Entry>,
   val: i32,
}



static MPSC: MpscQueue<Entry> = {
    static STUB_ENTRY: Entry = Entry {
        links: mpsc_queue::Links::<Entry>::new_stub(),
        val: 0
    };

    // SAFETY: The stub may not be used by another MPSC queue.
    // Here, this is ensured because the `STUB_ENTRY` static is defined
    // inside of the initializer for the `MPSC` static, so it cannot be referenced
    // elsewhere.
    unsafe { MpscQueue::new_with_static_stub(&STUB_ENTRY) }
};
source

pub fn enqueue(&self, element: T::Handle)

Enqueue a new element at the end of the queue.

This takes ownership of a Handle that owns the element, and (conceptually) assigns ownership of the element to the queue while it remains enqueued.

This method will never wait.

source

pub fn try_dequeue(&self) -> Result<T::Handle, TryDequeueError>

Try to dequeue an element from the queue, without waiting if the queue is in an inconsistent state, or until there is no other consumer trying to read from the queue.

Because this is a multi-producer, single-consumer queue, only one thread may be dequeueing at a time. If another thread is dequeueing, this method returns TryDequeueError::Busy.

The MpscQueue::dequeue method will instead wait (by spinning with an exponential backoff) when the queue is in an inconsistent state or busy.

The unsafe MpscQueue::try_dequeue_unchecked method will not check if the queue is busy before dequeueing an element. This can be used when the user code guarantees that no other threads will dequeue from the queue concurrently, but this cannot be enforced by the compiler.

This method will never wait.

Returns
source

pub fn dequeue(&self) -> Option<T::Handle>

Dequeue an element from the queue.

This method will wait by spinning with an exponential backoff if the queue is in an inconsistent state.

Additionally, because this is a multi-producer, single-consumer queue, only one thread may be dequeueing at a time. If another thread is dequeueing, this method will spin until the queue is no longer busy.

The MpscQueue::try_dequeue will return an error rather than waiting when the queue is in an inconsistent state or busy.

The unsafe MpscQueue::dequeue_unchecked method will not check if the queue is busy before dequeueing an element. This can be used when the user code guarantees that no other threads will dequeue from the queue concurrently, but this cannot be enforced by the compiler.

Returns
  • Some(T::Handle) if an element was successfully dequeued
  • None if the queue is empty or another thread is dequeueing
source

pub fn consume(&self) -> Consumer<'_, T>

Returns a Consumer handle that reserves the exclusive right to dequeue elements from the queue until it is dropped.

If another thread is dequeueing, this method spins until there is no other thread dequeueing.

source

pub fn try_consume(&self) -> Option<Consumer<'_, T>>

Attempts to reserve a Consumer handle that holds the exclusive right to dequeue elements from the queue until it is dropped.

If another thread is dequeueing, this returns None instead.

source

pub unsafe fn try_dequeue_unchecked(&self) -> Result<T::Handle, TryDequeueError>

Try to dequeue an element from the queue, without waiting if the queue is in an inconsistent state, and without checking if another consumer exists.

This method returns TryDequeueError::Inconsistent when the queue is in an inconsistent state.

The MpscQueue::dequeue_unchecked method will instead wait (by spinning with an exponential backoff) when the queue is in an inconsistent state.

This method will never wait.

Returns

This method will never return TryDequeueError::Busy.

Safety

This is a multi-producer, single-consumer queue. Only one thread/core may call try_dequeue_unchecked at a time!

source

pub unsafe fn dequeue_unchecked(&self) -> Option<T::Handle>

Dequeue an element from the queue, without checking whether another consumer exists.

This method will wait by spinning with an exponential backoff if the queue is in an inconsistent state.

The MpscQueue::try_dequeue will return an error rather than waiting when the queue is in an inconsistent state.

Returns
  • Some(T::Handle) if an element was successfully dequeued
  • None if the queue is empty
Safety

This is a multi-producer, single-consumer queue. Only one thread/core may call dequeue at a time!

source§

impl<T: Linked<Links<T>>> MpscQueue<T>

source

pub fn consume_owned(self: Arc<Self>) -> OwnedConsumer<T>

Available on crate feature alloc only.

Returns a OwnedConsumer handle that reserves the exclusive right to dequeue elements from the queue until it is dropped.

If another thread is dequeueing, this method spins until there is no other thread dequeueing.

source

pub fn try_consume_owned(self: Arc<Self>) -> Option<OwnedConsumer<T>>

Available on crate feature alloc only.

Attempts to reserve an OwnedConsumer handle that holds the exclusive right to dequeue elements from the queue until it is dropped.

If another thread is dequeueing, this returns None instead.

Trait Implementations§

source§

impl<T> Debug for MpscQueue<T>
where T: Linked<Links<T>>,

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
source§

impl<T> Default for MpscQueue<T>
where T: Linked<Links<T>>, T::Handle: Default,

source§

fn default() -> Self

Returns the “default value” for a type. Read more
source§

impl<T: Linked<Links<T>>> Drop for MpscQueue<T>

source§

fn drop(&mut self)

Executes the destructor for this type. Read more
source§

impl<T> Send for MpscQueue<T>
where T: Send + Linked<Links<T>>, T::Handle: Send,

source§

impl<T: Send + Linked<Links<T>>> Sync for MpscQueue<T>

Auto Trait Implementations§

§

impl<T> !RefUnwindSafe for MpscQueue<T>

§

impl<T> Unpin for MpscQueue<T>

§

impl<T> UnwindSafe for MpscQueue<T>
where T: RefUnwindSafe,

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.