Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 54 additions & 30 deletions neqo-transport/src/recv_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,36 +160,46 @@ impl RxStreamOrderer {
return;
}

let extend = if let Some((&prev_start, prev_vec)) =
self.data_ranges.range_mut(..=new_start).next_back()
{
let prev_end = prev_start + u64::try_from(prev_vec.len()).expect("usize fits in u64");
if new_end > prev_end {
// PPPPPP -> PPPPPP
// NNNNNN NN
// NNNNNNNN NN
// Add a range containing only new data
// (In-order frames will take this path, with no overlap)
let overlap = prev_end.saturating_sub(new_start);
qtrace!("New frame {new_start}-{new_end} received, overlap: {overlap}");
new_start += overlap;
new_data = &new_data[usize::try_from(overlap).expect("u64 fits in usize")..];
// If it is small enough, extend the previous buffer.
// This can't always extend, because otherwise the buffer could end up
// growing indefinitely without being released.
prev_vec.len() < 4096 && prev_end == new_start
} else {
// PPPPPP -> PPPPPP
// NNNN
// NNNN
// Do nothing
qtrace!("Dropping frame with already-received range {new_start}-{new_end}");
return;
}
} else {
qtrace!("New frame {new_start}-{new_end} received");
false
};
// Check if this is the first (lowest key) entry before the mutable borrow
// let first_key = self.data_ranges.keys().next().copied();

let extend = false;
// let extend = if let Some((&prev_start, prev_vec)) =
// self.data_ranges.range_mut(..=new_start).next_back()
// {
// let prev_end = prev_start + u64::try_from(prev_vec.len()).expect("usize fits in u64");
// if new_end > prev_end {
// // PPPPPP -> PPPPPP
// // NNNNNN NN
// // NNNNNNNN NN
// // Add a range containing only new data
// // (In-order frames will take this path, with no overlap)
// let overlap = prev_end.saturating_sub(new_start);
// qtrace!("New frame {new_start}-{new_end} received, overlap: {overlap}");
// new_start += overlap;
// new_data = &new_data[usize::try_from(overlap).expect("u64 fits in usize")..];
// // If it is small enough, extend the previous buffer.
// // This can't always extend, because otherwise the buffer could end up
// // growing indefinitely without being released.
// // Use 1MB limit for first entry, 64KB for others. The logic is that with mostly
// // in-order data, we'll mostly append to the first entry and we want to avoid
// // reallocs while we wait for the app to read. For other (out-of-order), we want to
// // keep at least a few packets in each entry, but not grow too large.
// let is_first = first_key == Some(prev_start);
// let limit = if is_first { 1024 * 1024 } else { 64 * 1024 };
// prev_vec.len() < limit && prev_end == new_start
// } else {
// // PPPPPP -> PPPPPP
// // NNNN
// // NNNN
// // Do nothing
// qtrace!("Dropping frame with already-received range {new_start}-{new_end}");
// return;
// }
// } else {
// qtrace!("New frame {new_start}-{new_end} received");
// false
// };

let mut to_add = new_data;
if self
Expand Down Expand Up @@ -253,6 +263,20 @@ impl RxStreamOrderer {
}
} else {
self.data_ranges.insert(new_start, to_add.to_vec());
// // Pre-allocate capacity to avoid reallocations when buffer grows.
// // Initial allocation: 64KB for first entry, 8KB for others.
// // Check if this will be the first (lowest key) entry
// let is_first = self
// .data_ranges
// .keys()
// .next()
// .map_or(true, |&first_key| new_start < first_key);
// let initial_capacity = if is_first { 64 * 1024 } else { 8 * 1024 };
// // Allocate double the received data size up to the initial capacity limit.
// let capacity = (to_add.len() * 2).min(initial_capacity).max(to_add.len());
// let mut vec = Vec::with_capacity(capacity);
// vec.extend_from_slice(to_add);
// self.data_ranges.insert(new_start, vec);
}
}
}
Expand Down
Loading