maitake_sync/wait_cell.rs
1//! An atomically registered [`Waker`], for waking a single task.
2//!
3//! See the documentation for the [`WaitCell`] type for details.
4use crate::{
5 loom::{
6 cell::UnsafeCell,
7 sync::atomic::{
8 AtomicUsize,
9 Ordering::{self, *},
10 },
11 },
12 util::{fmt, CachePadded},
13 Closed,
14};
15use core::{
16 future::Future,
17 ops,
18 pin::Pin,
19 task::{self, Context, Poll, Waker},
20};
21
22/// An atomically registered [`Waker`].
23///
24/// This cell stores the [`Waker`] of a single task. A [`Waker`] is stored in
25/// the cell either by calling [`poll_wait`], or by polling a [`wait`]
26/// future. Once a task's [`Waker`] is stored in a `WaitCell`, it can be woken
27/// by calling [`wake`] on the `WaitCell`.
28///
29/// # Implementation Notes
30///
31/// This is inspired by the [`AtomicWaker`] type used in Tokio's
32/// synchronization primitives, with the following modifications:
33///
34/// - An additional bit of state is added to allow [setting a "close"
35/// bit](Self::close).
36/// - A `WaitCell` is always woken by value (for now).
37/// - `WaitCell` does not handle unwinding, because [`maitake` does not support
38/// unwinding](crate#maitake-does-not-support-unwinding)
39///
40/// [`AtomicWaker`]: https://github.com/tokio-rs/tokio/blob/09b770c5db31a1f35631600e1d239679354da2dd/tokio/src/sync/task/atomic_waker.rs
41/// [`Waker`]: core::task::Waker
42/// [`poll_wait`]: Self::poll_wait
43/// [`wait`]: Self::wait
44/// [`wake`]: Self::wake
45pub struct WaitCell {
46 state: CachePadded<AtomicUsize>,
47 waker: UnsafeCell<Option<Waker>>,
48}
49
50/// An error indicating that a [`WaitCell`] was closed or busy while
51/// attempting register a [`Waker`].
52///
53/// This error is returned by the [`WaitCell::poll_wait`] method.
54#[derive(Copy, Clone, Debug, Eq, PartialEq)]
55pub enum PollWaitError {
56 /// The [`Waker`] was not registered because the [`WaitCell`] has been
57 /// [closed](WaitCell::close).
58 Closed,
59
60 /// The [`Waker`] was not registered because another task was concurrently
61 /// storing its own [`Waker`] in the [`WaitCell`].
62 Busy,
63}
64
65/// Future returned from [`WaitCell::wait()`].
66///
67/// This future is fused, so once it has completed, any future calls to poll
68/// will immediately return [`Poll::Ready`].
69#[derive(Debug)]
70#[must_use = "futures do nothing unless `.await`ed or `poll`ed"]
71pub struct Wait<'a> {
72 /// The [`WaitCell`] being waited on.
73 cell: &'a WaitCell,
74
75 presubscribe: Poll<Result<(), super::Closed>>,
76}
77
78/// Future returned from [`WaitCell::subscribe()`].
79///
80/// See the documentation for [`WaitCell::subscribe()`] for details.
81#[derive(Debug)]
82#[must_use = "futures do nothing unless `.await`ed or `poll`ed"]
83pub struct Subscribe<'a> {
84 /// The [`WaitCell`] being waited on.
85 cell: &'a WaitCell,
86}
87
88#[derive(Eq, PartialEq, Copy, Clone)]
89struct State(usize);
90
91// === impl WaitCell ===
92
93impl WaitCell {
94 loom_const_fn! {
95 /// Returns a new `WaitCell`, with no [`Waker`] stored in it.
96 #[must_use]
97 pub fn new() -> Self {
98 Self {
99 state: CachePadded::new(AtomicUsize::new(State::WAITING.0)),
100 waker: UnsafeCell::new(None),
101 }
102 }
103 }
104}
105
106impl Default for WaitCell {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
112impl WaitCell {
113 /// Poll to wait on this `WaitCell`, consuming a stored wakeup or
114 /// registering the [`Waker`] from the provided [`Context`] to be woken by
115 /// the next wakeup.
116 ///
117 /// Once a [`Waker`] has been registered, a subsequent call to [`wake`] will
118 /// wake that [`Waker`].
119 ///
120 /// # Returns
121 ///
122 /// - [`Poll::Pending`] if the [`Waker`] was registered. If this method returns
123 /// [`Poll::Pending`], then the registered [`Waker`] will be woken by a
124 /// subsequent call to [`wake`].
125 /// - [`Poll::Ready`]`(`[`Ok`]`(()))` if the cell was woken by a call to
126 /// [`wake`] while the [`Waker`] was being registered.
127 /// - [`Poll::Ready`]`(`[`Err`]`(`[`PollWaitError::Closed`]`))` if the
128 /// [`WaitCell`] has been closed.
129 /// - [`Poll::Ready`]`(`[`Err`]`(`[`PollWaitError::Busy`]`))` if another
130 /// task was concurrently registering its [`Waker`] with this
131 /// [`WaitCell`].
132 ///
133 /// [`wake`]: Self::wake
134 pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<Result<(), PollWaitError>> {
135 enter_test_debug_span!("WaitCell::poll_wait", cell = ?fmt::ptr(self));
136
137 // this is based on tokio's AtomicWaker synchronization strategy
138 match test_dbg!(self.compare_exchange(State::WAITING, State::REGISTERING, Acquire)) {
139 Err(actual) if test_dbg!(actual.contains(State::CLOSED)) => {
140 return Poll::Ready(Err(PollWaitError::Closed));
141 }
142 Err(actual) if test_dbg!(actual.contains(State::WOKEN)) => {
143 // take the wakeup
144 self.fetch_and(!State::WOKEN, Release);
145 return Poll::Ready(Ok(()));
146 }
147 // someone else is notifying, so don't wait!
148 Err(actual) if test_dbg!(actual.contains(State::WAKING)) => {
149 return Poll::Ready(Ok(()));
150 }
151 Err(_) => return Poll::Ready(Err(PollWaitError::Busy)),
152 Ok(_) => {}
153 }
154
155 let waker = cx.waker();
156 trace!(wait_cell = ?fmt::ptr(self), ?waker, "registering waker");
157
158 let prev_waker = self.waker.with_mut(|old_waker| unsafe {
159 match &mut *old_waker {
160 Some(old_waker) if waker.will_wake(old_waker) => None,
161 old => old.replace(waker.clone()),
162 }
163 });
164
165 if let Some(prev_waker) = prev_waker {
166 test_debug!("Replaced an old waker in cell, waking");
167 prev_waker.wake();
168 }
169
170 if let Err(actual) =
171 test_dbg!(self.compare_exchange(State::REGISTERING, State::WAITING, AcqRel))
172 {
173 // If the `compare_exchange` fails above, this means that we were notified for one of
174 // two reasons: either the cell was awoken, or the cell was closed.
175 //
176 // Bail out of the parking state, and determine what to report to the caller.
177 test_trace!(state = ?actual, "was notified");
178 let waker = self.waker.with_mut(|waker| unsafe { (*waker).take() });
179 // Reset to the WAITING state by clearing everything *except*
180 // the closed bits (which must remain set). This `fetch_and`
181 // does *not* set the CLOSED bit if it is unset, it just doesn't
182 // clear it.
183 let state = test_dbg!(self.fetch_and(State::CLOSED, AcqRel));
184 // The only valid state transition while we were parking is to
185 // add the CLOSED bit.
186 debug_assert!(
187 state == actual || state == actual | State::CLOSED,
188 "state changed unexpectedly while parking!"
189 );
190
191 if let Some(waker) = waker {
192 waker.wake();
193 }
194
195 // Was the `CLOSED` bit set while we were clearing other bits?
196 // If so, the cell is closed. Otherwise, we must have been notified.
197 if state.contains(State::CLOSED) {
198 return Poll::Ready(Err(PollWaitError::Closed));
199 }
200
201 return Poll::Ready(Ok(()));
202 }
203
204 // Waker registered, time to yield!
205 Poll::Pending
206 }
207
208 /// Wait to be woken up by this cell.
209 ///
210 /// # Returns
211 ///
212 /// This future completes with the following values:
213 ///
214 /// - [`Ok`]`(())` if the future was woken by a call to [`wake`] or another
215 /// task calling [`poll_wait`] or [`wait`] on this [`WaitCell`].
216 /// - [`Err`]`(`[`Closed`]`)` if the task was woken by a call to [`close`],
217 /// or the [`WaitCell`] was already closed.
218 ///
219 /// **Note**: The calling task's [`Waker`] is not registered until AFTER the
220 /// first time the returned [`Wait`] future is polled. This means that if a
221 /// call to [`wake`] occurs between when [`wait`] is called and when the
222 /// future is first polled, the future will *not* complete. If the caller is
223 /// responsible for performing an operation which will result in an eventual
224 /// wakeup, prefer calling [`subscribe`] _before_ performing that operation
225 /// and `.await`ing the [`Wait`] future returned by [`subscribe`].
226 ///
227 /// [`wake`]: Self::wake
228 /// [`poll_wait`]: Self::poll_wait
229 /// [`wait`]: Self::wait
230 /// [`close`]: Self::close
231 /// [`subscribe`]: Self::subscribe
232 pub fn wait(&self) -> Wait<'_> {
233 Wait {
234 cell: self,
235 presubscribe: Poll::Pending,
236 }
237 }
238
239 /// Eagerly subscribe to notifications from this `WaitCell`.
240 ///
241 /// This method returns a [`Subscribe`] [`Future`], which outputs a [`Wait`]
242 /// [`Future`]. Awaiting the [`Subscribe`] future will eagerly register the
243 /// calling task to be woken by this [`WaitCell`], so that the returned
244 /// [`Wait`] future will be woken by any calls to [`wake`] (or [`close`])
245 /// that occur between when the [`Subscribe`] future completes and when the
246 /// returned [`Wait`] future is `.await`ed.
247 ///
248 /// This is primarily intended for scenarios where the task that waits on a
249 /// [`WaitCell`] is responsible for performing some operation that
250 /// ultimately results in the [`WaitCell`] being woken. If the task were to
251 /// simply perform the operation and then call [`wait`] on the [`WaitCell`],
252 /// a potential race condition could occur where the operation completes and
253 /// wakes the [`WaitCell`] *before* the [`Wait`] future is first `.await`ed.
254 /// Using `subscribe`, the task can ensure that it is ready to be woken by
255 /// the cell *before* performing an operation that could result in it being
256 /// woken.
257 ///
258 /// These scenarios occur when a wakeup is triggered by another thread/CPU
259 /// core in response to an operation performed in the task waiting on the
260 /// `WaitCell`, or when the wakeup is triggered by a hardware interrupt
261 /// resulting from operations performed in the task.
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// use maitake_sync::WaitCell;
267 ///
268 /// // Perform an operation that results in a concurrent wakeup, such as
269 /// // unmasking an interrupt.
270 /// fn do_something_that_causes_a_wakeup() {
271 /// # WAIT_CELL.wake();
272 /// // ...
273 /// }
274 ///
275 /// static WAIT_CELL: WaitCell = WaitCell::new();
276 ///
277 /// # async fn dox() {
278 /// // Subscribe to notifications from the cell *before* calling
279 /// // `do_something_that_causes_a_wakeup()`, to ensure that we are
280 /// // ready to be woken when the interrupt is unmasked.
281 /// let wait = WAIT_CELL.subscribe().await;
282 ///
283 /// // Actually perform the operation.
284 /// do_something_that_causes_a_wakeup();
285 ///
286 /// // Wait for the wakeup. If the wakeup occurred *before* the first
287 /// // poll of the `wait` future had successfully subscribed to the
288 /// // `WaitCell`, we would still receive the wakeup, because the
289 /// // `subscribe` future ensured that our waker was registered to be
290 /// // woken.
291 /// wait.await.expect("WaitCell is not closed");
292 /// # }
293 /// ```
294 ///
295 /// [`wait`]: Self::wait
296 /// [`wake`]: Self::wake
297 /// [`close`]: Self::close
298 pub fn subscribe(&self) -> Subscribe<'_> {
299 Subscribe { cell: self }
300 }
301
302 /// Wake the [`Waker`] stored in this cell.
303 ///
304 /// # Returns
305 ///
306 /// - `true` if a waiting task was woken.
307 /// - `false` if no task was woken (no [`Waker`] was stored in the cell)
308 pub fn wake(&self) -> bool {
309 enter_test_debug_span!("WaitCell::wake", cell = ?fmt::ptr(self));
310 if let Some(waker) = self.take_waker(false) {
311 waker.wake();
312 true
313 } else {
314 false
315 }
316 }
317
318 /// Close the [`WaitCell`].
319 ///
320 /// This wakes any waiting task with an error indicating the `WaitCell` is
321 /// closed. Subsequent calls to [`wait`] or [`poll_wait`] will return an
322 /// error indicating that the cell has been closed.
323 ///
324 /// [`wait`]: Self::wait
325 /// [`poll_wait`]: Self::poll_wait
326 pub fn close(&self) -> bool {
327 enter_test_debug_span!("WaitCell::close", cell = ?fmt::ptr(self));
328 if let Some(waker) = self.take_waker(true) {
329 waker.wake();
330 true
331 } else {
332 false
333 }
334 }
335
336 /// Asynchronously poll the given function `f` until a condition occurs,
337 /// using the [`WaitCell`] to only re-poll when notified.
338 ///
339 /// This can be used to implement a "wait loop", turning a "try" function
340 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
341 /// "recv" or "send").
342 ///
343 /// In particular, this function correctly *registers* interest in the [`WaitCell`]
344 /// prior to polling the function, ensuring that there is not a chance of a race
345 /// where the condition occurs AFTER checking but BEFORE registering interest
346 /// in the [`WaitCell`], which could lead to deadlock.
347 ///
348 /// This is intended to have similar behavior to `Condvar` in the standard library,
349 /// but asynchronous, and not requiring operating system intervention (or existence).
350 ///
351 /// In particular, this can be used in cases where interrupts or events are used
352 /// to signify readiness or completion of some task, such as the completion of a
353 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
354 /// can wake the cell, allowing the polling function to check status fields for
355 /// partial progress or completion.
356 ///
357 /// Consider using [`Self::wait_for_value()`] if your function does return a value.
358 ///
359 /// Consider using [`WaitQueue::wait_for()`](super::wait_queue::WaitQueue::wait_for)
360 /// if you need multiple waiters.
361 ///
362 /// # Returns
363 ///
364 /// * [`Ok`]`(())` if the closure returns `true`.
365 /// * [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed.
366 ///
367 /// # Examples
368 ///
369 /// ```
370 /// # use tokio::task;
371 /// # #[tokio::main(flavor = "current_thread")]
372 /// # async fn test() {
373 /// use std::sync::Arc;
374 /// use maitake_sync::WaitCell;
375 /// use std::sync::atomic::{AtomicU8, Ordering};
376 ///
377 /// let queue = Arc::new(WaitCell::new());
378 /// let num = Arc::new(AtomicU8::new(0));
379 ///
380 /// let waiter = task::spawn({
381 /// // clone items to move into the spawned task
382 /// let queue = queue.clone();
383 /// let num = num.clone();
384 /// async move {
385 /// queue.wait_for(|| num.load(Ordering::Relaxed) > 5).await;
386 /// println!("received wakeup!");
387 /// }
388 /// });
389 ///
390 /// println!("poking task...");
391 ///
392 /// for i in 0..20 {
393 /// num.store(i, Ordering::Relaxed);
394 /// queue.wake();
395 /// }
396 ///
397 /// waiter.await.unwrap();
398 /// # }
399 /// # test();
400 /// ```
401 pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> Result<(), Closed> {
402 loop {
403 let wait = self.subscribe().await;
404 if f() {
405 return Ok(());
406 }
407 wait.await?;
408 }
409 }
410
411 /// Asynchronously poll the given function `f` until a condition occurs,
412 /// using the [`WaitCell`] to only re-poll when notified.
413 ///
414 /// This can be used to implement a "wait loop", turning a "try" function
415 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
416 /// "recv" or "send").
417 ///
418 /// In particular, this function correctly *registers* interest in the [`WaitCell`]
419 /// prior to polling the function, ensuring that there is not a chance of a race
420 /// where the condition occurs AFTER checking but BEFORE registering interest
421 /// in the [`WaitCell`], which could lead to deadlock.
422 ///
423 /// This is intended to have similar behavior to `Condvar` in the standard library,
424 /// but asynchronous, and not requiring operating system intervention (or existence).
425 ///
426 /// In particular, this can be used in cases where interrupts or events are used
427 /// to signify readiness or completion of some task, such as the completion of a
428 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
429 /// can wake the cell, allowing the polling function to check status fields for
430 /// partial progress or completion, and also return the status flags at the same time.
431 ///
432 /// Consider using [`Self::wait_for()`] if your function does not return a value.
433 ///
434 /// Consider using [`WaitQueue::wait_for_value()`](super::wait_queue::WaitQueue::wait_for_value) if you need multiple waiters.
435 ///
436 /// * [`Ok`]`(T)` if the closure returns [`Some`]`(T)`.
437 /// * [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed.
438 ///
439 /// # Examples
440 ///
441 /// ```
442 /// # use tokio::task;
443 /// # #[tokio::main(flavor = "current_thread")]
444 /// # async fn test() {
445 /// use std::sync::Arc;
446 /// use maitake_sync::WaitCell;
447 /// use std::sync::atomic::{AtomicU8, Ordering};
448 ///
449 /// let queue = Arc::new(WaitCell::new());
450 /// let num = Arc::new(AtomicU8::new(0));
451 ///
452 /// let waiter = task::spawn({
453 /// // clone items to move into the spawned task
454 /// let queue = queue.clone();
455 /// let num = num.clone();
456 /// async move {
457 /// let rxd = queue.wait_for_value(|| {
458 /// let val = num.load(Ordering::Relaxed);
459 /// if val > 5 {
460 /// return Some(val);
461 /// }
462 /// None
463 /// }).await.unwrap();
464 /// assert!(rxd > 5);
465 /// println!("received wakeup with value: {rxd}");
466 /// }
467 /// });
468 ///
469 /// println!("poking task...");
470 ///
471 /// for i in 0..20 {
472 /// num.store(i, Ordering::Relaxed);
473 /// queue.wake();
474 /// }
475 ///
476 /// waiter.await.unwrap();
477 /// # }
478 /// # test();
479 /// ```
480 pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> Result<T, Closed> {
481 loop {
482 let wait = self.subscribe().await;
483 if let Some(t) = f() {
484 return Ok(t);
485 }
486 wait.await?;
487 }
488 }
489
490 /// Returns `true` if this `WaitCell` is [closed](Self::close).
491 #[must_use]
492 pub fn is_closed(&self) -> bool {
493 self.current_state() == State::CLOSED
494 }
495
496 /// Takes this `WaitCell`'s waker.
497 // TODO(eliza): could probably be made a public API...
498 pub(crate) fn take_waker(&self, close: bool) -> Option<Waker> {
499 trace!(wait_cell = ?fmt::ptr(self), ?close, "notifying");
500 // Set the WAKING bit (to indicate that we're touching the waker) and
501 // the WOKEN bit (to indicate that we intend to wake it up).
502 let state = {
503 let mut bits = State::WAKING | State::WOKEN;
504 if close {
505 bits.0 |= State::CLOSED.0;
506 }
507 test_dbg!(self.fetch_or(bits, AcqRel))
508 };
509
510 // Is anyone else touching the waker?
511 if !test_dbg!(state.contains(State::WAKING | State::REGISTERING | State::CLOSED)) {
512 // Ladies and gentlemen...we got him (the lock)!
513 let waker = self.waker.with_mut(|thread| unsafe { (*thread).take() });
514
515 // Release the lock.
516 self.fetch_and(!State::WAKING, Release);
517
518 if let Some(waker) = test_dbg!(waker) {
519 trace!(wait_cell = ?fmt::ptr(self), ?close, ?waker, "notified");
520 return Some(waker);
521 }
522 }
523
524 None
525 }
526}
527
528impl WaitCell {
529 #[inline(always)]
530 fn compare_exchange(
531 &self,
532 State(curr): State,
533 State(new): State,
534 success: Ordering,
535 ) -> Result<State, State> {
536 self.state
537 .compare_exchange(curr, new, success, Acquire)
538 .map(State)
539 .map_err(State)
540 }
541
542 #[inline(always)]
543 fn fetch_and(&self, State(state): State, order: Ordering) -> State {
544 State(self.state.fetch_and(state, order))
545 }
546
547 #[inline(always)]
548 fn fetch_or(&self, State(state): State, order: Ordering) -> State {
549 State(self.state.fetch_or(state, order))
550 }
551
552 #[inline(always)]
553 fn current_state(&self) -> State {
554 State(self.state.load(Acquire))
555 }
556}
557
558unsafe impl Send for WaitCell {}
559unsafe impl Sync for WaitCell {}
560
561impl fmt::Debug for WaitCell {
562 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
563 f.debug_struct("WaitCell")
564 .field("state", &self.current_state())
565 .field("waker", &fmt::display(".."))
566 .finish()
567 }
568}
569
570impl Drop for WaitCell {
571 fn drop(&mut self) {
572 self.close();
573 }
574}
575
576// === impl Wait ===
577
578impl Future for Wait<'_> {
579 type Output = Result<(), Closed>;
580
581 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
582 enter_test_debug_span!("Wait::poll");
583
584 // Did a wakeup occur while we were pre-registering the future?
585 if test_dbg!(self.presubscribe.is_ready()) {
586 return self.presubscribe;
587 }
588
589 // Okay, actually poll the cell, then.
590 match task::ready!(test_dbg!(self.cell.poll_wait(cx))) {
591 Ok(()) => Poll::Ready(Ok(())),
592 Err(PollWaitError::Closed) => Poll::Ready(Err(Closed(()))),
593 Err(PollWaitError::Busy) => {
594 // If some other task was registering, yield and try to re-register
595 // our waker when that task is done.
596 cx.waker().wake_by_ref();
597 Poll::Pending
598 }
599 }
600 }
601}
602
603// === impl Subscribe ===
604
605impl<'cell> Future for Subscribe<'cell> {
606 type Output = Wait<'cell>;
607
608 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
609 enter_test_debug_span!("Subscribe::poll");
610
611 // Pre-register the waker in the cell.
612 let presubscribe = match test_dbg!(self.cell.poll_wait(cx)) {
613 Poll::Ready(Err(PollWaitError::Busy)) => {
614 // Someone else is in the process of registering. Yield now so we
615 // can wait until that task is done, and then try again.
616 cx.waker().wake_by_ref();
617 return Poll::Pending;
618 }
619 Poll::Ready(Err(PollWaitError::Closed)) => Poll::Ready(Err(Closed(()))),
620 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
621 Poll::Pending => Poll::Pending,
622 };
623
624 Poll::Ready(Wait {
625 cell: self.cell,
626 presubscribe,
627 })
628 }
629}
630
631// === impl State ===
632
633impl State {
634 /// /!\ EXTREMELY SERIOUS WARNING! /!\
635 /// It is LOAD BEARING that the `WAITING` state is represented by zero!
636 /// This is because we return to the waiting state by `fetch_and`ing out all
637 /// other bits in a few places. If this state's bit representation is
638 /// changed to anything other than zero, that code will break! Don't do
639 /// that!
640 ///
641 /// YES, FUTURE ELIZA, THIS DOES APPLY TO YOU. YOU ALREADY BROKE IT ONCE.
642 /// DON'T DO IT AGAIN.
643 const WAITING: Self = Self(0b0000);
644 const REGISTERING: Self = Self(0b0001);
645 const WAKING: Self = Self(0b0010);
646 const WOKEN: Self = Self(0b0100);
647 const CLOSED: Self = Self(0b1000);
648
649 fn contains(self, Self(state): Self) -> bool {
650 self.0 & state > 0
651 }
652}
653
654impl ops::BitOr for State {
655 type Output = Self;
656
657 fn bitor(self, Self(rhs): Self) -> Self::Output {
658 Self(self.0 | rhs)
659 }
660}
661
662impl ops::BitAnd for State {
663 type Output = Self;
664
665 fn bitand(self, Self(rhs): Self) -> Self::Output {
666 Self(self.0 & rhs)
667 }
668}
669
670impl ops::Not for State {
671 type Output = Self;
672
673 fn not(self) -> Self::Output {
674 Self(!self.0)
675 }
676}
677
678impl fmt::Debug for State {
679 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
680 let mut has_states = false;
681
682 fmt_bits!(self, f, has_states, REGISTERING, WAKING, CLOSED, WOKEN);
683
684 if !has_states {
685 if *self == Self::WAITING {
686 return f.write_str("WAITING");
687 }
688
689 f.debug_tuple("UnknownState")
690 .field(&format_args!("{:#b}", self.0))
691 .finish()?;
692 }
693
694 Ok(())
695 }
696}
697
698#[cfg(all(feature = "alloc", not(loom), test))]
699mod tests {
700 use super::*;
701 use alloc::sync::Arc;
702
703 use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
704
705 #[test]
706 fn wait_smoke() {
707 let _trace = crate::util::test::trace_init();
708
709 let wait = Arc::new(WaitCell::new());
710
711 let mut task = task::spawn({
712 let wait = wait.clone();
713 async move { wait.wait().await }
714 });
715
716 assert_pending!(task.poll());
717
718 assert!(wait.wake());
719
720 assert!(task.is_woken());
721 assert_ready_ok!(task.poll());
722 }
723
724 /// Reproduces https://github.com/hawkw/mycelium/issues/449
725 #[test]
726 fn wait_spurious_poll() {
727 let _trace = crate::util::test::trace_init();
728
729 let cell = Arc::new(WaitCell::new());
730 let mut task = task::spawn({
731 let cell = cell.clone();
732 async move { cell.wait().await }
733 });
734
735 assert_pending!(task.poll(), "first poll should be pending");
736 assert_pending!(task.poll(), "second poll should be pending");
737
738 cell.wake();
739
740 assert_ready_ok!(task.poll(), "should have been woken");
741 }
742
743 #[test]
744 fn subscribe() {
745 let _trace = crate::util::test::trace_init();
746 futures::executor::block_on(async {
747 let cell = WaitCell::new();
748 let wait = cell.subscribe().await;
749 cell.wake();
750 wait.await.unwrap();
751 })
752 }
753
754 #[test]
755 fn wake_before_subscribe() {
756 let _trace = crate::util::test::trace_init();
757 let cell = Arc::new(WaitCell::new());
758 cell.wake();
759
760 let mut task = task::spawn({
761 let cell = cell.clone();
762 async move {
763 let wait = cell.subscribe().await;
764 wait.await.unwrap();
765 }
766 });
767
768 assert_ready!(task.poll(), "woken task should complete");
769
770 let mut task = task::spawn({
771 let cell = cell.clone();
772 async move {
773 let wait = cell.subscribe().await;
774 wait.await.unwrap();
775 }
776 });
777
778 assert_pending!(task.poll(), "wait cell hasn't been woken yet");
779 cell.wake();
780 assert!(task.is_woken());
781 assert_ready!(task.poll());
782 }
783
784 #[test]
785 fn wake_debounce() {
786 let _trace = crate::util::test::trace_init();
787 let cell = Arc::new(WaitCell::new());
788
789 let mut task = task::spawn({
790 let cell = cell.clone();
791 async move {
792 cell.wait().await.unwrap();
793 }
794 });
795
796 assert_pending!(task.poll());
797 cell.wake();
798 cell.wake();
799 assert!(task.is_woken());
800 assert_ready!(task.poll());
801
802 let mut task = task::spawn({
803 let cell = cell.clone();
804 async move {
805 cell.wait().await.unwrap();
806 }
807 });
808
809 assert_pending!(task.poll());
810 assert!(!task.is_woken());
811
812 cell.wake();
813 assert!(task.is_woken());
814 assert_ready!(task.poll());
815 }
816
817 #[test]
818 fn subscribe_doesnt_self_wake() {
819 let _trace = crate::util::test::trace_init();
820 let cell = Arc::new(WaitCell::new());
821
822 let mut task = task::spawn({
823 let cell = cell.clone();
824 async move {
825 let wait = cell.subscribe().await;
826 wait.await.unwrap();
827 let wait = cell.subscribe().await;
828 wait.await.unwrap();
829 }
830 });
831 assert_pending!(task.poll());
832 assert!(!task.is_woken());
833
834 cell.wake();
835 assert!(task.is_woken());
836 assert_pending!(task.poll());
837
838 assert!(!task.is_woken());
839 assert_pending!(task.poll());
840
841 cell.wake();
842 assert!(task.is_woken());
843 assert_ready!(task.poll());
844 }
845}
846
847#[cfg(all(loom, test))]
848mod loom {
849 use super::*;
850 use crate::loom::{future, sync::Arc, thread};
851
852 #[test]
853 fn basic() {
854 crate::loom::model(|| {
855 let wait = Arc::new(WaitCell::new());
856
857 let waker = wait.clone();
858 let closer = wait.clone();
859
860 thread::spawn(move || {
861 tracing::info!("waking");
862 waker.wake();
863 tracing::info!("woken");
864 });
865 thread::spawn(move || {
866 tracing::info!("closing");
867 closer.close();
868 tracing::info!("closed");
869 });
870
871 tracing::info!("waiting");
872 let _ = future::block_on(wait.wait());
873 tracing::info!("wait'd");
874 });
875 }
876
877 #[test]
878 fn subscribe() {
879 crate::loom::model(|| {
880 future::block_on(async move {
881 let cell = Arc::new(WaitCell::new());
882 let wait = cell.subscribe().await;
883
884 thread::spawn({
885 let waker = cell.clone();
886 move || {
887 tracing::info!("waking");
888 waker.wake();
889 tracing::info!("woken");
890 }
891 });
892
893 tracing::info!("waiting");
894 wait.await.expect("wait should be woken, not closed");
895 tracing::info!("wait'd");
896 });
897 });
898 }
899}