Struct maitake_sync::WaitQueue

pub struct WaitQueue<Lock: ScopedRawMutex = DefaultMutex> { /* private fields */ }

A queue of waiting tasks which can be woken in first-in, first-out order, or all at once.

A WaitQueue allows any number of tasks to wait asynchronously and be woken when some event occurs, either individually in first-in, first-out order, or all at once. This makes it a vital building block of runtime services (such as timers or I/O resources), where it may be used to wake a set of tasks when a timer completes or when a resource becomes available. It can be equally useful for implementing higher-level synchronization primitives: for example, a WaitQueue plus an UnsafeCell is essentially an entire implementation of a fair asynchronous mutex. Finally, a WaitQueue can be a useful synchronization primitive on its own: sometimes, you just need to have a bunch of tasks wait for something and then wake them all up.

Overriding the blocking mutex

This type uses a blocking Mutex internally to synchronize access to its wait list. By default, this is a DefaultMutex. To use an alternative ScopedRawMutex implementation, use the new_with_raw_mutex constructor. See the documentation on overriding mutex implementations for more details.

Examples

Waking a single task at a time by calling wake:

use std::sync::Arc;
use maitake::scheduler::Scheduler;
use maitake_sync::WaitQueue;

const TASKS: usize = 10;

// In order to spawn tasks, we need a `Scheduler` instance.
let scheduler = Scheduler::new();

// Construct a new `WaitQueue`.
let q = Arc::new(WaitQueue::new());

// Spawn some tasks that will wait on the queue.
for _ in 0..TASKS {
    let q = q.clone();
    scheduler.spawn(async move {
        // Wait to be woken by the queue.
        q.wait().await.expect("queue is not closed");
    });
}

// Tick the scheduler once.
let tick = scheduler.tick();

// No tasks should complete on this tick, as they are all waiting
// to be woken by the queue.
assert_eq!(tick.completed, 0, "no tasks have been woken");

let mut completed = 0;
for i in 1..=TASKS {
    // Wake the next task from the queue.
    q.wake();

    // Tick the scheduler.
    let tick = scheduler.tick();

    // A single task should have completed on this tick.
    completed += tick.completed;
    assert_eq!(completed, i);
}

assert_eq!(completed, TASKS, "all tasks should have completed");

Waking all tasks using wake_all:

use std::sync::Arc;
use maitake::scheduler::Scheduler;
use maitake_sync::WaitQueue;

const TASKS: usize = 10;

// In order to spawn tasks, we need a `Scheduler` instance.
let scheduler = Scheduler::new();

// Construct a new `WaitQueue`.
let q = Arc::new(WaitQueue::new());

// Spawn some tasks that will wait on the queue.
for _ in 0..TASKS {
    let q = q.clone();
    scheduler.spawn(async move {
        // Wait to be woken by the queue.
        q.wait().await.expect("queue is not closed");
    });
}

// Tick the scheduler once.
let tick = scheduler.tick();

// No tasks should complete on this tick, as they are all waiting
// to be woken by the queue.
assert_eq!(tick.completed, 0, "no tasks have been woken");

// Wake all tasks waiting for the queue.
q.wake_all();

// Tick the scheduler again to run the woken tasks.
let tick = scheduler.tick();

// All tasks have now completed, since they were woken by the
// queue.
assert_eq!(tick.completed, TASKS, "all tasks should have completed");

Implementation Notes

This type is implemented using an intrusive doubly-linked list.

The intrusive aspect of this list is important, as it means that it does not allocate memory. Instead, nodes in the linked list are stored in the futures of tasks waiting on the queue, so no heap allocation is needed for each task waiting to be woken.

However, the intrusive linked list introduces one new danger: because futures can be cancelled, and the linked list nodes live within the futures trying to wait on the queue, we must ensure that the node is unlinked from the list before dropping a cancelled future. Failure to do so would result in the list containing dangling pointers. Therefore, we must use a doubly-linked list, so that nodes can edit both the previous and next node when they have to remove themselves. This is kind of a bummer, as it means we can’t use something nice like the intrusive queue by Dmitry Vyukov, and there are not really practical designs for lock-free doubly-linked lists that don’t rely on some kind of deferred reclamation scheme such as hazard pointers or QSBR.

Instead, we just stick a Mutex around the linked list, which must be acquired to pop nodes from it, or for nodes to remove themselves when futures are cancelled. This is a bit sad, but the critical sections for this mutex are short enough that we still get pretty good performance despite it.
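
To illustrate why a doubly-linked list makes cancellation cheap, here is a minimal, safe model of the idea. It is a sketch under stated assumptions, not the crate's implementation: nodes live in a Vec slab and are addressed by index (so, unlike the real intrusive list, it does allocate), and the names Node, List, and cancel_middle_then_drain are invented for this example.

```rust
use std::sync::Mutex;

/// One entry in the model wait list. `prev`/`next` are slot indices.
struct Node {
    task_id: u32,
    prev: Option<usize>,
    next: Option<usize>,
}

/// Hypothetical model of the wait list: a doubly-linked list whose nodes
/// live in a slab instead of inside the waiting futures.
#[derive(Default)]
struct List {
    slots: Vec<Option<Node>>,
    head: Option<usize>,
    tail: Option<usize>,
}

impl List {
    /// Link a new node at the tail and return its slot index.
    fn push_back(&mut self, task_id: u32) -> usize {
        let idx = self.slots.len();
        self.slots.push(Some(Node { task_id, prev: self.tail, next: None }));
        match self.tail {
            Some(t) => self.slots[t].as_mut().unwrap().next = Some(idx),
            None => self.head = Some(idx),
        }
        self.tail = Some(idx);
        idx
    }

    /// Unlink the node at `idx` in O(1) by patching both neighbours.
    /// Returns `None` if the node was already removed.
    fn unlink(&mut self, idx: usize) -> Option<u32> {
        let node = self.slots.get_mut(idx)?.take()?;
        match node.prev {
            Some(p) => self.slots[p].as_mut().unwrap().next = node.next,
            None => self.head = node.next,
        }
        match node.next {
            Some(n) => self.slots[n].as_mut().unwrap().prev = node.prev,
            None => self.tail = node.prev,
        }
        Some(node.task_id)
    }

    /// Pop the oldest node (FIFO order).
    fn pop_front(&mut self) -> Option<u32> {
        let head = self.head?;
        self.unlink(head)
    }
}

/// Three tasks enqueue, the middle one is cancelled, the rest wake in order.
fn cancel_middle_then_drain() -> Vec<u32> {
    let list = Mutex::new(List::default());
    let _a = list.lock().unwrap().push_back(1);
    let b = list.lock().unwrap().push_back(2);
    let _c = list.lock().unwrap().push_back(3);

    // Cancelling task 2 removes its node while holding the lock.
    list.lock().unwrap().unlink(b);

    let mut order = Vec::new();
    while let Some(id) = list.lock().unwrap().pop_front() {
        order.push(id);
    }
    order
}
```

Because each node knows both neighbours, removing task 2 does not require walking the list from the head; with a singly-linked list the cancelled node could not find its predecessor without a scan.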

Implementations

impl WaitQueue

pub const fn new() -> Self

Returns a new WaitQueue.

This constructor returns a WaitQueue that uses a DefaultMutex as the ScopedRawMutex implementation for wait-list synchronization. To use a different ScopedRawMutex implementation, use the new_with_raw_mutex constructor instead. See the documentation on overriding mutex implementations for more details.

impl<Lock> WaitQueue<Lock>
where Lock: ScopedRawMutex,

pub const fn new_with_raw_mutex(lock: Lock) -> Self

Returns a new WaitQueue, using the provided ScopedRawMutex implementation for wait-list synchronization.

This constructor allows a WaitQueue to be constructed with any type that implements ScopedRawMutex as the underlying raw blocking mutex implementation. See the documentation on overriding mutex implementations for more details.


pub fn wake(&self)

Wake the next task in the queue.

If the queue is empty, a wakeup is stored in the WaitQueue, and the next call to wait().await will complete immediately. If one or more tasks are currently in the queue, the first task in the queue is woken.

At most one wakeup will be stored in the queue at any time. If wake() is called many times while there are no tasks in the queue, only a single wakeup is stored.
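
The stored-wakeup rule can be easier to follow as a small state machine. The following is a hedged model of the behaviour described above, not the real type: ModelQueue, its fields, and stored_wakeup_demo are hypothetical names, and tasks are represented by plain integer ids.

```rust
use std::collections::VecDeque;

/// Hypothetical model of the documented `wake()` semantics.
#[derive(Default)]
struct ModelQueue {
    /// Task ids waiting, oldest first.
    waiters: VecDeque<u32>,
    /// At most one wakeup is remembered while nobody is waiting.
    stored_wakeup: bool,
}

impl ModelQueue {
    /// Returns `true` if the task completed immediately by consuming a
    /// stored wakeup, or `false` if it was enqueued.
    fn wait(&mut self, task: u32) -> bool {
        if self.stored_wakeup {
            self.stored_wakeup = false;
            true
        } else {
            self.waiters.push_back(task);
            false
        }
    }

    /// Wakes the oldest waiter, or stores a single wakeup if there is none.
    fn wake(&mut self) -> Option<u32> {
        let woken = self.waiters.pop_front();
        if woken.is_none() {
            self.stored_wakeup = true;
        }
        woken
    }
}

fn stored_wakeup_demo() -> (bool, bool, Option<u32>) {
    let mut q = ModelQueue::default();
    // Three wakes on an empty queue collapse into one stored wakeup.
    q.wake();
    q.wake();
    q.wake();
    let first = q.wait(1); // consumes the stored wakeup
    let second = q.wait(2); // nothing stored any more, so task 2 is enqueued
    let woken = q.wake(); // wakes task 2
    (first, second, woken)
}
```

In this model, stored_wakeup_demo() yields (true, false, Some(2)): only the first waiter benefits from the earlier wakes.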

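Examples
use std::sync::Arc;
use maitake_sync::WaitQueue;
// Assumption: `task` is `tokio::task`, and this snippet runs inside an async
// context such as a `#[tokio::main]` function; the rendered page elides both.
use tokio::task;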

let queue = Arc::new(WaitQueue::new());

let waiter = task::spawn({
    // clone the queue to move into the spawned task
    let queue = queue.clone();
    async move {
        queue.wait().await;
        println!("received wakeup!");
    }
});

println!("waking task...");
queue.wake();

waiter.await.unwrap();

pub fn wake_all(&self)

Wake all tasks currently in the queue.

All tasks currently waiting on the queue are woken. Unlike wake(), a wakeup is not stored in the queue to wake the next call to wait() if the queue is empty. Instead, this method only wakes all currently registered waiters. Registering a task to be woken is done by awaiting the Future returned by the wait() method on this queue.

Examples
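use maitake_sync::WaitQueue;
use std::sync::Arc;
// Assumption: `task` is `tokio::task`, and this snippet runs inside an async
// context such as a `#[tokio::main]` function; the rendered page elides both.
use tokio::task;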

let queue = Arc::new(WaitQueue::new());

// spawn multiple tasks to wait on the queue.
let task1 = task::spawn({
    let queue = queue.clone();
    async move {
        println!("task 1 waiting...");
        queue.wait().await;
        println!("task 1 woken")
    }
});

let task2 = task::spawn({
    let queue = queue.clone();
    async move {
        println!("task 2 waiting...");
        queue.wait().await;
        println!("task 2 woken")
    }
});

// yield to the scheduler so that both tasks register
// themselves to wait on the queue.
task::yield_now().await;

// neither task will have been woken.
assert!(!task1.is_finished());
assert!(!task2.is_finished());

// wake all tasks waiting on the queue.
queue.wake_all();

// yield to the scheduler again so that the tasks can execute.
task::yield_now().await;

assert!(task1.is_finished());
assert!(task2.is_finished());

pub fn close(&self)

Close the queue, indicating that it may no longer be used.

Once a queue is closed, all wait() calls (current or future) will return an error.

This method is generally used when implementing higher-level synchronization primitives or resources: when an event makes a resource permanently unavailable, the queue can be closed.
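
As a rough illustration of how close() differs from wake_all(), consider the model below. It is a sketch only: ModelQueue, the Closed error type, and close_demo are hypothetical names chosen for this example, and tasks are represented by integer ids rather than futures.

```rust
use std::collections::VecDeque;

/// Error produced by the model once the queue is closed (hypothetical name).
#[derive(Debug, PartialEq)]
struct Closed;

/// Hypothetical model of the `wake_all()` and `close()` behaviour.
#[derive(Default)]
struct ModelQueue {
    waiters: VecDeque<u32>,
    closed: bool,
}

impl ModelQueue {
    /// Enqueue `task`, or fail immediately if the queue is already closed.
    fn wait(&mut self, task: u32) -> Result<(), Closed> {
        if self.closed {
            return Err(Closed);
        }
        self.waiters.push_back(task);
        Ok(())
    }

    /// Returns the tasks that complete successfully; nothing is stored
    /// for later callers, and the queue stays usable.
    fn wake_all(&mut self) -> Vec<u32> {
        self.waiters.drain(..).collect()
    }

    /// Returns the tasks that complete with an error, and makes every
    /// later `wait` fail as well.
    fn close(&mut self) -> Vec<u32> {
        self.closed = true;
        self.waiters.drain(..).collect()
    }
}

fn close_demo() -> (Vec<u32>, Vec<u32>, Result<(), Closed>) {
    let mut q = ModelQueue::default();
    q.wait(1).unwrap();
    q.wait(2).unwrap();
    let woken = q.wake_all(); // tasks 1 and 2 are woken normally
    q.wait(3).unwrap(); // the queue is still open after wake_all
    let failed = q.close(); // task 3 is woken with an error
    let late = q.wait(4); // rejected: the queue stays closed
    (woken, failed, late)
}
```

The point of the contrast is that wake_all() is a one-off event, whereas closing is a permanent state change that also affects waiters arriving later.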


pub fn wait(&self) -> Wait<'_, Lock>

Wait to be woken up by this queue.

Equivalent to:

async fn wait(&self);

This returns a Wait Future that will complete when the task is woken by a call to wake() or wake_all(), or when the WaitQueue is closed.

Each WaitQueue holds a single wakeup. If wake() was previously called while no tasks were waiting on the queue, then wait().await will complete immediately, consuming the stored wakeup. Otherwise, wait().await waits to be woken by the next call to wake() or wake_all().

The Wait future is not guaranteed to receive wakeups from calls to wake() if it has not yet been polled. See the documentation for the Wait::subscribe() method for details on receiving wakeups from the queue prior to polling the Wait future for the first time.

A Wait future is guaranteed to receive wakeups from calls to wake_all() as soon as it is created, even if it has not yet been polled.

Returns

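The Future returned by this method completes with one of the following outputs: Ok(()) if the task was woken by a call to wake() or wake_all(), or an error if the WaitQueue was closed.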

Cancellation

A WaitQueue fairly distributes wakeups to waiting tasks in the order that they started to wait. If a Wait future is dropped, the task will forfeit its position in the queue.

Examples
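use std::sync::Arc;
use maitake_sync::WaitQueue;
// Assumption: `task` is `tokio::task`, and this snippet runs inside an async
// context such as a `#[tokio::main]` function; the rendered page elides both.
use tokio::task;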

let queue = Arc::new(WaitQueue::new());

let waiter = task::spawn({
    // clone the queue to move into the spawned task
    let queue = queue.clone();
    async move {
        queue.wait().await;
        println!("received wakeup!");
    }
});

println!("waking task...");
queue.wake();

waiter.await.unwrap();

pub async fn wait_for<F: FnMut() -> bool>(&self, f: F) -> WaitResult<()>

Asynchronously poll the given function f until a condition occurs, using the WaitQueue to only re-poll when notified.

This can be used to implement a “wait loop”, turning a “try” function (e.g. “try_recv” or “try_send”) into an asynchronous function (e.g. “recv” or “send”).

In particular, this function correctly registers interest in the WaitQueue prior to polling the function, ensuring that there is not a chance of a race where the condition occurs AFTER checking but BEFORE registering interest in the WaitQueue, which could lead to deadlock.

This is intended to have similar behavior to Condvar in the standard library, but asynchronous, and not requiring operating system intervention (or existence).

In particular, this can be used in cases where interrupts or events are used to signify readiness or completion of some task, such as the completion of a DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt can wake the queue, allowing the polling function to check status fields for partial progress or completion.

Consider using Self::wait_for_value() if your function does return a value.

Consider using WaitCell::wait_for() if you do not need multiple waiters.
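
For readers more familiar with the blocking analogue mentioned above, here is a short standard-library sketch of the same "wait until a predicate holds" pattern using Condvar. It does not use WaitQueue at all; wait_until_above and condvar_demo are hypothetical helper names, and the thresholds mirror the example further down.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Blocks until the shared counter exceeds `threshold`, then returns it.
fn wait_until_above(shared: &(Mutex<u8>, Condvar), threshold: u8) -> u8 {
    let (lock, cvar) = shared;
    // `wait_while` re-checks the predicate under the lock after every
    // notification, so an update cannot slip in between "check" and "sleep".
    let guard = cvar
        .wait_while(lock.lock().unwrap(), |n| *n <= threshold)
        .unwrap();
    *guard
}

fn condvar_demo() -> u8 {
    let shared = Arc::new((Mutex::new(0u8), Condvar::new()));

    let waiter = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || wait_until_above(&shared, 5))
    };

    // The "event source": update the state, then notify the waiters.
    for i in 0..20u8 {
        *shared.0.lock().unwrap() = i;
        shared.1.notify_all();
    }

    // The waiter may observe any value above the threshold, depending on timing.
    waiter.join().unwrap()
}
```

The asynchronous version replaces the blocked thread with a suspended task, and the Condvar notification with a call to wake() or wake_all().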

Returns

Ok(()) once f returns true, or an error if the WaitQueue is closed.

Examples
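use std::sync::Arc;
use maitake_sync::WaitQueue;
use std::sync::atomic::{AtomicU8, Ordering};
// Assumption: `task` is `tokio::task`, and this snippet runs inside an async
// context such as a `#[tokio::main]` function; the rendered page elides both.
use tokio::task;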

let queue = Arc::new(WaitQueue::new());
let num = Arc::new(AtomicU8::new(0));

let waiter1 = task::spawn({
    // clone items to move into the spawned task
    let queue = queue.clone();
    let num = num.clone();
    async move {
        queue.wait_for(|| num.load(Ordering::Relaxed) > 5).await;
        println!("received wakeup!");
    }
});

let waiter2 = task::spawn({
    // clone items to move into the spawned task
    let queue = queue.clone();
    let num = num.clone();
    async move {
        queue.wait_for(|| num.load(Ordering::Relaxed) > 10).await;
        println!("received wakeup!");
    }
});

println!("poking task...");

for i in 0..20 {
    num.store(i, Ordering::Relaxed);
    queue.wake();
}

waiter1.await.unwrap();
waiter2.await.unwrap();

pub async fn wait_for_value<T, F: FnMut() -> Option<T>>( &self, f: F ) -> WaitResult<T>

Asynchronously poll the given function f until a condition occurs, using the WaitQueue to only re-poll when notified.

This can be used to implement a “wait loop”, turning a “try” function (e.g. “try_recv” or “try_send”) into an asynchronous function (e.g. “recv” or “send”).

In particular, this function correctly registers interest in the WaitQueue prior to polling the function, ensuring that there is not a chance of a race where the condition occurs AFTER checking but BEFORE registering interest in the WaitQueue, which could lead to deadlock.

This is intended to have similar behavior to Condvar in the standard library, but asynchronous, and not requiring operating system intervention (or existence).

In particular, this can be used in cases where interrupts or events are used to signify readiness or completion of some task, such as the completion of a DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt can wake the queue, allowing the polling function to check status fields for partial progress or completion, and also return the status flags at the same time.

Consider using Self::wait_for() if your function does not return a value.

Consider using WaitCell::wait_for_value() if you do not need multiple waiters.
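
The "register interest first, then check" ordering described above can be sketched at the poll level using only the standard library. This is an illustrative single-waiter stand-in, not how WaitQueue is implemented: Notifier, CountingWaker, and poll_by_hand are hypothetical names, and the future is polled by hand instead of by an executor.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical single-slot notifier standing in for the wait queue.
#[derive(Default)]
struct Notifier {
    waker: Mutex<Option<Waker>>,
}

impl Notifier {
    /// Wake the registered task, if there is one.
    fn notify(&self) {
        if let Some(w) = self.waker.lock().unwrap().take() {
            w.wake();
        }
    }

    /// Register the waker *first*, then run `f`, so that a notification
    /// arriving right after `f` returns `None` still wakes this task.
    async fn wait_for_value<T>(&self, mut f: impl FnMut() -> Option<T>) -> T {
        poll_fn(|cx| {
            *self.waker.lock().unwrap() = Some(cx.waker().clone());
            match f() {
                Some(v) => Poll::Ready(v),
                None => Poll::Pending,
            }
        })
        .await
    }
}

/// Waker that only counts how often it was woken.
#[derive(Default)]
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

fn poll_by_hand() -> (bool, usize, Option<u8>) {
    let notifier = Notifier::default();
    let num = AtomicU8::new(0);
    let counter = Arc::new(CountingWaker::default());
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut fut = pin!(notifier.wait_for_value(|| {
        let v = num.load(Ordering::Relaxed);
        (v > 5).then_some(v)
    }));

    // First poll: the condition is false, so the task parks with its
    // waker already registered.
    let pending = fut.as_mut().poll(&mut cx).is_pending();

    // An "interrupt" updates the state and notifies the waiter.
    num.store(9, Ordering::Relaxed);
    notifier.notify();
    let wakes = counter.0.load(Ordering::SeqCst);

    // Re-polling after the wakeup observes the value.
    let value = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    };
    (pending, wakes, value)
}
```

If the closure were called before the waker was stored, an update landing between the two steps would notify nobody, and the task could sleep forever; that is the race the documented ordering avoids.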

Examples
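use std::sync::Arc;
use maitake_sync::WaitQueue;
use std::sync::atomic::{AtomicU8, Ordering};
// Assumption: `task` is `tokio::task`, and this snippet runs inside an async
// context such as a `#[tokio::main]` function; the rendered page elides both.
use tokio::task;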

let queue = Arc::new(WaitQueue::new());
let num = Arc::new(AtomicU8::new(0));

let waiter1 = task::spawn({
    // clone items to move into the spawned task
    let queue = queue.clone();
    let num = num.clone();
    async move {
        let rxd = queue.wait_for_value(|| {
            let val = num.load(Ordering::Relaxed);
            if val > 5 {
                return Some(val);
            }
            None
        }).await.unwrap();
        assert!(rxd > 5);
        println!("received wakeup with value: {rxd}");
    }
});

let waiter2 = task::spawn({
    // clone items to move into the spawned task
    let queue = queue.clone();
    let num = num.clone();
    async move {
        let rxd = queue.wait_for_value(|| {
            let val = num.load(Ordering::Relaxed);
            if val > 10 {
                return Some(val);
            }
            None
        }).await.unwrap();
        assert!(rxd > 10);
        println!("received wakeup with value: {rxd}");
    }
});

println!("poking task...");

for i in 0..20 {
    num.store(i, Ordering::Relaxed);
    queue.wake();
}

waiter1.await.unwrap();
waiter2.await.unwrap();

pub fn is_closed(&self) -> bool

Returns true if this WaitQueue is closed.

impl<Lock: ScopedRawMutex> WaitQueue<Lock>

pub fn wait_owned(self: &Arc<Self>) -> WaitOwned<Lock>

Available on crate feature alloc only.

Wait to be woken up by this queue, returning a future that’s valid for the 'static lifetime.

This returns a WaitOwned future that will complete when the task is woken by a call to wake() or wake_all(), or when the WaitQueue is closed.

This is identical to the wait() method, except that it takes an Arc reference to the WaitQueue, allowing the returned future to live for the 'static lifetime. See the documentation for wait() for details on how to use the future returned by this method.

Returns

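The Future returned by this method completes with one of the following outputs: Ok(()) if the task was woken by a call to wake() or wake_all(), or an error if the WaitQueue was closed.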

Cancellation

A WaitQueue fairly distributes wakeups to waiting tasks in the order that they started to wait. If a WaitOwned future is dropped, the task will forfeit its position in the queue.

Trait Implementations

impl<Lock: Debug + ScopedRawMutex> Debug for WaitQueue<Lock>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations

impl<Lock = DefaultMutex> !RefUnwindSafe for WaitQueue<Lock>

impl<Lock> Send for WaitQueue<Lock>
where Lock: Send,

impl<Lock> Sync for WaitQueue<Lock>
where Lock: Sync,

impl<Lock> Unpin for WaitQueue<Lock>
where Lock: Unpin,

impl<Lock = DefaultMutex> !UnwindSafe for WaitQueue<Lock>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.