pub struct WaitQueue<Lock = DefaultMutex>where
Lock: ScopedRawMutex,{ /* private fields */ }
Expand description
A queue of waiting tasks which can be woken in first-in, first-out order, or all at once.
A WaitQueue
allows any number of tasks to wait asynchronously and be
woken when some event occurs, either individually in first-in,
first-out order, or all at once. This makes it a vital building
block of runtime services (such as timers or I/O resources), where it may be
used to wake a set of tasks when a timer completes or when a resource
becomes available. It can be equally useful for implementing higher-level
synchronization primitives: for example, a WaitQueue
plus an
UnsafeCell
is essentially an entire implementation of a fair
asynchronous mutex. Finally, a WaitQueue
can be a useful
synchronization primitive on its own: sometimes, you just need to have a
bunch of tasks wait for something and then wake them all up.
Overriding the blocking mutex
This type uses a blocking Mutex
internally to
synchronize access to its wait list. By default, this is a DefaultMutex
. To
use an alternative ScopedRawMutex
implementation, use the
new_with_raw_mutex
constructor. See the documentation
on overriding mutex
implementations for more
details.
Examples
Waking a single task at a time by calling wake
:
use std::sync::Arc;
use maitake::scheduler::Scheduler;
use maitake_sync::WaitQueue;
const TASKS: usize = 10;
// In order to spawn tasks, we need a `Scheduler` instance.
let scheduler = Scheduler::new();
// Construct a new `WaitQueue`.
let q = Arc::new(WaitQueue::new());
// Spawn some tasks that will wait on the queue.
for _ in 0..TASKS {
let q = q.clone();
scheduler.spawn(async move {
// Wait to be woken by the queue.
q.wait().await.expect("queue is not closed");
});
}
// Tick the scheduler once.
let tick = scheduler.tick();
// No tasks should complete on this tick, as they are all waiting
// to be woken by the queue.
assert_eq!(tick.completed, 0, "no tasks have been woken");
let mut completed = 0;
for i in 1..=TASKS {
// Wake the next task from the queue.
q.wake();
// Tick the scheduler.
let tick = scheduler.tick();
// A single task should have completed on this tick.
completed += tick.completed;
assert_eq!(completed, i);
}
assert_eq!(completed, TASKS, "all tasks should have completed");
Waking all tasks using wake_all
:
use std::sync::Arc;
use maitake::scheduler::Scheduler;
use maitake_sync::WaitQueue;
const TASKS: usize = 10;
// In order to spawn tasks, we need a `Scheduler` instance.
let scheduler = Scheduler::new();
// Construct a new `WaitQueue`.
let q = Arc::new(WaitQueue::new());
// Spawn some tasks that will wait on the queue.
for _ in 0..TASKS {
let q = q.clone();
scheduler.spawn(async move {
// Wait to be woken by the queue.
q.wait().await.expect("queue is not closed");
});
}
// Tick the scheduler once.
let tick = scheduler.tick();
// No tasks should complete on this tick, as they are all waiting
// to be woken by the queue.
assert_eq!(tick.completed, 0, "no tasks have been woken");
// Wake all tasks waiting for the queue.
q.wake_all();
// Tick the scheduler again to run the woken tasks.
let tick = scheduler.tick();
// All tasks have now completed, since they were woken by the
// queue.
assert_eq!(tick.completed, TASKS, "all tasks should have completed");
Implementation Notes
This type is implemented using an intrusive doubly-linked list.
The intrusive aspect of this list is important, as it means that it does not allocate memory. Instead, nodes in the linked list are stored in the futures of tasks trying to wait for capacity. This means that it is not necessary to allocate any heap memory for each task waiting to be woken.
However, the intrusive linked list introduces one new danger: because futures can be cancelled, and the linked list nodes live within the futures trying to wait on the queue, we must ensure that the node is unlinked from the list before dropping a cancelled future. Failure to do so would result in the list containing dangling pointers. Therefore, we must use a doubly-linked list, so that nodes can edit both the previous and next node when they have to remove themselves. This is kind of a bummer, as it means we can’t use something nice like this intrusive queue by Dmitry Vyukov, and there are not really practical designs for lock-free doubly-linked lists that don’t rely on some kind of deferred reclamation scheme such as hazard pointers or QSBR.
Instead, we just stick a Mutex
around the linked list, which must be
acquired to pop nodes from it, or for nodes to remove themselves when
futures are cancelled. This is a bit sad, but the critical sections for this
mutex are short enough that we still get pretty good performance despite it.
Implementations§
source§impl WaitQueue
impl WaitQueue
sourcepub const fn new() -> WaitQueue
pub const fn new() -> WaitQueue
Returns a new WaitQueue
.
This constructor returns a WaitQueue
that uses a DefaultMutex
as
the ScopedRawMutex
implementation for wait list synchronization.
To use a different ScopedRawMutex
implementation, use the
new_with_raw_mutex
constructor, instead. See
the documentation on overriding mutex
implementations
for more details.
source§impl<Lock> WaitQueue<Lock>where
Lock: ScopedRawMutex,
impl<Lock> WaitQueue<Lock>where
Lock: ScopedRawMutex,
sourcepub const fn new_with_raw_mutex(lock: Lock) -> WaitQueue<Lock>
pub const fn new_with_raw_mutex(lock: Lock) -> WaitQueue<Lock>
Returns a new WaitQueue
, using the provided ScopedRawMutex
implementation for wait-list synchronization.
This constructor allows a WaitQueue
to be constructed with any type that
implements ScopedRawMutex
as the underlying raw blocking mutex
implementation. See the documentation on overriding mutex
implementations
for more details.
sourcepub fn wake(&self)
pub fn wake(&self)
Wake the next task in the queue.
If the queue is empty, a wakeup is stored in the WaitQueue
, and the
next call to wait().await
will complete immediately. If one or more
tasks are currently in the queue, the first task in the queue is woken.
At most one wakeup will be stored in the queue at any time. If wake()
is called many times while there are no tasks in the queue, only a
single wakeup is stored.
Examples
Examples
use std::sync::Arc;
use maitake_sync::WaitQueue;
let queue = Arc::new(WaitQueue::new());
let waiter = task::spawn({
// clone the queue to move into the spawned task
let queue = queue.clone();
async move {
queue.wait().await;
println!("received wakeup!");
}
});
println!("waking task...");
queue.wake();
waiter.await.unwrap();
sourcepub fn wake_all(&self)
pub fn wake_all(&self)
Wake all tasks currently in the queue.
All tasks currently waiting on the queue are woken. Unlike wake()
, a
wakeup is not stored in the queue to wake the next call to wait()
if the queue is empty. Instead, this method only wakes all currently
registered waiters. Registering a task to be woken is done by await
ing
the Future
returned by the wait()
method on this queue.
Examples
use maitake_sync::WaitQueue;
use std::sync::Arc;
let queue = Arc::new(WaitQueue::new());
// spawn multiple tasks to wait on the queue.
let task1 = task::spawn({
let queue = queue.clone();
async move {
println!("task 1 waiting...");
queue.wait().await;
println!("task 1 woken")
}
});
let task2 = task::spawn({
let queue = queue.clone();
async move {
println!("task 2 waiting...");
queue.wait().await;
println!("task 2 woken")
}
});
// yield to the scheduler so that both tasks register
// themselves to wait on the queue.
task::yield_now().await;
// neither task will have been woken.
assert!(!task1.is_finished());
assert!(!task2.is_finished());
// wake all tasks waiting on the queue.
queue.wake_all();
// yield to the scheduler again so that the tasks can execute.
task::yield_now().await;
assert!(task1.is_finished());
assert!(task2.is_finished());
sourcepub fn close(&self)
pub fn close(&self)
Close the queue, indicating that it may no longer be used.
Once a queue is closed, all wait()
calls (current or future) will
return an error.
This method is generally used when implementing higher-level synchronization primitives or resources: when an event makes a resource permanently unavailable, the queue can be closed.
sourcepub fn wait(&self) -> Wait<'_, Lock> ⓘ
pub fn wait(&self) -> Wait<'_, Lock> ⓘ
Wait to be woken up by this queue.
Equivalent to:
async fn wait(&self);
This returns a Wait
Future
that will complete when the task is
woken by a call to wake()
or wake_all()
, or when the WaitQueue
is dropped.
Each WaitQueue
holds a single wakeup. If wake()
was previously
called while no tasks were waiting on the queue, then wait().await
will complete immediately, consuming the stored wakeup. Otherwise,
wait().await
waits to be woken by the next call to wake()
or
wake_all()
.
The Wait
future is not guaranteed to receive wakeups from calls to
wake()
if it has not yet been polled. See the documentation for the
Wait::subscribe()
method for details on receiving wakeups from the
queue prior to polling the Wait
future for the first time.
A Wait
future is is guaranteed to recieve wakeups from calls to
wake_all()
as soon as it is created, even if it has not yet been
polled.
Returns
The Future
returned by this method completes with one of the
following outputs:
Ok
(())
if the task was woken by a call towake()
orwake_all()
.Err
(
Closed
)
if the task was woken by theWaitQueue
beingclose
d.
Cancellation
A WaitQueue
fairly distributes wakeups to waiting tasks in the order
that they started to wait. If a Wait
future is dropped, the task
will forfeit its position in the queue.
Examples
use std::sync::Arc;
use maitake_sync::WaitQueue;
let queue = Arc::new(WaitQueue::new());
let waiter = task::spawn({
// clone the queue to move into the spawned task
let queue = queue.clone();
async move {
queue.wait().await;
println!("received wakeup!");
}
});
println!("waking task...");
queue.wake();
waiter.await.unwrap();
sourcepub async fn wait_for<F>(&self, f: F) -> Result<(), Closed>
pub async fn wait_for<F>(&self, f: F) -> Result<(), Closed>
Asynchronously poll the given function f
until a condition occurs,
using the WaitQueue
to only re-poll when notified.
This can be used to implement a “wait loop”, turning a “try” function (e.g. “try_recv” or “try_send”) into an asynchronous function (e.g. “recv” or “send”).
In particular, this function correctly registers interest in the WaitQueue
prior to polling the function, ensuring that there is not a chance of a race
where the condition occurs AFTER checking but BEFORE registering interest
in the WaitQueue
, which could lead to deadlock.
This is intended to have similar behavior to Condvar
in the standard library,
but asynchronous, and not requiring operating system intervention (or existence).
In particular, this can be used in cases where interrupts or events are used to signify readiness or completion of some task, such as the completion of a DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt can wake the queue, allowing the polling function to check status fields for partial progress or completion.
Consider using Self::wait_for_value()
if your function does return a value.
Consider using WaitCell::wait_for()
if you do not need multiple waiters.
Returns
Examples
use std::sync::Arc;
use maitake_sync::WaitQueue;
use std::sync::atomic::{AtomicU8, Ordering};
let queue = Arc::new(WaitQueue::new());
let num = Arc::new(AtomicU8::new(0));
let waiter1 = task::spawn({
// clone items to move into the spawned task
let queue = queue.clone();
let num = num.clone();
async move {
queue.wait_for(|| num.load(Ordering::Relaxed) > 5).await;
println!("received wakeup!");
}
});
let waiter2 = task::spawn({
// clone items to move into the spawned task
let queue = queue.clone();
let num = num.clone();
async move {
queue.wait_for(|| num.load(Ordering::Relaxed) > 10).await;
println!("received wakeup!");
}
});
println!("poking task...");
for i in 0..20 {
num.store(i, Ordering::Relaxed);
queue.wake();
}
waiter1.await.unwrap();
waiter2.await.unwrap();
sourcepub async fn wait_for_value<T, F>(&self, f: F) -> Result<T, Closed>
pub async fn wait_for_value<T, F>(&self, f: F) -> Result<T, Closed>
Asynchronously poll the given function f
until a condition occurs,
using the WaitQueue
to only re-poll when notified.
This can be used to implement a “wait loop”, turning a “try” function (e.g. “try_recv” or “try_send”) into an asynchronous function (e.g. “recv” or “send”).
In particular, this function correctly registers interest in the WaitQueue
prior to polling the function, ensuring that there is not a chance of a race
where the condition occurs AFTER checking but BEFORE registering interest
in the WaitQueue
, which could lead to deadlock.
This is intended to have similar behavior to Condvar
in the standard library,
but asynchronous, and not requiring operating system intervention (or existence).
In particular, this can be used in cases where interrupts or events are used to signify readiness or completion of some task, such as the completion of a DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt can wake the queue, allowing the polling function to check status fields for partial progress or completion, and also return the status flags at the same time.
Consider using Self::wait_for()
if your function does not return a value.
Consider using WaitCell::wait_for_value()
if you do not need multiple waiters.
Examples
use std::sync::Arc;
use maitake_sync::WaitQueue;
use std::sync::atomic::{AtomicU8, Ordering};
let queue = Arc::new(WaitQueue::new());
let num = Arc::new(AtomicU8::new(0));
let waiter1 = task::spawn({
// clone items to move into the spawned task
let queue = queue.clone();
let num = num.clone();
async move {
let rxd = queue.wait_for_value(|| {
let val = num.load(Ordering::Relaxed);
if val > 5 {
return Some(val);
}
None
}).await.unwrap();
assert!(rxd > 5);
println!("received wakeup with value: {rxd}");
}
});
let waiter2 = task::spawn({
// clone items to move into the spawned task
let queue = queue.clone();
let num = num.clone();
async move {
let rxd = queue.wait_for_value(|| {
let val = num.load(Ordering::Relaxed);
if val > 10 {
return Some(val);
}
None
}).await.unwrap();
assert!(rxd > 10);
println!("received wakeup with value: {rxd}");
}
});
println!("poking task...");
for i in 0..20 {
num.store(i, Ordering::Relaxed);
queue.wake();
}
waiter1.await.unwrap();
waiter2.await.unwrap();
source§impl<Lock> WaitQueue<Lock>where
Lock: ScopedRawMutex,
impl<Lock> WaitQueue<Lock>where
Lock: ScopedRawMutex,
sourcepub fn wait_owned(self: &Arc<WaitQueue<Lock>>) -> WaitOwned<Lock> ⓘ
Available on crate feature alloc
only.
pub fn wait_owned(self: &Arc<WaitQueue<Lock>>) -> WaitOwned<Lock> ⓘ
alloc
only.Wait to be woken up by this queue, returning a future that’s valid
for the 'static
lifetime.
This returns a WaitOwned
future that will complete when the task
is woken by a call to wake()
or wake_all()
, or when the
WaitQueue
is closed.
This is identical to the wait()
method, except that it takes a
Arc
reference to the WaitQueue
, allowing the returned future
to live for the 'static
lifetime. See the documentation for
wait()
for details on how to use the future returned by this
method.
Returns
The Future
returned by this method completes with one of the
following outputs:
Ok
(())
if the task was woken by a call towake()
orwake_all()
.Err
(
Closed
)
if the task was woken by theWaitQueue
being closed.
Cancellation
A WaitQueue
fairly distributes wakeups to waiting tasks in the
order that they started to wait. If a WaitOwned
future is
dropped, the task will forfeit its position in the queue.