// mycelium_kernel/rt.rs

use crate::arch;
use core::{
    cell::Cell,
    cmp,
    future::Future,
    sync::atomic::{AtomicBool, AtomicUsize, Ordering::*},
};
use maitake::{
    scheduler::{self, StaticScheduler, Stealer},
    sync::spin,
    time,
};
use mycelium_util::{fmt, sync::InitOnce};
use rand::Rng;

pub use maitake::task::JoinHandle;

18/// A kernel runtime for a single core.
19pub struct Core {
20    /// The task scheduler for this core.
21    scheduler: &'static StaticScheduler,
22
23    /// This core's ID.
24    ///
25    /// ID 0 is the first CPU core started when the system boots.
26    id: usize,
27
28    /// Set to `false` if this core should shut down.
29    running: AtomicBool,
30
31    /// Used to select the index of the next core to steal work from.
32    ///
33    /// Selecting a random core index when work-stealing helps ensure we don't
34    /// have a situation where all idle steal from the first available worker,
35    /// resulting in other cores ending up with huge queues of idle tasks while
36    /// the first core's queue is always empty.
37    ///
38    /// This is *not* a cryptographically secure random number generator, since
39    /// randomness of this value is not required for security. Instead, it just
40    /// helps ensure a good distribution of load. Therefore, we use a fast,
41    /// non-cryptographic RNG.
42    rng: rand_xoshiro::Xoroshiro128PlusPlus,
43}
44
45struct Runtime {
46    cores: [InitOnce<StaticScheduler>; MAX_CORES],
47
48    /// Global injector queue for spawning tasks on any `Core` instance.
49    injector: scheduler::Injector<&'static StaticScheduler>,
50    initialized: AtomicUsize,
51}
52
/// The maximum number of CPU cores the runtime supports.
///
/// 512 CPU cores ought to be enough for anybody...
pub const MAX_CORES: usize = 512;

