maitake_sync/
wait_cell.rs

1//! An atomically registered [`Waker`], for waking a single task.
2//!
3//! See the documentation for the [`WaitCell`] type for details.
4use crate::{
5    loom::{
6        cell::UnsafeCell,
7        sync::atomic::{
8            AtomicUsize,
9            Ordering::{self, *},
10        },
11    },
12    util::{fmt, CachePadded},
13    Closed,
14};
15use core::{
16    future::Future,
17    ops,
18    pin::Pin,
19    task::{self, Context, Poll, Waker},
20};
21
22/// An atomically registered [`Waker`].
23///
24/// This cell stores the [`Waker`] of a single task. A [`Waker`] is stored in
25/// the cell either by calling [`poll_wait`], or by polling a [`wait`]
26/// future. Once a task's [`Waker`] is stored in a `WaitCell`, it can be woken
27/// by calling [`wake`] on the `WaitCell`.
28///
29/// # Implementation Notes
30///
31/// This is inspired by the [`AtomicWaker`] type used in Tokio's
32/// synchronization primitives, with the following modifications:
33///
34/// - An additional bit of state is added to allow [setting a "close"
35///   bit](Self::close).
36/// - A `WaitCell` is always woken by value (for now).
37/// - `WaitCell` does not handle unwinding, because [`maitake` does not support
38///   unwinding](crate#maitake-does-not-support-unwinding)
39///
40/// [`AtomicWaker`]: https://github.com/tokio-rs/tokio/blob/09b770c5db31a1f35631600e1d239679354da2dd/tokio/src/sync/task/atomic_waker.rs
41/// [`Waker`]: core::task::Waker
42/// [`poll_wait`]: Self::poll_wait
43/// [`wait`]: Self::wait
44/// [`wake`]: Self::wake
pub struct WaitCell {
    /// Bitfield of `State` flags describing the cell.
    ///
    /// Besides recording wakeups and closure, the `REGISTERING` and `WAKING`
    /// bits act as the lock that guards access to `waker`.
    // NOTE(review): presumably cache-padded to avoid false sharing with
    // neighbouring data — confirm against `CachePadded`'s documentation.
    state: CachePadded<AtomicUsize>,
    /// The currently registered [`Waker`], if any.
    ///
    /// Only touched by code that has claimed the `REGISTERING` bit (in
    /// `poll_wait`) or the `WAKING` bit (in `take_waker`) in `state`.
    waker: UnsafeCell<Option<Waker>>,
}
49
/// The reason a [`Waker`] could not be registered with a [`WaitCell`]: the
/// cell was either closed or busy while attempting to register the
/// [`Waker`].
///
/// This error is returned by the [`WaitCell::poll_wait`] method.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PollWaitError {
    /// The [`WaitCell`] has been [closed](WaitCell::close), so the [`Waker`]
    /// was not registered.
    Closed,

    /// Another task was concurrently storing its own [`Waker`] in the
    /// [`WaitCell`], so this task's [`Waker`] was not registered.
    Busy,
}
64
65/// Future returned from [`WaitCell::wait()`].
66///
67/// This future is fused, so once it has completed, any future calls to poll
68/// will immediately return [`Poll::Ready`].
#[derive(Debug)]
#[must_use = "futures do nothing unless `.await`ed or `poll`ed"]
pub struct Wait<'a> {
    /// The [`WaitCell`] being waited on.
    cell: &'a WaitCell,

    /// Outcome of an eager registration, if one has been recorded.
    ///
    /// [`WaitCell::wait()`] initialises this to [`Poll::Pending`], while
    /// [`Subscribe`] stores the result of its own `poll_wait` call here.
    /// When this is [`Poll::Ready`], polling the `Wait` future returns the
    /// stored value without polling the cell again.
    presubscribe: Poll<Result<(), super::Closed>>,
}
77
78/// Future returned from [`WaitCell::subscribe()`].
79///
80/// See the documentation for [`WaitCell::subscribe()`] for details.
#[derive(Debug)]
#[must_use = "futures do nothing unless `.await`ed or `poll`ed"]
pub struct Subscribe<'a> {
    /// The [`WaitCell`] being waited on.
    ///
    /// The same reference is handed to the [`Wait`] future that this future
    /// outputs.
    cell: &'a WaitCell,
}
87
/// A snapshot of (or mask over) the flag bits stored in a `WaitCell`'s
/// atomic `state` word.
#[derive(Clone, Copy, PartialEq, Eq)]
struct State(usize);
90
91// === impl WaitCell ===
92
impl WaitCell {
    // NOTE(review): `loom_const_fn!` is a crate macro; presumably it makes
    // the wrapped function a `const fn` except when building under loom —
    // confirm against the macro's definition.
    loom_const_fn! {
        /// Returns a new `WaitCell`, with no [`Waker`] stored in it.
        ///
        /// The cell starts out in the `WAITING` state (no flag bits set).
        #[must_use]
        pub fn new() -> Self {
            Self {
                state: CachePadded::new(AtomicUsize::new(State::WAITING.0)),
                waker: UnsafeCell::new(None),
            }
        }
    }
}
105
106impl Default for WaitCell {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
112impl WaitCell {
113    /// Poll to wait on this `WaitCell`, consuming a stored wakeup or
114    /// registering the [`Waker`] from the provided [`Context`] to be woken by
115    /// the next wakeup.
116    ///
117    /// Once a [`Waker`] has been registered, a subsequent call to [`wake`] will
118    /// wake that [`Waker`].
119    ///
120    /// # Returns
121    ///
122    /// - [`Poll::Pending`] if the [`Waker`] was registered. If this method returns
123    ///   [`Poll::Pending`], then the registered [`Waker`] will be woken by a
124    ///   subsequent call to [`wake`].
125    /// - [`Poll::Ready`]`(`[`Ok`]`(()))` if the cell was woken by a call to
126    ///   [`wake`] while the [`Waker`] was being registered.
127    /// - [`Poll::Ready`]`(`[`Err`]`(`[`PollWaitError::Closed`]`))` if the
128    ///   [`WaitCell`] has been closed.
129    /// - [`Poll::Ready`]`(`[`Err`]`(`[`PollWaitError::Busy`]`))` if another
130    ///   task was concurrently registering its [`Waker`] with this
131    ///   [`WaitCell`].
132    ///
133    /// [`wake`]: Self::wake
    pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<Result<(), PollWaitError>> {
        enter_test_debug_span!("WaitCell::poll_wait", cell = ?fmt::ptr(self));

        // this is based on tokio's AtomicWaker synchronization strategy
        //
        // Step 1: try to claim the registration "lock" by moving the state
        // from WAITING to REGISTERING. If the state was anything other than
        // WAITING, inspect the bits we observed to decide what to report.
        // The guard order matters: CLOSED wins over WOKEN, which wins over
        // WAKING.
        match test_dbg!(self.compare_exchange(State::WAITING, State::REGISTERING, Acquire)) {
            Err(actual) if test_dbg!(actual.contains(State::CLOSED)) => {
                return Poll::Ready(Err(PollWaitError::Closed));
            }
            Err(actual) if test_dbg!(actual.contains(State::WOKEN)) => {
                // take the wakeup
                self.fetch_and(!State::WOKEN, Release);
                return Poll::Ready(Ok(()));
            }
            // someone else is notifying, so don't wait!
            Err(actual) if test_dbg!(actual.contains(State::WAKING)) => {
                return Poll::Ready(Ok(()));
            }
            // None of CLOSED, WOKEN or WAKING was observed, so the exchange
            // failed because another caller is mid-registration.
            Err(_) => return Poll::Ready(Err(PollWaitError::Busy)),
            Ok(_) => {}
        }

        let waker = cx.waker();
        trace!(wait_cell = ?fmt::ptr(self), ?waker, "registering waker");

        // Step 2: store our waker, unless the cell already holds one that
        // would wake the same task (in which case the clone is skipped and
        // nothing is replaced).
        //
        // SAFETY: we hold the REGISTERING bit, and `take_waker` only touches
        // the waker slot when it observed neither REGISTERING nor WAKING, so
        // nothing else accesses the slot while we are here.
        let prev_waker = self.waker.with_mut(|old_waker| unsafe {
            match &mut *old_waker {
                Some(old_waker) if waker.will_wake(old_waker) => None,
                old => old.replace(waker.clone()),
            }
        });

        // A *different* waker was displaced from the cell: wake it rather
        // than silently dropping it.
        if let Some(prev_waker) = prev_waker {
            test_debug!("Replaced an old waker in cell, waking");
            prev_waker.wake();
        }

        // Step 3: release the registration "lock" by moving back from
        // REGISTERING to WAITING. This fails if `take_waker` set any bits
        // while we were registering.
        if let Err(actual) =
            test_dbg!(self.compare_exchange(State::REGISTERING, State::WAITING, AcqRel))
        {
            // If the `compare_exchange` fails above, this means that we were notified for one of
            // two reasons: either the cell was awoken, or the cell was closed.
            //
            // Bail out of the parking state, and determine what to report to the caller.
            test_trace!(state = ?actual, "was notified");
            // SAFETY: the REGISTERING bit is still set (nothing but this
            // function clears it), so a concurrent `take_waker` will not
            // touch the waker slot.
            let waker = self.waker.with_mut(|waker| unsafe { (*waker).take() });
            // Reset to the WAITING state by clearing everything *except*
            // the closed bits (which must remain set). This `fetch_and`
            // does *not* set the CLOSED bit if it is unset, it just doesn't
            // clear it.
            let state = test_dbg!(self.fetch_and(State::CLOSED, AcqRel));
            // The only valid state transition while we were parking is to
            // add the CLOSED bit.
            debug_assert!(
                state == actual || state == actual | State::CLOSED,
                "state changed unexpectedly while parking!"
            );

            // The notifier could not take the waker itself (we held the
            // lock), so wake the waker we just stored on its behalf.
            if let Some(waker) = waker {
                waker.wake();
            }

            // Was the `CLOSED` bit set while we were clearing other bits?
            // If so, the cell is closed. Otherwise, we must have been notified.
            if state.contains(State::CLOSED) {
                return Poll::Ready(Err(PollWaitError::Closed));
            }

            return Poll::Ready(Ok(()));
        }

        // Waker registered, time to yield!
        Poll::Pending
    }
207
208    /// Wait to be woken up by this cell.
209    ///
210    /// # Returns
211    ///
212    /// This future completes with the following values:
213    ///
214    /// - [`Ok`]`(())` if the future was woken by a call to [`wake`] or another
215    ///   task calling [`poll_wait`] or [`wait`] on this [`WaitCell`].
216    /// - [`Err`]`(`[`Closed`]`)` if the task was woken by a call to [`close`],
217    ///   or the [`WaitCell`] was already closed.
218    ///
219    /// **Note**: The calling task's [`Waker`] is not registered until AFTER the
220    /// first time the returned [`Wait`] future is polled. This means that if a
221    /// call to [`wake`] occurs between when [`wait`] is called and when the
222    /// future is first polled, the future will *not* complete. If the caller is
223    /// responsible for performing an operation which will result in an eventual
224    /// wakeup, prefer calling [`subscribe`] _before_ performing that operation
225    /// and `.await`ing the [`Wait`] future returned by [`subscribe`].
226    ///
227    /// [`wake`]: Self::wake
228    /// [`poll_wait`]: Self::poll_wait
229    /// [`wait`]: Self::wait
230    /// [`close`]: Self::close
231    /// [`subscribe`]: Self::subscribe
232    pub fn wait(&self) -> Wait<'_> {
233        Wait {
234            cell: self,
235            presubscribe: Poll::Pending,
236        }
237    }
238
239    /// Eagerly subscribe to notifications from this `WaitCell`.
240    ///
241    /// This method returns a [`Subscribe`] [`Future`], which outputs a [`Wait`]
242    /// [`Future`]. Awaiting the [`Subscribe`] future will eagerly register the
243    /// calling task to be woken by this [`WaitCell`], so that the returned
244    /// [`Wait`] future will be woken by any calls to [`wake`] (or [`close`])
245    /// that occur between when the [`Subscribe`] future completes and when the
246    /// returned [`Wait`] future is `.await`ed.
247    ///
248    /// This is primarily intended for scenarios where the task that waits on a
249    /// [`WaitCell`] is responsible for performing some operation that
250    /// ultimately results in the [`WaitCell`] being woken. If the task were to
251    /// simply perform the operation and then call [`wait`] on the [`WaitCell`],
252    /// a potential race condition could occur where the operation completes and
253    /// wakes the [`WaitCell`] *before* the [`Wait`] future is first `.await`ed.
254    /// Using `subscribe`, the task can ensure that it is ready to be woken by
255    /// the cell *before* performing an operation that could result in it being
256    /// woken.
257    ///
258    /// These scenarios occur when a wakeup is triggered by another thread/CPU
259    /// core in response to an operation performed in the task waiting on the
260    /// `WaitCell`, or when the wakeup is triggered by a hardware interrupt
261    /// resulting from operations performed in the task.
262    ///
263    /// # Examples
264    ///
265    /// ```
266    /// use maitake_sync::WaitCell;
267    ///
268    /// // Perform an operation that results in a concurrent wakeup, such as
269    /// // unmasking an interrupt.
270    /// fn do_something_that_causes_a_wakeup() {
271    ///     # WAIT_CELL.wake();
272    ///     // ...
273    /// }
274    ///
275    /// static WAIT_CELL: WaitCell = WaitCell::new();
276    ///
277    /// # async fn dox() {
278    /// // Subscribe to notifications from the cell *before* calling
279    /// // `do_something_that_causes_a_wakeup()`, to ensure that we are
280    /// // ready to be woken when the interrupt is unmasked.
281    /// let wait = WAIT_CELL.subscribe().await;
282    ///
283    /// // Actually perform the operation.
284    /// do_something_that_causes_a_wakeup();
285    ///
286    /// // Wait for the wakeup. If the wakeup occurred *before* the first
287    /// // poll of the `wait` future had successfully subscribed to the
288    /// // `WaitCell`, we would still receive the wakeup, because the
289    /// // `subscribe` future ensured that our waker was registered to be
290    /// // woken.
291    /// wait.await.expect("WaitCell is not closed");
292    /// # }
293    /// ```
294    ///
295    /// [`wait`]: Self::wait
296    /// [`wake`]: Self::wake
297    /// [`close`]: Self::close
298    pub fn subscribe(&self) -> Subscribe<'_> {
299        Subscribe { cell: self }
300    }
301
302    /// Wake the [`Waker`] stored in this cell.
303    ///
304    /// # Returns
305    ///
306    /// - `true` if a waiting task was woken.
307    /// - `false` if no task was woken (no [`Waker`] was stored in the cell)
308    pub fn wake(&self) -> bool {
309        enter_test_debug_span!("WaitCell::wake", cell = ?fmt::ptr(self));
310        if let Some(waker) = self.take_waker(false) {
311            waker.wake();
312            true
313        } else {
314            false
315        }
316    }
317
318    /// Close the [`WaitCell`].
319    ///
320    /// This wakes any waiting task with an error indicating the `WaitCell` is
321    /// closed. Subsequent calls to [`wait`] or [`poll_wait`] will return an
322    /// error indicating that the cell has been closed.
323    ///
324    /// [`wait`]: Self::wait
325    /// [`poll_wait`]: Self::poll_wait
326    pub fn close(&self) -> bool {
327        enter_test_debug_span!("WaitCell::close", cell = ?fmt::ptr(self));
328        if let Some(waker) = self.take_waker(true) {
329            waker.wake();
330            true
331        } else {
332            false
333        }
334    }
335
336    /// Asynchronously poll the given function `f` until a condition occurs,
337    /// using the [`WaitCell`] to only re-poll when notified.
338    ///
339    /// This can be used to implement a "wait loop", turning a "try" function
340    /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
341    /// "recv" or "send").
342    ///
343    /// In particular, this function correctly *registers* interest in the [`WaitCell`]
344    /// prior to polling the function, ensuring that there is not a chance of a race
345    /// where the condition occurs AFTER checking but BEFORE registering interest
346    /// in the [`WaitCell`], which could lead to deadlock.
347    ///
348    /// This is intended to have similar behavior to `Condvar` in the standard library,
349    /// but asynchronous, and not requiring operating system intervention (or existence).
350    ///
351    /// In particular, this can be used in cases where interrupts or events are used
352    /// to signify readiness or completion of some task, such as the completion of a
353    /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
354    /// can wake the cell, allowing the polling function to check status fields for
355    /// partial progress or completion.
356    ///
357    /// Consider using [`Self::wait_for_value()`] if your function does return a value.
358    ///
359    /// Consider using [`WaitQueue::wait_for()`](super::wait_queue::WaitQueue::wait_for)
360    /// if you need multiple waiters.
361    ///
362    /// # Returns
363    ///
364    /// * [`Ok`]`(())` if the closure returns `true`.
365    /// * [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed.
366    ///
367    /// # Examples
368    ///
369    /// ```
370    /// # use tokio::task;
371    /// # #[tokio::main(flavor = "current_thread")]
372    /// # async fn test() {
373    /// use std::sync::Arc;
374    /// use maitake_sync::WaitCell;
375    /// use std::sync::atomic::{AtomicU8, Ordering};
376    ///
377    /// let queue = Arc::new(WaitCell::new());
378    /// let num = Arc::new(AtomicU8::new(0));
379    ///
380    /// let waiter = task::spawn({
381    ///     // clone items to move into the spawned task
382    ///     let queue = queue.clone();
383    ///     let num = num.clone();
384    ///     async move {
385    ///         queue.wait_for(|| num.load(Ordering::Relaxed) > 5).await;
386    ///         println!("received wakeup!");
387    ///     }
388    /// });
389    ///
390    /// println!("poking task...");
391    ///
392    /// for i in 0..20 {
393    ///     num.store(i, Ordering::Relaxed);
394    ///     queue.wake();
395    /// }
396    ///
397    /// waiter.await.unwrap();
398    /// # }
399    /// # test();
400    /// ```
401    pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> Result<(), Closed> {
402        loop {
403            let wait = self.subscribe().await;
404            if f() {
405                return Ok(());
406            }
407            wait.await?;
408        }
409    }
410
411    /// Asynchronously poll the given function `f` until a condition occurs,
412    /// using the [`WaitCell`] to only re-poll when notified.
413    ///
414    /// This can be used to implement a "wait loop", turning a "try" function
415    /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
416    /// "recv" or "send").
417    ///
418    /// In particular, this function correctly *registers* interest in the [`WaitCell`]
419    /// prior to polling the function, ensuring that there is not a chance of a race
420    /// where the condition occurs AFTER checking but BEFORE registering interest
421    /// in the [`WaitCell`], which could lead to deadlock.
422    ///
423    /// This is intended to have similar behavior to `Condvar` in the standard library,
424    /// but asynchronous, and not requiring operating system intervention (or existence).
425    ///
426    /// In particular, this can be used in cases where interrupts or events are used
427    /// to signify readiness or completion of some task, such as the completion of a
428    /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
429    /// can wake the cell, allowing the polling function to check status fields for
430    /// partial progress or completion, and also return the status flags at the same time.
431    ///
432    /// Consider using [`Self::wait_for()`] if your function does not return a value.
433    ///
434    /// Consider using [`WaitQueue::wait_for_value()`](super::wait_queue::WaitQueue::wait_for_value) if you need multiple waiters.
    ///
    /// # Returns
    ///
    /// * [`Ok`]`(T)` if the closure returns [`Some`]`(T)`.
    /// * [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed.
