// async_io/reactor.rs

use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::panic;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

use async_lock::OnceCell;
use concurrent_queue::ConcurrentQueue;
use futures_lite::ready;
use polling::{Event, Events, Poller};
use slab::Slab;

21// Choose the proper implementation of `Registration` based on the target platform.
22cfg_if::cfg_if! {
23    if #[cfg(windows)] {
24        mod windows;
25        pub use windows::Registration;
26    } else if #[cfg(any(
27        target_os = "macos",
28        target_os = "ios",
29        target_os = "tvos",
30        target_os = "watchos",
31        target_os = "freebsd",
32        target_os = "netbsd",
33        target_os = "openbsd",
34        target_os = "dragonfly",
35    ))] {
36        mod kqueue;
37        pub use kqueue::Registration;
38    } else if #[cfg(unix)] {
39        mod unix;
40        pub use unix::Registration;
41    } else {
42        compile_error!("unsupported platform");
43    }
44}
45
/// Capacity of the bounded queue holding pending timer insertions and removals.
///
/// When the queue is full, the thread trying to insert or remove a timer drains it itself
/// before retrying.
#[cfg(not(target_os = "espidf"))]
const TIMER_QUEUE_SIZE: usize = 1_000;

/// Reduced capacity for ESP-IDF: an embedded OS needs far fewer timers, and the smaller queue
/// saves roughly 20K of RAM, which matters on MCUs with less than 400K.
#[cfg(target_os = "espidf")]
const TIMER_QUEUE_SIZE: usize = 100;

/// Index of the read-direction state in `Source::state`.
const READ: usize = 0;
/// Index of the write-direction state in `Source::state`.
const WRITE: usize = READ + 1;

