quinn_proto/connection/mod.rs

1use std::{
2    cmp,
3    collections::VecDeque,
4    convert::TryFrom,
5    fmt, io, mem,
6    net::{IpAddr, SocketAddr},
7    sync::Arc,
8    time::{Duration, Instant},
9};
10
11use bytes::{Bytes, BytesMut};
12use frame::StreamMetaVec;
13use rand::{rngs::StdRng, Rng, SeedableRng};
14use thiserror::Error;
15use tracing::{debug, error, trace, trace_span, warn};
16
17use crate::{
18    cid_generator::ConnectionIdGenerator,
19    cid_queue::CidQueue,
20    coding::BufMutExt,
21    config::{ServerConfig, TransportConfig},
22    crypto::{self, KeyPair, Keys, PacketKey},
23    frame,
24    frame::{Close, Datagram, FrameStruct},
25    packet::{
26        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
27        PacketNumber, PartialDecode, SpaceId,
28    },
29    range_set::ArrayRangeSet,
30    shared::{
31        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
32        EndpointEvent, EndpointEventInner,
33    },
34    token::ResetToken,
35    transport_parameters::TransportParameters,
36    Dir, EndpointConfig, Frame, Side, StreamId, Transmit, TransportError, TransportErrorCode,
37    VarInt, MAX_STREAM_COUNT, MIN_INITIAL_SIZE, TIMER_GRANULARITY,
38};
39
40mod ack_frequency;
41use ack_frequency::AckFrequencyState;
42
43mod assembler;
44pub use assembler::Chunk;
45
46mod cid_state;
47use cid_state::CidState;
48
49mod datagrams;
50use datagrams::DatagramState;
51pub use datagrams::{Datagrams, SendDatagramError};
52
53mod mtud;
54mod pacing;
55
56mod packet_builder;
57use packet_builder::PacketBuilder;
58
59mod packet_crypto;
60use packet_crypto::{PrevCrypto, ZeroRttCrypto};
61
62mod paths;
63pub use paths::RttEstimator;
64use paths::{PathData, PathResponses};
65
66mod send_buffer;
67
68mod spaces;
69#[cfg(fuzzing)]
70pub use spaces::Retransmits;
71#[cfg(not(fuzzing))]
72use spaces::Retransmits;
73use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
74
75mod stats;
76pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
77
78mod streams;
79#[cfg(fuzzing)]
80pub use streams::StreamsState;
81#[cfg(not(fuzzing))]
82use streams::StreamsState;
83pub use streams::{
84    BytesSource, Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream,
85    SendStream, StreamEvent, Streams, WriteError, Written,
86};
87
88mod timer;
89use crate::congestion::Controller;
90use timer::{Timer, TimerTable};
91
/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
/// - A. Simple getters, taking `&self`
/// - B. Handlers for incoming events from the network or system, named `handle_*`.
/// - C. State machine mutators, for incoming commands from the application. For convenience we
///   refer to this as "performing I/O" below, however as per the design of this library none of the
///   functions actually perform system-level I/O. Examples include [`read`](RecvStream::read) and
///   [`write`](SendStream::write), but also operations like [`reset`](SendStream::reset).
/// - D. Polling functions for outgoing events or actions for the caller to
///   take, named `poll_*`.
///
/// The simplest way to use this API correctly is to call (B) and (C) whenever
/// appropriate, then after each of those calls, as soon as feasible call all
/// polling methods (D) and deal with their outputs appropriately, e.g. by
/// passing them to the application or by making a system-level I/O call. You
/// should call the polling functions in this order:
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1): `poll_transmit` can arm timers
/// (e.g. the pacing timer) that `poll_timeout` must then report. Additional dependencies may be
/// added in future, so the above order is recommended.
///
/// (A) may be called whenever desired.
///
/// Care should be taken to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
pub struct Connection {
    /// Endpoint configuration, shared via `Arc`
    endpoint_config: Arc<EndpointConfig>,
    /// Server configuration. `Some` exactly for server-side connections: `new` derives `side`
    /// from whether this is present.
    server_config: Option<Arc<ServerConfig>>,
    /// Transport configuration for this connection
    config: Arc<TransportConfig>,
    /// Connection-local RNG, seeded from the `rng_seed` passed to `new`
    rng: StdRng,
    /// Cryptographic session; `new` also derives the Initial packet keys from it
    crypto: Box<dyn crypto::Session>,
    /// The CID we initially chose, for use during the handshake
    handshake_cid: ConnectionId,
    /// The CID the peer initially chose, for use during the handshake
    rem_handshake_cid: ConnectionId,
    /// The "real" local IP address which was used to receive the initial packet.
    /// This is only populated for the server case, and if known
    local_ip: Option<IpAddr>,
    /// State of the network path currently in use (MTU, RTT, congestion control,
    /// anti-amplification accounting, pacing, ...)
    path: PathData,
    /// Whether MTU detection is supported in this environment
    allow_mtud: bool,
    /// The previous path and the remote CID used on it, kept after a migration so that a
    /// PATH_CHALLENGE can still be sent to validate it
    prev_path: Option<(ConnectionId, PathData)>,
    /// Lifecycle state of the connection (e.g. handshake, draining, closed, drained)
    state: State,
    /// Whether this is the client or the server end of the connection
    side: Side,
    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
    zero_rtt_enabled: bool,
    /// Set if 0-RTT is supported, then cleared when no longer needed.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// Current key phase; starts as `false`
    key_phase: bool,
    /// How many packets are in the current key phase. Used only for `Data` space.
    key_phase_size: u64,
    /// Transport parameters set by the peer
    peer_params: TransportParameters,
    /// Source ConnectionId of the first packet received from the peer
    orig_rem_cid: ConnectionId,
    /// Destination ConnectionId sent by the client on the first Initial
    initial_dst_cid: ConnectionId,
    /// The value that the server included in the Source Connection ID field of a Retry packet, if
    /// one was received
    retry_src_cid: Option<ConnectionId>,
    /// Total number of outgoing packets that have been deemed lost
    lost_packets: u64,
    /// Queued application-facing events, drained by `poll`
    events: VecDeque<Event>,
    /// Queued endpoint-facing events, drained by `poll_endpoint_events`
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use for this connection
    spin_enabled: bool,
    /// Outgoing spin bit state
    spin: bool,
    /// Packet number spaces: initial, handshake, 1-RTT
    spaces: [PacketSpace; 3],
    /// Highest usable packet number space
    highest_space: SpaceId,
    /// 1-RTT keys used prior to a key update
    prev_crypto: Option<PrevCrypto>,
    /// 1-RTT keys to be used for the next key update
    ///
    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
    /// spoofing key updates.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Whether 0-RTT was accepted (as opposed to merely enabled; see `zero_rtt_enabled`)
    accepted_0rtt: bool,
    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
    permit_idle_reset: bool,
    /// Negotiated idle timeout
    idle_timeout: Option<VarInt>,
    /// Armed timers; `poll_timeout` reports the next deadline from this table
    timers: TimerTable,
    /// Number of packets received which could not be authenticated
    authentication_failures: u64,
    /// Why the connection was lost, if it has been
    error: Option<ConnectionError>,
    /// Sent in every outgoing Initial packet. Always empty for servers and after Initial keys are
    /// discarded.
    retry_token: Bytes,
    /// Identifies Data-space packet numbers to skip. Not used in earlier spaces.
    packet_number_filter: PacketNumberFilter,

    //
    // Queued non-retransmittable 1-RTT data
    //
    /// Responses to PATH_CHALLENGE frames
    path_responses: PathResponses,
    /// Whether a close frame still needs to be sent. While draining or closed, `poll_transmit`
    /// only produces output while this is set, and it is cleared only once the packet carrying
    /// the close has been encoded successfully.
    close: bool,

    //
    // ACK frequency
    //
    ack_frequency: AckFrequencyState,

    //
    // Loss Detection
    //
    /// The number of times a PTO has been sent without receiving an ack.
    pto_count: u32,

    //
    // Congestion Control
    //
    /// Whether the most recently received packet had an ECN codepoint set
    receiving_ecn: bool,
    /// Number of packets authenticated
    total_authed_packets: u64,
    /// Whether the last `poll_transmit` call yielded no data because there was
    /// no outgoing application data.
    app_limited: bool,

    /// Per-stream state
    streams: StreamsState,
    /// Surplus remote CIDs for future use on new paths
    rem_cids: CidQueue,
    /// Attributes of CIDs generated by local peer
    local_cid_state: CidState,
    /// State of the unreliable datagram extension
    datagrams: DatagramState,
    /// Connection level statistics
    stats: ConnectionStats,
    /// QUIC version used for the connection.
    version: u32,
}
242
243impl Connection {
    /// Creates the state machine for a new connection, starting in the handshake state
    ///
    /// The side is inferred from `server_config`: `Some` yields a server connection and `None` a
    /// client connection. The Initial packet space is seeded with keys derived from `init_cid`.
    /// Clients kick off the connection immediately by calling `write_crypto` and `init_0rtt`
    /// before returning.
    ///
    /// Parameters of note:
    /// - `init_cid`: destination CID of the client's first Initial; used to derive the Initial
    ///   keys and recorded as `initial_dst_cid`.
    /// - `loc_cid` / `rem_cid`: the local and remote CIDs used during the handshake. `rem_cid`
    ///   also seeds `rem_cids` and `orig_rem_cid`.
    /// - `pref_addr_cid`: only its presence is inspected here (see `local_cid_state`).
    /// - `rng_seed`: seeds the connection RNG, which in turn determines `key_phase_size`,
    ///   whether the spin bit is used, and the packet number filter.
    /// - `path_validated`: forwarded to `PathData::new` for the initial path.
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        server_config: Option<Arc<ServerConfig>>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        pref_addr_cid: Option<ConnectionId>,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        path_validated: bool,
    ) -> Self {
        let side = if server_config.is_some() {
            Side::Server
        } else {
            Side::Client
        };
        // Initial keys must be derived before `crypto` is moved into the struct below.
        let initial_space = PacketSpace {
            crypto: Some(crypto.initial_keys(&init_cid, side)),
            ..PacketSpace::new(now)
        };
        let state = State::Handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
        });
        let mut rng = StdRng::from_seed(rng_seed);
        // NOTE: struct-literal fields are evaluated in source order. The RNG draws below happen
        // as `key_phase_size`, then `spin_enabled` (only when `config.allow_spin` is set, due to
        // `&&` short-circuiting), then `packet_number_filter`. `config` and `rng` are moved only
        // after every initializer that reads them. Reordering these fields changes which values
        // a given `rng_seed` produces, so keep the order when editing.
        let mut this = Self {
            endpoint_config,
            server_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state: CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                // NOTE(review): presumably the number of local CIDs already issued (the
                // handshake CID, plus the preferred-address CID if any). Confirm against
                // `CidState::new`.
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
            path: PathData::new(remote, allow_mtud, None, now, path_validated, &config),
            allow_mtud,
            local_ip,
            prev_path: None,
            side,
            state,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            // A small initial key phase size ensures peers that don't handle key updates correctly
            // fail sooner rather than later. It's okay for both peers to do this, as the first one
            // to perform an update will reset the other's key phase size in `update_keys`, and a
            // simultaneous key update by both is just like a regular key update with a really fast
            // response. Inspired by quic-go's similar behavior of performing the first key update
            // at the 100th short-header packet.
            key_phase_size: rng.gen_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            lost_packets: 0,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            // When allowed, use the spin bit on a random 7/8 of connections. This is presumably
            // so that a random subset opts out, as RFC 9000 section 17.4 requires.
            spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
            spin: false,
            spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            idle_timeout: config.max_idle_timeout,
            timers: TimerTable::default(),
            authentication_failures: 0,
            error: None,
            retry_token: Bytes::new(),
            // Tests may disable packet-number skipping to get deterministic packet numbers.
            #[cfg(test)]
            packet_number_filter: match config.deterministic_packet_numbers {
                false => PacketNumberFilter::new(&mut rng),
                true => PacketNumberFilter::disabled(),
            },
            #[cfg(not(test))]
            packet_number_filter: PacketNumberFilter::new(&mut rng),

            path_responses: PathResponses::default(),
            close: false,

            // The peer's transport parameters are not known yet (`peer_params` starts as the
            // default), so the initial max ACK delay is derived from default parameters.
            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            pto_count: 0,

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            rem_cids: CidQueue::new(rem_cid),
            rng,
            stats: ConnectionStats::default(),
            version,
        };
        if side.is_client() {
            // Kick off the connection
            this.write_crypto();
            this.init_0rtt();
        }
        this
    }
368
    /// Returns the next time at which `handle_timeout` should be called
    ///
    /// The value returned may change after:
    /// - the application performed some I/O on the connection
    /// - a call was made to `handle_event`
    /// - a call to `poll_transmit` returned `Some`
    /// - a call was made to `handle_timeout`
    ///
    /// This delegates to the connection's timer table. `poll_transmit` can arm timers itself
    /// (it sets the pacing timer when sending is blocked by pacing), which is why this should be
    /// polled after `poll_transmit`. A `None` result presumably means no timer is currently
    /// armed; confirm against `TimerTable::next_timeout`.
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.next_timeout()
    }
