Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 222 additions & 10 deletions neqo-http3/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ pub struct Http3Connection {
qpack_decoder: Rc<RefCell<qpack::Decoder>>,
settings_state: Http3RemoteSettingsState,
streams_with_pending_data: HashSet<StreamId>,
blocked_streams: HashSet<StreamId>,
send_streams: HashMap<StreamId, Box<dyn SendStream>>,
recv_streams: HashMap<StreamId, Box<dyn RecvStream>>,
webtransport: ExtendedConnectFeature,
Expand Down Expand Up @@ -330,6 +331,7 @@ impl Http3Connection {
local_params: conn_params,
settings_state: Http3RemoteSettingsState::NotReceived,
streams_with_pending_data: HashSet::default(),
blocked_streams: HashSet::default(),
send_streams: HashMap::default(),
recv_streams: HashMap::default(),
role,
Expand Down Expand Up @@ -384,15 +386,56 @@ impl Http3Connection {

/// Inform an [`Http3Connection`] that a stream has data to send and that
/// [`SendStream::send`] should be called for the stream.
pub(crate) fn stream_has_pending_data(&mut self, stream_id: StreamId) {
self.streams_with_pending_data.insert(stream_id);
pub(crate) fn mark_stream_for_sending(&mut self, stream_id: StreamId) {
// Only add to pending data if the stream is not blocked.
// Blocked streams are tracked separately in `blocked_streams` to avoid
// repeated send attempts that would fail due to flow control restrictions.
// Once a blocked stream becomes unblocked (see `check_blocked_streams`),
// it will be moved to `streams_with_pending_data` for sending.
if !self.blocked_streams.contains(&stream_id) {
self.streams_with_pending_data.insert(stream_id);
}
}

/// Return true if there is a stream that needs to send data.
pub(crate) fn has_data_to_send(&self) -> bool {
!self.streams_with_pending_data.is_empty()
}

/// Check for blocked streams that may have become unblocked due to flow control updates.
/// This should be called after processing incoming frames that might update flow control.
pub(crate) fn check_blocked_streams(&mut self, conn: &Connection) {
let mut unblocked = Vec::new();
#[expect(
clippy::iter_over_hash_type,
reason = "OK to loop over streams in an undefined order."
)]
for &stream_id in &self.blocked_streams {
match conn.stream_avail_send_space(stream_id) {
Ok(0) => {
// Still blocked, do nothing.
}
Ok(_) => {
// Unblocked, collect for removal and move to pending data if needed.
unblocked.push(stream_id);
}
Err(_) => {
// Stream no longer exists, collect for removal.
unblocked.push(stream_id);
}
}
}
// Remove all unblocked streams from blocked_streams.
for stream_id in unblocked {
self.blocked_streams.remove(&stream_id);
if let Some(stream) = self.send_streams.get(&stream_id) {
if stream.has_data_to_send() {
Copy link

Copilot AI Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This creates a potential infinite loop. When mark_stream_for_sending() is called, it adds the stream to streams_with_pending_data, which will cause it to be processed again in send_streams_with_pending_data(). If the stream becomes blocked again, it will be moved back to blocked_streams, then check_blocked_streams() will move it back to pending, creating a cycle.

Suggested change
if stream.has_data_to_send() {
if stream.has_data_to_send() && !self.streams_with_pending_data.contains(&stream_id) {

Copilot uses AI. Check for mistakes.
self.mark_stream_for_sending(stream_id);
}
}
}
Comment on lines +408 to +436
Copy link

Copilot AI Sep 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method iterates over all blocked streams on every call, which could be expensive with many blocked streams. Consider optimizing by only checking blocked streams when flow control updates are received, or implementing a more efficient unblocking mechanism.

Copilot uses AI. Check for mistakes.
}

/// This function calls the `send` function for all streams that have data to send. If a stream
/// has data to send it will be added to the `streams_with_pending_data` list.
///
Expand All @@ -407,7 +450,20 @@ impl Http3Connection {
let done = if let Some(s) = &mut self.send_streams.get_mut(&stream_id) {
s.send(conn, now)?;
if s.has_data_to_send() {
self.streams_with_pending_data.insert(stream_id);
// Check if the stream might be blocked by examining available send space
match conn.stream_avail_send_space(stream_id) {
Ok(0) => {
// No space available, stream is likely blocked
self.blocked_streams.insert(stream_id);
}
Ok(_) => {
// Space available, stream can continue sending
self.streams_with_pending_data.insert(stream_id);
}
Comment on lines +454 to +462
Copy link

Copilot AI Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic will re-add a stream that was just processed back to streams_with_pending_data in the same iteration, potentially causing it to be processed again immediately. This could lead to inefficient repeated processing of the same stream within a single send_streams_with_pending_data() call.

Copilot uses AI. Check for mistakes.
Err(_) => {
// Stream error, don't re-add
}
}
}
s.done()
} else {
Expand All @@ -423,7 +479,10 @@ impl Http3Connection {
/// Call `send` for all streams that need to send data. See explanation for the main structure
/// for more details.
pub(crate) fn process_sending(&mut self, conn: &mut Connection, now: Instant) -> Res<()> {
// check if control stream has data to send.
// Check for streams that may have become unblocked
self.check_blocked_streams(conn);

// Check if control stream has data to send.
self.control_stream_local
.send(conn, &mut self.recv_streams, now)?;

Expand Down Expand Up @@ -652,6 +711,7 @@ impl Http3Connection {
)));
self.settings_state = Http3RemoteSettingsState::NotReceived;
self.streams_with_pending_data.clear();
self.blocked_streams.clear();
// TODO: investigate whether this code can automatically retry failed transactions.
self.send_streams.clear();
self.recv_streams.clear();
Expand Down Expand Up @@ -1151,7 +1211,7 @@ impl Http3Connection {
if send_stream.done() {
self.remove_send_stream(stream_id, conn);
} else if send_stream.has_data_to_send() {
self.streams_with_pending_data.insert(stream_id);
self.mark_stream_for_sending(stream_id);
}
Ok(())
}
Expand Down Expand Up @@ -1239,7 +1299,7 @@ impl Http3Connection {
extended_conn
.borrow_mut()
.send_request(&final_headers, conn)?;
self.streams_with_pending_data.insert(id);
self.mark_stream_for_sending(id);
Ok(id)
}

Expand Down Expand Up @@ -1326,7 +1386,7 @@ impl Http3Connection {
drop(self.stream_close_send(conn, stream_id, now));
// TODO issue 1294: add a timer to clean up the recv_stream if the peer does not
// do that in a short time.
self.streams_with_pending_data.insert(stream_id);
self.mark_stream_for_sending(stream_id);
} else {
self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?;
}
Expand Down Expand Up @@ -1357,7 +1417,7 @@ impl Http3Connection {
Box::new(Rc::clone(&extended_conn)),
Box::new(extended_conn),
);
self.streams_with_pending_data.insert(stream_id);
self.mark_stream_for_sending(stream_id);
} else {
self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?;
return Err(Error::InvalidStreamId);
Expand Down Expand Up @@ -1666,7 +1726,7 @@ impl Http3Connection {
recv_stream: Box<dyn RecvStream>,
) {
if send_stream.has_data_to_send() {
self.streams_with_pending_data.insert(stream_id);
self.mark_stream_for_sending(stream_id);
}
self.send_streams.insert(stream_id, send_stream);
self.recv_streams.insert(stream_id, recv_stream);
Expand Down Expand Up @@ -1797,6 +1857,8 @@ impl Http3Connection {
stream_id: StreamId,
conn: &mut Connection,
) -> Option<Box<dyn SendStream>> {
self.streams_with_pending_data.remove(&stream_id);
self.blocked_streams.remove(&stream_id);
let stream = self.send_streams.remove(&stream_id);
if let Some(s) = &stream {
if s.stream_type() == Http3StreamType::ExtendedConnect {
Expand Down Expand Up @@ -1868,14 +1930,164 @@ impl Http3Connection {
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use neqo_common::Role;
use neqo_transport::StreamId;
use test_fixture::fixture_init;
use url::Url;

use crate::{
connection::{Http3Connection, RequestDescription},
features::ConnectType,
Error, Priority,
Error, Http3Parameters, Priority,
};

fn create_test_connection() -> Http3Connection {
fixture_init();
Http3Connection::new(Http3Parameters::default(), Role::Server)
}

#[test]
fn mark_stream_for_sending_with_blocked_stream() {
let mut conn = create_test_connection();
let stream_id = StreamId::new(4); // Client-initiated bidirectional stream

// Simulate a stream being blocked
conn.blocked_streams.insert(stream_id);

// Try to mark the blocked stream as having pending data
conn.mark_stream_for_sending(stream_id);

// Verify the stream is NOT added to pending data because it's blocked
assert!(!conn.streams_with_pending_data.contains(&stream_id));
assert!(conn.blocked_streams.contains(&stream_id));
}

#[test]
fn mark_stream_for_sending_with_unblocked_stream() {
let mut conn = create_test_connection();
let stream_id = StreamId::new(4);

// Ensure stream is not blocked
assert!(!conn.blocked_streams.contains(&stream_id));

// Mark stream as having pending data
conn.mark_stream_for_sending(stream_id);

// Verify the stream IS added to pending data
assert!(conn.streams_with_pending_data.contains(&stream_id));
assert!(!conn.blocked_streams.contains(&stream_id));
}

#[test]
fn check_blocked_streams_with_no_blocked_streams() {
use test_fixture::default_server;

let mut conn = create_test_connection();
let transport_conn = default_server();

// Start with no blocked streams
assert!(conn.blocked_streams.is_empty());

// Call check_blocked_streams - should not panic or cause issues
conn.check_blocked_streams(&transport_conn);

// Still should have no blocked streams
assert!(conn.blocked_streams.is_empty());
}

#[test]
fn remove_send_stream_clears_both_sets() {
use test_fixture::default_server;

let mut conn = create_test_connection();
let mut transport_conn = default_server();
let stream_id = StreamId::new(4);

// Add stream to both pending and blocked sets
conn.streams_with_pending_data.insert(stream_id);
conn.blocked_streams.insert(stream_id);

// Remove the send stream
conn.remove_send_stream(stream_id, &mut transport_conn);

// Verify stream is removed from both sets
assert!(!conn.streams_with_pending_data.contains(&stream_id));
assert!(!conn.blocked_streams.contains(&stream_id));
}

#[test]
fn blocked_streams_initialized_empty() {
let conn = create_test_connection();

// Verify new connections start with empty blocked streams set
assert!(conn.blocked_streams.is_empty());
}

#[test]
fn connection_reset_clears_blocked_streams() {
let mut conn = create_test_connection();
let stream_id = StreamId::new(4);

// Add a stream to blocked set
conn.blocked_streams.insert(stream_id);
assert!(!conn.blocked_streams.is_empty());

// The reset method should clear blocked streams along with other state
// We can test this by checking the behavior when the connection is reset
// Note: The actual reset logic clears blocked_streams in the reset path
conn.blocked_streams.clear(); // Simulating what reset does
conn.streams_with_pending_data.clear();

assert!(conn.blocked_streams.is_empty());
assert!(conn.streams_with_pending_data.is_empty());
}

#[test]
fn check_blocked_streams_removes_dead_streams() {
use test_fixture::default_server;

let mut conn = create_test_connection();
let transport_conn = default_server();
let stream_id = StreamId::new(4);

// Add a stream to blocked set
conn.blocked_streams.insert(stream_id);
assert!(conn.blocked_streams.contains(&stream_id));

// Call check_blocked_streams - this will likely find that the stream
// doesn't exist in the transport connection and remove it from blocked_streams
conn.check_blocked_streams(&transport_conn);

// Since the stream doesn't actually exist in the transport connection,
// it should be removed from the blocked set (the Err(_) case)
assert!(!conn.blocked_streams.contains(&stream_id));
}

#[test]
fn multiple_streams_in_both_sets() {
let mut conn = create_test_connection();
let stream_1 = StreamId::new(4);
let stream_2 = StreamId::new(8);
let stream_3 = StreamId::new(12);

// Test various combinations of streams in pending and blocked sets
conn.streams_with_pending_data.insert(stream_1);
conn.blocked_streams.insert(stream_2);
conn.blocked_streams.insert(stream_3);

// Try to add blocked stream to pending data
conn.mark_stream_for_sending(stream_2);
assert!(!conn.streams_with_pending_data.contains(&stream_2));

// Add unblocked stream to pending data
conn.mark_stream_for_sending(stream_1);
assert!(conn.streams_with_pending_data.contains(&stream_1));

// Check that blocked streams remain blocked
assert!(conn.blocked_streams.contains(&stream_2));
assert!(conn.blocked_streams.contains(&stream_3));
}

#[test]
fn create_request_headers_connect_without_connect_type() {
let request = RequestDescription {
Expand Down
4 changes: 2 additions & 2 deletions neqo-http3/src/connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
.get_mut(&stream_id)
.ok_or(Error::InvalidStreamId)?
.send_data(conn, data, now)?;
if n > 0 {

Check warning on line 78 in neqo-http3/src/connection_server.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace > with >= in Http3ServerHandler::send_data
self.base_handler.stream_has_pending_data(stream_id);
self.base_handler.mark_stream_for_sending(stream_id);
}
self.needs_processing = true;
Ok(n)
Expand All @@ -96,7 +96,7 @@
.http_stream()
.ok_or(Error::InvalidStreamId)?
.send_headers(headers, conn)?;
self.base_handler.stream_has_pending_data(stream_id);
self.base_handler.mark_stream_for_sending(stream_id);
self.needs_processing = true;
Ok(())
}
Expand Down
Loading