futures_timer/native/
timer.rs

use std::fmt;
use std::pin::Pin;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicPtr, AtomicUsize};
use std::sync::{Arc, Mutex, Weak};
use std::task::{Context, Poll};
use std::time::Instant;

use std::future::Future;

use super::AtomicWaker;
use super::{global, ArcList, Heap, HeapTimer, Node, Slot};

/// A "timer heap" used to power separately owned instances of `Delay`.
///
/// This timer is implemented as a priority-queue-based heap. Each `Timer`
/// contains a few primary methods with which to drive it:
///
/// * `next_event` indicates when the ambient system next needs to wake up and
///   invoke further processing on a `Timer`.
/// * `advance_to` is what actually fires timers on the `Timer`, and should be
///   called essentially every iteration of the event loop, or when the time
///   specified by `next_event` has elapsed.
/// * The `Future` implementation for `Timer` is used to process incoming timer
///   updates and requests. This is used to schedule new timeouts, update
///   existing ones, or delete existing timeouts. The `Future` implementation
///   will never resolve, but it'll schedule notifications of when to wake up
///   and process more messages.
///
/// Note that if you're using this crate you probably don't need to use a
/// `Timer`, as there is a global one already available for you, run on a
/// helper thread. If this isn't desirable, though, then the
/// `TimerHandle::set_as_global_fallback` method can be used instead!
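///
/// # Example
///
/// A rough sketch of driving a standalone `Timer` by hand. The handle would
/// be passed to whatever creates `Delay`s, and polling the `Timer` future
/// requires a `Context` from the surrounding executor; both are left out of
/// this sketch.
///
/// ```ignore
/// let mut timer = Timer::new();
/// let _handle = timer.handle();
/// loop {
///     // Fold any newly scheduled or updated timeouts into the heap by
///     // polling the `Timer` future (it never resolves, it only registers
///     // for wakeups).
///     // ... poll `timer` here with the executor's `Context` ...
///
///     if let Some(when) = timer.next_event() {
///         // Sleep until `when`, or until woken with new updates ...
///     }
///
///     // Fire everything scheduled at or before the current instant.
///     timer.advance();
/// }
/// ```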
pub struct Timer {
    inner: Arc<Inner>,
    timer_heap: Heap<HeapTimer>,
}

/// A handle to a `Timer` which is used to create instances of a `Delay`.
#[derive(Clone)]
pub struct TimerHandle {
    pub(crate) inner: Weak<Inner>,
}

pub(crate) struct Inner {
    /// List of updates the `Timer` needs to process
    pub(crate) list: ArcList<ScheduledTimer>,

    /// The blocked `Timer` task, notified whenever an update is pushed onto
    /// the `list` above.
    pub(crate) waker: AtomicWaker,
}

/// Shared state between the `Timer` and a `Delay`.
pub(crate) struct ScheduledTimer {
    pub(crate) waker: AtomicWaker,

    // The lowest bit here is whether the timer has fired or not, the second
    // lowest bit is whether the timer has been invalidated, and all the other
    // bits are the "generation" of the timer which is reset during the `reset`
    // function. Only timers for a matching generation are fired.
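    //
    // For example (illustrative only), a raw value of `0b110` decodes as
    // generation 1 with the "invalidated" bit set and the "fired" bit clear.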
    pub(crate) state: AtomicUsize,

    pub(crate) inner: Weak<Inner>,
    pub(crate) at: Mutex<Option<Instant>>,

    // TODO: this is only accessed by the timer thread, should have a more
    // lightweight protection than a `Mutex`
    pub(crate) slot: Mutex<Option<Slot>>,
}

impl Timer {
    /// Creates a new timer heap ready to create new timers.
    pub fn new() -> Timer {
        Timer {
            inner: Arc::new(Inner {
                list: ArcList::new(),
                waker: AtomicWaker::new(),
            }),
            timer_heap: Heap::new(),
        }
    }

    /// Returns a handle to this timer heap, used to create new timeouts.
    pub fn handle(&self) -> TimerHandle {
        TimerHandle {
            inner: Arc::downgrade(&self.inner),
        }
    }

    /// Returns the time at which this timer next needs to be invoked with
    /// `advance_to`.
    ///
    /// Event loops or threads typically want to sleep until the specified
    /// instant.
    pub fn next_event(&self) -> Option<Instant> {
        self.timer_heap.peek().map(|t| t.at)
    }

    /// Processes any timers which are supposed to fire at or before the
    /// current instant.
    ///
    /// This method is equivalent to `self.advance_to(Instant::now())`.
    pub fn advance(&mut self) {
        self.advance_to(Instant::now())
    }

    /// Processes any timers which are supposed to fire at or before the `now`
    /// specified.
    ///
    /// This method should be called on `Timer` periodically to advance the
    /// internal state and process any pending timers which need to fire.
    pub fn advance_to(&mut self, now: Instant) {
        loop {
            match self.timer_heap.peek() {
                Some(head) if head.at <= now => {}
                Some(_) => break,
                None => break,
            };

            // Flag the timer as fired and then notify its task, if any, that's
            // blocked.
            let heap_timer = self.timer_heap.pop().unwrap();
            *heap_timer.node.slot.lock().unwrap() = None;
            let bits = heap_timer.gen << 2;
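            // The `compare_exchange` below only succeeds when the node is
            // still in this generation with neither the "fired" nor the
            // "invalidated" bit set; on failure the timer was reset or
            // invalidated concurrently, so this stale heap entry is dropped.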
            match heap_timer
                .node
                .state
                .compare_exchange(bits, bits | 0b01, SeqCst, SeqCst)
            {
                Ok(_) => heap_timer.node.waker.wake(),
                Err(_b) => {}
            }
        }
    }

    /// Either updates the existing heap entry for `node` to fire at `at`, or
    /// pushes a new entry for `node` set to fire at `at`.
    fn update_or_add(&mut self, at: Instant, node: Arc<Node<ScheduledTimer>>) {
        // TODO: avoid remove + push and instead just do one sift of the heap?
        // In theory we could update it in place and then do the percolation
        // as necessary
        let gen = node.state.load(SeqCst) >> 2;
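        // The upper bits of `state` hold the node's current generation; the
        // new heap entry records it so `advance_to` can ignore stale entries.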
        let mut slot = node.slot.lock().unwrap();
        if let Some(heap_slot) = slot.take() {
            self.timer_heap.remove(heap_slot);
        }
        *slot = Some(self.timer_heap.push(HeapTimer {
            at,
            gen,
            node: node.clone(),
        }));
    }

    fn remove(&mut self, node: Arc<Node<ScheduledTimer>>) {
        // If this node still has a registered heap entry, then we jettison it
        // from the timer heap.
        let mut slot = node.slot.lock().unwrap();
        let heap_slot = match slot.take() {
            Some(slot) => slot,
            None => return,
        };
        self.timer_heap.remove(heap_slot);
    }

    fn invalidate(&mut self, node: Arc<Node<ScheduledTimer>>) {
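        // Setting the "invalidated" bit lets the owning `Delay` observe that
        // its timer has gone away; waking the task gives it a chance to see
        // the new state.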
        node.state.fetch_or(0b10, SeqCst);
        node.waker.wake();
    }
}

impl Future for Timer {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.inner).waker.register(cx.waker());
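        // Drain all updates pushed by `Delay` handles since the last poll and
        // fold each one into (or out of) the timer heap.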
        let mut list = self.inner.list.take();
        while let Some(node) = list.pop() {
            let at = *node.at.lock().unwrap();
            match at {
                Some(at) => self.update_or_add(at, node),
                None => self.remove(node),
            }
        }
        Poll::Pending
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        // Seal off our list to prevent any more updates from getting pushed on.
        // Any timer which sees an error from the push will immediately become
        // inert.
        let mut list = self.inner.list.take_and_seal();

        // Now that we'll never receive another timer, drain the list of all
        // updates and also drain our heap of all active timers, invalidating
        // everything.
        while let Some(t) = list.pop() {
            self.invalidate(t);
        }
        while let Some(t) = self.timer_heap.pop() {
            self.invalidate(t.node);
        }
    }
}

impl fmt::Debug for Timer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("Timer").field("heap", &"...").finish()
    }
}

impl Default for Timer {
    fn default() -> Self {
        Self::new()
    }
}

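// Storage for the global fallback handle installed by
// `set_as_global_fallback`: either null (no fallback yet) or a raw pointer
// previously produced by `Weak::into_raw`.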
static HANDLE_FALLBACK: AtomicPtr<Inner> = AtomicPtr::new(EMPTY_HANDLE);
const EMPTY_HANDLE: *mut Inner = std::ptr::null_mut();

/// Error returned from `TimerHandle::set_as_global_fallback`.
#[derive(Clone, Debug)]
struct SetDefaultError(());

impl TimerHandle {
    /// Configures this timer handle to be the one returned by
    /// `TimerHandle::default`.
    ///
    /// By default a global thread is initialized on the first call to
    /// `TimerHandle::default`. This first call can happen transitively through
    /// `Delay::new`. If, however, that hasn't happened yet then the global
    /// default timer handle can be configured through this method.
    ///
    /// This method can be used to prevent the global helper thread from
    /// spawning. If this method is successful then the global helper thread
    /// will never get spun up.
    ///
    /// On success this timer handle will have installed itself globally to be
    /// used as the return value for `TimerHandle::default` unless otherwise
    /// specified.
    ///
    /// # Errors
    ///
    /// If another thread has already called `set_as_global_fallback`, or this
    /// thread otherwise loses a race to call this method, then it will fail,
    /// returning an error. Once a call to `set_as_global_fallback` has
    /// succeeded, no future calls may succeed.
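    ///
    /// # Example
    ///
    /// A rough sketch; after installing the fallback, the caller is still
    /// responsible for driving the `Timer` itself (e.g. on its own thread):
    ///
    /// ```ignore
    /// let timer = Timer::new();
    /// timer
    ///     .handle()
    ///     .set_as_global_fallback()
    ///     .expect("a global fallback timer was already installed");
    /// // ... spawn a thread (or task) that polls `timer` and calls
    /// // `advance_to` so the installed handle's `Delay`s actually fire ...
    /// ```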
    fn set_as_global_fallback(self) -> Result<(), SetDefaultError> {
        unsafe {
            let val = self.into_raw();
            match HANDLE_FALLBACK.compare_exchange(EMPTY_HANDLE, val, SeqCst, SeqCst) {
                Ok(_) => Ok(()),
                Err(_) => {
                    drop(TimerHandle::from_raw(val));
                    Err(SetDefaultError(()))
                }
            }
        }
    }

    fn into_raw(self) -> *mut Inner {
        self.inner.into_raw() as *mut Inner
    }

    unsafe fn from_raw(val: *mut Inner) -> TimerHandle {
        let inner = Weak::from_raw(val);
        TimerHandle { inner }
    }
}

impl Default for TimerHandle {
    fn default() -> TimerHandle {
        let mut fallback = HANDLE_FALLBACK.load(SeqCst);

        // If the fallback hasn't been previously initialized then let's spin
        // up a helper thread and try to initialize with that. If we can't
        // actually create a helper thread then we'll just return a "defunct"
        // handle which will return errors whenever timer objects are
        // associated with it.
        if fallback == EMPTY_HANDLE {
            let helper = match global::HelperThread::new() {
                Ok(helper) => helper,
                Err(_) => return TimerHandle { inner: Weak::new() },
            };

            // If we successfully set ourselves as the actual fallback then we
            // want to `forget` the helper thread to ensure that it persists
            // globally. If we fail to set ourselves as the fallback that means
            // that someone was racing with this call to
            // `TimerHandle::default`. They ended up winning so we'll destroy
            // our helper thread (which shuts down the thread) and reload the
            // fallback.
            if helper.handle().set_as_global_fallback().is_ok() {
                let ret = helper.handle();
                helper.forget();
                return ret;
            }
            fallback = HANDLE_FALLBACK.load(SeqCst);
        }

        // At this point our fallback handle global was configured so we use
        // its value to reify a handle, clone it, and then forget our reified
        // handle as we don't actually have an owning reference to it.
        assert!(fallback != EMPTY_HANDLE);
        unsafe {
            let handle = TimerHandle::from_raw(fallback);
            let ret = handle.clone();
            let _ = handle.into_raw();
            ret
        }
    }
}

impl fmt::Debug for TimerHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("TimerHandle")
            .field("inner", &"...")
            .finish()
    }
}