57/// The reactor.
58///
59/// There is only one global instance of this type, accessible by [`Reactor::get()`].
60pub(crate) struct Reactor {
61    /// Portable bindings to epoll/kqueue/event ports/IOCP.
62    ///
63    /// This is where I/O is polled, producing I/O events.
64    pub(crate) poller: Poller,
65
66    /// Ticker bumped before polling.
67    ///
68    /// This is useful for checking what is the current "round" of `ReactorLock::react()` when
69    /// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those
70    /// methods must make sure they don't receive stale I/O events - they only accept events from a
71    /// fresh "round" of `ReactorLock::react()`.
72    ticker: AtomicUsize,
73
74    /// Registered sources.
75    sources: Mutex<Slab<Arc<Source>>>,
76
77    /// Temporary storage for I/O events when polling the reactor.
78    ///
79    /// Holding a lock on this event list implies the exclusive right to poll I/O.
80    events: Mutex<Events>,
81
82    /// An ordered map of registered timers.
83    ///
84    /// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to
85    /// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the
86    /// timer.
87    timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
88
89    /// A queue of timer operations (insert and remove).
90    ///
91    /// When inserting or removing a timer, we don't process it immediately - we just push it into
92    /// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
93    timer_ops: ConcurrentQueue<TimerOp>,
94}
95
96impl Reactor {
97    /// Returns a reference to the reactor.
98    pub(crate) fn get() -> &'static Reactor {
99        static REACTOR: OnceCell<Reactor> = OnceCell::new();
100
101        REACTOR.get_or_init_blocking(|| {
102            crate::driver::init();
103            Reactor {
104                poller: Poller::new().expect("cannot initialize I/O event notification"),
105                ticker: AtomicUsize::new(0),
106                sources: Mutex::new(Slab::new()),
107                events: Mutex::new(Events::new()),
108                timers: Mutex::new(BTreeMap::new()),
109                timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE),
110            }
111        })
112    }
113
114    /// Returns the current ticker.
115    pub(crate) fn ticker(&self) -> usize {
116        self.ticker.load(Ordering::SeqCst)
117    }
118
119    /// Registers an I/O source in the reactor.
120    pub(crate) fn insert_io(&self, raw: Registration) -> io::Result<Arc<Source>> {
121        // Create an I/O source for this file descriptor.
122        let source = {
123            let mut sources = self.sources.lock().unwrap();
124            let key = sources.vacant_entry().key();
125            let source = Arc::new(Source {
126                registration: raw,
127                key,
128                state: Default::default(),
129            });
130            sources.insert(source.clone());
131            source
132        };
133
134        // Register the file descriptor.
135        if let Err(err) = source.registration.add(&self.poller, source.key) {
136            let mut sources = self.sources.lock().unwrap();
137            sources.remove(source.key);
138            return Err(err);
139        }
140
141        Ok(source)
142    }
143
144    /// Deregisters an I/O source from the reactor.
145    pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
146        let mut sources = self.sources.lock().unwrap();
147        sources.remove(source.key);
148        source.registration.delete(&self.poller)
149    }
150
151    /// Registers a timer in the reactor.
152    ///
153    /// Returns the inserted timer's ID.
154    pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
155        // Generate a new timer ID.
156        static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
157        let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
158
159        // Push an insert operation.
160        while self
161            .timer_ops
162            .push(TimerOp::Insert(when, id, waker.clone()))
163            .is_err()
164        {
165            // If the queue is full, drain it and try again.
166            let mut timers = self.timers.lock().unwrap();
167            self.process_timer_ops(&mut timers);
168        }
169
170        // Notify that a timer has been inserted.
171        self.notify();
172
173        id
174    }
175
176    /// Deregisters a timer from the reactor.
177    pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
178        // Push a remove operation.
179        while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
180            // If the queue is full, drain it and try again.
181            let mut timers = self.timers.lock().unwrap();
182            self.process_timer_ops(&mut timers);
183        }
184    }
185
186    /// Notifies the thread blocked on the reactor.
187    pub(crate) fn notify(&self) {
188        self.poller.notify().expect("failed to notify reactor");
189    }
190
191    /// Locks the reactor, potentially blocking if the lock is held by another thread.
192    pub(crate) fn lock(&self) -> ReactorLock<'_> {
193        let reactor = self;
194        let events = self.events.lock().unwrap();
195        ReactorLock { reactor, events }
196    }
197
198    /// Attempts to lock the reactor.
199    pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> {
200        self.events.try_lock().ok().map(|events| {
201            let reactor = self;
202            ReactorLock { reactor, events }
203        })
204    }
205
206    /// Processes ready timers and extends the list of wakers to wake.
207    ///
208    /// Returns the duration until the next timer before this method was called.
209    fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
210        let span = tracing::trace_span!("process_timers");
211        let _enter = span.enter();
212
213        let mut timers = self.timers.lock().unwrap();
214        self.process_timer_ops(&mut timers);
215
216        let now = Instant::now();
217
218        // Split timers into ready and pending timers.
219        //
220        // Careful to split just *after* `now`, so that a timer set for exactly `now` is considered
221        // ready.
222        let pending = timers.split_off(&(now + Duration::from_nanos(1), 0));
223        let ready = mem::replace(&mut *timers, pending);
224
225        // Calculate the duration until the next event.
226        let dur = if ready.is_empty() {
227            // Duration until the next timer.
228            timers
229                .keys()
230                .next()
231                .map(|(when, _)| when.saturating_duration_since(now))
232        } else {
233            // Timers are about to fire right now.
234            Some(Duration::from_secs(0))
235        };
236
237        // Drop the lock before waking.
238        drop(timers);
239
240        // Add wakers to the list.
241        tracing::trace!("{} ready wakers", ready.len());
242
243        for (_, waker) in ready {
244            wakers.push(waker);
245        }
246
247        dur
248    }
249
250    /// Processes queued timer operations.
251    fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
252        // Process only as much as fits into the queue, or else this loop could in theory run
253        // forever.
254        self.timer_ops
255            .try_iter()
256            .take(self.timer_ops.capacity().unwrap())
257            .for_each(|op| match op {
258                TimerOp::Insert(when, id, waker) => {
259                    timers.insert((when, id), waker);
260                }
261                TimerOp::Remove(when, id) => {
262                    timers.remove(&(when, id));
263                }
264            });
265    }
266}
267
268/// A lock on the reactor.
269pub(crate) struct ReactorLock<'a> {
270    reactor: &'a Reactor,
271    events: MutexGuard<'a, Events>,
272}
273
274impl ReactorLock<'_> {
275    /// Processes new events, blocking until the first event or the timeout.
276    pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
277        let span = tracing::trace_span!("react");
278        let _enter = span.enter();
279
280        let mut wakers = Vec::new();
281
282        // Process ready timers.
283        let next_timer = self.reactor.process_timers(&mut wakers);
284
285        // compute the timeout for blocking on I/O events.
286        let timeout = match (next_timer, timeout) {
287            (None, None) => None,
288            (Some(t), None) | (None, Some(t)) => Some(t),
289            (Some(a), Some(b)) => Some(a.min(b)),
290        };
291
292        // Bump the ticker before polling I/O.
293        let tick = self
294            .reactor
295            .ticker
296            .fetch_add(1, Ordering::SeqCst)
297            .wrapping_add(1);
298
299        self.events.clear();
300
301        // Block on I/O events.
302        let res = match self.reactor.poller.wait(&mut self.events, timeout) {
303            // No I/O events occurred.
304            Ok(0) => {
305                if timeout != Some(Duration::from_secs(0)) {
306                    // The non-zero timeout was hit so fire ready timers.
307                    self.reactor.process_timers(&mut wakers);
308                }
309                Ok(())
310            }
311
312            // At least one I/O event occurred.
313            Ok(_) => {
314                // Iterate over sources in the event list.
315                let sources = self.reactor.sources.lock().unwrap();
316
317                for ev in self.events.iter() {
318                    // Check if there is a source in the table with this key.
319                    if let Some(source) = sources.get(ev.key) {
320                        let mut state = source.state.lock().unwrap();
321
322                        // Collect wakers if a writability event was emitted.
323                        for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
324                            if emitted {
325                                state[dir].tick = tick;
326                                state[dir].drain_into(&mut wakers);
327                            }
328                        }
329
330                        // Re-register if there are still writers or readers. This can happen if
331                        // e.g. we were previously interested in both readability and writability,
332                        // but only one of them was emitted.
333                        if !state[READ].is_empty() || !state[WRITE].is_empty() {
334                            // Create the event that we are interested in.
335                            let event = {
336                                let mut event = Event::none(source.key);
337                                event.readable = !state[READ].is_empty();
338                                event.writable = !state[WRITE].is_empty();
339                                event
340                            };
341
342                            // Register interest in this event.
343                            source.registration.modify(&self.reactor.poller, event)?;
344                        }
345                    }
346                }
347
348                Ok(())
349            }
350
351            // The syscall was interrupted.
352            Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
353
354            // An actual error occureed.
355            Err(err) => Err(err),
356        };
357
358        // Wake up ready tasks.
359        tracing::trace!("{} ready wakers", wakers.len());
360        for waker in wakers {
361            // Don't let a panicking waker blow everything up.
362            panic::catch_unwind(|| waker.wake()).ok();
363        }
364
365        res
366    }
367}
368
/// A pending change to the reactor's timer map, queued in `Reactor::timer_ops`.
enum TimerOp {
    /// Add the timer with the given deadline and ID, which wakes the `Waker` when it fires.
    Insert(Instant, usize, Waker),
    /// Remove the timer identified by its deadline and ID.
    Remove(Instant, usize),
}