380
381    /// Returns application-facing events
382    ///
383    /// Connections should be polled for events after:
384    /// - a call was made to `handle_event`
385    /// - a call was made to `handle_timeout`
386    #[must_use]
387    pub fn poll(&mut self) -> Option<Event> {
388        if let Some(x) = self.events.pop_front() {
389            return Some(x);
390        }
391
392        if let Some(event) = self.streams.poll() {
393            return Some(Event::Stream(event));
394        }
395
396        if let Some(err) = self.error.take() {
397            return Some(Event::ConnectionLost { reason: err });
398        }
399
400        None
401    }
402
403    /// Return endpoint-facing events
404    #[must_use]
405    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
406        self.endpoint_events.pop_front().map(EndpointEvent)
407    }
408
409    /// Provide control over streams
410    #[must_use]
411    pub fn streams(&mut self) -> Streams<'_> {
412        Streams {
413            state: &mut self.streams,
414            conn_state: &self.state,
415        }
416    }
417
418    /// Provide control over streams
419    #[must_use]
420    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
421        assert!(id.dir() == Dir::Bi || id.initiator() != self.side);
422        RecvStream {
423            id,
424            state: &mut self.streams,
425            pending: &mut self.spaces[SpaceId::Data].pending,
426        }
427    }
428
429    /// Provide control over streams
430    #[must_use]
431    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
432        assert!(id.dir() == Dir::Bi || id.initiator() == self.side);
433        SendStream {
434            id,
435            state: &mut self.streams,
436            pending: &mut self.spaces[SpaceId::Data].pending,
437            conn_state: &self.state,
438        }
439    }
440
441    /// Returns packets to transmit
442    ///
443    /// Connections should be polled for transmit after:
444    /// - the application performed some I/O on the connection
445    /// - a call was made to `handle_event`
446    /// - a call was made to `handle_timeout`
447    ///
448    /// `max_datagrams` specifies how many datagrams can be returned inside a
449    /// single Transmit using GSO. This must be at least 1.
450    #[must_use]
451    pub fn poll_transmit(
452        &mut self,
453        now: Instant,
454        max_datagrams: usize,
455        buf: &mut Vec<u8>,
456    ) -> Option<Transmit> {
457        assert!(max_datagrams != 0);
458        let max_datagrams = match self.config.enable_segmentation_offload {
459            false => 1,
460            true => max_datagrams.min(MAX_TRANSMIT_SEGMENTS),
461        };
462
463        let mut num_datagrams = 0;
464        // Position in `buf` of the first byte of the current UDP datagram. When coalescing QUIC
465        // packets, this can be earlier than the start of the current QUIC packet.
466        let mut datagram_start = 0;
467        let mut segment_size = usize::from(self.path.current_mtu());
468
469        // Send PATH_CHALLENGE for a previous path if necessary
470        if let Some((prev_cid, ref mut prev_path)) = self.prev_path {
471            if prev_path.challenge_pending {
472                prev_path.challenge_pending = false;
473                let token = prev_path
474                    .challenge
475                    .expect("previous path challenge pending without token");
476                let destination = prev_path.remote;
477                debug_assert_eq!(
478                    self.highest_space,
479                    SpaceId::Data,
480                    "PATH_CHALLENGE queued without 1-RTT keys"
481                );
482                buf.reserve(MIN_INITIAL_SIZE as usize);
483
484                let buf_capacity = buf.capacity();
485
486                // Use the previous CID to avoid linking the new path with the previous path. We
487                // don't bother accounting for possible retirement of that prev_cid because this is
488                // sent once, immediately after migration, when the CID is known to be valid. Even
489                // if a post-migration packet caused the CID to be retired, it's fair to pretend
490                // this is sent first.
491                let mut builder = PacketBuilder::new(
492                    now,
493                    SpaceId::Data,
494                    prev_cid,
495                    buf,
496                    buf_capacity,
497                    0,
498                    false,
499                    self,
500                )?;
501                trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
502                buf.write(frame::Type::PATH_CHALLENGE);
503                buf.write(token);
504                self.stats.frame_tx.path_challenge += 1;
505
506                // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
507                // to at least the smallest allowed maximum datagram size of 1200 bytes,
508                // unless the anti-amplification limit for the path does not permit
509                // sending a datagram of this size
510                builder.pad_to(MIN_INITIAL_SIZE);
511
512                builder.finish(self, buf);
513                self.stats.udp_tx.on_sent(1, buf.len());
514                return Some(Transmit {
515                    destination,
516                    size: buf.len(),
517                    ecn: None,
518                    segment_size: None,
519                    src_ip: self.local_ip,
520                });
521            }
522        }
523
524        // If we need to send a probe, make sure we have something to send.
525        for space in SpaceId::iter() {
526            let request_immediate_ack =
527                space == SpaceId::Data && self.peer_supports_ack_frequency();
528            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
529        }
530
531        // Check whether we need to send a close message
532        let close = match self.state {
533            State::Drained => {
534                self.app_limited = true;
535                return None;
536            }
537            State::Draining | State::Closed(_) => {
538                // self.close is only reset once the associated packet had been
539                // encoded successfully
540                if !self.close {
541                    self.app_limited = true;
542                    return None;
543                }
544                true
545            }
546            _ => false,
547        };
548
549        // Check whether we need to send an ACK_FREQUENCY frame
550        if let Some(config) = &self.config.ack_frequency_config {
551            self.spaces[SpaceId::Data].pending.ack_frequency = self
552                .ack_frequency
553                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
554                && self.highest_space == SpaceId::Data
555                && self.peer_supports_ack_frequency();
556        }
557
558        // Reserving capacity can provide more capacity than we asked for. However, we are not
559        // allowed to write more than `segment_size`. Therefore the maximum capacity is tracked
560        // separately.
561        let mut buf_capacity = 0;
562
563        let mut coalesce = true;
564        let mut builder_storage: Option<PacketBuilder> = None;
565        let mut sent_frames = None;
566        let mut pad_datagram = false;
567        let mut congestion_blocked = false;
568
569        // Iterate over all spaces and find data to send
570        let mut space_idx = 0;
571        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
572        // This loop will potentially spend multiple iterations in the same `SpaceId`,
573        // so we cannot trivially rewrite it to take advantage of `SpaceId::iter()`.
574        while space_idx < spaces.len() {
575            let space_id = spaces[space_idx];
576            // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed to
577            // be able to send an individual frame at least this large in the next 1-RTT
578            // packet. This could be generalized to support every space, but it's only needed to
579            // handle large fixed-size frames, which only exist in 1-RTT (application datagrams). We
580            // don't account for coalesced packets potentially occupying space because frames can
581            // always spill into the next datagram.
582            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
583            let frame_space_1rtt =
584                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
585
586            // Is there data or a close message to send in this space?
587            let can_send = self.space_can_send(space_id, frame_space_1rtt);
588            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
589                space_idx += 1;
590                continue;
591            }
592
593            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
594                || self.spaces[space_id].ping_pending
595                || self.spaces[space_id].immediate_ack_pending;
596            if space_id == SpaceId::Data {
597                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
598            }
599
600            // Can we append more data into the current buffer?
601            // It is not safe to assume that `buf.len()` is the end of the data,
602            // since the last packet might not have been finished.
603            let buf_end = if let Some(builder) = &builder_storage {
604                buf.len().max(builder.min_size) + builder.tag_len
605            } else {
606                buf.len()
607            };
608
609            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE {
610                // We need to send 1 more datagram and extend the buffer for that.
611
612                // Is 1 more datagram allowed?
613                if buf_capacity >= segment_size * max_datagrams {
614                    // No more datagrams allowed
615                    break;
616                }
617
618                // Anti-amplification is only based on `total_sent`, which gets
619                // updated at the end of this method. Therefore we pass the amount
620                // of bytes for datagrams that are already created, as well as 1 byte
621                // for starting another datagram. If there is any anti-amplification
622                // budget left, we always allow a full MTU to be sent
623                // (see https://github.com/quinn-rs/quinn/issues/1082)
624                if self
625                    .path
626                    .anti_amplification_blocked(segment_size as u64 * num_datagrams + 1)
627                {
628                    trace!("blocked by anti-amplification");
629                    break;
630                }
631
632                // Congestion control and pacing checks
633                // Tail loss probes must not be blocked by congestion, or a deadlock could arise
634                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
635                    // Assume the current packet will get padded to fill the segment
636                    let untracked_bytes = if let Some(builder) = &builder_storage {
637                        buf_capacity - builder.partial_encode.start
638                    } else {
639                        0
640                    } as u64;
641                    debug_assert!(untracked_bytes <= segment_size as u64);
642
643                    let bytes_to_send = segment_size as u64 + untracked_bytes;
644                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
645                        space_idx += 1;
646                        congestion_blocked = true;
647                        // We continue instead of breaking here in order to avoid
648                        // blocking loss probes queued for higher spaces.
649                        trace!("blocked by congestion control");
650                        continue;
651                    }
652
653                    // Check whether the next datagram is blocked by pacing
654                    let smoothed_rtt = self.path.rtt.get();
655                    if let Some(delay) = self.path.pacing.delay(
656                        smoothed_rtt,
657                        bytes_to_send,
658                        self.path.current_mtu(),
659                        self.path.congestion.window(),
660                        now,
661                    ) {
662                        self.timers.set(Timer::Pacing, delay);
663                        congestion_blocked = true;
664                        // Loss probes should be subject to pacing, even though
665                        // they are not congestion controlled.
666                        trace!("blocked by pacing");
667                        break;
668                    }
669                }
670
671                // Finish current packet
672                if let Some(mut builder) = builder_storage.take() {
673                    if pad_datagram {
674                        builder.pad_to(MIN_INITIAL_SIZE);
675                    }
676
677                    if num_datagrams > 1 {
678                        // If too many padding bytes would be required to continue the GSO batch
679                        // after this packet, end the GSO batch here. Ensures that fixed-size frames
680                        // with heterogeneous sizes (e.g. application datagrams) won't inadvertently
681                        // waste large amounts of bandwidth. The exact threshold is a bit arbitrary
682                        // and might benefit from further tuning, though there's no universally
683                        // optimal value.
684                        const MAX_PADDING: usize = 16;
685                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
686                            - datagram_start
687                            + builder.tag_len;
688                        if packet_len_unpadded + MAX_PADDING < segment_size {
689                            trace!(
690                                "GSO truncated by demand for {} padding bytes",
691                                segment_size - packet_len_unpadded
692                            );
693                            builder_storage = Some(builder);
694                            break;
695                        }
696
697                        // Pad the current packet to GSO segment size so it can be included in the
698                        // GSO batch.
699                        builder.pad_to(segment_size as u16);
700                    }
701
702                    builder.finish_and_track(now, self, sent_frames.take(), buf);
703
704                    if num_datagrams == 1 {
705                        // Set the segment size for this GSO batch to the size of the first UDP
706                        // datagram in the batch. Larger data that cannot be fragmented
707                        // (e.g. application datagrams) will be included in a future batch. When
708                        // sending large enough volumes of data for GSO to be useful, we expect
709                        // packet sizes to usually be consistent, e.g. populated by max-size STREAM
710                        // frames or uniformly sized datagrams.
711                        segment_size = buf.len();
712                        // Clip the unused capacity out of the buffer so future packets don't
713                        // overrun
714                        buf_capacity = buf.len();
715
716                        // Check whether the data we planned to send will fit in the reduced segment
717                        // size. If not, bail out and leave it for the next GSO batch so we don't
718                        // end up trying to send an empty packet. We can't easily compute the right
719                        // segment size before the original call to `space_can_send`, because at
720                        // that time we haven't determined whether we're going to coalesce with the
721                        // first datagram or potentially pad it to `MIN_INITIAL_SIZE`.
722                        if space_id == SpaceId::Data {
723                            let frame_space_1rtt =
724                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
725                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
726                                break;
727                            }
728                        }
729                    }
730                }
731
732                // Allocate space for another datagram
733                buf_capacity += segment_size;
734                if buf.capacity() < buf_capacity {
735                    // We reserve the maximum space for sending `max_datagrams` upfront
736                    // to avoid any reallocations if more datagrams have to be appended later on.
737                    // Benchmarks have shown shown a 5-10% throughput improvement
738                    // compared to continuously resizing the datagram buffer.
739                    // While this will lead to over-allocation for small transmits
740                    // (e.g. purely containing ACKs), modern memory allocators
741                    // (e.g. mimalloc and jemalloc) will pool certain allocation sizes
742                    // and therefore this is still rather efficient.
743                    buf.reserve(max_datagrams * segment_size);
744                }
745                num_datagrams += 1;
746                coalesce = true;
747                pad_datagram = false;
748                datagram_start = buf.len();
749
750                debug_assert_eq!(
751                    datagram_start % segment_size,
752                    0,
753                    "datagrams in a GSO batch must be aligned to the segment size"
754                );
755            } else {
756                // We can append/coalesce the next packet into the current
757                // datagram.
758                // Finish current packet without adding extra padding
759                if let Some(builder) = builder_storage.take() {
760                    builder.finish_and_track(now, self, sent_frames.take(), buf);
761                }
762            }
763
764            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
765
766            //
767            // From here on, we've determined that a packet will definitely be sent.
768            //
769
770            if self.spaces[SpaceId::Initial].crypto.is_some()
771                && space_id == SpaceId::Handshake
772                && self.side.is_client()
773            {
774                // A client stops both sending and processing Initial packets when it
775                // sends its first Handshake packet.
776                self.discard_space(now, SpaceId::Initial);
777            }
778            if let Some(ref mut prev) = self.prev_crypto {
779                prev.update_unacked = false;
780            }
781
782            debug_assert!(
783                builder_storage.is_none() && sent_frames.is_none(),
784                "Previous packet must have been finished"
785            );
786
787            // This should really be `builder.insert()`, but `Option::insert`
788            // is not stable yet. Since we `debug_assert!(builder.is_none())` it
789            // doesn't make any functional difference.
790            let builder = builder_storage.get_or_insert(PacketBuilder::new(
791                now,
792                space_id,
793                self.rem_cids.active(),
794                buf,
795                buf_capacity,
796                datagram_start,
797                ack_eliciting,
798                self,
799            )?);
800            coalesce = coalesce && !builder.short_header;
801
802            // https://tools.ietf.org/html/draft-ietf-quic-transport-34#section-14.1
803            pad_datagram |=
804                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
805
806            if close {
807                trace!("sending CONNECTION_CLOSE");
808                // Encode ACKs before the ConnectionClose message, to give the receiver
809                // a better approximate on what data has been processed. This is
810                // especially important with ack delay, since the peer might not
811                // have gotten any other ACK for the data earlier on.
812                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
813                    Self::populate_acks(
814                        now,
815                        self.receiving_ecn,
816                        &mut SentFrames::default(),
817                        &mut self.spaces[space_id],
818                        buf,
819                        &mut self.stats,
820                    );
821                }
822
823                // Since there only 64 ACK frames there will always be enough space
824                // to encode the ConnectionClose frame too. However we still have the
825                // check here to prevent crashes if something changes.
826                debug_assert!(
827                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
828                    "ACKs should leave space for ConnectionClose"
829                );
830                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
831                    let max_frame_size = builder.max_size - buf.len();
832                    match self.state {
833                        State::Closed(state::Closed { ref reason }) => {
834                            if space_id == SpaceId::Data || reason.is_transport_layer() {
835                                reason.encode(buf, max_frame_size)
836                            } else {
837                                frame::ConnectionClose {
838                                    error_code: TransportErrorCode::APPLICATION_ERROR,
839                                    frame_type: None,
840                                    reason: Bytes::new(),
841                                }
842                                .encode(buf, max_frame_size)
843                            }
844                        }
845                        State::Draining => frame::ConnectionClose {
846                            error_code: TransportErrorCode::NO_ERROR,
847                            frame_type: None,
848                            reason: Bytes::new(),
849                        }
850                        .encode(buf, max_frame_size),
851                        _ => unreachable!(
852                            "tried to make a close packet when the connection wasn't closed"
853                        ),
854                    }
855                }
856                if space_id == self.highest_space {
857                    // Don't send another close packet
858                    self.close = false;
859                    // `CONNECTION_CLOSE` is the final packet
860                    break;
861                } else {
862                    // Send a close frame in every possible space for robustness, per RFC9000
863                    // "Immediate Close during the Handshake". Don't bother trying to send anything
864                    // else.
865                    space_idx += 1;
866                    continue;
867                }
868            }
869
870            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path
871            // validation can occur while the link is saturated.
872            if space_id == SpaceId::Data && num_datagrams == 1 {
873                if let Some((token, remote)) = self.path_responses.pop_off_path(&self.path.remote) {
874                    // `unwrap` guaranteed to succeed because `builder_storage` was populated just
875                    // above.
876                    let mut builder = builder_storage.take().unwrap();
877                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
878                    buf.write(frame::Type::PATH_RESPONSE);
879                    buf.write(token);
880                    self.stats.frame_tx.path_response += 1;
881                    builder.pad_to(MIN_INITIAL_SIZE);
882                    builder.finish_and_track(
883                        now,
884                        self,
885                        Some(SentFrames {
886                            non_retransmits: true,
887                            ..SentFrames::default()
888                        }),
889                        buf,
890                    );
891                    self.stats.udp_tx.on_sent(1, buf.len());
892                    return Some(Transmit {
893                        destination: remote,
894                        size: buf.len(),
895                        ecn: None,
896                        segment_size: None,
897                        src_ip: self.local_ip,
898                    });
899                }
900            }
901
902            let sent =
903                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
904
905            // ACK-only packets should only be sent when explicitly allowed. If we write them due to
906            // any other reason, there is a bug which leads to one component announcing write
907            // readiness while not writing any data. This degrades performance. The condition is
908            // only checked if the full MTU is available and when potentially large fixed-size
909            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
910            // writing ACKs.
911            debug_assert!(
912                !(sent.is_ack_only(&self.streams)
913                    && !can_send.acks
914                    && can_send.other
915                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
916                    && self.datagrams.outgoing.is_empty()),
917                "SendableFrames was {can_send:?}, but only ACKs have been written"
918            );
919            pad_datagram |= sent.requires_padding;
920
921            if sent.largest_acked.is_some() {
922                self.spaces[space_id].pending_acks.acks_sent();
923                self.timers.stop(Timer::MaxAckDelay);
924            }
925
926            // Keep information about the packet around until it gets finalized
927            sent_frames = Some(sent);
928
929            // Don't increment space_idx.
930            // We stay in the current space and check if there is more data to send.
931        }
932
933        // Finish the last packet
934        if let Some(mut builder) = builder_storage {
935            if pad_datagram {
936                builder.pad_to(MIN_INITIAL_SIZE);
937            }
938            let last_packet_number = builder.exact_number;
939            builder.finish_and_track(now, self, sent_frames, buf);
940            self.path
941                .congestion
942                .on_sent(now, buf.len() as u64, last_packet_number);
943        }
944
945        self.app_limited = buf.is_empty() && !congestion_blocked;
946
947        // Send MTU probe if necessary
948        if buf.is_empty() && self.state.is_established() {
949            let space_id = SpaceId::Data;
950            let probe_size = match self
951                .path
952                .mtud
953                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))
954            {
955                Some(next_probe_size) => next_probe_size,
956                None => return None,
957            };
958
959            let buf_capacity = probe_size as usize;
960            buf.reserve(buf_capacity);
961
962            let mut builder = PacketBuilder::new(
963                now,
964                space_id,
965                self.rem_cids.active(),
966                buf,
967                buf_capacity,
968                0,
969                true,
970                self,
971            )?;
972
973            // We implement MTU probes as ping packets padded up to the probe size
974            buf.write(frame::Type::PING);
975            self.stats.frame_tx.ping += 1;
976
977            // If supported by the peer, we want no delays to the probe's ACK
978            if self.peer_supports_ack_frequency() {
979                buf.write(frame::Type::IMMEDIATE_ACK);
980                self.stats.frame_tx.immediate_ack += 1;
981            }
982
983            builder.pad_to(probe_size);
984            let sent_frames = SentFrames {
985                non_retransmits: true,
986                ..Default::default()
987            };
988            builder.finish_and_track(now, self, Some(sent_frames), buf);
989
990            self.stats.path.sent_plpmtud_probes += 1;
991            num_datagrams = 1;
992
993            trace!(?probe_size, "writing MTUD probe");
994        }
995
996        if buf.is_empty() {
997            return None;
998        }
999
1000        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
1001        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
1002
1003        self.stats.udp_tx.on_sent(num_datagrams, buf.len());
1004
1005        Some(Transmit {
1006            destination: self.path.remote,
1007            size: buf.len(),
1008            ecn: if self.path.sending_ecn {
1009                Some(EcnCodepoint::Ect0)
1010            } else {
1011                None
1012            },
1013            segment_size: match num_datagrams {
1014                1 => None,
1015                _ => Some(segment_size),
1016            },
1017            src_ip: self.local_ip,
1018        })
1019    }
1020
1021    /// Indicate what types of frames are ready to send for the given space
1022    fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1023        if self.spaces[space_id].crypto.is_none()
1024            && (space_id != SpaceId::Data
1025                || self.zero_rtt_crypto.is_none()
1026                || self.side.is_server())
1027        {
1028            // No keys available for this space
1029            return SendableFrames::empty();
1030        }
1031        let mut can_send = self.spaces[space_id].can_send(&self.streams);
1032        if space_id == SpaceId::Data {
1033            can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1034        }
1035        can_send
1036    }
1037
    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
    ///
    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
    /// extracted through the relevant methods.
    ///
    /// Two kinds of events are handled:
    /// - `Datagram`: an incoming datagram, given as its first (partially decoded) packet plus
    ///   any `remaining` bytes, which are handed to `handle_coalesced`.
    /// - `NewIdentifiers`: newly issued local connection IDs, which are registered with the
    ///   local CID state and queued in the Data space's pending `new_cids`.
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use self::ConnectionEventInner::*;
        match event.0 {
            Datagram(DatagramConnectionEvent {
                now,
                remote,
                ecn,
                first_decode,
                remaining,
            }) => {
                // If this packet could initiate a migration and we're a client or a server that
                // forbids migration, drop the datagram. This could be relaxed to heuristically
                // permit NAT-rebinding-like migration.
                if remote != self.path.remote
                    && self.server_config.as_ref().map_or(true, |x| !x.migration)
                {
                    trace!("discarding packet from unrecognized peer {}", remote);
                    return;
                }

                // Sampled before the datagram is processed: if we were blocked, the loss
                // detection timer is re-armed at the end of this arm.
                let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);

                self.stats.udp_rx.datagrams += 1;
                self.stats.udp_rx.bytes += first_decode.len() as u64;
                // Captured here because `first_decode` is moved into `handle_decode` below.
                let data_len = first_decode.len();

                self.handle_decode(now, remote, ecn, first_decode);
                // The current `path` might have changed inside `handle_decode`,
                // since the packet could have triggered a migration. Make sure
                // the data received is accounted for the most recent path by accessing
                // `path` after `handle_decode`.
                self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);

                // NOTE(review): bytes in `remaining` are added to `udp_rx.bytes` but not to
                // `path.total_recvd` in this function — confirm `handle_coalesced` (or this
                // undercount) is intended for anti-amplification accounting.
                if let Some(data) = remaining {
                    self.stats.udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, remote, ecn, data);
                }

                if was_anti_amplification_blocked {
                    // A prior attempt to set the loss detection timer may have failed due to
                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
                    // the server's first flight is lost.
                    self.set_loss_detection_timer(now);
                }
            }
            NewIdentifiers(ids, now) => {
                self.local_cid_state.new_cids(&ids, now);
                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                // Update Timer::PushNewCid: re-arm CID retirement when the timer is unset or
                // already due.
                if self
                    .timers
                    .get(Timer::PushNewCid)
                    .map_or(true, |x| x <= now)
                {
                    self.reset_cid_retirement();
                }
            }
        }
    }