438    ///
439    /// # Examples
440    ///
441    /// ```
442    /// # use tokio::task;
443    /// # #[tokio::main(flavor = "current_thread")]
444    /// # async fn test() {
445    /// use std::sync::Arc;
446    /// use maitake_sync::WaitCell;
447    /// use std::sync::atomic::{AtomicU8, Ordering};
448    ///
449    /// let queue = Arc::new(WaitCell::new());
450    /// let num = Arc::new(AtomicU8::new(0));
451    ///
452    /// let waiter = task::spawn({
453    ///     // clone items to move into the spawned task
454    ///     let queue = queue.clone();
455    ///     let num = num.clone();
456    ///     async move {
457    ///         let rxd = queue.wait_for_value(|| {
458    ///             let val = num.load(Ordering::Relaxed);
459    ///             if val > 5 {
460    ///                 return Some(val);
461    ///             }
462    ///             None
463    ///         }).await.unwrap();
464    ///         assert!(rxd > 5);
465    ///         println!("received wakeup with value: {rxd}");
466    ///     }
467    /// });
468    ///
469    /// println!("poking task...");
470    ///
471    /// for i in 0..20 {
472    ///     num.store(i, Ordering::Relaxed);
473    ///     queue.wake();
474    /// }
475    ///
476    /// waiter.await.unwrap();
477    /// # }
478    /// # test();
479    /// ```
480    pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> Result<T, Closed> {
481        loop {
482            let wait = self.subscribe().await;
483            if let Some(t) = f() {
484                return Ok(t);
485            }
486            wait.await?;
487        }
488    }
489
490    /// Returns `true` if this `WaitCell` is [closed](Self::close).
491    #[must_use]
492    pub fn is_closed(&self) -> bool {
493        self.current_state() == State::CLOSED
494    }
495
    /// Takes this `WaitCell`'s waker.
    ///
    /// Records a wakeup in the cell's state (and, if `close` is `true`, also
    /// marks the cell as closed), then removes and returns the stored
    /// [`Waker`] if this caller was able to claim access to it.
    ///
    /// Returns `None` if no [`Waker`] was stored, or if the waker could not
    /// be claimed because the state already had the `WAKING`, `REGISTERING`,
    /// or `CLOSED` bit set. The returned [`Waker`] is *not* woken by this
    /// method; that is left to the caller.
    // TODO(eliza): could probably be made a public API...
    pub(crate) fn take_waker(&self, close: bool) -> Option<Waker> {
        trace!(wait_cell = ?fmt::ptr(self), ?close, "notifying");
        // Set the WAKING bit (to indicate that we're touching the waker) and
        // the WOKEN bit (to indicate that we intend to wake it up).
        let state = {
            let mut bits = State::WAKING | State::WOKEN;
            if close {
                bits.0 |= State::CLOSED.0;
            }
            // `fetch_or` returns the state from *before* our bits were set,
            // which is what tells us whether anyone else held the lock.
            test_dbg!(self.fetch_or(bits, AcqRel))
        };

        // Is anyone else touching the waker?
        //
        // `contains` is true if *any* of the given bits is set, so we only
        // proceed when none of WAKING, REGISTERING or CLOSED was set before
        // our `fetch_or`. When REGISTERING was set, the bits we just set make
        // the registering task's final `compare_exchange` in `poll_wait`
        // fail, and that task handles the wakeup itself.
        if !test_dbg!(state.contains(State::WAKING | State::REGISTERING | State::CLOSED)) {
            // Ladies and gentlemen...we got him (the lock)!
            //
            // SAFETY: the previous state had neither WAKING nor REGISTERING
            // set, so we are the only ones accessing the waker slot until we
            // clear WAKING below.
            let waker = self.waker.with_mut(|thread| unsafe { (*thread).take() });

            // Release the lock.
            // Only WAKING is cleared: WOKEN (and CLOSED, if set) stay behind.
            self.fetch_and(!State::WAKING, Release);

            if let Some(waker) = test_dbg!(waker) {
                trace!(wait_cell = ?fmt::ptr(self), ?close, ?waker, "notified");
                return Some(waker);
            }
        }

        None
    }
