1use crate::arch;
2use core::{
3 cell::Cell,
4 cmp,
5 future::Future,
6 sync::atomic::{AtomicBool, AtomicUsize, Ordering::*},
7};
8use maitake::{
9 scheduler::{self, StaticScheduler, Stealer},
10 sync::spin,
11 time,
12};
13use mycelium_util::{fmt, sync::InitOnce};
14use rand::Rng;
15
16pub use maitake::task::JoinHandle;
17
/// A single core's handle onto the kernel's async runtime.
///
/// Each `Core` owns the per-core state needed to drive one scheduler: see
/// `Core::run` for the main loop and `Core::tick` for a single iteration.
pub struct Core {
    /// The task scheduler ticked by this core, as handed out by
    /// `Runtime::new_scheduler`.
    scheduler: &'static StaticScheduler,

    /// This core's index, as returned by `Runtime::new_scheduler`. It is used
    /// in log output and to avoid picking this core as a work-stealing victim.
    id: usize,

    /// Set to `true` while `Core::run` is executing its main loop and reset
    /// to `false` by `Core::stop`.
    running: AtomicBool,

    /// Random number generator used to pick which core to try stealing tasks
    /// from.
    rng: rand_xoshiro::Xoroshiro128PlusPlus,
}
44
/// Global state shared by every core participating in the runtime.
struct Runtime {
    /// One scheduler slot per possible core. A slot is initialized when
    /// `Runtime::new_scheduler` hands it out.
    cores: [InitOnce<StaticScheduler>; MAX_CORES],

    /// Global queue that `spawn` falls back to when the calling context has
    /// no scheduler set; cores drain it from `Core::try_steal`.
    injector: scheduler::Injector<&'static StaticScheduler>,
    /// Number of slots in `cores` that have been handed out so far.
    initialized: AtomicUsize,
}
52
/// The maximum number of cores that can register a scheduler with the
/// runtime. This is the length of the `Runtime::cores` array.
pub const MAX_CORES: usize = 512;
55
/// The kernel's timer. It is initialized by `init` and turned on every call
/// to `Core::tick`.
static TIMER: spin::InitOnce<time::Timer> = spin::InitOnce::uninitialized();
57
/// The single, global `Runtime` instance shared by all cores.
static RUNTIME: Runtime = {
    // An interior-mutable `const` is normally a footgun, because every use
    // produces a fresh value. Here that is exactly what is wanted: the
    // constant is only used as the repeat operand of the array expression
    // below, so that each slot gets its own uninitialized cell.
    #[allow(clippy::declare_interior_mutable_const)]
    const UNINIT_SCHEDULER: InitOnce<StaticScheduler> = InitOnce::uninitialized();

    Runtime {
        cores: [UNINIT_SCHEDULER; MAX_CORES],
        initialized: AtomicUsize::new(0),
        injector: {
            // The stub is declared inside this block, so nothing else in the
            // file can name it or pass it to another queue.
            static STUB_TASK: scheduler::TaskStub = scheduler::TaskStub::new();
            // NOTE(review): `new_with_static_stub` is an `unsafe` constructor;
            // presumably its contract is that the stub task is used by this
            // injector only, which the scoping above guarantees. Confirm
            // against the maitake documentation.
            unsafe { scheduler::Injector::new_with_static_stub(&STUB_TASK) }
        },
    }
};
75
76pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
78where
79 F: Future + Send + 'static,
80 F::Output: Send + 'static,
81{
82 SCHEDULER.with(|scheduler| {
83 if let Some(scheduler) = scheduler.get() {
84 scheduler.spawn(future)
85 } else {
86 RUNTIME.injector.spawn(future)
88 }
89 })
90}
91
92pub fn init(clock: maitake::time::Clock) {
94 tracing::info!(
95 clock = %clock.name(),
96 clock.max_duration = ?clock.max_duration(),
97 "initializing kernel runtime...",
98 );
99 let timer = TIMER.init(time::Timer::new(clock));
100 time::set_global_timer(timer).expect("`rt::init` should only be called once!");
101
102 tracing::info!("kernel runtime initialized");
103}
104
/// The `rt` shell command: logs the `Debug` representation of the global
/// `RUNTIME` at the `info` level and always succeeds.
pub const DUMP_RT: crate::shell::Command = crate::shell::Command::new("rt")
    .with_help("print the kernel's async runtime")
    .with_fn(|_| {
        tracing::info!(runtime = ?RUNTIME);
        Ok(())
    });
