pub struct WaitMap<K: PartialEq, V, Lock: ScopedRawMutex = DefaultMutex> { /* private fields */ }

A map of Wakers associated with keys, allowing tasks to be woken by their key.

A WaitMap allows any number of tasks to wait asynchronously and be woken when a value with a certain key arrives. This can be used to implement structures like “async mailboxes”, where an async function requests some data (such as a response) associated with a certain key (such as a message ID). When the data is received, the key can be used to provide the task with the desired data, as well as wake the task for further processing.
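The "async mailbox" idea can be sketched with the standard library alone. The block below is a toy model, not how WaitMap is implemented: `ToyMailbox`, `Slot`, `Recv`, `deliver`, `CountingWaker` and `demo` are hypothetical names, the key type is fixed to `u32`, and the sketch allocates a `HashMap`, which WaitMap avoids. It only shows the shape of the pattern: a task parks its Waker under a key, and whoever delivers a value for that key stores the value and wakes the task.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical toy types, not part of maitake-sync.
enum Slot<V> {
    Waiting(Waker), // a task is parked on this key
    Ready(V),       // a value has arrived for the parked task
}

struct ToyMailbox<V> {
    slots: Mutex<HashMap<u32, Slot<V>>>,
}

// Future returned by `ToyMailbox::recv`.
struct Recv<'a, V> {
    mailbox: &'a ToyMailbox<V>,
    key: u32,
}

impl<V> ToyMailbox<V> {
    fn new() -> Self {
        Self { slots: Mutex::new(HashMap::new()) }
    }

    fn recv(&self, key: u32) -> Recv<'_, V> {
        Recv { mailbox: self, key }
    }

    // Hand `val` to the task parked on `key` and wake it. If nobody is parked
    // there, the value is handed back (a choice made for this sketch).
    fn deliver(&self, key: u32, val: V) -> Result<(), V> {
        let mut slots = self.slots.lock().unwrap();
        let prev = slots.remove(&key);
        match prev {
            Some(Slot::Waiting(waker)) => {
                slots.insert(key, Slot::Ready(val));
                drop(slots); // release the lock before waking
                waker.wake();
                Ok(())
            }
            Some(ready) => {
                slots.insert(key, ready); // an earlier value is still unread
                Err(val)
            }
            None => Err(val),
        }
    }
}

impl<V> Future for Recv<'_, V> {
    type Output = V;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<V> {
        let mut slots = self.mailbox.slots.lock().unwrap();
        let prev = slots.remove(&self.key);
        match prev {
            Some(Slot::Ready(val)) => Poll::Ready(val),
            _ => {
                // Park (or re-park) the current task under this key.
                slots.insert(self.key, Slot::Waiting(cx.waker().clone()));
                Poll::Pending
            }
        }
    }
}

// Waker that only counts how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// Polls one `recv` by hand: pending at first, ready after `deliver`.
fn demo() -> (bool, usize, Option<u64>) {
    let mailbox = ToyMailbox::new();
    let wakes = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(wakes.clone());
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(mailbox.recv(7));
    let was_pending = fut.as_mut().poll(&mut cx).is_pending();

    mailbox.deliver(7, 107u64).expect("a task is parked on key 7");

    let got = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(val) => Some(val),
        Poll::Pending => None,
    };
    (was_pending, wakes.0.load(Ordering::SeqCst), got)
}

fn main() {
    println!("{:?}", demo());
}
```

Here `demo` plays the role of both the executor and the message source. In real use, a scheduler would poll the future and a separate task or interrupt handler would deliver the value.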

Overriding the blocking mutex

This type uses a blocking Mutex internally to synchronize access to its wait list. By default, this is a DefaultMutex. To use an alternative ScopedRawMutex implementation, use the new_with_raw_mutex constructor. See the documentation on overriding mutex implementations for more details.

Examples

Waking a single task at a time by calling wake:

use std::sync::Arc;
use maitake::scheduler::Scheduler;
use maitake_sync::wait_map::{WaitMap, WakeOutcome};

const TASKS: usize = 10;

// In order to spawn tasks, we need a `Scheduler` instance.
let scheduler = Scheduler::new();

// Construct a new `WaitMap`.
let q = Arc::new(WaitMap::new());

// Spawn some tasks that will wait on the queue.
// We'll use the task index (0..10) as the key.
for i in 0..TASKS {
    let q = q.clone();
    scheduler.spawn(async move {
        let val = q.wait(i).await.unwrap();
        assert_eq!(val, i + 100);
    });
}

// Tick the scheduler once.
let tick = scheduler.tick();

// No tasks should complete on this tick, as they are all waiting
// to be woken by the queue.
assert_eq!(tick.completed, 0, "no tasks have been woken");

// We now wake each of the tasks, using the same key (0..10),
// and provide them with a value that is their `key + 100`,
// e.g. 100..110. Only the task that has been woken will be
// notified.
for i in 0..TASKS {
    let result = q.wake(&i, i + 100);
    assert!(matches!(result, WakeOutcome::Woke));

    // Tick the scheduler.
    let tick = scheduler.tick();

    // Exactly one task should have completed
    assert_eq!(tick.completed, 1);
}

// Tick the scheduler.
let tick = scheduler.tick();

// No additional tasks should be completed
assert_eq!(tick.completed, 0);
assert!(!tick.has_remaining);

Implementation Notes

This type is currently implemented using an intrusive doubly-linked list.

The intrusive aspect of this map is important, as it means that it does not allocate memory. Instead, nodes in the linked list are stored in the futures of tasks waiting on the map, so no heap allocation is needed for each waiting task.

However, the intrusive linked list introduces one new danger: because futures can be cancelled, and the linked list nodes live within the futures trying to wait on the queue, we must ensure that the node is unlinked from the list before dropping a cancelled future. Failure to do so would result in the list containing dangling pointers. Therefore, we must use a doubly-linked list, so that nodes can edit both the previous and next node when they have to remove themselves. This is kind of a bummer, as it means we can’t use something nice like Dmitry Vyukov’s intrusive queue, and there are not really practical designs for lock-free doubly-linked lists that don’t rely on some kind of deferred reclamation scheme such as hazard pointers or QSBR.

Instead, we just stick a Mutex around the linked list, which must be acquired to pop nodes from it, or for nodes to remove themselves when futures are cancelled. This is a bit sad, but the critical sections for this mutex are short enough that we still get pretty good performance despite it.
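The reason a doubly-linked list is needed can be seen in a small sketch. This is not the crate's list: `Node`, `List`, `push_back`, `unlink` and `keys` are hypothetical names, and indices into a `Vec` stand in for the raw pointers an intrusive list would keep inside each waiting future (so, unlike the real thing, this sketch allocates). What it illustrates is that a node holding both `prev` and `next` can remove itself in constant time given only its own handle, which is what a cancelled future has to do while holding the lock.

```rust
// Hypothetical arena-backed list. Indices into `nodes` stand in for the raw
// pointers a real intrusive list would store inside each waiting future.
struct Node {
    key: u32,
    prev: Option<usize>,
    next: Option<usize>,
    linked: bool,
}

struct List {
    nodes: Vec<Node>,
    head: Option<usize>,
    tail: Option<usize>,
}

impl List {
    fn new() -> Self {
        List { nodes: Vec::new(), head: None, tail: None }
    }

    // Append a node and return its handle.
    fn push_back(&mut self, key: u32) -> usize {
        let idx = self.nodes.len();
        self.nodes.push(Node { key, prev: self.tail, next: None, linked: true });
        match self.tail {
            Some(t) => self.nodes[t].next = Some(idx),
            None => self.head = Some(idx),
        }
        self.tail = Some(idx);
        idx
    }

    // Remove a node given only its own handle, as a cancelled future must.
    // Both neighbours are patched in O(1); no walk from the head is needed.
    fn unlink(&mut self, idx: usize) {
        if !self.nodes[idx].linked {
            return; // already removed
        }
        let prev = self.nodes[idx].prev.take();
        let next = self.nodes[idx].next.take();
        self.nodes[idx].linked = false;
        match prev {
            Some(p) => self.nodes[p].next = next,
            None => self.head = next,
        }
        match next {
            Some(n) => self.nodes[n].prev = prev,
            None => self.tail = prev,
        }
    }

    // Keys of the linked nodes, front to back.
    fn keys(&self) -> Vec<u32> {
        let mut out = Vec::new();
        let mut cur = self.head;
        while let Some(i) = cur {
            out.push(self.nodes[i].key);
            cur = self.nodes[i].next;
        }
        out
    }
}

fn main() {
    let mut list = List::new();
    let _a = list.push_back(1);
    let b = list.push_back(2);
    let _c = list.push_back(3);
    list.unlink(b); // the "future" holding key 2 is cancelled
    println!("{:?}", list.keys());
}
```

With a singly-linked list, `unlink` would have to walk from the head to find the predecessor of the node being removed.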

Implementations

impl<K: PartialEq, V> WaitMap<K, V>

pub const fn new() -> Self

Returns a new WaitMap.

This constructor returns a WaitMap that uses a DefaultMutex as the ScopedRawMutex implementation for wait-list synchronization. To use a different ScopedRawMutex implementation, use the new_with_raw_mutex constructor instead. See the documentation on overriding mutex implementations for more details.

impl<K, V, Lock> WaitMap<K, V, Lock>
where K: PartialEq, Lock: ScopedRawMutex,

pub const fn new_with_raw_mutex(lock: Lock) -> Self

Returns a new WaitMap, using the provided ScopedRawMutex implementation for wait-list synchronization.

This constructor allows a WaitMap to be constructed with any type that implements ScopedRawMutex as the underlying raw blocking mutex implementation. See the documentation on overriding mutex implementations for more details.

impl<K: PartialEq, V, Lock: ScopedRawMutex> WaitMap<K, V, Lock>

pub fn wake(&self, key: &K, val: V) -> WakeOutcome<V>

Wake a certain task in the queue.

If the queue is empty, a wakeup is stored in the WaitMap, and the next call to wait will complete immediately.

pub fn is_closed(&self) -> bool

Returns true if this WaitMap is closed.

pub fn close(&self)

Close the queue, indicating that it may no longer be used.

Once a queue is closed, all wait calls (current or future) will return an error.

This method is generally used when implementing higher-level synchronization primitives or resources: when an event makes a resource permanently unavailable, the queue can be closed.

pub fn wait(&self, key: K) -> Wait<'_, K, V, Lock>

Wait to be woken up by this queue.

This returns a Wait future that will complete when the task is woken by a call to wake with a matching key, or when the WaitMap is dropped.

Note: keys must be unique. If the given key already exists in the WaitMap, the future will resolve to an error the first time it is polled.
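To make the uniqueness rule and the effect of close concrete, here is a toy model built only on the standard library. It is a sketch under stated assumptions: `ToyRegistry`, `Inner`, `ToyWaitError` and `register` are hypothetical names rather than the crate's types, and the toy reports errors at registration time, whereas the text above says a WaitMap future reports a duplicate key the first time it is polled.

```rust
use std::sync::Mutex;

// Hypothetical names for this sketch; they are not the crate's own types.
#[derive(Debug, PartialEq)]
enum ToyWaitError {
    Closed,
    DuplicateKey,
}

struct Inner<K> {
    closed: bool,
    keys: Vec<K>,
}

struct ToyRegistry<K> {
    inner: Mutex<Inner<K>>,
}

impl<K: PartialEq> ToyRegistry<K> {
    fn new() -> Self {
        ToyRegistry {
            inner: Mutex::new(Inner { closed: false, keys: Vec::new() }),
        }
    }

    // Record a waiter for `key`, rejecting closed registries and duplicates.
    fn register(&self, key: K) -> Result<(), ToyWaitError> {
        let mut inner = self.inner.lock().unwrap();
        if inner.closed {
            return Err(ToyWaitError::Closed);
        }
        // With only `K: PartialEq` there is no hashing or ordering to lean
        // on, so the uniqueness check is a linear scan.
        if inner.keys.contains(&key) {
            return Err(ToyWaitError::DuplicateKey);
        }
        inner.keys.push(key);
        Ok(())
    }

    // Close the registry; every later `register` fails with `Closed`.
    fn close(&self) {
        let mut inner = self.inner.lock().unwrap();
        inner.closed = true;
        inner.keys.clear();
    }
}

fn main() {
    let reg = ToyRegistry::new();
    println!("{:?}", reg.register("msg-1"));
    println!("{:?}", reg.register("msg-1"));
    reg.close();
    println!("{:?}", reg.register("msg-2"));
}
```

The `K: PartialEq` bound in the sketch mirrors the bound on WaitMap itself: keys only need to be comparable for equality, not hashable or ordered.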

impl<K: PartialEq, V, Lock: ScopedRawMutex> WaitMap<K, V, Lock>

pub fn wait_owned(self: &Arc<Self>, key: K) -> WaitOwned<K, V, Lock>

Available on crate feature alloc only.

Wait to be woken up by this queue, returning a future that’s valid for the 'static lifetime.

This is identical to the wait method, except that it takes an Arc reference to the WaitMap, allowing the returned future to live for the 'static lifetime.

This returns a WaitOwned future that will complete when the task is woken by a call to wake with a matching key, or when the WaitMap is dropped.

Note: keys must be unique. If the given key already exists in the WaitMap, the future will resolve to an error the first time it is polled.

Trait Implementations

impl<K, V, Lock> Debug for WaitMap<K, V, Lock>
where K: PartialEq, Lock: ScopedRawMutex,

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations

impl<K, V, Lock = DefaultMutex> !RefUnwindSafe for WaitMap<K, V, Lock>

impl<K, V, Lock> Send for WaitMap<K, V, Lock>
where K: Send, Lock: Send, V: Send,

impl<K, V, Lock> Sync for WaitMap<K, V, Lock>
where K: Send, Lock: Sync, V: Send,

impl<K, V, Lock> Unpin for WaitMap<K, V, Lock>
where Lock: Unpin,

impl<K, V, Lock = DefaultMutex> !UnwindSafe for WaitMap<K, V, Lock>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.