maitake/scheduler.rs
1//! Schedulers for executing [tasks][task].
2//!
3//! In order to execute [asynchronous tasks][task], a system must have one or
4//! more _schedulers_. A scheduler (also sometimes referred to as an
5//! _executor_) is a component responsible for tracking which tasks have been
6//! [woken], and [polling] them when they are ready to make progress.
7//!
8//! This module contains scheduler implementations for use with the [`maitake`
9//! task system][task].
10//!
11//! # Using Schedulers
12//!
//! This module provides four types which can be used as schedulers. These types
//! differ based on how the core data of the scheduler is shared with tasks
//! spawned on that scheduler, and on whether they can spawn `!`[`Send`] futures:
16//!
17//! - [`Scheduler`]: a reference-counted single-core scheduler (requires the
18//! "alloc" [feature]). A [`Scheduler`] is internally implemented using an
19//! [`Arc`], and each task spawned on a [`Scheduler`] holds an `Arc` clone
20//! of the scheduler core.
21//! - [`StaticScheduler`]: a single-core scheduler stored in a `static`
22//! variable. A [`StaticScheduler`] is referenced by tasks spawned on it
23//! as an `&'static StaticScheduler` reference. Therefore, it can be used
24//! without requiring `alloc`, and avoids atomic reference count
25//! increments when spawning tasks. However, in order to be used, a
26//! [`StaticScheduler`] *must* be stored in a `'static`, which can limit
27//! its usage in some cases.
28//! - [`LocalScheduler`]: a reference-counted scheduler for `!`[`Send`] `Future`s
29//! (requires the "alloc" [feature]). This type is identical to the
30//! [`Scheduler`] type, except that it is capable of spawning `Future`s
31//! that do not implement [`Send`], and is itself not [`Send`] or [`Sync`]
32//! (it cannot be shared between CPU cores).
33//! - [`LocalStaticScheduler`]: a [`StaticScheduler`] variant for `!`[`Send`]
34//! `Future`s. This type is identical to the [`StaticScheduler`] type,
35//! except that it is capable of spawning `Future`s that do not implement
36//! [`Send`], and is itself not [`Send`] or [`Sync`] (it cannot be shared
37//! between CPU cores).
38//!
//! The [`Schedule`] trait in this module is used by the [`Task`] type to
//! abstract over the different types of scheduler that tasks may be spawned on.
41//!
42//! ## Spawning Tasks
43//!
44//! Once a scheduler has been constructed, tasks may be spawned on it using the
45//! [`Scheduler::spawn`] or [`StaticScheduler::spawn`] methods. These methods
46//! allocate a [new `Box` to store the spawned task](task::BoxStorage), and
47//! therefore require the ["alloc" feature][feature].
48//!
49//! Alternatively, if [custom task storage](task::Storage) is in use, the
50//! scheduler types also provide [`Scheduler::spawn_allocated`] and
51//! [`StaticScheduler::spawn_allocated`] methods, which allow spawning a task
52//! that has already been stored in a type implementing the [`task::Storage`]
53//! trait. This can be used *without* the "alloc" feature flag, and is primarily
54//! intended for use in systems where tasks are statically allocated, or where
55//! an alternative allocator API (rather than `liballoc`) is in use.
56//!
57//! Finally, to configure the properties of a task prior to spawning it, both
58//! scheduler types provide [`Scheduler::build_task`] and
59//! [`StaticScheduler::build_task`] methods. These methods return a
60//! [`task::Builder`] struct, which can be used to set properties of a task and
61//! then spawn it on that scheduler.
62//!
63//! ## Executing Tasks
64//!
65//! In order to actually execute the tasks spawned on a scheduler, the scheduler
66//! must be _driven_ by dequeueing tasks from its run queue and polling them.
67//!
68//! Because [`maitake` is a low-level async runtime "construction kit"][kit]
69//! rather than a complete runtime implementation, the interface for driving a
70//! scheduler is tick-based. A _tick_ refers to an iteration of a
71//! scheduler's run loop, in which a set of tasks are dequeued from the
72//! scheduler's run queue and polled. Calling the [`Scheduler::tick`] or
73//! [`StaticScheduler::tick`] method on a scheduler runs that scheduler for a
74//! single tick, returning a [`Tick`] struct with data describing the events
75//! that occurred during that tick.
76//!
77//! The scheduler API is tick-based, rather than providing methods that
78//! continuously tick the scheduler until all tasks have completed, because
79//! ticking a scheduler is often only one step of a system's run loop. A
80//! scheduler is responsible for polling the tasks that have been woken, but it
81//! does *not* wake tasks which are waiting for other runtime services, such as
82//! timers and I/O resources.
83//!
84//! Typically, an iteration of a system's run loop consists of the following steps:
85//!
86//! - **Tick the scheduler**, executing any tasks that have been woken,
87//! - **Tick a [timer][^1]**, to advance the system clock and wake any tasks waiting
88//! for time-based events,
89//! - **Process wakeups from I/O resources**, such as hardware interrupts that
90//! occurred during the tick. The component responsible for this is often
91//! referred to as an [I/O reactor].
92//! - Optionally, **spawn tasks from external sources**, such as work-stealing
93//! tasks from other schedulers, or receiving tasks from a remote system.
94//!
//! The implementation of the timer and I/O runtime services in a bare-metal
//! system typically depends on details of the hardware platform in use.
97//! Therefore, `maitake` does not provide a batteries-included runtime that
98//! bundles together a scheduler, timer, and I/O reactor. Instead, the
99//! lower-level tick-based scheduler interface allows running a `maitake`
100//! scheduler as part of a run loop implementation that also drives other parts
101//! of the runtime.
102//!
103//! A single call to [`Scheduler::tick`] will dequeue and poll up to
104//! [`Scheduler::DEFAULT_TICK_SIZE`] tasks from the run queue, rather than
105//! looping until all tasks in the queue have been dequeued.
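//!
//! The bounded behavior described above can be illustrated with a toy model.
//! The sketch below does not use `maitake`'s types: `ToyScheduler`, `ToyTick`,
//! and `drain_in_ticks` are hypothetical names, and the "tasks" are plain
//! integers rather than futures. It only shows how a fixed tick size can leave
//! work in the queue and report that through a `has_remaining`-style flag:
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! // Hypothetical stand-in for `Tick`, keeping only two of its fields.
//! #[derive(Debug, PartialEq)]
//! struct ToyTick {
//!     polled: usize,
//!     has_remaining: bool,
//! }
//!
//! // Toy scheduler whose run queue holds task IDs instead of real tasks.
//! struct ToyScheduler {
//!     run_queue: VecDeque<u32>,
//! }
//!
//! impl ToyScheduler {
//!     // Dequeues at most `n` tasks, then reports whether any are still queued.
//!     fn tick_n(&mut self, n: usize) -> ToyTick {
//!         let mut polled = 0;
//!         while polled < n {
//!             match self.run_queue.pop_front() {
//!                 // A real scheduler would poll the dequeued task here.
//!                 Some(_task) => polled += 1,
//!                 None => break,
//!             }
//!         }
//!         ToyTick {
//!             polled,
//!             has_remaining: !self.run_queue.is_empty(),
//!         }
//!     }
//! }
//!
//! // Ticks the toy scheduler until its queue is empty, recording every tick.
//! fn drain_in_ticks(tasks: u32, tick_size: usize) -> Vec<ToyTick> {
//!     assert!(tick_size > 0, "a tick size of zero would never drain the queue");
//!     let mut scheduler = ToyScheduler {
//!         run_queue: (0..tasks).collect(),
//!     };
//!     let mut ticks = Vec::new();
//!     loop {
//!         let tick = scheduler.tick_n(tick_size);
//!         let done = !tick.has_remaining;
//!         ticks.push(tick);
//!         if done {
//!             break;
//!         }
//!     }
//!     ticks
//! }
//! ```
//!
//! With five queued tasks and a tick size of two, this toy loop needs three
//! ticks to drain the queue. A real run loop would service timers and I/O
//! between those ticks instead of ticking back-to-back.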
106//!
107//! ## Examples
108//!
109//! A simple implementation of a system's run loop might look like this:
110//!
111//! ```rust
112//! use maitake::scheduler::Scheduler;
113//!
114//! /// Process any time-based events that have occurred since this function
115//! /// was last called.
116//! fn process_timeouts() {
117//! // this might tick a `maitake::time::Timer` or run some other form of
118//! // time driver implementation.
119//! }
120//!
121//!
122//! /// Process any I/O events that have occurred since this function
123//! /// was last called.
124//! fn process_io_events() {
125//! // this function would handle dispatching any I/O interrupts that
126//! // occurred during the tick to tasks that are waiting for those I/O
127//! // events.
128//! }
129//!
130//! /// Put the system into a low-power state until a hardware interrupt
131//! /// occurs.
132//! fn wait_for_interrupts() {
133//! // the implementation of this function would, of course, depend on the
134//! // hardware platform in use...
135//! }
136//!
137//! /// The system's main run loop.
138//! fn run_loop() {
139//! let scheduler = Scheduler::new();
140//!
141//! loop {
142//! // process time-based events
143//! process_timeouts();
144//!
145//! // process I/O events
146//! process_io_events();
147//!
148//! // tick the scheduler, running any tasks woken by processing time
149//! // and I/O events, as well as tasks woken by other tasks during the
150//! // tick.
151//! let tick = scheduler.tick();
152//!
153//! if !tick.has_remaining {
154//! // if the scheduler's run queue is empty, wait for an interrupt
155//! // to occur before ticking the scheduler again.
156//! wait_for_interrupts();
157//! }
158//! }
159//! }
160//! ```
161//!
162//! [^1]: The [`maitake::time`](crate::time) module provides one
163//! [`Timer`](crate::time::Timer) implementation, but other timers could be
164//! used as well.
165//!
166//! # Scheduling in Multi-Core Systems
167//!
168//! WIP ELIZA WRITE THIS
169//!
170//! [woken]: task::Waker::wake
171//! [polling]: core::future::Future::poll
172//! [task]: crate::task
173//! [feature]: crate#features
174//! [kit]: crate#maitake-is-not-a-complete-asynchronous-runtime
175//! [timer]: crate::time
176//! [I/O reactor]: https://en.wikipedia.org/wiki/Reactor_pattern
177#![warn(missing_docs, missing_debug_implementations)]
178use crate::{
179 loom::sync::atomic::{AtomicPtr, AtomicUsize, Ordering::*},
180 task::{self, Header, JoinHandle, Storage, TaskRef},
181};
182use core::{future::Future, marker::PhantomData, ptr};
183
184use cordyceps::mpsc_queue::{MpscQueue, TryDequeueError};
185
186#[cfg(any(feature = "tracing-01", feature = "tracing-02", test))]
187use mycelium_util::fmt;
188
189mod steal;
190#[cfg(test)]
191mod tests;
192
193pub use self::steal::*;
194
195/// A statically-initialized scheduler implementation.
196///
197/// This type stores the core of the scheduler behind an `&'static` reference,
198/// which is passed into each task spawned on the scheduler. This means that, in
199/// order to spawn tasks, the `StaticScheduler` *must* be stored in a `static`
200/// variable.
201///
202/// The use of a `&'static` reference allows `StaticScheduler`s to be used
203/// without `liballoc`. In addition, spawning and deallocating tasks is slightly
204/// cheaper than when using the reference-counted [`Scheduler`] type, because an
205/// atomic reference count increment/decrement is not required.
206///
207/// # Usage
208///
209/// A `StaticScheduler` may be created one of two ways, depending on how the
210/// [stub task] used by the MPSC queue algorithm is created: either with a
211/// statically-allocated stub task by using the
212/// [`StaticScheduler::new_with_static_stub`] function, or with a heap-allocated
213/// stub task using [`StaticScheduler::new`] or [`StaticScheduler::default`]
214/// (which require the ["alloc" feature][features]).
215///
216/// The [`new_with_static_stub`] function is a `const fn`, which allows a
217/// `StaticScheduler` to be constructed directly in a `static` initializer.
218/// However, it requires the [`TaskStub`] to be constructed manually by the
219/// caller and passed in to initialize the scheduler. Furthermore, this function
220/// is `unsafe` to call, as it requires that the provided [`TaskStub`] *not* be
221/// used by any other `StaticScheduler` instance, which the function cannot
222/// ensure.
223///
224/// For example:
225///
226/// ```rust
227/// use maitake::scheduler::{self, StaticScheduler};
228///
229/// static SCHEDULER: StaticScheduler = {
230/// // create a new static stub task *inside* the initializer for the
231/// // `StaticScheduler`. since the stub task static cannot be referenced
232/// // outside of this block, we can ensure that it is not used by any
233/// // other calls to `StaticScheduler::new_with_static_stub`.
234/// static STUB_TASK: scheduler::TaskStub = scheduler::TaskStub::new();
235///
236/// // now, create the scheduler itself:
237/// unsafe {
238/// // safety: creating the stub task inside the block used as an
239/// // initializer expression for the scheduler static ensures that
240/// // the stub task is not used by any other scheduler instance.
241/// StaticScheduler::new_with_static_stub(&STUB_TASK)
242/// }
243/// };
244///
245/// // now, we can use the scheduler to spawn tasks:
246/// SCHEDULER.spawn(async { /* ... */ });
247/// ```
248///
249/// The [`scheduler::new_static!`] macro abstracts over the above code, allowing
250/// a `static StaticScheduler` to be initialized without requiring the caller to
251/// manually write `unsafe` code:
252///
253/// ```rust
254/// use maitake::scheduler::{self, StaticScheduler};
255///
256/// // this macro expands to code identical to the previous example.
257/// static SCHEDULER: StaticScheduler = scheduler::new_static!();
258///
259/// // now, we can use the scheduler to spawn tasks:
260/// SCHEDULER.spawn(async { /* ... */ });
261/// ```
262///
263/// Alternatively, the [`new`] and [`default`] constructors can be used to
264/// create a new `StaticScheduler` with a heap-allocated stub task. This does
265/// not require the user to manually create a stub task and ensure that it is
266/// not used by any other `StaticScheduler` instances. However, these
267/// constructors are not `const fn`s and require the ["alloc" feature][features]
268/// to be enabled.
269///
270/// Because [`StaticScheduler::new`] and [`StaticScheduler::default`] are not
271/// `const fn`s, but the scheduler must still be stored in a `static` to be
272/// used, some form of lazy initialization of the `StaticScheduler` is necessary:
273///
274/// ```rust
275/// use maitake::scheduler::StaticScheduler;
276/// use mycelium_util::sync::Lazy;
277///
278/// static SCHEDULER: Lazy<StaticScheduler> = Lazy::new(StaticScheduler::new);
279///
280/// // now, we can use the scheduler to spawn tasks:
281/// SCHEDULER.spawn(async { /* ... */ });
282/// ```
283///
284/// Although the scheduler itself is no longer constructed in a `const fn`
285/// static initializer in this case, storing it in a `static` rather than an
286/// [`Arc`] still provides a minor performance benefit, as it avoids atomic
287/// reference counting when spawning tasks.
288///
289/// [stub task]: TaskStub
290/// [features]: crate#features
291/// [`new_with_static_stub`]: Self::new_with_static_stub
292/// [`scheduler::new_static!`]: new_static!
293/// [`new`]: Self::new
294/// [`default`]: Self::default
295#[derive(Debug)]
296#[cfg_attr(feature = "alloc", derive(Default))]
297pub struct StaticScheduler(Core);
298
299/// A statically-initialized scheduler for `!`[`Send`] tasks.
300///
301/// This type is identical to the [`StaticScheduler`] type, except that it is
302/// capable of scheduling [`Future`]s that do not implement [`Send`]. Because
303/// this scheduler's futures cannot be moved across threads[^1], the scheduler
304/// itself is also `!Send` and `!Sync`, as ticking it from another thread would
305/// cause its tasks to be polled from that thread, violating the [`Send`] and
306/// [`Sync`] contracts.
307///
308/// [^1]: Or CPU cores, in bare-metal systems.
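///
/// As a rough illustration of how a marker field can opt a type out of
/// [`Send`] and [`Sync`], the following standalone sketch uses hypothetical
/// `ToyCore` and `ToyLocal` types rather than this crate's internals. It
/// assumes only the standard library's `PhantomData`:
///
/// ```rust
/// use std::marker::PhantomData;
/// use std::mem::size_of;
///
/// // Hypothetical stand-in for the shared scheduler state.
/// struct ToyCore {
///     queued: usize,
/// }
///
/// // Raw pointers are neither `Send` nor `Sync`, so a `PhantomData<*mut ()>`
/// // field removes both auto traits from the containing struct.
/// struct ToyLocal {
///     core: ToyCore,
///     _not_send: PhantomData<*mut ()>,
/// }
///
/// fn assert_send<T: Send>() {}
///
/// // Returns `true` if the marker field adds no size to the struct.
/// fn marker_is_zero_sized() -> bool {
///     assert_send::<ToyCore>();
///     // assert_send::<ToyLocal>(); // does not compile: `*mut ()` is not `Send`
///     size_of::<ToyLocal>() == size_of::<ToyCore>()
/// }
/// ```
///
/// The marker only affects the auto traits; it occupies no space at runtime.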
309#[derive(Debug)]
310#[cfg_attr(feature = "alloc", derive(Default))]
311pub struct LocalStaticScheduler {
312 core: Core,
313 _not_send: PhantomData<*mut ()>,
314}
315
316/// A handle to a [`LocalStaticScheduler`] that implements [`Send`].
317///
318/// The [`LocalScheduler`] and [`LocalStaticScheduler`] types are capable of
319/// spawning futures which do not implement [`Send`]. Because of this, those
/// scheduler types themselves are also `!Send` and `!Sync`, as ticking them
/// from another thread would cause their tasks to be polled from that thread,
322/// violating the [`Send`] and [`Sync`] contracts.
323///
324/// However, tasks which *are* [`Send`] may still be spawned on a `!Send`
325/// scheduler, alongside `!Send` tasks. Because the scheduler types are `!Sync`,
326/// other threads may not reference them in order to spawn remote tasks on those
327/// schedulers. This type is a handle to a `!Sync` scheduler which *can* be sent
328/// across thread boundaries, as it does not have the capacity to poll tasks or
329/// reference the current task.
330///
331/// This type is returned by [`LocalStaticScheduler::spawner`].
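///
/// The split between a scheduler that runs tasks and a handle that can only
/// enqueue them can be sketched with standard-library channels. This is a toy
/// model under stated assumptions, not how this type is implemented:
/// `ToyLocalScheduler`, `ToySpawner`, and `SendJob` are hypothetical names,
/// jobs are closures rather than futures, and the toy does not model `!Send`
/// tasks.
///
/// ```rust
/// use std::sync::mpsc::{channel, Receiver, Sender};
/// use std::thread;
///
/// // Hypothetical unit of work that is allowed to cross threads.
/// type SendJob = Box<dyn FnOnce() -> u32 + Send>;
///
/// // Toy scheduler: owns the receiving half of the queue, so jobs only run
/// // on whichever thread calls `run_queued`.
/// struct ToyLocalScheduler {
///     incoming: Receiver<SendJob>,
///     handle: Sender<SendJob>,
/// }
///
/// // Toy spawner: a cloneable handle that can enqueue jobs but never runs them.
/// #[derive(Clone)]
/// struct ToySpawner(Sender<SendJob>);
///
/// impl ToyLocalScheduler {
///     fn new() -> Self {
///         let (handle, incoming) = channel();
///         Self { incoming, handle }
///     }
///
///     fn spawner(&self) -> ToySpawner {
///         ToySpawner(self.handle.clone())
///     }
///
///     // Runs every currently queued job on the calling thread.
///     fn run_queued(&self) -> Vec<u32> {
///         self.incoming.try_iter().map(|job| job()).collect()
///     }
/// }
///
/// impl ToySpawner {
///     fn spawn(&self, job: impl FnOnce() -> u32 + Send + 'static) {
///         self.0.send(Box::new(job)).expect("scheduler was dropped");
///     }
/// }
///
/// // Enqueues a job from a second thread, then runs it on the first.
/// fn spawn_from_other_thread() -> Vec<u32> {
///     let scheduler = ToyLocalScheduler::new();
///     let spawner = scheduler.spawner();
///     thread::spawn(move || spawner.spawn(|| 42))
///         .join()
///         .expect("spawner thread panicked");
///     scheduler.run_queued()
/// }
/// ```
///
/// The handle never polls anything itself, which is the property that makes
/// it reasonable to send across threads.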
332#[derive(Debug)]
333pub struct LocalStaticSpawner(&'static LocalStaticScheduler);
334
335/// Metrics recorded during a scheduler tick.
336///
337/// This type is returned by the [`Scheduler::tick`] and
338/// [`StaticScheduler::tick`] methods.
339///
340/// This type bundles together a number of values describing what occurred
341/// during a scheduler tick, such as how many tasks were polled, how many of
342/// those tasks completed, and how many new tasks were spawned since the last
343/// tick.
344///
345/// Most of these values are primarily useful as performance and debugging
346/// metrics. However, in some cases, they may also drive system behavior. For
347/// example, the `has_remaining` field on this type indicates whether or not
348/// more tasks are left in the scheduler's run queue after the tick. This can be
349/// used to determine whether or not the system should continue ticking the
350/// scheduler, or should perform other work before ticking again.
351#[derive(Debug)]
352#[non_exhaustive]
353pub struct Tick {
354 /// The total number of tasks polled on this scheduler tick.
355 pub polled: usize,
356
357 /// The number of polled tasks that *completed* on this scheduler tick.
358 ///
359 /// This should always be <= `self.polled`.
360 pub completed: usize,
361
362 /// `true` if the tick completed with any tasks remaining in the run queue.
363 pub has_remaining: bool,
364
365 /// The number of tasks that were spawned since the last tick.
366 pub spawned: usize,
367
368 /// The number of tasks that were woken from outside of their own `poll`
369 /// calls since the last tick.
370 pub woken_external: usize,
371
372 /// The number of tasks that were woken from within their own `poll` calls
373 /// during this tick.
374 pub woken_internal: usize,
375}
376
377/// Trait implemented by schedulers.
378///
379/// This trait is implemented by the [`Scheduler`] and [`StaticScheduler`]
380/// types. It is not intended to be publicly implemented by user-defined types,
381/// but can be used to abstract over `static` and reference-counted schedulers.
382pub trait Schedule: Sized + Clone + 'static {
383 /// Schedule a task on this scheduler.
384 ///
385 /// This method is called by the task's [`Waker`] when a task is woken.
386 ///
387 /// [`Waker`]: core::task::Waker
388 fn schedule(&self, task: TaskRef);
389
390 /// Returns a [`TaskRef`] referencing the task currently being polled by
391 /// this scheduler, if a task is currently being polled.
392 fn current_task(&self) -> Option<TaskRef>;
393
394 /// Returns a new [task `Builder`] for configuring tasks prior to spawning
395 /// them on this scheduler.
396 ///
397 /// [task `Builder`]: task::Builder
398 #[must_use]
399 fn build_task<'a>(&self) -> task::Builder<'a, Self> {
400 task::Builder::new(self.clone())
401 }
402}
403
404/// A stub [`Task`].
405///
406/// This represents a [`Task`] that will never actually be executed.
407/// It is used exclusively for initializing a [`StaticScheduler`],
408/// using the unsafe [`new_with_static_stub()`] method.
409///
410/// [`StaticScheduler`]: crate::scheduler::StaticScheduler
411/// [`new_with_static_stub()`]: crate::scheduler::StaticScheduler::new_with_static_stub
412#[repr(transparent)]
413#[cfg_attr(loom, allow(dead_code))]
414#[derive(Debug)]
415pub struct TaskStub {
416 hdr: Header,
417}
418
419/// Safely constructs a new [`StaticScheduler`] instance in a `static`
420/// initializer.
421///
422/// This macro is intended to be used as a `static` initializer:
423///
424/// ```rust
425/// use maitake::scheduler;
426///
427/// // look ma, no `unsafe`!
428/// static SCHEDULER: scheduler::StaticScheduler = scheduler::new_static!();
429/// ```
430///
431/// Note that this macro is re-exported in the [`scheduler`] module as
432/// [`scheduler::new_static!`], which feels somewhat more idiomatic than using
433/// it at the crate-level; however, it is also available at the crate-level as
434/// [`new_static_scheduler!`].
435///
436/// The [`StaticScheduler::new_with_static_stub`] constructor is unsafe to call,
437/// because it requires that the [`TaskStub`] passed to the scheduler not be
438/// used by other scheduler instances. This macro is a safe alternative to
439/// manually initializing a [`StaticScheduler`] instance using
440/// [`new_with_static_stub`], as it creates the stub task inside a scope,
/// ensuring that it cannot be referenced by other [`StaticScheduler`]
442/// instances.
443///
444/// This macro expands to the following code:
445/// ```rust
446/// # static SCHEDULER: maitake::scheduler::StaticScheduler =
447/// {
448/// static STUB_TASK: maitake::scheduler::TaskStub = maitake::scheduler::TaskStub::new();
449/// unsafe {
450/// // safety: `StaticScheduler::new_with_static_stub` is unsafe because
451/// // the stub task must not be shared with any other `StaticScheduler`
452/// // instance. because the `new_static` macro creates the stub task
453/// // inside the scope of the static initializer, it is guaranteed that
454/// // no other `StaticScheduler` instance can reference the `STUB_TASK`
455/// // static, so this is always safe.
456/// maitake::scheduler::StaticScheduler::new_with_static_stub(&STUB_TASK)
457/// }
458/// }
459/// # ;
460/// ```
461///
462/// [`new_with_static_stub`]: StaticScheduler::new_with_static_stub
463/// [`scheduler`]: crate::scheduler
464/// [`scheduler::new_static!`]: crate::scheduler::new_static!
465/// [`new_static_scheduler!`]: crate::new_static_scheduler!
466#[cfg(not(loom))]
467#[macro_export]
468macro_rules! new_static_scheduler {
469 () => {{
470 static STUB_TASK: $crate::scheduler::TaskStub = $crate::scheduler::TaskStub::new();
471 unsafe {
472 // safety: `StaticScheduler::new_with_static_stub` is unsafe because
473 // the stub task must not be shared with any other `StaticScheduler`
474 // instance. because the `new_static` macro creates the stub task
475 // inside the scope of the static initializer, it is guaranteed that
476 // no other `StaticScheduler` instance can reference the `STUB_TASK`
477 // static, so this is always safe.
478 $crate::scheduler::StaticScheduler::new_with_static_stub(&STUB_TASK)
479 }
480 }};
481}
482
483#[cfg(not(loom))]
484pub use new_static_scheduler as new_static;
485
486/// Core implementation of a scheduler, used by both the [`Scheduler`] and
487/// [`StaticScheduler`] types.
488///
489/// Each scheduler instance (which must implement `Clone`) must own a
490/// single instance of a `Core`, which is shared across clones of that scheduler.
491#[derive(Debug)]
492struct Core {
493 /// The scheduler's run queue.
494 ///
495 /// This is an [atomic multi-producer, single-consumer queue][mpsc] of
496 /// [`TaskRef`]s. When a task is [scheduled], it is pushed to this queue.
497 /// When the scheduler polls tasks, they are dequeued from the queue
498 /// and polled. If a task is woken during its poll, the scheduler
499 /// will push it back to this queue. Otherwise, if the task doesn't
500 /// self-wake, it will be pushed to the queue again if its [`Waker`]
501 /// is woken.
502 ///
503 /// [mpsc]: cordyceps::MpscQueue
504 /// [scheduled]: Schedule::schedule
505 /// [`Waker`]: core::task::Waker
506 run_queue: MpscQueue<Header>,
507
508 /// The task currently being polled by this scheduler, if it is currently
509 /// polling a task.
510 ///
511 /// If no task is currently being polled, this will be [`ptr::null_mut`].
512 current_task: AtomicPtr<Header>,
513
514 /// A counter of how many tasks were spawned since the last scheduler tick.
515 spawned: AtomicUsize,
516
517 /// A counter of how many tasks are in the scheduler's run queue.
518 queued: AtomicUsize,
519
520 /// A counter of how many tasks were woken from outside their own `poll`
521 /// methods.
522 woken: AtomicUsize,
523}
524
525// === impl TaskStub ===
526
527impl TaskStub {
528 loom_const_fn! {
529 /// Create a new unique stub [`Task`].
        // The whole point of this thing is to be const-initialized, so a
        // non-const-fn `Default` impl is basically useless.
532 #[allow(clippy::new_without_default)]
533 pub fn new() -> Self {
534 Self {
535 hdr: Header::new_static_stub(),
536 }
537 }
538 }
539}
540
541// === impl StaticScheduler ===
542
543impl StaticScheduler {
544 /// How many tasks are polled per call to [`StaticScheduler::tick`].
545 ///
546 /// Chosen by fair dice roll, guaranteed to be random.
547 pub const DEFAULT_TICK_SIZE: usize = Core::DEFAULT_TICK_SIZE;
548
    /// Create a `StaticScheduler` with a static "stub" task entity.
    ///
    /// This is used for creating a `StaticScheduler` as a `static` variable.
552 ///
553 /// # Safety
554 ///
555 /// The "stub" provided must ONLY EVER be used for a single StaticScheduler.
556 /// Re-using the stub for multiple schedulers may lead to undefined behavior.
557 ///
558 /// For a safe alternative, consider using the [`new_static!`] macro to
559 /// initialize a `StaticScheduler` in a `static` variable.
560 #[cfg(not(loom))]
561 pub const unsafe fn new_with_static_stub(stub: &'static TaskStub) -> Self {
562 StaticScheduler(Core::new_with_static_stub(&stub.hdr))
563 }
564
565 /// Spawn a pre-allocated task
566 ///
567 /// This method is used to spawn a task that requires some bespoke
568 /// procedure of allocation, typically of a custom [`Storage`] implementor.
569 /// See the documentation for the [`Storage`] trait for more details on
570 /// using custom task storage.
571 ///
572 /// This method returns a [`JoinHandle`] that can be used to await the
573 /// task's output. Dropping the [`JoinHandle`] _detaches_ the spawned task,
574 /// allowing it to run in the background without awaiting its output.
575 ///
576 /// When tasks are spawned on a scheduler, the scheduler must be
577 /// [ticked](Self::tick) in order to drive those tasks to completion.
578 /// See the [module-level documentation][run-loops] for more information
579 /// on implementing a system's run loop.
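    ///
    /// The difference between awaiting a handle and detaching it can be
    /// sketched by analogy with `std::thread::JoinHandle`. That is a different
    /// type from this crate's [`JoinHandle`], and threads are not tasks, so the
    /// following is only an illustration of the join-versus-detach idea. The
    /// function name `join_then_detach` is hypothetical:
    ///
    /// ```rust
    /// use std::sync::mpsc::channel;
    /// use std::thread;
    ///
    /// fn join_then_detach() -> (u32, u32) {
    ///     // Keeping the handle lets the caller wait for the output.
    ///     let joined = thread::spawn(|| 1 + 1).join().expect("thread panicked");
    ///
    ///     // Dropping the handle detaches the work: it still runs to
    ///     // completion, but its output must be observed some other way
    ///     // (here, through a channel).
    ///     let (tx, rx) = channel();
    ///     drop(thread::spawn(move || tx.send(7).expect("receiver dropped")));
    ///     let detached = rx.recv().expect("sender dropped");
    ///
    ///     (joined, detached)
    /// }
    /// ```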
580 ///
581 /// [`Storage`]: crate::task::Storage
582 /// [run-loops]: crate::scheduler#executing-tasks
583 #[inline]
584 #[track_caller]
585 pub fn spawn_allocated<F, STO>(&'static self, task: STO::StoredTask) -> JoinHandle<F::Output>
586 where
587 F: Future + Send + 'static,
588 F::Output: Send + 'static,
589 STO: Storage<&'static Self, F>,
590 {
591 let (task, join) = TaskRef::new_allocated::<&'static Self, F, STO>(self, task);
592 self.schedule(task);
593 join
594 }
595
596 /// Returns a new [task `Builder`] for configuring tasks prior to spawning
597 /// them on this scheduler.
598 ///
599 /// [task `Builder`]: task::Builder
600 #[must_use]
601 pub fn build_task<'a>(&'static self) -> task::Builder<'a, &'static Self> {
602 task::Builder::new(self)
603 }
604
605 /// Returns a [`TaskRef`] referencing the task currently being polled by
606 /// this scheduler, if a task is currently being polled.
607 ///
608 /// # Returns
609 ///
610 /// - [`Some`]`(`[`TaskRef`]`)` referencing the currently-polling task, if a
611 /// task is currently being polled (i.e., the scheduler is
612 /// [ticking](Self::tick) and the queue of scheduled tasks is non-empty).
613 ///
    /// - [`None`] if no task is currently being polled (i.e., the
    ///   scheduler is not ticking or its run queue is empty and all polls have
    ///   completed).
617 #[inline]
618 pub fn current_task(&self) -> Option<TaskRef> {
619 self.0.current_task()
620 }
621
622 /// Tick this scheduler, polling up to [`Self::DEFAULT_TICK_SIZE`] tasks
623 /// from the scheduler's run queue.
624 ///
625 /// Only a single CPU core/thread may tick a given scheduler at a time. If
626 /// another call to `tick` is in progress on a different core, this method
627 /// will immediately return.
628 ///
629 /// See [the module-level documentation][run-loops] for more information on
630 /// using this function to implement a system's run loop.
631 ///
632 /// # Returns
633 ///
634 /// A [`Tick`] struct with data describing what occurred during the
635 /// scheduler tick.
636 ///
637 /// [run-loops]: crate::scheduler#executing-tasks
638 pub fn tick(&'static self) -> Tick {
639 self.0.tick_n(Self::DEFAULT_TICK_SIZE)
640 }
641}
642
643impl Schedule for &'static StaticScheduler {
644 fn schedule(&self, task: TaskRef) {
645 self.0.wake(task)
646 }
647
648 fn current_task(&self) -> Option<TaskRef> {
649 self.0.current_task()
650 }
651}
652
653// === impl LocalStaticScheduler ===
654
655impl LocalStaticScheduler {
656 /// How many tasks are polled per call to [`LocalStaticScheduler::tick`].
657 ///
658 /// Chosen by fair dice roll, guaranteed to be random.
659 pub const DEFAULT_TICK_SIZE: usize = Core::DEFAULT_TICK_SIZE;
660
661 /// Create a `LocalStaticScheduler` with a static "stub" task entity
662 ///
663 /// This is used for creating a `LocalStaticScheduler` as a `static` variable.
664 ///
665 /// # Safety
666 ///
667 /// The "stub" provided must ONLY EVER be used for a single `LocalStaticScheduler`.
668 /// Re-using the stub for multiple schedulers may lead to undefined behavior.
669 ///
670 /// For a safe alternative, consider using the [`new_static!`] macro to
671 /// initialize a `LocalStaticScheduler` in a `static` variable.
672 #[cfg(not(loom))]
673 pub const unsafe fn new_with_static_stub(stub: &'static TaskStub) -> Self {
674 LocalStaticScheduler {
675 core: Core::new_with_static_stub(&stub.hdr),
676 _not_send: PhantomData,
677 }
678 }
679
    /// Spawn a pre-allocated, `!`[`Send`] task.
681 ///
682 /// Unlike [`StaticScheduler::spawn_allocated`], this method is capable of
683 /// spawning [`Future`]s which do not implement [`Send`].
684 ///
685 /// This method is used to spawn a task that requires some bespoke
686 /// procedure of allocation, typically of a custom [`Storage`] implementor.
687 /// See the documentation for the [`Storage`] trait for more details on
688 /// using custom task storage.
689 ///
690 /// This method returns a [`JoinHandle`] that can be used to await the
691 /// task's output. Dropping the [`JoinHandle`] _detaches_ the spawned task,
692 /// allowing it to run in the background without awaiting its output.
693 ///
694 /// When tasks are spawned on a scheduler, the scheduler must be
695 /// [ticked](Self::tick) in order to drive those tasks to completion.
696 /// See the [module-level documentation][run-loops] for more information
697 /// on implementing a system's run loop.
698 ///
699 /// [`Storage`]: crate::task::Storage
700 /// [run-loops]: crate::scheduler#executing-tasks
701 #[inline]
702 #[track_caller]
703 pub fn spawn_allocated<F, STO>(&'static self, task: STO::StoredTask) -> JoinHandle<F::Output>
704 where
705 F: Future + 'static,
706 F::Output: 'static,
707 STO: Storage<&'static Self, F>,
708 {
709 let (task, join) = TaskRef::new_allocated::<&'static Self, F, STO>(self, task);
710 self.schedule(task);
711 join
712 }
713
714 /// Returns a new [task `Builder`] for configuring tasks prior to spawning
715 /// them on this scheduler.
716 ///
717 /// To spawn `!`[`Send`] tasks using a [`Builder`](task::Builder), use the
718 /// [`Builder::spawn_local`](task::Builder::spawn_local) method.
719 ///
720 /// [task `Builder`]: task::Builder
721 #[must_use]
722 pub fn build_task<'a>(&'static self) -> task::Builder<'a, &'static Self> {
723 task::Builder::new(self)
724 }
725
726 /// Returns a [`TaskRef`] referencing the task currently being polled by
727 /// this scheduler, if a task is currently being polled.
728 ///
729 /// # Returns
730 ///
731 /// - [`Some`]`(`[`TaskRef`]`)` referencing the currently-polling task, if a
732 /// task is currently being polled (i.e., the scheduler is
733 /// [ticking](Self::tick) and the queue of scheduled tasks is non-empty).
734 ///
    /// - [`None`] if no task is currently being polled (i.e., the
    ///   scheduler is not ticking or its run queue is empty and all polls have
    ///   completed).
738 #[must_use]
739 #[inline]
740 pub fn current_task(&'static self) -> Option<TaskRef> {
741 self.core.current_task()
742 }
743
744 /// Tick this scheduler, polling up to [`Self::DEFAULT_TICK_SIZE`] tasks
745 /// from the scheduler's run queue.
746 ///
747 /// Only a single CPU core/thread may tick a given scheduler at a time. If
748 /// another call to `tick` is in progress on a different core, this method
749 /// will immediately return.
750 ///
751 /// See [the module-level documentation][run-loops] for more information on
752 /// using this function to implement a system's run loop.
753 ///
754 /// # Returns
755 ///
756 /// A [`Tick`] struct with data describing what occurred during the
757 /// scheduler tick.
758 ///
759 /// [run-loops]: crate::scheduler#executing-tasks
760 pub fn tick(&'static self) -> Tick {
761 self.core.tick_n(Self::DEFAULT_TICK_SIZE)
762 }
763
764 /// Returns a new [`LocalStaticSpawner`] that can be used by other threads to
765 /// spawn [`Send`] tasks on this scheduler.
766 #[must_use = "the returned `LocalStaticSpawner` does nothing unless used to spawn tasks"]
767 pub fn spawner(&'static self) -> LocalStaticSpawner {
768 LocalStaticSpawner(self)
769 }
770}
771
772impl Schedule for &'static LocalStaticScheduler {
773 fn schedule(&self, task: TaskRef) {
774 self.core.wake(task)
775 }
776
777 fn current_task(&self) -> Option<TaskRef> {
778 self.core.current_task()
779 }
780}
781
782// === impl LocalStaticSpawner ===
783
784impl LocalStaticSpawner {
785 /// Spawn a pre-allocated task on the [`LocalStaticScheduler`] this spawner
786 /// references.
787 ///
    /// Unlike [`LocalScheduler::spawn_allocated`] and
    /// [`LocalStaticScheduler::spawn_allocated`], this method requires that the
    /// spawned `Future` implement [`Send`], as the `LocalStaticSpawner` type is
    /// [`Send`] and [`Sync`], and therefore allows tasks to be spawned on a
    /// local scheduler from other threads.
793 ///
794 /// This method is used to spawn a task that requires some bespoke
795 /// procedure of allocation, typically of a custom [`Storage`] implementor.
796 /// See the documentation for the [`Storage`] trait for more details on
797 /// using custom task storage.
798 ///
799 /// This method returns a [`JoinHandle`] that can be used to await the
800 /// task's output. Dropping the [`JoinHandle`] _detaches_ the spawned task,
801 /// allowing it to run in the background without awaiting its output.
802 ///
803 /// When tasks are spawned on a scheduler, the scheduler must be
804 /// [ticked](LocalStaticScheduler::tick) in order to drive those tasks to completion.
805 /// See the [module-level documentation][run-loops] for more information
806 /// on implementing a system's run loop.
807 ///
808 /// [`Storage`]: crate::task::Storage
809 /// [run-loops]: crate::scheduler#executing-tasks
810 #[inline]
811 #[track_caller]
812 pub fn spawn_allocated<F, STO>(&self, task: STO::StoredTask) -> JoinHandle<F::Output>
813 where
814 F: Future + Send + 'static,
815 F::Output: Send + 'static,
816 STO: Storage<&'static LocalStaticScheduler, F>,
817 {
818 self.0.spawn_allocated::<F, STO>(task)
819 }
820}
821
822/// # Safety
823///
824/// A `LocalStaticSpawner` cannot be used to access any `!Send` tasks on the
825/// local scheduler it references. It can only push tasks to that scheduler's
826/// run queue, which *is* thread safe.
827unsafe impl Send for LocalStaticSpawner {}
828unsafe impl Sync for LocalStaticSpawner {}
829
830// === impl Core ===
831
832impl Core {
833 /// How many tasks are polled per scheduler tick.
834 ///
835 /// Chosen by fair dice roll, guaranteed to be random.
836 const DEFAULT_TICK_SIZE: usize = 256;
837
838 #[cfg(not(loom))]
839 const unsafe fn new_with_static_stub(stub: &'static Header) -> Self {
840 Self {
841 run_queue: MpscQueue::new_with_static_stub(stub),
842 current_task: AtomicPtr::new(ptr::null_mut()),
843 queued: AtomicUsize::new(0),
844 spawned: AtomicUsize::new(0),
845 woken: AtomicUsize::new(0),
846 }
847 }
848
849 #[inline(always)]
850 fn current_task(&self) -> Option<TaskRef> {
851 let ptr = self.current_task.load(Acquire);
852 let ptr = ptr::NonNull::new(ptr)?;
853 Some(TaskRef::clone_from_raw(ptr))
854 }
855
856 /// Wake `task`, adding it to the scheduler's run queue.
857 #[inline(always)]
858 fn wake(&self, task: TaskRef) {
859 self.woken.fetch_add(1, Relaxed);
860 self.schedule(task)
861 }
862
863 /// Schedule `task` for execution, adding it to this scheduler's run queue.
864 #[inline]
865 fn schedule(&self, task: TaskRef) {
866 self.queued.fetch_add(1, Relaxed);
867 self.run_queue.enqueue(task);
868 }
869
870 #[inline(always)]
871 fn spawn_inner(&self, task: TaskRef) {
872 // ensure the woken bit is set when spawning so the task won't be queued twice.
873 task.set_woken();
874 self.spawned.fetch_add(1, Relaxed);
875 self.schedule(task);
876 }
877
878 fn tick_n(&self, n: usize) -> Tick {
879 use task::PollResult;
880
881 let mut tick = Tick {
882 polled: 0,
883 completed: 0,
884 spawned: 0,
885 woken_external: 0,
886 woken_internal: 0,
887 has_remaining: false,
888 };
889
890 while tick.polled < n {
891 let task = match self.run_queue.try_dequeue() {
892 Ok(task) => task,
893 // If inconsistent, just try again.
894 Err(TryDequeueError::Inconsistent) => {
895 core::hint::spin_loop();
896 continue;
897 }
898 // Queue is empty or busy (in use by something else), bail out.
899 Err(TryDequeueError::Busy | TryDequeueError::Empty) => {
900 break;
901 }
902 };
903
904 self.queued.fetch_sub(1, Relaxed);
905 let _span = trace_span!(
906 "poll",
907 task.addr = ?fmt::ptr(&task),
908 task.tid = task.id().as_u64(),
909 )
910 .entered();
911 // store the currently polled task in the `current_task` pointer.
912 // using `TaskRef::as_ptr` is safe here, since we will clear the
913 // `current_task` pointer before dropping the `TaskRef`.
914 self.current_task.store(task.as_ptr().as_ptr(), Release);
915
916 // poll the task
917 let poll_result = task.poll();
918
919 // clear the current task cell before potentially dropping the
920 // `TaskRef`.
921 self.current_task.store(ptr::null_mut(), Release);
922
923 tick.polled += 1;
924 match poll_result {
925 PollResult::Ready | PollResult::ReadyJoined => tick.completed += 1,
926 PollResult::PendingSchedule => {
927 self.schedule(task);
928 tick.woken_internal += 1;
929 }
930 PollResult::Pending => {}
931 }
932
933 trace!(poll = ?poll_result, tick.polled, tick.completed);
934 }
935
936 tick.spawned = self.spawned.swap(0, Relaxed);
937 tick.woken_external = self.woken.swap(0, Relaxed);
938
939 // are there still tasks in the queue? if so, we have more tasks to poll.
940 if test_dbg!(self.queued.load(Relaxed)) > 0 {
941 tick.has_remaining = true;
942 }
943
944 if tick.polled > 0 {
945 // log scheduler metrics.
946 debug!(
947 tick.polled,
948 tick.completed,
949 tick.spawned,
950 tick.woken = tick.woken(),
951 tick.woken.external = tick.woken_external,
952 tick.woken.internal = tick.woken_internal,
953 tick.has_remaining
954 );
955 }
956
957 tick
958 }
959}
960
961impl Tick {
    /// Returns the total number of tasks woken since the last tick: the sum
    /// of `woken_external` and `woken_internal`.
963 pub fn woken(&self) -> usize {
964 self.woken_external + self.woken_internal
965 }
966}
967
968// Additional types and capabilities only available with the "alloc"
969// feature active
970feature! {
971 #![feature = "alloc"]
972
973 use crate::{
974 loom::sync::{Arc},
975 task::{BoxStorage, Task},
976 };
977 use alloc::{sync::Weak, boxed::Box};
978
979 /// An atomically reference-counted single-core scheduler implementation.
980 ///
981 /// This type stores the core of the scheduler inside an [`Arc`], which is
982 /// cloned by each task spawned on the scheduler. The use of [`Arc`] allows
983 /// schedulers to be created and dropped dynamically at runtime. This is in
984 /// contrast to the [`StaticScheduler`] type, which must be stored in a
985 /// `static` variable for the entire lifetime of the program.
986 ///
987 /// Due to the use of [`Arc`], this type requires [the "alloc" feature
988 /// flag][features] to be enabled.
989 ///
990 /// [features]: crate#features
991 #[derive(Clone, Debug, Default)]
992 pub struct Scheduler(Arc<Core>);
993
994 /// A reference-counted scheduler for `!`[`Send`] tasks.
995 ///
    /// This type is identical to the [`Scheduler`] type, except that it is
    /// capable of scheduling [`Future`]s that do not implement [`Send`]. Because
    /// this scheduler's futures cannot be moved across threads[^1], the scheduler
    /// itself is also `!Send` and `!Sync`, as ticking it from multiple threads would
    /// move ownership of a `!Send` future.
1001 ///
1002 /// This type stores the core of the scheduler inside an [`Arc`], which is
1003 /// cloned by each task spawned on the scheduler. The use of [`Arc`] allows
1004 /// schedulers to be created and dropped dynamically at runtime. This is in
1005 /// contrast to the [`StaticScheduler`] type, which must be stored in a
1006 /// `static` variable for the entire lifetime of the program.
1007 ///
1008 /// Due to the use of [`Arc`], this type requires [the "alloc" feature
1009 /// flag][features] to be enabled.
1010 ///
1011 /// [features]: crate#features
1012 /// [^1]: Or CPU cores, in bare-metal systems.
1013 #[derive(Clone, Debug, Default)]
1014 pub struct LocalScheduler {
1015 core: Arc<Core>,
1016 _not_send: PhantomData<*mut ()>,
1017 }
1018
1019 /// A handle to a [`LocalScheduler`] that implements [`Send`].
1020 ///
    /// The [`LocalScheduler`] and [`LocalStaticScheduler`] types are capable of
    /// spawning futures which do not implement [`Send`]. Because of this, those
    /// scheduler types themselves are also `!Send` and `!Sync`, as ticking them
    /// from another thread would cause their tasks to be polled from that thread,
    /// violating the [`Send`] and [`Sync`] contracts.
1026 ///
1027 /// However, tasks which *are* [`Send`] may still be spawned on a `!Send`
1028 /// scheduler, alongside `!Send` tasks. Because the scheduler types are `!Sync`,
1029 /// other threads may not reference them in order to spawn remote tasks on those
1030 /// schedulers. This type is a handle to a `!Sync` scheduler which *can* be sent
1031 /// across thread boundaries, as it does not have the capacity to poll tasks or
1032 /// reference the current task.
1033 ///
1034 /// This type owns a [`Weak`] reference to the scheduler. If the
1035 /// `LocalScheduler` is dropped, any attempts to spawn a task using this
1036 /// handle will return a [`JoinHandle`] that fails with a "scheduler shut
1037 /// down" error.
1038 ///
1039 /// This type is returned by [`LocalScheduler::spawner`].
1040 #[derive(Clone, Debug)]
1041 pub struct LocalSpawner(Weak<Core>);
1042
1043 // === impl Scheduler ===
1044
1045 impl Scheduler {
1046 /// How many tasks are polled per call to `Scheduler::tick`.
1047 ///
1048 /// Chosen by fair dice roll, guaranteed to be random.
1049 pub const DEFAULT_TICK_SIZE: usize = Core::DEFAULT_TICK_SIZE;
1050
1051 /// Returns a new `Scheduler`.
1052 #[must_use]
1053 pub fn new() -> Self {
1054 Self::default()
1055 }
1056
1057 /// Returns a new [task `Builder`][`Builder`] for configuring tasks prior to spawning
1058 /// them on this scheduler.
1059 ///
1060 /// # Examples
1061 ///
1062 /// ```
1063 /// use maitake::scheduler::Scheduler;
1064 ///
1065 /// let scheduler = Scheduler::new();
1066 /// scheduler.build_task().name("hello world").spawn(async {
1067 /// // ...
1068 /// });
1069 ///
1070 /// scheduler.tick();
1071 /// ```
1072 ///
1073 /// Multiple tasks can be spawned using the same [`Builder`]:
1074 ///
1075 /// ```
1076 /// use maitake::scheduler::Scheduler;
1077 ///
1078 /// let scheduler = Scheduler::new();
1079 /// let builder = scheduler
1080 /// .build_task()
1081 /// .kind("my_cool_task");
1082 ///
1083 /// builder.spawn(async {
1084 /// // ...
1085 /// });
1086 ///
1087 /// builder.spawn(async {
1088 /// // ...
1089 /// });
1090 ///
1091 /// scheduler.tick();
1092 /// ```
1093 ///
1094 /// [`Builder`]: task::Builder
1095 #[must_use]
1096 #[inline]
1097 pub fn build_task<'a>(&self) -> task::Builder<'a, Self> {
1098 task::Builder::new(self.clone())
1099 }
1100
1101 /// Spawn a [task].
1102 ///
1103 /// This method returns a [`JoinHandle`] that can be used to await the
1104 /// task's output. Dropping the [`JoinHandle`] _detaches_ the spawned task,
1105 /// allowing it to run in the background without awaiting its output.
1106 ///
1107 /// When tasks are spawned on a scheduler, the scheduler must be
1108 /// [ticked](Self::tick) in order to drive those tasks to completion.
1109 /// See the [module-level documentation][run-loops] for more information
1110 /// on implementing a system's run loop.
1111 ///
1112 /// # Examples
1113 ///
1114 /// Spawning a task and awaiting its output:
1115 ///
1116 /// ```
1117 /// use maitake::scheduler::Scheduler;
1118 ///
1119 /// let scheduler = Scheduler::new();
1120 ///
1121 /// // spawn a new task, returning a `JoinHandle`.
1122 /// let task = scheduler.spawn(async move {
1123 /// // ... do stuff ...
1124 /// 42
1125 /// });
1126 ///
1127 /// // spawn another task that awaits the output of the first task.
1128 /// scheduler.spawn(async move {
1129 /// // await the `JoinHandle` future, which completes when the task
1130 /// // finishes, and unwrap its output.
1131 /// let output = task.await.expect("task is not cancelled");
1132 /// assert_eq!(output, 42);
1133 /// });
1134 ///
1135 /// // run the scheduler, driving the spawned tasks to completion.
1136 /// while scheduler.tick().has_remaining {}
1137 /// ```
1138 ///
1139 /// Spawning a task to run in the background, without awaiting its
1140 /// output:
1141 ///
1142 /// ```
1143 /// use maitake::scheduler::Scheduler;
1144 ///
1145 /// let scheduler = Scheduler::new();
1146 ///
1147 /// // dropping the `JoinHandle` allows the task to run in the background
1148 /// // without awaiting its output.
1149 /// scheduler.spawn(async move {
1150 /// // ... do stuff ...
1151 /// });
1152 ///
1153 /// // run the scheduler, driving the spawned tasks to completion.
1154 /// while scheduler.tick().has_remaining {}
1155 /// ```
1156 ///
1157 /// [task]: crate::task
1158 /// [run-loops]: crate::scheduler#executing-tasks
1159 #[inline]
1160 #[track_caller]
1161 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
1162 where
1163 F: Future + Send + 'static,
1164 F::Output: Send + 'static,
1165 {
1166 let (task, join) = TaskRef::new(self.clone(), future);
1167 self.0.spawn_inner(task);
1168 join
1169 }
1170
1171
        /// Spawn a pre-allocated task.
1173 ///
1174 /// This method is used to spawn a task that requires some bespoke
1175 /// procedure of allocation, typically of a custom [`Storage`]
1176 /// implementor. See the documentation for the [`Storage`] trait for
1177 /// more details on using custom task storage.
1178 ///
1179 /// This method returns a [`JoinHandle`] that can be used to await the
1180 /// task's output. Dropping the [`JoinHandle`] _detaches_ the spawned task,
1181 /// allowing it to run in the background without awaiting its output.
1182 ///
1183 /// When tasks are spawned on a scheduler, the scheduler must be
1184 /// [ticked](Self::tick) in order to drive those tasks to completion.
1185 /// See the [module-level documentation][run-loops] for more information
1186 /// on implementing a system's run loop.
1187 ///
1188 /// [`Storage`]: crate::task::Storage
1189 /// [run-loops]: crate::scheduler#executing-tasks
1190 #[inline]
1191 #[track_caller]
        pub fn spawn_allocated<F>(&self, task: Box<Task<Self, F, BoxStorage>>) -> JoinHandle<F::Output>
1193 where
1194 F: Future + Send + 'static,
1195 F::Output: Send + 'static,
1196 {
1197 let (task, join) = TaskRef::new_allocated::<Self, F, BoxStorage>(self.clone(), task);
1198 self.0.spawn_inner(task);
1199 join
1200 }
1201
1202 /// Returns a [`TaskRef`] referencing the task currently being polled by
1203 /// this scheduler, if a task is currently being polled.
1204 ///
1205 /// # Returns
1206 ///
1207 /// - [`Some`]`(`[`TaskRef`]`)` referencing the currently-polling task,
1208 /// if a task is currently being polled (i.e., the scheduler is
1209 /// [ticking](Self::tick) and the queue of scheduled tasks is
1210 /// non-empty).
1211 ///
        /// - [`None`] if no task is currently being polled (i.e., the
        ///   scheduler is not ticking, or its run queue is empty and all polls
        ///   have completed).
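        ///
        /// # Examples
        ///
        /// A minimal sketch, assuming only the `Scheduler` methods documented
        /// in this module; `handle` is an illustrative name for a clone of the
        /// scheduler that is moved into the task:
        ///
        /// ```rust
        /// use maitake::scheduler::Scheduler;
        ///
        /// let scheduler = Scheduler::new();
        ///
        /// // nothing is being polled before the scheduler is ticked.
        /// assert!(scheduler.current_task().is_none());
        ///
        /// let handle = scheduler.clone();
        /// scheduler.spawn(async move {
        ///     // while this task is being polled, it is the current task.
        ///     assert!(handle.current_task().is_some());
        /// });
        ///
        /// // run the scheduler, driving the spawned task to completion.
        /// while scheduler.tick().has_remaining {}
        ///
        /// // once the tick has finished, no task is being polled.
        /// assert!(scheduler.current_task().is_none());
        /// ```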
1215 #[inline]
1216 pub fn current_task(&self) -> Option<TaskRef> {
1217 self.0.current_task()
1218 }
1219
1220 /// Tick this scheduler, polling up to [`Self::DEFAULT_TICK_SIZE`] tasks
1221 /// from the scheduler's run queue.
1222 ///
1223 /// Only a single CPU core/thread may tick a given scheduler at a time. If
1224 /// another call to `tick` is in progress on a different core, this method
1225 /// will immediately return.
1226 ///
1227 /// See [the module-level documentation][run-loops] for more information on
1228 /// using this function to implement a system's run loop.
1229 ///
1230 /// # Returns
1231 ///
1232 /// A [`Tick`] struct with data describing what occurred during the
1233 /// scheduler tick.
1234 ///
1235 /// [run-loops]: crate::scheduler#executing-tasks
1236 pub fn tick(&self) -> Tick {
1237 self.0.tick_n(Self::DEFAULT_TICK_SIZE)
1238 }
1239 }
1240
1241 impl Schedule for Scheduler {
1242 fn schedule(&self, task: TaskRef) {
1243 self.0.wake(task)
1244 }
1245
1246 fn current_task(&self) -> Option<TaskRef> {
1247 self.0.current_task()
1248 }
1249 }
1250
1251 // === impl StaticScheduler ===
1252
1253 impl StaticScheduler {
1254 /// Returns a new `StaticScheduler` with a heap-allocated stub task.
1255 ///
1256 /// Unlike [`StaticScheduler::new_with_static_stub`], this is *not* a
1257 /// `const fn`, as it performs a heap allocation for the stub task.
1258 /// However, the returned `StaticScheduler` must still be stored in a
1259 /// `static` variable in order to be used.
1260 ///
1261 /// This method is generally used with lazy initialization of the
1262 /// scheduler `static`.
1263 #[must_use]
1264 pub fn new() -> Self {
1265 Self::default()
1266 }
1267
1268 /// Spawn a [task].
1269 ///
1270 /// This method returns a [`JoinHandle`] that can be used to await the
1271 /// task's output. Dropping the [`JoinHandle`] _detaches_ the spawned task,
1272 /// allowing it to run in the background without awaiting its output.
1273 ///
1274 /// When tasks are spawned on a scheduler, the scheduler must be
1275 /// [ticked](Self::tick) in order to drive those tasks to completion.
1276 /// See the [module-level documentation][run-loops] for more information
1277 /// on implementing a system's run loop.
1278 ///
1279 /// # Examples
1280 ///
1281 /// Spawning a task and awaiting its output:
1282 ///
1283 /// ```
1284 /// use maitake::scheduler::{self, StaticScheduler};
1285 /// static SCHEDULER: StaticScheduler = scheduler::new_static!();
1286 ///
1287 /// // spawn a new task, returning a `JoinHandle`.
1288 /// let task = SCHEDULER.spawn(async move {
1289 /// // ... do stuff ...
1290 /// 42
1291 /// });
1292 ///
1293 /// // spawn another task that awaits the output of the first task.
1294 /// SCHEDULER.spawn(async move {
1295 /// // await the `JoinHandle` future, which completes when the task
1296 /// // finishes, and unwrap its output.
1297 /// let output = task.await.expect("task is not cancelled");
1298 /// assert_eq!(output, 42);
1299 /// });
1300 ///
1301 /// // run the scheduler, driving the spawned tasks to completion.
1302 /// while SCHEDULER.tick().has_remaining {}
1303 /// ```
1304 ///
1305 /// Spawning a task to run in the background, without awaiting its
1306 /// output:
1307 ///
1308 /// ```
1309 /// use maitake::scheduler::{self, StaticScheduler};
1310 /// static SCHEDULER: StaticScheduler = scheduler::new_static!();
1311 ///
1312 /// // dropping the `JoinHandle` allows the task to run in the background
1313 /// // without awaiting its output.
1314 /// SCHEDULER.spawn(async move {
1315 /// // ... do stuff ...
1316 /// });
1317 ///
1318 /// // run the scheduler, driving the spawned tasks to completion.
1319 /// while SCHEDULER.tick().has_remaining {}
1320 /// ```
1321 ///
1322 /// [task]: crate::task
1323 /// [run-loops]: crate::scheduler#executing-tasks
1324 #[inline]
1325 #[track_caller]
1326 pub fn spawn<F>(&'static self, future: F) -> JoinHandle<F::Output>
1327 where
1328 F: Future + Send + 'static,
1329 F::Output: Send + 'static,
1330 {
1331 let (task, join) = TaskRef::new(self, future);
1332 self.0.spawn_inner(task);
1333 join
1334 }
1335 }
1336
1337 // === impl LocalStaticScheduler ===
1338
1339 impl LocalStaticScheduler {
1340 /// Returns a new `LocalStaticScheduler` with a heap-allocated stub task.
1341 ///
1342 /// Unlike [`LocalStaticScheduler::new_with_static_stub`], this is *not* a
1343 /// `const fn`, as it performs a heap allocation for the stub task.
        /// However, the returned `LocalStaticScheduler` must still be stored
        /// in a `static` variable in order to be used.
1346 ///
1347 /// This method is generally used with lazy initialization of the
1348 /// scheduler `static`.
1349 #[must_use]
1350 pub fn new() -> Self {
1351 Self::default()
1352 }
1353
1354 /// Spawn a [task].
1355 ///
1356 /// This method returns a [`JoinHandle`] that can be used to await the
1357 /// task's output. Dropping the [`JoinHandle`] _detaches_ the spawned task,
1358 /// allowing it to run in the background without awaiting its output.
1359 ///
1360 /// When tasks are spawned on a scheduler, the scheduler must be
1361 /// [ticked](Self::tick) in order to drive those tasks to completion.
1362 /// See the [module-level documentation][run-loops] for more information
1363 /// on implementing a system's run loop.
1364 ///
1365 /// [task]: crate::task
1366 /// [run-loops]: crate::scheduler#executing-tasks
1367 #[inline]
1368 #[track_caller]
1369 pub fn spawn<F>(&'static self, future: F) -> JoinHandle<F::Output>
1370 where
1371 F: Future + 'static,
1372 F::Output: 'static,
1373 {
1374 let (task, join) = TaskRef::new(self, future);
1375 self.core.spawn_inner(task);
1376 join
1377 }
1378 }
1379
1380 // === impl LocalScheduler ===
1381
1382 impl LocalScheduler {
1383 /// How many tasks are polled per call to `LocalScheduler::tick`.
1384 ///
1385 /// Chosen by fair dice roll, guaranteed to be random.
1386 pub const DEFAULT_TICK_SIZE: usize = Core::DEFAULT_TICK_SIZE;
1387
1388 /// Returns a new `LocalScheduler`.
1389 #[must_use]
1390 pub fn new() -> Self {
1391 Self::default()
1392 }
1393
1394 /// Returns a new [task `Builder`][`Builder`] for configuring tasks prior to spawning
1395 /// them on this scheduler.
1396 ///
1397 /// To spawn `!`[`Send`] tasks using a [`Builder`], use the
1398 /// [`Builder::spawn_local`](task::Builder::spawn_local) method.
1399 ///
1400 /// # Examples
1401 ///
1402 /// ```
1403 /// use maitake::scheduler::LocalScheduler;
1404 ///
1405 /// let scheduler = LocalScheduler::new();
1406 /// scheduler.build_task().name("hello world").spawn_local(async {
1407 /// // ...
1408 /// });
1409 ///
1410 /// scheduler.tick();
1411 /// ```
1412 ///
1413 /// Multiple tasks can be spawned using the same [`Builder`]:
1414 ///
1415 /// ```
1416 /// use maitake::scheduler::LocalScheduler;
1417 ///
1418 /// let scheduler = LocalScheduler::new();
1419 /// let builder = scheduler
1420 /// .build_task()
1421 /// .kind("my_cool_task");
1422 ///
1423 /// builder.spawn_local(async {
1424 /// // ...
1425 /// });
1426 ///
1427 /// builder.spawn_local(async {
1428 /// // ...
1429 /// });
1430 ///
1431 /// scheduler.tick();
1432 /// ```
1433 ///
1434 /// [`Builder`]: task::Builder
1435 #[must_use]
1436 #[inline]
1437 pub fn build_task<'a>(&self) -> task::Builder<'a, Self> {
1438 task::Builder::new(self.clone())
1439 }
1440
1441 /// Spawn a `!`[`Send`] [task].
1442 ///
1443 /// This method returns a [`JoinHandle`] that can be used to await the
1444 /// task's output. Dropping the [`JoinHandle`] _detaches_ the spawned task,
1445 /// allowing it to run in the background without awaiting its output.
1446 ///
1447 /// When tasks are spawned on a scheduler, the scheduler must be
1448 /// [ticked](Self::tick) in order to drive those tasks to completion.
1449 /// See the [module-level documentation][run-loops] for more information
1450 /// on implementing a system's run loop.
1451 ///
1452 /// # Examples
1453 ///
1454 /// Spawning a task and awaiting its output:
1455 ///
1456 /// ```
1457 /// use maitake::scheduler::LocalScheduler;
1458 ///
1459 /// let scheduler = LocalScheduler::new();
1460 ///
1461 /// // spawn a new task, returning a `JoinHandle`.
1462 /// let task = scheduler.spawn(async move {
1463 /// // ... do stuff ...
1464 /// 42
1465 /// });
1466 ///
1467 /// // spawn another task that awaits the output of the first task.
1468 /// scheduler.spawn(async move {
1469 /// // await the `JoinHandle` future, which completes when the task
1470 /// // finishes, and unwrap its output.
1471 /// let output = task.await.expect("task is not cancelled");
1472 /// assert_eq!(output, 42);
1473 /// });
1474 ///
1475 /// // run the scheduler, driving the spawned tasks to completion.
1476 /// while scheduler.tick().has_remaining {}
1477 /// ```
1478 ///
1479 /// Spawning a task to run in the background, without awaiting its
1480 /// output:
1481 ///
1482 /// ```
1483 /// use maitake::scheduler::LocalScheduler;
1484 ///
1485 /// let scheduler = LocalScheduler::new();
1486 ///
1487 /// // dropping the `JoinHandle` allows the task to run in the background
1488 /// // without awaiting its output.
1489 /// scheduler.spawn(async move {
1490 /// // ... do stuff ...
1491 /// });
1492 ///
1493 /// // run the scheduler, driving the spawned tasks to completion.
1494 /// while scheduler.tick().has_remaining {}
1495 /// ```
1496 ///
1497 /// [task]: crate::task
1498 /// [run-loops]: crate::scheduler#executing-tasks
1499 #[inline]
1500 #[track_caller]
1501 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
1502 where
1503 F: Future + 'static,
1504 F::Output: 'static,
1505 {
1506 let (task, join) = TaskRef::new(self.clone(), future);
1507 self.core.spawn_inner(task);
1508 join
1509 }
1510
1511
1512 /// Spawn a pre-allocated `!`[`Send`] task.
1513 ///
1514 /// This method is used to spawn a task that requires some bespoke
1515 /// procedure of allocation, typically of a custom [`Storage`]
1516 /// implementor. See the documentation for the [`Storage`] trait for
1517 /// more details on using custom task storage.
1518 ///
1519 /// This method returns a [`JoinHandle`] that can be used to await the
1520 /// task's output. Dropping the [`JoinHandle`] _detaches_ the spawned task,
1521 /// allowing it to run in the background without awaiting its output.
1522 ///
1523 /// When tasks are spawned on a scheduler, the scheduler must be
1524 /// [ticked](Self::tick) in order to drive those tasks to completion.
1525 /// See the [module-level documentation][run-loops] for more information
1526 /// on implementing a system's run loop.
1527 ///
1528 /// [`Storage`]: crate::task::Storage
1529 /// [run-loops]: crate::scheduler#executing-tasks
1530 #[inline]
1531 #[track_caller]
1532 pub fn spawn_allocated<F>(&self, task: Box<Task<Self, F, BoxStorage>>) -> JoinHandle<F::Output>
1533 where
1534 F: Future + 'static,
1535 F::Output: 'static,
1536 {
1537 let (task, join) = TaskRef::new_allocated::<Self, F, BoxStorage>(self.clone(), task);
1538 self.core.spawn_inner(task);
1539 join
1540 }
1541
1542 /// Returns a [`TaskRef`] referencing the task currently being polled by
1543 /// this scheduler, if a task is currently being polled.
1544 ///
1545 /// # Returns
1546 ///
1547 /// - [`Some`]`(`[`TaskRef`]`)` referencing the currently-polling task,
1548 /// if a task is currently being polled (i.e., the scheduler is
1549 /// [ticking](Self::tick) and the queue of scheduled tasks is
1550 /// non-empty).
1551 ///
        /// - [`None`] if no task is currently being polled (i.e., the
        ///   scheduler is not ticking, or its run queue is empty and all polls
        ///   have completed).
1555 #[must_use]
1556 #[inline]
1557 pub fn current_task(&self) -> Option<TaskRef> {
1558 self.core.current_task()
1559 }
1560
1561
1562 /// Tick this scheduler, polling up to [`Self::DEFAULT_TICK_SIZE`] tasks
1563 /// from the scheduler's run queue.
1564 ///
1565 /// Only a single CPU core/thread may tick a given scheduler at a time. If
1566 /// another call to `tick` is in progress on a different core, this method
1567 /// will immediately return.
1568 ///
1569 /// See [the module-level documentation][run-loops] for more information on
1570 /// using this function to implement a system's run loop.
1571 ///
1572 /// # Returns
1573 ///
1574 /// A [`Tick`] struct with data describing what occurred during the
1575 /// scheduler tick.
1576 ///
1577 /// [run-loops]: crate::scheduler#executing-tasks
1578 pub fn tick(&self) -> Tick {
1579 self.core.tick_n(Self::DEFAULT_TICK_SIZE)
1580 }
1581
1582 /// Returns a new [`LocalSpawner`] that can be used by other threads to
1583 /// spawn [`Send`] tasks on this scheduler.
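        ///
        /// # Examples
        ///
        /// A minimal sketch of spawning a [`Send`] task through the spawner,
        /// assuming only the methods documented in this module. The spawner
        /// is used on the same thread here for brevity; because
        /// [`LocalSpawner`] is [`Send`], it could equally be moved to another
        /// thread first:
        ///
        /// ```rust
        /// use maitake::scheduler::LocalScheduler;
        ///
        /// let scheduler = LocalScheduler::new();
        /// let spawner = scheduler.spawner();
        ///
        /// // the spawned future must be `Send`; dropping the returned
        /// // `JoinHandle` detaches the task.
        /// spawner.spawn(async {
        ///     // ... do stuff ...
        /// });
        ///
        /// // the task was queued on `scheduler`, so ticking the scheduler
        /// // drives it to completion.
        /// while scheduler.tick().has_remaining {}
        /// ```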
1584 #[must_use = "the returned `LocalSpawner` does nothing unless used to spawn tasks"]
1585 #[cfg(not(loom))] // Loom's `Arc` does not have a weak reference type...
1586 pub fn spawner(&self) -> LocalSpawner {
1587 LocalSpawner(Arc::downgrade(&self.core))
1588 }
1589 }
1590
1591 impl Schedule for LocalScheduler {
1592 fn schedule(&self, task: TaskRef) {
1593 self.core.wake(task)
1594 }
1595
1596 fn current_task(&self) -> Option<TaskRef> {
1597 self.core.current_task()
1598 }
1599 }
1600
1601 // === impl LocalStaticSpawner ===
1602
1603 impl LocalStaticSpawner {
1604 /// Spawn a task on the [`LocalStaticScheduler`] this handle
1605 /// references.
1606 ///
1607 /// Unlike [`LocalStaticScheduler::spawn`], this method requires that the
1608 /// spawned `Future` implement [`Send`], as the `LocalStaticSpawner` type is [`Send`]
1609 /// and [`Sync`], and therefore allows tasks to be spawned on a local
1610 /// scheduler from other threads.
1611 ///
1612 /// This method returns a [`JoinHandle`] that can be used to await the
1613 /// task's output. Dropping the [`JoinHandle`] _detaches_ the spawned task,
1614 /// allowing it to run in the background without awaiting its output.
1615 ///
1616 /// When tasks are spawned on a scheduler, the scheduler must be
1617 /// [ticked](LocalStaticScheduler::tick) in order to drive those tasks to completion.
1618 /// See the [module-level documentation][run-loops] for more information
1619 /// on implementing a system's run loop.
1620 ///
1621 /// [`Storage`]: crate::task::Storage
1622 /// [run-loops]: crate::scheduler#executing-tasks
1623 #[inline]
1624 #[track_caller]
1625 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
1626 where
            F: Future + Send + 'static,
1628 F::Output: Send + 'static,
1629 {
1630 self.0.spawn(future)
1631 }
1632 }
1633
1634 // === impl LocalSpawner ===
1635
1636 impl LocalSpawner {
1637 /// Spawn a task on the [`LocalScheduler`] this handle
1638 /// references.
1639 ///
1640 /// Unlike [`LocalScheduler::spawn`], this method requires that the
1641 /// spawned `Future` implement [`Send`], as the `LocalSpawner` type is [`Send`]
1642 /// and [`Sync`], and therefore allows tasks to be spawned on a local
1643 /// scheduler from other threads.
1644 ///
1645 /// This method returns a [`JoinHandle`] that can be used to await the
1646 /// task's output. Dropping the [`JoinHandle`] _detaches_ the spawned task,
1647 /// allowing it to run in the background without awaiting its output.
1648 ///
1649 /// When tasks are spawned on a scheduler, the scheduler must be
1650 /// [ticked](LocalScheduler::tick) in order to drive those tasks to completion.
1651 /// See the [module-level documentation][run-loops] for more information
1652 /// on implementing a system's run loop.
1653 ///
1654 /// [`Storage`]: crate::task::Storage
1655 /// [run-loops]: crate::scheduler#executing-tasks
1656 #[inline]
1657 #[track_caller]
1658 #[cfg(not(loom))]
1659 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
1660 where
            F: Future + Send + 'static,
1662 F::Output: Send + 'static,
1663 {
1664 match self.0.upgrade() {
1665 Some(core) => LocalScheduler { core, _not_send: PhantomData }.spawn(future),
1666 None => JoinHandle::error(task::join_handle::JoinErrorKind::Shutdown),
1667 }
1668 }
1669
1670 /// Spawn a pre-allocated task on the [`LocalScheduler`] this handle
1671 /// references.
        ///
        /// Unlike [`LocalScheduler::spawn_allocated`] and
        /// [`LocalStaticScheduler::spawn_allocated`], this method requires that
        /// the spawned `Future` implement [`Send`]. The `LocalSpawner` type is
        /// [`Send`] and [`Sync`], so it allows tasks to be spawned on a local
        /// scheduler from other threads.
        ///
        /// This method is used to spawn a task that requires some bespoke
        /// procedure of allocation, typically of a custom [`Storage`] implementor.
        /// See the documentation for the [`Storage`] trait for more details on
        /// using custom task storage.
        ///
        /// This method returns a [`JoinHandle`] that can be used to await the
        /// task's output. Dropping the [`JoinHandle`] _detaches_ the spawned task,
        /// allowing it to run in the background without awaiting its output.
        ///
        /// If the scheduler's core has already been dropped, the task is not
        /// spawned, and the returned [`JoinHandle`] completes with a shutdown
        /// error instead.
        ///
        /// When tasks are spawned on a scheduler, the scheduler must be
        /// [ticked](LocalScheduler::tick) in order to drive those tasks to completion.
        /// See the [module-level documentation][run-loops] for more information
        /// on implementing a system's run loop.
        ///
        /// [`Storage`]: crate::task::Storage
        /// [run-loops]: crate::scheduler#executing-tasks
        #[inline]
        #[track_caller]
        #[cfg(not(loom))]
        pub fn spawn_allocated<F>(&self, task: Box<Task<LocalScheduler, F, BoxStorage>>) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            match self.0.upgrade() {
                // The scheduler core is still alive: spawn through a temporary handle to it.
                Some(core) => LocalScheduler { core, _not_send: PhantomData }.spawn_allocated(task),
                // The scheduler core has been dropped: report a shutdown error.
                None => JoinHandle::error(task::join_handle::JoinErrorKind::Shutdown),
            }
        }
    }

    // === impl Core ===

    impl Core {
        fn new() -> Self {
            // The run queue is constructed with a stub node, so allocate a
            // stub task to serve as that node.
            let stub_task = Box::new(Task::new_stub());
            let (stub_task, _) = TaskRef::new_allocated::<task::Stub, task::Stub, BoxStorage>(task::Stub, stub_task);
            Self {
                run_queue: MpscQueue::new_with_stub(test_dbg!(stub_task)),
                queued: AtomicUsize::new(0),
                current_task: AtomicPtr::new(ptr::null_mut()),
                spawned: AtomicUsize::new(0),
                woken: AtomicUsize::new(0),
            }
        }
    }

    impl Default for Core {
        fn default() -> Self {
            Self::new()
        }
    }
}