pub struct MpscQueue<T: Linked<Links<T>>> { /* private fields */ }
A multi-producer, single-consumer (MPSC) queue, implemented using a lock-free intrusive singly-linked list.
Based on Dmitry Vyukov’s intrusive MPSC.
In order to be part of an MpscQueue, a type T must implement Linked for mpsc_queue::Links<T>.
Examples
use cordyceps::{
    Linked,
    mpsc_queue::{self, MpscQueue},
};

// This example uses the Rust standard library for convenience, but
// the MPSC queue itself does not require std.
use std::{pin::Pin, ptr::{self, NonNull}, thread, sync::Arc};

/// A simple queue entry that stores an `i32`.
#[derive(Debug, Default)]
struct Entry {
    links: mpsc_queue::Links<Entry>,
    val: i32,
}

// Implement the `Linked` trait for our entry type so that it can be used
// as a queue entry.
unsafe impl Linked<mpsc_queue::Links<Entry>> for Entry {
    // In this example, our entries will be "owned" by a `Box`, but any
    // heap-allocated type that owns an element may be used.
    //
    // An element *must not* move while part of an intrusive data
    // structure. In many cases, `Pin` may be used to enforce this.
    type Handle = Pin<Box<Self>>;

    /// Convert an owned `Handle` into a raw pointer
    fn into_ptr(handle: Pin<Box<Entry>>) -> NonNull<Entry> {
        unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
    }

    /// Convert a raw pointer back into an owned `Handle`.
    unsafe fn from_ptr(ptr: NonNull<Entry>) -> Pin<Box<Entry>> {
        // Safety: if this function is only called by the linked list
        // implementation (and it is not intended for external use), we can
        // expect that the `NonNull` was constructed from a reference which
        // was pinned.
        //
        // If other callers besides `MpscQueue`'s internals were to call this on
        // some random `NonNull<Entry>`, this would not be the case, and
        // this could be constructing an erroneous `Pin` from a referent
        // that may not be pinned!
        Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
    }

    /// Access an element's `Links`.
    unsafe fn links(target: NonNull<Entry>) -> NonNull<mpsc_queue::Links<Entry>> {
        // Using `ptr::addr_of_mut!` permits us to avoid creating a temporary
        // reference without using layout-dependent casts.
        let links = ptr::addr_of_mut!((*target.as_ptr()).links);

        // `NonNull::new_unchecked` is safe to use here, because the pointer that
        // we offset was not null, implying that the pointer produced by offsetting
        // it will also not be null.
        NonNull::new_unchecked(links)
    }
}

impl Entry {
    fn new(val: i32) -> Self {
        Self {
            val,
            ..Self::default()
        }
    }
}

// Once we have a `Linked` implementation for our element type, we can construct
// a queue.

// Because `Pin<Box<...>>` doesn't have a `Default` impl, we have to manually
// construct the stub node.
let stub = Box::pin(Entry::default());
let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));

// Spawn some producer threads.
thread::spawn({
    let q = q.clone();
    move || {
        // Enqueuing elements does not require waiting, and is not fallible.
        q.enqueue(Box::pin(Entry::new(1)));
        q.enqueue(Box::pin(Entry::new(2)));
    }
});

thread::spawn({
    let q = q.clone();
    move || {
        q.enqueue(Box::pin(Entry::new(3)));
        q.enqueue(Box::pin(Entry::new(4)));
    }
});

// Dequeue elements until the producer threads have terminated.
let mut seen = Vec::new();
loop {
    // Make sure we run at least once, in case the producer is already done.
    let done = Arc::strong_count(&q) == 1;

    // Dequeue until the queue is empty.
    while let Some(entry) = q.dequeue() {
        seen.push(entry.as_ref().val);
    }

    // If there are still producers, we may continue dequeuing.
    if done {
        break;
    }

    thread::yield_now();
}

// The elements may not have been received in order, so sort the
// received values before making assertions about them.
seen.sort();

assert_eq!(&[1, 2, 3, 4], &seen[..]);
The Consumer type may be used to reserve the permission to consume multiple elements at a time:
let stub = Box::pin(Entry::default());
let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));

thread::spawn({
    let q = q.clone();
    move || {
        q.enqueue(Box::pin(Entry::new(1)));
        q.enqueue(Box::pin(Entry::new(2)));
    }
});

// Reserve exclusive permission to consume elements
let consumer = q.consume();

let mut seen = Vec::new();
loop {
    // Make sure we run at least once, in case the producer is already done.
    let done = Arc::strong_count(&q) == 1;

    // Dequeue until the queue is empty.
    while let Some(entry) = consumer.dequeue() {
        seen.push(entry.as_ref().val);
    }

    if done {
        break;
    }

    thread::yield_now();
}

assert_eq!(&[1, 2], &seen[..]);
The Consumer type also implements Iterator:
let stub = Box::pin(Entry::default());
let q = Arc::new(MpscQueue::<Entry>::new_with_stub(stub));

thread::spawn({
    let q = q.clone();
    move || {
        for i in 1..5 {
            q.enqueue(Box::pin(Entry::new(i)));
        }
    }
});

thread::spawn({
    let q = q.clone();
    move || {
        for i in 5..=10 {
            q.enqueue(Box::pin(Entry::new(i)));
        }
    }
});

let mut seen = Vec::new();
loop {
    // Make sure we run at least once, in case the producer is already done.
    let done = Arc::strong_count(&q) == 1;

    // Append any elements currently in the queue to the `Vec`
    seen.extend(q.consume().map(|entry| entry.as_ref().val));

    if done {
        break;
    }

    thread::yield_now();
}

// The two producers run concurrently, so sort before comparing.
seen.sort();
assert_eq!(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], &seen[..]);
Implementation Details
This queue design is conceptually very simple, and has extremely fast and wait-free producers (the enqueue operation). Enqueuing an element always performs exactly one atomic swap and one atomic store, so producers need never wait.
The consumer (the dequeue operation) is wait-free in the common case, but must occasionally wait when the queue is in an inconsistent state.
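The producer path described above can be sketched with the standard library alone. This is a minimal, non-intrusive model of a Vyukov-style enqueue and not cordyceps' actual implementation: `SketchQueue`, `Node`, and `drain` are hypothetical names introduced for illustration, the queue allocates its own nodes instead of embedding links in the element, and the memory orderings are an assumption made for this sketch.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

/// Hypothetical node type for this sketch (not a cordyceps type).
struct Node<T> {
    next: AtomicPtr<Node<T>>,
    value: Option<T>,
}

impl<T> Node<T> {
    fn alloc(value: Option<T>) -> *mut Self {
        Box::into_raw(Box::new(Node {
            next: AtomicPtr::new(ptr::null_mut()),
            value,
        }))
    }
}

/// Hypothetical stand-in for the producer side of the queue.
pub struct SketchQueue<T> {
    /// Most recently enqueued node; producers swap themselves in here.
    head: AtomicPtr<Node<T>>,
    /// Oldest node (initially the stub); only used with exclusive access.
    tail: *mut Node<T>,
}

// Safety (sketch): nodes are reachable only through the queue, and values
// are moved out only through `&mut self`.
unsafe impl<T: Send> Send for SketchQueue<T> {}
unsafe impl<T: Send> Sync for SketchQueue<T> {}

impl<T> SketchQueue<T> {
    pub fn new() -> Self {
        let stub = Node::alloc(None);
        Self { head: AtomicPtr::new(stub), tail: stub }
    }

    /// Wait-free: exactly one atomic swap and one atomic store, no loops.
    pub fn enqueue(&self, value: T) {
        let node = Node::alloc(Some(value));
        // Step 1: publish `node` as the new head and learn the previous head.
        let prev = self.head.swap(node, Ordering::AcqRel);
        // Step 2: link the previous head to `node`. `prev` is still alive,
        // because a node is never freed while its `next` pointer is null.
        unsafe { (*prev).next.store(node, Ordering::Release) };
    }

    /// Collects everything enqueued so far. Taking `&mut self` means no
    /// producer can be between step 1 and step 2 while this runs.
    pub fn drain(&mut self) -> Vec<T> {
        let mut out = Vec::new();
        loop {
            let next = unsafe { (*self.tail).next.load(Ordering::Acquire) };
            if next.is_null() {
                return out;
            }
            // Free the old stub; `next` becomes the new stub once emptied.
            unsafe { drop(Box::from_raw(self.tail)) };
            self.tail = next;
            out.extend(unsafe { (*next).value.take() });
        }
    }
}

impl<T> Drop for SketchQueue<T> {
    fn drop(&mut self) {
        self.drain();
        // Free the remaining stub node.
        unsafe { drop(Box::from_raw(self.tail)) };
    }
}
```

Several threads can share a `SketchQueue` through an `Arc` and call `enqueue` concurrently; once they have all been joined, `Arc::try_unwrap` gives back exclusive access so that `drain` can be called. Values from different producers may interleave, while values from a single producer keep their relative order.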
Inconsistent States
As discussed in the algorithm description on 1024cores.net, it is possible for this queue design to enter an inconsistent state if the consumer tries to dequeue an element while a producer is in the middle of enqueueing a new element. This occurs when a producer is between the atomic swap with the head of the queue and the atomic store that sets the next pointer of the previous head element. When the queue is in an inconsistent state, the consumer must briefly wait before dequeueing an element.
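To make the window between the swap and the store concrete, the sketch below performs the two enqueue steps by hand on a single thread and records what a consumer would observe before, between, and after them. It is an illustrative model built on assumptions, not code from cordyceps: `Node`, `Observed`, `observe`, and `demo` are hypothetical names, and the nodes are plain heap allocations holding an `i32`.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

/// Hypothetical node type for this sketch.
struct Node {
    next: AtomicPtr<Node>,
    val: i32,
}

fn alloc(val: i32) -> *mut Node {
    Box::into_raw(Box::new(Node {
        next: AtomicPtr::new(ptr::null_mut()),
        val,
    }))
}

/// What a consumer standing at `tail` can conclude about the queue.
#[derive(Debug, PartialEq)]
enum Observed {
    Ready(i32),
    Empty,
    Inconsistent,
}

/// Safety: `tail` must point to a live node.
unsafe fn observe(head: &AtomicPtr<Node>, tail: *mut Node) -> Observed {
    let next = unsafe { (*tail).next.load(Ordering::Acquire) };
    if !next.is_null() {
        // The link is in place, so the next element can be read.
        Observed::Ready(unsafe { (*next).val })
    } else if head.load(Ordering::Acquire) == tail {
        // No link, and no producer has swapped the head: truly empty.
        Observed::Empty
    } else {
        // The head has moved on but the link is missing: a producer is
        // between its swap and its store.
        Observed::Inconsistent
    }
}

fn demo() -> [Observed; 3] {
    let stub = alloc(0);
    let head = AtomicPtr::new(stub);
    let before = unsafe { observe(&head, stub) };

    // Enqueue step 1: swap the new node in as the head.
    let node = alloc(7);
    let prev = head.swap(node, Ordering::AcqRel);
    let during = unsafe { observe(&head, stub) };

    // Enqueue step 2: link the previous head to the new node.
    unsafe { (*prev).next.store(node, Ordering::Release) };
    let after = unsafe { observe(&head, stub) };

    // Free both nodes so the sketch does not leak.
    unsafe {
        drop(Box::from_raw(node));
        drop(Box::from_raw(stub));
    }
    [before, during, after]
}
```

Calling `demo()` yields `Empty`, then `Inconsistent`, then `Ready(7)`. In a real multi-threaded run the middle state is only visible for the short time a producer spends between its two atomic operations.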
The consumer’s behavior in the inconsistent state depends on which API method is used. The MpscQueue::dequeue and Consumer::dequeue methods will wait by spinning (with an exponential backoff) when the queue is inconsistent. Alternatively, the MpscQueue::try_dequeue and Consumer::try_dequeue methods will instead return an error when the queue is in an inconsistent state.
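The relationship between the waiting and non-waiting methods can be sketched as a retry loop around a fallible attempt. The code below is a std-only illustration under stated assumptions, not the crate's implementation: `TryError` is a hypothetical enum that only mirrors the variant names documented for TryDequeueError, `dequeue_with_backoff` is a hypothetical helper, and the backoff limit `MAX_EXP` is an arbitrary value chosen for the sketch.

```rust
use std::hint;
use std::thread;

/// Hypothetical error type mirroring the documented variant names.
#[derive(Debug, Clone, Copy, PartialEq)]
enum TryError {
    Empty,
    Inconsistent,
    Busy,
}

/// Retries `try_dequeue` until it yields an element or reports `Empty`,
/// spinning for an exponentially growing number of iterations in between.
fn dequeue_with_backoff<T>(
    mut try_dequeue: impl FnMut() -> Result<T, TryError>,
) -> Option<T> {
    // Assumed cap: spin at most 2^MAX_EXP times per round.
    const MAX_EXP: u32 = 8;
    let mut exp = 0;
    loop {
        match try_dequeue() {
            Ok(value) => return Some(value),
            // An empty queue is a final answer, not a reason to wait.
            Err(TryError::Empty) => return None,
            // Transient conditions: back off, then try again.
            Err(TryError::Inconsistent) | Err(TryError::Busy) => {
                for _ in 0..(1u32 << exp) {
                    hint::spin_loop();
                }
                if exp < MAX_EXP {
                    exp += 1;
                } else {
                    // After the cap is reached, let other threads run.
                    thread::yield_now();
                }
            }
        }
    }
}
```

A caller that cannot afford to spin (for example, inside an interrupt handler or an async task) would call the fallible attempt directly and decide for itself how to handle `Inconsistent` or `Busy`, which is the role the `try_` methods play.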
Implementations
impl<T: Linked<Links<T>>> MpscQueue<T>
pub fn new() -> Self
Returns a new MpscQueue.
The Default implementation for T::Handle is used to produce a new node used as the list’s stub.
pub fn new_with_stub(stub: T::Handle) -> Self
Returns a new MpscQueue with the provided stub node.
If an MpscQueue must be constructed in a const context, such as a static initializer, see MpscQueue::new_with_static_stub.
pub const unsafe fn new_with_static_stub(stub: &'static T) -> Self
Available on non-loom only.
Returns a new MpscQueue with a static “stub” entity.
This is primarily used for creating an MpscQueue as a static variable.
Usage notes
Unlike MpscQueue::new or MpscQueue::new_with_stub, the stub item will NOT be dropped when the MpscQueue is dropped. This is fine if you are ALSO statically creating the stub. However, if it is necessary to recover that memory after the MpscQueue has been dropped, that will need to be done by the user manually.
Safety
The stub provided must ONLY EVER be used for a single MpscQueue instance. Re-using the stub for multiple queues may lead to undefined behavior.
Example usage
// This is our same `Entry` from the parent examples. It has implemented
// the `Linked` trait as above.
#[derive(Debug, Default)]
struct Entry {
    links: mpsc_queue::Links<Entry>,
    val: i32,
}

static MPSC: MpscQueue<Entry> = {
    static STUB_ENTRY: Entry = Entry {
        links: mpsc_queue::Links::<Entry>::new_stub(),
        val: 0
    };

    // SAFETY: The stub may not be used by another MPSC queue.
    // Here, this is ensured because the `STUB_ENTRY` static is defined
    // inside of the initializer for the `MPSC` static, so it cannot be referenced
    // elsewhere.
    unsafe { MpscQueue::new_with_static_stub(&STUB_ENTRY) }
};
pub fn enqueue(&self, element: T::Handle)
Enqueue a new element at the end of the queue.
This takes ownership of a Handle that owns the element, and (conceptually) assigns ownership of the element to the queue while it remains enqueued.
This method will never wait.
pub fn try_dequeue(&self) -> Result<T::Handle, TryDequeueError>
Try to dequeue an element from the queue, without waiting if the queue is in an inconsistent state or if another consumer is currently dequeueing from the queue.
Because this is a multi-producer, single-consumer queue, only one thread may be dequeueing at a time. If another thread is dequeueing, this method returns TryDequeueError::Busy.
The MpscQueue::dequeue method will instead wait (by spinning with an exponential backoff) when the queue is in an inconsistent state or busy.
The unsafe MpscQueue::try_dequeue_unchecked method will not check if the queue is busy before dequeueing an element. This can be used when the user code guarantees that no other threads will dequeue from the queue concurrently, but this cannot be enforced by the compiler.
This method will never wait.
Returns
- Ok(T::Handle) if an element was successfully dequeued
- Err(TryDequeueError::Empty) if there are no elements in the queue
- Err(TryDequeueError::Inconsistent) if the queue is currently in an inconsistent state
- Err(TryDequeueError::Busy) if another thread is currently trying to dequeue a message.
pub fn dequeue(&self) -> Option<T::Handle>
Dequeue an element from the queue.
This method will wait by spinning with an exponential backoff if the queue is in an inconsistent state.
Additionally, because this is a multi-producer, single-consumer queue, only one thread may be dequeueing at a time. If another thread is dequeueing, this method will spin until the queue is no longer busy.
The MpscQueue::try_dequeue method will return an error rather than waiting when the queue is in an inconsistent state or busy.
The unsafe MpscQueue::dequeue_unchecked method will not check if the queue is busy before dequeueing an element. This can be used when the user code guarantees that no other threads will dequeue from the queue concurrently, but this cannot be enforced by the compiler.
Returns
- Some(T::Handle) if an element was successfully dequeued
- None if the queue is empty (if another thread is dequeueing, this method spins rather than returning, as described above)
pub fn consume(&self) -> Consumer<'_, T>
Returns a Consumer handle that reserves the exclusive right to dequeue elements from the queue until it is dropped.
If another thread is dequeueing, this method spins until there is no other thread dequeueing.
pub fn try_consume(&self) -> Option<Consumer<'_, T>>
Attempts to reserve a Consumer handle that holds the exclusive right to dequeue elements from the queue until it is dropped.
If another thread is dequeueing, this returns None instead.
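The idea of a handle that reserves the right to dequeue until it is dropped can be modeled with an atomic flag and a guard type. The sketch below is one plausible way to build such a reservation using only the standard library; it is not taken from cordyceps, and `ConsumerSlot` and `ConsumerGuard` are hypothetical names. A real consumer handle would also expose the dequeue operations, which are omitted here.

```rust
use std::hint;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical flag tracking whether some thread holds the consumer role.
pub struct ConsumerSlot {
    busy: AtomicBool,
}

/// Hypothetical guard; the consumer role is released when it is dropped.
pub struct ConsumerGuard<'a> {
    slot: &'a ConsumerSlot,
}

impl ConsumerSlot {
    pub const fn new() -> Self {
        Self { busy: AtomicBool::new(false) }
    }

    /// Non-waiting variant: returns `None` if the role is already taken.
    pub fn try_consume(&self) -> Option<ConsumerGuard<'_>> {
        self.busy
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .ok()
            .map(|_| ConsumerGuard { slot: self })
    }

    /// Waiting variant: spins until the role becomes available.
    pub fn consume(&self) -> ConsumerGuard<'_> {
        loop {
            if let Some(guard) = self.try_consume() {
                return guard;
            }
            hint::spin_loop();
        }
    }
}

impl Drop for ConsumerGuard<'_> {
    fn drop(&mut self) {
        // Hand the consumer role back so another thread may take it.
        self.slot.busy.store(false, Ordering::Release);
    }
}
```

Holding the guard across a batch of dequeues pays for the reservation once per batch rather than once per element, which is the benefit the examples above attribute to consuming multiple elements at a time.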
pub unsafe fn try_dequeue_unchecked(&self) -> Result<T::Handle, TryDequeueError>
Try to dequeue an element from the queue, without waiting if the queue is in an inconsistent state, and without checking if another consumer exists.
This method returns TryDequeueError::Inconsistent when the queue is in an inconsistent state.
The MpscQueue::dequeue_unchecked method will instead wait (by spinning with an exponential backoff) when the queue is in an inconsistent state.
This method will never wait.
Returns
- Ok(T::Handle) if an element was successfully dequeued
- Err(TryDequeueError::Empty) if there are no elements in the queue
- Err(TryDequeueError::Inconsistent) if the queue is currently in an inconsistent state
This method will never return TryDequeueError::Busy.
Safety
This is a multi-producer, single-consumer queue. Only one thread/core may call try_dequeue_unchecked at a time!
pub unsafe fn dequeue_unchecked(&self) -> Option<T::Handle>
Dequeue an element from the queue, without checking whether another consumer exists.
This method will wait by spinning with an exponential backoff if the queue is in an inconsistent state.
The MpscQueue::try_dequeue method will return an error rather than waiting when the queue is in an inconsistent state.
Returns
- Some(T::Handle) if an element was successfully dequeued
- None if the queue is empty
Safety
This is a multi-producer, single-consumer queue. Only one thread/core may call dequeue_unchecked at a time!
impl<T: Linked<Links<T>>> MpscQueue<T>
pub fn consume_owned(self: Arc<Self>) -> OwnedConsumer<T>
Available on crate feature alloc only.
Returns an OwnedConsumer handle that reserves the exclusive right to dequeue elements from the queue until it is dropped.
If another thread is dequeueing, this method spins until there is no other thread dequeueing.
pub fn try_consume_owned(self: Arc<Self>) -> Option<OwnedConsumer<T>>
Available on crate feature alloc only.
Attempts to reserve an OwnedConsumer handle that holds the exclusive right to dequeue elements from the queue until it is dropped.
If another thread is dequeueing, this returns None instead.