1104
1105    /// Process timer expirations
1106    ///
1107    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
1108    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
1109    /// methods.
1110    ///
1111    /// It is most efficient to call this immediately after the system clock reaches the latest
1112    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
1113    /// no-op and therefore are safe.
1114    pub fn handle_timeout(&mut self, now: Instant) {
1115        for &timer in &Timer::VALUES {
1116            if !self.timers.is_expired(timer, now) {
1117                continue;
1118            }
1119            self.timers.stop(timer);
1120            trace!(timer = ?timer, "timeout");
1121            match timer {
1122                Timer::Close => {
1123                    self.state = State::Drained;
1124                    self.endpoint_events.push_back(EndpointEventInner::Drained);
1125                }
1126                Timer::Idle => {
1127                    self.kill(ConnectionError::TimedOut);
1128                }
1129                Timer::KeepAlive => {
1130                    trace!("sending keep-alive");
1131                    self.ping();
1132                }
1133                Timer::LossDetection => {
1134                    self.on_loss_detection_timeout(now);
1135                }
1136                Timer::KeyDiscard => {
1137                    self.zero_rtt_crypto = None;
1138                    self.prev_crypto = None;
1139                }
1140                Timer::PathValidation => {
1141                    debug!("path validation failed");
1142                    if let Some((_, prev)) = self.prev_path.take() {
1143                        self.path = prev;
1144                    }
1145                    self.path.challenge = None;
1146                    self.path.challenge_pending = false;
1147                }
1148                Timer::Pacing => trace!("pacing timer expired"),
1149                Timer::PushNewCid => {
1150                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
1151                    let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1152                    if !self.state.is_closed() {
1153                        trace!(
1154                            "push a new cid to peer RETIRE_PRIOR_TO field {}",
1155                            self.local_cid_state.retire_prior_to()
1156                        );
1157                        self.endpoint_events
1158                            .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1159                    }
1160                }
1161                Timer::MaxAckDelay => {
1162                    trace!("max ack delay reached");
1163                    // This timer is only armed in the Data space
1164                    self.spaces[SpaceId::Data]
1165                        .pending_acks
1166                        .on_max_ack_delay_timeout()
1167                }
1168            }
1169        }
1170    }
1171
1172    /// Close a connection immediately
1173    ///
1174    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
1175    /// call this only when all important communications have been completed, e.g. by calling
1176    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
1177    /// [`StreamEvent::Finished`] event.
1178    ///
1179    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
1180    /// delivered. There may still be data from the peer that has not been received.
1181    ///
1182    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
1183    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1184        self.close_inner(
1185            now,
1186            Close::Application(frame::ApplicationClose { error_code, reason }),
1187        )
1188    }
1189
1190    fn close_inner(&mut self, now: Instant, reason: Close) {
1191        let was_closed = self.state.is_closed();
1192        if !was_closed {
1193            self.close_common();
1194            self.set_close_timer(now);
1195            self.close = true;
1196            self.state = State::Closed(state::Closed { reason });
1197        }
1198    }
1199
    /// Control datagrams
    ///
    /// Returns a [`Datagrams`] handle that mutably borrows this connection for as long as the
    /// handle is alive.
    pub fn datagrams(&mut self) -> Datagrams<'_> {
        Datagrams { conn: self }
    }
1204
1205    /// Returns connection statistics
1206    pub fn stats(&self) -> ConnectionStats {
1207        let mut stats = self.stats;
1208        stats.path.rtt = self.path.rtt.get();
1209        stats.path.cwnd = self.path.congestion.window();
1210        stats.path.current_mtu = self.path.mtud.current_mtu();
1211
1212        stats
1213    }
1214
1215    /// Ping the remote endpoint
1216    ///
1217    /// Causes an ACK-eliciting packet to be transmitted.
1218    pub fn ping(&mut self) {
1219        self.spaces[self.highest_space].ping_pending = true;
1220    }
1221
    // Start a key update from the local side by calling `update_keys(None, false)`.
    // NOTE(review): hidden from rustdoc; presumably exposed for tests — confirm with callers.
    #[doc(hidden)]
    pub fn initiate_key_update(&mut self) {
        self.update_keys(None, false);
    }
1226
    /// Get a session reference
    ///
    /// Borrows the connection's cryptographic session as a `&dyn crypto::Session` trait object.
    pub fn crypto_session(&self) -> &dyn crypto::Session {
        &*self.crypto
    }
1231
    /// Whether the connection is in the process of being established
    ///
    /// If this returns `false`, the connection may be either established or closed, signaled by the
    /// emission of a `Connected` or `ConnectionLost` message respectively.
    pub fn is_handshaking(&self) -> bool {
        // Delegates to the connection state.
        self.state.is_handshake()
    }
1239
    /// Whether the connection is closed
    ///
    /// Closed connections cannot transport any further data. A connection becomes closed when
    /// either peer application intentionally closes it, or when either transport layer detects an
    /// error such as a time-out or certificate validation failure.
    ///
    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
    pub fn is_closed(&self) -> bool {
        // Delegates to the connection state.
        self.state.is_closed()
    }
1250
    /// Whether there is no longer any need to keep the connection around
    ///
    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
    /// packets from the peer. All drained connections have been closed.
    pub fn is_drained(&self) -> bool {
        // Delegates to the connection state; `State::Drained` is entered when the close timer
        // fires.
        self.state.is_drained()
    }
1258
    /// For clients, whether the peer accepted the 0-RTT data packets
    ///
    /// The value is meaningless until after the handshake completes.
    pub fn accepted_0rtt(&self) -> bool {
        self.accepted_0rtt
    }
1265
    /// Whether 0-RTT is/was possible during the handshake
    ///
    /// Returns the connection's stored `zero_rtt_enabled` flag.
    pub fn has_0rtt(&self) -> bool {
        self.zero_rtt_enabled
    }
1270
1271    /// Whether there are any pending retransmits
1272    pub fn has_pending_retransmits(&self) -> bool {
1273        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1274    }
1275
    /// Look up whether we're the client or server of this Connection
    pub fn side(&self) -> Side {
        self.side
    }
1280
    /// The latest socket address for this connection's peer
    ///
    /// This is the remote address of the current path, so it can change if the peer migrates.
    pub fn remote_address(&self) -> SocketAddr {
        self.path.remote
    }
1285
    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when no `local_ip` was passed to
    /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this
    /// connection.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.local_ip
    }
1298
    /// Current best estimate of this connection's latency (round-trip time)
    ///
    /// Read from the current path's RTT estimator.
    pub fn rtt(&self) -> Duration {
        self.path.rtt.get()
    }
1303
1304    /// Current state of this connection's congestion controller, for debugging purposes
1305    pub fn congestion_state(&self) -> &dyn Controller {
1306        self.path.congestion.as_ref()
1307    }
1308
1309    /// Modify the number of remotely initiated streams that may be concurrently open
1310    ///
1311    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
1312    /// `count`s increase both minimum and worst-case memory consumption.
1313    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1314        self.streams.set_max_concurrent(dir, count);
1315        // If the limit was reduced, then a flow control update previously deemed insignificant may
1316        // now be significant.
1317        let pending = &mut self.spaces[SpaceId::Data].pending;
1318        self.streams.queue_max_stream_id(pending);
1319    }
1320
    /// Current number of remotely initiated streams that may be concurrently open
    ///
    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
    /// it will not change immediately, even if fewer streams are open. Instead, it will
    /// decrement by one each time a remotely initiated stream of matching directionality is closed.
    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
        self.streams.max_concurrent(dir)
    }