526}
527
528impl WaitCell {
529    #[inline(always)]
530    fn compare_exchange(
531        &self,
532        State(curr): State,
533        State(new): State,
534        success: Ordering,
535    ) -> Result<State, State> {
536        self.state
537            .compare_exchange(curr, new, success, Acquire)
538            .map(State)
539            .map_err(State)
540    }
541
542    #[inline(always)]
543    fn fetch_and(&self, State(state): State, order: Ordering) -> State {
544        State(self.state.fetch_and(state, order))
545    }
546
547    #[inline(always)]
548    fn fetch_or(&self, State(state): State, order: Ordering) -> State {
549        State(self.state.fetch_or(state, order))
550    }
551
552    #[inline(always)]
553    fn current_state(&self) -> State {
554        State(self.state.load(Acquire))
555    }
556}
557
// SAFETY: the only non-thread-safe field is the `UnsafeCell` holding the
// waker. In this file it is only accessed after the `state` bitfield has been
// used to claim it — via the `REGISTERING` bit in `poll_wait`, or the `WAKING`
// bit in `take_waker` — so accesses from different threads do not overlap.
// NOTE(review): this relies on every access to `waker` following that
// protocol; re-check if new accessors are added.
unsafe impl Send for WaitCell {}
unsafe impl Sync for WaitCell {}
560
561impl fmt::Debug for WaitCell {
562    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
563        f.debug_struct("WaitCell")
564            .field("state", &self.current_state())
565            .field("waker", &fmt::display(".."))
566            .finish()
567    }
568}
569
570impl Drop for WaitCell {
571    fn drop(&mut self) {
572        self.close();
573    }
574}
575
576// === impl Wait ===
577
578impl Future for Wait<'_> {
579    type Output = Result<(), Closed>;
580
581    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
582        enter_test_debug_span!("Wait::poll");
583
584        // Did a wakeup occur while we were pre-registering the future?
585        if test_dbg!(self.presubscribe.is_ready()) {
586            return self.presubscribe;
587        }
588
589        // Okay, actually poll the cell, then.
590        match task::ready!(test_dbg!(self.cell.poll_wait(cx))) {
591            Ok(()) => Poll::Ready(Ok(())),
592            Err(PollWaitError::Closed) => Poll::Ready(Err(Closed(()))),
593            Err(PollWaitError::Busy) => {
594                // If some other task was registering, yield and try to re-register
595                // our waker when that task is done.
596                cx.waker().wake_by_ref();
597                Poll::Pending
598            }
599        }
600    }
601}
602
603// === impl Subscribe ===
604
605impl<'cell> Future for Subscribe<'cell> {
606    type Output = Wait<'cell>;
607
608    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
609        enter_test_debug_span!("Subscribe::poll");
610
611        // Pre-register the waker in the cell.
612        let presubscribe = match test_dbg!(self.cell.poll_wait(cx)) {
613            Poll::Ready(Err(PollWaitError::Busy)) => {
614                // Someone else is in the process of registering. Yield now so we
615                // can wait until that task is done, and then try again.
616                cx.waker().wake_by_ref();
617                return Poll::Pending;
618            }
619            Poll::Ready(Err(PollWaitError::Closed)) => Poll::Ready(Err(Closed(()))),
620            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
621            Poll::Pending => Poll::Pending,
622        };
623
624        Poll::Ready(Wait {
625            cell: self.cell,
626            presubscribe,
627        })
628    }
629}
630
631// === impl State ===
632
633impl State {
634    /// /!\ EXTREMELY SERIOUS WARNING! /!\
635    /// It is LOAD BEARING that the `WAITING` state is represented by zero!
636    /// This is because we return to the waiting state by `fetch_and`ing out all
637    /// other bits in a few places. If this state's bit representation is
638    /// changed to anything other than zero, that code will break! Don't do
639    /// that!
640    ///
641    /// YES, FUTURE ELIZA, THIS DOES APPLY TO YOU. YOU ALREADY BROKE IT ONCE.
642    /// DON'T DO IT AGAIN.
643    const WAITING: Self = Self(0b0000);
644    const REGISTERING: Self = Self(0b0001);
645    const WAKING: Self = Self(0b0010);
646    const WOKEN: Self = Self(0b0100);
647    const CLOSED: Self = Self(0b1000);
648
649    fn contains(self, Self(state): Self) -> bool {
650        self.0 & state > 0
651    }
652}
653
654impl ops::BitOr for State {
655    type Output = Self;
656
657    fn bitor(self, Self(rhs): Self) -> Self::Output {
658        Self(self.0 | rhs)
659    }
660}
661
662impl ops::BitAnd for State {
663    type Output = Self;
664
665    fn bitand(self, Self(rhs): Self) -> Self::Output {
666        Self(self.0 & rhs)
667    }
668}
669
670impl ops::Not for State {
671    type Output = Self;
672
673    fn not(self) -> Self::Output {
674        Self(!self.0)
675    }
676}
677
678impl fmt::Debug for State {
679    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
680        let mut has_states = false;
681
682        fmt_bits!(self, f, has_states, REGISTERING, WAKING, CLOSED, WOKEN);
683
684        if !has_states {
685            if *self == Self::WAITING {
686                return f.write_str("WAITING");
687            }
688
689            f.debug_tuple("UnknownState")
690                .field(&format_args!("{:#b}", self.0))
691                .finish()?;
692        }
693
694        Ok(())
695    }
696}
697
// Single-threaded tests driving `WaitCell` futures by hand with
// `tokio_test::task`, which lets each test observe exactly when a task's
// waker is invoked.
#[cfg(all(feature = "alloc", not(loom), test))]
mod tests {
    use super::*;
    use alloc::sync::Arc;

