pub struct WaitCell { /* private fields */ }
An atomically registered Waker.
This cell stores the Waker of a single task. A Waker is stored in the cell either by calling poll_wait, or by polling a wait future. Once a task’s Waker is stored in a WaitCell, it can be woken by calling wake on the WaitCell.
Implementation Notes
This is inspired by the AtomicWaker type used in Tokio’s synchronization primitives, with the following modifications:
- An additional bit of state is added to allow setting a “close” bit.
- A WaitCell is always woken by value (for now).
- WaitCell does not handle unwinding, because maitake does not support unwinding.
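To make the “close” bit idea concrete, here is a minimal sketch of a state word that carries one extra bit next to a wake bit. The type `ToyState` and the bit names and layout are hypothetical, chosen only for illustration; they are not maitake's actual encoding.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical bit layout, for illustration only.
const WAITING: usize = 0b00;
const WOKEN: usize = 0b01;
const CLOSED: usize = 0b10;

struct ToyState(AtomicUsize);

impl ToyState {
    const fn new() -> Self {
        ToyState(AtomicUsize::new(WAITING))
    }

    /// Sets the wake bit; returns `false` if the close bit was already set.
    fn wake(&self) -> bool {
        let prev = self.0.fetch_or(WOKEN, Ordering::AcqRel);
        prev & CLOSED == 0
    }

    /// Sets the close bit; returns `true` only for the first caller.
    fn close(&self) -> bool {
        let prev = self.0.fetch_or(CLOSED, Ordering::AcqRel);
        prev & CLOSED == 0
    }
}

/// Exercises the two bits: wake, close, close again, wake after close.
fn close_bit_demo() -> (bool, bool, bool, bool) {
    let state = ToyState::new();
    let woke = state.wake();
    let first_close = state.close();
    let second_close = state.close();
    let woke_after_close = state.wake();
    (woke, first_close, second_close, woke_after_close)
}
```

Because both operations are a single `fetch_or`, the close bit can be set without disturbing the wake bit, and each caller learns from the previous value whether it was first.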
Implementations
impl WaitCell
pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<Result<(), PollWaitError>>
Poll to wait on this WaitCell, consuming a stored wakeup or registering the Waker from the provided Context to be woken by the next wakeup.
Once a Waker has been registered, a subsequent call to wake will wake that Waker.
Returns
- Poll::Pending if the Waker was registered. If this method returns Poll::Pending, then the registered Waker will be woken by a subsequent call to wake.
- Poll::Ready(Ok(())) if the cell was woken by a call to wake while the Waker was being registered.
- Poll::Ready(Err(PollWaitError::Closed)) if the WaitCell has been closed.
- Poll::Ready(Err(PollWaitError::Busy)) if another task was concurrently registering its Waker with this WaitCell.
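The return values listed above can be modelled with a small std-only sketch. `ToyCell`, `ToyError`, and `CountingWaker` are hypothetical names, and the mutex-based design is an assumption made for readability rather than the lock-free implementation a real cell would use. Since the mutex serializes registration, the Busy case cannot arise in this model and is left out.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, PartialEq)]
enum ToyError {
    Closed,
}

#[derive(Default)]
struct Inner {
    waker: Option<Waker>,
    woken: bool,
    closed: bool,
}

/// Hypothetical, mutex-based stand-in for a single-waker cell.
#[derive(Default)]
struct ToyCell(Mutex<Inner>);

impl ToyCell {
    fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<Result<(), ToyError>> {
        let mut inner = self.0.lock().unwrap();
        if inner.closed {
            return Poll::Ready(Err(ToyError::Closed));
        }
        if inner.woken {
            // Consume the stored wakeup.
            inner.woken = false;
            return Poll::Ready(Ok(()));
        }
        // No wakeup yet: register the caller's waker.
        inner.waker = Some(cx.waker().clone());
        Poll::Pending
    }

    fn wake(&self) {
        let waker = {
            let mut inner = self.0.lock().unwrap();
            inner.woken = true;
            inner.waker.take()
        };
        // Wake outside the lock.
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    fn close(&self) {
        let waker = {
            let mut inner = self.0.lock().unwrap();
            inner.closed = true;
            inner.waker.take()
        };
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

/// Waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

type Outcome = Poll<Result<(), ToyError>>;

fn poll_wait_demo() -> (bool, usize, Outcome, Outcome) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let cell = ToyCell::default();

    let first_pending = cell.poll_wait(&mut cx).is_pending(); // registers the waker
    cell.wake(); // wakes the registered waker once
    let wakes = counter.0.load(Ordering::SeqCst);
    let second = cell.poll_wait(&mut cx); // consumes the wakeup
    cell.close();
    let third = cell.poll_wait(&mut cx); // closed cells always report Closed
    (first_pending, wakes, second, third)
}
```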
pub fn wait(&self) -> Wait<'_>
Wait to be woken up by this cell.
Returns
This future completes with the following values:
- Ok(()) if the future was woken by a call to wake or another task calling poll_wait or wait on this WaitCell.
- Err(Closed) if the task was woken by a call to close, or the WaitCell was already closed.
Note: The calling task’s Waker is not registered until AFTER the first time the returned Wait future is polled. This means that if a call to wake occurs between when wait is called and when the future is first polled, the future will not complete. If the caller is responsible for performing an operation which will result in an eventual wakeup, prefer calling subscribe before performing that operation and .awaiting the Wait future returned by subscribe.
pub fn subscribe(&self) -> Subscribe<'_>
Eagerly subscribe to notifications from this WaitCell.
This method returns a Subscribe Future, which outputs a Wait Future. Awaiting the Subscribe future will eagerly register the calling task to be woken by this WaitCell, so that the returned Wait future will be woken by any calls to wake (or close) that occur between when the Subscribe future completes and when the returned Wait future is .awaited.
This is primarily intended for scenarios where the task that waits on a WaitCell is responsible for performing some operation that ultimately results in the WaitCell being woken. If the task were to simply perform the operation and then call wait on the WaitCell, a potential race condition could occur where the operation completes and wakes the WaitCell before the Wait future is first .awaited. Using subscribe, the task can ensure that it is ready to be woken by the cell before performing an operation that could result in it being woken.
These scenarios occur when a wakeup is triggered by another thread/CPU core in response to an operation performed in the task waiting on the WaitCell, or when the wakeup is triggered by a hardware interrupt resulting from operations performed in the task.
Examples
use maitake_sync::WaitCell;

// Perform an operation that results in a concurrent wakeup, such as
// unmasking an interrupt.
fn do_something_that_causes_a_wakeup() {
    // ...
}

static WAIT_CELL: WaitCell = WaitCell::new();

// Subscribe to notifications from the cell *before* calling
// `do_something_that_causes_a_wakeup()`, to ensure that we are
// ready to be woken when the interrupt is unmasked.
let wait = WAIT_CELL.subscribe().await;

// Actually perform the operation.
do_something_that_causes_a_wakeup();

// Wait for the wakeup. If the wakeup occurred *before* the first
// poll of the `wait` future had successfully subscribed to the
// `WaitCell`, we would still receive the wakeup, because the
// `subscribe` future ensured that our waker was registered to be
// woken.
wait.await.expect("WaitCell is not closed");
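The ordering problem that subscribe addresses can also be reproduced without the library. The sketch below uses a deliberately simplified, hypothetical `LossyCell` that drops any wake arriving while no waker is registered, which is the situation the note on wait describes. It is a model of the race only, not of WaitCell's internals.

```rust
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; the demo only inspects poll results.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical cell: (registered waker, notified flag).
#[derive(Default)]
struct LossyCell {
    inner: Mutex<(Option<Waker>, bool)>,
}

impl LossyCell {
    /// Eager registration: the step that a subscribe-style call does up front.
    fn register(&self, waker: &Waker) {
        let mut inner = self.inner.lock().unwrap();
        inner.0 = Some(waker.clone());
    }

    /// Notifies the registered waker; with nobody registered the wake is dropped.
    fn wake(&self) {
        let mut inner = self.inner.lock().unwrap();
        if let Some(waker) = inner.0.take() {
            inner.1 = true;
            waker.wake();
        }
    }

    /// Lazy registration: only the first poll registers the waker.
    fn poll_notified(&self, cx: &mut Context<'_>) -> Poll<()> {
        let mut inner = self.inner.lock().unwrap();
        if inner.1 {
            inner.1 = false;
            return Poll::Ready(());
        }
        inner.0 = Some(cx.waker().clone());
        Poll::Pending
    }
}

fn ordering_demo() -> (bool, bool) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // Late registration: the operation completes (and wakes) before the first poll.
    let late = LossyCell::default();
    late.wake();
    let late_lost = late.poll_notified(&mut cx).is_pending();

    // Eager registration: register first, then run the operation.
    let eager = LossyCell::default();
    eager.register(&waker);
    eager.wake();
    let eager_seen = eager.poll_notified(&mut cx).is_ready();

    (late_lost, eager_seen)
}
```

In the first half the wake is lost and the poll stays pending; in the second half the same wake is observed because registration happened before the operation ran.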
pub async fn wait_for<F>(&self, f: F) -> Result<(), Closed>
Asynchronously poll the given function f until a condition occurs, using the WaitCell to only re-poll when notified.
This can be used to implement a “wait loop”, turning a “try” function (e.g. “try_recv” or “try_send”) into an asynchronous function (e.g. “recv” or “send”).
In particular, this function correctly registers interest in the WaitCell prior to polling the function, ensuring that there is not a chance of a race where the condition occurs AFTER checking but BEFORE registering interest in the WaitCell, which could lead to deadlock.
This is intended to have similar behavior to Condvar in the standard library, but asynchronous, and not requiring operating system intervention (or existence).
In particular, this can be used in cases where interrupts or events are used to signify readiness or completion of some task, such as the completion of a DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt can wake the cell, allowing the polling function to check status fields for partial progress or completion.
Consider using Self::wait_for_value() if your function does return a value.
Consider using WaitQueue::wait_for() if you need multiple waiters.
Returns
- Ok(()) once f reports that the condition has occurred.
- Err(Closed) if the WaitCell is closed.
Examples
use std::sync::Arc;
use maitake_sync::WaitCell;
use std::sync::atomic::{AtomicU8, Ordering};
// `task::spawn` comes from the async runtime in use; `tokio::task` is assumed here.
use tokio::task;

let queue = Arc::new(WaitCell::new());
let num = Arc::new(AtomicU8::new(0));

let waiter = task::spawn({
    // clone items to move into the spawned task
    let queue = queue.clone();
    let num = num.clone();
    async move {
        queue.wait_for(|| num.load(Ordering::Relaxed) > 5).await.unwrap();
        println!("received wakeup!");
    }
});

println!("poking task...");

for i in 0..20 {
    num.store(i, Ordering::Relaxed);
    queue.wake();
}

waiter.await.unwrap();
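The register-then-check ordering described above can be sketched as a single hand-written poll step. `WakerSlot`, `poll_until`, and `CountingWaker` are hypothetical std-only helpers written for this illustration. They show the ordering only and are not how wait_for is implemented.

```rust
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Waker that counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical single-waker slot.
#[derive(Default)]
struct WakerSlot(Mutex<Option<Waker>>);

impl WakerSlot {
    fn register(&self, waker: &Waker) {
        *self.0.lock().unwrap() = Some(waker.clone());
    }

    fn wake(&self) {
        let waker = self.0.lock().unwrap().take();
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

/// One step of a wait loop: register interest FIRST, then check the condition.
/// A wake that lands between the check and the return still reaches the waker.
fn poll_until(
    slot: &WakerSlot,
    cx: &mut Context<'_>,
    mut cond: impl FnMut() -> bool,
) -> Poll<()> {
    slot.register(cx.waker());
    if cond() {
        Poll::Ready(())
    } else {
        Poll::Pending
    }
}

fn wait_loop_demo() -> (bool, usize, bool) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let slot = WakerSlot::default();
    let num = AtomicU8::new(0);

    // Condition is false: the waker is registered and the step returns Pending.
    let first = poll_until(&slot, &mut cx, || num.load(Ordering::Relaxed) > 5);

    // The "interrupt": update the state, then wake the waiter.
    num.store(6, Ordering::Relaxed);
    slot.wake();
    let wakes = counter.0.load(Ordering::SeqCst);

    // Re-polling after the wakeup observes the condition.
    let second = poll_until(&slot, &mut cx, || num.load(Ordering::Relaxed) > 5);
    (first.is_pending(), wakes, second.is_ready())
}
```

Swapping the two statements in `poll_until` would reintroduce the race: a wake arriving after the check but before registration would find no waker to notify.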
pub async fn wait_for_value<T, F>(&self, f: F) -> Result<T, Closed>
Asynchronously poll the given function f until a condition occurs, using the WaitCell to only re-poll when notified.
This can be used to implement a “wait loop”, turning a “try” function (e.g. “try_recv” or “try_send”) into an asynchronous function (e.g. “recv” or “send”).
In particular, this function correctly registers interest in the WaitCell prior to polling the function, ensuring that there is not a chance of a race where the condition occurs AFTER checking but BEFORE registering interest in the WaitCell, which could lead to deadlock.
This is intended to have similar behavior to Condvar in the standard library, but asynchronous, and not requiring operating system intervention (or existence).
In particular, this can be used in cases where interrupts or events are used to signify readiness or completion of some task, such as the completion of a DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt can wake the cell, allowing the polling function to check status fields for partial progress or completion, and also return the status flags at the same time.
Consider using Self::wait_for() if your function does not return a value.
Consider using WaitQueue::wait_for_value() if you need multiple waiters.
Examples
use std::sync::Arc;
use maitake_sync::WaitCell;
use std::sync::atomic::{AtomicU8, Ordering};
// `task::spawn` comes from the async runtime in use; `tokio::task` is assumed here.
use tokio::task;

let queue = Arc::new(WaitCell::new());
let num = Arc::new(AtomicU8::new(0));

let waiter = task::spawn({
    // clone items to move into the spawned task
    let queue = queue.clone();
    let num = num.clone();
    async move {
        // Re-check the counter on every wakeup; finish once it exceeds 5.
        let rxd = queue.wait_for_value(|| {
            let val = num.load(Ordering::Relaxed);
            if val > 5 {
                return Some(val);
            }
            None
        }).await.unwrap();
        assert!(rxd > 5);
        println!("received wakeup with value: {rxd}");
    }
});

println!("poking task...");

for i in 0..20 {
    num.store(i, Ordering::Relaxed);
    queue.wake();
}

waiter.await.unwrap();
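As a rough illustration of turning a “try” function into an asynchronous one, the following std-only sketch builds a `recv` future on top of `try_recv`. The `Mailbox` type, the tiny `block_on` executor, and `ThreadWaker` are all hypothetical helpers for this example. A real program would use wait_for_value and an async runtime instead.

```rust
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical single-consumer mailbox.
#[derive(Default)]
struct Mailbox {
    queue: Mutex<VecDeque<u32>>,
    waiter: Mutex<Option<Waker>>,
}

impl Mailbox {
    /// The synchronous "try" operation.
    fn try_recv(&self) -> Option<u32> {
        self.queue.lock().unwrap().pop_front()
    }

    fn send(&self, value: u32) {
        self.queue.lock().unwrap().push_back(value);
        let waker = self.waiter.lock().unwrap().take();
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    /// The asynchronous version: register the waker first, then try.
    fn recv(&self) -> impl Future<Output = u32> + '_ {
        poll_fn(move |cx| {
            *self.waiter.lock().unwrap() = Some(cx.waker().clone());
            match self.try_recv() {
                Some(value) => Poll::Ready(value),
                None => Poll::Pending,
            }
        })
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: poll, then park until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

fn recv_demo() -> u32 {
    let mailbox = Arc::new(Mailbox::default());
    let sender = {
        let mailbox = mailbox.clone();
        thread::spawn(move || mailbox.send(42))
    };
    let got = block_on(mailbox.recv());
    sender.join().unwrap();
    got
}
```

The closure passed to `poll_fn` plays the role of f: it returns a value when the condition holds and is re-run only after a wakeup.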