1329
1330    /// See [`TransportConfig::receive_window()`]
1331    pub fn set_receive_window(&mut self, receive_window: VarInt) {
1332        if self.streams.set_receive_window(receive_window) {
1333            self.spaces[SpaceId::Data].pending.max_data = true;
1334        }
1335    }
1336
1337    fn on_ack_received(
1338        &mut self,
1339        now: Instant,
1340        space: SpaceId,
1341        ack: frame::Ack,
1342    ) -> Result<(), TransportError> {
1343        if ack.largest >= self.spaces[space].next_packet_number {
1344            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1345        }
1346        let new_largest = {
1347            let space = &mut self.spaces[space];
1348            if space
1349                .largest_acked_packet
1350                .map_or(true, |pn| ack.largest > pn)
1351            {
1352                space.largest_acked_packet = Some(ack.largest);
1353                if let Some(info) = space.sent_packets.get(&ack.largest) {
1354                    // This should always succeed, but a misbehaving peer might ACK a packet we
1355                    // haven't sent. At worst, that will result in us spuriously reducing the
1356                    // congestion window.
1357                    space.largest_acked_packet_sent = info.time_sent;
1358                }
1359                true
1360            } else {
1361                false
1362            }
1363        };
1364
1365        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
1366        let mut newly_acked = ArrayRangeSet::new();
1367        for range in ack.iter() {
1368            self.packet_number_filter.check_ack(space, range.clone())?;
1369            for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1370                newly_acked.insert_one(pn);
1371            }
1372        }
1373
1374        if newly_acked.is_empty() {
1375            return Ok(());
1376        }
1377
1378        let mut ack_eliciting_acked = false;
1379        for packet in newly_acked.elts() {
1380            if let Some(info) = self.spaces[space].take(packet) {
1381                if let Some(acked) = info.largest_acked {
1382                    // Assume ACKs for all packets below the largest acknowledged in `packet` have
1383                    // been received. This can cause the peer to spuriously retransmit if some of
1384                    // our earlier ACKs were lost, but allows for simpler state tracking. See
1385                    // discussion at
1386                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
1387                    self.spaces[space].pending_acks.subtract_below(acked);
1388                }
1389                ack_eliciting_acked |= info.ack_eliciting;
1390
1391                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
1392                let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1393                if mtu_updated {
1394                    self.path
1395                        .congestion
1396                        .on_mtu_update(self.path.mtud.current_mtu());
1397                }
1398
1399                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
1400                self.ack_frequency.on_acked(packet);
1401
1402                self.on_packet_acked(now, packet, info);
1403            }
1404        }
1405
1406        self.path.congestion.on_end_acks(
1407            now,
1408            self.path.in_flight.bytes,
1409            self.app_limited,
1410            self.spaces[space].largest_acked_packet,
1411        );
1412
1413        if new_largest && ack_eliciting_acked {
1414            let ack_delay = if space != SpaceId::Data {
1415                Duration::from_micros(0)
1416            } else {
1417                cmp::min(
1418                    self.ack_frequency.peer_max_ack_delay,
1419                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1420                )
1421            };
1422            let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1423            self.path.rtt.update(ack_delay, rtt);
1424            if self.path.first_packet_after_rtt_sample.is_none() {
1425                self.path.first_packet_after_rtt_sample =
1426                    Some((space, self.spaces[space].next_packet_number));
1427            }
1428        }
1429
1430        // Must be called before crypto/pto_count are clobbered
1431        self.detect_lost_packets(now, space, true);
1432
1433        if self.peer_completed_address_validation() {
1434            self.pto_count = 0;
1435        }
1436
1437        // Explicit congestion notification
1438        if self.path.sending_ecn {
1439            if let Some(ecn) = ack.ecn {
1440                // We only examine ECN counters from ACKs that we are certain we received in transmit
1441                // order, allowing us to compute an increase in ECN counts to compare against the number
1442                // of newly acked packets that remains well-defined in the presence of arbitrary packet
1443                // reordering.
1444                if new_largest {
1445                    let sent = self.spaces[space].largest_acked_packet_sent;
1446                    self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
1447                }
1448            } else {
1449                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
1450                debug!("ECN not acknowledged by peer");
1451                self.path.sending_ecn = false;
1452            }
1453        }
1454
1455        self.set_loss_detection_timer(now);
1456        Ok(())
1457    }
1458
1459    /// Process a new ECN block from an in-order ACK
1460    fn process_ecn(
1461        &mut self,
1462        now: Instant,
1463        space: SpaceId,
1464        newly_acked: u64,
1465        ecn: frame::EcnCounts,
1466        largest_sent_time: Instant,
1467    ) {
1468        match self.spaces[space].detect_ecn(newly_acked, ecn) {
1469            Err(e) => {
1470                debug!("halting ECN due to verification failure: {}", e);
1471                self.path.sending_ecn = false;
1472                // Wipe out the existing value because it might be garbage and could interfere with
1473                // future attempts to use ECN on new paths.
1474                self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1475            }
1476            Ok(false) => {}
1477            Ok(true) => {
1478                self.stats.path.congestion_events += 1;
1479                self.path
1480                    .congestion
1481                    .on_congestion_event(now, largest_sent_time, false, 0);
1482            }
1483        }
1484    }
1485
1486    // Not timing-aware, so it's safe to call this for inferred acks, such as arise from
1487    // high-latency handshakes
1488    fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
1489        self.remove_in_flight(pn, &info);
1490        if info.ack_eliciting && self.path.challenge.is_none() {
1491            // Only pass ACKs to the congestion controller if we are not validating the current
1492            // path, so as to ignore any ACKs from older paths still coming in.
1493            self.path.congestion.on_ack(
1494                now,
1495                info.time_sent,
1496                info.size.into(),
1497                self.app_limited,
1498                &self.path.rtt,
1499            );
1500        }
1501
1502        // Update state for confirmed delivery of frames
1503        if let Some(retransmits) = info.retransmits.get() {
1504            for (id, _) in retransmits.reset_stream.iter() {
1505                self.streams.reset_acked(*id);
1506            }
1507        }
1508
1509        for frame in info.stream_frames {
1510            self.streams.received_ack_of(frame);
1511        }
1512    }
1513
1514    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
1515        let start = if self.zero_rtt_crypto.is_some() {
1516            now
1517        } else {
1518            self.prev_crypto
1519                .as_ref()
1520                .expect("no previous keys")
1521                .end_packet
1522                .as_ref()
1523                .expect("update not acknowledged yet")
1524                .1
1525        };
1526        self.timers
1527            .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1528    }
1529
1530    fn on_loss_detection_timeout(&mut self, now: Instant) {
1531        if let Some((_, pn_space)) = self.loss_time_and_space() {
1532            // Time threshold loss Detection
1533            self.detect_lost_packets(now, pn_space, false);
1534            self.set_loss_detection_timer(now);
1535            return;
1536        }
1537
1538        let (_, space) = match self.pto_time_and_space(now) {
1539            Some(x) => x,
1540            None => {
1541                error!("PTO expired while unset");
1542                return;
1543            }
1544        };
1545        trace!(
1546            in_flight = self.path.in_flight.bytes,
1547            count = self.pto_count,
1548            ?space,
1549            "PTO fired"
1550        );
1551
1552        let count = match self.path.in_flight.ack_eliciting {
1553            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
1554            // deadlock preventions
1555            0 => {
1556                debug_assert!(!self.peer_completed_address_validation());
1557                1
1558            }
1559            // Conventional loss probe
1560            _ => 2,
1561        };
1562        self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1563        self.pto_count = self.pto_count.saturating_add(1);
1564        self.set_loss_detection_timer(now);
1565    }
1566
1567    fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
1568        let mut lost_packets = Vec::<u64>::new();
1569        let mut lost_mtu_probe = None;
1570        let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
1571        let rtt = self.path.rtt.conservative();
1572        let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
1573
1574        // Packets sent before this time are deemed lost.
1575        let lost_send_time = now.checked_sub(loss_delay).unwrap();
1576        let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
1577        let packet_threshold = self.config.packet_threshold as u64;
1578        let mut size_of_lost_packets = 0u64;
1579
1580        // InPersistentCongestion: Determine if all packets in the time period before the newest
1581        // lost packet, including the edges, are marked lost. PTO computation must always
1582        // include max ACK delay, i.e. operate as if in Data space (see RFC9001 §7.6.1).
1583        let congestion_period =
1584            self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
1585        let mut persistent_congestion_start: Option<Instant> = None;
1586        let mut prev_packet = None;
1587        let mut in_persistent_congestion = false;
1588
1589        let space = &mut self.spaces[pn_space];
1590        space.loss_time = None;
1591
1592        for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
1593            if prev_packet != Some(packet.wrapping_sub(1)) {
1594                // An intervening packet was acknowledged
1595                persistent_congestion_start = None;
1596            }
1597
1598            if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
1599            {
1600                if Some(packet) == in_flight_mtu_probe {
1601                    // Lost MTU probes are not included in `lost_packets`, because they should not
1602                    // trigger a congestion control response
1603                    lost_mtu_probe = in_flight_mtu_probe;
1604                } else {
1605                    lost_packets.push(packet);
1606                    size_of_lost_packets += info.size as u64;
1607                    if info.ack_eliciting && due_to_ack {
1608                        match persistent_congestion_start {
1609                            // Two ACK-eliciting packets lost more than congestion_period apart, with no
1610                            // ACKed packets in between
1611                            Some(start) if info.time_sent - start > congestion_period => {
1612                                in_persistent_congestion = true;
1613                            }
1614                            // Persistent congestion must start after the first RTT sample
1615                            None if self
1616                                .path
1617                                .first_packet_after_rtt_sample
1618                                .map_or(false, |x| x < (pn_space, packet)) =>
1619                            {
1620                                persistent_congestion_start = Some(info.time_sent);
1621                            }
1622                            _ => {}
1623                        }
1624                    }
1625                }
1626            } else {
1627                let next_loss_time = info.time_sent + loss_delay;
1628                space.loss_time = Some(
1629                    space
1630                        .loss_time
1631                        .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
1632                );
1633                persistent_congestion_start = None;
1634            }
1635
1636            prev_packet = Some(packet);
1637        }
1638
1639        // OnPacketsLost
1640        if let Some(largest_lost) = lost_packets.last().cloned() {
1641            let old_bytes_in_flight = self.path.in_flight.bytes;
1642            let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
1643            self.lost_packets += lost_packets.len() as u64;
1644            self.stats.path.lost_packets += lost_packets.len() as u64;
1645            self.stats.path.lost_bytes += size_of_lost_packets;
1646            trace!(
1647                "packets lost: {:?}, bytes lost: {}",
1648                lost_packets,
1649                size_of_lost_packets
1650            );
1651
1652            for &packet in &lost_packets {
1653                let info = self.spaces[pn_space].take(packet).unwrap(); // safe: lost_packets is populated just above
1654                self.remove_in_flight(packet, &info);
1655                for frame in info.stream_frames {
1656                    self.streams.retransmit(frame);
1657                }
1658                self.spaces[pn_space].pending |= info.retransmits;
1659                self.path.mtud.on_non_probe_lost(packet, info.size);
1660            }
1661
1662            if self.path.mtud.black_hole_detected(now) {
1663                self.stats.path.black_holes_detected += 1;
1664                self.path
1665                    .congestion
1666                    .on_mtu_update(self.path.mtud.current_mtu());
1667                if let Some(max_datagram_size) = self.datagrams().max_size() {
1668                    self.datagrams.drop_oversized(max_datagram_size);
1669                }
1670            }
1671
1672            // Don't apply congestion penalty for lost ack-only packets
1673            let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
1674
1675            if lost_ack_eliciting {
1676                self.stats.path.congestion_events += 1;
1677                self.path.congestion.on_congestion_event(
1678                    now,
1679                    largest_lost_sent,
1680                    in_persistent_congestion,
1681                    size_of_lost_packets,
1682                );
1683            }
1684        }
1685
1686        // Handle a lost MTU probe
1687        if let Some(packet) = lost_mtu_probe {
1688            let info = self.spaces[SpaceId::Data].take(packet).unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet
1689            self.remove_in_flight(packet, &info);
1690            self.path.mtud.on_probe_lost();
1691            self.stats.path.lost_plpmtud_probes += 1;
1692        }
1693    }
1694
1695    fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
1696        SpaceId::iter()
1697            .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
1698            .min_by_key(|&(time, _)| time)
1699    }
1700
1701    fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
1702        let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
1703        let mut duration = self.path.rtt.pto_base() * backoff;
1704
1705        if self.path.in_flight.ack_eliciting == 0 {
1706            debug_assert!(!self.peer_completed_address_validation());
1707            let space = match self.highest_space {
1708                SpaceId::Handshake => SpaceId::Handshake,
1709                _ => SpaceId::Initial,
1710            };
1711            return Some((now + duration, space));
1712        }
1713
1714        let mut result = None;
1715        for space in SpaceId::iter() {
1716            if self.spaces[space].in_flight == 0 {
1717                continue;
1718            }
1719            if space == SpaceId::Data {
1720                // Skip ApplicationData until handshake completes.
1721                if self.is_handshaking() {
1722                    return result;
1723                }
1724                // Include max_ack_delay and backoff for ApplicationData.
1725                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
1726            }
1727            let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
1728                Some(time) => time,
1729                None => continue,
1730            };
1731            let pto = last_ack_eliciting + duration;
1732            if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
1733                result = Some((pto, space));
1734            }
1735        }
1736        result
1737    }
1738
1739    #[allow(clippy::suspicious_operation_groupings)]
1740    fn peer_completed_address_validation(&self) -> bool {
1741        if self.side.is_server() || self.state.is_closed() {
1742            return true;
1743        }
1744        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
1745        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
1746        self.spaces[SpaceId::Handshake]
1747            .largest_acked_packet
1748            .is_some()
1749            || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
1750            || (self.spaces[SpaceId::Data].crypto.is_some()
1751                && self.spaces[SpaceId::Handshake].crypto.is_none())
1752    }
1753
1754    fn set_loss_detection_timer(&mut self, now: Instant) {
1755        if self.state.is_closed() {
1756            // No loss detection takes place on closed connections, and `close_common` already
1757            // stopped time timer. Ensure we don't restart it inadvertently, e.g. in response to a
1758            // reordered packet being handled by state-insensitive code.
1759            return;
1760        }
1761
1762        if let Some((loss_time, _)) = self.loss_time_and_space() {
1763            // Time threshold loss detection.
1764            self.timers.set(Timer::LossDetection, loss_time);
1765            return;
1766        }
1767
1768        if self.path.anti_amplification_blocked(1) {
1769            // We wouldn't be able to send anything, so don't bother.
1770            self.timers.stop(Timer::LossDetection);
1771            return;
1772        }
1773
1774        if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
1775            // There is nothing to detect lost, so no timer is set. However, the client needs to arm
1776            // the timer if the server might be blocked by the anti-amplification limit.
1777            self.timers.stop(Timer::LossDetection);
1778            return;
1779        }
1780
1781        // Determine which PN space to arm PTO for.
1782        // Calculate PTO duration
1783        if let Some((timeout, _)) = self.pto_time_and_space(now) {
1784            self.timers.set(Timer::LossDetection, timeout);
1785        } else {
1786            self.timers.stop(Timer::LossDetection);
1787        }
1788    }
1789
1790    /// Probe Timeout
1791    fn pto(&self, space: SpaceId) -> Duration {
1792        let max_ack_delay = match space {
1793            SpaceId::Initial | SpaceId::Handshake => Duration::new(0, 0),
1794            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
1795        };
1796        self.path.rtt.pto_base() + max_ack_delay
1797    }
1798
1799    fn on_packet_authenticated(
1800        &mut self,
1801        now: Instant,
1802        space_id: SpaceId,
1803        ecn: Option<EcnCodepoint>,
1804        packet: Option<u64>,
1805        spin: bool,
1806        is_1rtt: bool,
1807    ) {
1808        self.total_authed_packets += 1;
1809        self.reset_keep_alive(now);
1810        self.reset_idle_timeout(now, space_id);
1811        self.permit_idle_reset = true;
1812        self.receiving_ecn |= ecn.is_some();
1813        if let Some(x) = ecn {
1814            let space = &mut self.spaces[space_id];
1815            space.ecn_counters += x;
1816
1817            if x.is_ce() {
1818                space.pending_acks.set_immediate_ack_required();
1819            }
1820        }
1821
1822        let packet = match packet {
1823            Some(x) => x,
1824            None => return,
1825        };
1826        if self.side.is_server() {
1827            if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
1828                // A server stops sending and processing Initial packets when it receives its first Handshake packet.
1829                self.discard_space(now, SpaceId::Initial);
1830            }
1831            if self.zero_rtt_crypto.is_some() && is_1rtt {
1832                // Discard 0-RTT keys soon after receiving a 1-RTT packet
1833                self.set_key_discard_timer(now, space_id)
1834            }
1835        }
1836        let space = &mut self.spaces[space_id];
1837        space.pending_acks.insert_one(packet, now);
1838        if packet >= space.rx_packet {
1839            space.rx_packet = packet;
1840            // Update outgoing spin bit, inverting iff we're the client
1841            self.spin = self.side.is_client() ^ spin;
1842        }
1843    }
1844
1845    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
1846        let timeout = match self.idle_timeout {
1847            None => return,
1848            Some(x) => Duration::from_millis(x.0),
1849        };
1850        if self.state.is_closed() {
1851            self.timers.stop(Timer::Idle);
1852            return;
1853        }
1854        let dt = cmp::max(timeout, 3 * self.pto(space));
1855        self.timers.set(Timer::Idle, now + dt);
1856    }
1857
1858    fn reset_keep_alive(&mut self, now: Instant) {
1859        let interval = match self.config.keep_alive_interval {
1860            Some(x) if self.state.is_established() => x,
1861            _ => return,
1862        };
1863        self.timers.set(Timer::KeepAlive, now + interval);
1864    }
1865
1866    fn reset_cid_retirement(&mut self) {
1867        if let Some(t) = self.local_cid_state.next_timeout() {
1868            self.timers.set(Timer::PushNewCid, t);
1869        }
1870    }
1871
1872    /// Handle the already-decrypted first packet from the client
1873    ///
1874    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
1875    /// efficient.
1876    pub(crate) fn handle_first_packet(
1877        &mut self,
1878        now: Instant,
1879        remote: SocketAddr,
1880        ecn: Option<EcnCodepoint>,
1881        packet_number: u64,
1882        packet: InitialPacket,
1883        remaining: Option<BytesMut>,
1884    ) -> Result<(), ConnectionError> {
1885        let span = trace_span!("first recv");
1886        let _guard = span.enter();
1887        debug_assert!(self.side.is_server());
1888        let len = packet.header_data.len() + packet.payload.len();
1889        self.path.total_recvd = len as u64;
1890
1891        match self.state {
1892            State::Handshake(ref mut state) => {
1893                state.expected_token = packet.header.token.clone();
1894            }
1895            _ => unreachable!("first packet must be delivered in Handshake state"),
1896        }
1897
1898        self.on_packet_authenticated(
1899            now,
1900            SpaceId::Initial,
1901            ecn,
1902            Some(packet_number),
1903            false,
1904            false,
1905        );
1906
1907        self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
1908        if let Some(data) = remaining {
1909            self.handle_coalesced(now, remote, ecn, data);
1910        }
1911        Ok(())
1912    }
1913
1914    fn init_0rtt(&mut self) {
1915        let (header, packet) = match self.crypto.early_crypto() {
1916            Some(x) => x,
1917            None => return,
1918        };
1919        if self.side.is_client() {
1920            match self.crypto.transport_parameters() {
1921                Ok(params) => {
1922                    let params = params
1923                        .expect("crypto layer didn't supply transport parameters with ticket");
1924                    // Certain values must not be cached
1925                    let params = TransportParameters {
1926                        initial_src_cid: None,
1927                        original_dst_cid: None,
1928                        preferred_address: None,
1929                        retry_src_cid: None,
1930                        stateless_reset_token: None,
1931                        min_ack_delay: None,
1932                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
1933                        max_ack_delay: TransportParameters::default().max_ack_delay,
1934                        ..params
1935                    };
1936                    self.set_peer_params(params);
1937                }
1938                Err(e) => {
1939                    error!("session ticket has malformed transport parameters: {}", e);
1940                    return;
1941                }
1942            }
1943        }
1944        trace!("0-RTT enabled");
1945        self.zero_rtt_enabled = true;
1946        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
1947    }
1948
1949    fn read_crypto(
1950        &mut self,
1951        space: SpaceId,
1952        crypto: &frame::Crypto,
1953        payload_len: usize,
1954    ) -> Result<(), TransportError> {
1955        let expected = if !self.state.is_handshake() {
1956            SpaceId::Data
1957        } else if self.highest_space == SpaceId::Initial {
1958            SpaceId::Initial
1959        } else {
1960            // On the server, self.highest_space can be Data after receiving the client's first
1961            // flight, but we expect Handshake CRYPTO until the handshake is complete.
1962            SpaceId::Handshake
1963        };
1964        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
1965        // packets are illegal, and we don't process 1-RTT packets until the handshake is
1966        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
1967        debug_assert!(space <= expected, "received out-of-order CRYPTO data");
1968
1969        let end = crypto.offset + crypto.data.len() as u64;
1970        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
1971            warn!(
1972                "received new {:?} CRYPTO data when expecting {:?}",
1973                space, expected
1974            );
1975            return Err(TransportError::PROTOCOL_VIOLATION(
1976                "new data at unexpected encryption level",
1977            ));
1978        }
1979
1980        let space = &mut self.spaces[space];
1981        let max = end.saturating_sub(space.crypto_stream.bytes_read());
1982        if max > self.config.crypto_buffer_size as u64 {
1983            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
1984        }
1985
1986        space
1987            .crypto_stream
1988            .insert(crypto.offset, crypto.data.clone(), payload_len);
1989        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
1990            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
1991            if self.crypto.read_handshake(&chunk.bytes)? {
1992                self.events.push_back(Event::HandshakeDataReady);
1993            }
1994        }
1995
1996        Ok(())
1997    }
1998
1999    fn write_crypto(&mut self) {
2000        loop {
2001            let space = self.highest_space;
2002            let mut outgoing = Vec::new();
2003            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2004                match space {
2005                    SpaceId::Initial => {
2006                        self.upgrade_crypto(SpaceId::Handshake, crypto);
2007                    }
2008                    SpaceId::Handshake => {
2009                        self.upgrade_crypto(SpaceId::Data, crypto);
2010                    }
2011                    _ => unreachable!("got updated secrets during 1-RTT"),
2012                }
2013            }
2014            if outgoing.is_empty() {
2015                if space == self.highest_space {
2016                    break;
2017                } else {
2018                    // Keys updated, check for more data to send
2019                    continue;
2020                }
2021            }
2022            let offset = self.spaces[space].crypto_offset;
2023            let outgoing = Bytes::from(outgoing);
2024            if let State::Handshake(ref mut state) = self.state {
2025                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2026                    state.client_hello = Some(outgoing.clone());
2027                }
2028            }
2029            self.spaces[space].crypto_offset += outgoing.len() as u64;
2030            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2031            self.spaces[space].pending.crypto.push_back(frame::Crypto {
2032                offset,
2033                data: outgoing,
2034            });
2035        }
2036    }
2037
2038    /// Switch to stronger cryptography during handshake
2039    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2040        debug_assert!(
2041            self.spaces[space].crypto.is_none(),
2042            "already reached packet space {space:?}"
2043        );
2044        trace!("{:?} keys ready", space);
2045        if space == SpaceId::Data {
2046            // Precompute the first key update
2047            self.next_crypto = Some(
2048                self.crypto
2049                    .next_1rtt_keys()
2050                    .expect("handshake should be complete"),
2051            );
2052        }
2053
2054        self.spaces[space].crypto = Some(crypto);
2055        debug_assert!(space as usize > self.highest_space as usize);
2056        self.highest_space = space;
2057        if space == SpaceId::Data && self.side.is_client() {
2058            // Discard 0-RTT keys because 1-RTT keys are available.
2059            self.zero_rtt_crypto = None;
2060        }
2061    }
2062
2063    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2064        debug_assert!(space_id != SpaceId::Data);
2065        trace!("discarding {:?} keys", space_id);
2066        if space_id == SpaceId::Initial {
2067            // No longer needed
2068            self.retry_token = Bytes::new();
2069        }
2070        let space = &mut self.spaces[space_id];
2071        space.crypto = None;
2072        space.time_of_last_ack_eliciting_packet = None;
2073        space.loss_time = None;
2074        space.in_flight = 0;
2075        let sent_packets = mem::take(&mut space.sent_packets);
2076        for (pn, packet) in sent_packets.into_iter() {
2077            self.remove_in_flight(pn, &packet);
2078        }
2079        self.set_loss_detection_timer(now)
2080    }
2081
2082    fn handle_coalesced(
2083        &mut self,
2084        now: Instant,
2085        remote: SocketAddr,
2086        ecn: Option<EcnCodepoint>,
2087        data: BytesMut,
2088    ) {
2089        self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2090        let mut remaining = Some(data);
2091        while let Some(data) = remaining {
2092            match PartialDecode::new(
2093                data,
2094                &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2095                &[self.version],
2096                self.endpoint_config.grease_quic_bit,
2097            ) {
2098                Ok((partial_decode, rest)) => {
2099                    remaining = rest;
2100                    self.handle_decode(now, remote, ecn, partial_decode);
2101                }
2102                Err(e) => {
2103                    trace!("malformed header: {}", e);
2104                    return;
2105                }
2106            }
2107        }
2108    }
2109
2110    fn handle_decode(
2111        &mut self,
2112        now: Instant,
2113        remote: SocketAddr,
2114        ecn: Option<EcnCodepoint>,
2115        partial_decode: PartialDecode,
2116    ) {
2117        if let Some(decoded) = packet_crypto::unprotect_header(
2118            partial_decode,
2119            &self.spaces,
2120            self.zero_rtt_crypto.as_ref(),
2121            self.peer_params.stateless_reset_token,
2122        ) {
2123            self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2124        }
2125    }
2126
2127    fn handle_packet(
2128        &mut self,
2129        now: Instant,
2130        remote: SocketAddr,
2131        ecn: Option<EcnCodepoint>,
2132        packet: Option<Packet>,
2133        stateless_reset: bool,
2134    ) {
2135        self.stats.udp_rx.ios += 1;
2136        if let Some(ref packet) = packet {
2137            trace!(
2138                "got {:?} packet ({} bytes) from {} using id {}",
2139                packet.header.space(),
2140                packet.payload.len() + packet.header_data.len(),
2141                remote,
2142                packet.header.dst_cid(),
2143            );
2144        }
2145
2146        if self.is_handshaking() && remote != self.path.remote {
2147            debug!("discarding packet with unexpected remote during handshake");
2148            return;
2149        }
2150
2151        let was_closed = self.state.is_closed();
2152        let was_drained = self.state.is_drained();
2153
2154        let decrypted = match packet {
2155            None => Err(None),
2156            Some(mut packet) => self
2157                .decrypt_packet(now, &mut packet)
2158                .map(move |number| (packet, number)),
2159        };
2160        let result = match decrypted {
2161            _ if stateless_reset => {
2162                debug!("got stateless reset");
2163                Err(ConnectionError::Reset)
2164            }
2165            Err(Some(e)) => {
2166                warn!("illegal packet: {}", e);
2167                Err(e.into())
2168            }
2169            Err(None) => {
2170                debug!("failed to authenticate packet");
2171                self.authentication_failures += 1;
2172                let integrity_limit = self.spaces[self.highest_space]
2173                    .crypto
2174                    .as_ref()
2175                    .unwrap()
2176                    .packet
2177                    .local
2178                    .integrity_limit();
2179                if self.authentication_failures > integrity_limit {
2180                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2181                } else {
2182                    return;
2183                }
2184            }
2185            Ok((packet, number)) => {
2186                let span = match number {
2187                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2188                    None => trace_span!("recv", space = ?packet.header.space()),
2189                };
2190                let _guard = span.enter();
2191
2192                let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2193                if number.map_or(false, is_duplicate) {
2194                    debug!("discarding possible duplicate packet");
2195                    return;
2196                } else if self.state.is_handshake() && packet.header.is_short() {
2197                    // TODO: SHOULD buffer these to improve reordering tolerance.
2198                    trace!("dropping short packet during handshake");
2199                    return;
2200                } else {
2201                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2202                        if let State::Handshake(ref hs) = self.state {
2203                            if self.side.is_server() && token != &hs.expected_token {
2204                                // Clients must send the same retry token in every Initial. Initial
2205                                // packets can be spoofed, so we discard rather than killing the
2206                                // connection.
2207                                warn!("discarding Initial with invalid retry token");
2208                                return;
2209                            }
2210                        }
2211                    }
2212
2213                    if !self.state.is_closed() {
2214                        let spin = match packet.header {
2215                            Header::Short { spin, .. } => spin,
2216                            _ => false,
2217                        };
2218                        self.on_packet_authenticated(
2219                            now,
2220                            packet.header.space(),
2221                            ecn,
2222                            number,
2223                            spin,
2224                            packet.header.is_1rtt(),
2225                        );
2226                    }
2227                    self.process_decrypted_packet(now, remote, number, packet)
2228                }
2229            }
2230        };
2231
2232        // State transitions for error cases
2233        if let Err(conn_err) = result {
2234            self.error = Some(conn_err.clone());
2235            self.state = match conn_err {
2236                ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2237                ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2238                ConnectionError::Reset
2239                | ConnectionError::TransportError(TransportError {
2240                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
2241                    ..
2242                }) => State::Drained,
2243                ConnectionError::TimedOut => {
2244                    unreachable!("timeouts aren't generated by packet processing");
2245                }
2246                ConnectionError::TransportError(err) => {
2247                    debug!("closing connection due to transport error: {}", err);
2248                    State::closed(err)
2249                }
2250                ConnectionError::VersionMismatch => State::Draining,
2251                ConnectionError::LocallyClosed => {
2252                    unreachable!("LocallyClosed isn't generated by packet processing");
2253                }
2254                ConnectionError::CidsExhausted => {
2255                    unreachable!("CidsExhausted isn't generated by packet processing");
2256                }
2257            };
2258        }
2259
2260        if !was_closed && self.state.is_closed() {
2261            self.close_common();
2262            if !self.state.is_drained() {
2263                self.set_close_timer(now);
2264            }
2265        }
2266        if !was_drained && self.state.is_drained() {
2267            self.endpoint_events.push_back(EndpointEventInner::Drained);
2268            // Close timer may have been started previously, e.g. if we sent a close and got a
2269            // stateless reset in response
2270            self.timers.stop(Timer::Close);
2271        }
2272
2273        // Transmit CONNECTION_CLOSE if necessary
2274        if let State::Closed(_) = self.state {
2275            self.close = remote == self.path.remote;
2276        }
2277    }
2278
    /// Handle a packet that has already been decrypted and authenticated.
    ///
    /// Behavior depends on the current connection state:
    ///
    /// - `Established`: Data-space packets are handed to `process_payload`; Initial/Handshake
    ///   packets that carry frames go to `process_early_payload`; anything else is dropped.
    /// - `Closed`: the payload is scanned only for a CONNECTION_CLOSE frame, which moves the
    ///   connection to `Draining`. Frames that fail to decode are logged and skipped.
    /// - `Draining` / `Drained`: the packet is ignored.
    /// - `Handshake`: the packet is handled according to its header type (Retry, Handshake,
    ///   Initial, 0-RTT or Version Negotiation).
    ///
    /// `remote` is the address the packet was received from and `number` is its decoded packet
    /// number, if the packet type has one.
    ///
    /// Returns `Ok(())` both when the packet was processed and when it was deliberately
    /// discarded. An `Err` is a connection-level error (e.g. a server receiving a Retry, or a
    /// Version Negotiation packet that does not list our version), which the caller turns into a
    /// state transition.
    fn process_decrypted_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        number: Option<u64>,
        packet: Packet,
    ) -> Result<(), ConnectionError> {
        // Every non-handshake state is fully handled here and returns early. Only the
        // `Handshake` state falls through, yielding a mutable borrow of its state for the
        // header-type dispatch below.
        let state = match self.state {
            State::Established => {
                match packet.header.space() {
                    // `number` is assumed to be present for Data-space packets (unwrapped).
                    SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
                    _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
                    _ => {
                        trace!("discarding unexpected pre-handshake packet");
                    }
                }
                return Ok(());
            }
            State::Closed(_) => {
                // While closed, only look for the peer's CONNECTION_CLOSE. Other frames are
                // recorded in the stats and otherwise ignored; undecodable frames are skipped.
                for result in frame::Iter::new(packet.payload.freeze())? {
                    let frame = match result {
                        Ok(frame) => frame,
                        Err(err) => {
                            debug!("frame decoding error: {err:?}");
                            continue;
                        }
                    };

                    if let Frame::Padding = frame {
                        continue;
                    };

                    self.stats.frame_rx.record(&frame);

                    if let Frame::Close(_) = frame {
                        trace!("draining");
                        self.state = State::Draining;
                        break;
                    }
                }
                return Ok(());
            }
            State::Draining | State::Drained => return Ok(()),
            State::Handshake(ref mut state) => state,
        };

        match packet.header {
            Header::Retry {
                src_cid: rem_cid, ..
            } => {
                // Retry packets may only be sent by servers.
                if self.side.is_server() {
                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
                }

                if self.total_authed_packets > 1
                            || packet.payload.len() <= 16 // token + 16 byte tag
                            || !self.crypto.is_valid_retry(
                                &self.rem_cids.active(),
                                &packet.header_data,
                                &packet.payload,
                            )
                {
                    trace!("discarding invalid Retry");
                    // - After the client has received and processed an Initial or Retry
                    //   packet from the server, it MUST discard any subsequent Retry
                    //   packets that it receives.
                    // - A client MUST discard a Retry packet with a zero-length Retry Token
                    //   field.
                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
                    //   that cannot be validated
                    return Ok(());
                }

                trace!("retrying with CID {}", rem_cid);
                // Keep the ClientHello so it can be re-queued in the rebuilt Initial space.
                let client_hello = state.client_hello.take().unwrap();
                // Adopt the Retry's source CID as the remote CID from now on.
                self.retry_src_cid = Some(rem_cid);
                self.rem_cids.update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;

                // Treat Initial packet 0 as acknowledged rather than leaving it in flight.
                // NOTE(review): presumably justified because the Retry shows the server got it.
                let space = &mut self.spaces[SpaceId::Initial];
                if let Some(info) = space.take(0) {
                    self.on_packet_acked(now, 0, info);
                };

                self.discard_space(now, SpaceId::Initial); // Make sure we clean up after any retransmitted Initials
                // Rebuild the Initial space with keys derived from the new remote CID. The next
                // packet number is carried over from the existing space instead of restarting,
                // and the crypto offset starts after the ClientHello, which is re-queued at
                // offset 0 just below.
                self.spaces[SpaceId::Initial] = PacketSpace {
                    crypto: Some(self.crypto.initial_keys(&rem_cid, self.side)),
                    next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
                    crypto_offset: client_hello.len() as u64,
                    ..PacketSpace::new(now)
                };
                self.spaces[SpaceId::Initial]
                    .pending
                    .crypto
                    .push_back(frame::Crypto {
                        offset: 0,
                        data: client_hello,
                    });

                // Retransmit all 0-RTT data
                // Every packet sent in the Data space so far is taken out of flight and its
                // frames are re-queued as pending.
                let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
                for (pn, info) in zero_rtt {
                    self.remove_in_flight(pn, &info);
                    self.spaces[SpaceId::Data].pending |= info.retransmits;
                }
                self.streams.retransmit_all_for_0rtt();

                // The Retry payload is the token followed by the 16-byte integrity tag; keep
                // only the token.
                let token_len = packet.payload.len() - 16;
                self.retry_token = packet.payload.freeze().split_to(token_len);
                // `rem_cid_set: false` lets the server's next Initial set the remote CID again
                // (see the `Header::Initial` arm below).
                self.state = State::Handshake(state::Handshake {
                    expected_token: Bytes::new(),
                    rem_cid_set: false,
                    client_hello: None,
                });
                Ok(())
            }
            Header::Long {
                ty: LongType::Handshake,
                src_cid: rem_cid,
                ..
            } => {
                // Handshake packets must come from the CID established during the Initial
                // exchange; anything else is silently dropped.
                if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }
                // Receiving an authenticated Handshake packet marks the current path validated.
                self.path.validated = true;

                self.process_early_payload(now, packet)?;
                if self.state.is_closed() {
                    return Ok(());
                }

                if self.crypto.is_handshaking() {
                    trace!("handshake ongoing");
                    return Ok(());
                }

                // The TLS handshake has completed at this point.
                if self.side.is_client() {
                    // Client-only because server params were set from the client's Initial
                    // Missing transport parameters map to crypto error 0x6d (presumably the
                    // TLS `missing_extension` alert, cf. RFC 9001 §8.2).
                    let params =
                        self.crypto
                            .transport_parameters()?
                            .ok_or_else(|| TransportError {
                                code: TransportErrorCode::crypto(0x6d),
                                frame: None,
                                reason: "transport parameters missing".into(),
                            })?;

                    if self.has_0rtt() {
                        if !self.crypto.early_data_accepted().unwrap() {
                            debug_assert!(self.side.is_client());
                            debug!("0-RTT rejected");
                            self.accepted_0rtt = false;
                            self.streams.zero_rtt_rejected();

                            // Discard already-queued frames
                            self.spaces[SpaceId::Data].pending = Retransmits::default();

                            // Discard 0-RTT packets
                            let sent_packets =
                                mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
                            for (pn, packet) in sent_packets {
                                self.remove_in_flight(pn, &packet);
                            }
                        } else {
                            self.accepted_0rtt = true;
                            // Check the new parameters against the ones previously stored in
                            // `peer_params` that the 0-RTT data relied on.
                            params.validate_resumption_from(&self.peer_params)?;
                        }
                    }
                    // Hand the server's stateless reset token to the endpoint.
                    if let Some(token) = params.stateless_reset_token {
                        self.endpoint_events
                            .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
                    }
                    self.handle_peer_params(params)?;
                    self.issue_first_cids(now);
                } else {
                    // Server-only
                    // Queue HANDSHAKE_DONE and drop the Handshake packet space.
                    self.spaces[SpaceId::Data].pending.handshake_done = true;
                    self.discard_space(now, SpaceId::Handshake);
                }

                self.events.push_back(Event::Connected);
                self.state = State::Established;
                trace!("established");
                Ok(())
            }
            Header::Initial(InitialHeader {
                src_cid: rem_cid, ..
            }) => {
                // The first Initial received from the peer fixes the remote CID; later Initials
                // with a different source CID are dropped.
                if !state.rem_cid_set {
                    trace!("switching remote CID to {}", rem_cid);
                    let mut state = state.clone();
                    self.rem_cids.update_initial_cid(rem_cid);
                    self.rem_handshake_cid = rem_cid;
                    self.orig_rem_cid = rem_cid;
                    state.rem_cid_set = true;
                    self.state = State::Handshake(state);
                } else if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }

                let starting_space = self.highest_space;
                self.process_early_payload(now, packet)?;

                // Server: once this Initial moved `highest_space` past Initial, the client's
                // transport parameters are expected to be available; apply them, issue CIDs and
                // set up 0-RTT. This runs only on that first transition.
                if self.side.is_server()
                    && starting_space == SpaceId::Initial
                    && self.highest_space != SpaceId::Initial
                {
                    let params =
                        self.crypto
                            .transport_parameters()?
                            .ok_or_else(|| TransportError {
                                code: TransportErrorCode::crypto(0x6d),
                                frame: None,
                                reason: "transport parameters missing".into(),
                            })?;
                    self.handle_peer_params(params)?;
                    self.issue_first_cids(now);
                    self.init_0rtt();
                }
                Ok(())
            }
            Header::Long {
                ty: LongType::ZeroRtt,
                ..
            } => {
                // 0-RTT packets carry application data and go through the Data-space path;
                // they are assumed to always have a packet number (unwrapped).
                self.process_payload(now, remote, number.unwrap(), packet)?;
                Ok(())
            }
            Header::VersionNegotiate { .. } => {
                // Ignore Version Negotiation once more than one packet has been authenticated.
                if self.total_authed_packets > 1 {
                    return Ok(());
                }
                // The payload is a list of 4-byte big-endian versions; a trailing partial chunk
                // is ignored. If our version is listed, the packet is ignored.
                let supported = packet
                    .payload
                    .chunks(4)
                    .any(|x| match <[u8; 4]>::try_from(x) {
                        Ok(version) => self.version == u32::from_be_bytes(version),
                        Err(_) => false,
                    });
                if supported {
                    return Ok(());
                }
                debug!("remote doesn't support our version");
                Err(ConnectionError::VersionMismatch)
            }
            Header::Short { .. } => unreachable!(
                "short packets received during handshake are discarded in handle_packet"
            ),
        }
    }
