futures_timer/native/timer.rs
1use std::fmt;
2use std::pin::Pin;
3use std::sync::atomic::Ordering::SeqCst;
4use std::sync::atomic::{AtomicPtr, AtomicUsize};
5use std::sync::{Arc, Mutex, Weak};
6use std::task::{Context, Poll};
7use std::time::Instant;
8
9use std::future::Future;
10
11use super::AtomicWaker;
12use super::{global, ArcList, Heap, HeapTimer, Node, Slot};
13
14/// A "timer heap" used to power separately owned instances of `Delay`.
15///
16/// This timer is implemented as a priority queued-based heap. Each `Timer`
17/// contains a few primary methods which which to drive it:
18///
19/// * `next_wake` indicates how long the ambient system needs to sleep until it
20/// invokes further processing on a `Timer`
21/// * `advance_to` is what actually fires timers on the `Timer`, and should be
22/// called essentially every iteration of the event loop, or when the time
23/// specified by `next_wake` has elapsed.
24/// * The `Future` implementation for `Timer` is used to process incoming timer
25/// updates and requests. This is used to schedule new timeouts, update
26/// existing ones, or delete existing timeouts. The `Future` implementation
27/// will never resolve, but it'll schedule notifications of when to wake up
28/// and process more messages.
29///
30/// Note that if you're using this crate you probably don't need to use a
31/// `Timer` as there is a global one already available for you run on a helper
32/// thread. If this isn't desirable, though, then the
33/// `TimerHandle::set_fallback` method can be used instead!
34pub struct Timer {
35 inner: Arc<Inner>,
36 timer_heap: Heap<HeapTimer>,
37}
38
39/// A handle to a `Timer` which is used to create instances of a `Delay`.
40#[derive(Clone)]
41pub struct TimerHandle {
42 pub(crate) inner: Weak<Inner>,
43}
44
45pub(crate) struct Inner {
46 /// List of updates the `Timer` needs to process
47 pub(crate) list: ArcList<ScheduledTimer>,
48
49 /// The blocked `Timer` task to receive notifications to the `list` above.
50 pub(crate) waker: AtomicWaker,
51}
52
53/// Shared state between the `Timer` and a `Delay`.
54pub(crate) struct ScheduledTimer {
55 pub(crate) waker: AtomicWaker,
56
57 // The lowest bit here is whether the timer has fired or not, the second
58 // lowest bit is whether the timer has been invalidated, and all the other
59 // bits are the "generation" of the timer which is reset during the `reset`
60 // function. Only timers for a matching generation are fired.
61 pub(crate) state: AtomicUsize,
62
63 pub(crate) inner: Weak<Inner>,
64 pub(crate) at: Mutex<Option<Instant>>,
65
66 // TODO: this is only accessed by the timer thread, should have a more
67 // lightweight protection than a `Mutex`
68 pub(crate) slot: Mutex<Option<Slot>>,
69}
70
71impl Timer {
72 /// Creates a new timer heap ready to create new timers.
73 pub fn new() -> Timer {
74 Timer {
75 inner: Arc::new(Inner {
76 list: ArcList::new(),
77 waker: AtomicWaker::new(),
78 }),
79 timer_heap: Heap::new(),
80 }
81 }
82
83 /// Returns a handle to this timer heap, used to create new timeouts.
84 pub fn handle(&self) -> TimerHandle {
85 TimerHandle {
86 inner: Arc::downgrade(&self.inner),
87 }
88 }
89
90 /// Returns the time at which this timer next needs to be invoked with
91 /// `advance_to`.
92 ///
93 /// Event loops or threads typically want to sleep until the specified
94 /// instant.
95 pub fn next_event(&self) -> Option<Instant> {
96 self.timer_heap.peek().map(|t| t.at)
97 }
98
99 /// Proces any timers which are supposed to fire at or before the current
100 /// instant.
101 ///
102 /// This method is equivalent to `self.advance_to(Instant::now())`.
103 pub fn advance(&mut self) {
104 self.advance_to(Instant::now())
105 }
106
107 /// Proces any timers which are supposed to fire before `now` specified.
108 ///
109 /// This method should be called on `Timer` periodically to advance the
110 /// internal state and process any pending timers which need to fire.
111 pub fn advance_to(&mut self, now: Instant) {
112 loop {
113 match self.timer_heap.peek() {
114 Some(head) if head.at <= now => {}
115 Some(_) => break,
116 None => break,
117 };
118
119 // Flag the timer as fired and then notify its task, if any, that's
120 // blocked.
121 let heap_timer = self.timer_heap.pop().unwrap();
122 *heap_timer.node.slot.lock().unwrap() = None;
123 let bits = heap_timer.gen << 2;
124 match heap_timer
125 .node
126 .state
127 .compare_exchange(bits, bits | 0b01, SeqCst, SeqCst)
128 {
129 Ok(_) => heap_timer.node.waker.wake(),
130 Err(_b) => {}
131 }
132 }
133 }
134
135 /// Either updates the timer at slot `idx` to fire at `at`, or adds a new
136 /// timer at `idx` and sets it to fire at `at`.
137 fn update_or_add(&mut self, at: Instant, node: Arc<Node<ScheduledTimer>>) {
138 // TODO: avoid remove + push and instead just do one sift of the heap?
139 // In theory we could update it in place and then do the percolation
140 // as necessary
141 let gen = node.state.load(SeqCst) >> 2;
142 let mut slot = node.slot.lock().unwrap();
143 if let Some(heap_slot) = slot.take() {
144 self.timer_heap.remove(heap_slot);
145 }
146 *slot = Some(self.timer_heap.push(HeapTimer {
147 at,
148 gen,
149 node: node.clone(),
150 }));
151 }
152
153 fn remove(&mut self, node: Arc<Node<ScheduledTimer>>) {
154 // If this `idx` is still around and it's still got a registered timer,
155 // then we jettison it form the timer heap.
156 let mut slot = node.slot.lock().unwrap();
157 let heap_slot = match slot.take() {
158 Some(slot) => slot,
159 None => return,
160 };
161 self.timer_heap.remove(heap_slot);
162 }
163
164 fn invalidate(&mut self, node: Arc<Node<ScheduledTimer>>) {
165 node.state.fetch_or(0b10, SeqCst);
166 node.waker.wake();
167 }
168}
169
170impl Future for Timer {
171 type Output = ();
172
173 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
174 Pin::new(&mut self.inner).waker.register(cx.waker());
175 let mut list = self.inner.list.take();
176 while let Some(node) = list.pop() {
177 let at = *node.at.lock().unwrap();
178 match at {
179 Some(at) => self.update_or_add(at, node),
180 None => self.remove(node),
181 }
182 }
183 Poll::Pending
184 }
185}
186
187impl Drop for Timer {
188 fn drop(&mut self) {
189 // Seal off our list to prevent any more updates from getting pushed on.
190 // Any timer which sees an error from the push will immediately become
191 // inert.
192 let mut list = self.inner.list.take_and_seal();
193
194 // Now that we'll never receive another timer, drain the list of all
195 // updates and also drain our heap of all active timers, invalidating
196 // everything.
197 while let Some(t) = list.pop() {
198 self.invalidate(t);
199 }
200 while let Some(t) = self.timer_heap.pop() {
201 self.invalidate(t.node);
202 }
203 }
204}
205
206impl fmt::Debug for Timer {
207 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
208 f.debug_struct("Timer").field("heap", &"...").finish()
209 }
210}
211
212impl Default for Timer {
213 fn default() -> Self {
214 Self::new()
215 }
216}
217
218static HANDLE_FALLBACK: AtomicPtr<Inner> = AtomicPtr::new(EMPTY_HANDLE);
219const EMPTY_HANDLE: *mut Inner = std::ptr::null_mut();
220
221/// Error returned from `TimerHandle::set_fallback`.
222#[derive(Clone, Debug)]
223struct SetDefaultError(());
224
225impl TimerHandle {
226 /// Configures this timer handle to be the one returned by
227 /// `TimerHandle::default`.
228 ///
229 /// By default a global thread is initialized on the first call to
230 /// `TimerHandle::default`. This first call can happen transitively through
231 /// `Delay::new`. If, however, that hasn't happened yet then the global
232 /// default timer handle can be configured through this method.
233 ///
234 /// This method can be used to prevent the global helper thread from
235 /// spawning. If this method is successful then the global helper thread
236 /// will never get spun up.
237 ///
238 /// On success this timer handle will have installed itself globally to be
239 /// used as the return value for `TimerHandle::default` unless otherwise
240 /// specified.
241 ///
242 /// # Errors
243 ///
244 /// If another thread has already called `set_as_global_fallback` or this
245 /// thread otherwise loses a race to call this method then it will fail
246 /// returning an error. Once a call to `set_as_global_fallback` is
247 /// successful then no future calls may succeed.
248 fn set_as_global_fallback(self) -> Result<(), SetDefaultError> {
249 unsafe {
250 let val = self.into_raw();
251 match HANDLE_FALLBACK.compare_exchange(EMPTY_HANDLE, val, SeqCst, SeqCst) {
252 Ok(_) => Ok(()),
253 Err(_) => {
254 drop(TimerHandle::from_raw(val));
255 Err(SetDefaultError(()))
256 }
257 }
258 }
259 }
260
261 fn into_raw(self) -> *mut Inner {
262 self.inner.into_raw() as *mut Inner
263 }
264
265 unsafe fn from_raw(val: *mut Inner) -> TimerHandle {
266 let inner = Weak::from_raw(val);
267 TimerHandle { inner }
268 }
269}
270
271impl Default for TimerHandle {
272 fn default() -> TimerHandle {
273 let mut fallback = HANDLE_FALLBACK.load(SeqCst);
274
275 // If the fallback hasn't been previously initialized then let's spin
276 // up a helper thread and try to initialize with that. If we can't
277 // actually create a helper thread then we'll just return a "defunkt"
278 // handle which will return errors when timer objects are attempted to
279 // be associated.
280 if fallback == EMPTY_HANDLE {
281 let helper = match global::HelperThread::new() {
282 Ok(helper) => helper,
283 Err(_) => return TimerHandle { inner: Weak::new() },
284 };
285
286 // If we successfully set ourselves as the actual fallback then we
287 // want to `forget` the helper thread to ensure that it persists
288 // globally. If we fail to set ourselves as the fallback that means
289 // that someone was racing with this call to
290 // `TimerHandle::default`. They ended up winning so we'll destroy
291 // our helper thread (which shuts down the thread) and reload the
292 // fallback.
293 if helper.handle().set_as_global_fallback().is_ok() {
294 let ret = helper.handle();
295 helper.forget();
296 return ret;
297 }
298 fallback = HANDLE_FALLBACK.load(SeqCst);
299 }
300
301 // At this point our fallback handle global was configured so we use
302 // its value to reify a handle, clone it, and then forget our reified
303 // handle as we don't actually have an owning reference to it.
304 assert!(fallback != EMPTY_HANDLE);
305 unsafe {
306 let handle = TimerHandle::from_raw(fallback);
307 let ret = handle.clone();
308 let _ = handle.into_raw();
309 ret
310 }
311 }
312}
313
314impl fmt::Debug for TimerHandle {
315 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
316 f.debug_struct("TimerHandle")
317 .field("inner", &"...")
318 .finish()
319 }
320}