maitake/task.rs
1//! The `maitake` task system.
2//!
3//! This module contains the code that spawns tasks on a [scheduler], and
4//! manages the lifecycle of tasks once they are spawned. This includes the
5//! in-memory representation of spawned tasks (the [`Task`] type), and the
6//! handle used by the scheduler and other components of the runtime to
7//! reference a task once it is spawned (the [`TaskRef`] type).
8//!
9//! [scheduler]: crate::scheduler
10#[cfg(feature = "alloc")]
11pub use self::storage::BoxStorage;
12pub use self::{
13 builder::Builder,
14 id::TaskId,
15 join_handle::{JoinError, JoinHandle},
16 storage::Storage,
17};
18pub use core::task::{Context, Poll, Waker};
19
20mod builder;
21mod id;
22pub(crate) mod join_handle;
23mod state;
24mod storage;
25
26#[cfg(test)]
27mod tests;
28
29use crate::{
30 loom::{cell::UnsafeCell, sync::atomic::Ordering},
31 scheduler::Schedule,
32 trace,
33 util::non_null,
34};
35
36#[cfg(debug_assertions)]
37use core::any::TypeId;
38use core::{
39 any::type_name,
40 future::Future,
41 marker::PhantomData,
42 mem,
43 pin::Pin,
44 ptr::{self, NonNull},
45 task::{RawWaker, RawWakerVTable},
46};
47
48use self::{
49 builder::Settings,
50 state::{JoinAction, OrDrop, ScheduleAction, StartPollAction, StateCell},
51};
52use cordyceps::{mpsc_queue, Linked};
53use mycelium_util::{fmt, mem::CheckedMaybeUninit};
54
55/// A type-erased, reference-counted pointer to a spawned [`Task`].
56///
57/// Once a task has been spawned, it is generally referenced by a `TaskRef`.
58/// When a spawned task is placed in a scheduler's run queue, dequeuing the next
59/// task will yield a `TaskRef`, and a `TaskRef` may be converted into a
60/// [`Waker`] or used to await a spawned task's completion.
61///
62/// `TaskRef`s are reference-counted, and the task will be deallocated when the
63/// last `TaskRef` pointing to it is dropped.
64#[derive(Eq, PartialEq)]
65pub struct TaskRef(NonNull<Header>);
66
67/// A task.
68///
69/// This type contains the various components of a task: the [future][`Future`]
70/// itself, the task's header, and a reference to the task's [scheduler]. When a
71/// task is spawned, the `Task` type is placed on the heap (or wherever spawned
72/// tasks are stored), and a type-erased [`TaskRef`] that points to that `Task`
73/// is returned. Once a task is spawned, it is primarily interacted with via
74/// [`TaskRef`]s.
75///
76/// ## Vtables and Type Erasure
77///
78/// The `Task` struct, once spawned, is rarely interacted with directly. Because
79/// a system may spawn any number of different [`Future`] types as tasks, and
80/// may potentially also contain multiple types of [scheduler] and/or [task
81/// storage], the scheduler and other parts of the system generally interact
82/// with tasks via type-erased [`TaskRef`]s.
83///
84/// However, in order to actually poll a task's [`Future`], or perform other
85/// operations such as deallocating a task, it is necessary to know the type of
86/// the the task's [`Future`] (and potentially, that of the scheduler and/or
87/// storage). Therefore, operations that are specific to the task's `S`-typed
88/// [scheduler], `F`-typed [`Future`], and `STO`-typed [`Storage`] are performed
89/// via [dynamic dispatch].
90///
91/// [scheduler]: crate::scheduler::Schedule
92/// [task storage]: Storage
93/// [dynamic dispatch]: https://en.wikipedia.org/wiki/Dynamic_dispatch
94#[repr(C)]
95pub struct Task<S, F: Future, STO> {
96 /// The task's [`Header`] and [scheduler].
97 ///
98 /// # Safety
99 ///
100 /// This must be the first field of the `Task` struct!
101 ///
102 /// [scheduler]: crate::scheduler::Schedule
103 schedulable: Schedulable<S>,
104
105 /// The task itself.
106 ///
107 /// This is either the task's [`Future`], when it is running,
108 /// or the future's [`Output`], when the future has completed.
109 ///
110 /// [`Future`]: core::future::Future
111 /// [`Output`]: core::future::Future::Output
112 inner: UnsafeCell<Cell<F>>,
113
114 /// The [`Waker`] of the [`JoinHandle`] for this task, if one exists.
115 ///
116 /// # Safety
117 ///
118 /// This field is only initialized when the [`State::JOIN_WAKER`] state
119 /// field is set to `JoinWakerState::Waiting`. If the join waker state is
120 /// any other value, this field may be uninitialized.
121 ///
122 /// [`State::JOIN_WAKER`]: state::State::JOIN_WAKER
123 join_waker: UnsafeCell<CheckedMaybeUninit<Waker>>,
124
125 /// The [`Storage`] type associated with this struct
126 ///
127 /// In order to be agnostic over container types (e.g. [`Box`], or
128 /// other user provided types), the Task is generic over a
129 /// [`Storage`] type.
130 ///
131 /// [`Box`]: alloc::boxed::Box
132 /// [`Storage`]: crate::task::Storage
133 storage: PhantomData<STO>,
134}
135
136/// The task's header.
137///
138/// This contains the *untyped* components of the task which are identical
139/// regardless of the task's future, output, and scheduler types: the
140/// [vtable], [state cell], and [run queue links].
141///
142/// See the [`Vtable` documentation](Vtable#task-vtables) for more details on a
143/// task's vtables.
144///
145/// The header is the data at which a [`TaskRef`] points, and will likely be
146/// prefetched when dereferencing a [`TaskRef`] pointer.[^1] Therefore, the
147/// header should contain the task's most frequently accessed data, and should
148/// ideally fit within a CPU cache line.
149///
150/// # Safety
151///
152/// The [run queue links] *must* be the first field in this type, in order for
153/// the [`Linked::links` implementation] for this type to be sound. Therefore,
154/// the `#[repr(C)]` attribute on this struct is load-bearing.
155///
156/// [vtable]: Vtable
157/// [state cell]: StateCell
158/// [run queue links]: cordyceps::mpsc_queue::Links
159/// [`Linked::links` implementation]: #method.links
160///
161/// [^1]: On CPU architectures which support spatial prefetch, at least...
162#[repr(C)]
163#[derive(Debug)]
164pub(crate) struct Header {
165 /// The task's links in the intrusive run queue.
166 ///
167 /// # Safety
168 ///
169 /// This MUST be the first field in this struct.
170 run_queue: mpsc_queue::Links<Header>,
171
172 /// The task's state, which can be atomically updated.
173 state: StateCell,
174
175 /// The task vtable for this task.
176 ///
177 /// Note that this is different from the [waker vtable], which contains
178 /// pointers to the waker methods (and depends primarily on the task's
179 /// scheduler type). The task vtable instead contains methods for
180 /// interacting with the task's future, such as polling it and reading the
181 /// task's output. These depend primarily on the type of the future rather
182 /// than the scheduler.
183 ///
184 /// See the [`Vtable` documentation](Vtable#task-vtables) for
185 /// more details on a task's vtables.
186 ///
187 /// [waker vtable]: core::task::RawWakerVTable
188 vtable: &'static Vtable,
189
190 /// The task's ID.
191 id: TaskId,
192
193 /// The task's `tracing` span, if `tracing` is enabled.
194 span: trace::Span,
195
196 #[cfg(debug_assertions)]
197 scheduler_type: Option<TypeId>,
198}
199
200#[derive(Debug, Clone, Copy, PartialEq, Eq)]
201pub(crate) enum PollResult {
202 /// The task has completed, without waking a [`JoinHandle`] waker.
203 ///
204 /// The scheduler can increment a counter of completed tasks, and then drop
205 /// the [`TaskRef`].
206 Ready,
207
208 /// The task has completed and a [`JoinHandle`] waker has been woken.
209 ///
210 /// The scheduler can increment a counter of completed tasks, and then drop
211 /// the [`TaskRef`].
212 ReadyJoined,
213
214 /// The task is pending, but not woken.
215 ///
216 /// The scheduler can drop the [`TaskRef`], as whoever intends to wake the
217 /// task later is holding a clone of its [`Waker`].
218 Pending,
219
220 /// The task has woken itself during the poll.
221 ///
222 /// The scheduler should re-schedule the task, rather than dropping the [`TaskRef`].
223 PendingSchedule,
224}
225
226/// The task's [`Header`] and [scheduler] reference.
227///
228/// This is factored out into a separate type from `Task` itself so that we can
229/// have a target for casting a pointer to that is generic only over the
230/// `S`-typed [scheduler], and not the task's `Future` and `Storage` types. This
231/// reduces excessive monomorphization of waker vtable functions.
232///
233/// This type knows the task's [`RawWaker`] vtable, as the raw waker methods
234/// need only be generic over the type of the scheduler. It does not know the
235/// task's *task* vtable, as the task vtable actually polls the future and
236/// deallocates the task, and must therefore know the types of the task's future
237/// and storage.
238///
239/// [scheduler]: crate::scheduler::Schedule
240#[repr(C)]
241struct Schedulable<S> {
242 /// The task's header.
243 ///
244 /// This contains the *untyped* components of the task which are identical
245 /// regardless of the task's future, output, and scheduler types: the
246 /// [vtable], [state cell], and [run queue links].
247 ///
248 /// # Safety
249 ///
250 /// This *must* be the first field in this type, to allow casting a
251 /// `NonNull<Task>` to a `NonNull<Header>`.
252 ///
253 /// [vtable]: Vtable
254 /// [state cell]: StateCell
255 /// [run queue links]: cordyceps::mpsc_queue::Links
256 header: Header,
257
258 /// A reference to the [scheduler] this task is spawned on, or `None` if
259 /// this task has not yet been bound to a scheduler.
260 ///
261 /// This is used to schedule the task when it is woken.
262 ///
263 /// [scheduler]: crate::scheduler::Schedule
264 scheduler: UnsafeCell<Option<S>>,
265}
266
267/// The core of a task: either the [`Future`] that was spawned, if the task
268/// has not yet completed, or the [`Output`] of the future, once the future has
269/// completed.
270///
271/// [`Output`]: Future::Output
272#[repr(C)]
273enum Cell<F: Future> {
274 /// The future is still pending.
275 Pending(F),
276 /// The future has completed, and its output is ready to be taken by a
277 /// `JoinHandle`, if one exists.
278 Ready(F::Output),
279 /// The future has completed, and the task's output has been taken or is not
280 /// needed.
281 Joined,
282}
283
284/// A [virtual function pointer table][vtable] (vtable) that specifies the
285/// behavior of a [`Task`] instance.
286///
287/// This is distinct from the [`RawWakerVTable`] type in [`core::task`]: that
288/// type specifies the vtable for a task's [`Waker`], while this vtable
289/// specifies functions called by the runtime to poll, join, and deallocate a
290/// spawned task.
291///
292/// The first argument passed to all functions inside this vtable is a pointer
293/// to the task.
294///
295/// The functions inside this struct are only intended to be called on a pointer
296/// to a spawned [`Task`]. Calling one of the contained functions using
297/// any other pointer will cause undefined behavior.
298///
299/// ## Task Vtables
300///
301/// Each spawned task has two virtual function tables, which perform dynamic
302/// dispatch on the type-erased type parameters of the task (the `S`-typed
303/// [scheduler], the `F`-typed [`Future`], and the `STO`-typed [`Storage`]).
304///
305/// The first vtable is the [`RawWakerVTable`], which is specified by the Rust
306/// standard library's [`core::task`] module. This vtable contains function
307/// pointers to the implementations of the task's [`Waker`] operations. The
308/// second vtable is the **task** vtable, which contains function pointers to
309/// functions that are specific to the task's [`Future`] type, such as polling
310/// the future and deallocating the task.
311///
312/// The [`RawWakerVTable`] is monomorphic only over the `S`-typed [`Schedule`]
313/// implementation, so all tasks spawned on the same type of [scheduler] share
314/// one instance of the [`RawWakerVTable`]. On the other hand, the task vtable
315/// is monomorphic over the task's `F`-typed [`Future`] and `S`-typed
316/// [`Storage`], so a separate monomorphization of the task vtable methods is
317/// generated for each spawned [`Future`] type.
318///
319/// The task vtable is generated by the [`Task`] struct, as it requires type
320/// information about the task's [`Future`] and [`Storage`], while the
321/// [`RawWakerVTable`] is generated by the [`Schedulable`] struct, as it only
322/// requires type information about the [`Schedule`] type. This reduces
323/// unnecessary monomorphization of the waker vtable methods for each future
324/// type that's spawned.
325///
326/// The methods contained in each vtable are as follows:
327///
328/// #### [`RawWakerVTable`]
329///
330/// * **`unsafe fn `[`clone`]`(*const ()) -> `[`RawWaker`]**
331///
332/// Called when a task's [`Waker`] is cloned.
333///
334/// Increments the task's reference count.
335///
336/// * **`unsafe fn `[`wake`]`(*const ())`**
337///
338/// Called when a task is woken by value.
339///
340/// Decrements the task's reference count.
341///
342/// * **`unsafe fn `[`wake_by_ref`]`(*const ())`**
343///
344/// Called when a task's [`Waker`] is woken through a reference.
345///
346/// This wakes the task but does not change the task's reference count.
347///
348/// * **`unsafe fn `[`drop`]`(*const ())`**
349///
350/// Called when a task's [`Waker`] is dropped.
351///
352/// Decrements the task's reference count.
353///
354/// #### Task `Vtable`
355///
356/// * **`unsafe fn `[`poll`]`(`[`NonNull`]`<`[`Header`]`>) -> `[`PollResult`]**
357///
358/// Polls the task's [`Future`].
359///
360/// This does *not* consume a [`TaskRef`], as the scheduler may wish to do
361/// additional operations on the task even if it should be dropped. Instead,
362/// this function returns a [`PollResult`] that indicates what the scheduler
363/// should do with the task after the poll.
364///
365/// * **`unsafe fn `[`poll_join`]`(`[`NonNull`]`<`[`Header`]`>, `[`NonNull`]`<()>,
366/// &mut `[`Context`]`<'_>) -> `[`Poll`]`<Result<(), `[`JoinError`]`>>`**
367///
368/// Called when a task's [`JoinHandle`] is polled.
369///
370/// This takes a `NonNull<Header>` rather than a [`TaskRef`], as it does not
371/// consume a ref count. The second [`NonNull`] is an out-pointer to which the
372/// task's output will be written if the task has completed. The caller is
373/// responsible for
374/// ensuring that this points to a valid, if uninitialized, memory location
375/// for a `F::Output`.
376///
377/// This method returns [`Poll::Ready`]`(Ok(()))` when the task has joined,
378/// [`Poll::Ready`]`(Err(`[`JoinError`]`))` if the task has been cancelled, or
379/// [`Poll::Pending`] when the task is still running.
380///
381/// * **`unsafe fn `[`deallocate`]`(`[`NonNull`]`<`[`Header`]`>)`**
382///
383/// Called when a task's final [`TaskRef`] is dropped and the task is ready to
384/// be deallocated.
385///
386/// This does not take a [`TaskRef`], as dropping a [`TaskRef`] decrements the
387/// reference count, and the final `TaskRef` has already been dropped.
388///
389/// [scheduler]: crate::scheduler::Schedule
390/// [task storage]: Storage
391/// [dynamic dispatch]: https://en.wikipedia.org/wiki/Dynamic_dispatch
392/// [vtable]: https://en.wikipedia.org/wiki/Virtual_method_table
393/// [`clone`]: core::task::RawWakerVTable#clone
394/// [`wake`]: core::task::RawWakerVTable#wake
395/// [`wake_by_ref`]: core::task::RawWakerVTable#wake_by_ref
396/// [`drop`]: core::task::RawWakerVTable#drop
397/// [`poll`]: Task::poll
398/// [`poll_join`]: Task::poll_join
399/// [`deallocate`]: Task::deallocate
400struct Vtable {
401 /// Poll the future, returning a [`PollResult`] that indicates what the
402 /// scheduler should do with the polled task.
403 poll: unsafe fn(NonNull<Header>) -> PollResult,
404
405 /// Poll the task's `JoinHandle` for completion, storing the output at the
406 /// provided [`NonNull`] pointer if the task has completed.
407 ///
408 /// If the task has not completed, the [`Waker`] from the provided
409 /// [`Context`] is registered to be woken when the task completes.
410 // Splitting this up into type aliases just makes it *harder* to understand
411 // IMO...
412 #[allow(clippy::type_complexity)]
413 poll_join: unsafe fn(
414 NonNull<Header>,
415 NonNull<()>,
416 &mut Context<'_>,
417 ) -> Poll<Result<(), JoinError<()>>>,
418
419 /// Drops the task and deallocates its memory.
420 deallocate: unsafe fn(NonNull<Header>),
421
422 /// The `wake_by_ref` function from the task's [`RawWakerVTable`].
423 ///
424 /// This is duplicated here as it's used to wake canceled tasks when a task
425 /// is canceled by a [`TaskRef`] or [`JoinHandle`].
426 wake_by_ref: unsafe fn(*const ()),
427}
428
429// === impl Task ===
430
431macro_rules! trace_waker_op {
432 ($ptr:expr, $method: ident) => {
433 trace_waker_op!($ptr, $method, op: $method)
434 };
435 ($ptr:expr, $method: ident, op: $op:ident) => {
436
437 #[cfg(any(feature = "tracing-01", loom))]
438 tracing_01::trace!(
439 target: "runtime::waker",
440 {
441 task.id = (*$ptr).span().tracing_01_id(),
442 task.addr = ?$ptr,
443 task.tid = (*$ptr).header.id.as_u64(),
444 op = concat!("waker.", stringify!($op)),
445 },
446 concat!("Task::", stringify!($method)),
447 );
448
449
450 #[cfg(not(any(feature = "tracing-01", loom)))]
451 trace!(
452 target: "runtime::waker",
453 {
454 task.addr = ?$ptr,
455 task.tid = (*$ptr).header.id.as_u64(),
456 op = concat!("waker.", stringify!($op)),
457 },
458 concat!("Task::", stringify!($method)),
459
460 );
461 };
462}
463
464impl<S, F, STO> Task<S, F, STO>
465where
466 F: Future,
467{
468 #[inline]
469 fn header(&self) -> &Header {
470 &self.schedulable.header
471 }
472
473 #[inline]
474 fn state(&self) -> &StateCell {
475 &self.header().state
476 }
477
478 #[inline]
479 #[cfg(any(feature = "tracing-01", feature = "tracing-02", test))]
480 fn span(&self) -> &trace::Span {
481 &self.header().span
482 }
483}
484
485impl<STO> Task<Stub, Stub, STO>
486where
487 STO: Storage<Stub, Stub>,
488{
489 /// The stub task's vtable is mostly nops, as it should never be polled,
490 /// joined, or woken.
491 const HEAP_STUB_VTABLE: Vtable = Vtable {
492 poll: _maitake_header_nop,
493 poll_join: _maitake_header_nop_poll_join,
494 // Heap allocated stub tasks *will* need to be deallocated, since the
495 // scheduler will deallocate its stub task if it's dropped.
496 deallocate: Self::deallocate,
497 wake_by_ref: _maitake_header_nop_wake_by_ref,
498 };
499
500 loom_const_fn! {
501 /// Create a new stub task.
502 pub(crate) fn new_stub() -> Self {
503 Task {
504 schedulable: Schedulable {
505 header: Header {
506 run_queue: mpsc_queue::Links::new(),
507 vtable: &Self::HEAP_STUB_VTABLE,
508 state: StateCell::new(),
509 id: TaskId::stub(),
510 span: crate::trace::Span::none(),
511 #[cfg(debug_assertions)]
512 scheduler_type: None,
513 },
514 scheduler: UnsafeCell::new(Some(Stub)),
515 },
516 inner: UnsafeCell::new(Cell::Pending(Stub)),
517 join_waker: UnsafeCell::new(CheckedMaybeUninit::uninit()),
518 storage: PhantomData,
519 }
520 }
521 }
522}
523
524impl<S, F, STO> Task<S, F, STO>
525where
526 S: Schedule + 'static,
527 F: Future,
528 STO: Storage<S, F>,
529{
530 const TASK_VTABLE: Vtable = Vtable {
531 poll: Self::poll,
532 poll_join: Self::poll_join,
533 deallocate: Self::deallocate,
534 wake_by_ref: Schedulable::<S>::wake_by_ref,
535 };
536
537 /// Create a new (non-heap-allocated) Task.
538 ///
539 /// This needs to be heap allocated using an implementor of
540 /// the [`Storage`] trait to be used with the scheduler.
541 ///
542 /// [`Storage`]: crate::task::Storage
543 pub fn new(future: F) -> Self {
544 Self {
545 schedulable: Schedulable {
546 header: Header {
547 run_queue: mpsc_queue::Links::new(),
548 vtable: &Self::TASK_VTABLE,
549 state: StateCell::new(),
550 id: TaskId::next(),
551 span: crate::trace::Span::none(),
552 #[cfg(debug_assertions)]
553 scheduler_type: Some(TypeId::of::<S>()),
554 },
555 scheduler: UnsafeCell::new(None),
556 },
557 inner: UnsafeCell::new(Cell::Pending(future)),
558 join_waker: UnsafeCell::new(CheckedMaybeUninit::uninit()),
559 storage: PhantomData,
560 }
561 }
562
563 /// Returns a [`TaskId`] that uniquely identifies this task.
564 ///
565 /// The returned ID does *not* increment the task's reference count, and may
566 /// persist even after the task it identifies has completed and been
567 /// deallocated.
568 #[inline]
569 #[must_use]
570 pub fn id(&self) -> TaskId {
571 self.header().id
572 }
573
574 pub(crate) fn bind(&mut self, scheduler: S) {
575 self.schedulable.scheduler.with_mut(|current| unsafe {
576 *current = Some(scheduler);
577 });
578 }
579
580 unsafe fn poll(ptr: NonNull<Header>) -> PollResult {
581 trace!(
582 task.addr = ?ptr,
583 task.output = %type_name::<<F>::Output>(),
584 task.tid = ptr.as_ref().id.as_u64(),
585 "Task::poll"
586 );
587 let mut this = ptr.cast::<Self>();
588 test_debug!(task = ?fmt::alt(this.as_ref()));
589 // try to transition the task to the polling state
590 let state = &this.as_ref().state();
591 match test_dbg!(state.start_poll()) {
592 // transitioned successfully!
593 StartPollAction::Poll => {}
594 // cancel culture has gone too far!
595 StartPollAction::Canceled { wake_join_waker } => {
596 trace!(task.addr = ?ptr, wake_join_waker, "task canceled!");
597 if wake_join_waker {
598 this.as_ref().wake_join_waker();
599 return PollResult::ReadyJoined;
600 } else {
601 return PollResult::Ready;
602 }
603 }
604 // can't poll this task for some reason...
605 StartPollAction::CantPoll => return PollResult::Ready,
606 };
607
608 // wrap the waker in `ManuallyDrop` because we're converting it from an
609 // existing task ref, rather than incrementing the task ref count. if
610 // this waker is consumed during the poll, we don't want to decrement
611 // its ref count when the poll ends.
612 let waker = {
613 let raw = Schedulable::<S>::raw_waker(this.as_ptr().cast());
614 mem::ManuallyDrop::new(Waker::from_raw(raw))
615 };
616
617 // actually poll the task
618 let poll = {
619 let cx = Context::from_waker(&waker);
620 let pin = Pin::new_unchecked(this.as_mut());
621 pin.poll_inner(cx)
622 };
623
624 // post-poll state transition
625 let result = test_dbg!(state.end_poll(poll.is_ready()));
626
627 // if the task is ready and has a `JoinHandle` to wake, wake the join
628 // waker now.
629 if result == PollResult::ReadyJoined {
630 this.as_ref().wake_join_waker()
631 }
632
633 result
634 }
635
636 /// Deallocates the task pointed to by `ptr`.
637 ///
638 /// This is a type-erased function called through the task's [`Vtable`].
639 ///
640 /// # Safety
641 ///
642 /// - `ptr` must point to the [`Header`] of a task of type `Self` (i.e. the
643 /// pointed header must have the same `S`, `F`, and `STO` type parameters
644 /// as `Self`)
645 /// - the pointed task must have zero active references.
646 unsafe fn deallocate(ptr: NonNull<Header>) {
647 trace!(
648 task.addr = ?ptr,
649 task.output = %type_name::<<F>::Output>(),
650 task.tid = ptr.as_ref().id.as_u64(),
651 "Task::deallocate"
652 );
653 let this = ptr.cast::<Self>();
654 debug_assert_eq!(
655 ptr.as_ref().state.load(Ordering::Acquire).ref_count(),
656 0,
657 "a task may not be deallocated if its ref count is greater than zero!"
658 );
659 drop(STO::from_raw(this));
660 }
661
662 /// Poll to join the task pointed to by `ptr`, taking its output if it has
663 /// completed.
664 ///
665 /// If the task has completed, this method returns [`Poll::Ready`], and the
666 /// task's output is stored at the memory location pointed to by `outptr`.
667 /// This function is called by [`JoinHandle`]s o poll the task they
668 /// correspond to.
669 ///
670 /// This is a type-erased function called through the task's [`Vtable`].
671 ///
672 /// # Safety
673 ///
674 /// - `ptr` must point to the [`Header`] of a task of type `Self` (i.e. the
675 /// pointed header must have the same `S`, `F`, and `STO` type parameters
676 /// as `Self`).
677 /// - `outptr` must point to a valid `MaybeUninit<F::Output>`.
678 unsafe fn poll_join(
679 ptr: NonNull<Header>,
680 outptr: NonNull<()>,
681 cx: &mut Context<'_>,
682 ) -> Poll<Result<(), JoinError<()>>> {
683 let task = ptr.cast::<Self>().as_ref();
684 trace!(
685 task.addr = ?ptr,
686 task.output = %type_name::<<F>::Output>(),
687 task.tid = task.id().as_u64(),
688 "Task::poll_join"
689 );
690 match test_dbg!(task.state().try_join()) {
691 JoinAction::Canceled { completed } => {
692 // if the task has completed before it was canceled, also try to
693 // read the output, so that it can be returned in the `JoinError`.
694 if completed {
695 unsafe {
696 // safety: if the state transition returned `Canceled`
697 // with `completed` set, this indicates that we have
698 // exclusive permission to take the output.
699 task.take_output(outptr);
700 }
701 }
702 return JoinError::canceled(completed, task.id());
703 }
704 JoinAction::TakeOutput => unsafe {
705 // safety: if the state transition returns
706 // `JoinAction::TakeOutput`, this indicates that we have
707 // exclusive permission to read the task output.
708 task.take_output(outptr);
709 return Poll::Ready(Ok(()));
710 },
711 JoinAction::Register => {
712 task.join_waker.with_mut(|waker| unsafe {
713 // safety: we now have exclusive permission to write to the
714 // join waker.
715 (*waker).write(cx.waker().clone());
716 })
717 }
718 JoinAction::Reregister => {
719 task.join_waker.with_mut(|waker| unsafe {
720 // safety: we now have exclusive permission to write to the
721 // join waker.
722 let waker = (*waker).assume_init_mut();
723 let my_waker = cx.waker();
724 if !waker.will_wake(my_waker) {
725 *waker = my_waker.clone();
726 }
727 });
728 }
729 }
730 task.state().set_join_waker_registered();
731 Poll::Pending
732 }
733
734 fn poll_inner(&self, mut cx: Context<'_>) -> Poll<()> {
735 #[cfg(any(feature = "tracing-01", feature = "tracing-02", test))]
736 let _span = self.span().enter();
737
738 self.inner.with_mut(|cell| unsafe { (*cell).poll(&mut cx) })
739 }
740
741 /// Wakes the task's [`JoinHandle`], if it has one.
742 ///
743 /// # Safety
744 ///
745 /// - The caller must have exclusive access to the task's `JoinWaker`. This
746 /// is ensured by the task's state management.
747 unsafe fn wake_join_waker(&self) {
748 self.join_waker.with_mut(|join_waker| unsafe {
749 let join_waker = (*join_waker).assume_init_read();
750 test_debug!(?join_waker, "waking");
751 join_waker.wake();
752 })
753 }
754
755 /// Takes the task's output, storing it at the memory location pointed to by
756 /// `outptr`.
757 ///
758 /// This function panics if the task has not completed (i.e., its `Cell`
759 /// must be in the [`Cell::Ready`] state).
760 ///
761 /// # Safety
762 ///
763 /// - `outptr` *must* point to a `MaybeUninit<F::Output>`!
764 /// - The the caller must have exclusive access to `self.inner`.
765 unsafe fn take_output(&self, outptr: NonNull<()>) {
766 self.inner.with_mut(|cell| {
767 match mem::replace(&mut *cell, Cell::Joined) {
768 Cell::Ready(output) => {
769 // safety: the caller is responsible for ensuring that this
770 // points to a `MaybeUninit<F::Output>`.
771 let outptr = outptr.cast::<mem::MaybeUninit<F::Output>>().as_mut();
772 // that's right, it goes in the `NonNull<()>` hole!
773 outptr.write(output)
774 },
775 state => unreachable!("attempted to take join output on a task that has not completed! task: {self:?}; state: {state:?}"),
776 }
777 });
778 }
779}
780
781unsafe impl<S, F, STO> Send for Task<S, F, STO>
782where
783 S: Send,
784 F: Future + Send,
785{
786}
787unsafe impl<S, F, STO> Sync for Task<S, F, STO>
788where
789 S: Sync,
790 F: Future + Sync,
791{
792}
793
794impl<S, F, STO> fmt::Debug for Task<S, F, STO>
795where
796 F: Future,
797{
798 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
799 let Self {
800 schedulable:
801 Schedulable {
802 header,
803 scheduler: _,
804 },
805 inner: _,
806 join_waker: _,
807 storage: _,
808 } = self;
809 f.debug_struct("Task")
810 .field("header", header)
811 .field("inner", &format_args!("UnsafeCell(<{}>)", type_name::<F>()))
812 .field("join_waker", &format_args!("UnsafeCell(<Waker>)"))
813 .field("scheduler", &fmt::display(type_name::<S>()))
814 .field("storage", &fmt::display(type_name::<STO>()))
815 .finish()
816 }
817}
818
819impl<S, F, STO> Drop for Task<S, F, STO>
820where
821 F: Future,
822{
823 fn drop(&mut self) {
824 test_debug!(task.tid = self.header().id.as_u64(), "Task::drop");
825 // if there's a join waker, ensure that its destructor runs when the
826 // task is dropped.
827 // NOTE: this *should* never happen; we don't ever expect to deallocate
828 // a task while it still has a `JoinHandle`, since the `JoinHandle`
829 // holds a task ref. However, let's make sure we don't leak another task
830 // in case something weird happens, I guess...
831 if self.header().state.join_waker_needs_drop() {
832 self.join_waker.with_mut(|waker| unsafe {
833 // safety: we now have exclusive permission to write to the
834 // join waker.
835 (*waker).assume_init_drop();
836 });
837 }
838 }
839}
840
841// === impl Schedulable ===
842
843impl<S: Schedule> Schedulable<S> {
844 /// The task's [`Waker`] vtable.
845 ///
846 /// This belongs to the `Schedulable` type rather than the [`Task`] type,
847 /// because the [`Waker`] vtable methods need only be monomorphized over the
848 /// `S`-typed [scheduler], and not over the task's `F`-typed [`Future`] or
849 /// the `STO`-typed [`Storage`].
850 ///
851 /// [scheduler]: crate::scheduler::Schedule
852 const WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
853 Self::clone_waker,
854 Self::wake_by_val,
855 Self::wake_by_ref,
856 Self::drop_waker,
857 );
858
859 #[inline(always)]
860 unsafe fn schedule(this: TaskRef) {
861 this.0.cast::<Self>().as_ref().scheduler.with(|current| {
862 (*current)
863 .as_ref()
864 .expect("cannot schedule a task that has not been bound to a scheduler!")
865 .schedule(this)
866 })
867 }
868
869 #[inline]
870 unsafe fn drop_ref(this: NonNull<Self>) {
871 trace!(
872 task.addr = ?this,
873 task.tid = this.as_ref().header.id.as_u64(),
874 "Schedulable::drop_ref"
875 );
876 if !this.as_ref().state().drop_ref() {
877 return;
878 }
879
880 let deallocate = this.as_ref().header.vtable.deallocate;
881 deallocate(this.cast::<Header>())
882 }
883
884 fn raw_waker(this: *const Self) -> RawWaker {
885 RawWaker::new(this as *const (), &Self::WAKER_VTABLE)
886 }
887
888 #[inline(always)]
889 fn state(&self) -> &StateCell {
890 &self.header.state
891 }
892
893 #[inline(always)]
894 #[cfg(any(feature = "tracing-01", loom))]
895 fn span(&self) -> &trace::Span {
896 &self.header.span
897 }
898
899 // === Waker vtable methods ===
900
901 unsafe fn wake_by_val(ptr: *const ()) {
902 let ptr = ptr as *const Self;
903 trace_waker_op!(ptr, wake_by_val, op: wake);
904
905 let this = non_null(ptr as *mut Self);
906 match test_dbg!(this.as_ref().state().wake_by_val()) {
907 OrDrop::Drop => Self::drop_ref(this),
908 OrDrop::Action(ScheduleAction::Enqueue) => {
909 // the task should be enqueued.
910 //
911 // in the case that the task is enqueued, the state
912 // transition does *not* decrement the reference count. this is
913 // in order to avoid dropping the task while it is being
914 // scheduled. one reference is consumed by enqueuing the task...
915 Self::schedule(TaskRef(this.cast::<Header>()));
916 // now that the task has been enqueued, decrement the reference
917 // count to drop the waker that performed the `wake_by_val`.
918 Self::drop_ref(this);
919 }
920 OrDrop::Action(ScheduleAction::None) => {}
921 }
922 }
923
924 unsafe fn wake_by_ref(ptr: *const ()) {
925 let ptr = ptr as *const Self;
926 trace_waker_op!(ptr, wake_by_ref);
927
928 let this = non_null(ptr as *mut ()).cast::<Self>();
929 if test_dbg!(this.as_ref().state().wake_by_ref()) == ScheduleAction::Enqueue {
930 Self::schedule(TaskRef(this.cast::<Header>()));
931 }
932 }
933
934 unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
935 let this = ptr as *const Self;
936 trace_waker_op!(this, clone_waker, op: clone);
937 (*this).header.state.clone_ref();
938 Self::raw_waker(this)
939 }
940
941 unsafe fn drop_waker(ptr: *const ()) {
942 let ptr = ptr as *const Self;
943 trace_waker_op!(ptr, drop_waker, op: drop);
944
945 let this = ptr as *mut _;
946 Self::drop_ref(non_null(this))
947 }
948}
949
950impl<S> fmt::Debug for Schedulable<S> {
951 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
952 let Self {
953 header,
954 scheduler: _,
955 } = self;
956 f.debug_struct("Schedulable")
957 .field("header", header)
958 .field("scheduler", &fmt::display(type_name::<S>()))
959 .finish()
960 }
961}
962
963// === impl TaskRef ===
964
965impl TaskRef {
966 pub(crate) const NO_BUILDER: &'static Settings<'static> = &Settings::new();
967
968 /// Returns a [`TaskId`] that uniquely identifies this task.
969 ///
970 /// The returned ID does *not* increment the task's reference count, and may
971 /// persist even after the task it identifies has completed and been
972 /// deallocated.
973 #[inline]
974 #[must_use]
975 pub fn id(&self) -> TaskId {
976 self.header().id
977 }
978
979 /// Forcibly cancel the task.
980 ///
981 /// Canceling a task sets a flag indicating that it has been canceled and
982 /// should terminate. The next time a canceled task is polled by the
983 /// scheduler, it will terminate instead of polling the inner [`Future`]. If
984 /// the task has a [`JoinHandle`], that [`JoinHandle`] will complete with a
985 /// [`JoinError`]. The task then will be deallocated once all
986 /// [`JoinHandle`]s and [`TaskRef`]s referencing it have been dropped.
987 ///
988 /// This method returns `true` if the task was canceled successfully, and
989 /// `false` if the task could not be canceled (i.e., it has already completed,
990 /// has already been canceled, cancel culture has gone TOO FAR, et cetera).
991 pub fn cancel(&self) -> bool {
992 // try to set the canceled bit.
993 let canceled = self.state().cancel();
994
995 // if the task was successfully canceled, wake it so that it can clean
996 // up after itself.
997 if canceled {
998 test_debug!("woke canceled task");
999 self.wake_by_ref();
1000 }
1001
1002 canceled
1003 }
1004
1005 /// Returns `true` if this task has completed.
1006 ///
1007 /// Tasks are considered completed when the spawned [`Future`] has returned
1008 /// [`Poll::Ready`], or if the task has been canceled by the [`cancel()`]
1009 /// method.
1010 ///
1011 /// **Note**: This method can return `false` after [`cancel()`] has
1012 /// been called. This is because calling `cancel` *begins* the process of
1013 /// cancelling a task. The task is not considered canceled until it has been
1014 /// polled by the scheduler after calling [`cancel()`].
1015 ///
1016 /// [`cancel()`]: Self::cancel
1017 #[inline]
1018 #[must_use]
1019 pub fn is_complete(&self) -> bool {
1020 self.state()
1021 .load(Ordering::Acquire)
1022 .get(state::State::COMPLETED)
1023 }
1024
1025 /// Wakes the task.
1026 ///
1027 /// TODO(eliza): would this be better if we just added an `Into<Waker>` impl
1028 /// for `TaskRef` or something? Should this be a public API?
1029 pub(crate) fn wake_by_ref(&self) {
1030 test_debug!(?self, "TaskRef::wake_by_ref");
1031 let wake_by_ref = self.header().vtable.wake_by_ref;
1032 unsafe { wake_by_ref(self.0.as_ptr().cast::<()>()) }
1033 }
1034
1035 /// Sets the task's `WOKEN` bit.
1036 ///
1037 /// This must be called when enqueueing a spawned task for the first time.
1038 pub(crate) fn set_woken(&self) {
1039 self.state().set_woken();
1040 }
1041
1042 #[track_caller]
1043 pub(crate) fn new_allocated<S, F, STO>(
1044 scheduler: S,
1045 task: STO::StoredTask,
1046 ) -> (Self, JoinHandle<F::Output>)
1047 where
1048 S: Schedule + 'static,
1049 F: Future,
1050 STO: Storage<S, F>,
1051 {
1052 let (task, join) = Self::build_allocated::<S, F, STO>(Self::NO_BUILDER, task);
1053 unsafe { task.bind_scheduler(scheduler) };
1054 (task, join)
1055 }
1056
1057 /// Returns a **non-owning** pointer to the referenced task's [`Header`].
1058 ///
1059 /// This does **not** modify the task's ref count, the [`TaskRef`] on which
1060 /// this function is called still owns a reference. Therefore, this means
1061 /// the returned [`NonNull`] pointer **may not** outlive this [`TaskRef`].
1062 ///
1063 /// # Safety
1064 ///
1065 /// The returned [`NonNull`] pointer is not guaranteed to be valid if it
1066 /// outlives the lifetime of this [`TaskRef`]. If this [`TaskRef`] is
1067 /// dropped, it *may* deallocate the task, and the [`NonNull`] pointer may
1068 /// dangle.
1069 ///
1070 /// **Do not** dereference the returned [`NonNull`] pointer unless at least
1071 /// one [`TaskRef`] referencing this task is known to exist!
1072 pub(crate) fn as_ptr(&self) -> NonNull<Header> {
1073 self.0
1074 }
1075
1076 /// Convert a [`NonNull`] pointer to a task's [`Header`] into a new `TaskRef` to
1077 /// that task, incrementing the reference count.
1078 pub(crate) fn clone_from_raw(ptr: NonNull<Header>) -> Self {
1079 let this = Self(ptr);
1080 this.state().clone_ref();
1081 this
1082 }
1083
1084 #[track_caller]
1085 pub(crate) fn build_allocated<S, F, STO>(
1086 builder: &Settings<'_>,
1087 task: STO::StoredTask,
1088 ) -> (Self, JoinHandle<F::Output>)
1089 where
1090 S: Schedule,
1091 F: Future,
1092 STO: Storage<S, F>,
1093 {
1094 #[allow(unused_mut)]
1095 let mut ptr = STO::into_raw(task);
1096
1097 // attach the task span, if tracing is enabled.
1098 #[cfg(any(feature = "tracing-01", feature = "tracing-02", test))]
1099 {
1100 let loc = match builder.location {
1101 Some(ref loc) => loc,
1102 None => core::panic::Location::caller(),
1103 };
1104 let header = &mut unsafe { ptr.as_mut() }.schedulable.header;
1105 let span = trace_span!(
1106 "runtime.spawn",
1107 kind = %builder.kind,
1108 // XXX(eliza): would be nice to not use emptystring here but
1109 // `tracing` 0.2 is missing `Option` value support :(
1110 task.name = builder.name.unwrap_or(""),
1111 task.tid = header.id.as_u64(),
1112 task.addr = ?ptr,
1113 task.output = %type_name::<F::Output>(),
1114 task.storage = %type_name::<STO>(),
1115 loc.file = loc.file(),
1116 loc.line = loc.line(),
1117 loc.col = loc.column(),
1118 );
1119
1120 header.span = span;
1121
1122 trace!(
1123 task.name = builder.name.unwrap_or(""),
1124 task.addr = ?ptr,
1125 task.tid = header.id.as_u64(),
1126 task.kind = %builder.kind,
1127 task.spawn_location = %loc,
1128 "Task<..., Output = {}>::new",
1129 type_name::<F::Output>()
1130 );
1131 }
1132
1133 let ptr = ptr.cast::<Header>();
1134
1135 #[cfg(not(any(feature = "tracing-01", feature = "tracing-02", test)))]
1136 let _ = builder;
1137 let this = Self(ptr);
1138 let join_handle = unsafe {
1139 // Safety: it's fine to create a `JoinHandle` here, because we know
1140 // the task's actual output type.
1141 JoinHandle::from_task_ref(this.clone())
1142 };
1143 (this, join_handle)
1144 }
1145
1146 pub(crate) fn poll(&self) -> PollResult {
1147 let poll_fn = self.header().vtable.poll;
1148 unsafe { poll_fn(self.0) }
1149 }
1150
1151 pub(crate) unsafe fn bind_scheduler<S: Schedule + 'static>(&self, scheduler: S) {
1152 #[cfg(debug_assertions)]
1153 {
1154 if let Some(scheduler_type) = self.header().scheduler_type {
1155 assert_eq!(
1156 scheduler_type,
1157 TypeId::of::<S>(),
1158 "cannot bind {self:?} to a scheduler of type {}",
1159 type_name::<S>(),
1160 );
1161 }
1162 }
1163
1164 self.0
1165 .cast::<Schedulable<S>>()
1166 .as_ref()
1167 .scheduler
1168 .with_mut(|current| *current = Some(scheduler));
1169 }
1170
1171 /// # Safety
1172 ///
1173 /// `T` *must* be the task's actual output type!
1174 unsafe fn poll_join<T>(&self, cx: &mut Context<'_>) -> Poll<Result<T, JoinError<T>>> {
1175 let poll_join_fn = self.header().vtable.poll_join;
1176 // NOTE: we can't use `CheckedMaybeUninit` here, since the vtable method
1177 // will cast this to a `MaybeUninit` and write to it; this would ignore
1178 // the initialized tracking bit.
1179 let mut slot = mem::MaybeUninit::<T>::uninit();
1180 match test_dbg!(poll_join_fn(
1181 self.0,
1182 NonNull::from(&mut slot).cast::<()>(),
1183 cx
1184 )) {
1185 Poll::Ready(Ok(())) => {
1186 // if the poll function returned `Ok`, we get to take the
1187 // output!
1188 Poll::Ready(Ok(slot.assume_init_read()))
1189 }
1190 Poll::Ready(Err(e)) => {
1191 // if the task completed before being canceled, we can still
1192 // take its output.
1193 let output = if e.is_completed() {
1194 Some(slot.assume_init_read())
1195 } else {
1196 None
1197 };
1198 Poll::Ready(Err(e.with_output(output)))
1199 }
1200 Poll::Pending => Poll::Pending,
1201 }
1202 }
1203
1204 #[inline]
1205 fn state(&self) -> &StateCell {
1206 &self.header().state
1207 }
1208
1209 #[inline]
1210 fn header(&self) -> &Header {
1211 unsafe { self.0.as_ref() }
1212 }
1213}
1214
1215impl fmt::Debug for TaskRef {
1216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1217 f.debug_struct("TaskRef")
1218 .field("id", &self.id())
1219 .field("addr", &self.0)
1220 .finish()
1221 }
1222}
1223
1224impl fmt::Pointer for TaskRef {
1225 #[inline]
1226 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1227 fmt::Pointer::fmt(&self.0, f)
1228 }
1229}
1230
1231impl Clone for TaskRef {
1232 #[inline]
1233 #[track_caller]
1234 fn clone(&self) -> Self {
1235 test_debug!(
1236 task.addr = ?self.0,
1237 task.tid = self.id().as_u64(),
1238 location = %core::panic::Location::caller(),
1239 "TaskRef::clone",
1240 );
1241 self.state().clone_ref();
1242 Self(self.0)
1243 }
1244}
1245
1246impl Drop for TaskRef {
1247 #[inline]
1248 #[track_caller]
1249 fn drop(&mut self) {
1250 test_debug!(
1251 task.addr = ?self.0,
1252 task.tid = self.id().as_u64(),
1253 "TaskRef::drop",
1254 );
1255 if !self.state().drop_ref() {
1256 return;
1257 }
1258
1259 unsafe {
1260 Header::deallocate(self.0);
1261 }
1262 }
1263}
1264
1265unsafe impl Send for TaskRef {}
1266unsafe impl Sync for TaskRef {}
1267
1268// === impl Header ===
1269
1270// See https://github.com/rust-lang/rust/issues/97708 for why
1271// this is necessary
1272#[no_mangle]
1273unsafe fn _maitake_header_nop(_ptr: NonNull<Header>) -> PollResult {
1274 debug_assert!(_ptr.as_ref().id.is_stub());
1275
1276 #[cfg(debug_assertions)]
1277 unreachable!("stub task ({_ptr:?}) should never be polled!");
1278 #[cfg(not(debug_assertions))]
1279 PollResult::Pending
1280}
1281
1282// See https://github.com/rust-lang/rust/issues/97708 for why
1283// this is necessary
1284#[no_mangle]
1285unsafe fn _maitake_header_nop_deallocate(ptr: NonNull<Header>) {
1286 debug_assert!(ptr.as_ref().id.is_stub());
1287 unreachable!("stub task ({ptr:p}) should never be deallocated!");
1288}
1289
1290// See https://github.com/rust-lang/rust/issues/97708 for why
1291// this is necessary
1292#[no_mangle]
1293unsafe fn _maitake_header_nop_poll_join(
1294 _ptr: NonNull<Header>,
1295 _: NonNull<()>,
1296 _: &mut Context<'_>,
1297) -> Poll<Result<(), JoinError<()>>> {
1298 debug_assert!(_ptr.as_ref().id.is_stub());
1299 #[cfg(debug_assertions)]
1300 unreachable!("stub task ({_ptr:?}) should never be polled!");
1301 #[cfg(not(debug_assertions))]
1302 Poll::Ready(Err(JoinError::stub()))
1303}
1304
1305// See https://github.com/rust-lang/rust/issues/97708 for why
1306// this is necessary
1307#[no_mangle]
1308unsafe fn _maitake_header_nop_wake_by_ref(_ptr: *const ()) {
1309 #[cfg(debug_assertions)]
1310 unreachable!("stub task ({_ptr:?}) should never be woken!");
1311}
1312
1313impl Header {
1314 const STATIC_STUB_VTABLE: Vtable = Vtable {
1315 poll: _maitake_header_nop,
1316 poll_join: _maitake_header_nop_poll_join,
1317 deallocate: _maitake_header_nop_deallocate,
1318 wake_by_ref: _maitake_header_nop_wake_by_ref,
1319 };
1320
1321 loom_const_fn! {
1322 pub(crate) fn new_static_stub() -> Self {
1323 Self {
1324 run_queue: mpsc_queue::Links::new_stub(),
1325 state: StateCell::new(),
1326 vtable: &Self::STATIC_STUB_VTABLE,
1327 span: trace::Span::none(),
1328 id: TaskId::stub(),
1329 #[cfg(debug_assertions)]
1330 scheduler_type: None,
1331 }
1332 }
1333 }
1334
1335 unsafe fn deallocate(this: NonNull<Self>) {
1336 #[cfg(debug_assertions)]
1337 {
1338 let refs = this
1339 .as_ref()
1340 .state
1341 .load(core::sync::atomic::Ordering::Acquire)
1342 .ref_count();
1343 debug_assert_eq!(refs, 0, "tried to deallocate a task with references!");
1344 }
1345
1346 let deallocate = this.as_ref().vtable.deallocate;
1347 deallocate(this)
1348 }
1349}
1350
1351/// # Safety
1352///
1353/// A task must be pinned to be spawned.
1354unsafe impl Linked<mpsc_queue::Links<Header>> for Header {
1355 type Handle = TaskRef;
1356
1357 #[inline]
1358 fn into_ptr(task: Self::Handle) -> NonNull<Self> {
1359 let ptr = task.0;
1360 // converting a `TaskRef` into a pointer to enqueue it assigns ownership
1361 // of the ref count to the queue, so we don't want to run its `Drop`
1362 // impl.
1363 mem::forget(task);
1364 ptr
1365 }
1366
1367 /// Convert a raw pointer to a `Handle`.
1368 ///
1369 /// # Safety
1370 ///
1371 /// This function is safe to call when:
1372 /// - It is valid to construct a `Handle` from a`raw pointer
1373 /// - The pointer points to a valid instance of `Self` (e.g. it does not
1374 /// dangle).
1375 #[inline]
1376 unsafe fn from_ptr(ptr: NonNull<Self>) -> Self::Handle {
1377 TaskRef(ptr)
1378 }
1379
1380 /// Return the links of the node pointed to by `ptr`.
1381 ///
1382 /// # Safety
1383 ///
1384 /// This function is safe to call when:
1385 /// - It is valid to construct a `Handle` from a`raw pointer
1386 /// - The pointer points to a valid instance of `Self` (e.g. it does not
1387 /// dangle).
1388 #[inline]
1389 unsafe fn links(target: NonNull<Self>) -> NonNull<mpsc_queue::Links<Self>> {
1390 let target = target.as_ptr();
1391 // Safety: using `ptr::addr_of_mut!` avoids creating a temporary
1392 // reference, which stacked borrows dislikes.
1393 let links = ptr::addr_of_mut!((*target).run_queue);
1394 // Safety: it's fine to use `new_unchecked` here; if the pointer that we
1395 // offset to the `links` field is not null (which it shouldn't be, as we
1396 // received it as a `NonNull`), the offset pointer should therefore also
1397 // not be null.
1398 NonNull::new_unchecked(links)
1399 }
1400}
1401
1402unsafe impl Send for Header {}
1403unsafe impl Sync for Header {}
1404
1405// === impl Cell ===
1406
1407impl<F: Future> fmt::Debug for Cell<F> {
1408 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1409 match self {
1410 Cell::Pending(_) => write!(f, "Cell::Pending({})", type_name::<F>()),
1411 Cell::Ready(_) => write!(f, "Cell::Ready({})", type_name::<F::Output>()),
1412 Cell::Joined => f.pad("Cell::Joined"),
1413 }
1414 }
1415}
1416
1417impl<F: Future> Cell<F> {
1418 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
1419 let poll = match self {
1420 Cell::Pending(future) => unsafe { Pin::new_unchecked(future).poll(cx) },
1421 _ => unreachable!("tried to poll a completed future!"),
1422 };
1423
1424 match poll {
1425 Poll::Ready(ready) => {
1426 *self = Cell::Ready(ready);
1427 Poll::Ready(())
1428 }
1429 Poll::Pending => Poll::Pending,
1430 }
1431 }
1432}
1433
1434// === impl Vtable ===
1435
1436impl fmt::Debug for Vtable {
1437 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1438 let &Self {
1439 poll,
1440 poll_join,
1441 deallocate,
1442 wake_by_ref,
1443 } = self;
1444 f.debug_struct("Vtable")
1445 .field("poll", &fmt::ptr(poll))
1446 .field("poll_join", &fmt::ptr(poll_join as *const ()))
1447 .field("deallocate", &fmt::ptr(deallocate))
1448 .field("wake_by_ref", &fmt::ptr(wake_by_ref))
1449 .finish()
1450 }
1451}
1452
1453// Additional types and capabilities only available with the "alloc"
1454// feature active
1455feature! {
1456 #![feature = "alloc"]
1457
1458 use alloc::boxed::Box;
1459
1460 impl TaskRef {
1461
1462 #[track_caller]
1463 pub(crate) fn new<S, F>(scheduler: S, future: F) -> (Self, JoinHandle<F::Output>)
1464 where
1465 S: Schedule + 'static,
1466 F: Future + 'static
1467 {
1468 let mut task = Box::new(Task::<S, F, BoxStorage>::new(future));
1469 task.bind(scheduler);
1470 Self::build_allocated::<S, F, BoxStorage>(Self::NO_BUILDER, task)
1471 }
1472 }
1473
1474}
1475
1476#[derive(Copy, Clone, Debug)]
1477pub(crate) struct Stub;
1478
1479impl Future for Stub {
1480 type Output = ();
1481 fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
1482 unreachable!("the stub task should never be polled!")
1483 }
1484}
1485
1486impl Schedule for Stub {
1487 fn schedule(&self, _: TaskRef) {
1488 unimplemented!("stub task should never be woken!")
1489 }
1490
1491 fn current_task(&self) -> Option<TaskRef> {
1492 None
1493 }
1494}