2537
2538    /// Process an Initial or Handshake packet payload
2539    fn process_early_payload(
2540        &mut self,
2541        now: Instant,
2542        packet: Packet,
2543    ) -> Result<(), TransportError> {
2544        debug_assert_ne!(packet.header.space(), SpaceId::Data);
2545        let payload_len = packet.payload.len();
2546        let mut ack_eliciting = false;
2547        for result in frame::Iter::new(packet.payload.freeze())? {
2548            let frame = result?;
2549            let span = match frame {
2550                Frame::Padding => continue,
2551                _ => Some(trace_span!("frame", ty = %frame.ty())),
2552            };
2553
2554            self.stats.frame_rx.record(&frame);
2555
2556            let _guard = span.as_ref().map(|x| x.enter());
2557            ack_eliciting |= frame.is_ack_eliciting();
2558
2559            // Process frames
2560            match frame {
2561                Frame::Padding | Frame::Ping => {}
2562                Frame::Crypto(frame) => {
2563                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
2564                }
2565                Frame::Ack(ack) => {
2566                    self.on_ack_received(now, packet.header.space(), ack)?;
2567                }
2568                Frame::Close(reason) => {
2569                    self.error = Some(reason.into());
2570                    self.state = State::Draining;
2571                    return Ok(());
2572                }
2573                _ => {
2574                    let mut err =
2575                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2576                    err.frame = Some(frame.ty());
2577                    return Err(err);
2578                }
2579            }
2580        }
2581
2582        if ack_eliciting {
2583            // In the initial and handshake spaces, ACKs must be sent immediately
2584            self.spaces[packet.header.space()]
2585                .pending_acks
2586                .set_immediate_ack_required();
2587        }
2588
2589        self.write_crypto();
2590        Ok(())
2591    }
2592
2593    fn process_payload(
2594        &mut self,
2595        now: Instant,
2596        remote: SocketAddr,
2597        number: u64,
2598        packet: Packet,
2599    ) -> Result<(), TransportError> {
2600        let payload = packet.payload.freeze();
2601        let mut is_probing_packet = true;
2602        let mut close = None;
2603        let payload_len = payload.len();
2604        let mut ack_eliciting = false;
2605        for result in frame::Iter::new(payload)? {
2606            let frame = result?;
2607            let span = match frame {
2608                Frame::Padding => continue,
2609                _ => Some(trace_span!("frame", ty = %frame.ty())),
2610            };
2611
2612            self.stats.frame_rx.record(&frame);
2613            // Crypto, Stream and Datagram frames are special cased in order no pollute
2614            // the log with payload data
2615            match &frame {
2616                Frame::Crypto(f) => {
2617                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
2618                }
2619                Frame::Stream(f) => {
2620                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
2621                }
2622                Frame::Datagram(f) => {
2623                    trace!(len = f.data.len(), "got datagram frame");
2624                }
2625                f => {
2626                    trace!("got frame {:?}", f);
2627                }
2628            }
2629
2630            let _guard = span.as_ref().map(|x| x.enter());
2631            if packet.header.is_0rtt() {
2632                match frame {
2633                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
2634                        return Err(TransportError::PROTOCOL_VIOLATION(
2635                            "illegal frame type in 0-RTT",
2636                        ));
2637                    }
2638                    _ => {}
2639                }
2640            }
2641            ack_eliciting |= frame.is_ack_eliciting();
2642
2643            // Check whether this could be a probing packet
2644            match frame {
2645                Frame::Padding
2646                | Frame::PathChallenge(_)
2647                | Frame::PathResponse(_)
2648                | Frame::NewConnectionId(_) => {}
2649                _ => {
2650                    is_probing_packet = false;
2651                }
2652            }
2653            match frame {
2654                Frame::Crypto(frame) => {
2655                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
2656                }
2657                Frame::Stream(frame) => {
2658                    if self.streams.received(frame, payload_len)?.should_transmit() {
2659                        self.spaces[SpaceId::Data].pending.max_data = true;
2660                    }
2661                }
2662                Frame::Ack(ack) => {
2663                    self.on_ack_received(now, SpaceId::Data, ack)?;
2664                }
2665                Frame::Padding | Frame::Ping => {}
2666                Frame::Close(reason) => {
2667                    close = Some(reason);
2668                }
2669                Frame::PathChallenge(token) => {
2670                    self.path_responses.push(number, token, remote);
2671                    if remote == self.path.remote {
2672                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
2673                        // attack. Send a non-probing packet to recover the active path.
2674                        match self.peer_supports_ack_frequency() {
2675                            true => self.immediate_ack(),
2676                            false => self.ping(),
2677                        }
2678                    }
2679                }
2680                Frame::PathResponse(token) => {
2681                    if self.path.challenge == Some(token) && remote == self.path.remote {
2682                        trace!("new path validated");
2683                        self.timers.stop(Timer::PathValidation);
2684                        self.path.challenge = None;
2685                        self.path.validated = true;
2686                        if let Some((_, ref mut prev_path)) = self.prev_path {
2687                            prev_path.challenge = None;
2688                            prev_path.challenge_pending = false;
2689                        }
2690                    } else {
2691                        debug!(token, "ignoring invalid PATH_RESPONSE");
2692                    }
2693                }
2694                Frame::MaxData(bytes) => {
2695                    self.streams.received_max_data(bytes);
2696                }
2697                Frame::MaxStreamData { id, offset } => {
2698                    self.streams.received_max_stream_data(id, offset)?;
2699                }
2700                Frame::MaxStreams { dir, count } => {
2701                    self.streams.received_max_streams(dir, count)?;
2702                }
2703                Frame::ResetStream(frame) => {
2704                    if self.streams.received_reset(frame)?.should_transmit() {
2705                        self.spaces[SpaceId::Data].pending.max_data = true;
2706                    }
2707                }
2708                Frame::DataBlocked { offset } => {
2709                    debug!(offset, "peer claims to be blocked at connection level");
2710                }
2711                Frame::StreamDataBlocked { id, offset } => {
2712                    if id.initiator() == self.side && id.dir() == Dir::Uni {
2713                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
2714                        return Err(TransportError::STREAM_STATE_ERROR(
2715                            "STREAM_DATA_BLOCKED on send-only stream",
2716                        ));
2717                    }
2718                    debug!(
2719                        stream = %id,
2720                        offset, "peer claims to be blocked at stream level"
2721                    );
2722                }
2723                Frame::StreamsBlocked { dir, limit } => {
2724                    if limit > MAX_STREAM_COUNT {
2725                        return Err(TransportError::FRAME_ENCODING_ERROR(
2726                            "unrepresentable stream limit",
2727                        ));
2728                    }
2729                    debug!(
2730                        "peer claims to be blocked opening more than {} {} streams",
2731                        limit, dir
2732                    );
2733                }
2734                Frame::StopSending(frame::StopSending { id, error_code }) => {
2735                    if id.initiator() != self.side {
2736                        if id.dir() == Dir::Uni {
2737                            debug!("got STOP_SENDING on recv-only {}", id);
2738                            return Err(TransportError::STREAM_STATE_ERROR(
2739                                "STOP_SENDING on recv-only stream",
2740                            ));
2741                        }
2742                    } else if self.streams.is_local_unopened(id) {
2743                        return Err(TransportError::STREAM_STATE_ERROR(
2744                            "STOP_SENDING on unopened stream",
2745                        ));
2746                    }
2747                    self.streams.received_stop_sending(id, error_code);
2748                }
2749                Frame::RetireConnectionId { sequence } => {
2750                    let allow_more_cids = self
2751                        .local_cid_state
2752                        .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
2753                    self.endpoint_events
2754                        .push_back(EndpointEventInner::RetireConnectionId(
2755                            now,
2756                            sequence,
2757                            allow_more_cids,
2758                        ));
2759                }
2760                Frame::NewConnectionId(frame) => {
2761                    trace!(
2762                        sequence = frame.sequence,
2763                        id = %frame.id,
2764                        retire_prior_to = frame.retire_prior_to,
2765                    );
2766                    if self.rem_cids.active().is_empty() {
2767                        return Err(TransportError::PROTOCOL_VIOLATION(
2768                            "NEW_CONNECTION_ID when CIDs aren't in use",
2769                        ));
2770                    }
2771                    if frame.retire_prior_to > frame.sequence {
2772                        return Err(TransportError::PROTOCOL_VIOLATION(
2773                            "NEW_CONNECTION_ID retiring unissued CIDs",
2774                        ));
2775                    }
2776
2777                    use crate::cid_queue::InsertError;
2778                    match self.rem_cids.insert(frame) {
2779                        Ok(None) => {}
2780                        Ok(Some((retired, reset_token))) => {
2781                            let pending_retired =
2782                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
2783                            /// Ensure `pending_retired` cannot grow without bound. Limit is
2784                            /// somewhat arbitrary but very permissive.
2785                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
2786                            // We don't bother counting in-flight frames because those are bounded
2787                            // by congestion control.
2788                            if (pending_retired.len() as u64)
2789                                .saturating_add(retired.end.saturating_sub(retired.start))
2790                                > MAX_PENDING_RETIRED_CIDS
2791                            {
2792                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
2793                                    "queued too many retired CIDs",
2794                                ));
2795                            }
2796                            pending_retired.extend(retired);
2797                            self.set_reset_token(reset_token);
2798                        }
2799                        Err(InsertError::ExceedsLimit) => {
2800                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
2801                        }
2802                        Err(InsertError::Retired) => {
2803                            trace!("discarding already-retired");
2804                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
2805                            // range of connection IDs larger than the active connection ID limit
2806                            // was retired all at once via retire_prior_to.
2807                            self.spaces[SpaceId::Data]
2808                                .pending
2809                                .retire_cids
2810                                .push(frame.sequence);
2811                            continue;
2812                        }
2813                    };
2814
2815                    if self.side.is_server() && self.rem_cids.active_seq() == 0 {
2816                        // We're a server still using the initial remote CID for the client, so
2817                        // let's switch immediately to enable clientside stateless resets.
2818                        self.update_rem_cid();
2819                    }
2820                }
2821                Frame::NewToken { token } => {
2822                    if self.side.is_server() {
2823                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
2824                    }
2825                    if token.is_empty() {
2826                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
2827                    }
2828                    trace!("got new token");
2829                    // TODO: Cache, or perhaps forward to user?
2830                }
2831                Frame::Datagram(datagram) => {
2832                    if self
2833                        .datagrams
2834                        .received(datagram, &self.config.datagram_receive_buffer_size)?
2835                    {
2836                        self.events.push_back(Event::DatagramReceived);
2837                    }
2838                }
2839                Frame::AckFrequency(ack_frequency) => {
2840                    // This frame can only be sent in the Data space
2841                    let space = &mut self.spaces[SpaceId::Data];
2842
2843                    if !self
2844                        .ack_frequency
2845                        .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
2846                    {
2847                        // The AckFrequency frame is stale (we have already received a more recent one)
2848                        continue;
2849                    }
2850
2851                    // Our `max_ack_delay` has been updated, so we may need to adjust its associated
2852                    // timeout
2853                    if let Some(timeout) = space
2854                        .pending_acks
2855                        .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
2856                    {
2857                        self.timers.set(Timer::MaxAckDelay, timeout);
2858                    }
2859                }
2860                Frame::ImmediateAck => {
2861                    // This frame can only be sent in the Data space
2862                    self.spaces[SpaceId::Data]
2863                        .pending_acks
2864                        .set_immediate_ack_required();
2865                }
2866                Frame::HandshakeDone => {
2867                    if self.side.is_server() {
2868                        return Err(TransportError::PROTOCOL_VIOLATION(
2869                            "client sent HANDSHAKE_DONE",
2870                        ));
2871                    }
2872                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
2873                        self.discard_space(now, SpaceId::Handshake);
2874                    }
2875                }
2876            }
2877        }
2878
2879        let space = &mut self.spaces[SpaceId::Data];
2880        if space
2881            .pending_acks
2882            .packet_received(now, number, ack_eliciting, &space.dedup)
2883        {
2884            self.timers
2885                .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
2886        }
2887
2888        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
2889        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
2890        // are only freed, and hence only issue credit, once the application has been notified
2891        // during a read on the stream.
2892        let pending = &mut self.spaces[SpaceId::Data].pending;
2893        self.streams.queue_max_stream_id(pending);
2894
2895        if let Some(reason) = close {
2896            self.error = Some(reason.into());
2897            self.state = State::Draining;
2898            self.close = true;
2899        }
2900
2901        if remote != self.path.remote
2902            && !is_probing_packet
2903            && number == self.spaces[SpaceId::Data].rx_packet
2904        {
2905            debug_assert!(
2906                self.server_config
2907                    .as_ref()
2908                    .expect("packets from unknown remote should be dropped by clients")
2909                    .migration,
2910                "migration-initiating packets should have been dropped immediately"
2911            );
2912            self.migrate(now, remote);
2913            // Break linkability, if possible
2914            self.update_rem_cid();
2915            self.spin = false;
2916        }
2917
2918        Ok(())
2919    }
2920
    /// Make `remote` the active path after the peer was observed sending from a new address
    ///
    /// The new path gets a fresh PATH_CHALLENGE token so it will be validated. The old path is
    /// kept in `prev_path` (together with the remote CID that was in use on it) and re-challenged
    /// only if it had no challenge outstanding; an unvalidated previous path is simply dropped.
    /// Finally a `PathValidation` timer is armed.
    fn migrate(&mut self, now: Instant, remote: SocketAddr) {
        trace!(%remote, "migration initiated");
        // Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
        // Note that the congestion window will not grow until validation terminates. Helps mitigate
        // amplification attacks performed by spoofing source addresses.
        // Only an IPv4 change that keeps the same IP (i.e. only the port moved) is treated as a
        // likely rebinding and inherits state via `PathData::from_previous`.
        let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
            PathData::from_previous(remote, &self.path, now)
        } else {
            // Clamp the peer's advertised limit into a u16, saturating on overflow
            let peer_max_udp_payload_size =
                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
                    .unwrap_or(u16::MAX);
            PathData::new(
                remote,
                self.allow_mtud,
                Some(peer_max_udp_payload_size),
                now,
                false,
                &self.config,
            )
        };
        new_path.challenge = Some(self.rng.gen());
        new_path.challenge_pending = true;
        // Captured before `self.path` is replaced so the validation timer below can take the
        // larger of the old and new paths' PTOs (assumes `pto` reads `self.path` — confirm)
        let prev_pto = self.pto(SpaceId::Data);

        let mut prev = mem::replace(&mut self.path, new_path);
        // Don't clobber the original path if the previous one hasn't been validated yet
        if prev.challenge.is_none() {
            prev.challenge = Some(self.rng.gen());
            prev.challenge_pending = true;
            // We haven't updated the remote CID yet, this captures the remote CID we were using on
            // the previous path.
            self.prev_path = Some((self.rem_cids.active(), prev));
        }

        // Allow three PTOs (of whichever path has the larger PTO) for validation to finish
        self.timers.set(
            Timer::PathValidation,
            now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
        );
    }
