maitake_sync/wait_queue.rs
1//! A queue of waiting tasks that can be woken in first-in, first-out order (or
2//! all at once).
3//!
4//! See the [`WaitQueue`] type's documentation for details.
5#[cfg(any(test, maitake_ultraverbose))]
6use crate::util::fmt;
7use crate::{
8 blocking::{DefaultMutex, Mutex, ScopedRawMutex},
9 loom::{
10 cell::UnsafeCell,
11 sync::atomic::{AtomicUsize, Ordering::*},
12 },
13 util::{CachePadded, WakeBatch},
14 WaitResult,
15};
16use cordyceps::{
17 list::{self, List},
18 Linked,
19};
20use core::{
21 future::Future,
22 marker::PhantomPinned,
23 mem,
24 pin::Pin,
25 ptr::{self, NonNull},
26 task::{Context, Poll, Waker},
27};
28use mycelium_bitfield::{bitfield, enum_from_bits, FromBits};
29use pin_project::{pin_project, pinned_drop};
30
31#[cfg(test)]
32mod tests;
33
34/// A queue of waiting tasks which can be [woken in first-in, first-out
35/// order][wake], or [all at once][wake_all].
36///
37/// A `WaitQueue` allows any number of tasks to [wait] asynchronously and be
38/// woken when some event occurs, either [individually][wake] in first-in,
39/// first-out order, or [all at once][wake_all]. This makes it a vital building
40/// block of runtime services (such as timers or I/O resources), where it may be
41/// used to wake a set of tasks when a timer completes or when a resource
42/// becomes available. It can be equally useful for implementing higher-level
43/// synchronization primitives: for example, a `WaitQueue` plus an
44/// [`UnsafeCell`] is essentially [an entire implementation of a fair
45/// asynchronous mutex][mutex]. Finally, a `WaitQueue` can be a useful
46/// synchronization primitive on its own: sometimes, you just need to have a
47/// bunch of tasks wait for something and then wake them all up.
48///
49/// # Overriding the blocking mutex
50///
51/// This type uses a [blocking `Mutex`](crate::blocking::Mutex) internally to
52/// synchronize access to its wait list. By default, this is a [`DefaultMutex`]. To
53/// use an alternative [`ScopedRawMutex`] implementation, use the
54/// [`new_with_raw_mutex`](Self::new_with_raw_mutex) constructor. See [the documentation
55/// on overriding mutex
56/// implementations](crate::blocking#overriding-mutex-implementations) for more
57/// details.
58///
59/// # Examples
60///
61/// Waking a single task at a time by calling [`wake`][wake]:
62///
63/// ```ignore
64/// use std::sync::Arc;
65/// use maitake::scheduler::Scheduler;
66/// use maitake_sync::WaitQueue;
67///
68/// const TASKS: usize = 10;
69///
70/// // In order to spawn tasks, we need a `Scheduler` instance.
71/// let scheduler = Scheduler::new();
72///
73/// // Construct a new `WaitQueue`.
74/// let q = Arc::new(WaitQueue::new());
75///
76/// // Spawn some tasks that will wait on the queue.
77/// for _ in 0..TASKS {
78/// let q = q.clone();
79/// scheduler.spawn(async move {
80/// // Wait to be woken by the queue.
81/// q.wait().await.expect("queue is not closed");
82/// });
83/// }
84///
85/// // Tick the scheduler once.
86/// let tick = scheduler.tick();
87///
88/// // No tasks should complete on this tick, as they are all waiting
89/// // to be woken by the queue.
90/// assert_eq!(tick.completed, 0, "no tasks have been woken");
91///
92/// let mut completed = 0;
93/// for i in 1..=TASKS {
94/// // Wake the next task from the queue.
95/// q.wake();
96///
97/// // Tick the scheduler.
98/// let tick = scheduler.tick();
99///
100/// // A single task should have completed on this tick.
101/// completed += tick.completed;
102/// assert_eq!(completed, i);
103/// }
104///
105/// assert_eq!(completed, TASKS, "all tasks should have completed");
106/// ```
107///
108/// Waking all tasks using [`wake_all`][wake_all]:
109///
110/// ```ignore
111/// use std::sync::Arc;
112/// use maitake::scheduler::Scheduler;
113/// use maitake_sync::WaitQueue;
114///
115/// const TASKS: usize = 10;
116///
117/// // In order to spawn tasks, we need a `Scheduler` instance.
118/// let scheduler = Scheduler::new();
119///
120/// // Construct a new `WaitQueue`.
121/// let q = Arc::new(WaitQueue::new());
122///
123/// // Spawn some tasks that will wait on the queue.
124/// for _ in 0..TASKS {
125/// let q = q.clone();
126/// scheduler.spawn(async move {
127/// // Wait to be woken by the queue.
128/// q.wait().await.expect("queue is not closed");
129/// });
130/// }
131///
132/// // Tick the scheduler once.
133/// let tick = scheduler.tick();
134///
135/// // No tasks should complete on this tick, as they are all waiting
136/// // to be woken by the queue.
137/// assert_eq!(tick.completed, 0, "no tasks have been woken");
138///
139/// // Wake all tasks waiting for the queue.
140/// q.wake_all();
141///
142/// // Tick the scheduler again to run the woken tasks.
143/// let tick = scheduler.tick();
144///
145/// // All tasks have now completed, since they were woken by the
146/// // queue.
147/// assert_eq!(tick.completed, TASKS, "all tasks should have completed");
148/// ```
149///
150/// # Implementation Notes
151///
152/// This type is implemented using an [intrusive doubly-linked list][ilist].
153///
154/// The *[intrusive]* aspect of this list is important, as it means that it does
155/// not allocate memory. Instead, nodes in the linked list are stored in the
156/// futures of tasks trying to wait for capacity. This means that it is not
157/// necessary to allocate any heap memory for each task waiting to be woken.
158///
159/// However, the intrusive linked list introduces one new danger: because
160/// futures can be *cancelled*, and the linked list nodes live within the
161/// futures trying to wait on the queue, we *must* ensure that the node
162/// is unlinked from the list before dropping a cancelled future. Failure to do
163/// so would result in the list containing dangling pointers. Therefore, we must
164/// use a *doubly-linked* list, so that nodes can edit both the previous and
165/// next node when they have to remove themselves. This is kind of a bummer, as
166/// it means we can't use something nice like this [intrusive queue by Dmitry
167/// Vyukov][2], and there are not really practical designs for lock-free
168/// doubly-linked lists that don't rely on some kind of deferred reclamation
169/// scheme such as hazard pointers or QSBR.
170///
171/// Instead, we just stick a [`Mutex`] around the linked list, which must be
172/// acquired to pop nodes from it, or for nodes to remove themselves when
173/// futures are cancelled. This is a bit sad, but the critical sections for this
174/// mutex are short enough that we still get pretty good performance despite it.
175///
176/// [`Waker`]: core::task::Waker
177/// [wait]: WaitQueue::wait
178/// [wake]: WaitQueue::wake
179/// [wake_all]: WaitQueue::wake_all
180/// [`UnsafeCell`]: core::cell::UnsafeCell
181/// [ilist]: cordyceps::List
182/// [intrusive]: https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/fbl_containers_guide/introduction
183/// [mutex]: crate::Mutex
184/// [2]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
185#[derive(Debug)]
186pub struct WaitQueue<Lock: ScopedRawMutex = DefaultMutex> {
187 /// The wait queue's state variable.
188 state: CachePadded<AtomicUsize>,
189
190 /// The linked list of waiters.
191 ///
192 /// # Safety
193 ///
194 /// This is protected by a mutex; the mutex *must* be acquired when
195 /// manipulating the linked list, OR when manipulating waiter nodes that may
196 /// be linked into the list. If a node is known to not be linked, it is safe
197 /// to modify that node (such as by waking the stored [`Waker`]) without
198 /// holding the lock; otherwise, it may be modified through the list, so the
199 /// lock must be held when modifying the
200 /// node.
201 ///
202 /// A spinlock is used here, in order to support `no_std` platforms; when
203 /// running `loom` tests, a `loom` mutex is used instead to simulate the
204 /// spinlock, because loom doesn't play nice with real spinlocks.
205 queue: Mutex<List<Waiter>, Lock>,
206}
207
208/// Future returned from [`WaitQueue::wait()`].
209///
210/// This future is fused, so once it has completed, any future calls to poll
211/// will immediately return [`Poll::Ready`].
212///
213/// # Notes
214///
215/// This future is `!Unpin`, as it is unsafe to [`core::mem::forget`] a `Wait`
216/// future once it has been polled. For instance, the following code must not
217/// compile:
218///
219///```compile_fail
220/// use maitake_sync::wait_queue::Wait;
221///
222/// // Calls to this function should only compile if `T` is `Unpin`.
223/// fn assert_unpin<T: Unpin>() {}
224///
225/// assert_unpin::<Wait<'_>>();
226/// ```
227#[derive(Debug)]
228#[pin_project(PinnedDrop)]
229#[must_use = "futures do nothing unless `.await`ed or `poll`ed"]
230pub struct Wait<'a, Lock: ScopedRawMutex = DefaultMutex> {
231 /// The [`WaitQueue`] being waited on.
232 queue: &'a WaitQueue<Lock>,
233
234 /// Entry in the wait queue linked list.
235 #[pin]
236 waiter: Waiter,
237}
238
239/// A waiter node which may be linked into a wait queue.
240#[derive(Debug)]
241#[repr(C)]
242#[pin_project]
243struct Waiter {
244 /// The intrusive linked list node.
245 ///
246 /// This *must* be the first field in the struct in order for the `Linked`
247 /// implementation to be sound.
248 #[pin]
249 node: UnsafeCell<Node>,
250
251 /// The future's state.
252 state: WaitStateBits,
253}
254
255#[derive(Debug)]
256struct Node {
257 /// Intrusive linked list pointers.
258 links: list::Links<Waiter>,
259
260 /// The node's waker
261 waker: Wakeup,
262
263 // This type is !Unpin due to the heuristic from:
264 // <https://github.com/rust-lang/rust/pull/82834>
265 _pin: PhantomPinned,
266}
267
268bitfield! {
269 #[derive(Eq, PartialEq)]
270 struct QueueState<usize> {
271 /// The queue's state.
272 const STATE: State;
273
274 /// The number of times [`WaitQueue::wake_all`] has been called.
275 const WAKE_ALLS = ..;
276 }
277}
278
279bitfield! {
280 #[derive(Eq, PartialEq)]
281 struct WaitStateBits<usize> {
282 /// The waiter's state.
283 const STATE: WaitState;
284
285 /// The number of times [`WaitQueue::wake_all`] has been called.
286 const WAKE_ALLS = ..;
287 }
288}
289
290enum_from_bits! {
291 /// The state of a [`Waiter`] node in a [`WaitQueue`].
292 #[derive(Debug, Eq, PartialEq)]
293 enum WaitState<u8> {
294 /// The waiter has not yet been enqueued.
295 ///
296 /// The number of times [`WaitQueue::wake_all`] has been called is stored
297 /// when the node is created, in order to determine whether it was woken by
298 /// a stored wakeup when enqueueing.
299 ///
300 /// When in this state, the node is **not** part of the linked list, and
301 /// can be dropped without removing it from the list.
302 Start = 0b00,
303
304 /// The waiter is waiting.
305 ///
306 /// When in this state, the node **is** part of the linked list. If the
307 /// node is dropped in this state, it **must** be removed from the list
308 /// before dropping it. Failure to ensure this will result in dangling
309 /// pointers in the linked list!
310 Waiting = 0b01,
311
312 /// The waiter has been woken.
313 ///
314 /// When in this state, the node is **not** part of the linked list, and
315 /// can be dropped without removing it from the list.
316 Woken = 0b10,
317 }
318}
319
320/// The queue's current state.
321#[derive(Debug, Copy, Clone, Eq, PartialEq)]
322#[repr(u8)]
323enum State {
324 /// No waiters are queued, and there is no pending notification.
325 /// Waiting while the queue is in this state will enqueue the waiter;
326 /// notifying while in this state will store a pending notification in the
327 /// queue, transitioning to [`State::Woken`].
328 Empty = 0b00,
329
330 /// There are one or more waiters in the queue. Waiting while
331 /// the queue is in this state will not transition the state. Waking while
332 /// in this state will wake the first waiter in the queue; if this empties
333 /// the queue, then the queue will transition to [`State::Empty`].
334 Waiting = 0b01,
335
336 /// The queue has a stored notification. Waiting while the queue
337 /// is in this state will consume the pending notification *without*
338 /// enqueueing the waiter and transition the queue to [`State::Empty`].
339 /// Waking while in this state will leave the queue in this state.
340 Woken = 0b10,
341
342 /// The queue is closed. Waiting while in this state will return
343 /// [`Closed`] without transitioning the queue's state.
344 ///
345 /// *Note*: This *must* correspond to all state bits being set, as it's set
346 /// via a [`fetch_or`].
347 ///
348 /// [`Closed`]: crate::Closed
349 /// [`fetch_or`]: core::sync::atomic::AtomicUsize::fetch_or
350 Closed = 0b11,
351}
352
353#[derive(Clone, Debug)]
354enum Wakeup {
355 Empty,
356 Waiting(Waker),
357 One,
358 All,
359 Closed,
360}
361
362// === impl WaitQueue ===
363
364impl WaitQueue {
365 loom_const_fn! {
366 /// Returns a new `WaitQueue`.
367 ///
368 /// This constructor returns a `WaitQueue` that uses a [`DefaultMutex`] as
369 /// the [`ScopedRawMutex`] implementation for wait list synchronization.
370 /// To use a different [`ScopedRawMutex`] implementation, use the
371 /// [`new_with_raw_mutex`](Self::new_with_raw_mutex) constructor, instead. See
372 /// [the documentation on overriding mutex
373 /// implementations](crate::blocking#overriding-mutex-implementations)
374 /// for more details.
375 #[must_use]
376 pub fn new() -> Self {
377 Self::new_with_raw_mutex(DefaultMutex::new())
378 }
379 }
380}
381
382impl<Lock> Default for WaitQueue<Lock>
383where
384 Lock: ScopedRawMutex + Default,
385{
386 fn default() -> Self {
387 Self::new_with_raw_mutex(Lock::default())
388 }
389}
390
391impl<Lock> WaitQueue<Lock>
392where
393 Lock: ScopedRawMutex,
394{
395 loom_const_fn! {
396 /// Returns a new `WaitQueue`, using the provided [`ScopedRawMutex`]
397 /// implementation for wait-list synchronization.
398 ///
399 /// This constructor allows a `WaitQueue` to be constructed with any type that
400 /// implements [`ScopedRawMutex`] as the underlying raw blocking mutex
401 /// implementation. See [the documentation on overriding mutex
402 /// implementations](crate::blocking#overriding-mutex-implementations)
403 /// for more details.
404 #[must_use]
405 pub fn new_with_raw_mutex(lock: Lock) -> Self {
406 Self::make(State::Empty, lock)
407 }
408 }
409
410 loom_const_fn! {
411 /// Returns a new `WaitQueue` with a single stored wakeup.
412 ///
413 /// The first call to [`wait`] on this queue will immediately succeed.
414 ///
415 /// [`wait`]: Self::wait
416 // TODO(eliza): should this be a public API?
417 #[must_use]
418 pub(crate) fn new_woken(lock: Lock) -> Self {
419 Self::make(State::Woken, lock)
420 }
421 }
422
423 loom_const_fn! {
424 #[must_use]
425 fn make(state: State, lock: Lock) -> Self {
426 Self {
427 state: CachePadded::new(AtomicUsize::new(state.into_usize())),
428 queue: Mutex::new_with_raw_mutex(List::new(), lock),
429 }
430 }
431 }
432
433 /// Wake the next task in the queue.
434 ///
435 /// If the queue is empty, a wakeup is stored in the `WaitQueue`, and the
436 /// **next** call to [`wait().await`] will complete immediately. If one or more
437 /// tasks are currently in the queue, the first task in the queue is woken.
438 ///
439 /// At most one wakeup will be stored in the queue at any time. If `wake()`
440 /// is called many times while there are no tasks in the queue, only a
441 /// single wakeup is stored.
442 ///
443 /// [`wait().await`]: Self::wait()
444 ///
445 /// # Examples
446 ///
447 /// # Examples
448 ///
449 /// ```
450 /// # use tokio::task;
451 /// # #[tokio::main(flavor = "current_thread")]
452 /// # async fn test() {
453 /// use std::sync::Arc;
454 /// use maitake_sync::WaitQueue;
455 ///
456 /// let queue = Arc::new(WaitQueue::new());
457 ///
458 /// let waiter = task::spawn({
459 /// // clone the queue to move into the spawned task
460 /// let queue = queue.clone();
461 /// async move {
462 /// queue.wait().await;
463 /// println!("received wakeup!");
464 /// }
465 /// });
466 ///
467 /// println!("waking task...");
468 /// queue.wake();
469 ///
470 /// waiter.await.unwrap();
471 /// # }
472 /// # test();
473 /// ```
474 #[inline]
475 pub fn wake(&self) {
476 // snapshot the queue's current state.
477 let mut state = self.load();
478
479 // check if any tasks are currently waiting on this queue. if there are
480 // no waiting tasks, store the wakeup to be consumed by the next call to
481 // `wait`.
482 loop {
483 match state.get(QueueState::STATE) {
484 // if the queue is closed, bail.
485 State::Closed => return,
486 // if there are waiting tasks, break out of the loop and wake one.
487 State::Waiting => break,
488 _ => {}
489 }
490
491 let next = state.with_state(State::Woken);
492 // advance the state to `Woken`, and return (if we did so
493 // successfully)
494 match self.compare_exchange(state, next) {
495 Ok(_) => return,
496 Err(actual) => state = actual,
497 }
498 }
499
500 // okay, there are tasks waiting on the queue; we must acquire the lock
501 // on the linked list and wake the next task from the queue.
502 let waker = self.queue.with_lock(|queue| {
503 test_debug!("wake: -> locked");
504
505 // the queue's state may have changed while we were waiting to acquire
506 // the lock, so we need to acquire a new snapshot before we take the
507 // waker.
508 state = self.load();
509 self.wake_locked(queue, state)
510 });
511
512 //now that we've released the lock, wake the waiting task (if we
513 //actually deuqueued one).
514 if let Some(waker) = waker {
515 waker.wake();
516 }
517 }
518
519 /// Wake *all* tasks currently in the queue.
520 ///
521 /// All tasks currently waiting on the queue are woken. Unlike [`wake()`], a
522 /// wakeup is *not* stored in the queue to wake the next call to [`wait()`]
523 /// if the queue is empty. Instead, this method only wakes all currently
524 /// registered waiters. Registering a task to be woken is done by `await`ing
525 /// the [`Future`] returned by the [`wait()`] method on this queue.
526 ///
527 /// # Examples
528 ///
529 /// ```
530 /// # use tokio::task;
531 /// # #[tokio::main(flavor = "current_thread")]
532 /// # async fn test() {
533 /// use maitake_sync::WaitQueue;
534 /// use std::sync::Arc;
535 ///
536 /// let queue = Arc::new(WaitQueue::new());
537 ///
538 /// // spawn multiple tasks to wait on the queue.
539 /// let task1 = task::spawn({
540 /// let queue = queue.clone();
541 /// async move {
542 /// println!("task 1 waiting...");
543 /// queue.wait().await;
544 /// println!("task 1 woken")
545 /// }
546 /// });
547 ///
548 /// let task2 = task::spawn({
549 /// let queue = queue.clone();
550 /// async move {
551 /// println!("task 2 waiting...");
552 /// queue.wait().await;
553 /// println!("task 2 woken")
554 /// }
555 /// });
556 ///
557 /// // yield to the scheduler so that both tasks register
558 /// // themselves to wait on the queue.
559 /// task::yield_now().await;
560 ///
561 /// // neither task will have been woken.
562 /// assert!(!task1.is_finished());
563 /// assert!(!task2.is_finished());
564 ///
565 /// // wake all tasks waiting on the queue.
566 /// queue.wake_all();
567 ///
568 /// // yield to the scheduler again so that the tasks can execute.
569 /// task::yield_now().await;
570 ///
571 /// assert!(task1.is_finished());
572 /// assert!(task2.is_finished());
573 /// # }
574 /// # test();
575 /// ```
576 ///
577 /// [`wake()`]: Self::wake
578 /// [`wait()`]: Self::wait
579 pub fn wake_all(&self) {
580 let mut batch = WakeBatch::new();
581 let mut waiters_remaining = true;
582
583 // This is a little bit contorted: we must load the state inside the
584 // lock, but for all states except for `Waiting`, we just need to bail
585 // out...but we can't `return` from the outer function inside the lock
586 // closure. Therefore, we just return a `bool` and, if it's `true`,
587 // return instead of doing more work.
588 let done = self.queue.with_lock(|queue| {
589 let state = self.load();
590
591 match test_dbg!(state.get(QueueState::STATE)) {
592 // If there are no waiters in the queue, increment the number of
593 // `wake_all` calls and return. incrementing the `wake_all` count
594 // must be performed inside the lock, so we do it here.
595 State::Woken | State::Empty => {
596 self.state.fetch_add(QueueState::ONE_WAKE_ALL, SeqCst);
597 true
598 }
599 // If the queue is already closed, this is a no-op. Just bail.
600 State::Closed => true,
601 // Okay, there are waiters in the queue. Transition to the empty
602 // state inside the lock and start draining the queue.
603 State::Waiting => {
604 let next_state = QueueState::new()
605 .with_state(State::Empty)
606 .with(QueueState::WAKE_ALLS, state.get(QueueState::WAKE_ALLS) + 1);
607 self.compare_exchange(state, next_state)
608 .expect("state should not have transitioned while locked");
609
610 // Drain the first batch of waiters from the queue.
611 waiters_remaining =
612 test_dbg!(Self::drain_to_wake_batch(&mut batch, queue, Wakeup::All));
613
614 false
615 }
616 }
617 });
618
619 if done {
620 return;
621 }
622
623 batch.wake_all();
624 // As long as there are waiters remaining to wake, lock the queue, drain
625 // another batch, release the lock, and wake them.
626 while waiters_remaining {
627 self.queue.with_lock(|queue| {
628 waiters_remaining = Self::drain_to_wake_batch(&mut batch, queue, Wakeup::All);
629 });
630 batch.wake_all();
631 }
632 }
633
634 /// Close the queue, indicating that it may no longer be used.
635 ///
636 /// Once a queue is closed, all [`wait()`] calls (current or future) will
637 /// return an error.
638 ///
639 /// This method is generally used when implementing higher-level
640 /// synchronization primitives or resources: when an event makes a resource
641 /// permanently unavailable, the queue can be closed.
642 ///
643 /// [`wait()`]: Self::wait
644 pub fn close(&self) {
645 let state = self.state.fetch_or(State::Closed.into_usize(), SeqCst);
646 let state = test_dbg!(QueueState::from_bits(state));
647 if state.get(QueueState::STATE) != State::Waiting {
648 return;
649 }
650
651 let mut batch = WakeBatch::new();
652 let mut waking = true;
653 while waking {
654 waking = self
655 .queue
656 .with_lock(|queue| Self::drain_to_wake_batch(&mut batch, queue, Wakeup::Closed));
657 batch.wake_all();
658 }
659 }
660
661 /// Wait to be woken up by this queue.
662 ///
663 /// Equivalent to:
664 ///
665 /// ```ignore
666 /// async fn wait(&self);
667 /// ```
668 ///
669 /// This returns a [`Wait`] [`Future`] that will complete when the task is
670 /// woken by a call to [`wake()`] or [`wake_all()`], or when the `WaitQueue`
671 /// is dropped.
672 ///
673 /// Each `WaitQueue` holds a single wakeup. If [`wake()`] was previously
674 /// called while no tasks were waiting on the queue, then `wait().await`
675 /// will complete immediately, consuming the stored wakeup. Otherwise,
676 /// `wait().await` waits to be woken by the next call to [`wake()`] or
677 /// [`wake_all()`].
678 ///
679 /// The [`Wait`] future is not guaranteed to receive wakeups from calls to
680 /// [`wake()`] if it has not yet been polled. See the documentation for the
681 /// [`Wait::subscribe()`] method for details on receiving wakeups from the
682 /// queue prior to polling the `Wait` future for the first time.
683 ///
684 /// A `Wait` future **is** is guaranteed to recieve wakeups from calls to
685 /// [`wake_all()`] as soon as it is created, even if it has not yet been
686 /// polled.
687 ///
688 /// # Returns
689 ///
690 /// The [`Future`] returned by this method completes with one of the
691 /// following [outputs](Future::Output):
692 ///
693 /// - [`Ok`]`(())` if the task was woken by a call to [`wake()`] or
694 /// [`wake_all()`].
695 /// - [`Err`]`(`[`Closed`]`)` if the task was woken by the `WaitQueue` being
696 /// [`close`d](WaitQueue::close).
697 ///
698 /// # Cancellation
699 ///
700 /// A `WaitQueue` fairly distributes wakeups to waiting tasks in the order
701 /// that they started to wait. If a [`Wait`] future is dropped, the task
702 /// will forfeit its position in the queue.
703 ///
704 /// # Examples
705 ///
706 /// ```
707 /// # use tokio::task;
708 /// # #[tokio::main(flavor = "current_thread")]
709 /// # async fn test() {
710 /// use std::sync::Arc;
711 /// use maitake_sync::WaitQueue;
712 ///
713 /// let queue = Arc::new(WaitQueue::new());
714 ///
715 /// let waiter = task::spawn({
716 /// // clone the queue to move into the spawned task
717 /// let queue = queue.clone();
718 /// async move {
719 /// queue.wait().await;
720 /// println!("received wakeup!");
721 /// }
722 /// });
723 ///
724 /// println!("waking task...");
725 /// queue.wake();
726 ///
727 /// waiter.await.unwrap();
728 /// # }
729 /// # test();
730 /// ```
731 ///
732 /// [`wake()`]: Self::wake
733 /// [`wake_all()`]: Self::wake_all
734 /// [`Closed`]: crate::Closed
735 pub fn wait(&self) -> Wait<'_, Lock> {
736 Wait {
737 queue: self,
738 waiter: self.waiter(),
739 }
740 }
741
742 pub(crate) fn try_wait(&self) -> Poll<WaitResult<()>> {
743 let mut state = self.load();
744 let initial_wake_alls = state.get(QueueState::WAKE_ALLS);
745 while state.get(QueueState::STATE) == State::Woken {
746 match self.compare_exchange(state, state.with_state(State::Empty)) {
747 Ok(_) => return Poll::Ready(Ok(())),
748 Err(actual) => state = actual,
749 }
750 }
751
752 match state.get(QueueState::STATE) {
753 State::Closed => crate::closed(),
754 _ if state.get(QueueState::WAKE_ALLS) > initial_wake_alls => Poll::Ready(Ok(())),
755 State::Empty | State::Waiting => Poll::Pending,
756 State::Woken => Poll::Ready(Ok(())),
757 }
758 }
759
760 /// Asynchronously poll the given function `f` until a condition occurs,
761 /// using the [`WaitQueue`] to only re-poll when notified.
762 ///
763 /// This can be used to implement a "wait loop", turning a "try" function
764 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
765 /// "recv" or "send").
766 ///
767 /// In particular, this function correctly *registers* interest in the [`WaitQueue`]
768 /// prior to polling the function, ensuring that there is not a chance of a race
769 /// where the condition occurs AFTER checking but BEFORE registering interest
770 /// in the [`WaitQueue`], which could lead to deadlock.
771 ///
772 /// This is intended to have similar behavior to `Condvar` in the standard library,
773 /// but asynchronous, and not requiring operating system intervention (or existence).
774 ///
775 /// In particular, this can be used in cases where interrupts or events are used
776 /// to signify readiness or completion of some task, such as the completion of a
777 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
778 /// can wake the queue, allowing the polling function to check status fields for
779 /// partial progress or completion.
780 ///
781 /// Consider using [`Self::wait_for_value()`] if your function does return a value.
782 ///
783 /// Consider using [`WaitCell::wait_for()`](super::wait_cell::WaitCell::wait_for)
784 /// if you do not need multiple waiters.
785 ///
786 /// # Returns
787 ///
788 /// * [`Ok`]`(())` if the closure returns `true`.
789 /// * [`Err`]`(`[`Closed`](crate::Closed)`)` if the [`WaitQueue`] is closed.
790 ///
791 /// # Examples
792 ///
793 /// ```
794 /// # use tokio::task;
795 /// # #[tokio::main(flavor = "current_thread")]
796 /// # async fn test() {
797 /// use std::sync::Arc;
798 /// use maitake_sync::WaitQueue;
799 /// use std::sync::atomic::{AtomicU8, Ordering};
800 ///
801 /// let queue = Arc::new(WaitQueue::new());
802 /// let num = Arc::new(AtomicU8::new(0));
803 ///
804 /// let waiter1 = task::spawn({
805 /// // clone items to move into the spawned task
806 /// let queue = queue.clone();
807 /// let num = num.clone();
808 /// async move {
809 /// queue.wait_for(|| num.load(Ordering::Relaxed) > 5).await;
810 /// println!("received wakeup!");
811 /// }
812 /// });
813 ///
814 /// let waiter2 = task::spawn({
815 /// // clone items to move into the spawned task
816 /// let queue = queue.clone();
817 /// let num = num.clone();
818 /// async move {
819 /// queue.wait_for(|| num.load(Ordering::Relaxed) > 10).await;
820 /// println!("received wakeup!");
821 /// }
822 /// });
823 ///
824 /// println!("poking task...");
825 ///
826 /// for i in 0..20 {
827 /// num.store(i, Ordering::Relaxed);
828 /// queue.wake();
829 /// }
830 ///
831 /// waiter1.await.unwrap();
832 /// waiter2.await.unwrap();
833 /// # }
834 /// # test();
835 /// ```
836 pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> WaitResult<()> {
837 loop {
838 let wait = self.wait();
839 let mut wait = core::pin::pin!(wait);
840 let _ = wait.as_mut().subscribe()?;
841 if f() {
842 return Ok(());
843 }
844 wait.await?;
845 }
846 }
847
848 /// Asynchronously poll the given function `f` until a condition occurs,
849 /// using the [`WaitQueue`] to only re-poll when notified.
850 ///
851 /// This can be used to implement a "wait loop", turning a "try" function
852 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
853 /// "recv" or "send").
854 ///
855 /// In particular, this function correctly *registers* interest in the [`WaitQueue`]
856 /// prior to polling the function, ensuring that there is not a chance of a race
857 /// where the condition occurs AFTER checking but BEFORE registering interest
858 /// in the [`WaitQueue`], which could lead to deadlock.
859 ///
860 /// This is intended to have similar behavior to `Condvar` in the standard library,
861 /// but asynchronous, and not requiring operating system intervention (or existence).
862 ///
863 /// In particular, this can be used in cases where interrupts or events are used
864 /// to signify readiness or completion of some task, such as the completion of a
865 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
866 /// can wake the queue, allowing the polling function to check status fields for
867 /// partial progress or completion, and also return the status flags at the same time.
868 ///
869 /// Consider using [`Self::wait_for()`] if your function does not return a value.
870 ///
871 /// Consider using [`WaitCell::wait_for_value()`](super::wait_cell::WaitCell::wait_for_value)
872 /// if you do not need multiple waiters.
873 ///
874 /// * [`Ok`]`(T)` if the closure returns [`Some`]`(T)`.
875 /// * [`Err`]`(`[`Closed`](crate::Closed)`)` if the [`WaitQueue`] is closed.
876 ///
877 /// # Examples
878 ///
879 /// ```
880 /// # use tokio::task;
881 /// # #[tokio::main(flavor = "current_thread")]
882 /// # async fn test() {
883 /// use std::sync::Arc;
884 /// use maitake_sync::WaitQueue;
885 /// use std::sync::atomic::{AtomicU8, Ordering};
886 ///
887 /// let queue = Arc::new(WaitQueue::new());
888 /// let num = Arc::new(AtomicU8::new(0));
889 ///
890 /// let waiter1 = task::spawn({
891 /// // clone items to move into the spawned task
892 /// let queue = queue.clone();
893 /// let num = num.clone();
894 /// async move {
895 /// let rxd = queue.wait_for_value(|| {
896 /// let val = num.load(Ordering::Relaxed);
897 /// if val > 5 {
898 /// return Some(val);
899 /// }
900 /// None
901 /// }).await.unwrap();
902 /// assert!(rxd > 5);
903 /// println!("received wakeup with value: {rxd}");
904 /// }
905 /// });
906 ///
907 /// let waiter2 = task::spawn({
908 /// // clone items to move into the spawned task
909 /// let queue = queue.clone();
910 /// let num = num.clone();
911 /// async move {
912 /// let rxd = queue.wait_for_value(|| {
913 /// let val = num.load(Ordering::Relaxed);
914 /// if val > 10 {
915 /// return Some(val);
916 /// }
917 /// None
918 /// }).await.unwrap();
919 /// assert!(rxd > 10);
920 /// println!("received wakeup with value: {rxd}");
921 /// }
922 /// });
923 ///
924 /// println!("poking task...");
925 ///
926 /// for i in 0..20 {
927 /// num.store(i, Ordering::Relaxed);
928 /// queue.wake();
929 /// }
930 ///
931 /// waiter1.await.unwrap();
932 /// waiter2.await.unwrap();
933 /// # }
934 /// # test();
935 /// ```
936 pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> WaitResult<T> {
937 loop {
938 let wait = self.wait();
939 let mut wait = core::pin::pin!(wait);
940 match wait.as_mut().subscribe() {
941 Poll::Ready(wr) => wr?,
942 Poll::Pending => {}
943 }
944 if let Some(t) = f() {
945 return Ok(t);
946 }
947 wait.await?;
948 }
949 }
950
951 /// Returns `true` if this `WaitQueue` is [closed](Self::close).
952 #[must_use]
953 pub fn is_closed(&self) -> bool {
954 self.load().get(QueueState::STATE) == State::Closed
955 }
956
957 /// Returns a [`Waiter`] entry in this queue.
958 ///
959 /// This is factored out into a separate function because it's used by both
960 /// [`WaitQueue::wait`] and [`WaitQueue::wait_owned`].
961 fn waiter(&self) -> Waiter {
962 // how many times has `wake_all` been called when this waiter is created?
963 let current_wake_alls = test_dbg!(self.load().get(QueueState::WAKE_ALLS));
964 let state = WaitStateBits::new()
965 .with(WaitStateBits::WAKE_ALLS, current_wake_alls)
966 .with(WaitStateBits::STATE, WaitState::Start);
967 Waiter {
968 state,
969 node: UnsafeCell::new(Node {
970 links: list::Links::new(),
971 waker: Wakeup::Empty,
972 _pin: PhantomPinned,
973 }),
974 }
975 }
976
977 #[cfg_attr(test, track_caller)]
978 fn load(&self) -> QueueState {
979 #[allow(clippy::let_and_return)]
980 let state = QueueState::from_bits(self.state.load(SeqCst));
981 test_debug!("state.load() = {state:?}");
982 state
983 }
984
985 #[cfg_attr(test, track_caller)]
986 fn store(&self, state: QueueState) {
987 test_debug!("state.store({state:?}");
988 self.state.store(state.0, SeqCst);
989 }
990
991 #[cfg_attr(test, track_caller)]
992 fn compare_exchange(
993 &self,
994 current: QueueState,
995 new: QueueState,
996 ) -> Result<QueueState, QueueState> {
997 #[allow(clippy::let_and_return)]
998 let res = self
999 .state
1000 .compare_exchange(current.0, new.0, SeqCst, SeqCst)
1001 .map(QueueState::from_bits)
1002 .map_err(QueueState::from_bits);
1003 test_debug!("state.compare_exchange({current:?}, {new:?}) = {res:?}");
1004 res
1005 }
1006
1007 #[cold]
1008 #[inline(never)]
1009 fn wake_locked(&self, queue: &mut List<Waiter>, curr: QueueState) -> Option<Waker> {
1010 let state = curr.get(QueueState::STATE);
1011
1012 // is the queue still in the `Waiting` state? it is possible that we
1013 // transitioned to a different state while locking the queue.
1014 if test_dbg!(state) != State::Waiting {
1015 // if there are no longer any queued tasks, try to store the
1016 // wakeup in the queue and bail.
1017 if let Err(actual) = self.compare_exchange(curr, curr.with_state(State::Woken)) {
1018 debug_assert!(actual.get(QueueState::STATE) != State::Waiting);
1019 self.store(actual.with_state(State::Woken));
1020 }
1021
1022 return None;
1023 }
1024
1025 // otherwise, we have to dequeue a task and wake it.
1026 let node = queue
1027 .pop_back()
1028 .expect("if we are in the Waiting state, there must be waiters in the queue");
1029 let waker = Waiter::wake(node, queue, Wakeup::One);
1030
1031 // if we took the final waiter currently in the queue, transition to the
1032 // `Empty` state.
1033 if test_dbg!(queue.is_empty()) {
1034 self.store(curr.with_state(State::Empty));
1035 }
1036
1037 waker
1038 }
1039
1040 /// Drain waiters from `queue` and add them to `batch`. Returns `true` if
1041 /// the batch was filled while more waiters remain in the queue, indicating
1042 /// that this function must be called again to wake all waiters.
1043 fn drain_to_wake_batch(
1044 batch: &mut WakeBatch,
1045 queue: &mut List<Waiter>,
1046 wakeup: Wakeup,
1047 ) -> bool {
1048 while let Some(node) = queue.pop_back() {
1049 let Some(waker) = Waiter::wake(node, queue, wakeup.clone()) else {
1050 // this waiter was enqueued by `Wait::register` and doesn't have
1051 // a waker, just keep going.
1052 continue;
1053 };
1054
1055 if batch.add_waker(waker) {
1056 // there's still room in the wake set, just keep adding to it.
1057 continue;
1058 }
1059
1060 // wake set is full, drop the lock and wake everyone!
1061 break;
1062 }
1063
1064 !queue.is_empty()
1065 }
1066}
1067
1068// === impl Waiter ===
1069
1070impl Waiter {
1071 /// Returns the [`Waker`] for the task that owns this `Waiter`.
1072 ///
1073 /// # Safety
1074 ///
1075 /// This is only safe to call while the list is locked. The `list`
1076 /// parameter ensures this method is only called while holding the lock, so
1077 /// this can be safe.
1078 ///
1079 /// Of course, that must be the *same* list that this waiter is a member of,
1080 /// and currently, there is no way to ensure that...
1081 #[inline(always)]
1082 #[cfg_attr(loom, track_caller)]
1083 fn wake(this: NonNull<Self>, list: &mut List<Self>, wakeup: Wakeup) -> Option<Waker> {
1084 Waiter::with_node(this, list, |node| {
1085 let waker = test_dbg!(mem::replace(&mut node.waker, wakeup));
1086 match waker {
1087 // the node has a registered waker, so wake the task.
1088 Wakeup::Waiting(waker) => Some(waker),
1089 // do nothing: the node was registered by `Wait::register`
1090 // without a waker, so the future will already be woken when it is
1091 // actually polled.
1092 Wakeup::Empty => None,
1093 // the node was already woken? this should not happen and
1094 // probably indicates a race!
1095 _ => unreachable!("tried to wake a waiter in the {:?} state!", waker),
1096 }
1097 })
1098 }
1099
1100 /// # Safety
1101 ///
1102 /// This is only safe to call while the list is locked. The dummy `_list`
1103 /// parameter ensures this method is only called while holding the lock, so
1104 /// this can be safe.
1105 ///
1106 /// Of course, that must be the *same* list that this waiter is a member of,
1107 /// and currently, there is no way to ensure that...
1108 #[inline(always)]
1109 #[cfg_attr(loom, track_caller)]
1110 fn with_node<T>(
1111 mut this: NonNull<Self>,
1112 _list: &mut List<Self>,
1113 f: impl FnOnce(&mut Node) -> T,
1114 ) -> T {
1115 unsafe {
1116 // safety: this is only called while holding the lock on the queue,
1117 // so it's safe to mutate the waiter.
1118 this.as_mut().node.with_mut(|node| f(&mut *node))
1119 }
1120 }
1121
1122 fn poll_wait<Lock>(
1123 mut self: Pin<&mut Self>,
1124 queue: &WaitQueue<Lock>,
1125 waker: Option<&Waker>,
1126 ) -> Poll<WaitResult<()>>
1127 where
1128 Lock: ScopedRawMutex,
1129 {
1130 test_debug!(ptr = ?fmt::ptr(self.as_mut()), "Waiter::poll_wait");
1131 let ptr = unsafe { NonNull::from(Pin::into_inner_unchecked(self.as_mut())) };
1132 let mut this = self.as_mut().project();
1133
1134 match test_dbg!(this.state.get(WaitStateBits::STATE)) {
1135 WaitState::Start => {
1136 let queue_state = queue.load();
1137
1138 // can we consume a pending wakeup?
1139 if queue
1140 .compare_exchange(
1141 queue_state.with_state(State::Woken),
1142 queue_state.with_state(State::Empty),
1143 )
1144 .is_ok()
1145 {
1146 this.state.set(WaitStateBits::STATE, WaitState::Woken);
1147 return Poll::Ready(Ok(()));
1148 }
1149
1150 // okay, no pending wakeups. try to wait...
1151
1152 test_debug!("poll_wait: locking...");
1153 queue.queue.with_lock(move |waiters| {
1154 test_debug!("poll_wait: -> locked");
1155 let mut queue_state = queue.load();
1156
1157 // the whole queue was woken while we were trying to acquire
1158 // the lock!
1159 if queue_state.get(QueueState::WAKE_ALLS)
1160 != this.state.get(WaitStateBits::WAKE_ALLS)
1161 {
1162 this.state.set(WaitStateBits::STATE, WaitState::Woken);
1163 return Poll::Ready(Ok(()));
1164 }
1165
1166 // transition the queue to the waiting state
1167 'to_waiting: loop {
1168 match test_dbg!(queue_state.get(QueueState::STATE)) {
1169 // the queue is `Empty`, transition to `Waiting`
1170 State::Empty => {
1171 match queue.compare_exchange(
1172 queue_state,
1173 queue_state.with_state(State::Waiting),
1174 ) {
1175 Ok(_) => break 'to_waiting,
1176 Err(actual) => queue_state = actual,
1177 }
1178 }
1179 // the queue is already `Waiting`
1180 State::Waiting => break 'to_waiting,
1181 // the queue was woken, consume the wakeup.
1182 State::Woken => {
1183 match queue.compare_exchange(
1184 queue_state,
1185 queue_state.with_state(State::Empty),
1186 ) {
1187 Ok(_) => {
1188 this.state.set(WaitStateBits::STATE, WaitState::Woken);
1189 return Poll::Ready(Ok(()));
1190 }
1191 Err(actual) => queue_state = actual,
1192 }
1193 }
1194 State::Closed => return crate::closed(),
1195 }
1196 }
1197
1198 // enqueue the node
1199 this.state.set(WaitStateBits::STATE, WaitState::Waiting);
1200 if let Some(waker) = waker {
1201 this.node.as_mut().with_mut(|node| {
1202 unsafe {
1203 // safety: we may mutate the node because we are
1204 // holding the lock.
1205 debug_assert!(matches!((*node).waker, Wakeup::Empty));
1206 (*node).waker = Wakeup::Waiting(waker.clone());
1207 }
1208 });
1209 }
1210
1211 waiters.push_front(ptr);
1212
1213 Poll::Pending
1214 })
1215 }
1216 WaitState::Waiting => {
1217 queue.queue.with_lock(|_waiters| {
1218 this.node.with_mut(|node| unsafe {
1219 // safety: we may mutate the node because we are
1220 // holding the lock.
1221 let node = &mut *node;
1222 match node.waker {
1223 Wakeup::Waiting(ref mut curr_waker) => {
1224 match waker {
1225 Some(waker) if !curr_waker.will_wake(waker) => {
1226 *curr_waker = waker.clone()
1227 }
1228 _ => {}
1229 }
1230 Poll::Pending
1231 }
1232 Wakeup::All | Wakeup::One => {
1233 this.state.set(WaitStateBits::STATE, WaitState::Woken);
1234 Poll::Ready(Ok(()))
1235 }
1236 Wakeup::Closed => {
1237 this.state.set(WaitStateBits::STATE, WaitState::Woken);
1238 crate::closed()
1239 }
1240 Wakeup::Empty => {
1241 if let Some(waker) = waker {
1242 node.waker = Wakeup::Waiting(waker.clone());
1243 }
1244
1245 Poll::Pending
1246 }
1247 }
1248 })
1249 })
1250 }
1251 WaitState::Woken => Poll::Ready(Ok(())),
1252 }
1253 }
1254
1255 /// Release this `Waiter` from the queue.
1256 ///
1257 /// This is called from the `drop` implementation for the [`Wait`] and
1258 /// [`WaitOwned`] futures.
1259 fn release<Lock>(mut self: Pin<&mut Self>, queue: &WaitQueue<Lock>)
1260 where
1261 Lock: ScopedRawMutex,
1262 {
1263 let state = *(self.as_mut().project().state);
1264 let ptr = NonNull::from(unsafe { Pin::into_inner_unchecked(self) });
1265 test_debug!(self = ?fmt::ptr(ptr), ?state, ?queue.state, "Waiter::release");
1266
1267 // if we're not enqueued, we don't have to do anything else.
1268 if state.get(WaitStateBits::STATE) != WaitState::Waiting {
1269 return;
1270 }
1271
1272 let next_waiter = queue.queue.with_lock(|waiters| {
1273 let state = queue.load();
1274 // remove the node
1275 unsafe {
1276 // safety: we have the lock on the queue, so this is safe.
1277 waiters.remove(ptr);
1278 };
1279
1280 // if we removed the last waiter from the queue, transition the state to
1281 // `Empty`.
1282 if test_dbg!(waiters.is_empty()) && state.get(QueueState::STATE) == State::Waiting {
1283 queue.store(state.with_state(State::Empty));
1284 }
1285
1286 // if the node has an unconsumed wakeup, it must be assigned to the next
1287 // node in the queue.
1288 if Waiter::with_node(ptr, waiters, |node| matches!(&node.waker, Wakeup::One)) {
1289 queue.wake_locked(waiters, state)
1290 } else {
1291 None
1292 }
1293 });
1294
1295 if let Some(next) = next_waiter {
1296 next.wake();
1297 }
1298 }
1299}
1300
1301unsafe impl Linked<list::Links<Waiter>> for Waiter {
1302 type Handle = NonNull<Waiter>;
1303
1304 fn into_ptr(r: Self::Handle) -> NonNull<Self> {
1305 r
1306 }
1307
1308 unsafe fn from_ptr(ptr: NonNull<Self>) -> Self::Handle {
1309 ptr
1310 }
1311
1312 unsafe fn links(target: NonNull<Self>) -> NonNull<list::Links<Waiter>> {
1313 // Safety: using `ptr::addr_of!` avoids creating a temporary
1314 // reference, which stacked borrows dislikes.
1315 let node = ptr::addr_of!((*target.as_ptr()).node);
1316 (*node).with_mut(|node| {
1317 let links = ptr::addr_of_mut!((*node).links);
1318 // Safety: since the `target` pointer is `NonNull`, we can assume
1319 // that pointers to its members are also not null, making this use
1320 // of `new_unchecked` fine.
1321 NonNull::new_unchecked(links)
1322 })
1323 }
1324}
1325
1326// === impl Wait ===
1327
1328impl<Lock: ScopedRawMutex> Wait<'_, Lock> {
1329 /// Returns `true` if this `Wait` future is waiting for a notification from
1330 /// the provided [`WaitQueue`].
1331 ///
1332 /// # Examples
1333 ///
1334 /// ```
1335 /// use maitake_sync::WaitQueue;
1336 ///
1337 /// let queue1 = WaitQueue::new();
1338 /// let queue2 = WaitQueue::new();
1339 ///
1340 /// let wait = queue1.wait();
1341 /// assert!(wait.waits_on(&queue1));
1342 /// assert!(!wait.waits_on(&queue2));
1343 /// ```
1344 #[inline]
1345 #[must_use]
1346 pub fn waits_on(&self, queue: &WaitQueue<Lock>) -> bool {
1347 ptr::eq(self.queue, queue)
1348 }
1349
1350 /// Returns `true` if `self` and `other` are waiting on a notification from
1351 /// the same [`WaitQueue`].
1352 ///
1353 /// # Examples
1354 ///
1355 /// Two [`Wait`] futures waiting on the same [`WaitQueue`] return `true`:
1356 ///
1357 /// ```
1358 /// use maitake_sync::WaitQueue;
1359 ///
1360 /// let queue = WaitQueue::new();
1361 ///
1362 /// let wait1 = queue.wait();
1363 /// let wait2 = queue.wait();
1364 /// assert!(wait1.same_queue(&wait2));
1365 /// assert!(wait2.same_queue(&wait1));
1366 /// ```
1367 ///
1368 /// Two [`Wait`] futures waiting on different [`WaitQueue`]s return `false`:
1369 ///
1370 /// ```
1371 /// use maitake_sync::WaitQueue;
1372 ///
1373 /// let queue1 = WaitQueue::new();
1374 /// let queue2 = WaitQueue::new();
1375 ///
1376 /// let wait1 = queue1.wait();
1377 /// let wait2 = queue2.wait();
1378 /// assert!(!wait1.same_queue(&wait2));
1379 /// assert!(!wait2.same_queue(&wait1));
1380 /// ```
1381 #[inline]
1382 #[must_use]
1383 pub fn same_queue(&self, other: &Wait<'_, Lock>) -> bool {
1384 ptr::eq(self.queue, other.queue)
1385 }
1386
1387 /// Eagerly subscribe this future to wakeups from [`WaitQueue::wake()`].
1388 ///
1389 /// Polling a `Wait` future adds that future to the list of waiters that may
1390 /// receive a wakeup from a `WaitQueue`. However, in some cases, it is
1391 /// desirable to subscribe to wakeups *prior* to actually waiting for one.
1392 /// This method should be used when it is necessary to ensure a `Wait`
1393 /// future is in the list of waiters before the future is `poll`ed for the
1394 /// first time.
1395 ///
1396 /// In general, this method is used in cases where a [`WaitQueue`] must
1397 /// synchronize with some additional state, such as an `AtomicBool` or
1398 /// counter. If a task first checks that state, and then chooses whether or
1399 /// not to wait on the `WaitQueue` based on that state, then a race
1400 /// condition may occur where the `WaitQueue` wakes waiters *between* when
1401 /// the task checked the external state and when it first polled its `Wait`
1402 /// future to wait on the queue. This method allows registering the `Wait`
1403 /// future with the queue *prior* to checking the external state, without
1404 /// actually sleeping, so that when the task does wait for the `Wait` future
1405 /// to complete, it will have received any wakeup that was sent between when
1406 /// the external state was checked and the `Wait` future was first polled.
1407 ///
1408 /// # Returns
1409 ///
1410 /// This method returns a [`Poll`]`<`[`WaitResult`]`>` which is `Ready` a wakeup was
1411 /// already received. This method returns [`Poll::Ready`] in the following
1412 /// cases:
1413 ///
1414 /// 1. The [`WaitQueue::wake()`] method was called between the creation of the
1415 /// `Wait` and the call to this method.
1416 /// 2. This is the first call to `subscribe` or `poll` on this future, and the
1417 /// `WaitQueue` was holding a stored wakeup from a previous call to
1418 /// [`wake()`]. This method consumes the wakeup in that case.
1419 /// 3. The future has previously been `subscribe`d or polled, and it has since
1420 /// then been marked ready by either consuming a wakeup from the
1421 /// `WaitQueue`, or by a call to [`wake()`] or [`wake_all()`] that
1422 /// removed it from the list of futures ready to receive wakeups.
1423 /// 4. The `WaitQueue` has been [`close`d](WaitQueue::close), in which case
1424 /// this method returns `Poll::Ready(Err(Closed))`.
1425 ///
1426 /// If this method returns [`Poll::Ready`], any subsequent `poll`s of this
1427 /// `Wait` future will also immediately return [`Poll::Ready`].
1428 ///
1429 /// If the [`Wait`] future subscribed to wakeups from the queue, and
1430 /// has not been woken, this method returns [`Poll::Pending`].
1431 ///
1432 /// [`wake()`]: WaitQueue::wake
1433 /// [`wake_all()`]: WaitQueue::wake_all
1434 pub fn subscribe(self: Pin<&mut Self>) -> Poll<WaitResult<()>> {
1435 let this = self.project();
1436 this.waiter.poll_wait(this.queue, None)
1437 }
1438}
1439
1440impl<Lock: ScopedRawMutex> Future for Wait<'_, Lock> {
1441 type Output = WaitResult<()>;
1442
1443 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1444 let this = self.project();
1445 this.waiter.poll_wait(this.queue, Some(cx.waker()))
1446 }
1447}
1448
1449#[pinned_drop]
1450impl<Lock: ScopedRawMutex> PinnedDrop for Wait<'_, Lock> {
1451 fn drop(mut self: Pin<&mut Self>) {
1452 let this = self.project();
1453 this.waiter.release(this.queue);
1454 }
1455}
1456
1457// === impl QueueState ===
1458
1459impl QueueState {
1460 const ONE_WAKE_ALL: usize = Self::WAKE_ALLS.first_bit();
1461
1462 fn with_state(self, state: State) -> Self {
1463 self.with(Self::STATE, state)
1464 }
1465}
1466
1467impl FromBits<usize> for State {
1468 const BITS: u32 = 2;
1469 type Error = core::convert::Infallible;
1470
1471 fn try_from_bits(bits: usize) -> Result<Self, Self::Error> {
1472 Ok(match bits as u8 {
1473 bits if bits == Self::Empty as u8 => Self::Empty,
1474 bits if bits == Self::Waiting as u8 => Self::Waiting,
1475 bits if bits == Self::Woken as u8 => Self::Woken,
1476 bits if bits == Self::Closed as u8 => Self::Closed,
1477 _ => unsafe {
1478 unreachable_unchecked!("all potential 2-bit patterns should be covered!")
1479 },
1480 })
1481 }
1482
1483 fn into_bits(self) -> usize {
1484 self.into_usize()
1485 }
1486}
1487
1488impl State {
1489 const fn into_usize(self) -> usize {
1490 self as u8 as usize
1491 }
1492}
1493
1494// === impl WaitOwned ===
1495
1496feature! {
1497 #![feature = "alloc"]
1498
1499 use alloc::sync::Arc;
1500
1501 /// Future returned from [`WaitQueue::wait_owned()`].
1502 ///
1503 /// This is identical to the [`Wait`] future, except that it takes an
1504 /// [`Arc`] reference to the [`WaitQueue`], allowing the returned future to
1505 /// live for the `'static` lifetime.
1506 ///
1507 /// This future is fused, so once it has completed, any future calls to poll
1508 /// will immediately return [`Poll::Ready`].
1509 ///
1510 /// # Notes
1511 ///
1512 /// This future is `!Unpin`, as it is unsafe to [`core::mem::forget`] a
1513 /// `WaitOwned` future once it has been polled. For instance, the following
1514 /// code must not compile:
1515 ///
1516 ///```compile_fail
1517 /// use maitake_sync::wait_queue::WaitOwned;
1518 ///
1519 /// // Calls to this function should only compile if `T` is `Unpin`.
1520 /// fn assert_unpin<T: Unpin>() {}
1521 ///
1522 /// assert_unpin::<WaitOwned<'_>>();
1523 /// ```
1524 #[derive(Debug)]
1525 #[pin_project(PinnedDrop)]
1526 pub struct WaitOwned<Lock: ScopedRawMutex = DefaultMutex> {
1527 /// The `WaitQueue` being waited on.
1528 queue: Arc<WaitQueue<Lock>>,
1529
1530 /// Entry in the wait queue.
1531 #[pin]
1532 waiter: Waiter,
1533 }
1534
1535 impl<Lock: ScopedRawMutex> WaitQueue<Lock> {
1536 /// Wait to be woken up by this queue, returning a future that's valid
1537 /// for the `'static` lifetime.
1538 ///
1539 /// This returns a [`WaitOwned`] future that will complete when the task
1540 /// is woken by a call to [`wake()`] or [`wake_all()`], or when the
1541 /// `WaitQueue` is [closed].
1542 ///
1543 /// This is identical to the [`wait()`] method, except that it takes a
1544 /// [`Arc`] reference to the [`WaitQueue`], allowing the returned future
1545 /// to live for the `'static` lifetime. See the documentation for
1546 /// [`wait()`] for details on how to use the future returned by this
1547 /// method.
1548 ///
1549 /// # Returns
1550 ///
1551 /// The [`Future`] returned by this method completes with one of the
1552 /// following [outputs](Future::Output):
1553 ///
1554 /// - [`Ok`]`(())` if the task was woken by a call to [`wake()`] or
1555 /// [`wake_all()`].
1556 /// - [`Err`]`(`[`Closed`]`)` if the task was woken by the `WaitQueue`
1557 /// being [closed].
1558 ///
1559 /// # Cancellation
1560 ///
1561 /// A `WaitQueue` fairly distributes wakeups to waiting tasks in the
1562 /// order that they started to wait. If a [`WaitOwned`] future is
1563 /// dropped, the task will forfeit its position in the queue.
1564 ///
1565 /// [`wake()`]: Self::wake
1566 /// [`wake_all()`]: Self::wake_all
1567 /// [`wait()`]: Self::wait
1568 /// [closed]: Self::close
1569 /// [`Closed`]: crate::Closed
1570 pub fn wait_owned(self: &Arc<Self>) -> WaitOwned<Lock> {
1571 let waiter = self.waiter();
1572 let queue = self.clone();
1573 WaitOwned { queue, waiter }
1574 }
1575 }
1576
1577 // === impl WaitOwned ===
1578
1579 impl<Lock: ScopedRawMutex> WaitOwned<Lock> {
1580 /// Returns `true` if this `WaitOwned` future is waiting for a
1581 /// notification from the provided [`WaitQueue`].
1582 ///
1583 /// # Examples
1584 ///
1585 /// ```
1586 /// use maitake_sync::WaitQueue;
1587 /// use std::sync::Arc;
1588 ///
1589 /// let queue1 = Arc::new(WaitQueue::new());
1590 /// let queue2 = Arc::new(WaitQueue::new());
1591 ///
1592 /// let wait = queue1.clone().wait_owned();
1593 /// assert!(wait.waits_on(&queue1));
1594 /// assert!(!wait.waits_on(&queue2));
1595 /// ```
1596 #[inline]
1597 #[must_use]
1598 pub fn waits_on(&self, queue: &WaitQueue<Lock>) -> bool {
1599 ptr::eq(&*self.queue, queue)
1600 }
1601
1602 /// Returns `true` if `self` and `other` are waiting on a notification
1603 /// from the same [`WaitQueue`].
1604 ///
1605 /// # Examples
1606 ///
1607 /// Two [`WaitOwned`] futures waiting on the same [`WaitQueue`] return
1608 /// `true`:
1609 ///
1610 /// ```
1611 /// use maitake_sync::WaitQueue;
1612 /// use std::sync::Arc;
1613 ///
1614 /// let queue = Arc::new(WaitQueue::new());
1615 ///
1616 /// let wait1 = queue.clone().wait_owned();
1617 /// let wait2 = queue.clone().wait_owned();
1618 /// assert!(wait1.same_queue(&wait2));
1619 /// assert!(wait2.same_queue(&wait1));
1620 /// ```
1621 ///
1622 /// Two [`WaitOwned`] futures waiting on different [`WaitQueue`]s return
1623 /// `false`:
1624 ///
1625 /// ```
1626 /// use maitake_sync::WaitQueue;
1627 /// use std::sync::Arc;
1628 ///
1629 /// let queue1 = Arc::new(WaitQueue::new());
1630 /// let queue2 = Arc::new(WaitQueue::new());
1631 ///
1632 /// let wait1 = queue1.wait_owned();
1633 /// let wait2 = queue2.wait_owned();
1634 /// assert!(!wait1.same_queue(&wait2));
1635 /// assert!(!wait2.same_queue(&wait1));
1636 /// ```
1637 #[inline]
1638 #[must_use]
1639 pub fn same_queue(&self, other: &WaitOwned<Lock>) -> bool {
1640 Arc::ptr_eq(&self.queue, &other.queue)
1641 }
1642
1643 /// Eagerly subscribe this future to wakeups from [`WaitQueue::wake()`].
1644 ///
1645 /// Polling a `WaitOwned` future adds that future to the list of waiters
1646 /// that may receive a wakeup from a `WaitQueue`. However, in some
1647 /// cases, it is desirable to subscribe to wakeups *prior* to actually
1648 /// waiting for one. This method should be used when it is necessary to
1649 /// ensure a `WaitOwned` future is in the list of waiters before the
1650 /// future is `poll`ed for the rst time.
1651 ///
1652 /// In general, this method is used in cases where a [`WaitQueue`] must
1653 /// synchronize with some additional state, such as an `AtomicBool` or
1654 /// counter. If a task first checks that state, and then chooses whether or
1655 /// not to wait on the `WaitQueue` based on that state, then a race
1656 /// condition may occur where the `WaitQueue` wakes waiters *between* when
1657 /// the task checked the external state and when it first polled its
1658 /// `WaitOwned` future to wait on the queue. This method allows
1659 /// registering the `WaitOwned` future with the queue *prior* to
1660 /// checking the external state, without actually sleeping, so that when
1661 /// the task does wait for the `WaitOwned` future to complete, it will
1662 /// have received any wakeup that was sent between when the external
1663 /// state was checked and the `WaitOwned` future was first polled.
1664 ///
1665 /// # Returns
1666 ///
1667 /// This method returns a [`Poll`]`<`[`WaitResult`]`>` which is `Ready`
1668 /// a wakeup was already received. This method returns [`Poll::Ready`]
1669 /// in the following cases:
1670 ///
1671 /// 1. The [`WaitQueue::wake()`] method was called between the creation
1672 /// of the `WaitOwned` future and the call to this method.
1673 /// 2. This is the first call to `subscribe` or `poll` on this future,
1674 /// and the `WaitQueue` was holding a stored wakeup from a previous
1675 /// call to [`wake()`]. This method consumes the wakeup in that case.
1676 /// 3. The future has previously been `subscribe`d or polled, and it
1677 /// has since then been marked ready by either consuming a wakeup
1678 /// from the `WaitQueue`, or by a call to [`wake()`] or
1679 /// [`wake_all()`] that removed it from the list of futures ready to
1680 /// receive wakeups.
1681 /// 4. The `WaitQueue` has been [`close`d](WaitQueue::close), in which
1682 /// case this method returns `Poll::Ready(Err(Closed))`.
1683 ///
1684 /// If this method returns [`Poll::Ready`], any subsequent `poll`s of
1685 /// this `Wait` future will also immediately return [`Poll::Ready`].
1686 ///
1687 /// If the [`WaitOwned`] future subscribed to wakeups from the queue,
1688 /// and has not been woken, this method returns [`Poll::Pending`].
1689 ///
1690 /// [`wake()`]: WaitQueue::wake
1691 /// [`wake_all()`]: WaitQueue::wake_all
1692 pub fn subscribe(self: Pin<&mut Self>) -> Poll<WaitResult<()>> {
1693 let this = self.project();
1694 this.waiter.poll_wait(this.queue, None)
1695 }
1696 }
1697
1698 impl<Lock: ScopedRawMutex> Future for WaitOwned<Lock> {
1699 type Output = WaitResult<()>;
1700
1701 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1702 let this = self.project();
1703 this.waiter.poll_wait(&*this.queue, Some(cx.waker()))
1704 }
1705 }
1706
1707 #[pinned_drop]
1708 impl<Lock: ScopedRawMutex> PinnedDrop for WaitOwned<Lock> {
1709 fn drop(mut self: Pin<&mut Self>) {
1710 let this = self.project();
1711 this.waiter.release(&*this.queue);
1712 }
1713 }
1714}