56static TIMER: spin::InitOnce<time::Timer> = spin::InitOnce::uninitialized();
57
58static RUNTIME: Runtime = {
59    // This constant is used as an array initializer; the clippy warning that it
60    // contains interior mutability is not actually a problem here, since we
61    // *want* a new instance of the value for each array element created using
62    // the `const`.
63    #[allow(clippy::declare_interior_mutable_const)]
64    const UNINIT_SCHEDULER: InitOnce<StaticScheduler> = InitOnce::uninitialized();
65
66    Runtime {
67        cores: [UNINIT_SCHEDULER; MAX_CORES],
68        initialized: AtomicUsize::new(0),
69        injector: {
70            static STUB_TASK: scheduler::TaskStub = scheduler::TaskStub::new();
71            unsafe { scheduler::Injector::new_with_static_stub(&STUB_TASK) }
72        },
73    }
74};
75
76/// Spawn a task on Mycelium's global runtime.
77pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
78where
79    F: Future + Send + 'static,
80    F::Output: Send + 'static,
81{
82    SCHEDULER.with(|scheduler| {
83        if let Some(scheduler) = scheduler.get() {
84            scheduler.spawn(future)
85        } else {
86            // no scheduler is running on this core.
87            RUNTIME.injector.spawn(future)
88        }
89    })
90}
91
92/// Initialize the kernel runtime.
93pub fn init(clock: maitake::time::Clock) {
94    tracing::info!(
95        clock = %clock.name(),
96        clock.max_duration = ?clock.max_duration(),
97        "initializing kernel runtime...",
98    );
99    let timer = TIMER.init(time::Timer::new(clock));
100    time::set_global_timer(timer).expect("`rt::init` should only be called once!");
101
102    tracing::info!("kernel runtime initialized");
103}
104
105pub const DUMP_RT: crate::shell::Command = crate::shell::Command::new("rt")
106    .with_help("print the kernel's async runtime")
107    .with_fn(|_| {
108        tracing::info!(runtime = ?RUNTIME);
109        Ok(())
110    });
111
112static SCHEDULER: arch::LocalKey<Cell<Option<&'static StaticScheduler>>> =
113    arch::LocalKey::new(|| Cell::new(None));
114
115impl Core {
116    #[must_use]
117    pub fn new() -> Self {
118        let (id, scheduler) = RUNTIME.new_scheduler();
119        tracing::info!(core = id, "initialized task scheduler");
120        Self {
121            scheduler,
122            id,
123            rng: arch::seed_rng(),
124            running: AtomicBool::new(false),
125        }
126    }
127
128    /// Runs one tick of the kernel main loop on this core.
129    ///
130    /// Returns `true` if this core has more work to do, or `false` if it does not.
131    pub fn tick(&mut self) -> bool {
132        // drive the task scheduler
133        let tick = self.scheduler.tick();
134
135        // turn the timer wheel if it wasn't turned recently and no one else is
136        // holding a lock, ensuring any pending timer ticks are consumed.
137        TIMER.get().turn();
138
139        // if there are remaining tasks to poll, continue without stealing.
140        if tick.has_remaining {
141            return true;
142        }
143
144        // if there are no tasks remaining in this core's run queue, try to
145        // steal new tasks from the distributor queue.
146        let stolen = self.try_steal();
147        if stolen > 0 {
148            tracing::debug!(tick.stolen = stolen);
149            // if we stole tasks, we need to keep ticking
150            return true;
151        }
152
153        // if we have no remaining woken tasks, and we didn't steal any new
154        // tasks, this core can sleep until an interrupt occurs.
155        false
156    }
157
158    /// Returns `true` if this core is currently running.
159    #[inline]
160    pub fn is_running(&self) -> bool {
161        self.running.load(Acquire)
162    }
163
164    /// Stops this core if it is currently running.
165    ///
166    /// # Returns
167    ///
168    /// - `true` if this core was running and is now stopping
169    /// - `false` if this core was not running.
170    pub fn stop(&self) -> bool {
171        let was_running = self
172            .running
173            .compare_exchange(true, false, AcqRel, Acquire)
174            .is_ok();
175        tracing::info!(core = self.id, core.was_running = was_running, "stopping");
176        was_running
177    }
178
179    /// Run this core until [`Core::stop`] is called.
180    pub fn run(&mut self) {
181        struct CoreGuard;
182        impl Drop for CoreGuard {
183            fn drop(&mut self) {
184                SCHEDULER.with(|scheduler| scheduler.set(None));
185            }
186        }
187
188        let _span = tracing::info_span!("core", id = self.id).entered();
189        if self
190            .running
191            .compare_exchange(false, true, AcqRel, Acquire)
192            .is_err()
193        {
194            tracing::error!("this core is already running!");
195            return;
196        }
197
198        SCHEDULER.with(|scheduler| scheduler.set(Some(self.scheduler)));
199        let _unset = CoreGuard;
200
201        tracing::info!("started kernel main loop");
202
203        loop {
204            // tick the scheduler until it indicates that it's out of tasks to run.
205            if self.tick() {
206                continue;
207            }
208
209            // check if this core should shut down.
210            if !self.is_running() {
211                tracing::info!(core = self.id, "stop signal received, shutting down");
212                return;
213            }
214
215            // if we have no tasks to run, we can sleep until an interrupt
216            // occurs.
217            arch::wait_for_interrupt();
218        }
219    }
220
221    fn try_steal(&mut self) -> usize {
222        // don't try stealing work infinitely many times if all potential
223        // victims' queues are empty or busy.
224        const MAX_STEAL_ATTEMPTS: usize = 16;
225        // chosen arbitrarily!
226        const MAX_STOLEN_PER_TICK: usize = 256;
227
228        // first, try to steal from the injector queue.
229        if let Ok(injector) = RUNTIME.injector.try_steal() {
230            return injector.spawn_n(&self.scheduler, MAX_STOLEN_PER_TICK);
231        }
232
233        // if the injector queue is empty or someone else is stealing from it,
234        // try to find another worker to steal from.
235        let mut attempts = 0;
236        while attempts < MAX_STEAL_ATTEMPTS {
237            let active_cores = RUNTIME.active_cores();
238
239            // if the stealing core is the only active core, there's no one else
240            // to steal from, so bail.
241            if active_cores <= 1 {
242                break;
243            }
244
245            // randomly pick a potential victim core to steal from.
246            let victim_idx = self.rng.gen_range(0..active_cores);
247
248            // we can't steal tasks from ourself.
249            if victim_idx == self.id {
250                continue;
251            }
252
253            // found a core to steal from
254            if let Some(victim) = RUNTIME.try_steal_from(victim_idx) {
255                let num_steal = cmp::min(victim.initial_task_count() / 2, MAX_STOLEN_PER_TICK);
256                return victim.spawn_n(&self.scheduler, num_steal);
257            } else {
258                attempts += 1;
259            }
260        }
261
262        // try the injector queue again if we couldn't find anything else
263        if let Ok(injector) = RUNTIME.injector.try_steal() {
264            injector.spawn_n(&self.scheduler, MAX_STOLEN_PER_TICK)
265        } else {
266            0
267        }
268    }
269}
270
271impl Default for Core {
272    fn default() -> Self {
273        Self::new()
274    }
275}
276
// === impl Runtime ===

279impl Runtime {
280    fn active_cores(&self) -> usize {
281        self.initialized.load(Acquire)
282    }
283
284    fn new_scheduler(&self) -> (usize, &StaticScheduler) {
285        let next = self.initialized.fetch_add(1, AcqRel);
286        assert!(next < MAX_CORES);
287        let scheduler = self.cores[next].init(StaticScheduler::new());
288        (next, scheduler)
289    }
290
291    fn try_steal_from(
292        &'static self,
293        idx: usize,
294    ) -> Option<Stealer<'static, &'static StaticScheduler>> {
295        self.cores[idx].try_get()?.try_steal().ok()
296    }
297}
298
299impl fmt::Debug for Runtime {
300    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
301        let cores = self.active_cores();
302        f.debug_struct("Runtime")
303            .field("active_cores", &cores)
304            .field("cores", &&self.cores[..cores])
305            .field("injector", &self.injector)
306            .finish()
307    }
308}