2960
2961    /// Handle a change in the local address, i.e. an active migration
2962    pub fn local_address_changed(&mut self) {
2963        self.update_rem_cid();
2964        self.ping();
2965    }
2966
2967    /// Switch to a previously unused remote connection ID, if possible
2968    fn update_rem_cid(&mut self) {
2969        let (reset_token, retired) = match self.rem_cids.next() {
2970            Some(x) => x,
2971            None => return,
2972        };
2973
2974        // Retire the current remote CID and any CIDs we had to skip.
2975        self.spaces[SpaceId::Data]
2976            .pending
2977            .retire_cids
2978            .extend(retired);
2979        self.set_reset_token(reset_token);
2980    }
2981
2982    fn set_reset_token(&mut self, reset_token: ResetToken) {
2983        self.endpoint_events
2984            .push_back(EndpointEventInner::ResetToken(
2985                self.path.remote,
2986                reset_token,
2987            ));
2988        self.peer_params.stateless_reset_token = Some(reset_token);
2989    }
2990
2991    /// Issue an initial set of connection IDs to the peer upon connection
2992    fn issue_first_cids(&mut self, now: Instant) {
2993        if self.local_cid_state.cid_len() == 0 {
2994            return;
2995        }
2996
2997        // Subtract 1 to account for the CID we supplied while handshaking
2998        let n = self.peer_params.issue_cids_limit() - 1;
2999        self.endpoint_events
3000            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3001    }
3002
    /// Write as many pending frames for `space_id` into `buf` as fit below `max_size`
    ///
    /// Frames are emitted in a fixed priority order: HANDSHAKE_DONE, PING, IMMEDIATE_ACK, ACK,
    /// ACK_FREQUENCY, PATH_CHALLENGE, PATH_RESPONSE, CRYPTO, stream control frames,
    /// NEW_CONNECTION_ID, RETIRE_CONNECTION_ID, DATAGRAM, STREAM. Most sections check the
    /// remaining room before writing.
    ///
    /// `pn` is the number of the packet being built. It is recorded against any ACK_FREQUENCY
    /// frame sent. The returned `SentFrames` records what was written (retransmittable frames,
    /// whether non-retransmittable frames were sent, whether padding is required, the largest
    /// acknowledged packet, and stream frames) so the caller can track the packet.
    fn populate_packet(
        &mut self,
        now: Instant,
        space_id: SpaceId,
        buf: &mut Vec<u8>,
        max_size: usize,
        pn: u64,
    ) -> SentFrames {
        let mut sent = SentFrames::default();
        let space = &mut self.spaces[space_id];
        // A Data-space packet built without 1-RTT keys is a 0-RTT packet
        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
        space.pending_acks.maybe_ack_non_eliciting();

        // HANDSHAKE_DONE (never carried in 0-RTT)
        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
            buf.write(frame::Type::HANDSHAKE_DONE);
            sent.retransmits.get_or_create().handshake_done = true;
            // This is just a u8 counter and the frame is typically just sent once
            self.stats.frame_tx.handshake_done =
                self.stats.frame_tx.handshake_done.saturating_add(1);
        }

        // PING
        if mem::replace(&mut space.ping_pending, false) {
            trace!("PING");
            buf.write(frame::Type::PING);
            sent.non_retransmits = true;
            self.stats.frame_tx.ping += 1;
        }

        // IMMEDIATE_ACK
        if mem::replace(&mut space.immediate_ack_pending, false) {
            trace!("IMMEDIATE_ACK");
            buf.write(frame::Type::IMMEDIATE_ACK);
            sent.non_retransmits = true;
            self.stats.frame_tx.immediate_ack += 1;
        }

        // ACK (only when the pending-ACK state says one may be sent now)
        if space.pending_acks.can_send() {
            Self::populate_acks(
                now,
                self.receiving_ecn,
                &mut sent,
                space,
                buf,
                &mut self.stats,
            );
        }

        // ACK_FREQUENCY
        if mem::replace(&mut space.pending.ack_frequency, false) {
            let sequence_number = self.ack_frequency.next_sequence_number();

            // Safe to unwrap because this is always provided when ACK frequency is enabled
            let config = self.config.ack_frequency_config.as_ref().unwrap();

            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
                self.path.rtt.get(),
                config,
                &self.peer_params,
            );

            trace!(?max_ack_delay, "ACK_FREQUENCY");

            // The requested delay is encoded in microseconds, saturating at VarInt::MAX
            frame::AckFrequency {
                sequence: sequence_number,
                ack_eliciting_threshold: config.ack_eliciting_threshold,
                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
                reordering_threshold: config.reordering_threshold,
            }
            .encode(buf);

            sent.retransmits.get_or_create().ack_frequency = true;

            // Remember which packet carried this request and the delay it asked for
            self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
            self.stats.frame_tx.ack_frequency += 1;
        }

        // PATH_CHALLENGE (1 type byte + 8-byte token per RFC 9000; Data space only)
        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
            // Transmit challenges with every outgoing frame on an unvalidated path
            if let Some(token) = self.path.challenge {
                // But only send a packet solely for that purpose at most once
                self.path.challenge_pending = false;
                sent.non_retransmits = true;
                // Path-validation packets are flagged for padding (RFC 9000 §8.2.1 requires
                // expanding them; the actual padding is presumably applied by the caller)
                sent.requires_padding = true;
                trace!("PATH_CHALLENGE {:08x}", token);
                buf.write(frame::Type::PATH_CHALLENGE);
                buf.write(token);
                self.stats.frame_tx.path_challenge += 1;
            }
        }

        // PATH_RESPONSE (same size as PATH_CHALLENGE; only responses owed on the current path)
        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
            if let Some(token) = self.path_responses.pop_on_path(&self.path.remote) {
                sent.non_retransmits = true;
                sent.requires_padding = true;
                trace!("PATH_RESPONSE {:08x}", token);
                buf.write(frame::Type::PATH_RESPONSE);
                buf.write(token);
                self.stats.frame_tx.path_response += 1;
            }
        }

        // CRYPTO (never in 0-RTT); large pending frames are split across packets
        while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
            let mut frame = match space.pending.crypto.pop_front() {
                Some(x) => x,
                None => break,
            };

            // Calculate the maximum amount of crypto data we can store in the buffer.
            // Since the offset is known, we can reserve the exact size required to encode it.
            // For length we reserve 2 bytes which allows to encode up to 2^14,
            // which is more than what fits into normally sized QUIC frames.
            // NOTE(review): `from_u64_unchecked` assumes the crypto stream offset is below 2^62,
            // and the subtractions assume `frame::Crypto::SIZE_BOUND` covers type + offset varint
            // + 2-byte length (so the loop condition rules out underflow) — confirm both.
            let max_crypto_data_size = max_size
                - buf.len()
                - 1 // Frame Type
                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes

            let len = frame
                .data
                .len()
                .min(2usize.pow(14) - 1)
                .min(max_crypto_data_size);

            let data = frame.data.split_to(len);
            let truncated = frame::Crypto {
                offset: frame.offset,
                data,
            };
            trace!(
                "CRYPTO: off {} len {}",
                truncated.offset,
                truncated.data.len()
            );
            truncated.encode(buf);
            self.stats.frame_tx.crypto += 1;
            sent.retransmits.get_or_create().crypto.push_back(truncated);
            // Put any unsent remainder back at the front, advanced past what was written
            if !frame.data.is_empty() {
                frame.offset += len as u64;
                space.pending.crypto.push_front(frame);
            }
        }

        // Stream-level control frames (Data space only)
        if space_id == SpaceId::Data {
            self.streams.write_control_frames(
                buf,
                &mut space.pending,
                &mut sent.retransmits,
                &mut self.stats.frame_tx,
                max_size,
            );
        }

        // NEW_CONNECTION_ID
        // NOTE(review): 44 seems to assume short varints; the worst case is 1 type + 8 sequence
        // + 8 retire_prior_to + 1 length + 20 CID + 16 reset token = 54 bytes — confirm the bound.
        while buf.len() + 44 < max_size {
            let issued = match space.pending.new_cids.pop() {
                Some(x) => x,
                None => break,
            };
            trace!(
                sequence = issued.sequence,
                id = %issued.id,
                "NEW_CONNECTION_ID"
            );
            frame::NewConnectionId {
                sequence: issued.sequence,
                retire_prior_to: self.local_cid_state.retire_prior_to(),
                id: issued.id,
                reset_token: issued.reset_token,
            }
            .encode(buf);
            sent.retransmits.get_or_create().new_cids.push(issued);
            self.stats.frame_tx.new_connection_id += 1;
        }

        // RETIRE_CONNECTION_ID
        while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
            let seq = match space.pending.retire_cids.pop() {
                Some(x) => x,
                None => break,
            };
            trace!(sequence = seq, "RETIRE_CONNECTION_ID");
            buf.write(frame::Type::RETIRE_CONNECTION_ID);
            buf.write_var(seq);
            sent.retransmits.get_or_create().retire_cids.push(seq);
            self.stats.frame_tx.retire_connection_id += 1;
        }

        // DATAGRAM (Data space only; datagrams are never retransmitted)
        let mut sent_datagrams = false;
        while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
            match self.datagrams.write(buf, max_size) {
                true => {
                    sent_datagrams = true;
                    sent.non_retransmits = true;
                    self.stats.frame_tx.datagram += 1;
                }
                false => break,
            }
        }
        // If the application was blocked on a full datagram queue, tell it room was freed
        if self.datagrams.send_blocked && sent_datagrams {
            self.events.push_back(Event::DatagramsUnblocked);
            self.datagrams.send_blocked = false;
        }

        // STREAM (fills whatever space remains)
        if space_id == SpaceId::Data {
            sent.stream_frames = self.streams.write_stream_frames(buf, max_size);
            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
        }

        sent
    }
