Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 120 additions & 46 deletions neqo-udp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
io::{self, IoSliceMut},
iter,
net::SocketAddr,
num::NonZeroUsize,
slice::{self, ChunksMut},
};

use log::{Level, log_enabled};
use neqo_common::{Datagram, Tos, datagram, qdebug, qtrace};
use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState};
pub use quinn_udp::{EcnCodepoint, Transmit};
use quinn_udp::{RecvMeta, UdpSocketState};

/// Receive buffer size
///
Expand Down Expand Up @@ -55,6 +57,38 @@
}
}

fn try_send_transmit(
state: &UdpSocketState,
socket: quinn_udp::UdpSockRef<'_>,
transmit: &Transmit<'_>,
) -> io::Result<()> {
let total_len = transmit.contents.len();
let segment_size = transmit.segment_size.unwrap_or(total_len);
let segment_count = if segment_size > 0 {

Check warning on line 67 in neqo-udp/src/lib.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace > with >= in try_send_transmit

Check warning on line 67 in neqo-udp/src/lib.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace > with < in try_send_transmit

Check warning on line 67 in neqo-udp/src/lib.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace > with == in try_send_transmit
total_len.div_ceil(segment_size)
} else {
0
};
match state.try_send(socket, transmit) {
Ok(()) => {
qtrace!(
"sent {total_len} bytes ({segment_count} segments of {segment_size} bytes) to {}",
transmit.destination,
);
Ok(())
}
Err(e) if is_emsgsize(&e) => {

Check warning on line 80 in neqo-udp/src/lib.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace match guard is_emsgsize(&e) with true in try_send_transmit
qdebug!(
"Failed to send {total_len} bytes ({segment_count} segments of {segment_size} bytes) to {}. \
PMTUD probe? Ignoring error: {e}",
transmit.destination,
);
Ok(())
}
Err(e) => Err(e),
}
}

pub fn send_inner(
state: &UdpSocketState,
socket: quinn_udp::UdpSockRef<'_>,
Expand All @@ -67,33 +101,7 @@
segment_size: Some(d.datagram_size().get()),
src_ip: None,
};

match state.try_send(socket, &transmit) {
Ok(()) => {}
Err(e) if is_emsgsize(&e) => {
qdebug!(
"Failed to send datagram of size {} bytes, in {} segments, each {} bytes, from {} to {}. PMTUD probe? Ignoring error: {e}",
d.data().len(),
d.num_datagrams(),
d.datagram_size().get(),
d.source(),
d.destination()
);
return Ok(());
}
e @ Err(_) => return e,
}

qtrace!(
"sent {} bytes, in {} segments, each {} bytes, from {} to {} ",
d.data().len(),
d.num_datagrams(),
d.datagram_size().get(),
d.source(),
d.destination(),
);

Ok(())
try_send_transmit(state, socket, &transmit)
}

#[expect(
Expand Down Expand Up @@ -240,6 +248,29 @@
send_inner(&self.state, (&self.inner).into(), d)
}

/// Send datagrams from a borrowed buffer, avoiding allocation.
///
/// When `segment_size` is `Some`, the buffer is split into segments of that
/// size and may be sent using GSO/USO where supported, potentially in a
/// single syscall. The final segment may be shorter if `data.len()` is not
/// a multiple of `segment_size`.
pub fn send_buffer(
&self,
destination: SocketAddr,
tos: Tos,
data: &[u8],
segment_size: Option<NonZeroUsize>,
) -> io::Result<()> {
Comment thread
larseggert marked this conversation as resolved.
let transmit = Transmit {
destination,
ecn: EcnCodepoint::from_bits(Into::<u8>::into(tos)),
contents: data,
segment_size: segment_size.map(NonZeroUsize::get),
src_ip: None,
};
try_send_transmit(&self.state, (&self.inner).into(), &transmit)
}

/// Returns the maximum number of GSO segments supported by this socket.
pub fn max_gso_segments(&self) -> usize {
self.state.max_gso_segments()
Expand Down Expand Up @@ -284,6 +315,29 @@
Ok(socket)
}

fn recv_segments(
Comment thread
larseggert marked this conversation as resolved.
receiver: &Socket<std::net::UdpSocket>,
recv_buf: &mut RecvBuf,
expected_count: usize,
expected_segment_size: usize,
) {
let receiver_addr = receiver.inner.local_addr().unwrap();
let mut num_received = 0;
while num_received < expected_count {
receiver
.recv(receiver_addr, recv_buf)
.expect("receive to succeed")
.for_each(|d| {
assert_eq!(
expected_segment_size,
d.len(),
"Expect received datagrams to have same length as sent datagrams"
);
num_received += 1;
});
}
}

#[test]
fn handle_empty_datagram() -> Result<(), io::Error> {
// quinn-udp doesn't support sending emtpy datagrams across all
Expand Down Expand Up @@ -404,13 +458,10 @@
ignore = "GRO not available"
)]
fn many_datagrams_through_gso_gro() -> Result<(), io::Error> {
use std::num::NonZeroUsize;

const SEGMENT_SIZE: usize = 128;

let sender = socket()?;
let receiver = socket()?;
let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();

let max_gso_segments = sender.max_gso_segments();
let msg = vec![0xAB; SEGMENT_SIZE * max_gso_segments];
Expand All @@ -423,23 +474,46 @@
);

sender.send(&batch)?;
let mut recv_buf = RecvBuf::default();
recv_segments(&receiver, &mut recv_buf, max_gso_segments, SEGMENT_SIZE);

// Allow for one GSO sendmsg to result in multiple GRO recvmmsg.
let mut num_received = 0;
Ok(())
}

#[test]
fn send_buffer_single() -> Result<(), io::Error> {
let sender = socket()?;
let receiver = socket()?;

let payload = b"Hello via send_buffer!";
sender.send_buffer(receiver.inner.local_addr()?, Tos::default(), payload, None)?;
let mut recv_buf = RecvBuf::default();
while num_received < max_gso_segments {
receiver
.recv(receiver_addr, &mut recv_buf)
.expect("receive to succeed")
.for_each(|d| {
assert_eq!(
SEGMENT_SIZE,
d.len(),
"Expect received datagrams to have same length as sent datagrams"
);
num_received += 1;
});
}
recv_segments(&receiver, &mut recv_buf, 1, payload.len());

Ok(())
}

#[test]
#[cfg_attr(
not(any(target_os = "linux", target_os = "windows")),
ignore = "GSO not available"
)]
fn send_buffer_gso() -> Result<(), io::Error> {
const SEGMENT_SIZE: usize = 128;

let sender = socket()?;
let receiver = socket()?;

let max_gso_segments = sender.max_gso_segments();
let payload = vec![0xCD; SEGMENT_SIZE * max_gso_segments];
sender.send_buffer(
receiver.inner.local_addr()?,
Tos::default(),
&payload,
Some(NonZeroUsize::new(SEGMENT_SIZE).expect("SEGMENT_SIZE cannot be zero")),
)?;
let mut recv_buf = RecvBuf::default();
recv_segments(&receiver, &mut recv_buf, max_gso_segments, SEGMENT_SIZE);

Ok(())
}
Expand Down
Loading