111
/// The scheduler that `spawn` should target from the current context, if any.
///
/// `Core::run` stores the core's scheduler here when its main loop starts and
/// resets it to `None` when the loop exits. The initial value is `None`.
///
/// NOTE(review): `arch::LocalKey` is defined elsewhere; presumably it provides
/// per-core storage, so that each core sees its own value. Confirm.
static SCHEDULER: arch::LocalKey<Cell<Option<&'static StaticScheduler>>> =
    arch::LocalKey::new(|| Cell::new(None));
114
115impl Core {
116 #[must_use]
117 pub fn new() -> Self {
118 let (id, scheduler) = RUNTIME.new_scheduler();
119 tracing::info!(core = id, "initialized task scheduler");
120 Self {
121 scheduler,
122 id,
123 rng: arch::seed_rng(),
124 running: AtomicBool::new(false),
125 }
126 }
127
128 pub fn tick(&mut self) -> bool {
132 let tick = self.scheduler.tick();
134
135 TIMER.get().turn();
138
139 if tick.has_remaining {
141 return true;
142 }
143
144 let stolen = self.try_steal();
147 if stolen > 0 {
148 tracing::debug!(tick.stolen = stolen);
149 return true;
151 }
152
153 false
156 }
157
158 #[inline]
160 pub fn is_running(&self) -> bool {
161 self.running.load(Acquire)
162 }
163
164 pub fn stop(&self) -> bool {
171 let was_running = self
172 .running
173 .compare_exchange(true, false, AcqRel, Acquire)
174 .is_ok();
175 tracing::info!(core = self.id, core.was_running = was_running, "stopping");
176 was_running
177 }
178
179 pub fn run(&mut self) {
181 struct CoreGuard;
182 impl Drop for CoreGuard {
183 fn drop(&mut self) {
184 SCHEDULER.with(|scheduler| scheduler.set(None));
185 }
186 }
187
188 let _span = tracing::info_span!("core", id = self.id).entered();
189 if self
190 .running
191 .compare_exchange(false, true, AcqRel, Acquire)
192 .is_err()
193 {
194 tracing::error!("this core is already running!");
195 return;
196 }
197
198 SCHEDULER.with(|scheduler| scheduler.set(Some(self.scheduler)));
199 let _unset = CoreGuard;
200
201 tracing::info!("started kernel main loop");
202
203 loop {
204 if self.tick() {
206 continue;
207 }
208
209 if !self.is_running() {
211 tracing::info!(core = self.id, "stop signal received, shutting down");
212 return;
213 }
214
215 arch::wait_for_interrupt();
218 }
219 }
220
221 fn try_steal(&mut self) -> usize {
222 const MAX_STEAL_ATTEMPTS: usize = 16;
225 const MAX_STOLEN_PER_TICK: usize = 256;
227
228 if let Ok(injector) = RUNTIME.injector.try_steal() {
230 return injector.spawn_n(&self.scheduler, MAX_STOLEN_PER_TICK);
231 }
232
233 let mut attempts = 0;
236 while attempts < MAX_STEAL_ATTEMPTS {
237 let active_cores = RUNTIME.active_cores();
238
239 if active_cores <= 1 {
242 break;
243 }
244
245 let victim_idx = self.rng.gen_range(0..active_cores);
247
248 if victim_idx == self.id {
250 continue;
251 }
252
253 if let Some(victim) = RUNTIME.try_steal_from(victim_idx) {
255 let num_steal = cmp::min(victim.initial_task_count() / 2, MAX_STOLEN_PER_TICK);
256 return victim.spawn_n(&self.scheduler, num_steal);
257 } else {
258 attempts += 1;
259 }
260 }
261
262 if let Ok(injector) = RUNTIME.injector.try_steal() {
264 injector.spawn_n(&self.scheduler, MAX_STOLEN_PER_TICK)
265 } else {
266 0
267 }
268 }
269}
270
271impl Default for Core {
272 fn default() -> Self {
273 Self::new()
274 }
275}
276
277impl Runtime {
280 fn active_cores(&self) -> usize {
281 self.initialized.load(Acquire)
282 }
283
284 fn new_scheduler(&self) -> (usize, &StaticScheduler) {
285 let next = self.initialized.fetch_add(1, AcqRel);
286 assert!(next < MAX_CORES);
287 let scheduler = self.cores[next].init(StaticScheduler::new());
288 (next, scheduler)
289 }
290
291 fn try_steal_from(
292 &'static self,
293 idx: usize,
294 ) -> Option<Stealer<'static, &'static StaticScheduler>> {
295 self.cores[idx].try_get()?.try_steal().ok()
296 }
297}
298
299impl fmt::Debug for Runtime {
300 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
301 let cores = self.active_cores();
302 f.debug_struct("Runtime")
303 .field("active_cores", &cores)
304 .field("cores", &&self.cores[..cores])
305 .field("injector", &self.injector)
306 .finish()
307 }
308}