3222
3223    /// Write pending ACKs into a buffer
3224    ///
3225    /// This method assumes ACKs are pending, and should only be called if
3226    /// `!PendingAcks::ranges().is_empty()` returns `true`.
3227    fn populate_acks(
3228        now: Instant,
3229        receiving_ecn: bool,
3230        sent: &mut SentFrames,
3231        space: &mut PacketSpace,
3232        buf: &mut Vec<u8>,
3233        stats: &mut ConnectionStats,
3234    ) {
3235        debug_assert!(!space.pending_acks.ranges().is_empty());
3236
3237        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
3238        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3239        let ecn = if receiving_ecn {
3240            Some(&space.ecn_counters)
3241        } else {
3242            None
3243        };
3244        sent.largest_acked = space.pending_acks.ranges().max();
3245
3246        let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3247
3248        // TODO: This should come from `TransportConfig` if that gets configurable.
3249        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
3250        let delay = delay_micros >> ack_delay_exp.into_inner();
3251
3252        trace!(
3253            "ACK {:?}, Delay = {}us",
3254            space.pending_acks.ranges(),
3255            delay_micros
3256        );
3257
3258        frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3259        stats.frame_tx.acks += 1;
3260    }
3261
3262    fn close_common(&mut self) {
3263        trace!("connection closed");
3264        for &timer in &Timer::VALUES {
3265            self.timers.stop(timer);
3266        }
3267    }
3268
3269    fn set_close_timer(&mut self, now: Instant) {
3270        self.timers
3271            .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3272    }
3273
3274    /// Handle transport parameters received from the peer
3275    fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
3276        if Some(self.orig_rem_cid) != params.initial_src_cid
3277            || (self.side.is_client()
3278                && (Some(self.initial_dst_cid) != params.original_dst_cid
3279                    || self.retry_src_cid != params.retry_src_cid))
3280        {
3281            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3282                "CID authentication failure",
3283            ));
3284        }
3285
3286        self.set_peer_params(params);
3287
3288        Ok(())
3289    }
3290
3291    fn set_peer_params(&mut self, params: TransportParameters) {
3292        self.streams.set_params(&params);
3293        self.idle_timeout = match (self.config.max_idle_timeout, params.max_idle_timeout) {
3294            (None, VarInt(0)) => None,
3295            (None, x) => Some(x),
3296            (Some(x), VarInt(0)) => Some(x),
3297            (Some(x), y) => Some(cmp::min(x, y)),
3298        };
3299        if let Some(ref info) = params.preferred_address {
3300            self.rem_cids.insert(frame::NewConnectionId {
3301                sequence: 1,
3302                id: info.connection_id,
3303                reset_token: info.stateless_reset_token,
3304                retire_prior_to: 0,
3305            }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
3306        }
3307        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);
3308        self.peer_params = params;
3309        self.path.mtud.on_peer_max_udp_payload_size_received(
3310            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
3311        );
3312    }
3313
3314    fn decrypt_packet(
3315        &mut self,
3316        now: Instant,
3317        packet: &mut Packet,
3318    ) -> Result<Option<u64>, Option<TransportError>> {
3319        let result = packet_crypto::decrypt_packet_body(
3320            packet,
3321            &self.spaces,
3322            self.zero_rtt_crypto.as_ref(),
3323            self.key_phase,
3324            self.prev_crypto.as_ref(),
3325            self.next_crypto.as_ref(),
3326        )?;
3327
3328        let result = match result {
3329            Some(r) => r,
3330            None => return Ok(None),
3331        };
3332
3333        if result.outgoing_key_update_acked {
3334            if let Some(prev) = self.prev_crypto.as_mut() {
3335                prev.end_packet = Some((result.number, now));
3336                self.set_key_discard_timer(now, packet.header.space());
3337            }
3338        }
3339
3340        if result.incoming_key_update {
3341            trace!("key update authenticated");
3342            self.update_keys(Some((result.number, now)), true);
3343            self.set_key_discard_timer(now, packet.header.space());
3344        }
3345
3346        Ok(Some(result.number))
3347    }
3348
    /// Rotate the 1-RTT packet keys (a QUIC key update) and flip the key phase
    ///
    /// `end_packet` is stored on the retired keys as `PrevCrypto::end_packet`. `remote` is stored
    /// as `PrevCrypto::update_unacked`; `decrypt_packet` passes `true` when the peer initiated
    /// the update. Also recomputes `key_phase_size` for the new keys and resets the Data space's
    /// `sent_with_keys` counter.
    ///
    /// # Panics
    ///
    /// Panics if no next 1-RTT keys can be derived, or if the Data space or `next_crypto` has no
    /// keys installed.
    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
        trace!("executing key update");
        // Generate keys for the key phase after the one we're switching to, store them in
        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
        // `prev_crypto`.
        let new = self
            .crypto
            .next_1rtt_keys()
            .expect("only called for `Data` packets");
        // Budget of packets for the new keys: their confidentiality limit minus a safety margin
        self.key_phase_size = new
            .local
            .confidentiality_limit()
            .saturating_sub(KEY_UPDATE_MARGIN);
        // Inner replace: `next_crypto` <- `new`, yielding the previous next keys.
        // Outer replace: current Data keys <- those next keys, yielding the old current keys.
        let old = mem::replace(
            &mut self.spaces[SpaceId::Data]
                .crypto
                .as_mut()
                .unwrap() // safe because update_keys() can only be triggered by short packets
                .packet,
            mem::replace(self.next_crypto.as_mut().unwrap(), new),
        );
        // Fresh keys start with a zero usage count
        self.spaces[SpaceId::Data].sent_with_keys = 0;
        self.prev_crypto = Some(PrevCrypto {
            crypto: old,
            end_packet,
            update_unacked: remote,
        });
        self.key_phase = !self.key_phase;
    }
