pub struct WaitMap<K, V, Lock = DefaultMutex>
where
    K: PartialEq,
    Lock: ScopedRawMutex,
{ /* private fields */ }
A map of Wakers associated with keys, allowing tasks to be woken by their key.
A WaitMap allows any number of tasks to wait asynchronously and be woken when a value with a certain key arrives. This can be used to implement structures like “async mailboxes”, where an async function requests some data (such as a response) associated with a certain key (such as a message ID). When the data is received, the key can be used to provide the task with the desired data, as well as wake the task for further processing.
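The "async mailbox" pattern described above can be sketched with the standard library alone. The `Mailbox`, `Recv`, `deliver`, and `CountingWaker` names below are hypothetical and are not part of maitake-sync. The sketch keeps its entries in a `Vec` behind a `std::sync::Mutex`, so unlike a `WaitMap` it allocates, and it omits cancellation cleanup and closing:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for a keyed wait map; NOT maitake-sync's implementation.
struct Entry<K, V> {
    key: K,
    waker: Option<Waker>,
    value: Option<V>,
}

pub struct Mailbox<K, V> {
    entries: Mutex<Vec<Entry<K, V>>>,
}

/// Future that resolves once a value has been delivered for `key`.
pub struct Recv<'a, K, V> {
    mailbox: &'a Mailbox<K, V>,
    key: K,
}

impl<K: PartialEq + Clone, V> Mailbox<K, V> {
    pub fn new() -> Self {
        Mailbox { entries: Mutex::new(Vec::new()) }
    }

    pub fn wait(&self, key: K) -> Recv<'_, K, V> {
        Recv { mailbox: self, key }
    }

    /// Hands `value` to the task waiting on `key`, or returns it if nobody waits.
    pub fn deliver(&self, key: &K, value: V) -> Result<(), V> {
        let mut entries = self.entries.lock().unwrap();
        let waker = match entries.iter_mut().find(|e| e.key == *key) {
            Some(entry) => {
                entry.value = Some(value);
                entry.waker.take()
            }
            None => return Err(value),
        };
        // Release the lock before waking to keep the critical section short.
        drop(entries);
        if let Some(waker) = waker {
            waker.wake();
        }
        Ok(())
    }
}

impl<K: PartialEq + Clone, V> Future for Recv<'_, K, V> {
    type Output = V;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<V> {
        let mut entries = self.mailbox.entries.lock().unwrap();
        let found = entries.iter().position(|e| e.key == self.key);
        match found {
            // A value has arrived: remove our entry and return the value.
            Some(i) if entries[i].value.is_some() => {
                let entry = entries.swap_remove(i);
                Poll::Ready(entry.value.expect("checked by the match guard"))
            }
            // Still waiting: refresh the stored waker.
            Some(i) => {
                entries[i].waker = Some(cx.waker().clone());
                Poll::Pending
            }
            // First poll: register this key together with our waker.
            None => {
                let waker = Some(cx.waker().clone());
                entries.push(Entry { key: self.key.clone(), waker, value: None });
                Poll::Pending
            }
        }
    }
}

/// Test waker that counts how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls a receiver by hand: pending at first, ready after delivery.
pub fn mailbox_demo() -> (bool, usize, Option<u64>) {
    let mailbox: Mailbox<u32, u64> = Mailbox::new();
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut recv = mailbox.wait(7);
    let was_pending = Pin::new(&mut recv).poll(&mut cx).is_pending();
    let delivered = mailbox.deliver(&7, 107).is_ok();
    let wakes = counter.0.load(Ordering::SeqCst);
    let value = match Pin::new(&mut recv).poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    };
    (was_pending && delivered, wakes, value)
}
```

The key type only needs `PartialEq` for lookup (plus `Clone` in this sketch), which is why a linear scan is used in place of a hash map.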
Overriding the blocking mutex
This type uses a blocking Mutex internally to synchronize access to its wait list. By default, this is a DefaultMutex. To use an alternative ScopedRawMutex implementation, use the new_with_raw_mutex constructor. See the documentation on overriding mutex implementations for more details.
Examples
Waking a single task at a time by calling wake:
use std::sync::Arc;
use maitake::scheduler::Scheduler;
use maitake_sync::wait_map::{WaitMap, WakeOutcome};

const TASKS: usize = 10;

// In order to spawn tasks, we need a `Scheduler` instance.
let scheduler = Scheduler::new();

// Construct a new `WaitMap`.
let q = Arc::new(WaitMap::new());

// Spawn some tasks that will wait on the queue.
// We'll use the task index (0..10) as the key.
for i in 0..TASKS {
    let q = q.clone();
    scheduler.spawn(async move {
        let val = q.wait(i).await.unwrap();
        assert_eq!(val, i + 100);
    });
}

// Tick the scheduler once.
let tick = scheduler.tick();

// No tasks should complete on this tick, as they are all waiting
// to be woken by the queue.
assert_eq!(tick.completed, 0, "no tasks have been woken");

// We now wake each of the tasks, using the same key (0..10),
// and provide them with a value that is their `key + 100`,
// e.g. 100..110. Only the task that has been woken will be
// notified.
for i in 0..TASKS {
    let result = q.wake(&i, i + 100);
    assert!(matches!(result, WakeOutcome::Woke));

    // Tick the scheduler.
    let tick = scheduler.tick();

    // Exactly one task should have completed.
    assert_eq!(tick.completed, 1);
}

// Tick the scheduler.
let tick = scheduler.tick();

// No additional tasks should be completed.
assert_eq!(tick.completed, 0);
assert!(!tick.has_remaining);
Implementation Notes
This type is currently implemented using an intrusive doubly-linked list.
The intrusive aspect of this map is important, as it means that it does not allocate memory. Instead, nodes in the linked list are stored in the futures of tasks waiting to be woken. This means that it is not necessary to allocate any heap memory for each waiting task.
However, the intrusive linked list introduces one new danger: because futures can be cancelled, and the linked list nodes live within the futures trying to wait on the queue, we must ensure that the node is unlinked from the list before dropping a cancelled future. Failure to do so would result in the list containing dangling pointers. Therefore, we must use a doubly-linked list, so that nodes can edit both the previous and next node when they have to remove themselves. This is kind of a bummer, as it means we can’t use something nice like Dmitry Vyukov’s intrusive queue, and there are not really practical designs for lock-free doubly-linked lists that don’t rely on some kind of deferred reclamation scheme such as hazard pointers or QSBR.
Instead, we just stick a Mutex around the linked list, which must be acquired to pop nodes from it, or for nodes to remove themselves when futures are cancelled. This is a bit sad, but the critical sections for this mutex are short enough that we still get pretty good performance despite it.
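The cancellation rule above (a waiter must remove itself from the list, under the mutex, before it is dropped) can be illustrated with a deliberately simplified, std-only sketch. `WaitList` and `Waiter` are hypothetical names, and a `Vec` of keys stands in for the intrusive list. In the real structure the node lives inside the future itself, so skipping this step would leave a dangling pointer, not just a stale entry:

```rust
use std::sync::Mutex;

/// Hypothetical wait list; a `Vec` stands in for the intrusive linked list.
pub struct WaitList {
    waiters: Mutex<Vec<u32>>,
}

/// Stand-in for a waiting future: linked on creation, unlinked on drop.
pub struct Waiter<'a> {
    list: &'a WaitList,
    key: u32,
}

impl WaitList {
    pub fn new() -> Self {
        WaitList { waiters: Mutex::new(Vec::new()) }
    }

    /// Links a new waiter into the list while holding the mutex.
    pub fn enqueue(&self, key: u32) -> Waiter<'_> {
        self.waiters.lock().unwrap().push(key);
        Waiter { list: self, key }
    }

    pub fn len(&self) -> usize {
        self.waiters.lock().unwrap().len()
    }
}

impl Drop for Waiter<'_> {
    fn drop(&mut self) {
        // A cancelled waiter takes the lock and unlinks itself, so the
        // list never refers to a waiter that no longer exists.
        let key = self.key;
        let mut waiters = self.list.waiters.lock().unwrap();
        waiters.retain(|k| *k != key);
    }
}

/// Returns the list length before and after one waiter is cancelled.
pub fn cancel_demo() -> (usize, usize) {
    let list = WaitList::new();
    let a = list.enqueue(1);
    let _b = list.enqueue(2);
    let before = list.len();
    drop(a); // simulate cancelling the future waiting on key 1
    let after = list.len();
    (before, after)
}
```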
Implementations
impl<K, V> WaitMap<K, V>
where
    K: PartialEq,
pub const fn new() -> WaitMap<K, V>
Returns a new WaitMap.
This constructor returns a WaitMap that uses a DefaultMutex as the ScopedRawMutex implementation for wait list synchronization. To use a different ScopedRawMutex implementation, use the new_with_raw_mutex constructor instead. See the documentation on overriding mutex implementations for more details.
impl<K, V, Lock> WaitMap<K, V, Lock>
where
    K: PartialEq,
    Lock: ScopedRawMutex,
pub const fn new_with_raw_mutex(lock: Lock) -> WaitMap<K, V, Lock>
Returns a new WaitMap, using the provided ScopedRawMutex implementation for wait-list synchronization.
This constructor allows a WaitMap to be constructed with any type that implements ScopedRawMutex as the underlying raw blocking mutex implementation. See the documentation on overriding mutex implementations for more details.
impl<K, V, Lock> WaitMap<K, V, Lock>
where
    K: PartialEq,
    Lock: ScopedRawMutex,
pub fn wake(&self, key: &K, val: V) -> WakeOutcome<V>
Wake a certain task in the queue.
If the queue is empty, a wakeup is stored in the WaitMap, and the next call to wait will complete immediately.
pub fn close(&self)
Close the queue, indicating that it may no longer be used.
Once a queue is closed, all wait calls (current or future) will return an error.
This method is generally used when implementing higher-level synchronization primitives or resources: when an event makes a resource permanently unavailable, the queue can be closed.
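The closing behaviour described above (current waiters are woken so they can observe the error, and later waits fail immediately) can be sketched with the standard library. `Queue`, `Closed`, `register`, and `CountingWaker` are hypothetical names chosen for illustration and do not reflect maitake-sync's internals:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical error returned once the queue has been closed.
#[derive(Debug, PartialEq)]
pub struct Closed;

struct State {
    closed: bool,
    wakers: Vec<Waker>,
}

/// Hypothetical closable wait queue (not maitake-sync's implementation).
pub struct Queue {
    state: Mutex<State>,
}

impl Queue {
    pub fn new() -> Self {
        Queue { state: Mutex::new(State { closed: false, wakers: Vec::new() }) }
    }

    /// Registers a waiter, failing immediately if the queue is closed.
    pub fn register(&self, waker: &Waker) -> Result<(), Closed> {
        let mut state = self.state.lock().unwrap();
        if state.closed {
            return Err(Closed);
        }
        state.wakers.push(waker.clone());
        Ok(())
    }

    /// Marks the queue closed and wakes every current waiter so that
    /// each one can observe the closed state on its next poll.
    pub fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.closed = true;
        let wakers = std::mem::take(&mut state.wakers);
        drop(state); // wake outside the lock
        for waker in wakers {
            waker.wake();
        }
    }
}

/// Test waker that counts how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Registers two waiters, closes the queue, then tries to register again.
pub fn close_demo() -> (usize, Result<(), Closed>) {
    let queue = Queue::new();
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    queue.register(&waker).unwrap();
    queue.register(&waker).unwrap();
    queue.close();
    (counter.0.load(Ordering::SeqCst), queue.register(&waker))
}
```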
pub fn wait(&self, key: K) -> Wait<'_, K, V, Lock>
Wait to be woken up by this queue.
This returns a Wait future that will complete when the task is woken by a call to wake with a matching key, or when the WaitMap is dropped.
Note: keys must be unique. If the given key already exists in the WaitMap, the future will resolve to an Error the first time it is polled.
impl<K, V, Lock> WaitMap<K, V, Lock>
where
    K: PartialEq,
    Lock: ScopedRawMutex,
pub fn wait_owned(
    self: &Arc<WaitMap<K, V, Lock>>,
    key: K
) -> WaitOwned<K, V, Lock>
Available on crate feature alloc only.
Wait to be woken up by this queue, returning a future that’s valid for the 'static lifetime.
This is identical to the wait method, except that it takes an Arc reference to the WaitMap, allowing the returned future to live for the 'static lifetime.
This returns a WaitOwned future that will complete when the task is woken by a call to wake with a matching key, or when the WaitMap is dropped.
Note: keys must be unique. If the given key already exists in the WaitMap, the future will resolve to an Error the first time it is polled.
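To see why taking an Arc makes the returned value 'static, consider the following std-only sketch. `Shared`, `Borrowed`, `Owned`, and their methods are hypothetical stand-ins and not maitake-sync types. The owning handle clones the Arc, so it borrows nothing and can be moved into `std::thread::spawn`, which requires 'static. The borrowing handle is tied to the lifetime of the reference it was created from:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical shared structure standing in for the map.
pub struct Shared {
    log: Mutex<Vec<u32>>,
}

/// Borrowing handle: cannot outlive the `&Shared` it was created from.
pub struct Borrowed<'a> {
    shared: &'a Shared,
    key: u32,
}

/// Owning handle: holds its own `Arc`, so it satisfies `'static`.
pub struct Owned {
    shared: Arc<Shared>,
    key: u32,
}

impl Shared {
    pub fn handle(&self, key: u32) -> Borrowed<'_> {
        Borrowed { shared: self, key }
    }

    /// Same receiver shape as `wait_owned`: `self: &Arc<Self>`.
    pub fn handle_owned(self: &Arc<Self>, key: u32) -> Owned {
        Owned { shared: Arc::clone(self), key }
    }
}

impl Borrowed<'_> {
    pub fn record(self) {
        self.shared.log.lock().unwrap().push(self.key);
    }
}

impl Owned {
    pub fn record(self) {
        self.shared.log.lock().unwrap().push(self.key);
    }
}

/// Uses the borrowing handle locally and the owning handle on another thread.
pub fn owned_demo() -> Vec<u32> {
    let shared = Arc::new(Shared { log: Mutex::new(Vec::new()) });
    shared.handle(1).record();

    // `thread::spawn` requires a `'static` closure; `Owned` qualifies
    // because it owns an `Arc` and borrows nothing.
    let owned = shared.handle_owned(2);
    thread::spawn(move || owned.record()).join().unwrap();

    // Bind the clone first so the lock guard is released before returning.
    let log = shared.log.lock().unwrap().clone();
    log
}
```

The same reasoning applies to spawning a task on an executor that requires 'static futures: the owned variant can be moved into the task, at the cost of one reference-count increment.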