maitake_sync/wait_queue.rs

1//! A queue of waiting tasks that can be woken in first-in, first-out order (or
2//! all at once).
3//!
4//! See the [`WaitQueue`] type's documentation for details.
5#[cfg(any(test, maitake_ultraverbose))]
6use crate::util::fmt;
7use crate::{
8    blocking::{DefaultMutex, Mutex, ScopedRawMutex},
9    loom::{
10        cell::UnsafeCell,
11        sync::atomic::{AtomicUsize, Ordering::*},
12    },
13    util::{CachePadded, WakeBatch},
14    WaitResult,
15};
16use cordyceps::{
17    list::{self, List},
18    Linked,
19};
20use core::{
21    future::Future,
22    marker::PhantomPinned,
23    mem,
24    pin::Pin,
25    ptr::{self, NonNull},
26    task::{Context, Poll, Waker},
27};
28use mycelium_bitfield::{bitfield, enum_from_bits, FromBits};
29use pin_project::{pin_project, pinned_drop};
30
31#[cfg(test)]
32mod tests;
33
34/// A queue of waiting tasks which can be [woken in first-in, first-out
35/// order][wake], or [all at once][wake_all].
36///
37/// A `WaitQueue` allows any number of tasks to [wait] asynchronously and be
38/// woken when some event occurs, either [individually][wake] in first-in,
39/// first-out order, or [all at once][wake_all]. This makes it a vital building
40/// block of runtime services (such as timers or I/O resources), where it may be
41/// used to wake a set of tasks when a timer completes or when a resource
42/// becomes available. It can be equally useful for implementing higher-level
43/// synchronization primitives: for example, a `WaitQueue` plus an
44/// [`UnsafeCell`] is essentially [an entire implementation of a fair
45/// asynchronous mutex][mutex]. Finally, a `WaitQueue` can be a useful
46/// synchronization primitive on its own: sometimes, you just need to have a
47/// bunch of tasks wait for something and then wake them all up.
48///
49/// # Overriding the blocking mutex
50///
51/// This type uses a [blocking `Mutex`](crate::blocking::Mutex) internally to
52/// synchronize access to its wait list. By default, this is a [`DefaultMutex`]. To
53/// use an alternative [`ScopedRawMutex`] implementation, use the
54/// [`new_with_raw_mutex`](Self::new_with_raw_mutex) constructor. See [the documentation
55/// on overriding mutex
56/// implementations](crate::blocking#overriding-mutex-implementations) for more
57/// details.
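///
/// For example, a `WaitQueue` might be constructed with a hypothetical
/// `MyRawMutex` type that implements [`ScopedRawMutex`]. This is a sketch
/// only; `MyRawMutex` is a placeholder for whatever implementation suits the
/// target platform (such as one that disables interrupts):
///
/// ```ignore
/// use maitake_sync::WaitQueue;
///
/// // `MyRawMutex` is assumed to implement `ScopedRawMutex`.
/// let queue: WaitQueue<MyRawMutex> = WaitQueue::new_with_raw_mutex(MyRawMutex::new());
/// ```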
58///
59/// # Examples
60///
61/// Waking a single task at a time by calling [`wake`][wake]:
62///
63/// ```ignore
64/// use std::sync::Arc;
65/// use maitake::scheduler::Scheduler;
66/// use maitake_sync::WaitQueue;
67///
68/// const TASKS: usize = 10;
69///
70/// // In order to spawn tasks, we need a `Scheduler` instance.
71/// let scheduler = Scheduler::new();
72///
73/// // Construct a new `WaitQueue`.
74/// let q = Arc::new(WaitQueue::new());
75///
76/// // Spawn some tasks that will wait on the queue.
77/// for _ in 0..TASKS {
78///     let q = q.clone();
79///     scheduler.spawn(async move {
80///         // Wait to be woken by the queue.
81///         q.wait().await.expect("queue is not closed");
82///     });
83/// }
84///
85/// // Tick the scheduler once.
86/// let tick = scheduler.tick();
87///
88/// // No tasks should complete on this tick, as they are all waiting
89/// // to be woken by the queue.
90/// assert_eq!(tick.completed, 0, "no tasks have been woken");
91///
92/// let mut completed = 0;
93/// for i in 1..=TASKS {
94///     // Wake the next task from the queue.
95///     q.wake();
96///
97///     // Tick the scheduler.
98///     let tick = scheduler.tick();
99///
100///     // A single task should have completed on this tick.
101///     completed += tick.completed;
102///     assert_eq!(completed, i);
103/// }
104///
105/// assert_eq!(completed, TASKS, "all tasks should have completed");
106/// ```
107///
108/// Waking all tasks using [`wake_all`][wake_all]:
109///
110/// ```ignore
111/// use std::sync::Arc;
112/// use maitake::scheduler::Scheduler;
113/// use maitake_sync::WaitQueue;
114///
115/// const TASKS: usize = 10;
116///
117/// // In order to spawn tasks, we need a `Scheduler` instance.
118/// let scheduler = Scheduler::new();
119///
120/// // Construct a new `WaitQueue`.
121/// let q = Arc::new(WaitQueue::new());
122///
123/// // Spawn some tasks that will wait on the queue.
124/// for _ in 0..TASKS {
125///     let q = q.clone();
126///     scheduler.spawn(async move {
127///         // Wait to be woken by the queue.
128///         q.wait().await.expect("queue is not closed");
129///     });
130/// }
131///
132/// // Tick the scheduler once.
133/// let tick = scheduler.tick();
134///
135/// // No tasks should complete on this tick, as they are all waiting
136/// // to be woken by the queue.
137/// assert_eq!(tick.completed, 0, "no tasks have been woken");
138///
139/// // Wake all tasks waiting for the queue.
140/// q.wake_all();
141///
142/// // Tick the scheduler again to run the woken tasks.
143/// let tick = scheduler.tick();
144///
145/// // All tasks have now completed, since they were woken by the
146/// // queue.
147/// assert_eq!(tick.completed, TASKS, "all tasks should have completed");
148/// ```
149///
150/// # Implementation Notes
151///
152/// This type is implemented using an [intrusive doubly-linked list][ilist].
153///
154/// The *[intrusive]* aspect of this list is important, as it means that it does
/// not allocate memory. Instead, nodes in the linked list are stored in the
/// futures of tasks waiting on the queue. This means that it is not
157/// necessary to allocate any heap memory for each task waiting to be woken.
158///
159/// However, the intrusive linked list introduces one new danger: because
160/// futures can be *cancelled*, and the linked list nodes live within the
161/// futures trying to wait on the queue, we *must* ensure that the node
162/// is unlinked from the list before dropping a cancelled future. Failure to do
163/// so would result in the list containing dangling pointers. Therefore, we must
164/// use a *doubly-linked* list, so that nodes can edit both the previous and
165/// next node when they have to remove themselves. This is kind of a bummer, as
166/// it means we can't use something nice like this [intrusive queue by Dmitry
167/// Vyukov][2], and there are not really practical designs for lock-free
168/// doubly-linked lists that don't rely on some kind of deferred reclamation
169/// scheme such as hazard pointers or QSBR.
170///
171/// Instead, we just stick a [`Mutex`] around the linked list, which must be
172/// acquired to pop nodes from it, or for nodes to remove themselves when
173/// futures are cancelled. This is a bit sad, but the critical sections for this
174/// mutex are short enough that we still get pretty good performance despite it.
175///
176/// [`Waker`]: core::task::Waker
177/// [wait]: WaitQueue::wait
178/// [wake]: WaitQueue::wake
179/// [wake_all]: WaitQueue::wake_all
180/// [`UnsafeCell`]: core::cell::UnsafeCell
181/// [ilist]: cordyceps::List
182/// [intrusive]: https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/fbl_containers_guide/introduction
183/// [mutex]: crate::Mutex
184/// [2]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
185#[derive(Debug)]
186pub struct WaitQueue<Lock: ScopedRawMutex = DefaultMutex> {
187    /// The wait queue's state variable.
188    state: CachePadded<AtomicUsize>,
189
190    /// The linked list of waiters.
191    ///
192    /// # Safety
193    ///
194    /// This is protected by a mutex; the mutex *must* be acquired when
195    /// manipulating the linked list, OR when manipulating waiter nodes that may
196    /// be linked into the list. If a node is known to not be linked, it is safe
197    /// to modify that node (such as by waking the stored [`Waker`]) without
198    /// holding the lock; otherwise, it may be modified through the list, so the
    /// lock must be held when modifying the node.
201    ///
    /// A spinlock is used here in order to support `no_std` platforms; when
    /// running `loom` tests, a `loom` mutex is used instead to simulate the
    /// spinlock, because `loom` doesn't play nice with real spinlocks.
205    queue: Mutex<List<Waiter>, Lock>,
206}
207
208/// Future returned from [`WaitQueue::wait()`].
209///
210/// This future is fused, so once it has completed, any future calls to poll
211/// will immediately return [`Poll::Ready`].
212///
213/// # Notes
214///
215/// This future is `!Unpin`, as it is unsafe to [`core::mem::forget`] a `Wait`
216/// future once it has been polled. For instance, the following code must not
217/// compile:
218///
/// ```compile_fail
220/// use maitake_sync::wait_queue::Wait;
221///
222/// // Calls to this function should only compile if `T` is `Unpin`.
223/// fn assert_unpin<T: Unpin>() {}
224///
225/// assert_unpin::<Wait<'_>>();
226/// ```
227#[derive(Debug)]
228#[pin_project(PinnedDrop)]
229#[must_use = "futures do nothing unless `.await`ed or `poll`ed"]
230pub struct Wait<'a, Lock: ScopedRawMutex = DefaultMutex> {
231    /// The [`WaitQueue`] being waited on.
232    queue: &'a WaitQueue<Lock>,
233
234    /// Entry in the wait queue linked list.
235    #[pin]
236    waiter: Waiter,
237}
238
239/// A waiter node which may be linked into a wait queue.
240#[derive(Debug)]
241#[repr(C)]
242#[pin_project]
243struct Waiter {
244    /// The intrusive linked list node.
245    ///
246    /// This *must* be the first field in the struct in order for the `Linked`
247    /// implementation to be sound.
248    #[pin]
249    node: UnsafeCell<Node>,
250
251    /// The future's state.
252    state: WaitStateBits,
253}
254
255#[derive(Debug)]
256struct Node {
257    /// Intrusive linked list pointers.
258    links: list::Links<Waiter>,
259
260    /// The node's waker
261    waker: Wakeup,
262
263    // This type is !Unpin due to the heuristic from:
264    // <https://github.com/rust-lang/rust/pull/82834>
265    _pin: PhantomPinned,
266}
267
268bitfield! {
269    #[derive(Eq, PartialEq)]
270    struct QueueState<usize> {
271        /// The queue's state.
272        const STATE: State;
273
274        /// The number of times [`WaitQueue::wake_all`] has been called.
275        const WAKE_ALLS = ..;
276    }
277}
278
279bitfield! {
280    #[derive(Eq, PartialEq)]
281    struct WaitStateBits<usize> {
282        /// The waiter's state.
283        const STATE: WaitState;
284
285        /// The number of times [`WaitQueue::wake_all`] has been called.
286        const WAKE_ALLS = ..;
287    }
288}
289
290enum_from_bits! {
291    /// The state of a [`Waiter`] node in a [`WaitQueue`].
292    #[derive(Debug, Eq, PartialEq)]
293    enum WaitState<u8> {
294        /// The waiter has not yet been enqueued.
295        ///
296        /// The number of times [`WaitQueue::wake_all`] has been called is stored
297        /// when the node is created, in order to determine whether it was woken by
298        /// a stored wakeup when enqueueing.
299        ///
300        /// When in this state, the node is **not** part of the linked list, and
301        /// can be dropped without removing it from the list.
302        Start = 0b00,
303
304        /// The waiter is waiting.
305        ///
306        /// When in this state, the node **is** part of the linked list. If the
307        /// node is dropped in this state, it **must** be removed from the list
308        /// before dropping it. Failure to ensure this will result in dangling
309        /// pointers in the linked list!
310        Waiting = 0b01,
311
312        /// The waiter has been woken.
313        ///
314        /// When in this state, the node is **not** part of the linked list, and
315        /// can be dropped without removing it from the list.
316        Woken = 0b10,
317    }
318}
319
320/// The queue's current state.
321#[derive(Debug, Copy, Clone, Eq, PartialEq)]
322#[repr(u8)]
323enum State {
324    /// No waiters are queued, and there is no pending notification.
325    /// Waiting while the queue is in this state will enqueue the waiter;
326    /// notifying while in this state will store a pending notification in the
327    /// queue, transitioning to [`State::Woken`].
328    Empty = 0b00,
329
330    /// There are one or more waiters in the queue. Waiting while
331    /// the queue is in this state will not transition the state. Waking while
332    /// in this state will wake the first waiter in the queue; if this empties
333    /// the queue, then the queue will transition to [`State::Empty`].
334    Waiting = 0b01,
335
336    /// The queue has a stored notification. Waiting while the queue
337    /// is in this state will consume the pending notification *without*
338    /// enqueueing the waiter and transition the queue to [`State::Empty`].
339    /// Waking while in this state will leave the queue in this state.
340    Woken = 0b10,
341
342    /// The queue is closed. Waiting while in this state will return
343    /// [`Closed`] without transitioning the queue's state.
344    ///
345    /// *Note*: This *must* correspond to all state bits being set, as it's set
346    /// via a [`fetch_or`].
347    ///
348    /// [`Closed`]: crate::Closed
349    /// [`fetch_or`]: core::sync::atomic::AtomicUsize::fetch_or
350    Closed = 0b11,
351}
352
353#[derive(Clone, Debug)]
354enum Wakeup {
355    Empty,
356    Waiting(Waker),
357    One,
358    All,
359    Closed,
360}
361
362// === impl WaitQueue ===
363
364impl WaitQueue {
365    loom_const_fn! {
366        /// Returns a new `WaitQueue`.
367        ///
368        /// This constructor returns a `WaitQueue` that uses a [`DefaultMutex`] as
369        /// the [`ScopedRawMutex`] implementation for wait list synchronization.
370        /// To use a different [`ScopedRawMutex`] implementation, use the
371        /// [`new_with_raw_mutex`](Self::new_with_raw_mutex) constructor, instead. See
372        /// [the documentation on overriding mutex
373        /// implementations](crate::blocking#overriding-mutex-implementations)
374        /// for more details.
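        ///
        /// # Examples
        ///
        /// Constructing a queue with the default mutex implementation:
        ///
        /// ```
        /// use maitake_sync::WaitQueue;
        ///
        /// // construct a new queue using the default blocking mutex.
        /// let queue = WaitQueue::new();
        /// assert!(!queue.is_closed());
        /// ```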
375        #[must_use]
376        pub fn new() -> Self {
377            Self::new_with_raw_mutex(DefaultMutex::new())
378        }
379    }
380}
381
382impl<Lock> Default for WaitQueue<Lock>
383where
384    Lock: ScopedRawMutex + Default,
385{
386    fn default() -> Self {
387        Self::new_with_raw_mutex(Lock::default())
388    }
389}
390
391impl<Lock> WaitQueue<Lock>
392where
393    Lock: ScopedRawMutex,
394{
395    loom_const_fn! {
396        /// Returns a new `WaitQueue`, using the provided [`ScopedRawMutex`]
397        /// implementation for wait-list synchronization.
398        ///
399        /// This constructor allows a `WaitQueue` to be constructed with any type that
400        /// implements [`ScopedRawMutex`] as the underlying raw blocking mutex
401        /// implementation. See [the documentation on overriding mutex
402        /// implementations](crate::blocking#overriding-mutex-implementations)
403        /// for more details.
404        #[must_use]
405        pub fn new_with_raw_mutex(lock: Lock) -> Self {
406            Self::make(State::Empty, lock)
407        }
408    }
409
410    loom_const_fn! {
411        /// Returns a new `WaitQueue` with a single stored wakeup.
412        ///
413        /// The first call to [`wait`] on this queue will immediately succeed.
414        ///
415        /// [`wait`]: Self::wait
416        // TODO(eliza): should this be a public API?
417        #[must_use]
418        pub(crate) fn new_woken(lock: Lock) -> Self {
419            Self::make(State::Woken, lock)
420        }
421    }
422
423    loom_const_fn! {
424        #[must_use]
425        fn make(state: State, lock: Lock) -> Self {
426            Self {
427                state: CachePadded::new(AtomicUsize::new(state.into_usize())),
428                queue: Mutex::new_with_raw_mutex(List::new(), lock),
429            }
430        }
431    }
432
433    /// Wake the next task in the queue.
434    ///
435    /// If the queue is empty, a wakeup is stored in the `WaitQueue`, and the
436    /// **next** call to [`wait().await`] will complete immediately. If one or more
437    /// tasks are currently in the queue, the first task in the queue is woken.
438    ///
439    /// At most one wakeup will be stored in the queue at any time. If `wake()`
440    /// is called many times while there are no tasks in the queue, only a
441    /// single wakeup is stored.
442    ///
443    /// [`wait().await`]: Self::wait()
444    ///
445    /// # Examples
446    ///
449    /// ```
450    /// # use tokio::task;
451    /// # #[tokio::main(flavor = "current_thread")]
452    /// # async fn test() {
453    /// use std::sync::Arc;
454    /// use maitake_sync::WaitQueue;
455    ///
456    /// let queue = Arc::new(WaitQueue::new());
457    ///
458    /// let waiter = task::spawn({
459    ///     // clone the queue to move into the spawned task
460    ///     let queue = queue.clone();
461    ///     async move {
462    ///         queue.wait().await;
463    ///         println!("received wakeup!");
464    ///     }
465    /// });
466    ///
467    /// println!("waking task...");
468    /// queue.wake();
469    ///
470    /// waiter.await.unwrap();
471    /// # }
472    /// # test();
473    /// ```
474    #[inline]
475    pub fn wake(&self) {
476        // snapshot the queue's current state.
477        let mut state = self.load();
478
479        // check if any tasks are currently waiting on this queue. if there are
480        // no waiting tasks, store the wakeup to be consumed by the next call to
481        // `wait`.
482        loop {
483            match state.get(QueueState::STATE) {
484                // if the queue is closed, bail.
485                State::Closed => return,
486                // if there are waiting tasks, break out of the loop and wake one.
487                State::Waiting => break,
488                _ => {}
489            }
490
491            let next = state.with_state(State::Woken);
492            // advance the state to `Woken`, and return (if we did so
493            // successfully)
494            match self.compare_exchange(state, next) {
495                Ok(_) => return,
496                Err(actual) => state = actual,
497            }
498        }
499
500        // okay, there are tasks waiting on the queue; we must acquire the lock
501        // on the linked list and wake the next task from the queue.
502        let waker = self.queue.with_lock(|queue| {
503            test_debug!("wake: -> locked");
504
505            // the queue's state may have changed while we were waiting to acquire
506            // the lock, so we need to acquire a new snapshot before we take the
507            // waker.
508            state = self.load();
509            self.wake_locked(queue, state)
510        });
511
        // now that we've released the lock, wake the waiting task (if we
        // actually dequeued one).
514        if let Some(waker) = waker {
515            waker.wake();
516        }
517    }
518
519    /// Wake *all* tasks currently in the queue.
520    ///
521    /// All tasks currently waiting on the queue are woken. Unlike [`wake()`], a
522    /// wakeup is *not* stored in the queue to wake the next call to [`wait()`]
523    /// if the queue is empty. Instead, this method only wakes all currently
524    /// registered waiters. Registering a task to be woken is done by `await`ing
525    /// the [`Future`] returned by the [`wait()`] method on this queue.
526    ///
527    /// # Examples
528    ///
529    /// ```
530    /// # use tokio::task;
531    /// # #[tokio::main(flavor = "current_thread")]
532    /// # async fn test() {
533    /// use maitake_sync::WaitQueue;
534    /// use std::sync::Arc;
535    ///
536    /// let queue = Arc::new(WaitQueue::new());
537    ///
538    /// // spawn multiple tasks to wait on the queue.
539    /// let task1 = task::spawn({
540    ///     let queue = queue.clone();
541    ///     async move {
542    ///         println!("task 1 waiting...");
543    ///         queue.wait().await;
544    ///         println!("task 1 woken")
545    ///     }
546    /// });
547    ///
548    /// let task2 = task::spawn({
549    ///     let queue = queue.clone();
550    ///     async move {
551    ///         println!("task 2 waiting...");
552    ///         queue.wait().await;
553    ///         println!("task 2 woken")
554    ///     }
555    /// });
556    ///
557    /// // yield to the scheduler so that both tasks register
558    /// // themselves to wait on the queue.
559    /// task::yield_now().await;
560    ///
561    /// // neither task will have been woken.
562    /// assert!(!task1.is_finished());
563    /// assert!(!task2.is_finished());
564    ///
565    /// // wake all tasks waiting on the queue.
566    /// queue.wake_all();
567    ///
568    /// // yield to the scheduler again so that the tasks can execute.
569    /// task::yield_now().await;
570    ///
571    /// assert!(task1.is_finished());
572    /// assert!(task2.is_finished());
573    /// # }
574    /// # test();
575    /// ```
576    ///
577    /// [`wake()`]: Self::wake
578    /// [`wait()`]: Self::wait
579    pub fn wake_all(&self) {
580        let mut batch = WakeBatch::new();
581        let mut waiters_remaining = true;
582
583        // This is a little bit contorted: we must load the state inside the
584        // lock, but for all states except for `Waiting`, we just need to bail
585        // out...but we can't `return` from the outer function inside the lock
586        // closure. Therefore, we just return a `bool` and, if it's `true`,
587        // return instead of doing more work.
588        let done = self.queue.with_lock(|queue| {
589            let state = self.load();
590
591            match test_dbg!(state.get(QueueState::STATE)) {
592                // If there are no waiters in the queue, increment the number of
593                // `wake_all` calls and return. incrementing the `wake_all` count
594                // must be performed inside the lock, so we do it here.
595                State::Woken | State::Empty => {
596                    self.state.fetch_add(QueueState::ONE_WAKE_ALL, SeqCst);
597                    true
598                }
599                // If the queue is already closed, this is a no-op. Just bail.
600                State::Closed => true,
601                // Okay, there are waiters in the queue. Transition to the empty
602                // state inside the lock and start draining the queue.
603                State::Waiting => {
604                    let next_state = QueueState::new()
605                        .with_state(State::Empty)
606                        .with(QueueState::WAKE_ALLS, state.get(QueueState::WAKE_ALLS) + 1);
607                    self.compare_exchange(state, next_state)
608                        .expect("state should not have transitioned while locked");
609
610                    // Drain the first batch of waiters from the queue.
611                    waiters_remaining =
612                        test_dbg!(Self::drain_to_wake_batch(&mut batch, queue, Wakeup::All));
613
614                    false
615                }
616            }
617        });
618
619        if done {
620            return;
621        }
622
623        batch.wake_all();
624        // As long as there are waiters remaining to wake, lock the queue, drain
625        // another batch, release the lock, and wake them.
626        while waiters_remaining {
627            self.queue.with_lock(|queue| {
628                waiters_remaining = Self::drain_to_wake_batch(&mut batch, queue, Wakeup::All);
629            });
630            batch.wake_all();
631        }
632    }
633
634    /// Close the queue, indicating that it may no longer be used.
635    ///
636    /// Once a queue is closed, all [`wait()`] calls (current or future) will
637    /// return an error.
638    ///
639    /// This method is generally used when implementing higher-level
640    /// synchronization primitives or resources: when an event makes a resource
641    /// permanently unavailable, the queue can be closed.
642    ///
643    /// [`wait()`]: Self::wait
644    pub fn close(&self) {
645        let state = self.state.fetch_or(State::Closed.into_usize(), SeqCst);
646        let state = test_dbg!(QueueState::from_bits(state));
647        if state.get(QueueState::STATE) != State::Waiting {
648            return;
649        }
650
651        let mut batch = WakeBatch::new();
652        let mut waking = true;
653        while waking {
654            waking = self
655                .queue
656                .with_lock(|queue| Self::drain_to_wake_batch(&mut batch, queue, Wakeup::Closed));
657            batch.wake_all();
658        }
659    }
660
661    /// Wait to be woken up by this queue.
662    ///
663    /// Equivalent to:
664    ///
665    /// ```ignore
666    /// async fn wait(&self);
667    /// ```
668    ///
669    /// This returns a [`Wait`] [`Future`] that will complete when the task is
670    /// woken by a call to [`wake()`] or [`wake_all()`], or when the `WaitQueue`
671    /// is dropped.
672    ///
673    /// Each `WaitQueue` holds a single wakeup. If [`wake()`] was previously
674    /// called while no tasks were waiting on the queue, then `wait().await`
675    /// will complete immediately, consuming the stored wakeup. Otherwise,
676    /// `wait().await` waits to be woken by the next call to [`wake()`] or
677    /// [`wake_all()`].
678    ///
679    /// The [`Wait`] future is not guaranteed to receive wakeups from calls to
680    /// [`wake()`] if it has not yet been polled. See the documentation for the
681    /// [`Wait::subscribe()`] method for details on receiving wakeups from the
682    /// queue prior to polling the `Wait` future for the first time.
683    ///
    /// A `Wait` future **is** guaranteed to receive wakeups from calls to
685    /// [`wake_all()`] as soon as it is created, even if it has not yet been
686    /// polled.
687    ///
688    /// # Returns
689    ///
690    /// The [`Future`] returned by this method completes with one of the
691    /// following [outputs](Future::Output):
692    ///
693    /// - [`Ok`]`(())` if the task was woken by a call to [`wake()`] or
694    ///   [`wake_all()`].
695    /// - [`Err`]`(`[`Closed`]`)` if the task was woken by the `WaitQueue` being
696    ///   [`close`d](WaitQueue::close).
697    ///
698    /// # Cancellation
699    ///
700    /// A `WaitQueue` fairly distributes wakeups to waiting tasks in the order
701    /// that they started to wait. If a [`Wait`] future is dropped, the task
702    /// will forfeit its position in the queue.
703    ///
704    /// # Examples
705    ///
706    /// ```
707    /// # use tokio::task;
708    /// # #[tokio::main(flavor = "current_thread")]
709    /// # async fn test() {
710    /// use std::sync::Arc;
711    /// use maitake_sync::WaitQueue;
712    ///
713    /// let queue = Arc::new(WaitQueue::new());
714    ///
715    /// let waiter = task::spawn({
716    ///     // clone the queue to move into the spawned task
717    ///     let queue = queue.clone();
718    ///     async move {
719    ///         queue.wait().await;
720    ///         println!("received wakeup!");
721    ///     }
722    /// });
723    ///
724    /// println!("waking task...");
725    /// queue.wake();
726    ///
727    /// waiter.await.unwrap();
728    /// # }
729    /// # test();
730    /// ```
731    ///
732    /// [`wake()`]: Self::wake
733    /// [`wake_all()`]: Self::wake_all
734    /// [`Closed`]: crate::Closed
735    pub fn wait(&self) -> Wait<'_, Lock> {
736        Wait {
737            queue: self,
738            waiter: self.waiter(),
739        }
740    }
741
742    pub(crate) fn try_wait(&self) -> Poll<WaitResult<()>> {
743        let mut state = self.load();
744        let initial_wake_alls = state.get(QueueState::WAKE_ALLS);
745        while state.get(QueueState::STATE) == State::Woken {
746            match self.compare_exchange(state, state.with_state(State::Empty)) {
747                Ok(_) => return Poll::Ready(Ok(())),
748                Err(actual) => state = actual,
749            }
750        }
751
752        match state.get(QueueState::STATE) {
753            State::Closed => crate::closed(),
754            _ if state.get(QueueState::WAKE_ALLS) > initial_wake_alls => Poll::Ready(Ok(())),
755            State::Empty | State::Waiting => Poll::Pending,
756            State::Woken => Poll::Ready(Ok(())),
757        }
758    }
759
760    /// Asynchronously poll the given function `f` until a condition occurs,
761    /// using the [`WaitQueue`] to only re-poll when notified.
762    ///
763    /// This can be used to implement a "wait loop", turning a "try" function
764    /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
765    /// "recv" or "send").
766    ///
767    /// In particular, this function correctly *registers* interest in the [`WaitQueue`]
768    /// prior to polling the function, ensuring that there is not a chance of a race
769    /// where the condition occurs AFTER checking but BEFORE registering interest
770    /// in the [`WaitQueue`], which could lead to deadlock.
771    ///
772    /// This is intended to have similar behavior to `Condvar` in the standard library,
773    /// but asynchronous, and not requiring operating system intervention (or existence).
774    ///
775    /// In particular, this can be used in cases where interrupts or events are used
776    /// to signify readiness or completion of some task, such as the completion of a
777    /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
778    /// can wake the queue, allowing the polling function to check status fields for
779    /// partial progress or completion.
780    ///
781    /// Consider using [`Self::wait_for_value()`] if your function does return a value.
782    ///
783    /// Consider using [`WaitCell::wait_for()`](super::wait_cell::WaitCell::wait_for)
784    /// if you do not need multiple waiters.
785    ///
786    /// # Returns
787    ///
788    /// * [`Ok`]`(())` if the closure returns `true`.
789    /// * [`Err`]`(`[`Closed`](crate::Closed)`)` if the [`WaitQueue`] is closed.
790    ///
791    /// # Examples
792    ///
793    /// ```
794    /// # use tokio::task;
795    /// # #[tokio::main(flavor = "current_thread")]
796    /// # async fn test() {
797    /// use std::sync::Arc;
798    /// use maitake_sync::WaitQueue;
799    /// use std::sync::atomic::{AtomicU8, Ordering};
800    ///
801    /// let queue = Arc::new(WaitQueue::new());
802    /// let num = Arc::new(AtomicU8::new(0));
803    ///
804    /// let waiter1 = task::spawn({
805    ///     // clone items to move into the spawned task
806    ///     let queue = queue.clone();
807    ///     let num = num.clone();
808    ///     async move {
809    ///         queue.wait_for(|| num.load(Ordering::Relaxed) > 5).await;
810    ///         println!("received wakeup!");
811    ///     }
812    /// });
813    ///
814    /// let waiter2 = task::spawn({
815    ///     // clone items to move into the spawned task
816    ///     let queue = queue.clone();
817    ///     let num = num.clone();
818    ///     async move {
819    ///         queue.wait_for(|| num.load(Ordering::Relaxed) > 10).await;
820    ///         println!("received wakeup!");
821    ///     }
822    /// });
823    ///
824    /// println!("poking task...");
825    ///
826    /// for i in 0..20 {
827    ///     num.store(i, Ordering::Relaxed);
828    ///     queue.wake();
829    /// }
830    ///
831    /// waiter1.await.unwrap();
832    /// waiter2.await.unwrap();
833    /// # }
834    /// # test();
835    /// ```
836    pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> WaitResult<()> {
837        loop {
838            let wait = self.wait();
839            let mut wait = core::pin::pin!(wait);
840            let _ = wait.as_mut().subscribe()?;
841            if f() {
842                return Ok(());
843            }
844            wait.await?;
845        }
846    }
847
848    /// Asynchronously poll the given function `f` until a condition occurs,
849    /// using the [`WaitQueue`] to only re-poll when notified.
850    ///
851    /// This can be used to implement a "wait loop", turning a "try" function
852    /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
853    /// "recv" or "send").
854    ///
855    /// In particular, this function correctly *registers* interest in the [`WaitQueue`]
856    /// prior to polling the function, ensuring that there is not a chance of a race
857    /// where the condition occurs AFTER checking but BEFORE registering interest
858    /// in the [`WaitQueue`], which could lead to deadlock.
859    ///
860    /// This is intended to have similar behavior to `Condvar` in the standard library,
861    /// but asynchronous, and not requiring operating system intervention (or existence).
862    ///
863    /// In particular, this can be used in cases where interrupts or events are used
864    /// to signify readiness or completion of some task, such as the completion of a
865    /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
866    /// can wake the queue, allowing the polling function to check status fields for
867    /// partial progress or completion, and also return the status flags at the same time.
868    ///
869    /// Consider using [`Self::wait_for()`] if your function does not return a value.
870    ///
871    /// Consider using [`WaitCell::wait_for_value()`](super::wait_cell::WaitCell::wait_for_value)
872    /// if you do not need multiple waiters.
873    ///
    ///
    /// # Returns
    ///
875    /// * [`Err`]`(`[`Closed`](crate::Closed)`)` if the [`WaitQueue`] is closed.
876    ///
877    /// # Examples
878    ///
879    /// ```
880    /// # use tokio::task;
881    /// # #[tokio::main(flavor = "current_thread")]
882    /// # async fn test() {
883    /// use std::sync::Arc;
884    /// use maitake_sync::WaitQueue;
885    /// use std::sync::atomic::{AtomicU8, Ordering};
886    ///
887    /// let queue = Arc::new(WaitQueue::new());
888    /// let num = Arc::new(AtomicU8::new(0));
889    ///
890    /// let waiter1 = task::spawn({
891    ///     // clone items to move into the spawned task
892    ///     let queue = queue.clone();
893    ///     let num = num.clone();
894    ///     async move {
895    ///         let rxd = queue.wait_for_value(|| {
896    ///             let val = num.load(Ordering::Relaxed);
897    ///             if val > 5 {
898    ///                 return Some(val);
899    ///             }
900    ///             None
901    ///         }).await.unwrap();
902    ///         assert!(rxd > 5);
903    ///         println!("received wakeup with value: {rxd}");
904    ///     }
905    /// });
906    ///
907    /// let waiter2 = task::spawn({
908    ///     // clone items to move into the spawned task
909    ///     let queue = queue.clone();
910    ///     let num = num.clone();
911    ///     async move {
912    ///         let rxd = queue.wait_for_value(|| {
913    ///             let val = num.load(Ordering::Relaxed);
914    ///             if val > 10 {
915    ///                 return Some(val);
916    ///             }
917    ///             None
918    ///         }).await.unwrap();
919    ///         assert!(rxd > 10);
920    ///         println!("received wakeup with value: {rxd}");
921    ///     }
922    /// });
923    ///
924    /// println!("poking task...");
925    ///
926    /// for i in 0..20 {
927    ///     num.store(i, Ordering::Relaxed);
928    ///     queue.wake();
929    /// }
930    ///
931    /// waiter1.await.unwrap();
932    /// waiter2.await.unwrap();
933    /// # }
934    /// # test();
935    /// ```
936    pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> WaitResult<T> {
937        loop {
938            let wait = self.wait();
939            let mut wait = core::pin::pin!(wait);
940            match wait.as_mut().subscribe() {
941                Poll::Ready(wr) => wr?,
942                Poll::Pending => {}
943            }
944            if let Some(t) = f() {
945                return Ok(t);
946            }
947            wait.await?;
948        }
949    }
950
951    /// Returns `true` if this `WaitQueue` is [closed](Self::close).
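    ///
    /// # Examples
    ///
    /// ```
    /// use maitake_sync::WaitQueue;
    ///
    /// let queue = WaitQueue::new();
    /// assert!(!queue.is_closed());
    ///
    /// queue.close();
    /// assert!(queue.is_closed());
    /// ```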
952    #[must_use]
953    pub fn is_closed(&self) -> bool {
954        self.load().get(QueueState::STATE) == State::Closed
955    }
956
957    /// Returns a [`Waiter`] entry in this queue.
958    ///
959    /// This is factored out into a separate function because it's used by both
960    /// [`WaitQueue::wait`] and [`WaitQueue::wait_owned`].
961    fn waiter(&self) -> Waiter {
962        // how many times has `wake_all` been called when this waiter is created?
963        let current_wake_alls = test_dbg!(self.load().get(QueueState::WAKE_ALLS));
964        let state = WaitStateBits::new()
965            .with(WaitStateBits::WAKE_ALLS, current_wake_alls)
966            .with(WaitStateBits::STATE, WaitState::Start);
967        Waiter {
968            state,
969            node: UnsafeCell::new(Node {
970                links: list::Links::new(),
971                waker: Wakeup::Empty,
972                _pin: PhantomPinned,
973            }),
974        }
975    }
976
977    #[cfg_attr(test, track_caller)]
978    fn load(&self) -> QueueState {
979        #[allow(clippy::let_and_return)]
980        let state = QueueState::from_bits(self.state.load(SeqCst));
981        test_debug!("state.load() = {state:?}");
982        state
983    }
984
985    #[cfg_attr(test, track_caller)]
986    fn store(&self, state: QueueState) {
        test_debug!("state.store({state:?})");
988        self.state.store(state.0, SeqCst);
989    }
990
991    #[cfg_attr(test, track_caller)]
992    fn compare_exchange(
993        &self,
994        current: QueueState,
995        new: QueueState,
996    ) -> Result<QueueState, QueueState> {
997        #[allow(clippy::let_and_return)]
998        let res = self
999            .state
1000            .compare_exchange(current.0, new.0, SeqCst, SeqCst)
1001            .map(QueueState::from_bits)
1002            .map_err(QueueState::from_bits);
1003        test_debug!("state.compare_exchange({current:?}, {new:?}) = {res:?}");
1004        res
1005    }
1006
1007    #[cold]
1008    #[inline(never)]
1009    fn wake_locked(&self, queue: &mut List<Waiter>, curr: QueueState) -> Option<Waker> {
1010        let state = curr.get(QueueState::STATE);
1011
1012        // is the queue still in the `Waiting` state? it is possible that we
1013        // transitioned to a different state while locking the queue.
1014        if test_dbg!(state) != State::Waiting {
1015            // if there are no longer any queued tasks, try to store the
1016            // wakeup in the queue and bail.
1017            if let Err(actual) = self.compare_exchange(curr, curr.with_state(State::Woken)) {
1018                debug_assert!(actual.get(QueueState::STATE) != State::Waiting);
1019                self.store(actual.with_state(State::Woken));
1020            }
1021
1022            return None;
1023        }
1024
1025        // otherwise, we have to dequeue a task and wake it.
1026        let node = queue
1027            .pop_back()
1028            .expect("if we are in the Waiting state, there must be waiters in the queue");
1029        let waker = Waiter::wake(node, queue, Wakeup::One);
1030
1031        // if we took the final waiter currently in the queue, transition to the
1032        // `Empty` state.
1033        if test_dbg!(queue.is_empty()) {
1034            self.store(curr.with_state(State::Empty));
1035        }
1036
1037        waker
1038    }
1039
1040    /// Drain waiters from `queue` and add them to `batch`. Returns `true` if
1041    /// the batch was filled while more waiters remain in the queue, indicating
1042    /// that this function must be called again to wake all waiters.
1043    fn drain_to_wake_batch(
1044        batch: &mut WakeBatch,
1045        queue: &mut List<Waiter>,
1046        wakeup: Wakeup,
1047    ) -> bool {
1048        while let Some(node) = queue.pop_back() {
1049            let Some(waker) = Waiter::wake(node, queue, wakeup.clone()) else {
                // this waiter was enqueued by `Wait::subscribe` and doesn't have
1051                // a waker, just keep going.
1052                continue;
1053            };
1054
1055            if batch.add_waker(waker) {
1056                // there's still room in the wake set, just keep adding to it.
1057                continue;
1058            }
1059
1060            // wake set is full, drop the lock and wake everyone!
1061            break;
1062        }
1063
1064        !queue.is_empty()
1065    }
1066}
1067
1068// === impl Waiter ===
1069
1070impl Waiter {
1071    /// Returns the [`Waker`] for the task that owns this `Waiter`.
1072    ///
1073    /// # Safety
1074    ///
1075    /// This is only safe to call while the list is locked. The `list`
1076    /// parameter ensures this method is only called while holding the lock, so
1077    /// this can be safe.
1078    ///
1079    /// Of course, that must be the *same* list that this waiter is a member of,
1080    /// and currently, there is no way to ensure that...
1081    #[inline(always)]
1082    #[cfg_attr(loom, track_caller)]
1083    fn wake(this: NonNull<Self>, list: &mut List<Self>, wakeup: Wakeup) -> Option<Waker> {
1084        Waiter::with_node(this, list, |node| {
1085            let waker = test_dbg!(mem::replace(&mut node.waker, wakeup));
1086            match waker {
1087                // the node has a registered waker, so wake the task.
1088                Wakeup::Waiting(waker) => Some(waker),
                // do nothing: the node was registered by `Wait::subscribe`
1090                // without a waker, so the future will already be woken when it is
1091                // actually polled.
1092                Wakeup::Empty => None,
1093                // the node was already woken? this should not happen and
1094                // probably indicates a race!
1095                _ => unreachable!("tried to wake a waiter in the {:?} state!", waker),
1096            }
1097        })
1098    }
1099
1100    /// # Safety
1101    ///
1102    /// This is only safe to call while the list is locked. The dummy `_list`
1103    /// parameter ensures this method is only called while holding the lock, so
1104    /// this can be safe.
1105    ///
1106    /// Of course, that must be the *same* list that this waiter is a member of,
1107    /// and currently, there is no way to ensure that...
1108    #[inline(always)]
1109    #[cfg_attr(loom, track_caller)]
1110    fn with_node<T>(
1111        mut this: NonNull<Self>,
1112        _list: &mut List<Self>,
1113        f: impl FnOnce(&mut Node) -> T,
1114    ) -> T {
1115        unsafe {
1116            // safety: this is only called while holding the lock on the queue,
1117            // so it's safe to mutate the waiter.
1118            this.as_mut().node.with_mut(|node| f(&mut *node))
1119        }
1120    }
1121
1122    fn poll_wait<Lock>(
1123        mut self: Pin<&mut Self>,
1124        queue: &WaitQueue<Lock>,
1125        waker: Option<&Waker>,
1126    ) -> Poll<WaitResult<()>>
1127    where
1128        Lock: ScopedRawMutex,
1129    {
1130        test_debug!(ptr = ?fmt::ptr(self.as_mut()), "Waiter::poll_wait");
1131        let ptr = unsafe { NonNull::from(Pin::into_inner_unchecked(self.as_mut())) };
1132        let mut this = self.as_mut().project();
1133
1134        match test_dbg!(this.state.get(WaitStateBits::STATE)) {
1135            WaitState::Start => {
1136                let queue_state = queue.load();
1137
1138                // can we consume a pending wakeup?
1139                if queue
1140                    .compare_exchange(
1141                        queue_state.with_state(State::Woken),
1142                        queue_state.with_state(State::Empty),
1143                    )
1144                    .is_ok()
1145                {
1146                    this.state.set(WaitStateBits::STATE, WaitState::Woken);
1147                    return Poll::Ready(Ok(()));
1148                }
1149
1150                // okay, no pending wakeups. try to wait...
1151
1152                test_debug!("poll_wait: locking...");
1153                queue.queue.with_lock(move |waiters| {
1154                    test_debug!("poll_wait: -> locked");
1155                    let mut queue_state = queue.load();
1156
1157                    // the whole queue was woken while we were trying to acquire
1158                    // the lock!
1159                    if queue_state.get(QueueState::WAKE_ALLS)
1160                        != this.state.get(WaitStateBits::WAKE_ALLS)
1161                    {
1162                        this.state.set(WaitStateBits::STATE, WaitState::Woken);
1163                        return Poll::Ready(Ok(()));
1164                    }
1165
1166                    // transition the queue to the waiting state
1167                    'to_waiting: loop {
1168                        match test_dbg!(queue_state.get(QueueState::STATE)) {
1169                            // the queue is `Empty`, transition to `Waiting`
1170                            State::Empty => {
1171                                match queue.compare_exchange(
1172                                    queue_state,
1173                                    queue_state.with_state(State::Waiting),
1174                                ) {
1175                                    Ok(_) => break 'to_waiting,
1176                                    Err(actual) => queue_state = actual,
1177                                }
1178                            }
1179                            // the queue is already `Waiting`
1180                            State::Waiting => break 'to_waiting,
1181                            // the queue was woken, consume the wakeup.
1182                            State::Woken => {
1183                                match queue.compare_exchange(
1184                                    queue_state,
1185                                    queue_state.with_state(State::Empty),
1186                                ) {
1187                                    Ok(_) => {
1188                                        this.state.set(WaitStateBits::STATE, WaitState::Woken);
1189                                        return Poll::Ready(Ok(()));
1190                                    }
1191                                    Err(actual) => queue_state = actual,
1192                                }
1193                            }
1194                            State::Closed => return crate::closed(),
1195                        }
1196                    }
1197
1198                    // enqueue the node
1199                    this.state.set(WaitStateBits::STATE, WaitState::Waiting);
1200                    if let Some(waker) = waker {
1201                        this.node.as_mut().with_mut(|node| {
1202                            unsafe {
1203                                // safety: we may mutate the node because we are
1204                                // holding the lock.
1205                                debug_assert!(matches!((*node).waker, Wakeup::Empty));
1206                                (*node).waker = Wakeup::Waiting(waker.clone());
1207                            }
1208                        });
1209                    }
1210
1211                    waiters.push_front(ptr);
1212
1213                    Poll::Pending
1214                })
1215            }
1216            WaitState::Waiting => {
1217                queue.queue.with_lock(|_waiters| {
1218                    this.node.with_mut(|node| unsafe {
1219                        // safety: we may mutate the node because we are
1220                        // holding the lock.
1221                        let node = &mut *node;
1222                        match node.waker {
1223                            Wakeup::Waiting(ref mut curr_waker) => {
1224                                match waker {
1225                                    Some(waker) if !curr_waker.will_wake(waker) => {
1226                                        *curr_waker = waker.clone()
1227                                    }
1228                                    _ => {}
1229                                }
1230                                Poll::Pending
1231                            }
1232                            Wakeup::All | Wakeup::One => {
1233                                this.state.set(WaitStateBits::STATE, WaitState::Woken);
1234                                Poll::Ready(Ok(()))
1235                            }
1236                            Wakeup::Closed => {
1237                                this.state.set(WaitStateBits::STATE, WaitState::Woken);
1238                                crate::closed()
1239                            }
1240                            Wakeup::Empty => {
1241                                if let Some(waker) = waker {
1242                                    node.waker = Wakeup::Waiting(waker.clone());
1243                                }
1244
1245                                Poll::Pending
1246                            }
1247                        }
1248                    })
1249                })
1250            }
1251            WaitState::Woken => Poll::Ready(Ok(())),
1252        }
1253    }
1254
1255    /// Release this `Waiter` from the queue.
1256    ///
1257    /// This is called from the `drop` implementation for the [`Wait`] and
1258    /// [`WaitOwned`] futures.
1259    fn release<Lock>(mut self: Pin<&mut Self>, queue: &WaitQueue<Lock>)
1260    where
1261        Lock: ScopedRawMutex,
1262    {
1263        let state = *(self.as_mut().project().state);
1264        let ptr = NonNull::from(unsafe { Pin::into_inner_unchecked(self) });
1265        test_debug!(self = ?fmt::ptr(ptr), ?state, ?queue.state, "Waiter::release");
1266
1267        // if we're not enqueued, we don't have to do anything else.
1268        if state.get(WaitStateBits::STATE) != WaitState::Waiting {
1269            return;
1270        }
1271
1272        let next_waiter = queue.queue.with_lock(|waiters| {
1273            let state = queue.load();
1274            // remove the node
1275            unsafe {
1276                // safety: we have the lock on the queue, so this is safe.
1277                waiters.remove(ptr);
1278            };
1279
1280            // if we removed the last waiter from the queue, transition the state to
1281            // `Empty`.
1282            if test_dbg!(waiters.is_empty()) && state.get(QueueState::STATE) == State::Waiting {
1283                queue.store(state.with_state(State::Empty));
1284            }
1285
1286            // if the node has an unconsumed wakeup, it must be assigned to the next
1287            // node in the queue.
1288            if Waiter::with_node(ptr, waiters, |node| matches!(&node.waker, Wakeup::One)) {
1289                queue.wake_locked(waiters, state)
1290            } else {
1291                None
1292            }
1293        });
1294
1295        if let Some(next) = next_waiter {
1296            next.wake();
1297        }
1298    }
1299}
1300
1301unsafe impl Linked<list::Links<Waiter>> for Waiter {
1302    type Handle = NonNull<Waiter>;
1303
1304    fn into_ptr(r: Self::Handle) -> NonNull<Self> {
1305        r
1306    }
1307
1308    unsafe fn from_ptr(ptr: NonNull<Self>) -> Self::Handle {
1309        ptr
1310    }
1311
1312    unsafe fn links(target: NonNull<Self>) -> NonNull<list::Links<Waiter>> {
1313        // Safety: using `ptr::addr_of!` avoids creating a temporary
1314        // reference, which stacked borrows dislikes.
1315        let node = ptr::addr_of!((*target.as_ptr()).node);
1316        (*node).with_mut(|node| {
1317            let links = ptr::addr_of_mut!((*node).links);
1318            // Safety: since the `target` pointer is `NonNull`, we can assume
1319            // that pointers to its members are also not null, making this use
1320            // of `new_unchecked` fine.
1321            NonNull::new_unchecked(links)
1322        })
1323    }
1324}
1325
1326// === impl Wait ===
1327
1328impl<Lock: ScopedRawMutex> Wait<'_, Lock> {
1329    /// Returns `true` if this `Wait` future is waiting for a notification from
1330    /// the provided [`WaitQueue`].
1331    ///
1332    /// # Examples
1333    ///
1334    /// ```
1335    /// use maitake_sync::WaitQueue;
1336    ///
1337    /// let queue1 = WaitQueue::new();
1338    /// let queue2 = WaitQueue::new();
1339    ///
1340    /// let wait = queue1.wait();
1341    /// assert!(wait.waits_on(&queue1));
1342    /// assert!(!wait.waits_on(&queue2));
1343    /// ```
1344    #[inline]
1345    #[must_use]
1346    pub fn waits_on(&self, queue: &WaitQueue<Lock>) -> bool {
1347        ptr::eq(self.queue, queue)
1348    }
1349
1350    /// Returns `true` if `self` and `other` are waiting on a notification from
1351    /// the same [`WaitQueue`].
1352    ///
1353    /// # Examples
1354    ///
1355    /// Two [`Wait`] futures waiting on the same [`WaitQueue`] return `true`:
1356    ///
1357    /// ```
1358    /// use maitake_sync::WaitQueue;
1359    ///
1360    /// let queue = WaitQueue::new();
1361    ///
1362    /// let wait1 = queue.wait();
1363    /// let wait2 = queue.wait();
1364    /// assert!(wait1.same_queue(&wait2));
1365    /// assert!(wait2.same_queue(&wait1));
1366    /// ```
1367    ///
1368    /// Two [`Wait`] futures waiting on different [`WaitQueue`]s return `false`:
1369    ///
1370    /// ```
1371    /// use maitake_sync::WaitQueue;
1372    ///
1373    /// let queue1 = WaitQueue::new();
1374    /// let queue2 = WaitQueue::new();
1375    ///
1376    /// let wait1 = queue1.wait();
1377    /// let wait2 = queue2.wait();
1378    /// assert!(!wait1.same_queue(&wait2));
1379    /// assert!(!wait2.same_queue(&wait1));
1380    /// ```
1381    #[inline]
1382    #[must_use]
1383    pub fn same_queue(&self, other: &Wait<'_, Lock>) -> bool {
1384        ptr::eq(self.queue, other.queue)
1385    }
1386
1387    /// Eagerly subscribe this future to wakeups from [`WaitQueue::wake()`].
1388    ///
1389    /// Polling a `Wait` future adds that future to the list of waiters that may
1390    /// receive a wakeup from a `WaitQueue`. However, in some cases, it is
1391    /// desirable to subscribe to wakeups *prior* to actually waiting for one.
1392    /// This method should be used when it is necessary to ensure a `Wait`
1393    /// future is in the list of waiters before the future is `poll`ed for the
1394    /// first time.
1395    ///
1396    /// In general, this method is used in cases where a [`WaitQueue`] must
1397    /// synchronize with some additional state, such as an `AtomicBool` or
1398    /// counter. If a task first checks that state, and then chooses whether or
1399    /// not to wait on the `WaitQueue` based on that state, then a race
1400    /// condition may occur where the `WaitQueue` wakes waiters *between* when
1401    /// the task checked the external state and when it first polled its `Wait`
1402    /// future to wait on the queue. This method allows registering the `Wait`
1403    /// future with the queue *prior* to checking the external state, without
1404    /// actually sleeping, so that when the task does wait for the `Wait` future
1405    /// to complete, it will have received any wakeup that was sent between when
1406    /// the external state was checked and the `Wait` future was first polled.
1407    ///
1408    /// # Returns
1409    ///
    /// This method returns a [`Poll`]`<`[`WaitResult`]`>` which is `Ready` if a wakeup was
1411    /// already received. This method returns [`Poll::Ready`] in the following
1412    /// cases:
1413    ///
1414    ///  1. The [`WaitQueue::wake()`] method was called between the creation of the
1415    ///     `Wait` and the call to this method.
1416    ///  2. This is the first call to `subscribe` or `poll` on this future, and the
1417    ///     `WaitQueue` was holding a stored wakeup from a previous call to
1418    ///     [`wake()`]. This method consumes the wakeup in that case.
    ///  3. The future has previously been `subscribe`d or polled, and has
    ///     since been marked ready, either by consuming a wakeup from the
    ///     `WaitQueue` or by a call to [`wake()`] or [`wake_all()`] that
    ///     removed it from the list of futures registered to receive wakeups.
1423    ///  4. The `WaitQueue` has been [`close`d](WaitQueue::close), in which case
1424    ///     this method returns `Poll::Ready(Err(Closed))`.
1425    ///
1426    /// If this method returns [`Poll::Ready`], any subsequent `poll`s of this
1427    /// `Wait` future will also immediately return [`Poll::Ready`].
1428    ///
1429    /// If the [`Wait`] future has subscribed to wakeups from the queue and has
1430    /// not yet been woken, this method returns [`Poll::Pending`].
1431    ///
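    /// # Examples
    ///
    /// A minimal sketch of the check-then-wait pattern described above. The
    /// `data_ready` flag and `wait_for_data` function are hypothetical
    /// application state, not part of this crate:
    ///
    /// ```ignore
    /// use core::pin::pin;
    /// use core::sync::atomic::{AtomicBool, Ordering};
    /// use maitake_sync::WaitQueue;
    ///
    /// async fn wait_for_data(queue: &WaitQueue, data_ready: &AtomicBool) {
    ///     loop {
    ///         // Subscribe *before* checking the flag, so that a wakeup sent
    ///         // between the check and the first poll cannot be lost.
    ///         let mut wait = pin!(queue.wait());
    ///         let _ = wait.as_mut().subscribe();
    ///
    ///         if data_ready.load(Ordering::Acquire) {
    ///             return;
    ///         }
    ///
    ///         // Any `wake()` issued after the `subscribe` call above will be
    ///         // delivered to this future.
    ///         wait.await.expect("queue is not closed");
    ///     }
    /// }
    /// ```
    ///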
1432    /// [`wake()`]: WaitQueue::wake
1433    /// [`wake_all()`]: WaitQueue::wake_all
1434    pub fn subscribe(self: Pin<&mut Self>) -> Poll<WaitResult<()>> {
1435        let this = self.project();
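        // Passing `None` registers this waiter with the queue (or consumes a
        // stored wakeup) without installing a task `Waker`; the waker is
        // supplied by the first real `poll` if the future is still pending.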
1436        this.waiter.poll_wait(this.queue, None)
1437    }
1438}
1439
1440impl<Lock: ScopedRawMutex> Future for Wait<'_, Lock> {
1441    type Output = WaitResult<()>;
1442
1443    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1444        let this = self.project();
1445        this.waiter.poll_wait(this.queue, Some(cx.waker()))
1446    }
1447}
1448
1449#[pinned_drop]
1450impl<Lock: ScopedRawMutex> PinnedDrop for Wait<'_, Lock> {
1451    fn drop(mut self: Pin<&mut Self>) {
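        // Make sure this waiter is removed from the queue's linked list before
        // its memory becomes invalid, so the queue never holds a dangling
        // pointer to a dropped future.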
1452        let this = self.project();
1453        this.waiter.release(this.queue);
1454    }
1455}
1456
1457// === impl QueueState ===
1458
1459impl QueueState {
1460    const ONE_WAKE_ALL: usize = Self::WAKE_ALLS.first_bit();
1461
1462    fn with_state(self, state: State) -> Self {
1463        self.with(Self::STATE, state)
1464    }
1465}
1466
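// `State` is stored in the 2-bit `STATE` field of the `QueueState` bitfield;
// its four variants cover every 2-bit pattern, which is what justifies the
// `unreachable_unchecked!` below.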
1467impl FromBits<usize> for State {
1468    const BITS: u32 = 2;
1469    type Error = core::convert::Infallible;
1470
1471    fn try_from_bits(bits: usize) -> Result<Self, Self::Error> {
1472        Ok(match bits as u8 {
1473            bits if bits == Self::Empty as u8 => Self::Empty,
1474            bits if bits == Self::Waiting as u8 => Self::Waiting,
1475            bits if bits == Self::Woken as u8 => Self::Woken,
1476            bits if bits == Self::Closed as u8 => Self::Closed,
1477            _ => unsafe {
1478                unreachable_unchecked!("all potential 2-bit patterns should be covered!")
1479            },
1480        })
1481    }
1482
1483    fn into_bits(self) -> usize {
1484        self.into_usize()
1485    }
1486}
1487
1488impl State {
1489    const fn into_usize(self) -> usize {
1490        self as u8 as usize
1491    }
1492}
1493
1494// === impl WaitOwned ===
1495
1496feature! {
1497    #![feature = "alloc"]
1498
1499    use alloc::sync::Arc;
1500
1501    /// Future returned from [`WaitQueue::wait_owned()`].
1502    ///
1503    /// This is identical to the [`Wait`] future, except that it takes an
1504    /// [`Arc`] reference to the [`WaitQueue`], allowing the returned future to
1505    /// live for the `'static` lifetime.
1506    ///
1507    /// This future is fused, so once it has completed, any future calls to poll
1508    /// will immediately return [`Poll::Ready`].
1509    ///
1510    /// # Notes
1511    ///
1512    /// This future is `!Unpin`, as it is unsafe to [`core::mem::forget`] a
1513    /// `WaitOwned`  future once it has been polled. For instance, the following
1514    /// code must not compile:
1515    ///
1516    /// ```compile_fail
1517    /// use maitake_sync::wait_queue::WaitOwned;
1518    ///
1519    /// // Calls to this function should only compile if `T` is `Unpin`.
1520    /// fn assert_unpin<T: Unpin>() {}
1521    ///
1522    /// assert_unpin::<WaitOwned>();
1523    /// ```
1524    #[derive(Debug)]
1525    #[pin_project(PinnedDrop)]
1526    pub struct WaitOwned<Lock: ScopedRawMutex = DefaultMutex> {
1527        /// The `WaitQueue` being waited on.
1528        queue: Arc<WaitQueue<Lock>>,
1529
1530        /// Entry in the wait queue.
1531        #[pin]
1532        waiter: Waiter,
1533    }
1534
1535    impl<Lock: ScopedRawMutex> WaitQueue<Lock> {
1536        /// Wait to be woken up by this queue, returning a future that's valid
1537        /// for the `'static` lifetime.
1538        ///
1539        /// This returns a [`WaitOwned`] future that will complete when the task
1540        /// is woken by a call to [`wake()`] or [`wake_all()`], or when the
1541        /// `WaitQueue` is [closed].
1542        ///
1543        /// This is identical to the [`wait()`] method, except that it takes an
1544        /// [`Arc`] reference to the [`WaitQueue`], allowing the returned future
1545        /// to live for the `'static` lifetime. See the documentation for
1546        /// [`wait()`] for details on how to use the future returned by this
1547        /// method.
1548        ///
1549        /// # Returns
1550        ///
1551        /// The [`Future`] returned by this method completes with one of the
1552        /// following [outputs](Future::Output):
1553        ///
1554        /// - [`Ok`]`(())` if the task was woken by a call to [`wake()`] or
1555        ///   [`wake_all()`].
1556        /// - [`Err`]`(`[`Closed`]`)` if the task was woken by the `WaitQueue`
1557        ///   being [closed].
1558        ///
1559        /// # Cancellation
1560        ///
1561        /// A `WaitQueue` fairly distributes wakeups to waiting tasks in the
1562        /// order that they started to wait. If a [`WaitOwned`] future is
1563        /// dropped, the task will forfeit its position in the queue.
1564        ///
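        /// # Examples
        ///
        /// A sketch showing that the returned future can be moved into a task
        /// that must be `'static`. The `spawn` function stands in for whatever
        /// executor is in use and is not part of this crate:
        ///
        /// ```ignore
        /// use std::sync::Arc;
        /// use maitake_sync::WaitQueue;
        ///
        /// let queue = Arc::new(WaitQueue::new());
        ///
        /// // `wait_owned()` clones the `Arc`, so the returned future borrows
        /// // nothing from this scope and can be spawned as a `'static` task.
        /// let wait = queue.wait_owned();
        /// spawn(async move {
        ///     wait.await.expect("queue is not closed");
        /// });
        ///
        /// // Wake the waiting task.
        /// queue.wake();
        /// ```
        ///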
1565        /// [`wake()`]: Self::wake
1566        /// [`wake_all()`]: Self::wake_all
1567        /// [`wait()`]: Self::wait
1568        /// [closed]: Self::close
1569        /// [`Closed`]: crate::Closed
1570        pub fn wait_owned(self: &Arc<Self>) -> WaitOwned<Lock> {
1571            let waiter = self.waiter();
1572            let queue = self.clone();
1573            WaitOwned { queue, waiter }
1574        }
1575    }
1576
1577    // === impl WaitOwned ===
1578
1579    impl<Lock: ScopedRawMutex> WaitOwned<Lock> {
1580        /// Returns `true` if this `WaitOwned` future is waiting for a
1581        /// notification from the provided [`WaitQueue`].
1582        ///
1583        /// # Examples
1584        ///
1585        /// ```
1586        /// use maitake_sync::WaitQueue;
1587        /// use std::sync::Arc;
1588        ///
1589        /// let queue1 = Arc::new(WaitQueue::new());
1590        /// let queue2 = Arc::new(WaitQueue::new());
1591        ///
1592        /// let wait = queue1.clone().wait_owned();
1593        /// assert!(wait.waits_on(&queue1));
1594        /// assert!(!wait.waits_on(&queue2));
1595        /// ```
1596        #[inline]
1597        #[must_use]
1598        pub fn waits_on(&self, queue: &WaitQueue<Lock>) -> bool {
1599            ptr::eq(&*self.queue, queue)
1600        }
1601
1602        /// Returns `true` if `self` and `other` are waiting on a notification
1603        /// from the same [`WaitQueue`].
1604        ///
1605        /// # Examples
1606        ///
1607        /// Two [`WaitOwned`] futures waiting on the same [`WaitQueue`] return
1608        /// `true`:
1609        ///
1610        /// ```
1611        /// use maitake_sync::WaitQueue;
1612        /// use std::sync::Arc;
1613        ///
1614        /// let queue = Arc::new(WaitQueue::new());
1615        ///
1616        /// let wait1 = queue.clone().wait_owned();
1617        /// let wait2 = queue.clone().wait_owned();
1618        /// assert!(wait1.same_queue(&wait2));
1619        /// assert!(wait2.same_queue(&wait1));
1620        /// ```
1621        ///
1622        /// Two [`WaitOwned`] futures waiting on different [`WaitQueue`]s return
1623        /// `false`:
1624        ///
1625        /// ```
1626        /// use maitake_sync::WaitQueue;
1627        /// use std::sync::Arc;
1628        ///
1629        /// let queue1 = Arc::new(WaitQueue::new());
1630        /// let queue2 = Arc::new(WaitQueue::new());
1631        ///
1632        /// let wait1 = queue1.wait_owned();
1633        /// let wait2 = queue2.wait_owned();
1634        /// assert!(!wait1.same_queue(&wait2));
1635        /// assert!(!wait2.same_queue(&wait1));
1636        /// ```
1637        #[inline]
1638        #[must_use]
1639        pub fn same_queue(&self, other: &WaitOwned<Lock>) -> bool {
1640            Arc::ptr_eq(&self.queue, &other.queue)
1641        }
1642
1643        /// Eagerly subscribe this future to wakeups from [`WaitQueue::wake()`].
1644        ///
1645        /// Polling a `WaitOwned` future adds that future to the list of waiters
1646        /// that may receive a wakeup from a `WaitQueue`. However, in some
1647        /// cases, it is desirable to subscribe to wakeups *prior* to actually
1648        /// waiting for one. This method should be used when it is necessary to
1649        /// ensure a `WaitOwned` future is in the list of waiters before the
1650        /// future is `poll`ed for the first time.
1651        ///
1652        /// In general, this method is used in cases where a [`WaitQueue`] must
1653        /// synchronize with some additional state, such as an `AtomicBool` or
1654        /// counter. If a task first checks that state, and then chooses whether or
1655        /// not to wait on the `WaitQueue` based on that state, then a race
1656        /// condition may occur where the `WaitQueue` wakes waiters *between* when
1657        /// the task checked the external state and when it first polled its
1658        /// `WaitOwned` future to wait on the queue. This method allows
1659        /// registering the `WaitOwned` future with the queue *prior* to
1660        /// checking the external state, without actually sleeping, so that when
1661        /// the task does wait for the `WaitOwned` future to complete, it will
1662        /// have received any wakeup that was sent between when the external
1663        /// state was checked and the `WaitOwned` future was first polled.
1664        ///
1665        /// # Returns
1666        ///
1667        /// This method returns a [`Poll`]`<`[`WaitResult`]`>` which is `Ready`
1668        /// if a wakeup was already received. This method returns [`Poll::Ready`]
1669        /// in the following cases:
1670        ///
1671        ///  1. The [`WaitQueue::wake()`] method was called between the creation
1672        ///     of the `WaitOwned` future and the call to this method.
1673        ///  2. This is the first call to `subscribe` or `poll` on this future,
1674        ///     and the `WaitQueue` was holding a stored wakeup from a previous
1675        ///     call to [`wake()`]. This method consumes the wakeup in that case.
1676        ///  3. The future has previously been `subscribe`d or polled, and it
1677        ///     has since then been marked ready by either consuming a wakeup
1678        ///     from the `WaitQueue`, or by a call to [`wake()`] or
1679        ///     [`wake_all()`] that removed it from the list of futures ready to
1680        ///     receive wakeups.
1681        ///  4. The `WaitQueue` has been [`close`d](WaitQueue::close), in which
1682        ///     case this method returns `Poll::Ready(Err(Closed))`.
1683        ///
1684        /// If this method returns [`Poll::Ready`], any subsequent `poll`s of
1685        /// this `WaitOwned` future will also immediately return [`Poll::Ready`].
1686        ///
1687        /// If the [`WaitOwned`] future has subscribed to wakeups from the queue
1688        /// and has not yet been woken, this method returns [`Poll::Pending`].
1689        ///
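        /// # Examples
        ///
        /// A minimal sketch of the check-then-wait pattern described above,
        /// using a hypothetical `data_ready` flag as the external state:
        ///
        /// ```ignore
        /// use core::pin::pin;
        /// use core::sync::atomic::{AtomicBool, Ordering};
        /// use maitake_sync::WaitQueue;
        /// use std::sync::Arc;
        ///
        /// async fn wait_for_data(queue: Arc<WaitQueue>, data_ready: &AtomicBool) {
        ///     loop {
        ///         // Subscribe *before* checking the flag, so that no wakeup
        ///         // sent in between can be lost.
        ///         let mut wait = pin!(queue.wait_owned());
        ///         let _ = wait.as_mut().subscribe();
        ///
        ///         if data_ready.load(Ordering::Acquire) {
        ///             return;
        ///         }
        ///
        ///         wait.await.expect("queue is not closed");
        ///     }
        /// }
        /// ```
        ///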
1690        /// [`wake()`]: WaitQueue::wake
1691        /// [`wake_all()`]: WaitQueue::wake_all
1692        pub fn subscribe(self: Pin<&mut Self>) -> Poll<WaitResult<()>> {
1693            let this = self.project();
1694            this.waiter.poll_wait(this.queue, None)
1695        }
1696    }
1697
1698    impl<Lock: ScopedRawMutex> Future for WaitOwned<Lock> {
1699        type Output = WaitResult<()>;
1700
1701        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1702            let this = self.project();
1703            this.waiter.poll_wait(&*this.queue, Some(cx.waker()))
1704        }
1705    }
1706
1707    #[pinned_drop]
1708    impl<Lock: ScopedRawMutex> PinnedDrop for WaitOwned<Lock> {
1709        fn drop(mut self: Pin<&mut Self>) {
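            // Remove the waiter from the queue's linked list before the owned
            // `Arc` reference to the queue is released.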
1710            let this = self.project();
1711            this.waiter.release(&*this.queue);
1712        }
1713    }
1714}