375/// A registered source of I/O events.
376#[derive(Debug)]
377pub(crate) struct Source {
378    /// This source's registration into the reactor.
379    registration: Registration,
380
381    /// The key of this source obtained during registration.
382    key: usize,
383
384    /// Inner state with registered wakers.
385    state: Mutex<[Direction; 2]>,
386}
387
388/// A read or write direction.
389#[derive(Debug, Default)]
390struct Direction {
391    /// Last reactor tick that delivered an event.
392    tick: usize,
393
394    /// Ticks remembered by `Async::poll_readable()` or `Async::poll_writable()`.
395    ticks: Option<(usize, usize)>,
396
397    /// Waker stored by `Async::poll_readable()` or `Async::poll_writable()`.
398    waker: Option<Waker>,
399
400    /// Wakers of tasks waiting for the next event.
401    ///
402    /// Registered by `Async::readable()` and `Async::writable()`.
403    wakers: Slab<Option<Waker>>,
404}
405
406impl Direction {
407    /// Returns `true` if there are no wakers interested in this direction.
408    fn is_empty(&self) -> bool {
409        self.waker.is_none() && self.wakers.iter().all(|(_, opt)| opt.is_none())
410    }
411
412    /// Moves all wakers into a `Vec`.
413    fn drain_into(&mut self, dst: &mut Vec<Waker>) {
414        if let Some(w) = self.waker.take() {
415            dst.push(w);
416        }
417        for (_, opt) in self.wakers.iter_mut() {
418            if let Some(w) = opt.take() {
419                dst.push(w);
420            }
421        }
422    }
423}
424
425impl Source {
426    /// Polls the I/O source for readability.
427    pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
428        self.poll_ready(READ, cx)
429    }
430
431    /// Polls the I/O source for writability.
432    pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
433        self.poll_ready(WRITE, cx)
434    }
435
436    /// Registers a waker from `poll_readable()` or `poll_writable()`.
437    ///
438    /// If a different waker is already registered, it gets replaced and woken.
439    fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
440        let mut state = self.state.lock().unwrap();
441
442        // Check if the reactor has delivered an event.
443        if let Some((a, b)) = state[dir].ticks {
444            // If `state[dir].tick` has changed to a value other than the old reactor tick,
445            // that means a newer reactor tick has delivered an event.
446            if state[dir].tick != a && state[dir].tick != b {
447                state[dir].ticks = None;
448                return Poll::Ready(Ok(()));
449            }
450        }
451
452        let was_empty = state[dir].is_empty();
453
454        // Register the current task's waker.
455        if let Some(w) = state[dir].waker.take() {
456            if w.will_wake(cx.waker()) {
457                state[dir].waker = Some(w);
458                return Poll::Pending;
459            }
460            // Wake the previous waker because it's going to get replaced.
461            panic::catch_unwind(|| w.wake()).ok();
462        }
463        state[dir].waker = Some(cx.waker().clone());
464        state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));
465
466        // Update interest in this I/O handle.
467        if was_empty {
468            // Create the event that we are interested in.
469            let event = {
470                let mut event = Event::none(self.key);
471                event.readable = !state[READ].is_empty();
472                event.writable = !state[WRITE].is_empty();
473                event
474            };
475
476            // Register interest in it.
477            self.registration.modify(&Reactor::get().poller, event)?;
478        }
479
480        Poll::Pending
481    }
482
483    /// Waits until the I/O source is readable.
484    pub(crate) fn readable<T>(handle: &crate::Async<T>) -> Readable<'_, T> {
485        Readable(Self::ready(handle, READ))
486    }
487
488    /// Waits until the I/O source is readable.
489    pub(crate) fn readable_owned<T>(handle: Arc<crate::Async<T>>) -> ReadableOwned<T> {
490        ReadableOwned(Self::ready(handle, READ))
491    }
492
493    /// Waits until the I/O source is writable.
494    pub(crate) fn writable<T>(handle: &crate::Async<T>) -> Writable<'_, T> {
495        Writable(Self::ready(handle, WRITE))
496    }
497
498    /// Waits until the I/O source is writable.
499    pub(crate) fn writable_owned<T>(handle: Arc<crate::Async<T>>) -> WritableOwned<T> {
500        WritableOwned(Self::ready(handle, WRITE))
501    }
502
503    /// Waits until the I/O source is readable or writable.
504    fn ready<H: Borrow<crate::Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> {
505        Ready {
506            handle,
507            dir,
508            ticks: None,
509            index: None,
510            _capture: PhantomData,
511        }
512    }
513}
514
515/// Future for [`Async::readable`](crate::Async::readable).
516#[must_use = "futures do nothing unless you `.await` or poll them"]
517pub struct Readable<'a, T>(Ready<&'a crate::Async<T>, T>);
518
519impl<T> Future for Readable<'_, T> {
520    type Output = io::Result<()>;
521
522    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
523        ready!(Pin::new(&mut self.0).poll(cx))?;
524        tracing::trace!(fd = ?self.0.handle.source.registration, "readable");
525        Poll::Ready(Ok(()))
526    }
527}
528
529impl<T> fmt::Debug for Readable<'_, T> {
530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531        f.debug_struct("Readable").finish()
532    }
533}
534
535/// Future for [`Async::readable_owned`](crate::Async::readable_owned).
536#[must_use = "futures do nothing unless you `.await` or poll them"]
537pub struct ReadableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
538
539impl<T> Future for ReadableOwned<T> {
540    type Output = io::Result<()>;
541
542    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
543        ready!(Pin::new(&mut self.0).poll(cx))?;
544        tracing::trace!(fd = ?self.0.handle.source.registration, "readable_owned");
545        Poll::Ready(Ok(()))
546    }
547}
548
549impl<T> fmt::Debug for ReadableOwned<T> {
550    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
551        f.debug_struct("ReadableOwned").finish()
552    }
553}
554
555/// Future for [`Async::writable`](crate::Async::writable).
556#[must_use = "futures do nothing unless you `.await` or poll them"]
557pub struct Writable<'a, T>(Ready<&'a crate::Async<T>, T>);
558
559impl<T> Future for Writable<'_, T> {
560    type Output = io::Result<()>;
561
562    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
563        ready!(Pin::new(&mut self.0).poll(cx))?;
564        tracing::trace!(fd = ?self.0.handle.source.registration, "writable");
565        Poll::Ready(Ok(()))
566    }
567}
568
569impl<T> fmt::Debug for Writable<'_, T> {
570    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
571        f.debug_struct("Writable").finish()
572    }
573}
574
575/// Future for [`Async::writable_owned`](crate::Async::writable_owned).
576#[must_use = "futures do nothing unless you `.await` or poll them"]
577pub struct WritableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
578
579impl<T> Future for WritableOwned<T> {
580    type Output = io::Result<()>;
581
582    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
583        ready!(Pin::new(&mut self.0).poll(cx))?;
584        tracing::trace!(fd = ?self.0.handle.source.registration, "writable_owned");
585        Poll::Ready(Ok(()))
586    }
587}
588
589impl<T> fmt::Debug for WritableOwned<T> {
590    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
591        f.debug_struct("WritableOwned").finish()
592    }
593}
594
595struct Ready<H: Borrow<crate::Async<T>>, T> {
596    handle: H,
597    dir: usize,
598    ticks: Option<(usize, usize)>,
599    index: Option<usize>,
600    _capture: PhantomData<fn() -> T>,
601}
602
603impl<H: Borrow<crate::Async<T>>, T> Unpin for Ready<H, T> {}
604
605impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {
606    type Output = io::Result<()>;
607
608    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
609        let Self {
610            ref handle,
611            dir,
612            ticks,
613            index,
614            ..
615        } = &mut *self;
616
617        let mut state = handle.borrow().source.state.lock().unwrap();
618
619        // Check if the reactor has delivered an event.
620        if let Some((a, b)) = *ticks {
621            // If `state[dir].tick` has changed to a value other than the old reactor tick,
622            // that means a newer reactor tick has delivered an event.
623            if state[*dir].tick != a && state[*dir].tick != b {
624                return Poll::Ready(Ok(()));
625            }
626        }
627
628        let was_empty = state[*dir].is_empty();
629
630        // Register the current task's waker.
631        let i = match *index {
632            Some(i) => i,
633            None => {
634                let i = state[*dir].wakers.insert(None);
635                *index = Some(i);
636                *ticks = Some((Reactor::get().ticker(), state[*dir].tick));
637                i
638            }
639        };
640        state[*dir].wakers[i] = Some(cx.waker().clone());
641
642        // Update interest in this I/O handle.
643        if was_empty {
644            // Create the event that we are interested in.
645            let event = {
646                let mut event = Event::none(handle.borrow().source.key);
647                event.readable = !state[READ].is_empty();
648                event.writable = !state[WRITE].is_empty();
649                event
650            };
651
652            // Indicate that we are interested in this event.
653            handle
654                .borrow()
655                .source
656                .registration
657                .modify(&Reactor::get().poller, event)?;
658        }
659
660        Poll::Pending
661    }
662}
663
664impl<H: Borrow<crate::Async<T>>, T> Drop for Ready<H, T> {
665    fn drop(&mut self) {
666        // Remove our waker when dropped.
667        if let Some(key) = self.index {
668            let mut state = self.handle.borrow().source.state.lock().unwrap();
669            let wakers = &mut state[self.dir].wakers;
670            if wakers.contains(key) {
671                wakers.remove(key);
672            }
673        }
674    }
675}