    use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};

    /// A waiting task is woken by `wake()` and then completes with `Ok`.
    #[test]
    fn wait_smoke() {
        let _trace = crate::util::test::trace_init();

        let wait = Arc::new(WaitCell::new());

        let mut task = task::spawn({
            let wait = wait.clone();
            async move { wait.wait().await }
        });

        // The first poll registers the task's waker with the cell.
        assert_pending!(task.poll());

        // A waker was registered, so `wake()` must report that it woke it.
        assert!(wait.wake());

        assert!(task.is_woken());
        assert_ready_ok!(task.poll());
    }

    /// Reproduces https://github.com/hawkw/mycelium/issues/449
    ///
    /// Polling a pending `Wait` a second time without an intervening wakeup
    /// must leave it pending, and it must still complete on a later wakeup.
    #[test]
    fn wait_spurious_poll() {
        let _trace = crate::util::test::trace_init();

        let cell = Arc::new(WaitCell::new());
        let mut task = task::spawn({
            let cell = cell.clone();
            async move { cell.wait().await }
        });

        assert_pending!(task.poll(), "first poll should be pending");
        assert_pending!(task.poll(), "second poll should be pending");

        cell.wake();

        assert_ready_ok!(task.poll(), "should have been woken");
    }

    /// A wakeup issued after `subscribe()` completes, but before the
    /// returned `Wait` is awaited, is still delivered.
    #[test]
    fn subscribe() {
        let _trace = crate::util::test::trace_init();
        futures::executor::block_on(async {
            let cell = WaitCell::new();
            let wait = cell.subscribe().await;
            cell.wake();
            wait.await.unwrap();
        })
    }

    /// A wakeup issued before anyone subscribes is stored in the cell and
    /// consumed by the first subscriber only; the next subscriber has to
    /// wait for a fresh wakeup.
    #[test]
    fn wake_before_subscribe() {
        let _trace = crate::util::test::trace_init();
        let cell = Arc::new(WaitCell::new());
        cell.wake();

        let mut task = task::spawn({
            let cell = cell.clone();
            async move {
                let wait = cell.subscribe().await;
                wait.await.unwrap();
            }
        });

        assert_ready!(task.poll(), "woken task should complete");

        let mut task = task::spawn({
            let cell = cell.clone();
            async move {
                let wait = cell.subscribe().await;
                wait.await.unwrap();
            }
        });

        assert_pending!(task.poll(), "wait cell hasn't been woken yet");
        cell.wake();
        assert!(task.is_woken());
        assert_ready!(task.poll());
    }

    /// Two back-to-back wakeups are consumed by a single waiter: a task that
    /// starts waiting afterwards is not completed by the leftover wakeup.
    #[test]
    fn wake_debounce() {
        let _trace = crate::util::test::trace_init();
        let cell = Arc::new(WaitCell::new());

        let mut task = task::spawn({
            let cell = cell.clone();
            async move {
                cell.wait().await.unwrap();
            }
        });

        assert_pending!(task.poll());
        cell.wake();
        cell.wake();
        assert!(task.is_woken());
        assert_ready!(task.poll());

        let mut task = task::spawn({
            let cell = cell.clone();
            async move {
                cell.wait().await.unwrap();
            }
        });

        // The second task must start out pending and un-woken.
        assert_pending!(task.poll());
        assert!(!task.is_woken());

        cell.wake();
        assert!(task.is_woken());
        assert_ready!(task.poll());
    }

    /// Re-subscribing from the same task must not wake that task: after the
    /// first wakeup is consumed, the second subscription stays pending (and
    /// the task un-woken) until the cell is woken again.
    #[test]
    fn subscribe_doesnt_self_wake() {
        let _trace = crate::util::test::trace_init();
        let cell = Arc::new(WaitCell::new());

        let mut task = task::spawn({
            let cell = cell.clone();
            async move {
                let wait = cell.subscribe().await;
                wait.await.unwrap();
                let wait = cell.subscribe().await;
                wait.await.unwrap();
            }
        });
        assert_pending!(task.poll());
        assert!(!task.is_woken());

        // First wakeup: completes the first wait, then the task subscribes
        // again and goes back to pending.
        cell.wake();
        assert!(task.is_woken());
        assert_pending!(task.poll());

        // Registering the same task's waker again must not have woken it.
        assert!(!task.is_woken());
        assert_pending!(task.poll());

        // Second wakeup: completes the second wait and the task.
        cell.wake();
        assert!(task.is_woken());
        assert_ready!(task.poll());
    }
}
846
// Concurrency tests run under the `loom` model checker, which explores the
// possible interleavings of the spawned threads.
#[cfg(all(loom, test))]
mod loom {
    use super::*;
    use crate::loom::{future, sync::Arc, thread};