3378
3379    fn peer_supports_ack_frequency(&self) -> bool {
3380        self.peer_params.min_ack_delay.is_some()
3381    }
3382
3383    /// Send an IMMEDIATE_ACK frame to the remote endpoint
3384    ///
3385    /// According to the spec, this will result in an error if the remote endpoint does not support
3386    /// the Acknowledgement Frequency extension
3387    pub(crate) fn immediate_ack(&mut self) {
3388        self.spaces[self.highest_space].immediate_ack_pending = true;
3389    }
3390
3391    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
3392    #[cfg(test)]
3393    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
3394        let (first_decode, remaining) = match &event.0 {
3395            ConnectionEventInner::Datagram(DatagramConnectionEvent {
3396                first_decode,
3397                remaining,
3398                ..
3399            }) => (first_decode, remaining),
3400            _ => return None,
3401        };
3402
3403        if remaining.is_some() {
3404            panic!("Packets should never be coalesced in tests");
3405        }
3406
3407        let decrypted_header = packet_crypto::unprotect_header(
3408            first_decode.clone(),
3409            &self.spaces,
3410            self.zero_rtt_crypto.as_ref(),
3411            self.peer_params.stateless_reset_token,
3412        )?;
3413
3414        let mut packet = decrypted_header.packet?;
3415        packet_crypto::decrypt_packet_body(
3416            &mut packet,
3417            &self.spaces,
3418            self.zero_rtt_crypto.as_ref(),
3419            self.key_phase,
3420            self.prev_crypto.as_ref(),
3421            self.next_crypto.as_ref(),
3422        )
3423        .ok()?;
3424
3425        Some(packet.payload.to_vec())
3426    }
3427
3428    /// The number of bytes of packets containing retransmittable frames that have not been
3429    /// acknowledged or declared lost.
3430    #[cfg(test)]
3431    pub(crate) fn bytes_in_flight(&self) -> u64 {
3432        self.path.in_flight.bytes
3433    }
3434
3435    /// Number of bytes worth of non-ack-only packets that may be sent
3436    #[cfg(test)]
3437    pub(crate) fn congestion_window(&self) -> u64 {
3438        self.path
3439            .congestion
3440            .window()
3441            .saturating_sub(self.path.in_flight.bytes)
3442    }
3443
3444    /// Whether no timers but keepalive, idle, rtt and pushnewcid are running
3445    #[cfg(test)]
3446    pub(crate) fn is_idle(&self) -> bool {
3447        Timer::VALUES
3448            .iter()
3449            .filter(|&&t| t != Timer::KeepAlive && t != Timer::PushNewCid)
3450            .filter_map(|&t| Some((t, self.timers.get(t)?)))
3451            .min_by_key(|&(_, time)| time)
3452            .map_or(true, |(timer, _)| timer == Timer::Idle)
3453    }
3454
3455    /// Total number of outgoing packets that have been deemed lost
3456    #[cfg(test)]
3457    pub(crate) fn lost_packets(&self) -> u64 {
3458        self.lost_packets
3459    }
3460
3461    /// Whether explicit congestion notification is in use on outgoing packets.
3462    #[cfg(test)]
3463    pub(crate) fn using_ecn(&self) -> bool {
3464        self.path.sending_ecn
3465    }
3466
3467    /// The number of received bytes in the current path
3468    #[cfg(test)]
3469    pub(crate) fn total_recvd(&self) -> u64 {
3470        self.path.total_recvd
3471    }
3472
3473    #[cfg(test)]
3474    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
3475        self.local_cid_state.active_seq()
3476    }
3477
3478    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
3479    /// with updated `retire_prior_to` field set to `v`
3480    #[cfg(test)]
3481    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
3482        let n = self.local_cid_state.assign_retire_seq(v);
3483        self.endpoint_events
3484            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3485    }
3486
3487    /// Check the current active remote CID sequence
3488    #[cfg(test)]
3489    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
3490        self.rem_cids.active_seq()
3491    }
3492
3493    /// Returns the detected maximum udp payload size for the current path
3494    #[cfg(test)]
3495    pub(crate) fn path_mtu(&self) -> u16 {
3496        self.path.current_mtu()
3497    }
3498
3499    /// Whether we have 1-RTT data to send
3500    ///
3501    /// See also `self.space(SpaceId::Data).can_send()`
3502    fn can_send_1rtt(&self, max_size: usize) -> bool {
3503        self.streams.can_send_stream_data()
3504            || self.path.challenge_pending
3505            || self
3506                .prev_path
3507                .as_ref()
3508                .map_or(false, |(_, x)| x.challenge_pending)
3509            || !self.path_responses.is_empty()
3510            || self
3511                .datagrams
3512                .outgoing
3513                .front()
3514                .map_or(false, |x| x.size(true) <= max_size)
3515    }
3516
3517    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
3518    fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
3519        // Visit known paths from newest to oldest to find the one `pn` was sent on
3520        for path in [&mut self.path]
3521            .into_iter()
3522            .chain(self.prev_path.as_mut().map(|(_, data)| data))
3523        {
3524            if path.remove_in_flight(pn, packet) {
3525                return;
3526            }
3527        }
3528    }
3529
3530    /// Terminate the connection instantly, without sending a close packet
3531    fn kill(&mut self, reason: ConnectionError) {
3532        self.close_common();
3533        self.error = Some(reason);
3534        self.state = State::Drained;
3535        self.endpoint_events.push_back(EndpointEventInner::Drained);
3536    }
3537
3538    /// Storage size required for the largest packet known to be supported by the current path
3539    ///
3540    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
3541    pub fn current_mtu(&self) -> u16 {
3542        self.path.current_mtu()
3543    }
3544
3545    /// Size of non-frame data for a 1-RTT packet
3546    ///
3547    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
3548    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
3549    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
3550    /// latency and packet loss.
3551    fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
3552        let pn_len = match pn {
3553            Some(pn) => PacketNumber::new(
3554                pn,
3555                self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
3556            )
3557            .len(),
3558            // Upper bound
3559            None => 4,
3560        };
3561
3562        // 1 byte for flags
3563        1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
3564    }
3565
3566    fn tag_len_1rtt(&self) -> usize {
3567        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
3568            Some(crypto) => Some(&*crypto.packet.local),
3569            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
3570        };
3571        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
3572        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
3573        // but that would needlessly prevent sending datagrams during 0-RTT.
3574        key.map_or(16, |x| x.tag_len())
3575    }
3576}
3577
3578impl fmt::Debug for Connection {
3579    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3580        f.debug_struct("Connection")
3581            .field("handshake_cid", &self.handshake_cid)
3582            .finish()
3583    }
3584}
3585
/// Reasons why a connection might be lost
///
/// `Display` text for each variant comes from its `#[error(...)]` attribute (via `thiserror`).
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// The peer violated the QUIC specification as understood by this implementation
    ///
    /// `Display` and `source` are forwarded transparently to the wrapped error.
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer's QUIC stack aborted the connection automatically
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer is unable to continue processing this connection, usually due to having restarted
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
    /// and [`TransportConfig::keep_alive_interval()`].
    #[error("timed out")]
    TimedOut,
    /// The local application closed the connection
    #[error("closed")]
    LocallyClosed,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
3620
3621impl From<Close> for ConnectionError {
3622    fn from(x: Close) -> Self {
3623        match x {
3624            Close::Connection(reason) => Self::ConnectionClosed(reason),
3625            Close::Application(reason) => Self::ApplicationClosed(reason),
3626        }
3627    }
3628}
3629
3630// For compatibility with API consumers
3631impl From<ConnectionError> for io::Error {
3632    fn from(x: ConnectionError) -> Self {
3633        use self::ConnectionError::*;
3634        let kind = match x {
3635            TimedOut => io::ErrorKind::TimedOut,
3636            Reset => io::ErrorKind::ConnectionReset,
3637            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
3638            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
3639                io::ErrorKind::Other
3640            }
3641        };
3642        Self::new(kind, x)
3643    }
3644}
3645
/// Lifecycle state of a connection
#[allow(unreachable_pub)] // fuzzing only
#[derive(Clone)]
pub enum State {
    /// The handshake is in progress
    Handshake(state::Handshake),
    /// The handshake has completed
    Established,
    /// The connection is closing, carrying the reason it was closed for
    Closed(state::Closed),
    /// Entered when a close from the peer is processed (presumably waiting out the drain
    /// period without sending — confirm)
    Draining,
    /// Waiting for application to call close so we can dispose of the resources
    Drained,
}
3656
3657impl State {
3658    fn closed<R: Into<Close>>(reason: R) -> Self {
3659        Self::Closed(state::Closed {
3660            reason: reason.into(),
3661        })
3662    }
3663
3664    fn is_handshake(&self) -> bool {
3665        matches!(*self, Self::Handshake(_))
3666    }
3667
3668    fn is_established(&self) -> bool {
3669        matches!(*self, Self::Established)
3670    }
3671
3672    fn is_closed(&self) -> bool {
3673        matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
3674    }
3675
3676    fn is_drained(&self) -> bool {
3677        matches!(*self, Self::Drained)
3678    }
3679}
3680
/// Payload types carried by the `State::Handshake` and `State::Closed` variants.
mod state {
    use super::*;

    /// Bookkeeping held by `State::Handshake` while the handshake is in progress.
    #[allow(unreachable_pub)] // fuzzing only
    #[derive(Clone)]
    pub struct Handshake {
        /// Whether the remote CID has been set by the peer yet
        ///
        /// Always set for servers
        pub(super) rem_cid_set: bool,
        /// Stateless retry token received in the first Initial by a server.
        ///
        /// Must be present in every Initial. Always empty for clients.
        pub(super) expected_token: Bytes,
        /// First cryptographic message
        ///
        /// Only set for clients
        pub(super) client_hello: Option<Bytes>,
    }

    /// Data held by `State::Closed`.
    #[allow(unreachable_pub)] // fuzzing only
    #[derive(Clone)]
    pub struct Closed {
        /// Why the connection was closed, as passed (after conversion) to `State::closed`
        pub(super) reason: Close,
    }
}
3707
/// Events of interest to the application
#[derive(Debug)]
pub enum Event {
    /// The connection's handshake data is ready
    HandshakeDataReady,
    /// The connection was successfully established
    Connected,
    /// The connection was lost
    ///
    /// Emitted if the peer closes the connection or an error is encountered.
    ConnectionLost {
        /// Reason that the connection was closed
        reason: ConnectionError,
    },
    /// Stream events; see [`StreamEvent`] for the individual kinds
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
    /// One or more application datagrams have been sent after blocking
    // NOTE(review): the wording suggests this signals that datagram sending can proceed again
    // after having been blocked — confirm against the code that emits it and reword if so.
    DatagramsUnblocked,
}
3729
/// Returns the time elapsed from `y` to `x`, or zero if `y` is not earlier than `x`.
///
/// Delegates to [`Instant::saturating_duration_since`], which implements exactly this clamping
/// instead of hand-rolling the comparison.
fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    x.saturating_duration_since(y)
}
3737
3738fn get_max_ack_delay(params: &TransportParameters) -> Duration {
3739    Duration::from_micros(params.max_ack_delay.0 * 1000)
3740}
3741
/// Upper bound on the backoff exponent.
///
/// Prevents overflow and improves behavior in extreme circumstances.
const MAX_BACKOFF_EXPONENT: u32 = 16;
/// Minimum remaining space required to allow packet coalescing.
const MIN_PACKET_SPACE: usize = 40;
/// The maximum number of datagrams that are sent in a single transmit
///
/// This can be lower than the maximum platform capabilities, to avoid excessive
/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
/// that numbers around 10 are a good compromise.
const MAX_TRANSMIT_SEGMENTS: usize = 10;

/// Perform key updates this many packets before the AEAD confidentiality limit.
///
/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
const KEY_UPDATE_MARGIN: u64 = 10_000;
3757
/// Record of the frames written into a single sent packet.
#[derive(Default)]
struct SentFrames {
    /// Retransmittable frames recorded for this packet; `is_ack_only` requires this to be empty
    retransmits: ThinRetransmits,
    /// `Some` when the packet carries an ACK; `is_ack_only` treats `Some` as "contains an ACK"
    largest_acked: Option<u64>,
    /// Metadata for the stream frames included in the packet
    stream_frames: StreamMetaVec,
    /// Whether the packet contains non-retransmittable frames (like datagrams)
    non_retransmits: bool,
    // NOTE(review): presumably marks that the packet must be padded before sending — confirm at
    // the sites that set and read it.
    requires_padding: bool,
}
3767
3768impl SentFrames {
3769    /// Returns whether the packet contains only ACKs
3770    fn is_ack_only(&self, streams: &StreamsState) -> bool {
3771        self.largest_acked.is_some()
3772            && !self.non_retransmits
3773            && self.stream_frames.is_empty()
3774            && self.retransmits.is_empty(streams)
3775    }
3776}