Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions neqo-http3/src/connection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,8 +1160,7 @@ impl Http3Client {
self.base_handler.handle_datagram(dgram);
}
ConnectionEvent::SendStreamComplete { .. }
| ConnectionEvent::OutgoingDatagramOutcome { .. }
| ConnectionEvent::IncomingDatagramDropped => {}
| ConnectionEvent::OutgoingDatagramOutcome { .. } => {}
}
}
Ok(())
Expand Down
3 changes: 1 addition & 2 deletions neqo-http3/src/connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,7 @@ impl Http3ServerHandler {
| ConnectionEvent::ResumptionToken(..) => return Err(Error::HttpInternal(4)),
ConnectionEvent::SendStreamComplete { .. }
| ConnectionEvent::SendStreamCreatable { .. }
| ConnectionEvent::OutgoingDatagramOutcome { .. }
| ConnectionEvent::IncomingDatagramDropped => {}
| ConnectionEvent::OutgoingDatagramOutcome { .. } => {}
}
}
Ok(())
Expand Down
4 changes: 1 addition & 3 deletions neqo-transport/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,6 @@ impl Connection {
let quic_datagrams = QuicDatagrams::new(
conn_params.get_datagram_size(),
conn_params.get_outgoing_datagram_queue(),
conn_params.get_incoming_datagram_queue(),
events.clone(),
);

Expand Down Expand Up @@ -3372,8 +3371,7 @@ impl Connection {
}
Frame::Datagram { data, .. } => {
self.stats.borrow_mut().frame_rx.datagram += 1;
self.quic_datagrams
.handle_datagram(data, &mut self.stats.borrow_mut())?;
self.quic_datagrams.handle_datagram(data)?;
}
_ => unreachable!("All other frames are for streams"),
}
Expand Down
14 changes: 0 additions & 14 deletions neqo-transport/src/connection/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ pub struct ConnectionParameters {
preferred_address: PreferredAddressConfig,
datagram_size: u64,
outgoing_datagram_queue: usize,
incoming_datagram_queue: usize,
initial_rtt: Duration,
fast_pto: u8,
grease: bool,
Expand Down Expand Up @@ -171,7 +170,6 @@ impl Default for ConnectionParameters {
preferred_address: PreferredAddressConfig::Default,
datagram_size: MAX_DATAGRAM_FRAME_SIZE,
outgoing_datagram_queue: MAX_QUEUED_DATAGRAMS_DEFAULT,
incoming_datagram_queue: MAX_QUEUED_DATAGRAMS_DEFAULT,
initial_rtt: DEFAULT_INITIAL_RTT,
fast_pto: FAST_PTO_SCALE,
grease: true,
Expand Down Expand Up @@ -366,18 +364,6 @@ impl ConnectionParameters {
self
}

#[must_use]
pub const fn get_incoming_datagram_queue(&self) -> usize {
self.incoming_datagram_queue
}

#[must_use]
pub fn incoming_datagram_queue(mut self, v: usize) -> Self {
// The max queue length must be at least 1.
self.incoming_datagram_queue = max(v, 1);
self
}

#[must_use]
pub const fn get_fast_pto(&self) -> u8 {
self.fast_pto
Expand Down
52 changes: 1 addition & 51 deletions neqo-transport/src/connection/tests/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,16 +470,13 @@ fn send_datagram(sender: &mut Connection, receiver: &mut Connection, data: Vec<u
#[test]
fn multiple_datagram_events() {
const DATA_SIZE: usize = MIN_INITIAL_PACKET_SIZE;
const MAX_QUEUE: usize = 3;
const FIRST_DATAGRAM: &[u8] = &[0; DATA_SIZE];
const SECOND_DATAGRAM: &[u8] = &[1; DATA_SIZE];
const THIRD_DATAGRAM: &[u8] = &[2; DATA_SIZE];
const FOURTH_DATAGRAM: &[u8] = &[3; DATA_SIZE];

let mut client = new_client(
ConnectionParameters::default()
.datagram_size(u64::try_from(DATA_SIZE).unwrap())
.incoming_datagram_queue(MAX_QUEUE),
ConnectionParameters::default().datagram_size(u64::try_from(DATA_SIZE).unwrap()),
);
let mut server = default_server();
connect_force_idle(&mut client, &mut server);
Expand Down Expand Up @@ -513,53 +510,6 @@ fn multiple_datagram_events() {
assert!(datagrams.next().is_none());
}

#[test]
fn too_many_datagram_events() {
const DATA_SIZE: usize = MIN_INITIAL_PACKET_SIZE;
const MAX_QUEUE: usize = 2;
const FIRST_DATAGRAM: &[u8] = &[0; DATA_SIZE];
const SECOND_DATAGRAM: &[u8] = &[1; DATA_SIZE];
const THIRD_DATAGRAM: &[u8] = &[2; DATA_SIZE];
const FOURTH_DATAGRAM: &[u8] = &[3; DATA_SIZE];

let mut client = new_client(
ConnectionParameters::default()
.datagram_size(u64::try_from(DATA_SIZE).unwrap())
.incoming_datagram_queue(MAX_QUEUE),
);
let mut server = default_server();
connect_force_idle(&mut client, &mut server);

send_datagram(&mut server, &mut client, FIRST_DATAGRAM.to_vec());
send_datagram(&mut server, &mut client, SECOND_DATAGRAM.to_vec());
send_datagram(&mut server, &mut client, THIRD_DATAGRAM.to_vec());

// Datagram with FIRST_DATAGRAM data will be dropped.
assert!(matches!(
client.next_event().unwrap(),
ConnectionEvent::IncomingDatagramDropped
));
assert!(matches!(
client.next_event().unwrap(),
ConnectionEvent::Datagram(data) if data == SECOND_DATAGRAM
));
assert!(matches!(
client.next_event().unwrap(),
ConnectionEvent::Datagram(data) if data == THIRD_DATAGRAM
));
assert!(client.next_event().is_none());
assert_eq!(client.stats().incoming_datagram_dropped, 1);

// New events can be queued.
send_datagram(&mut server, &mut client, FOURTH_DATAGRAM.to_vec());
assert!(matches!(
client.next_event().unwrap(),
ConnectionEvent::Datagram(data) if data == FOURTH_DATAGRAM
));
assert!(client.next_event().is_none());
assert_eq!(client.stats().incoming_datagram_dropped, 1);
}

#[test]
fn multiple_quic_datagrams_in_one_packet() {
let (mut client, mut server) = connect_datagram();
Expand Down
96 changes: 3 additions & 93 deletions neqo-transport/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
connection::State,
quic_datagrams::DatagramTracking,
stream_id::{StreamId, StreamType},
AppError, Stats,
AppError,
};

#[derive(Debug, PartialOrd, Ord, PartialEq, Eq)]
Expand Down Expand Up @@ -77,7 +77,6 @@ pub enum ConnectionEvent {
id: u64,
outcome: OutgoingDatagramOutcome,
},
IncomingDatagramDropped,
}

#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -166,30 +165,7 @@ impl ConnectionEvents {
self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64()));
}

// The number of datagrams in the events queue is limited to max_queued_datagrams.
// This function ensure this and deletes the oldest datagrams (head-drop) if needed.
fn check_datagram_queued(&self, max_queued_datagrams: usize, stats: &mut Stats) {
let mut queue = self.events.borrow_mut();
let count = queue
.iter()
.filter(|evt| matches!(evt, ConnectionEvent::Datagram(_)))
.count();
if count < max_queued_datagrams {
// Below the limit. No action needed.
return;
}
let first = queue
.iter_mut()
.find(|evt| matches!(evt, ConnectionEvent::Datagram(_)))
.expect("Checked above");
// Remove the oldest (head-drop), replacing it with an
// IncomingDatagramDropped placeholder.
*first = ConnectionEvent::IncomingDatagramDropped;
stats.incoming_datagram_dropped += 1;
}

pub fn add_datagram(&self, max_queued_datagrams: usize, data: &[u8], stats: &mut Stats) {
self.check_datagram_queued(max_queued_datagrams, stats);
pub fn add_datagram(&self, data: &[u8]) {
self.events
.borrow_mut()
.push_back(ConnectionEvent::Datagram(data.to_vec()));
Expand Down Expand Up @@ -253,7 +229,7 @@ impl EventProvider for ConnectionEvents {
mod tests {
use neqo_common::event::Provider as _;

use crate::{CloseReason, ConnectionEvent, ConnectionEvents, Error, State, Stats, StreamId};
use crate::{CloseReason, ConnectionEvent, ConnectionEvents, Error, State, StreamId};

#[test]
fn event_culling() {
Expand Down Expand Up @@ -314,70 +290,4 @@ mod tests {
evts.connection_state_change(State::Closed(CloseReason::Transport(Error::StreamState)));
assert_eq!(evts.events().count(), 1);
}

#[test]
fn datagram_queue_drops_oldest() {
const MAX_QUEUED: usize = 2;

// Fill the queue to capacity, verify that and that there are no drops yet.
let e = ConnectionEvents::default();
let mut stats = Stats::default();
e.add_datagram(MAX_QUEUED, &[1], &mut stats);
e.add_datagram(MAX_QUEUED, &[2], &mut stats);
assert_eq!(stats.incoming_datagram_dropped, 0);
assert_eq!(e.events.borrow().len(), MAX_QUEUED);

// Add one more datagram - this should drop the oldest ("1").
e.add_datagram(MAX_QUEUED, &[3], &mut stats);
assert_eq!(stats.incoming_datagram_dropped, 1);

// Should have one `IncomingDatagramDropped` event + `MAX_QUEUED` datagrams.
assert_eq!(
e.events.borrow().iter().collect::<Vec<_>>(),
[
&ConnectionEvent::IncomingDatagramDropped,
&ConnectionEvent::Datagram(vec![2]),
&ConnectionEvent::Datagram(vec![3]),
]
);
}

/// Previously `check_datagram_queued` had a bug that caused it to
/// potentially drop an unrelated event.
///
/// See <https://github.com/mozilla/neqo/pull/3105> for details.
#[test]
fn datagram_queue_drops_datagram_not_unrelated_event() {
const MAX_QUEUED: usize = 2;

let e = ConnectionEvents::default();
let mut stats = Stats::default();

// Add unrelated event.
e.new_stream(4.into());

// Fill the queue with datagrams to capacity.
e.add_datagram(MAX_QUEUED, &[1], &mut stats);
e.add_datagram(MAX_QUEUED, &[2], &mut stats);
assert_eq!(stats.incoming_datagram_dropped, 0);
assert_eq!(e.events.borrow().len(), 1 + MAX_QUEUED);

// Add one more datagram - this should drop the oldest ("1"), not the
// unrelated event.
e.add_datagram(MAX_QUEUED, &[3], &mut stats);
assert_eq!(stats.incoming_datagram_dropped, 1);

// Should have one `IncomingDatagramDropped` event + `MAX_QUEUED` datagrams.
assert_eq!(
e.events.borrow().iter().collect::<Vec<_>>(),
[
&ConnectionEvent::NewStream {
stream_id: StreamId::new(4)
},
&ConnectionEvent::IncomingDatagramDropped,
&ConnectionEvent::Datagram(vec![2]),
&ConnectionEvent::Datagram(vec![3]),
]
);
}
}
10 changes: 2 additions & 8 deletions neqo-transport/src/quic_datagrams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ pub struct QuicDatagrams {
/// The max size of a datagram that would be acceptable by the peer.
remote_datagram_size: u64,
max_queued_outgoing_datagrams: usize,
/// The max number of datagrams that will be queued in connection events.
/// If the number is exceeded, the oldest datagram will be dropped.
max_queued_incoming_datagrams: usize,
/// Datagram queued for sending.
datagrams: VecDeque<QuicDatagram>,
conn_events: ConnectionEvents,
Expand All @@ -77,14 +74,12 @@ impl QuicDatagrams {
pub fn new(
local_datagram_size: u64,
max_queued_outgoing_datagrams: usize,
max_queued_incoming_datagrams: usize,
conn_events: ConnectionEvents,
) -> Self {
Self {
local_datagram_size,
remote_datagram_size: 0,
max_queued_outgoing_datagrams,
max_queued_incoming_datagrams,
datagrams: VecDeque::with_capacity(max_queued_outgoing_datagrams),
conn_events,
}
Expand Down Expand Up @@ -186,12 +181,11 @@ impl QuicDatagrams {
Ok(())
}

pub fn handle_datagram(&self, data: &[u8], stats: &mut Stats) -> Res<()> {
pub fn handle_datagram(&self, data: &[u8]) -> Res<()> {
if self.local_datagram_size < u64::try_from(data.len())? {
return Err(Error::ProtocolViolation);
}
self.conn_events
.add_datagram(self.max_queued_incoming_datagrams, data, stats);
self.conn_events.add_datagram(data);
Ok(())
}
}
4 changes: 0 additions & 4 deletions neqo-transport/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,6 @@ pub struct Stats {
/// Count frames sent.
pub frame_tx: FrameStats,

/// The number of incoming datagrams dropped due to reaching the limit
/// of the incoming queue.
pub incoming_datagram_dropped: usize,

pub datagram_tx: DatagramStats,

pub cc: CongestionControlStats,
Expand Down
Loading