maitake/time/timer/
wheel.rs

1use super::{sleep, Ticks};
2use crate::loom::sync::atomic::Ordering::*;
3use cordyceps::List;
4use core::{pin::Pin, ptr, task::Poll};
5use mycelium_util::fmt;
6
7#[cfg(all(test, not(loom)))]
8mod tests;
9
10#[derive(Debug)]
11pub(in crate::time) struct Core {
12    /// The current "now"
13    now: Ticks,
14
15    /// The actual timer wheels.
16    wheels: [Wheel; Self::WHEELS],
17}
18
19/// _The Wheel of Time_, by Robert Jordan
20struct Wheel {
21    /// A bitmap of the slots that are occupied.
22    ///
23    /// See <https://lwn.net/Articles/646056/> for details on
24    /// this strategy.
25    occupied_slots: u64,
26
27    /// This wheel's level.
28    level: usize,
29
30    /// The number of ticks represented by a single slot in this wheel.
31    ticks_per_slot: Ticks,
32
33    /// The number of ticks represented by this entire wheel.
34    ticks_per_wheel: Ticks,
35
36    /// A bitmask for masking out all lower wheels' indices from a `now` timestamp.
37    wheel_mask: u64,
38
39    slots: SlotArray,
40}
41
42#[derive(Copy, Clone, Debug)]
43pub(super) struct Deadline {
44    pub(super) ticks: Ticks,
45    slot: usize,
46    wheel: usize,
47}
48
49/// In loom mode, the slot arrays are apparently a bit too big to pass around
50/// (since loom's `UnsafeCell`s and atomics are larger than "real" ones), and we
51/// apparently segfault when trying to construct a timer wheel. Therefore, it's
52/// necessary to box the slot array when running under loom in order to reduce
53/// the stack size of the timer wheel.
54#[cfg(loom)]
55type SlotArray = alloc::boxed::Box<[List<sleep::Entry>; Wheel::SLOTS]>;
56
57#[cfg(not(loom))]
58type SlotArray = [List<sleep::Entry>; Wheel::SLOTS];
59
60// === impl Core ===
61
62impl Core {
63    const WHEELS: usize = Wheel::BITS;
64
65    pub(super) const MAX_SLEEP_TICKS: u64 = (1 << (Wheel::BITS * Self::WHEELS)) - 1;
66
67    loom_const_fn! {
68        pub(super) fn new() -> Self {
69            // Initialize the wheels.
70            // XXX(eliza): we would have to do this extremely gross thing if we
71            // wanted to support a variable number of wheels, because const fn...
72            /*
73            // Used as an initializer when constructing a new `Core`.
74            const NEW_WHEEL: Wheel = Wheel::empty();
75
76            let mut wheels = [NEW_WHEEL; Self::WHEELS];n
77            let mut level = 0;
78            while level < Self::WHEELS {
79                wheels[level].level = level;
80                wheels[level].ticks_per_slot = wheel::ticks_per_slot(level);
81                level += 1;
82            }
83            */
84            Self {
85                now: 0,
86                wheels: [
87                    Wheel::new(0),
88                    Wheel::new(1),
89                    Wheel::new(2),
90                    Wheel::new(3),
91                    Wheel::new(4),
92                    Wheel::new(5),
93                ],
94            }
95        }
96    }
97
98    #[inline(never)]
99    pub(super) fn turn_to(&mut self, now: Ticks) -> (usize, Option<Deadline>) {
100        let mut fired = 0;
101
102        // sleeps that need to be rescheduled on lower-level wheels need to be
103        // processed after we have finished turning the wheel, to avoid looping
104        // infinitely.
105        let mut pending_reschedule = List::<sleep::Entry>::new();
106
107        // we will stop looping if the next deadline is after `now`, but we
108        // still need to be able to return it.
109        let mut next_deadline = self.next_deadline();
110        while let Some(deadline) = next_deadline {
111            if deadline.ticks > now {
112                break;
113            }
114
115            let mut fired_this_turn = 0;
116            let entries = self.wheels[deadline.wheel].take(deadline.slot);
117            debug!(
118                now = self.now,
119                deadline.ticks,
120                entries = entries.len(),
121                "turning wheel to"
122            );
123
124            for entry in entries {
125                let entry_deadline = unsafe { entry.as_ref().deadline };
126
127                if test_dbg!(entry_deadline) > test_dbg!(now) {
128                    // this timer was on the top-level wheel and needs to be
129                    // rescheduled on a lower-level wheel, rather than firing now.
130                    debug_assert_ne!(
131                        deadline.wheel, 0,
132                        "if a timer is being rescheduled, it must not have been on the lowest-level wheel"
133                    );
134                    // this timer will need to be rescheduled.
135                    pending_reschedule.push_front(entry);
136                } else {
137                    // otherwise, fire the timer.
138                    unsafe {
139                        fired_this_turn += 1;
140                        entry.as_ref().fire();
141                    }
142                }
143            }
144
145            trace!(at = self.now, firing = fired_this_turn, "firing timers");
146
147            self.now = deadline.ticks;
148            fired += fired_this_turn;
149
150            next_deadline = self.next_deadline();
151        }
152
153        self.now = now;
154
155        // reschedule pending sleeps.
156
157        // If we need to reschedule something, we may need to recalculate the next deadline
158        let any = !pending_reschedule.is_empty();
159
160        if any || fired > 0 {
161            debug!(
162                now = self.now,
163                fired,
164                rescheduled = pending_reschedule.len(),
165                ?next_deadline,
166                "the Wheel of Time has turned"
167            );
168        }
169
170        for entry in pending_reschedule {
171            let deadline = unsafe { entry.as_ref().deadline };
172            debug_assert!(deadline > self.now);
173            debug_assert_ne!(deadline, 0);
174            self.insert_sleep_at(deadline, entry)
175        }
176
177        // Yup, we rescheduled something. Recalculate the next deadline in case one of those
178        // was sooner than the last calculated deadline
179        if any {
180            next_deadline = self.next_deadline();
181        }
182
183        (fired, next_deadline)
184    }
185
186    pub(super) fn cancel_sleep(&mut self, sleep: Pin<&mut sleep::Entry>) {
187        let deadline = sleep.deadline;
188        trace!(
189            sleep.addr = ?format_args!("{sleep:p}"),
190            sleep.deadline = deadline,
191            now = self.now,
192            "canceling sleep"
193        );
194        let wheel = self.wheel_index(deadline);
195        self.wheels[wheel].remove(deadline, sleep);
196    }
197
198    pub(super) fn register_sleep(&mut self, ptr: ptr::NonNull<sleep::Entry>) -> Poll<()> {
199        let deadline = {
200            let sleep = unsafe { ptr.as_ref() };
201
202            trace!(
203                sleep.addr = ?ptr,
204                sleep.ticks,
205                sleep.deadline,
206                now = self.now,
207                "registering sleep"
208            );
209
210            if sleep.deadline <= self.now {
211                trace!("sleep already completed, firing immediately");
212                sleep.fire();
213                return Poll::Ready(());
214            }
215
216            let _did_link = sleep.linked.compare_exchange(false, true, AcqRel, Acquire);
217            debug_assert!(
218                _did_link.is_ok(),
219                "tried to register a sleep that was already registered"
220            );
221            sleep.deadline
222        };
223
224        self.insert_sleep_at(deadline, ptr);
225        Poll::Pending
226    }
227
228    fn insert_sleep_at(&mut self, deadline: Ticks, sleep: ptr::NonNull<sleep::Entry>) {
229        let wheel = self.wheel_index(deadline);
230        trace!(wheel, sleep.deadline = deadline, sleep.addr = ?sleep, "inserting sleep");
231        self.wheels[wheel].insert(deadline, sleep);
232    }
233
234    /// Returns the deadline and location of the next firing timer in the wheel.
235    #[inline]
236    fn next_deadline(&self) -> Option<Deadline> {
237        self.wheels.iter().find_map(|wheel| {
238            let next_deadline = wheel.next_deadline(self.now)?;
239            test_trace!(
240                now = self.now,
241                next_deadline.ticks,
242                next_deadline.wheel,
243                next_deadline.slot,
244            );
245            Some(next_deadline)
246        })
247    }
248
249    #[inline]
250    fn wheel_index(&self, ticks: Ticks) -> usize {
251        wheel_index(self.now, ticks)
252    }
253}
254
255fn wheel_index(now: Ticks, ticks: Ticks) -> usize {
256    const WHEEL_MASK: u64 = (1 << Wheel::BITS) - 1;
257
258    // mask out the bits representing the index in the wheel
259    let mut wheel_indices = now ^ ticks | WHEEL_MASK;
260
261    // put sleeps over the max duration in the top level wheel
262    if wheel_indices >= Core::MAX_SLEEP_TICKS {
263        wheel_indices = Core::MAX_SLEEP_TICKS - 1;
264    }
265
266    let zeros = wheel_indices.leading_zeros();
267    let rest = u64::BITS - 1 - zeros;
268
269    rest as usize / Core::WHEELS
270}
271
272impl Wheel {
273    /// The number of slots per timer wheel is fixed at 64 slots.
274    ///
275    /// This is because we can use a 64-bit bitmap for each wheel to store which
276    /// slots are occupied.
277    const SLOTS: usize = 64;
278    const BITS: usize = Self::SLOTS.trailing_zeros() as usize;
279    loom_const_fn! {
280        fn new(level: usize) -> Self {
281            // linked list const initializer
282            const NEW_LIST: List<sleep::Entry> = List::new();
283
284            // how many ticks does a single slot represent in a wheel of this level?
285            let ticks_per_slot = Self::SLOTS.pow(level as u32) as Ticks;
286            let ticks_per_wheel = ticks_per_slot * Self::SLOTS as u64;
287
288            debug_assert!(ticks_per_slot.is_power_of_two());
289            debug_assert!(ticks_per_wheel.is_power_of_two());
290
291            // because `ticks_per_wheel` is a power of two, we can calculate a
292            // bitmask for masking out the indices in all lower wheels from a `now`
293            // timestamp.
294            let wheel_mask = !(ticks_per_wheel - 1);
295            let slots = [NEW_LIST; Self::SLOTS];
296            #[cfg(loom)]
297            let slots = alloc::boxed::Box::new(slots);
298
299            Self {
300                level,
301                ticks_per_slot,
302                ticks_per_wheel,
303                wheel_mask,
304                occupied_slots: 0,
305                slots,
306            }
307        }
308    }
309
310    /// Insert a sleep entry into this wheel.
311    fn insert(&mut self, deadline: Ticks, sleep: ptr::NonNull<sleep::Entry>) {
312        let slot = self.slot_index(deadline);
313        trace!(
314            wheel = self.level,
315            sleep.addr = ?fmt::ptr(sleep),
316            sleep.deadline = deadline,
317            sleep.slot = slot,
318            "Wheel::insert",
319        );
320
321        // insert the sleep entry into the appropriate linked list.
322        self.slots[slot].push_front(sleep);
323        // toggle the occupied bit for that slot.
324        self.fill_slot(slot);
325    }
326
327    /// Remove a sleep entry from this wheel.
328    fn remove(&mut self, deadline: Ticks, sleep: Pin<&mut sleep::Entry>) {
329        let slot = self.slot_index(deadline);
330        unsafe {
331            // safety: we will not use the `NonNull` to violate pinning
332            // invariants; it's used only to insert the sleep into the intrusive
333            // list. It's safe to remove the sleep from the linked list because
334            // we know it's in this list (provided the rest of the timer wheel
335            // is like...working...)
336            let ptr = ptr::NonNull::from(Pin::into_inner_unchecked(sleep));
337            trace!(
338                wheel = self.level,
339                sleep.addr = ?fmt::ptr(ptr),
340                sleep.deadline = deadline,
341                sleep.slot = slot,
342                "Wheel::remove",
343            );
344
345            if let Some(sleep) = self.slots[slot].remove(ptr) {
346                let _did_unlink = sleep
347                    .as_ref()
348                    .linked
349                    .compare_exchange(true, false, AcqRel, Acquire);
350                debug_assert!(
351                    _did_unlink.is_ok(),
352                    "removed a sleep whose linked bit was already unset, this is potentially real bad"
353                );
354            }
355        };
356
357        if self.slots[slot].is_empty() {
358            // if that was the only sleep in that slot's linked list, clear the
359            // corresponding occupied bit.
360            self.clear_slot(slot);
361        }
362    }
363
364    fn take(&mut self, slot: usize) -> List<sleep::Entry> {
365        debug_assert!(
366            self.occupied_slots & (1 << slot) != 0,
367            "taking an unoccupied slot!"
368        );
369        let list = self.slots[slot].split_off(0);
370        debug_assert!(
371            !list.is_empty(),
372            "if a slot is occupied, its list must not be empty"
373        );
374        self.clear_slot(slot);
375        list
376    }
377
378    fn next_deadline(&self, now: u64) -> Option<Deadline> {
379        let distance = self.next_slot_distance(now)?;
380
381        let slot = distance % Self::SLOTS;
382        // does the next slot wrap this wheel around from the now slot?
383        let skipped = distance.saturating_sub(Self::SLOTS);
384
385        debug_assert!(distance < Self::SLOTS * 2);
386        debug_assert!(
387            skipped == 0 || self.level == Core::WHEELS - 1,
388            "if the next expiring slot wraps around, we must be on the top level wheel\
389            \n    dist: {distance}\
390            \n    slot: {slot}\
391            \n skipped: {skipped}\
392            \n   level: {}",
393            self.level,
394        );
395
396        // when did the current rotation of this wheel begin? since all wheels
397        // represent a power-of-two number of ticks, we can determine the
398        // beginning of this rotation by masking out the bits for all lower wheels.
399        let rotation_start = now & self.wheel_mask;
400        // the next deadline is the start of the current rotation, plus the next
401        // slot's value.
402        let ticks = {
403            let skipped_ticks = skipped as u64 * self.ticks_per_wheel;
404            rotation_start + (slot as u64 * self.ticks_per_slot) + skipped_ticks
405        };
406
407        test_trace!(
408            now,
409            wheel = self.level,
410            rotation_start,
411            slot,
412            skipped,
413            ticks,
414            "Wheel::next_deadline"
415        );
416
417        let deadline = Deadline {
418            ticks,
419            slot,
420            wheel: self.level,
421        };
422
423        Some(deadline)
424    }
425
426    /// Returns the slot index of the next firing timer.
427    fn next_slot_distance(&self, now: Ticks) -> Option<usize> {
428        if self.occupied_slots == 0 {
429            return None;
430        }
431
432        // which slot is indexed by the `now` timestamp?
433        let now_slot = (now / self.ticks_per_slot) as u32 % Self::SLOTS as u32;
434        let next_dist = next_set_bit(self.occupied_slots, now_slot)?;
435
436        test_trace!(
437            now_slot,
438            next_dist,
439            occupied = ?fmt::bin(self.occupied_slots),
440            "next_slot_distance"
441        );
442        Some(next_dist)
443    }
444
445    fn clear_slot(&mut self, slot_index: usize) {
446        debug_assert!(slot_index < Self::SLOTS);
447        self.occupied_slots &= !(1 << slot_index);
448    }
449
450    fn fill_slot(&mut self, slot_index: usize) {
451        debug_assert!(slot_index < Self::SLOTS);
452        self.occupied_slots |= 1 << slot_index;
453    }
454
455    /// Given a duration, returns the slot into which an entry for that duratio
456    /// would be inserted.
457    const fn slot_index(&self, ticks: Ticks) -> usize {
458        let shift = self.level * Self::BITS;
459        ((ticks >> shift) % Self::SLOTS as u64) as usize
460    }
461}
462
463impl fmt::Debug for Wheel {
464    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
465        let Self {
466            level,
467            ticks_per_slot,
468            ticks_per_wheel,
469            wheel_mask,
470            occupied_slots,
471            slots: _,
472        } = self;
473        f.debug_struct("Wheel")
474            .field("level", level)
475            .field("ticks_per_slot", ticks_per_slot)
476            .field("ticks_per_wheel", ticks_per_wheel)
477            .field("wheel_mask", &fmt::bin(wheel_mask))
478            .field("occupied_slots", &fmt::bin(occupied_slots))
479            .field("slots", &format_args!("[Slot; {}]", Self::SLOTS))
480            .finish()
481    }
482}
483
/// Finds the next set bit in `bitmap`, searching upward from bit `offset`
/// (inclusive) and wrapping around past bit 63 if necessary.
///
/// Returns `None` if `bitmap` is zero. Otherwise returns the index of the
/// found bit. If the search wrapped around, the result is the bit's index plus
/// 64, so it is always in `offset..offset + 64`. If bit `offset` itself is set,
/// returns `offset`.
///
/// Based on
/// <https://github.com/torvalds/linux/blob/d0e60d46bc03252b8d4ffaaaa0b371970ac16cda/include/linux/find.h#L21-L45>
fn next_set_bit(bitmap: u64, offset: u32) -> Option<usize> {
    debug_assert!(offset < 64, "offset: {offset}");
    if bitmap == 0 {
        return None;
    }

    // Rotating right by `offset` moves bit `offset` to position 0, and moves
    // the bits below `offset` to the top `offset` positions. The lowest set
    // bit of the rotated value is therefore the first set bit reached when
    // walking upward from `offset` with wrap-around. No separate "wrapped"
    // branch is needed.
    let steps = bitmap.rotate_right(offset).trailing_zeros();
    Some(offset as usize + steps as usize)
}