From cc5c5299d3172633e02ba718659b10a1872bf194 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Mon, 22 Sep 2025 10:29:52 +0300 Subject: [PATCH 1/2] feat: Reduce reallocations in RxStreamOrderer Let's see if this helps performance. --- neqo-transport/src/recv_stream.rs | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/neqo-transport/src/recv_stream.rs b/neqo-transport/src/recv_stream.rs index f9bfe7480e..eb5cf327bc 100644 --- a/neqo-transport/src/recv_stream.rs +++ b/neqo-transport/src/recv_stream.rs @@ -175,6 +175,9 @@ impl RxStreamOrderer { return; } + // Check if this is the first (lowest key) entry before the mutable borrow + let first_key = self.data_ranges.keys().next().copied(); + let extend = if let Some((&prev_start, prev_vec)) = self.data_ranges.range_mut(..=new_start).next_back() { @@ -192,7 +195,13 @@ impl RxStreamOrderer { // If it is small enough, extend the previous buffer. // This can't always extend, because otherwise the buffer could end up // growing indefinitely without being released. - prev_vec.len() < 4096 && prev_end == new_start + // Use 1MB limit for first entry, 64KB for others. The logic is that with mostly + // in-order data, we'll mostly append to the first entry and we want to avoid + // reallocs while we wait for the app to read. For other (out-of-order), we want to + // keep at least a few packets in each entry, but not grow too large. + let is_first = first_key == Some(prev_start); + let limit = if is_first { 1024 * 1024 } else { 64 * 1024 }; + prev_vec.len() < limit && prev_end == new_start } else { // PPPPPP -> PPPPPP // NNNN @@ -267,7 +276,20 @@ impl RxStreamOrderer { buf.extend_from_slice(to_add); } } else { - self.data_ranges.insert(new_start, to_add.to_vec()); + // Pre-allocate capacity to avoid reallocations when buffer grows. + // Initial allocation: 64KB for first entry, 8KB for others. + // Check if this will be the first (lowest key) entry + let is_first = self + .data_ranges + .keys() + .next() + .map_or(true, |&first_key| new_start < first_key); + let initial_capacity = if is_first { 64 * 1024 } else { 8 * 1024 }; + // Allocate double the received data size up to the initial capacity limit. + let capacity = (to_add.len() * 2).min(initial_capacity).max(to_add.len()); + let mut vec = Vec::with_capacity(capacity); + vec.extend_from_slice(to_add); + self.data_ranges.insert(new_start, vec); } } } From 0adbaac3664356742b95f97446b0a9f576857a55 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Tue, 23 Sep 2025 14:21:17 +0300 Subject: [PATCH 2/2] Never append --- neqo-transport/src/recv_stream.rs | 106 +++++++++++++++--------------- 1 file changed, 54 insertions(+), 52 deletions(-) diff --git a/neqo-transport/src/recv_stream.rs b/neqo-transport/src/recv_stream.rs index eb5cf327bc..9eeb3b91a7 100644 --- a/neqo-transport/src/recv_stream.rs +++ b/neqo-transport/src/recv_stream.rs @@ -176,44 +176,45 @@ impl RxStreamOrderer { } // Check if this is the first (lowest key) entry before the mutable borrow - let first_key = self.data_ranges.keys().next().copied(); - - let extend = if let Some((&prev_start, prev_vec)) = - self.data_ranges.range_mut(..=new_start).next_back() - { - let prev_end = prev_start + u64::try_from(prev_vec.len()).expect("usize fits in u64"); - if new_end > prev_end { - // PPPPPP -> PPPPPP - // NNNNNN NN - // NNNNNNNN NN - // Add a range containing only new data - // (In-order frames will take this path, with no overlap) - let overlap = prev_end.saturating_sub(new_start); - qtrace!("New frame {new_start}-{new_end} received, overlap: {overlap}"); - new_start += overlap; - new_data = &new_data[usize::try_from(overlap).expect("u64 fits in usize")..]; - // If it is small enough, extend the previous buffer. - // This can't always extend, because otherwise the buffer could end up - // growing indefinitely without being released. - // Use 1MB limit for first entry, 64KB for others. The logic is that with mostly - // in-order data, we'll mostly append to the first entry and we want to avoid - // reallocs while we wait for the app to read. For other (out-of-order), we want to - // keep at least a few packets in each entry, but not grow too large. - let is_first = first_key == Some(prev_start); - let limit = if is_first { 1024 * 1024 } else { 64 * 1024 }; - prev_vec.len() < limit && prev_end == new_start - } else { - // PPPPPP -> PPPPPP - // NNNN - // NNNN - // Do nothing - qtrace!("Dropping frame with already-received range {new_start}-{new_end}"); - return; - } - } else { - qtrace!("New frame {new_start}-{new_end} received"); - false - }; + // let first_key = self.data_ranges.keys().next().copied(); + + let extend = false; + // let extend = if let Some((&prev_start, prev_vec)) = + // self.data_ranges.range_mut(..=new_start).next_back() + // { + // let prev_end = prev_start + u64::try_from(prev_vec.len()).expect("usize fits in u64"); + // if new_end > prev_end { + // // PPPPPP -> PPPPPP + // // NNNNNN NN + // // NNNNNNNN NN + // // Add a range containing only new data + // // (In-order frames will take this path, with no overlap) + // let overlap = prev_end.saturating_sub(new_start); + // qtrace!("New frame {new_start}-{new_end} received, overlap: {overlap}"); + // new_start += overlap; + // new_data = &new_data[usize::try_from(overlap).expect("u64 fits in usize")..]; + // // If it is small enough, extend the previous buffer. + // // This can't always extend, because otherwise the buffer could end up + // // growing indefinitely without being released. + // // Use 1MB limit for first entry, 64KB for others. The logic is that with mostly + // // in-order data, we'll mostly append to the first entry and we want to avoid + // // reallocs while we wait for the app to read. For other (out-of-order), we want to + // // keep at least a few packets in each entry, but not grow too large. + // let is_first = first_key == Some(prev_start); + // let limit = if is_first { 1024 * 1024 } else { 64 * 1024 }; + // prev_vec.len() < limit && prev_end == new_start + // } else { + // // PPPPPP -> PPPPPP + // // NNNN + // // NNNN + // // Do nothing + // qtrace!("Dropping frame with already-received range {new_start}-{new_end}"); + // return; + // } + // } else { + // qtrace!("New frame {new_start}-{new_end} received"); + // false + // }; let mut to_add = new_data; if self @@ -276,20 +277,21 @@ impl RxStreamOrderer { buf.extend_from_slice(to_add); } } else { - // Pre-allocate capacity to avoid reallocations when buffer grows. - // Initial allocation: 64KB for first entry, 8KB for others. - // Check if this will be the first (lowest key) entry - let is_first = self - .data_ranges - .keys() - .next() - .map_or(true, |&first_key| new_start < first_key); - let initial_capacity = if is_first { 64 * 1024 } else { 8 * 1024 }; - // Allocate double the received data size up to the initial capacity limit. - let capacity = (to_add.len() * 2).min(initial_capacity).max(to_add.len()); - let mut vec = Vec::with_capacity(capacity); - vec.extend_from_slice(to_add); - self.data_ranges.insert(new_start, vec); + self.data_ranges.insert(new_start, to_add.to_vec()); + // // Pre-allocate capacity to avoid reallocations when buffer grows. + // // Initial allocation: 64KB for first entry, 8KB for others. + // // Check if this will be the first (lowest key) entry + // let is_first = self + // .data_ranges + // .keys() + // .next() + // .map_or(true, |&first_key| new_start < first_key); + // let initial_capacity = if is_first { 64 * 1024 } else { 8 * 1024 }; + // // Allocate double the received data size up to the initial capacity limit. + // let capacity = (to_add.len() * 2).min(initial_capacity).max(to_add.len()); + // let mut vec = Vec::with_capacity(capacity); + // vec.extend_from_slice(to_add); + // self.data_ranges.insert(new_start, vec); } } }