    /// One thread wakes the cell and another closes it while the main
    /// thread waits on it. The wait must terminate in every interleaving;
    /// whether it ends with `Ok` or `Closed` is deliberately not asserted.
    #[test]
    fn basic() {
        crate::loom::model(|| {
            let wait = Arc::new(WaitCell::new());

            let waker = wait.clone();
            let closer = wait.clone();

            thread::spawn(move || {
                tracing::info!("waking");
                waker.wake();
                tracing::info!("woken");
            });
            thread::spawn(move || {
                tracing::info!("closing");
                closer.close();
                tracing::info!("closed");
            });

            tracing::info!("waiting");
            // Either outcome is acceptable here, so the result is discarded.
            let _ = future::block_on(wait.wait());
            tracing::info!("wait'd");
        });
    }

    /// After subscribing, a wakeup from another thread must complete the
    /// `Wait` future with `Ok`, regardless of how the threads interleave.
    #[test]
    fn subscribe() {
        crate::loom::model(|| {
            future::block_on(async move {
                let cell = Arc::new(WaitCell::new());
                // Register before the waking thread is even spawned.
                let wait = cell.subscribe().await;

                thread::spawn({
                    let waker = cell.clone();
                    move || {
                        tracing::info!("waking");
                        waker.wake();
                        tracing::info!("woken");
                    }
                });

                tracing::info!("waiting");
                wait.await.expect("wait should be woken, not closed");
                tracing::info!("wait'd");
            });
        });
    }
}
899}