1use std::{
2 cmp,
3 collections::VecDeque,
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 sync::Arc,
8 time::{Duration, Instant},
9};
10
11use bytes::{Bytes, BytesMut};
12use frame::StreamMetaVec;
13use rand::{rngs::StdRng, Rng, SeedableRng};
14use thiserror::Error;
15use tracing::{debug, error, trace, trace_span, warn};
16
17use crate::{
18 cid_generator::ConnectionIdGenerator,
19 cid_queue::CidQueue,
20 coding::BufMutExt,
21 config::{ServerConfig, TransportConfig},
22 crypto::{self, KeyPair, Keys, PacketKey},
23 frame,
24 frame::{Close, Datagram, FrameStruct},
25 packet::{
26 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
27 PacketNumber, PartialDecode, SpaceId,
28 },
29 range_set::ArrayRangeSet,
30 shared::{
31 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
32 EndpointEvent, EndpointEventInner,
33 },
34 token::ResetToken,
35 transport_parameters::TransportParameters,
36 Dir, EndpointConfig, Frame, Side, StreamId, Transmit, TransportError, TransportErrorCode,
37 VarInt, MAX_STREAM_COUNT, MIN_INITIAL_SIZE, TIMER_GRANULARITY,
38};
39
40mod ack_frequency;
41use ack_frequency::AckFrequencyState;
42
43mod assembler;
44pub use assembler::Chunk;
45
46mod cid_state;
47use cid_state::CidState;
48
49mod datagrams;
50use datagrams::DatagramState;
51pub use datagrams::{Datagrams, SendDatagramError};
52
53mod mtud;
54mod pacing;
55
56mod packet_builder;
57use packet_builder::PacketBuilder;
58
59mod packet_crypto;
60use packet_crypto::{PrevCrypto, ZeroRttCrypto};
61
62mod paths;
63pub use paths::RttEstimator;
64use paths::{PathData, PathResponses};
65
66mod send_buffer;
67
68mod spaces;
69#[cfg(fuzzing)]
70pub use spaces::Retransmits;
71#[cfg(not(fuzzing))]
72use spaces::Retransmits;
73use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
74
75mod stats;
76pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
77
78mod streams;
79#[cfg(fuzzing)]
80pub use streams::StreamsState;
81#[cfg(not(fuzzing))]
82use streams::StreamsState;
83pub use streams::{
84 BytesSource, Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream,
85 SendStream, StreamEvent, Streams, WriteError, Written,
86};
87
88mod timer;
89use crate::congestion::Controller;
90use timer::{Timer, TimerTable};
91
92pub struct Connection {
132 endpoint_config: Arc<EndpointConfig>,
133 server_config: Option<Arc<ServerConfig>>,
134 config: Arc<TransportConfig>,
135 rng: StdRng,
136 crypto: Box<dyn crypto::Session>,
137 handshake_cid: ConnectionId,
139 rem_handshake_cid: ConnectionId,
141 local_ip: Option<IpAddr>,
144 path: PathData,
145 allow_mtud: bool,
147 prev_path: Option<(ConnectionId, PathData)>,
148 state: State,
149 side: Side,
150 zero_rtt_enabled: bool,
152 zero_rtt_crypto: Option<ZeroRttCrypto>,
154 key_phase: bool,
155 key_phase_size: u64,
157 peer_params: TransportParameters,
159 orig_rem_cid: ConnectionId,
161 initial_dst_cid: ConnectionId,
163 retry_src_cid: Option<ConnectionId>,
166 lost_packets: u64,
168 events: VecDeque<Event>,
169 endpoint_events: VecDeque<EndpointEventInner>,
170 spin_enabled: bool,
172 spin: bool,
174 spaces: [PacketSpace; 3],
176 highest_space: SpaceId,
178 prev_crypto: Option<PrevCrypto>,
180 next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
185 accepted_0rtt: bool,
186 permit_idle_reset: bool,
188 idle_timeout: Option<VarInt>,
190 timers: TimerTable,
191 authentication_failures: u64,
193 error: Option<ConnectionError>,
195 retry_token: Bytes,
198 packet_number_filter: PacketNumberFilter,
200
201 path_responses: PathResponses,
206 close: bool,
207
208 ack_frequency: AckFrequencyState,
212
213 pto_count: u32,
218
219 receiving_ecn: bool,
224 total_authed_packets: u64,
226 app_limited: bool,
229
230 streams: StreamsState,
231 rem_cids: CidQueue,
233 local_cid_state: CidState,
235 datagrams: DatagramState,
237 stats: ConnectionStats,
239 version: u32,
241}
242
243impl Connection {
244 pub(crate) fn new(
245 endpoint_config: Arc<EndpointConfig>,
246 server_config: Option<Arc<ServerConfig>>,
247 config: Arc<TransportConfig>,
248 init_cid: ConnectionId,
249 loc_cid: ConnectionId,
250 rem_cid: ConnectionId,
251 pref_addr_cid: Option<ConnectionId>,
252 remote: SocketAddr,
253 local_ip: Option<IpAddr>,
254 crypto: Box<dyn crypto::Session>,
255 cid_gen: &dyn ConnectionIdGenerator,
256 now: Instant,
257 version: u32,
258 allow_mtud: bool,
259 rng_seed: [u8; 32],
260 path_validated: bool,
261 ) -> Self {
262 let side = if server_config.is_some() {
263 Side::Server
264 } else {
265 Side::Client
266 };
267 let initial_space = PacketSpace {
268 crypto: Some(crypto.initial_keys(&init_cid, side)),
269 ..PacketSpace::new(now)
270 };
271 let state = State::Handshake(state::Handshake {
272 rem_cid_set: side.is_server(),
273 expected_token: Bytes::new(),
274 client_hello: None,
275 });
276 let mut rng = StdRng::from_seed(rng_seed);
277 let mut this = Self {
278 endpoint_config,
279 server_config,
280 crypto,
281 handshake_cid: loc_cid,
282 rem_handshake_cid: rem_cid,
283 local_cid_state: CidState::new(
284 cid_gen.cid_len(),
285 cid_gen.cid_lifetime(),
286 now,
287 if pref_addr_cid.is_some() { 2 } else { 1 },
288 ),
289 path: PathData::new(remote, allow_mtud, None, now, path_validated, &config),
290 allow_mtud,
291 local_ip,
292 prev_path: None,
293 side,
294 state,
295 zero_rtt_enabled: false,
296 zero_rtt_crypto: None,
297 key_phase: false,
298 key_phase_size: rng.gen_range(10..1000),
305 peer_params: TransportParameters::default(),
306 orig_rem_cid: rem_cid,
307 initial_dst_cid: init_cid,
308 retry_src_cid: None,
309 lost_packets: 0,
310 events: VecDeque::new(),
311 endpoint_events: VecDeque::new(),
312 spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
313 spin: false,
314 spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
315 highest_space: SpaceId::Initial,
316 prev_crypto: None,
317 next_crypto: None,
318 accepted_0rtt: false,
319 permit_idle_reset: true,
320 idle_timeout: config.max_idle_timeout,
321 timers: TimerTable::default(),
322 authentication_failures: 0,
323 error: None,
324 retry_token: Bytes::new(),
325 #[cfg(test)]
326 packet_number_filter: match config.deterministic_packet_numbers {
327 false => PacketNumberFilter::new(&mut rng),
328 true => PacketNumberFilter::disabled(),
329 },
330 #[cfg(not(test))]
331 packet_number_filter: PacketNumberFilter::new(&mut rng),
332
333 path_responses: PathResponses::default(),
334 close: false,
335
336 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
337 &TransportParameters::default(),
338 )),
339
340 pto_count: 0,
341
342 app_limited: false,
343 receiving_ecn: false,
344 total_authed_packets: 0,
345
346 streams: StreamsState::new(
347 side,
348 config.max_concurrent_uni_streams,
349 config.max_concurrent_bidi_streams,
350 config.send_window,
351 config.receive_window,
352 config.stream_receive_window,
353 ),
354 datagrams: DatagramState::default(),
355 config,
356 rem_cids: CidQueue::new(rem_cid),
357 rng,
358 stats: ConnectionStats::default(),
359 version,
360 };
361 if side.is_client() {
362 this.write_crypto();
364 this.init_0rtt();
365 }
366 this
367 }
368
369 #[must_use]
377 pub fn poll_timeout(&mut self) -> Option<Instant> {
378 self.timers.next_timeout()
379 }
380
381 #[must_use]
387 pub fn poll(&mut self) -> Option<Event> {
388 if let Some(x) = self.events.pop_front() {
389 return Some(x);
390 }
391
392 if let Some(event) = self.streams.poll() {
393 return Some(Event::Stream(event));
394 }
395
396 if let Some(err) = self.error.take() {
397 return Some(Event::ConnectionLost { reason: err });
398 }
399
400 None
401 }
402
403 #[must_use]
405 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
406 self.endpoint_events.pop_front().map(EndpointEvent)
407 }
408
409 #[must_use]
411 pub fn streams(&mut self) -> Streams<'_> {
412 Streams {
413 state: &mut self.streams,
414 conn_state: &self.state,
415 }
416 }
417
418 #[must_use]
420 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
421 assert!(id.dir() == Dir::Bi || id.initiator() != self.side);
422 RecvStream {
423 id,
424 state: &mut self.streams,
425 pending: &mut self.spaces[SpaceId::Data].pending,
426 }
427 }
428
429 #[must_use]
431 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
432 assert!(id.dir() == Dir::Bi || id.initiator() == self.side);
433 SendStream {
434 id,
435 state: &mut self.streams,
436 pending: &mut self.spaces[SpaceId::Data].pending,
437 conn_state: &self.state,
438 }
439 }
440
441 #[must_use]
451 pub fn poll_transmit(
452 &mut self,
453 now: Instant,
454 max_datagrams: usize,
455 buf: &mut Vec<u8>,
456 ) -> Option<Transmit> {
457 assert!(max_datagrams != 0);
458 let max_datagrams = match self.config.enable_segmentation_offload {
459 false => 1,
460 true => max_datagrams.min(MAX_TRANSMIT_SEGMENTS),
461 };
462
463 let mut num_datagrams = 0;
464 let mut datagram_start = 0;
467 let mut segment_size = usize::from(self.path.current_mtu());
468
469 if let Some((prev_cid, ref mut prev_path)) = self.prev_path {
471 if prev_path.challenge_pending {
472 prev_path.challenge_pending = false;
473 let token = prev_path
474 .challenge
475 .expect("previous path challenge pending without token");
476 let destination = prev_path.remote;
477 debug_assert_eq!(
478 self.highest_space,
479 SpaceId::Data,
480 "PATH_CHALLENGE queued without 1-RTT keys"
481 );
482 buf.reserve(MIN_INITIAL_SIZE as usize);
483
484 let buf_capacity = buf.capacity();
485
486 let mut builder = PacketBuilder::new(
492 now,
493 SpaceId::Data,
494 prev_cid,
495 buf,
496 buf_capacity,
497 0,
498 false,
499 self,
500 )?;
501 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
502 buf.write(frame::Type::PATH_CHALLENGE);
503 buf.write(token);
504 self.stats.frame_tx.path_challenge += 1;
505
506 builder.pad_to(MIN_INITIAL_SIZE);
511
512 builder.finish(self, buf);
513 self.stats.udp_tx.on_sent(1, buf.len());
514 return Some(Transmit {
515 destination,
516 size: buf.len(),
517 ecn: None,
518 segment_size: None,
519 src_ip: self.local_ip,
520 });
521 }
522 }
523
524 for space in SpaceId::iter() {
526 let request_immediate_ack =
527 space == SpaceId::Data && self.peer_supports_ack_frequency();
528 self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
529 }
530
531 let close = match self.state {
533 State::Drained => {
534 self.app_limited = true;
535 return None;
536 }
537 State::Draining | State::Closed(_) => {
538 if !self.close {
541 self.app_limited = true;
542 return None;
543 }
544 true
545 }
546 _ => false,
547 };
548
549 if let Some(config) = &self.config.ack_frequency_config {
551 self.spaces[SpaceId::Data].pending.ack_frequency = self
552 .ack_frequency
553 .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
554 && self.highest_space == SpaceId::Data
555 && self.peer_supports_ack_frequency();
556 }
557
558 let mut buf_capacity = 0;
562
563 let mut coalesce = true;
564 let mut builder_storage: Option<PacketBuilder> = None;
565 let mut sent_frames = None;
566 let mut pad_datagram = false;
567 let mut congestion_blocked = false;
568
569 let mut space_idx = 0;
571 let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
572 while space_idx < spaces.len() {
575 let space_id = spaces[space_idx];
576 let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
583 let frame_space_1rtt =
584 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
585
586 let can_send = self.space_can_send(space_id, frame_space_1rtt);
588 if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
589 space_idx += 1;
590 continue;
591 }
592
593 let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
594 || self.spaces[space_id].ping_pending
595 || self.spaces[space_id].immediate_ack_pending;
596 if space_id == SpaceId::Data {
597 ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
598 }
599
600 let buf_end = if let Some(builder) = &builder_storage {
604 buf.len().max(builder.min_size) + builder.tag_len
605 } else {
606 buf.len()
607 };
608
609 if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE {
610 if buf_capacity >= segment_size * max_datagrams {
614 break;
616 }
617
618 if self
625 .path
626 .anti_amplification_blocked(segment_size as u64 * num_datagrams + 1)
627 {
628 trace!("blocked by anti-amplification");
629 break;
630 }
631
632 if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
635 let untracked_bytes = if let Some(builder) = &builder_storage {
637 buf_capacity - builder.partial_encode.start
638 } else {
639 0
640 } as u64;
641 debug_assert!(untracked_bytes <= segment_size as u64);
642
643 let bytes_to_send = segment_size as u64 + untracked_bytes;
644 if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
645 space_idx += 1;
646 congestion_blocked = true;
647 trace!("blocked by congestion control");
650 continue;
651 }
652
653 let smoothed_rtt = self.path.rtt.get();
655 if let Some(delay) = self.path.pacing.delay(
656 smoothed_rtt,
657 bytes_to_send,
658 self.path.current_mtu(),
659 self.path.congestion.window(),
660 now,
661 ) {
662 self.timers.set(Timer::Pacing, delay);
663 congestion_blocked = true;
664 trace!("blocked by pacing");
667 break;
668 }
669 }
670
671 if let Some(mut builder) = builder_storage.take() {
673 if pad_datagram {
674 builder.pad_to(MIN_INITIAL_SIZE);
675 }
676
677 if num_datagrams > 1 {
678 const MAX_PADDING: usize = 16;
685 let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
686 - datagram_start
687 + builder.tag_len;
688 if packet_len_unpadded + MAX_PADDING < segment_size {
689 trace!(
690 "GSO truncated by demand for {} padding bytes",
691 segment_size - packet_len_unpadded
692 );
693 builder_storage = Some(builder);
694 break;
695 }
696
697 builder.pad_to(segment_size as u16);
700 }
701
702 builder.finish_and_track(now, self, sent_frames.take(), buf);
703
704 if num_datagrams == 1 {
705 segment_size = buf.len();
712 buf_capacity = buf.len();
715
716 if space_id == SpaceId::Data {
723 let frame_space_1rtt =
724 segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
725 if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
726 break;
727 }
728 }
729 }
730 }
731
732 buf_capacity += segment_size;
734 if buf.capacity() < buf_capacity {
735 buf.reserve(max_datagrams * segment_size);
744 }
745 num_datagrams += 1;
746 coalesce = true;
747 pad_datagram = false;
748 datagram_start = buf.len();
749
750 debug_assert_eq!(
751 datagram_start % segment_size,
752 0,
753 "datagrams in a GSO batch must be aligned to the segment size"
754 );
755 } else {
756 if let Some(builder) = builder_storage.take() {
760 builder.finish_and_track(now, self, sent_frames.take(), buf);
761 }
762 }
763
764 debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
765
766 if self.spaces[SpaceId::Initial].crypto.is_some()
771 && space_id == SpaceId::Handshake
772 && self.side.is_client()
773 {
774 self.discard_space(now, SpaceId::Initial);
777 }
778 if let Some(ref mut prev) = self.prev_crypto {
779 prev.update_unacked = false;
780 }
781
782 debug_assert!(
783 builder_storage.is_none() && sent_frames.is_none(),
784 "Previous packet must have been finished"
785 );
786
787 let builder = builder_storage.get_or_insert(PacketBuilder::new(
791 now,
792 space_id,
793 self.rem_cids.active(),
794 buf,
795 buf_capacity,
796 datagram_start,
797 ack_eliciting,
798 self,
799 )?);
800 coalesce = coalesce && !builder.short_header;
801
802 pad_datagram |=
804 space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
805
806 if close {
807 trace!("sending CONNECTION_CLOSE");
808 if !self.spaces[space_id].pending_acks.ranges().is_empty() {
813 Self::populate_acks(
814 now,
815 self.receiving_ecn,
816 &mut SentFrames::default(),
817 &mut self.spaces[space_id],
818 buf,
819 &mut self.stats,
820 );
821 }
822
823 debug_assert!(
827 buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
828 "ACKs should leave space for ConnectionClose"
829 );
830 if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
831 let max_frame_size = builder.max_size - buf.len();
832 match self.state {
833 State::Closed(state::Closed { ref reason }) => {
834 if space_id == SpaceId::Data || reason.is_transport_layer() {
835 reason.encode(buf, max_frame_size)
836 } else {
837 frame::ConnectionClose {
838 error_code: TransportErrorCode::APPLICATION_ERROR,
839 frame_type: None,
840 reason: Bytes::new(),
841 }
842 .encode(buf, max_frame_size)
843 }
844 }
845 State::Draining => frame::ConnectionClose {
846 error_code: TransportErrorCode::NO_ERROR,
847 frame_type: None,
848 reason: Bytes::new(),
849 }
850 .encode(buf, max_frame_size),
851 _ => unreachable!(
852 "tried to make a close packet when the connection wasn't closed"
853 ),
854 }
855 }
856 if space_id == self.highest_space {
857 self.close = false;
859 break;
861 } else {
862 space_idx += 1;
866 continue;
867 }
868 }
869
870 if space_id == SpaceId::Data && num_datagrams == 1 {
873 if let Some((token, remote)) = self.path_responses.pop_off_path(&self.path.remote) {
874 let mut builder = builder_storage.take().unwrap();
877 trace!("PATH_RESPONSE {:08x} (off-path)", token);
878 buf.write(frame::Type::PATH_RESPONSE);
879 buf.write(token);
880 self.stats.frame_tx.path_response += 1;
881 builder.pad_to(MIN_INITIAL_SIZE);
882 builder.finish_and_track(
883 now,
884 self,
885 Some(SentFrames {
886 non_retransmits: true,
887 ..SentFrames::default()
888 }),
889 buf,
890 );
891 self.stats.udp_tx.on_sent(1, buf.len());
892 return Some(Transmit {
893 destination: remote,
894 size: buf.len(),
895 ecn: None,
896 segment_size: None,
897 src_ip: self.local_ip,
898 });
899 }
900 }
901
902 let sent =
903 self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
904
905 debug_assert!(
912 !(sent.is_ack_only(&self.streams)
913 && !can_send.acks
914 && can_send.other
915 && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
916 && self.datagrams.outgoing.is_empty()),
917 "SendableFrames was {can_send:?}, but only ACKs have been written"
918 );
919 pad_datagram |= sent.requires_padding;
920
921 if sent.largest_acked.is_some() {
922 self.spaces[space_id].pending_acks.acks_sent();
923 self.timers.stop(Timer::MaxAckDelay);
924 }
925
926 sent_frames = Some(sent);
928
929 }
932
933 if let Some(mut builder) = builder_storage {
935 if pad_datagram {
936 builder.pad_to(MIN_INITIAL_SIZE);
937 }
938 let last_packet_number = builder.exact_number;
939 builder.finish_and_track(now, self, sent_frames, buf);
940 self.path
941 .congestion
942 .on_sent(now, buf.len() as u64, last_packet_number);
943 }
944
945 self.app_limited = buf.is_empty() && !congestion_blocked;
946
947 if buf.is_empty() && self.state.is_established() {
949 let space_id = SpaceId::Data;
950 let probe_size = match self
951 .path
952 .mtud
953 .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))
954 {
955 Some(next_probe_size) => next_probe_size,
956 None => return None,
957 };
958
959 let buf_capacity = probe_size as usize;
960 buf.reserve(buf_capacity);
961
962 let mut builder = PacketBuilder::new(
963 now,
964 space_id,
965 self.rem_cids.active(),
966 buf,
967 buf_capacity,
968 0,
969 true,
970 self,
971 )?;
972
973 buf.write(frame::Type::PING);
975 self.stats.frame_tx.ping += 1;
976
977 if self.peer_supports_ack_frequency() {
979 buf.write(frame::Type::IMMEDIATE_ACK);
980 self.stats.frame_tx.immediate_ack += 1;
981 }
982
983 builder.pad_to(probe_size);
984 let sent_frames = SentFrames {
985 non_retransmits: true,
986 ..Default::default()
987 };
988 builder.finish_and_track(now, self, Some(sent_frames), buf);
989
990 self.stats.path.sent_plpmtud_probes += 1;
991 num_datagrams = 1;
992
993 trace!(?probe_size, "writing MTUD probe");
994 }
995
996 if buf.is_empty() {
997 return None;
998 }
999
1000 trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
1001 self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
1002
1003 self.stats.udp_tx.on_sent(num_datagrams, buf.len());
1004
1005 Some(Transmit {
1006 destination: self.path.remote,
1007 size: buf.len(),
1008 ecn: if self.path.sending_ecn {
1009 Some(EcnCodepoint::Ect0)
1010 } else {
1011 None
1012 },
1013 segment_size: match num_datagrams {
1014 1 => None,
1015 _ => Some(segment_size),
1016 },
1017 src_ip: self.local_ip,
1018 })
1019 }
1020
1021 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1023 if self.spaces[space_id].crypto.is_none()
1024 && (space_id != SpaceId::Data
1025 || self.zero_rtt_crypto.is_none()
1026 || self.side.is_server())
1027 {
1028 return SendableFrames::empty();
1030 }
1031 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1032 if space_id == SpaceId::Data {
1033 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1034 }
1035 can_send
1036 }
1037
1038 pub fn handle_event(&mut self, event: ConnectionEvent) {
1044 use self::ConnectionEventInner::*;
1045 match event.0 {
1046 Datagram(DatagramConnectionEvent {
1047 now,
1048 remote,
1049 ecn,
1050 first_decode,
1051 remaining,
1052 }) => {
1053 if remote != self.path.remote
1057 && self.server_config.as_ref().map_or(true, |x| !x.migration)
1058 {
1059 trace!("discarding packet from unrecognized peer {}", remote);
1060 return;
1061 }
1062
1063 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1064
1065 self.stats.udp_rx.datagrams += 1;
1066 self.stats.udp_rx.bytes += first_decode.len() as u64;
1067 let data_len = first_decode.len();
1068
1069 self.handle_decode(now, remote, ecn, first_decode);
1070 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1075
1076 if let Some(data) = remaining {
1077 self.stats.udp_rx.bytes += data.len() as u64;
1078 self.handle_coalesced(now, remote, ecn, data);
1079 }
1080
1081 if was_anti_amplification_blocked {
1082 self.set_loss_detection_timer(now);
1086 }
1087 }
1088 NewIdentifiers(ids, now) => {
1089 self.local_cid_state.new_cids(&ids, now);
1090 ids.into_iter().rev().for_each(|frame| {
1091 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1092 });
1093 if self
1095 .timers
1096 .get(Timer::PushNewCid)
1097 .map_or(true, |x| x <= now)
1098 {
1099 self.reset_cid_retirement();
1100 }
1101 }
1102 }
1103 }
1104
1105 pub fn handle_timeout(&mut self, now: Instant) {
1115 for &timer in &Timer::VALUES {
1116 if !self.timers.is_expired(timer, now) {
1117 continue;
1118 }
1119 self.timers.stop(timer);
1120 trace!(timer = ?timer, "timeout");
1121 match timer {
1122 Timer::Close => {
1123 self.state = State::Drained;
1124 self.endpoint_events.push_back(EndpointEventInner::Drained);
1125 }
1126 Timer::Idle => {
1127 self.kill(ConnectionError::TimedOut);
1128 }
1129 Timer::KeepAlive => {
1130 trace!("sending keep-alive");
1131 self.ping();
1132 }
1133 Timer::LossDetection => {
1134 self.on_loss_detection_timeout(now);
1135 }
1136 Timer::KeyDiscard => {
1137 self.zero_rtt_crypto = None;
1138 self.prev_crypto = None;
1139 }
1140 Timer::PathValidation => {
1141 debug!("path validation failed");
1142 if let Some((_, prev)) = self.prev_path.take() {
1143 self.path = prev;
1144 }
1145 self.path.challenge = None;
1146 self.path.challenge_pending = false;
1147 }
1148 Timer::Pacing => trace!("pacing timer expired"),
1149 Timer::PushNewCid => {
1150 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1152 if !self.state.is_closed() {
1153 trace!(
1154 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1155 self.local_cid_state.retire_prior_to()
1156 );
1157 self.endpoint_events
1158 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1159 }
1160 }
1161 Timer::MaxAckDelay => {
1162 trace!("max ack delay reached");
1163 self.spaces[SpaceId::Data]
1165 .pending_acks
1166 .on_max_ack_delay_timeout()
1167 }
1168 }
1169 }
1170 }
1171
1172 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1184 self.close_inner(
1185 now,
1186 Close::Application(frame::ApplicationClose { error_code, reason }),
1187 )
1188 }
1189
1190 fn close_inner(&mut self, now: Instant, reason: Close) {
1191 let was_closed = self.state.is_closed();
1192 if !was_closed {
1193 self.close_common();
1194 self.set_close_timer(now);
1195 self.close = true;
1196 self.state = State::Closed(state::Closed { reason });
1197 }
1198 }
1199
1200 pub fn datagrams(&mut self) -> Datagrams<'_> {
1202 Datagrams { conn: self }
1203 }
1204
1205 pub fn stats(&self) -> ConnectionStats {
1207 let mut stats = self.stats;
1208 stats.path.rtt = self.path.rtt.get();
1209 stats.path.cwnd = self.path.congestion.window();
1210 stats.path.current_mtu = self.path.mtud.current_mtu();
1211
1212 stats
1213 }
1214
1215 pub fn ping(&mut self) {
1219 self.spaces[self.highest_space].ping_pending = true;
1220 }
1221
1222 #[doc(hidden)]
1223 pub fn initiate_key_update(&mut self) {
1224 self.update_keys(None, false);
1225 }
1226
1227 pub fn crypto_session(&self) -> &dyn crypto::Session {
1229 &*self.crypto
1230 }
1231
1232 pub fn is_handshaking(&self) -> bool {
1237 self.state.is_handshake()
1238 }
1239
1240 pub fn is_closed(&self) -> bool {
1248 self.state.is_closed()
1249 }
1250
1251 pub fn is_drained(&self) -> bool {
1256 self.state.is_drained()
1257 }
1258
1259 pub fn accepted_0rtt(&self) -> bool {
1263 self.accepted_0rtt
1264 }
1265
1266 pub fn has_0rtt(&self) -> bool {
1268 self.zero_rtt_enabled
1269 }
1270
1271 pub fn has_pending_retransmits(&self) -> bool {
1273 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1274 }
1275
1276 pub fn side(&self) -> Side {
1278 self.side
1279 }
1280
1281 pub fn remote_address(&self) -> SocketAddr {
1283 self.path.remote
1284 }
1285
1286 pub fn local_ip(&self) -> Option<IpAddr> {
1296 self.local_ip
1297 }
1298
1299 pub fn rtt(&self) -> Duration {
1301 self.path.rtt.get()
1302 }
1303
1304 pub fn congestion_state(&self) -> &dyn Controller {
1306 self.path.congestion.as_ref()
1307 }
1308
1309 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1314 self.streams.set_max_concurrent(dir, count);
1315 let pending = &mut self.spaces[SpaceId::Data].pending;
1318 self.streams.queue_max_stream_id(pending);
1319 }
1320
1321 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1327 self.streams.max_concurrent(dir)
1328 }
1329
1330 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1332 if self.streams.set_receive_window(receive_window) {
1333 self.spaces[SpaceId::Data].pending.max_data = true;
1334 }
1335 }
1336
1337 fn on_ack_received(
1338 &mut self,
1339 now: Instant,
1340 space: SpaceId,
1341 ack: frame::Ack,
1342 ) -> Result<(), TransportError> {
1343 if ack.largest >= self.spaces[space].next_packet_number {
1344 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1345 }
1346 let new_largest = {
1347 let space = &mut self.spaces[space];
1348 if space
1349 .largest_acked_packet
1350 .map_or(true, |pn| ack.largest > pn)
1351 {
1352 space.largest_acked_packet = Some(ack.largest);
1353 if let Some(info) = space.sent_packets.get(&ack.largest) {
1354 space.largest_acked_packet_sent = info.time_sent;
1358 }
1359 true
1360 } else {
1361 false
1362 }
1363 };
1364
1365 let mut newly_acked = ArrayRangeSet::new();
1367 for range in ack.iter() {
1368 self.packet_number_filter.check_ack(space, range.clone())?;
1369 for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1370 newly_acked.insert_one(pn);
1371 }
1372 }
1373
1374 if newly_acked.is_empty() {
1375 return Ok(());
1376 }
1377
1378 let mut ack_eliciting_acked = false;
1379 for packet in newly_acked.elts() {
1380 if let Some(info) = self.spaces[space].take(packet) {
1381 if let Some(acked) = info.largest_acked {
1382 self.spaces[space].pending_acks.subtract_below(acked);
1388 }
1389 ack_eliciting_acked |= info.ack_eliciting;
1390
1391 let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1393 if mtu_updated {
1394 self.path
1395 .congestion
1396 .on_mtu_update(self.path.mtud.current_mtu());
1397 }
1398
1399 self.ack_frequency.on_acked(packet);
1401
1402 self.on_packet_acked(now, packet, info);
1403 }
1404 }
1405
1406 self.path.congestion.on_end_acks(
1407 now,
1408 self.path.in_flight.bytes,
1409 self.app_limited,
1410 self.spaces[space].largest_acked_packet,
1411 );
1412
1413 if new_largest && ack_eliciting_acked {
1414 let ack_delay = if space != SpaceId::Data {
1415 Duration::from_micros(0)
1416 } else {
1417 cmp::min(
1418 self.ack_frequency.peer_max_ack_delay,
1419 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1420 )
1421 };
1422 let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1423 self.path.rtt.update(ack_delay, rtt);
1424 if self.path.first_packet_after_rtt_sample.is_none() {
1425 self.path.first_packet_after_rtt_sample =
1426 Some((space, self.spaces[space].next_packet_number));
1427 }
1428 }
1429
1430 self.detect_lost_packets(now, space, true);
1432
1433 if self.peer_completed_address_validation() {
1434 self.pto_count = 0;
1435 }
1436
1437 if self.path.sending_ecn {
1439 if let Some(ecn) = ack.ecn {
1440 if new_largest {
1445 let sent = self.spaces[space].largest_acked_packet_sent;
1446 self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
1447 }
1448 } else {
1449 debug!("ECN not acknowledged by peer");
1451 self.path.sending_ecn = false;
1452 }
1453 }
1454
1455 self.set_loss_detection_timer(now);
1456 Ok(())
1457 }
1458
1459 fn process_ecn(
1461 &mut self,
1462 now: Instant,
1463 space: SpaceId,
1464 newly_acked: u64,
1465 ecn: frame::EcnCounts,
1466 largest_sent_time: Instant,
1467 ) {
1468 match self.spaces[space].detect_ecn(newly_acked, ecn) {
1469 Err(e) => {
1470 debug!("halting ECN due to verification failure: {}", e);
1471 self.path.sending_ecn = false;
1472 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1475 }
1476 Ok(false) => {}
1477 Ok(true) => {
1478 self.stats.path.congestion_events += 1;
1479 self.path
1480 .congestion
1481 .on_congestion_event(now, largest_sent_time, false, 0);
1482 }
1483 }
1484 }
1485
1486 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
1489 self.remove_in_flight(pn, &info);
1490 if info.ack_eliciting && self.path.challenge.is_none() {
1491 self.path.congestion.on_ack(
1494 now,
1495 info.time_sent,
1496 info.size.into(),
1497 self.app_limited,
1498 &self.path.rtt,
1499 );
1500 }
1501
1502 if let Some(retransmits) = info.retransmits.get() {
1504 for (id, _) in retransmits.reset_stream.iter() {
1505 self.streams.reset_acked(*id);
1506 }
1507 }
1508
1509 for frame in info.stream_frames {
1510 self.streams.received_ack_of(frame);
1511 }
1512 }
1513
1514 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
1515 let start = if self.zero_rtt_crypto.is_some() {
1516 now
1517 } else {
1518 self.prev_crypto
1519 .as_ref()
1520 .expect("no previous keys")
1521 .end_packet
1522 .as_ref()
1523 .expect("update not acknowledged yet")
1524 .1
1525 };
1526 self.timers
1527 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1528 }
1529
1530 fn on_loss_detection_timeout(&mut self, now: Instant) {
1531 if let Some((_, pn_space)) = self.loss_time_and_space() {
1532 self.detect_lost_packets(now, pn_space, false);
1534 self.set_loss_detection_timer(now);
1535 return;
1536 }
1537
1538 let (_, space) = match self.pto_time_and_space(now) {
1539 Some(x) => x,
1540 None => {
1541 error!("PTO expired while unset");
1542 return;
1543 }
1544 };
1545 trace!(
1546 in_flight = self.path.in_flight.bytes,
1547 count = self.pto_count,
1548 ?space,
1549 "PTO fired"
1550 );
1551
1552 let count = match self.path.in_flight.ack_eliciting {
1553 0 => {
1556 debug_assert!(!self.peer_completed_address_validation());
1557 1
1558 }
1559 _ => 2,
1561 };
1562 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1563 self.pto_count = self.pto_count.saturating_add(1);
1564 self.set_loss_detection_timer(now);
1565 }
1566
1567 fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
1568 let mut lost_packets = Vec::<u64>::new();
1569 let mut lost_mtu_probe = None;
1570 let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
1571 let rtt = self.path.rtt.conservative();
1572 let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
1573
1574 let lost_send_time = now.checked_sub(loss_delay).unwrap();
1576 let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
1577 let packet_threshold = self.config.packet_threshold as u64;
1578 let mut size_of_lost_packets = 0u64;
1579
1580 let congestion_period =
1584 self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
1585 let mut persistent_congestion_start: Option<Instant> = None;
1586 let mut prev_packet = None;
1587 let mut in_persistent_congestion = false;
1588
1589 let space = &mut self.spaces[pn_space];
1590 space.loss_time = None;
1591
1592 for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
1593 if prev_packet != Some(packet.wrapping_sub(1)) {
1594 persistent_congestion_start = None;
1596 }
1597
1598 if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
1599 {
1600 if Some(packet) == in_flight_mtu_probe {
1601 lost_mtu_probe = in_flight_mtu_probe;
1604 } else {
1605 lost_packets.push(packet);
1606 size_of_lost_packets += info.size as u64;
1607 if info.ack_eliciting && due_to_ack {
1608 match persistent_congestion_start {
1609 Some(start) if info.time_sent - start > congestion_period => {
1612 in_persistent_congestion = true;
1613 }
1614 None if self
1616 .path
1617 .first_packet_after_rtt_sample
1618 .map_or(false, |x| x < (pn_space, packet)) =>
1619 {
1620 persistent_congestion_start = Some(info.time_sent);
1621 }
1622 _ => {}
1623 }
1624 }
1625 }
1626 } else {
1627 let next_loss_time = info.time_sent + loss_delay;
1628 space.loss_time = Some(
1629 space
1630 .loss_time
1631 .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
1632 );
1633 persistent_congestion_start = None;
1634 }
1635
1636 prev_packet = Some(packet);
1637 }
1638
1639 if let Some(largest_lost) = lost_packets.last().cloned() {
1641 let old_bytes_in_flight = self.path.in_flight.bytes;
1642 let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
1643 self.lost_packets += lost_packets.len() as u64;
1644 self.stats.path.lost_packets += lost_packets.len() as u64;
1645 self.stats.path.lost_bytes += size_of_lost_packets;
1646 trace!(
1647 "packets lost: {:?}, bytes lost: {}",
1648 lost_packets,
1649 size_of_lost_packets
1650 );
1651
1652 for &packet in &lost_packets {
1653 let info = self.spaces[pn_space].take(packet).unwrap(); self.remove_in_flight(packet, &info);
1655 for frame in info.stream_frames {
1656 self.streams.retransmit(frame);
1657 }
1658 self.spaces[pn_space].pending |= info.retransmits;
1659 self.path.mtud.on_non_probe_lost(packet, info.size);
1660 }
1661
1662 if self.path.mtud.black_hole_detected(now) {
1663 self.stats.path.black_holes_detected += 1;
1664 self.path
1665 .congestion
1666 .on_mtu_update(self.path.mtud.current_mtu());
1667 if let Some(max_datagram_size) = self.datagrams().max_size() {
1668 self.datagrams.drop_oversized(max_datagram_size);
1669 }
1670 }
1671
1672 let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
1674
1675 if lost_ack_eliciting {
1676 self.stats.path.congestion_events += 1;
1677 self.path.congestion.on_congestion_event(
1678 now,
1679 largest_lost_sent,
1680 in_persistent_congestion,
1681 size_of_lost_packets,
1682 );
1683 }
1684 }
1685
1686 if let Some(packet) = lost_mtu_probe {
1688 let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(packet, &info);
1690 self.path.mtud.on_probe_lost();
1691 self.stats.path.lost_plpmtud_probes += 1;
1692 }
1693 }
1694
1695 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
1696 SpaceId::iter()
1697 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
1698 .min_by_key(|&(time, _)| time)
1699 }
1700
1701 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
1702 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
1703 let mut duration = self.path.rtt.pto_base() * backoff;
1704
1705 if self.path.in_flight.ack_eliciting == 0 {
1706 debug_assert!(!self.peer_completed_address_validation());
1707 let space = match self.highest_space {
1708 SpaceId::Handshake => SpaceId::Handshake,
1709 _ => SpaceId::Initial,
1710 };
1711 return Some((now + duration, space));
1712 }
1713
1714 let mut result = None;
1715 for space in SpaceId::iter() {
1716 if self.spaces[space].in_flight == 0 {
1717 continue;
1718 }
1719 if space == SpaceId::Data {
1720 if self.is_handshaking() {
1722 return result;
1723 }
1724 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
1726 }
1727 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
1728 Some(time) => time,
1729 None => continue,
1730 };
1731 let pto = last_ack_eliciting + duration;
1732 if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
1733 result = Some((pto, space));
1734 }
1735 }
1736 result
1737 }
1738
1739 #[allow(clippy::suspicious_operation_groupings)]
1740 fn peer_completed_address_validation(&self) -> bool {
1741 if self.side.is_server() || self.state.is_closed() {
1742 return true;
1743 }
1744 self.spaces[SpaceId::Handshake]
1747 .largest_acked_packet
1748 .is_some()
1749 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
1750 || (self.spaces[SpaceId::Data].crypto.is_some()
1751 && self.spaces[SpaceId::Handshake].crypto.is_none())
1752 }
1753
1754 fn set_loss_detection_timer(&mut self, now: Instant) {
1755 if self.state.is_closed() {
1756 return;
1760 }
1761
1762 if let Some((loss_time, _)) = self.loss_time_and_space() {
1763 self.timers.set(Timer::LossDetection, loss_time);
1765 return;
1766 }
1767
1768 if self.path.anti_amplification_blocked(1) {
1769 self.timers.stop(Timer::LossDetection);
1771 return;
1772 }
1773
1774 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
1775 self.timers.stop(Timer::LossDetection);
1778 return;
1779 }
1780
1781 if let Some((timeout, _)) = self.pto_time_and_space(now) {
1784 self.timers.set(Timer::LossDetection, timeout);
1785 } else {
1786 self.timers.stop(Timer::LossDetection);
1787 }
1788 }
1789
1790 fn pto(&self, space: SpaceId) -> Duration {
1792 let max_ack_delay = match space {
1793 SpaceId::Initial | SpaceId::Handshake => Duration::new(0, 0),
1794 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
1795 };
1796 self.path.rtt.pto_base() + max_ack_delay
1797 }
1798
1799 fn on_packet_authenticated(
1800 &mut self,
1801 now: Instant,
1802 space_id: SpaceId,
1803 ecn: Option<EcnCodepoint>,
1804 packet: Option<u64>,
1805 spin: bool,
1806 is_1rtt: bool,
1807 ) {
1808 self.total_authed_packets += 1;
1809 self.reset_keep_alive(now);
1810 self.reset_idle_timeout(now, space_id);
1811 self.permit_idle_reset = true;
1812 self.receiving_ecn |= ecn.is_some();
1813 if let Some(x) = ecn {
1814 let space = &mut self.spaces[space_id];
1815 space.ecn_counters += x;
1816
1817 if x.is_ce() {
1818 space.pending_acks.set_immediate_ack_required();
1819 }
1820 }
1821
1822 let packet = match packet {
1823 Some(x) => x,
1824 None => return,
1825 };
1826 if self.side.is_server() {
1827 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
1828 self.discard_space(now, SpaceId::Initial);
1830 }
1831 if self.zero_rtt_crypto.is_some() && is_1rtt {
1832 self.set_key_discard_timer(now, space_id)
1834 }
1835 }
1836 let space = &mut self.spaces[space_id];
1837 space.pending_acks.insert_one(packet, now);
1838 if packet >= space.rx_packet {
1839 space.rx_packet = packet;
1840 self.spin = self.side.is_client() ^ spin;
1842 }
1843 }
1844
1845 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
1846 let timeout = match self.idle_timeout {
1847 None => return,
1848 Some(x) => Duration::from_millis(x.0),
1849 };
1850 if self.state.is_closed() {
1851 self.timers.stop(Timer::Idle);
1852 return;
1853 }
1854 let dt = cmp::max(timeout, 3 * self.pto(space));
1855 self.timers.set(Timer::Idle, now + dt);
1856 }
1857
1858 fn reset_keep_alive(&mut self, now: Instant) {
1859 let interval = match self.config.keep_alive_interval {
1860 Some(x) if self.state.is_established() => x,
1861 _ => return,
1862 };
1863 self.timers.set(Timer::KeepAlive, now + interval);
1864 }
1865
1866 fn reset_cid_retirement(&mut self) {
1867 if let Some(t) = self.local_cid_state.next_timeout() {
1868 self.timers.set(Timer::PushNewCid, t);
1869 }
1870 }
1871
1872 pub(crate) fn handle_first_packet(
1877 &mut self,
1878 now: Instant,
1879 remote: SocketAddr,
1880 ecn: Option<EcnCodepoint>,
1881 packet_number: u64,
1882 packet: InitialPacket,
1883 remaining: Option<BytesMut>,
1884 ) -> Result<(), ConnectionError> {
1885 let span = trace_span!("first recv");
1886 let _guard = span.enter();
1887 debug_assert!(self.side.is_server());
1888 let len = packet.header_data.len() + packet.payload.len();
1889 self.path.total_recvd = len as u64;
1890
1891 match self.state {
1892 State::Handshake(ref mut state) => {
1893 state.expected_token = packet.header.token.clone();
1894 }
1895 _ => unreachable!("first packet must be delivered in Handshake state"),
1896 }
1897
1898 self.on_packet_authenticated(
1899 now,
1900 SpaceId::Initial,
1901 ecn,
1902 Some(packet_number),
1903 false,
1904 false,
1905 );
1906
1907 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
1908 if let Some(data) = remaining {
1909 self.handle_coalesced(now, remote, ecn, data);
1910 }
1911 Ok(())
1912 }
1913
1914 fn init_0rtt(&mut self) {
1915 let (header, packet) = match self.crypto.early_crypto() {
1916 Some(x) => x,
1917 None => return,
1918 };
1919 if self.side.is_client() {
1920 match self.crypto.transport_parameters() {
1921 Ok(params) => {
1922 let params = params
1923 .expect("crypto layer didn't supply transport parameters with ticket");
1924 let params = TransportParameters {
1926 initial_src_cid: None,
1927 original_dst_cid: None,
1928 preferred_address: None,
1929 retry_src_cid: None,
1930 stateless_reset_token: None,
1931 min_ack_delay: None,
1932 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
1933 max_ack_delay: TransportParameters::default().max_ack_delay,
1934 ..params
1935 };
1936 self.set_peer_params(params);
1937 }
1938 Err(e) => {
1939 error!("session ticket has malformed transport parameters: {}", e);
1940 return;
1941 }
1942 }
1943 }
1944 trace!("0-RTT enabled");
1945 self.zero_rtt_enabled = true;
1946 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
1947 }
1948
1949 fn read_crypto(
1950 &mut self,
1951 space: SpaceId,
1952 crypto: &frame::Crypto,
1953 payload_len: usize,
1954 ) -> Result<(), TransportError> {
1955 let expected = if !self.state.is_handshake() {
1956 SpaceId::Data
1957 } else if self.highest_space == SpaceId::Initial {
1958 SpaceId::Initial
1959 } else {
1960 SpaceId::Handshake
1963 };
1964 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
1968
1969 let end = crypto.offset + crypto.data.len() as u64;
1970 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
1971 warn!(
1972 "received new {:?} CRYPTO data when expecting {:?}",
1973 space, expected
1974 );
1975 return Err(TransportError::PROTOCOL_VIOLATION(
1976 "new data at unexpected encryption level",
1977 ));
1978 }
1979
1980 let space = &mut self.spaces[space];
1981 let max = end.saturating_sub(space.crypto_stream.bytes_read());
1982 if max > self.config.crypto_buffer_size as u64 {
1983 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
1984 }
1985
1986 space
1987 .crypto_stream
1988 .insert(crypto.offset, crypto.data.clone(), payload_len);
1989 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
1990 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
1991 if self.crypto.read_handshake(&chunk.bytes)? {
1992 self.events.push_back(Event::HandshakeDataReady);
1993 }
1994 }
1995
1996 Ok(())
1997 }
1998
1999 fn write_crypto(&mut self) {
2000 loop {
2001 let space = self.highest_space;
2002 let mut outgoing = Vec::new();
2003 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2004 match space {
2005 SpaceId::Initial => {
2006 self.upgrade_crypto(SpaceId::Handshake, crypto);
2007 }
2008 SpaceId::Handshake => {
2009 self.upgrade_crypto(SpaceId::Data, crypto);
2010 }
2011 _ => unreachable!("got updated secrets during 1-RTT"),
2012 }
2013 }
2014 if outgoing.is_empty() {
2015 if space == self.highest_space {
2016 break;
2017 } else {
2018 continue;
2020 }
2021 }
2022 let offset = self.spaces[space].crypto_offset;
2023 let outgoing = Bytes::from(outgoing);
2024 if let State::Handshake(ref mut state) = self.state {
2025 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2026 state.client_hello = Some(outgoing.clone());
2027 }
2028 }
2029 self.spaces[space].crypto_offset += outgoing.len() as u64;
2030 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2031 self.spaces[space].pending.crypto.push_back(frame::Crypto {
2032 offset,
2033 data: outgoing,
2034 });
2035 }
2036 }
2037
2038 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2040 debug_assert!(
2041 self.spaces[space].crypto.is_none(),
2042 "already reached packet space {space:?}"
2043 );
2044 trace!("{:?} keys ready", space);
2045 if space == SpaceId::Data {
2046 self.next_crypto = Some(
2048 self.crypto
2049 .next_1rtt_keys()
2050 .expect("handshake should be complete"),
2051 );
2052 }
2053
2054 self.spaces[space].crypto = Some(crypto);
2055 debug_assert!(space as usize > self.highest_space as usize);
2056 self.highest_space = space;
2057 if space == SpaceId::Data && self.side.is_client() {
2058 self.zero_rtt_crypto = None;
2060 }
2061 }
2062
2063 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2064 debug_assert!(space_id != SpaceId::Data);
2065 trace!("discarding {:?} keys", space_id);
2066 if space_id == SpaceId::Initial {
2067 self.retry_token = Bytes::new();
2069 }
2070 let space = &mut self.spaces[space_id];
2071 space.crypto = None;
2072 space.time_of_last_ack_eliciting_packet = None;
2073 space.loss_time = None;
2074 space.in_flight = 0;
2075 let sent_packets = mem::take(&mut space.sent_packets);
2076 for (pn, packet) in sent_packets.into_iter() {
2077 self.remove_in_flight(pn, &packet);
2078 }
2079 self.set_loss_detection_timer(now)
2080 }
2081
2082 fn handle_coalesced(
2083 &mut self,
2084 now: Instant,
2085 remote: SocketAddr,
2086 ecn: Option<EcnCodepoint>,
2087 data: BytesMut,
2088 ) {
2089 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2090 let mut remaining = Some(data);
2091 while let Some(data) = remaining {
2092 match PartialDecode::new(
2093 data,
2094 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2095 &[self.version],
2096 self.endpoint_config.grease_quic_bit,
2097 ) {
2098 Ok((partial_decode, rest)) => {
2099 remaining = rest;
2100 self.handle_decode(now, remote, ecn, partial_decode);
2101 }
2102 Err(e) => {
2103 trace!("malformed header: {}", e);
2104 return;
2105 }
2106 }
2107 }
2108 }
2109
2110 fn handle_decode(
2111 &mut self,
2112 now: Instant,
2113 remote: SocketAddr,
2114 ecn: Option<EcnCodepoint>,
2115 partial_decode: PartialDecode,
2116 ) {
2117 if let Some(decoded) = packet_crypto::unprotect_header(
2118 partial_decode,
2119 &self.spaces,
2120 self.zero_rtt_crypto.as_ref(),
2121 self.peer_params.stateless_reset_token,
2122 ) {
2123 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2124 }
2125 }
2126
2127 fn handle_packet(
2128 &mut self,
2129 now: Instant,
2130 remote: SocketAddr,
2131 ecn: Option<EcnCodepoint>,
2132 packet: Option<Packet>,
2133 stateless_reset: bool,
2134 ) {
2135 self.stats.udp_rx.ios += 1;
2136 if let Some(ref packet) = packet {
2137 trace!(
2138 "got {:?} packet ({} bytes) from {} using id {}",
2139 packet.header.space(),
2140 packet.payload.len() + packet.header_data.len(),
2141 remote,
2142 packet.header.dst_cid(),
2143 );
2144 }
2145
2146 if self.is_handshaking() && remote != self.path.remote {
2147 debug!("discarding packet with unexpected remote during handshake");
2148 return;
2149 }
2150
2151 let was_closed = self.state.is_closed();
2152 let was_drained = self.state.is_drained();
2153
2154 let decrypted = match packet {
2155 None => Err(None),
2156 Some(mut packet) => self
2157 .decrypt_packet(now, &mut packet)
2158 .map(move |number| (packet, number)),
2159 };
2160 let result = match decrypted {
2161 _ if stateless_reset => {
2162 debug!("got stateless reset");
2163 Err(ConnectionError::Reset)
2164 }
2165 Err(Some(e)) => {
2166 warn!("illegal packet: {}", e);
2167 Err(e.into())
2168 }
2169 Err(None) => {
2170 debug!("failed to authenticate packet");
2171 self.authentication_failures += 1;
2172 let integrity_limit = self.spaces[self.highest_space]
2173 .crypto
2174 .as_ref()
2175 .unwrap()
2176 .packet
2177 .local
2178 .integrity_limit();
2179 if self.authentication_failures > integrity_limit {
2180 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2181 } else {
2182 return;
2183 }
2184 }
2185 Ok((packet, number)) => {
2186 let span = match number {
2187 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2188 None => trace_span!("recv", space = ?packet.header.space()),
2189 };
2190 let _guard = span.enter();
2191
2192 let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2193 if number.map_or(false, is_duplicate) {
2194 debug!("discarding possible duplicate packet");
2195 return;
2196 } else if self.state.is_handshake() && packet.header.is_short() {
2197 trace!("dropping short packet during handshake");
2199 return;
2200 } else {
2201 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2202 if let State::Handshake(ref hs) = self.state {
2203 if self.side.is_server() && token != &hs.expected_token {
2204 warn!("discarding Initial with invalid retry token");
2208 return;
2209 }
2210 }
2211 }
2212
2213 if !self.state.is_closed() {
2214 let spin = match packet.header {
2215 Header::Short { spin, .. } => spin,
2216 _ => false,
2217 };
2218 self.on_packet_authenticated(
2219 now,
2220 packet.header.space(),
2221 ecn,
2222 number,
2223 spin,
2224 packet.header.is_1rtt(),
2225 );
2226 }
2227 self.process_decrypted_packet(now, remote, number, packet)
2228 }
2229 }
2230 };
2231
2232 if let Err(conn_err) = result {
2234 self.error = Some(conn_err.clone());
2235 self.state = match conn_err {
2236 ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2237 ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2238 ConnectionError::Reset
2239 | ConnectionError::TransportError(TransportError {
2240 code: TransportErrorCode::AEAD_LIMIT_REACHED,
2241 ..
2242 }) => State::Drained,
2243 ConnectionError::TimedOut => {
2244 unreachable!("timeouts aren't generated by packet processing");
2245 }
2246 ConnectionError::TransportError(err) => {
2247 debug!("closing connection due to transport error: {}", err);
2248 State::closed(err)
2249 }
2250 ConnectionError::VersionMismatch => State::Draining,
2251 ConnectionError::LocallyClosed => {
2252 unreachable!("LocallyClosed isn't generated by packet processing");
2253 }
2254 ConnectionError::CidsExhausted => {
2255 unreachable!("CidsExhausted isn't generated by packet processing");
2256 }
2257 };
2258 }
2259
2260 if !was_closed && self.state.is_closed() {
2261 self.close_common();
2262 if !self.state.is_drained() {
2263 self.set_close_timer(now);
2264 }
2265 }
2266 if !was_drained && self.state.is_drained() {
2267 self.endpoint_events.push_back(EndpointEventInner::Drained);
2268 self.timers.stop(Timer::Close);
2271 }
2272
2273 if let State::Closed(_) = self.state {
2275 self.close = remote == self.path.remote;
2276 }
2277 }
2278
2279 fn process_decrypted_packet(
2280 &mut self,
2281 now: Instant,
2282 remote: SocketAddr,
2283 number: Option<u64>,
2284 packet: Packet,
2285 ) -> Result<(), ConnectionError> {
2286 let state = match self.state {
2287 State::Established => {
2288 match packet.header.space() {
2289 SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2290 _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2291 _ => {
2292 trace!("discarding unexpected pre-handshake packet");
2293 }
2294 }
2295 return Ok(());
2296 }
2297 State::Closed(_) => {
2298 for result in frame::Iter::new(packet.payload.freeze())? {
2299 let frame = match result {
2300 Ok(frame) => frame,
2301 Err(err) => {
2302 debug!("frame decoding error: {err:?}");
2303 continue;
2304 }
2305 };
2306
2307 if let Frame::Padding = frame {
2308 continue;
2309 };
2310
2311 self.stats.frame_rx.record(&frame);
2312
2313 if let Frame::Close(_) = frame {
2314 trace!("draining");
2315 self.state = State::Draining;
2316 break;
2317 }
2318 }
2319 return Ok(());
2320 }
2321 State::Draining | State::Drained => return Ok(()),
2322 State::Handshake(ref mut state) => state,
2323 };
2324
2325 match packet.header {
2326 Header::Retry {
2327 src_cid: rem_cid, ..
2328 } => {
2329 if self.side.is_server() {
2330 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2331 }
2332
2333 if self.total_authed_packets > 1
2334 || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
2336 &self.rem_cids.active(),
2337 &packet.header_data,
2338 &packet.payload,
2339 )
2340 {
2341 trace!("discarding invalid Retry");
2342 return Ok(());
2350 }
2351
2352 trace!("retrying with CID {}", rem_cid);
2353 let client_hello = state.client_hello.take().unwrap();
2354 self.retry_src_cid = Some(rem_cid);
2355 self.rem_cids.update_initial_cid(rem_cid);
2356 self.rem_handshake_cid = rem_cid;
2357
2358 let space = &mut self.spaces[SpaceId::Initial];
2359 if let Some(info) = space.take(0) {
2360 self.on_packet_acked(now, 0, info);
2361 };
2362
2363 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
2365 crypto: Some(self.crypto.initial_keys(&rem_cid, self.side)),
2366 next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2367 crypto_offset: client_hello.len() as u64,
2368 ..PacketSpace::new(now)
2369 };
2370 self.spaces[SpaceId::Initial]
2371 .pending
2372 .crypto
2373 .push_back(frame::Crypto {
2374 offset: 0,
2375 data: client_hello,
2376 });
2377
2378 let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2380 for (pn, info) in zero_rtt {
2381 self.remove_in_flight(pn, &info);
2382 self.spaces[SpaceId::Data].pending |= info.retransmits;
2383 }
2384 self.streams.retransmit_all_for_0rtt();
2385
2386 let token_len = packet.payload.len() - 16;
2387 self.retry_token = packet.payload.freeze().split_to(token_len);
2388 self.state = State::Handshake(state::Handshake {
2389 expected_token: Bytes::new(),
2390 rem_cid_set: false,
2391 client_hello: None,
2392 });
2393 Ok(())
2394 }
2395 Header::Long {
2396 ty: LongType::Handshake,
2397 src_cid: rem_cid,
2398 ..
2399 } => {
2400 if rem_cid != self.rem_handshake_cid {
2401 debug!(
2402 "discarding packet with mismatched remote CID: {} != {}",
2403 self.rem_handshake_cid, rem_cid
2404 );
2405 return Ok(());
2406 }
2407 self.path.validated = true;
2408
2409 self.process_early_payload(now, packet)?;
2410 if self.state.is_closed() {
2411 return Ok(());
2412 }
2413
2414 if self.crypto.is_handshaking() {
2415 trace!("handshake ongoing");
2416 return Ok(());
2417 }
2418
2419 if self.side.is_client() {
2420 let params =
2422 self.crypto
2423 .transport_parameters()?
2424 .ok_or_else(|| TransportError {
2425 code: TransportErrorCode::crypto(0x6d),
2426 frame: None,
2427 reason: "transport parameters missing".into(),
2428 })?;
2429
2430 if self.has_0rtt() {
2431 if !self.crypto.early_data_accepted().unwrap() {
2432 debug_assert!(self.side.is_client());
2433 debug!("0-RTT rejected");
2434 self.accepted_0rtt = false;
2435 self.streams.zero_rtt_rejected();
2436
2437 self.spaces[SpaceId::Data].pending = Retransmits::default();
2439
2440 let sent_packets =
2442 mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2443 for (pn, packet) in sent_packets {
2444 self.remove_in_flight(pn, &packet);
2445 }
2446 } else {
2447 self.accepted_0rtt = true;
2448 params.validate_resumption_from(&self.peer_params)?;
2449 }
2450 }
2451 if let Some(token) = params.stateless_reset_token {
2452 self.endpoint_events
2453 .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
2454 }
2455 self.handle_peer_params(params)?;
2456 self.issue_first_cids(now);
2457 } else {
2458 self.spaces[SpaceId::Data].pending.handshake_done = true;
2460 self.discard_space(now, SpaceId::Handshake);
2461 }
2462
2463 self.events.push_back(Event::Connected);
2464 self.state = State::Established;
2465 trace!("established");
2466 Ok(())
2467 }
2468 Header::Initial(InitialHeader {
2469 src_cid: rem_cid, ..
2470 }) => {
2471 if !state.rem_cid_set {
2472 trace!("switching remote CID to {}", rem_cid);
2473 let mut state = state.clone();
2474 self.rem_cids.update_initial_cid(rem_cid);
2475 self.rem_handshake_cid = rem_cid;
2476 self.orig_rem_cid = rem_cid;
2477 state.rem_cid_set = true;
2478 self.state = State::Handshake(state);
2479 } else if rem_cid != self.rem_handshake_cid {
2480 debug!(
2481 "discarding packet with mismatched remote CID: {} != {}",
2482 self.rem_handshake_cid, rem_cid
2483 );
2484 return Ok(());
2485 }
2486
2487 let starting_space = self.highest_space;
2488 self.process_early_payload(now, packet)?;
2489
2490 if self.side.is_server()
2491 && starting_space == SpaceId::Initial
2492 && self.highest_space != SpaceId::Initial
2493 {
2494 let params =
2495 self.crypto
2496 .transport_parameters()?
2497 .ok_or_else(|| TransportError {
2498 code: TransportErrorCode::crypto(0x6d),
2499 frame: None,
2500 reason: "transport parameters missing".into(),
2501 })?;
2502 self.handle_peer_params(params)?;
2503 self.issue_first_cids(now);
2504 self.init_0rtt();
2505 }
2506 Ok(())
2507 }
2508 Header::Long {
2509 ty: LongType::ZeroRtt,
2510 ..
2511 } => {
2512 self.process_payload(now, remote, number.unwrap(), packet)?;
2513 Ok(())
2514 }
2515 Header::VersionNegotiate { .. } => {
2516 if self.total_authed_packets > 1 {
2517 return Ok(());
2518 }
2519 let supported = packet
2520 .payload
2521 .chunks(4)
2522 .any(|x| match <[u8; 4]>::try_from(x) {
2523 Ok(version) => self.version == u32::from_be_bytes(version),
2524 Err(_) => false,
2525 });
2526 if supported {
2527 return Ok(());
2528 }
2529 debug!("remote doesn't support our version");
2530 Err(ConnectionError::VersionMismatch)
2531 }
2532 Header::Short { .. } => unreachable!(
2533 "short packets received during handshake are discarded in handle_packet"
2534 ),
2535 }
2536 }
2537
2538 fn process_early_payload(
2540 &mut self,
2541 now: Instant,
2542 packet: Packet,
2543 ) -> Result<(), TransportError> {
2544 debug_assert_ne!(packet.header.space(), SpaceId::Data);
2545 let payload_len = packet.payload.len();
2546 let mut ack_eliciting = false;
2547 for result in frame::Iter::new(packet.payload.freeze())? {
2548 let frame = result?;
2549 let span = match frame {
2550 Frame::Padding => continue,
2551 _ => Some(trace_span!("frame", ty = %frame.ty())),
2552 };
2553
2554 self.stats.frame_rx.record(&frame);
2555
2556 let _guard = span.as_ref().map(|x| x.enter());
2557 ack_eliciting |= frame.is_ack_eliciting();
2558
2559 match frame {
2561 Frame::Padding | Frame::Ping => {}
2562 Frame::Crypto(frame) => {
2563 self.read_crypto(packet.header.space(), &frame, payload_len)?;
2564 }
2565 Frame::Ack(ack) => {
2566 self.on_ack_received(now, packet.header.space(), ack)?;
2567 }
2568 Frame::Close(reason) => {
2569 self.error = Some(reason.into());
2570 self.state = State::Draining;
2571 return Ok(());
2572 }
2573 _ => {
2574 let mut err =
2575 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2576 err.frame = Some(frame.ty());
2577 return Err(err);
2578 }
2579 }
2580 }
2581
2582 if ack_eliciting {
2583 self.spaces[packet.header.space()]
2585 .pending_acks
2586 .set_immediate_ack_required();
2587 }
2588
2589 self.write_crypto();
2590 Ok(())
2591 }
2592
2593 fn process_payload(
2594 &mut self,
2595 now: Instant,
2596 remote: SocketAddr,
2597 number: u64,
2598 packet: Packet,
2599 ) -> Result<(), TransportError> {
2600 let payload = packet.payload.freeze();
2601 let mut is_probing_packet = true;
2602 let mut close = None;
2603 let payload_len = payload.len();
2604 let mut ack_eliciting = false;
2605 for result in frame::Iter::new(payload)? {
2606 let frame = result?;
2607 let span = match frame {
2608 Frame::Padding => continue,
2609 _ => Some(trace_span!("frame", ty = %frame.ty())),
2610 };
2611
2612 self.stats.frame_rx.record(&frame);
2613 match &frame {
2616 Frame::Crypto(f) => {
2617 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
2618 }
2619 Frame::Stream(f) => {
2620 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
2621 }
2622 Frame::Datagram(f) => {
2623 trace!(len = f.data.len(), "got datagram frame");
2624 }
2625 f => {
2626 trace!("got frame {:?}", f);
2627 }
2628 }
2629
2630 let _guard = span.as_ref().map(|x| x.enter());
2631 if packet.header.is_0rtt() {
2632 match frame {
2633 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
2634 return Err(TransportError::PROTOCOL_VIOLATION(
2635 "illegal frame type in 0-RTT",
2636 ));
2637 }
2638 _ => {}
2639 }
2640 }
2641 ack_eliciting |= frame.is_ack_eliciting();
2642
2643 match frame {
2645 Frame::Padding
2646 | Frame::PathChallenge(_)
2647 | Frame::PathResponse(_)
2648 | Frame::NewConnectionId(_) => {}
2649 _ => {
2650 is_probing_packet = false;
2651 }
2652 }
2653 match frame {
2654 Frame::Crypto(frame) => {
2655 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
2656 }
2657 Frame::Stream(frame) => {
2658 if self.streams.received(frame, payload_len)?.should_transmit() {
2659 self.spaces[SpaceId::Data].pending.max_data = true;
2660 }
2661 }
2662 Frame::Ack(ack) => {
2663 self.on_ack_received(now, SpaceId::Data, ack)?;
2664 }
2665 Frame::Padding | Frame::Ping => {}
2666 Frame::Close(reason) => {
2667 close = Some(reason);
2668 }
2669 Frame::PathChallenge(token) => {
2670 self.path_responses.push(number, token, remote);
2671 if remote == self.path.remote {
2672 match self.peer_supports_ack_frequency() {
2675 true => self.immediate_ack(),
2676 false => self.ping(),
2677 }
2678 }
2679 }
2680 Frame::PathResponse(token) => {
2681 if self.path.challenge == Some(token) && remote == self.path.remote {
2682 trace!("new path validated");
2683 self.timers.stop(Timer::PathValidation);
2684 self.path.challenge = None;
2685 self.path.validated = true;
2686 if let Some((_, ref mut prev_path)) = self.prev_path {
2687 prev_path.challenge = None;
2688 prev_path.challenge_pending = false;
2689 }
2690 } else {
2691 debug!(token, "ignoring invalid PATH_RESPONSE");
2692 }
2693 }
2694 Frame::MaxData(bytes) => {
2695 self.streams.received_max_data(bytes);
2696 }
2697 Frame::MaxStreamData { id, offset } => {
2698 self.streams.received_max_stream_data(id, offset)?;
2699 }
2700 Frame::MaxStreams { dir, count } => {
2701 self.streams.received_max_streams(dir, count)?;
2702 }
2703 Frame::ResetStream(frame) => {
2704 if self.streams.received_reset(frame)?.should_transmit() {
2705 self.spaces[SpaceId::Data].pending.max_data = true;
2706 }
2707 }
2708 Frame::DataBlocked { offset } => {
2709 debug!(offset, "peer claims to be blocked at connection level");
2710 }
2711 Frame::StreamDataBlocked { id, offset } => {
2712 if id.initiator() == self.side && id.dir() == Dir::Uni {
2713 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
2714 return Err(TransportError::STREAM_STATE_ERROR(
2715 "STREAM_DATA_BLOCKED on send-only stream",
2716 ));
2717 }
2718 debug!(
2719 stream = %id,
2720 offset, "peer claims to be blocked at stream level"
2721 );
2722 }
2723 Frame::StreamsBlocked { dir, limit } => {
2724 if limit > MAX_STREAM_COUNT {
2725 return Err(TransportError::FRAME_ENCODING_ERROR(
2726 "unrepresentable stream limit",
2727 ));
2728 }
2729 debug!(
2730 "peer claims to be blocked opening more than {} {} streams",
2731 limit, dir
2732 );
2733 }
2734 Frame::StopSending(frame::StopSending { id, error_code }) => {
2735 if id.initiator() != self.side {
2736 if id.dir() == Dir::Uni {
2737 debug!("got STOP_SENDING on recv-only {}", id);
2738 return Err(TransportError::STREAM_STATE_ERROR(
2739 "STOP_SENDING on recv-only stream",
2740 ));
2741 }
2742 } else if self.streams.is_local_unopened(id) {
2743 return Err(TransportError::STREAM_STATE_ERROR(
2744 "STOP_SENDING on unopened stream",
2745 ));
2746 }
2747 self.streams.received_stop_sending(id, error_code);
2748 }
2749 Frame::RetireConnectionId { sequence } => {
2750 let allow_more_cids = self
2751 .local_cid_state
2752 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
2753 self.endpoint_events
2754 .push_back(EndpointEventInner::RetireConnectionId(
2755 now,
2756 sequence,
2757 allow_more_cids,
2758 ));
2759 }
2760 Frame::NewConnectionId(frame) => {
2761 trace!(
2762 sequence = frame.sequence,
2763 id = %frame.id,
2764 retire_prior_to = frame.retire_prior_to,
2765 );
2766 if self.rem_cids.active().is_empty() {
2767 return Err(TransportError::PROTOCOL_VIOLATION(
2768 "NEW_CONNECTION_ID when CIDs aren't in use",
2769 ));
2770 }
2771 if frame.retire_prior_to > frame.sequence {
2772 return Err(TransportError::PROTOCOL_VIOLATION(
2773 "NEW_CONNECTION_ID retiring unissued CIDs",
2774 ));
2775 }
2776
2777 use crate::cid_queue::InsertError;
2778 match self.rem_cids.insert(frame) {
2779 Ok(None) => {}
2780 Ok(Some((retired, reset_token))) => {
2781 let pending_retired =
2782 &mut self.spaces[SpaceId::Data].pending.retire_cids;
2783 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
2786 if (pending_retired.len() as u64)
2789 .saturating_add(retired.end.saturating_sub(retired.start))
2790 > MAX_PENDING_RETIRED_CIDS
2791 {
2792 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
2793 "queued too many retired CIDs",
2794 ));
2795 }
2796 pending_retired.extend(retired);
2797 self.set_reset_token(reset_token);
2798 }
2799 Err(InsertError::ExceedsLimit) => {
2800 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
2801 }
2802 Err(InsertError::Retired) => {
2803 trace!("discarding already-retired");
2804 self.spaces[SpaceId::Data]
2808 .pending
2809 .retire_cids
2810 .push(frame.sequence);
2811 continue;
2812 }
2813 };
2814
2815 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
2816 self.update_rem_cid();
2819 }
2820 }
2821 Frame::NewToken { token } => {
2822 if self.side.is_server() {
2823 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
2824 }
2825 if token.is_empty() {
2826 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
2827 }
2828 trace!("got new token");
2829 }
2831 Frame::Datagram(datagram) => {
2832 if self
2833 .datagrams
2834 .received(datagram, &self.config.datagram_receive_buffer_size)?
2835 {
2836 self.events.push_back(Event::DatagramReceived);
2837 }
2838 }
2839 Frame::AckFrequency(ack_frequency) => {
2840 let space = &mut self.spaces[SpaceId::Data];
2842
2843 if !self
2844 .ack_frequency
2845 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
2846 {
2847 continue;
2849 }
2850
2851 if let Some(timeout) = space
2854 .pending_acks
2855 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
2856 {
2857 self.timers.set(Timer::MaxAckDelay, timeout);
2858 }
2859 }
2860 Frame::ImmediateAck => {
2861 self.spaces[SpaceId::Data]
2863 .pending_acks
2864 .set_immediate_ack_required();
2865 }
2866 Frame::HandshakeDone => {
2867 if self.side.is_server() {
2868 return Err(TransportError::PROTOCOL_VIOLATION(
2869 "client sent HANDSHAKE_DONE",
2870 ));
2871 }
2872 if self.spaces[SpaceId::Handshake].crypto.is_some() {
2873 self.discard_space(now, SpaceId::Handshake);
2874 }
2875 }
2876 }
2877 }
2878
2879 let space = &mut self.spaces[SpaceId::Data];
2880 if space
2881 .pending_acks
2882 .packet_received(now, number, ack_eliciting, &space.dedup)
2883 {
2884 self.timers
2885 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
2886 }
2887
2888 let pending = &mut self.spaces[SpaceId::Data].pending;
2893 self.streams.queue_max_stream_id(pending);
2894
2895 if let Some(reason) = close {
2896 self.error = Some(reason.into());
2897 self.state = State::Draining;
2898 self.close = true;
2899 }
2900
2901 if remote != self.path.remote
2902 && !is_probing_packet
2903 && number == self.spaces[SpaceId::Data].rx_packet
2904 {
2905 debug_assert!(
2906 self.server_config
2907 .as_ref()
2908 .expect("packets from unknown remote should be dropped by clients")
2909 .migration,
2910 "migration-initiating packets should have been dropped immediately"
2911 );
2912 self.migrate(now, remote);
2913 self.update_rem_cid();
2915 self.spin = false;
2916 }
2917
2918 Ok(())
2919 }
2920
2921 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
2922 trace!(%remote, "migration initiated");
2923 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
2927 PathData::from_previous(remote, &self.path, now)
2928 } else {
2929 let peer_max_udp_payload_size =
2930 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
2931 .unwrap_or(u16::MAX);
2932 PathData::new(
2933 remote,
2934 self.allow_mtud,
2935 Some(peer_max_udp_payload_size),
2936 now,
2937 false,
2938 &self.config,
2939 )
2940 };
2941 new_path.challenge = Some(self.rng.gen());
2942 new_path.challenge_pending = true;
2943 let prev_pto = self.pto(SpaceId::Data);
2944
2945 let mut prev = mem::replace(&mut self.path, new_path);
2946 if prev.challenge.is_none() {
2948 prev.challenge = Some(self.rng.gen());
2949 prev.challenge_pending = true;
2950 self.prev_path = Some((self.rem_cids.active(), prev));
2953 }
2954
2955 self.timers.set(
2956 Timer::PathValidation,
2957 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
2958 );
2959 }
2960
2961 pub fn local_address_changed(&mut self) {
2963 self.update_rem_cid();
2964 self.ping();
2965 }
2966
2967 fn update_rem_cid(&mut self) {
2969 let (reset_token, retired) = match self.rem_cids.next() {
2970 Some(x) => x,
2971 None => return,
2972 };
2973
2974 self.spaces[SpaceId::Data]
2976 .pending
2977 .retire_cids
2978 .extend(retired);
2979 self.set_reset_token(reset_token);
2980 }
2981
2982 fn set_reset_token(&mut self, reset_token: ResetToken) {
2983 self.endpoint_events
2984 .push_back(EndpointEventInner::ResetToken(
2985 self.path.remote,
2986 reset_token,
2987 ));
2988 self.peer_params.stateless_reset_token = Some(reset_token);
2989 }
2990
2991 fn issue_first_cids(&mut self, now: Instant) {
2993 if self.local_cid_state.cid_len() == 0 {
2994 return;
2995 }
2996
2997 let n = self.peer_params.issue_cids_limit() - 1;
2999 self.endpoint_events
3000 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3001 }
3002
3003 fn populate_packet(
3004 &mut self,
3005 now: Instant,
3006 space_id: SpaceId,
3007 buf: &mut Vec<u8>,
3008 max_size: usize,
3009 pn: u64,
3010 ) -> SentFrames {
3011 let mut sent = SentFrames::default();
3012 let space = &mut self.spaces[space_id];
3013 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3014 space.pending_acks.maybe_ack_non_eliciting();
3015
3016 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3018 buf.write(frame::Type::HANDSHAKE_DONE);
3019 sent.retransmits.get_or_create().handshake_done = true;
3020 self.stats.frame_tx.handshake_done =
3022 self.stats.frame_tx.handshake_done.saturating_add(1);
3023 }
3024
3025 if mem::replace(&mut space.ping_pending, false) {
3027 trace!("PING");
3028 buf.write(frame::Type::PING);
3029 sent.non_retransmits = true;
3030 self.stats.frame_tx.ping += 1;
3031 }
3032
3033 if mem::replace(&mut space.immediate_ack_pending, false) {
3035 trace!("IMMEDIATE_ACK");
3036 buf.write(frame::Type::IMMEDIATE_ACK);
3037 sent.non_retransmits = true;
3038 self.stats.frame_tx.immediate_ack += 1;
3039 }
3040
3041 if space.pending_acks.can_send() {
3043 Self::populate_acks(
3044 now,
3045 self.receiving_ecn,
3046 &mut sent,
3047 space,
3048 buf,
3049 &mut self.stats,
3050 );
3051 }
3052
3053 if mem::replace(&mut space.pending.ack_frequency, false) {
3055 let sequence_number = self.ack_frequency.next_sequence_number();
3056
3057 let config = self.config.ack_frequency_config.as_ref().unwrap();
3059
3060 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3062 self.path.rtt.get(),
3063 config,
3064 &self.peer_params,
3065 );
3066
3067 trace!(?max_ack_delay, "ACK_FREQUENCY");
3068
3069 frame::AckFrequency {
3070 sequence: sequence_number,
3071 ack_eliciting_threshold: config.ack_eliciting_threshold,
3072 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3073 reordering_threshold: config.reordering_threshold,
3074 }
3075 .encode(buf);
3076
3077 sent.retransmits.get_or_create().ack_frequency = true;
3078
3079 self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3080 self.stats.frame_tx.ack_frequency += 1;
3081 }
3082
3083 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3085 if let Some(token) = self.path.challenge {
3087 self.path.challenge_pending = false;
3089 sent.non_retransmits = true;
3090 sent.requires_padding = true;
3091 trace!("PATH_CHALLENGE {:08x}", token);
3092 buf.write(frame::Type::PATH_CHALLENGE);
3093 buf.write(token);
3094 self.stats.frame_tx.path_challenge += 1;
3095 }
3096 }
3097
3098 if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3100 if let Some(token) = self.path_responses.pop_on_path(&self.path.remote) {
3101 sent.non_retransmits = true;
3102 sent.requires_padding = true;
3103 trace!("PATH_RESPONSE {:08x}", token);
3104 buf.write(frame::Type::PATH_RESPONSE);
3105 buf.write(token);
3106 self.stats.frame_tx.path_response += 1;
3107 }
3108 }
3109
3110 while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3112 let mut frame = match space.pending.crypto.pop_front() {
3113 Some(x) => x,
3114 None => break,
3115 };
3116
3117 let max_crypto_data_size = max_size
3122 - buf.len()
3123 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3125 - 2; let len = frame
3128 .data
3129 .len()
3130 .min(2usize.pow(14) - 1)
3131 .min(max_crypto_data_size);
3132
3133 let data = frame.data.split_to(len);
3134 let truncated = frame::Crypto {
3135 offset: frame.offset,
3136 data,
3137 };
3138 trace!(
3139 "CRYPTO: off {} len {}",
3140 truncated.offset,
3141 truncated.data.len()
3142 );
3143 truncated.encode(buf);
3144 self.stats.frame_tx.crypto += 1;
3145 sent.retransmits.get_or_create().crypto.push_back(truncated);
3146 if !frame.data.is_empty() {
3147 frame.offset += len as u64;
3148 space.pending.crypto.push_front(frame);
3149 }
3150 }
3151
3152 if space_id == SpaceId::Data {
3153 self.streams.write_control_frames(
3154 buf,
3155 &mut space.pending,
3156 &mut sent.retransmits,
3157 &mut self.stats.frame_tx,
3158 max_size,
3159 );
3160 }
3161
3162 while buf.len() + 44 < max_size {
3164 let issued = match space.pending.new_cids.pop() {
3165 Some(x) => x,
3166 None => break,
3167 };
3168 trace!(
3169 sequence = issued.sequence,
3170 id = %issued.id,
3171 "NEW_CONNECTION_ID"
3172 );
3173 frame::NewConnectionId {
3174 sequence: issued.sequence,
3175 retire_prior_to: self.local_cid_state.retire_prior_to(),
3176 id: issued.id,
3177 reset_token: issued.reset_token,
3178 }
3179 .encode(buf);
3180 sent.retransmits.get_or_create().new_cids.push(issued);
3181 self.stats.frame_tx.new_connection_id += 1;
3182 }
3183
3184 while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3186 let seq = match space.pending.retire_cids.pop() {
3187 Some(x) => x,
3188 None => break,
3189 };
3190 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3191 buf.write(frame::Type::RETIRE_CONNECTION_ID);
3192 buf.write_var(seq);
3193 sent.retransmits.get_or_create().retire_cids.push(seq);
3194 self.stats.frame_tx.retire_connection_id += 1;
3195 }
3196
3197 let mut sent_datagrams = false;
3199 while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3200 match self.datagrams.write(buf, max_size) {
3201 true => {
3202 sent_datagrams = true;
3203 sent.non_retransmits = true;
3204 self.stats.frame_tx.datagram += 1;
3205 }
3206 false => break,
3207 }
3208 }
3209 if self.datagrams.send_blocked && sent_datagrams {
3210 self.events.push_back(Event::DatagramsUnblocked);
3211 self.datagrams.send_blocked = false;
3212 }
3213
3214 if space_id == SpaceId::Data {
3216 sent.stream_frames = self.streams.write_stream_frames(buf, max_size);
3217 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
3218 }
3219
3220 sent
3221 }
3222
3223 fn populate_acks(
3228 now: Instant,
3229 receiving_ecn: bool,
3230 sent: &mut SentFrames,
3231 space: &mut PacketSpace,
3232 buf: &mut Vec<u8>,
3233 stats: &mut ConnectionStats,
3234 ) {
3235 debug_assert!(!space.pending_acks.ranges().is_empty());
3236
3237 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3239 let ecn = if receiving_ecn {
3240 Some(&space.ecn_counters)
3241 } else {
3242 None
3243 };
3244 sent.largest_acked = space.pending_acks.ranges().max();
3245
3246 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3247
3248 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
3250 let delay = delay_micros >> ack_delay_exp.into_inner();
3251
3252 trace!(
3253 "ACK {:?}, Delay = {}us",
3254 space.pending_acks.ranges(),
3255 delay_micros
3256 );
3257
3258 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3259 stats.frame_tx.acks += 1;
3260 }
3261
3262 fn close_common(&mut self) {
3263 trace!("connection closed");
3264 for &timer in &Timer::VALUES {
3265 self.timers.stop(timer);
3266 }
3267 }
3268
3269 fn set_close_timer(&mut self, now: Instant) {
3270 self.timers
3271 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3272 }
3273
3274 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
3276 if Some(self.orig_rem_cid) != params.initial_src_cid
3277 || (self.side.is_client()
3278 && (Some(self.initial_dst_cid) != params.original_dst_cid
3279 || self.retry_src_cid != params.retry_src_cid))
3280 {
3281 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3282 "CID authentication failure",
3283 ));
3284 }
3285
3286 self.set_peer_params(params);
3287
3288 Ok(())
3289 }
3290
3291 fn set_peer_params(&mut self, params: TransportParameters) {
3292 self.streams.set_params(¶ms);
3293 self.idle_timeout = match (self.config.max_idle_timeout, params.max_idle_timeout) {
3294 (None, VarInt(0)) => None,
3295 (None, x) => Some(x),
3296 (Some(x), VarInt(0)) => Some(x),
3297 (Some(x), y) => Some(cmp::min(x, y)),
3298 };
3299 if let Some(ref info) = params.preferred_address {
3300 self.rem_cids.insert(frame::NewConnectionId {
3301 sequence: 1,
3302 id: info.connection_id,
3303 reset_token: info.stateless_reset_token,
3304 retire_prior_to: 0,
3305 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
3306 }
3307 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
3308 self.peer_params = params;
3309 self.path.mtud.on_peer_max_udp_payload_size_received(
3310 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
3311 );
3312 }
3313
3314 fn decrypt_packet(
3315 &mut self,
3316 now: Instant,
3317 packet: &mut Packet,
3318 ) -> Result<Option<u64>, Option<TransportError>> {
3319 let result = packet_crypto::decrypt_packet_body(
3320 packet,
3321 &self.spaces,
3322 self.zero_rtt_crypto.as_ref(),
3323 self.key_phase,
3324 self.prev_crypto.as_ref(),
3325 self.next_crypto.as_ref(),
3326 )?;
3327
3328 let result = match result {
3329 Some(r) => r,
3330 None => return Ok(None),
3331 };
3332
3333 if result.outgoing_key_update_acked {
3334 if let Some(prev) = self.prev_crypto.as_mut() {
3335 prev.end_packet = Some((result.number, now));
3336 self.set_key_discard_timer(now, packet.header.space());
3337 }
3338 }
3339
3340 if result.incoming_key_update {
3341 trace!("key update authenticated");
3342 self.update_keys(Some((result.number, now)), true);
3343 self.set_key_discard_timer(now, packet.header.space());
3344 }
3345
3346 Ok(Some(result.number))
3347 }
3348
3349 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
3350 trace!("executing key update");
3351 let new = self
3355 .crypto
3356 .next_1rtt_keys()
3357 .expect("only called for `Data` packets");
3358 self.key_phase_size = new
3359 .local
3360 .confidentiality_limit()
3361 .saturating_sub(KEY_UPDATE_MARGIN);
3362 let old = mem::replace(
3363 &mut self.spaces[SpaceId::Data]
3364 .crypto
3365 .as_mut()
3366 .unwrap() .packet,
3368 mem::replace(self.next_crypto.as_mut().unwrap(), new),
3369 );
3370 self.spaces[SpaceId::Data].sent_with_keys = 0;
3371 self.prev_crypto = Some(PrevCrypto {
3372 crypto: old,
3373 end_packet,
3374 update_unacked: remote,
3375 });
3376 self.key_phase = !self.key_phase;
3377 }
3378
3379 fn peer_supports_ack_frequency(&self) -> bool {
3380 self.peer_params.min_ack_delay.is_some()
3381 }
3382
3383 pub(crate) fn immediate_ack(&mut self) {
3388 self.spaces[self.highest_space].immediate_ack_pending = true;
3389 }
3390
3391 #[cfg(test)]
3393 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
3394 let (first_decode, remaining) = match &event.0 {
3395 ConnectionEventInner::Datagram(DatagramConnectionEvent {
3396 first_decode,
3397 remaining,
3398 ..
3399 }) => (first_decode, remaining),
3400 _ => return None,
3401 };
3402
3403 if remaining.is_some() {
3404 panic!("Packets should never be coalesced in tests");
3405 }
3406
3407 let decrypted_header = packet_crypto::unprotect_header(
3408 first_decode.clone(),
3409 &self.spaces,
3410 self.zero_rtt_crypto.as_ref(),
3411 self.peer_params.stateless_reset_token,
3412 )?;
3413
3414 let mut packet = decrypted_header.packet?;
3415 packet_crypto::decrypt_packet_body(
3416 &mut packet,
3417 &self.spaces,
3418 self.zero_rtt_crypto.as_ref(),
3419 self.key_phase,
3420 self.prev_crypto.as_ref(),
3421 self.next_crypto.as_ref(),
3422 )
3423 .ok()?;
3424
3425 Some(packet.payload.to_vec())
3426 }
3427
3428 #[cfg(test)]
3431 pub(crate) fn bytes_in_flight(&self) -> u64 {
3432 self.path.in_flight.bytes
3433 }
3434
3435 #[cfg(test)]
3437 pub(crate) fn congestion_window(&self) -> u64 {
3438 self.path
3439 .congestion
3440 .window()
3441 .saturating_sub(self.path.in_flight.bytes)
3442 }
3443
3444 #[cfg(test)]
3446 pub(crate) fn is_idle(&self) -> bool {
3447 Timer::VALUES
3448 .iter()
3449 .filter(|&&t| t != Timer::KeepAlive && t != Timer::PushNewCid)
3450 .filter_map(|&t| Some((t, self.timers.get(t)?)))
3451 .min_by_key(|&(_, time)| time)
3452 .map_or(true, |(timer, _)| timer == Timer::Idle)
3453 }
3454
3455 #[cfg(test)]
3457 pub(crate) fn lost_packets(&self) -> u64 {
3458 self.lost_packets
3459 }
3460
3461 #[cfg(test)]
3463 pub(crate) fn using_ecn(&self) -> bool {
3464 self.path.sending_ecn
3465 }
3466
3467 #[cfg(test)]
3469 pub(crate) fn total_recvd(&self) -> u64 {
3470 self.path.total_recvd
3471 }
3472
3473 #[cfg(test)]
3474 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
3475 self.local_cid_state.active_seq()
3476 }
3477
3478 #[cfg(test)]
3481 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
3482 let n = self.local_cid_state.assign_retire_seq(v);
3483 self.endpoint_events
3484 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3485 }
3486
3487 #[cfg(test)]
3489 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
3490 self.rem_cids.active_seq()
3491 }
3492
3493 #[cfg(test)]
3495 pub(crate) fn path_mtu(&self) -> u16 {
3496 self.path.current_mtu()
3497 }
3498
3499 fn can_send_1rtt(&self, max_size: usize) -> bool {
3503 self.streams.can_send_stream_data()
3504 || self.path.challenge_pending
3505 || self
3506 .prev_path
3507 .as_ref()
3508 .map_or(false, |(_, x)| x.challenge_pending)
3509 || !self.path_responses.is_empty()
3510 || self
3511 .datagrams
3512 .outgoing
3513 .front()
3514 .map_or(false, |x| x.size(true) <= max_size)
3515 }
3516
3517 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
3519 for path in [&mut self.path]
3521 .into_iter()
3522 .chain(self.prev_path.as_mut().map(|(_, data)| data))
3523 {
3524 if path.remove_in_flight(pn, packet) {
3525 return;
3526 }
3527 }
3528 }
3529
3530 fn kill(&mut self, reason: ConnectionError) {
3532 self.close_common();
3533 self.error = Some(reason);
3534 self.state = State::Drained;
3535 self.endpoint_events.push_back(EndpointEventInner::Drained);
3536 }
3537
3538 pub fn current_mtu(&self) -> u16 {
3542 self.path.current_mtu()
3543 }
3544
3545 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
3552 let pn_len = match pn {
3553 Some(pn) => PacketNumber::new(
3554 pn,
3555 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
3556 )
3557 .len(),
3558 None => 4,
3560 };
3561
3562 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
3564 }
3565
3566 fn tag_len_1rtt(&self) -> usize {
3567 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
3568 Some(crypto) => Some(&*crypto.packet.local),
3569 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
3570 };
3571 key.map_or(16, |x| x.tag_len())
3575 }
3576}
3577
3578impl fmt::Debug for Connection {
3579 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3580 f.debug_struct("Connection")
3581 .field("handshake_cid", &self.handshake_cid)
3582 .finish()
3583 }
3584}
3585
3586#[derive(Debug, Error, Clone, PartialEq, Eq)]
3588pub enum ConnectionError {
3589 #[error("peer doesn't implement any supported version")]
3591 VersionMismatch,
3592 #[error(transparent)]
3594 TransportError(#[from] TransportError),
3595 #[error("aborted by peer: {0}")]
3597 ConnectionClosed(frame::ConnectionClose),
3598 #[error("closed by peer: {0}")]
3600 ApplicationClosed(frame::ApplicationClose),
3601 #[error("reset by peer")]
3603 Reset,
3604 #[error("timed out")]
3610 TimedOut,
3611 #[error("closed")]
3613 LocallyClosed,
3614 #[error("CIDs exhausted")]
3618 CidsExhausted,
3619}
3620
3621impl From<Close> for ConnectionError {
3622 fn from(x: Close) -> Self {
3623 match x {
3624 Close::Connection(reason) => Self::ConnectionClosed(reason),
3625 Close::Application(reason) => Self::ApplicationClosed(reason),
3626 }
3627 }
3628}
3629
3630impl From<ConnectionError> for io::Error {
3632 fn from(x: ConnectionError) -> Self {
3633 use self::ConnectionError::*;
3634 let kind = match x {
3635 TimedOut => io::ErrorKind::TimedOut,
3636 Reset => io::ErrorKind::ConnectionReset,
3637 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
3638 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
3639 io::ErrorKind::Other
3640 }
3641 };
3642 Self::new(kind, x)
3643 }
3644}
3645
3646#[allow(unreachable_pub)] #[derive(Clone)]
3648pub enum State {
3649 Handshake(state::Handshake),
3650 Established,
3651 Closed(state::Closed),
3652 Draining,
3653 Drained,
3655}
3656
3657impl State {
3658 fn closed<R: Into<Close>>(reason: R) -> Self {
3659 Self::Closed(state::Closed {
3660 reason: reason.into(),
3661 })
3662 }
3663
3664 fn is_handshake(&self) -> bool {
3665 matches!(*self, Self::Handshake(_))
3666 }
3667
3668 fn is_established(&self) -> bool {
3669 matches!(*self, Self::Established)
3670 }
3671
3672 fn is_closed(&self) -> bool {
3673 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
3674 }
3675
3676 fn is_drained(&self) -> bool {
3677 matches!(*self, Self::Drained)
3678 }
3679}
3680
3681mod state {
3682 use super::*;
3683
3684 #[allow(unreachable_pub)] #[derive(Clone)]
3686 pub struct Handshake {
3687 pub(super) rem_cid_set: bool,
3691 pub(super) expected_token: Bytes,
3695 pub(super) client_hello: Option<Bytes>,
3699 }
3700
3701 #[allow(unreachable_pub)] #[derive(Clone)]
3703 pub struct Closed {
3704 pub(super) reason: Close,
3705 }
3706}
3707
3708#[derive(Debug)]
3710pub enum Event {
3711 HandshakeDataReady,
3713 Connected,
3715 ConnectionLost {
3719 reason: ConnectionError,
3721 },
3722 Stream(StreamEvent),
3724 DatagramReceived,
3726 DatagramsUnblocked,
3728}
3729
3730fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
3731 if x > y {
3732 x - y
3733 } else {
3734 Duration::new(0, 0)
3735 }
3736}
3737
3738fn get_max_ack_delay(params: &TransportParameters) -> Duration {
3739 Duration::from_micros(params.max_ack_delay.0 * 1000)
3740}
3741
3742const MAX_BACKOFF_EXPONENT: u32 = 16;
3744const MIN_PACKET_SPACE: usize = 40;
3746const MAX_TRANSMIT_SEGMENTS: usize = 10;
3752
3753const KEY_UPDATE_MARGIN: u64 = 10_000;
3757
3758#[derive(Default)]
3759struct SentFrames {
3760 retransmits: ThinRetransmits,
3761 largest_acked: Option<u64>,
3762 stream_frames: StreamMetaVec,
3763 non_retransmits: bool,
3765 requires_padding: bool,
3766}
3767
3768impl SentFrames {
3769 fn is_ack_only(&self, streams: &StreamsState) -> bool {
3771 self.largest_acked.is_some()
3772 && !self.non_retransmits
3773 && self.stream_frames.is_empty()
3774 && self.retransmits.is_empty(streams)
3775 }
3776}