Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions neqo-http3/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,27 +396,33 @@ impl Http3Connection {
/// has data to send it will be added to the `streams_with_pending_data` list.
///
/// Control and QPACK streams are handled differently and are never added to the list.
fn send_non_control_streams(&mut self, conn: &mut Connection, now: Instant) -> Res<()> {
fn send_non_control_streams(&mut self, conn: &mut Connection, now: Instant) {
let to_send = mem::take(&mut self.streams_with_pending_data);
#[expect(
clippy::iter_over_hash_type,
reason = "OK to loop over active streams in an undefined order."
)]
for stream_id in to_send {
let done = if let Some(s) = &mut self.send_streams.get_mut(&stream_id) {
s.send(conn, now)?;
if s.has_data_to_send() {
self.streams_with_pending_data.insert(stream_id);
if let Err(e) = s.send(conn, now) {
// Per RFC 9114 Section 8.1, stream-level send failures
// (e.g., STOP_SENDING received) must not close the
// connection. Remove the affected stream and continue.
qdebug!("[{self}] Stream {stream_id} send error: {e}");
true
} else {
if s.has_data_to_send() {
self.streams_with_pending_data.insert(stream_id);
}
s.done()
}
s.done()
} else {
false
};
if done {
self.remove_send_stream(stream_id, conn);
}
}
Ok(())
}

/// Call `send` for all streams that need to send data. See explanation for the main structure
Expand All @@ -426,7 +432,7 @@ impl Http3Connection {
self.control_stream_local
.send(conn, &mut self.recv_streams, now)?;

self.send_non_control_streams(conn, now)?;
self.send_non_control_streams(conn, now);

self.qpack_decoder.borrow_mut().send(conn)?;
match self.qpack_encoder.borrow_mut().send_encoder_updates(conn) {
Expand Down
22 changes: 22 additions & 0 deletions neqo-http3/src/headers_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ pub fn headers_valid(headers: &[Header], message_type: MessageType) -> Res<()> {
return Err(Error::InvalidHeader);
}

// RFC 9114 Section 4.4: bare CONNECT MUST NOT include :scheme or :path.
if message_type == MessageType::Request
&& method_value == Some(b"CONNECT".as_ref())
&& protocol_value.is_none()
&& (pseudo_state.contains(PseudoHeaderState::Scheme)
|| pseudo_state.contains(PseudoHeaderState::Path))
{
return Err(Error::InvalidHeader);
}

Ok(())
}

Expand Down Expand Up @@ -318,4 +328,16 @@ mod tests {
];
assert!(headers_valid(&headers, MessageType::Request).is_err());
}

#[test]
fn bare_connect_with_forbidden_headers() {
for field in [":scheme", ":path"] {
let headers = vec![
Header::new(":method", "CONNECT"),
Header::new(":authority", "proxy.example.com:443"),
Header::new(field, "value"),
];
assert!(headers_valid(&headers, MessageType::Request).is_err());
}
}
}
5 changes: 4 additions & 1 deletion neqo-http3/src/send_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,10 @@ impl SendStream for SendMessage {
/// `process_output` has not been called when needed, and HTTP3 layer has not picked up the
/// info that the stream has been closed.)
fn send(&mut self, conn: &mut Connection, now: Instant) -> Res<()> {
let sent = Error::map_error(self.stream.send_buffer(conn, now), Error::HttpInternal(5))?;
let sent = self
.stream
.send_buffer(conn, now)
.map_err(|e| Error::map_stream_send_errors(&e))?;

qtrace!("[{self}] {sent} bytes sent");
if !self.stream.has_buffered_data() {
Expand Down
223 changes: 209 additions & 14 deletions neqo-http3/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
time::Instant,
};

use neqo_common::{Datagram, qtrace};
use neqo_common::{Datagram, Header, qtrace};
use neqo_crypto::{AntiReplay, Cipher, PrivateKey, PublicKey, ZeroRttChecker};
use neqo_transport::{
ConnectionIdGenerator, Output, OutputBatch,
Expand All @@ -22,7 +22,7 @@ use neqo_transport::{
use rustc_hash::FxHashMap as HashMap;

use crate::{
Http3Parameters, Http3StreamInfo, Res,
Error, Http3Parameters, Http3StreamInfo, Res,
connection::Http3State,
connection_server::Http3ServerHandler,
server_connection_events::{ConnectUdpEvent, Http3ServerConnEvent, WebTransportEvent},
Expand Down Expand Up @@ -317,6 +317,23 @@ impl Http3Server {
datagram,
);
}
Http3ServerConnEvent::RejectedConnect { stream_id, status } => {
let headers = [Header::new(":status", status.to_string())];
let conn = &mut conn.borrow_mut();
if handler_borrowed
.send_headers(stream_id, &headers, conn)
.is_ok()
{
_ = handler_borrowed.stream_close_send(stream_id, conn, now);
} else {
// Ensure the stream doesn't linger if we can't send the response.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this. It's a very sensible way to avoid sitting on requests.

_ = handler_borrowed.cancel_fetch(
stream_id,
Error::HttpRequestRejected.code(),
conn,
);
}
}
}
}
}
Expand Down Expand Up @@ -504,12 +521,22 @@ mod tests {
assert_eq!(client.process(s4.dgram(), now()).dgram(), None);
}

const EXPECTED_CONTROL_STREAM_DATA: &[u8] = &[0x0, 0x4, 0x12, 0x1, 0x40, 0x64, 0x7, 0x40, 0x64];

// Start a client/server and check setting frame.
fn connect_and_receive_settings_with_server(
server: &mut Http3Server,
) -> (Connection, ResumptionToken) {
const CONTROL_STREAM_DATA: &[u8] = &[0x0, 0x4, 0x12, 0x1, 0x40, 0x64, 0x7, 0x40, 0x64];
connect_and_receive_server_settings(server, Some(EXPECTED_CONTROL_STREAM_DATA))
}

/// Connect transport and drain server-initiated streams.
/// If `expected_control_data` is `Some`, assert the control stream data matches exactly;
/// otherwise just drain it.
fn connect_and_receive_server_settings(
server: &mut Http3Server,
expected_control_data: Option<&[u8]>,
) -> (Connection, ResumptionToken) {
let mut client = default_client();
connect_transport(server, &mut client, false);

Expand All @@ -531,9 +558,13 @@ mod tests {
{
// the control stream
let mut buf = [0_u8; 100];
let (_, fin) = client.stream_recv(stream_id, &mut buf).unwrap();
let (amount, fin) = client.stream_recv(stream_id, &mut buf).unwrap();
assert!(!fin);
assert_eq!(&buf[..CONTROL_STREAM_DATA.len()], CONTROL_STREAM_DATA);
if let Some(expected) = expected_control_data {
assert_eq!(&buf[..expected.len()], expected);
} else {
assert!(amount > 0);
}
} else if stream_id == CLIENT_SIDE_ENCODER_STREAM_ID
|| stream_id == SERVER_SIDE_ENCODER_STREAM_ID
{
Expand Down Expand Up @@ -616,15 +647,23 @@ mod tests {
}
}

// Connect transport, send and receive settings.
fn connect_to(server: &mut Http3Server) -> (PeerConnection, ResumptionToken) {
let (mut neqo_trans_conn, token) = connect_and_receive_settings_with_server(server);
// Default client SETTINGS: QPACK_MAX_TABLE_CAPACITY=100, QPACK_BLOCKED_STREAMS=100.
const DEFAULT_CLIENT_SETTINGS: &[u8] = &[0x0, 0x4, 0x6, 0x1, 0x40, 0x64, 0x7, 0x40, 0x64];

/// Connect transport, send SETTINGS, and set up QPACK streams.
/// Returns the `PeerConnection`, `ResumptionToken`, and the QPACK `Encoder`.
/// If `expected_server_control_data` is `None`, the server's control stream data
/// is drained without asserting on exact content.
fn connect_to_with_encoder(
server: &mut Http3Server,
settings: &[u8],
expected_server_control_data: Option<&[u8]>,
) -> (PeerConnection, ResumptionToken, qpack::Encoder) {
let (mut neqo_trans_conn, token) =
connect_and_receive_server_settings(server, expected_server_control_data);
let control_stream = neqo_trans_conn.stream_create(StreamType::UniDi).unwrap();
let mut sent = neqo_trans_conn.stream_send(
control_stream,
&[0x0, 0x4, 0x6, 0x1, 0x40, 0x64, 0x7, 0x40, 0x64],
);
assert_eq!(sent, Ok(9));
let mut sent = neqo_trans_conn.stream_send(control_stream, settings);
assert_eq!(sent, Ok(settings.len()));
let mut encoder = qpack::Encoder::new(
&qpack::Settings {
max_table_size_encoder: 100,
Expand All @@ -642,7 +681,7 @@ mod tests {
let out2 = server.process(out1.dgram(), now());
drop(neqo_trans_conn.process(out2.dgram(), now()));

// assert no error occurred.
// Assert no error occurred.
assert_not_closed(server);

(
Expand All @@ -651,9 +690,20 @@ mod tests {
control_stream_id: control_stream,
},
token,
encoder,
)
}

// Connect transport, send and receive settings.
fn connect_to(server: &mut Http3Server) -> (PeerConnection, ResumptionToken) {
let (peer, token, _encoder) = connect_to_with_encoder(
server,
DEFAULT_CLIENT_SETTINGS,
Some(EXPECTED_CONTROL_STREAM_DATA),
);
(peer, token)
}

fn connect() -> (Http3Server, PeerConnection) {
let (server, client, _token) = connect_with_token();
(server, client)
Expand Down Expand Up @@ -1097,6 +1147,44 @@ mod tests {
assert_eq!(headers_frames, 1);
}

/// When a client sends `STOP_SENDING` in the same flight as a request,
/// the transport may deliver `SendStreamStopSending` before `NewStream`.
/// `handle_stream_stop_sending` silently no-ops because the HTTP/3 send
/// stream does not exist yet. The stream is then created normally, and
/// when the server responds, `process_sending` tries to flush data to a
/// transport stream that already has `STOP_SENDING`.
#[test]
fn stop_sending_before_response() {
let (mut hconn, mut peer_conn) = connect();

let stream_id = peer_conn.stream_create(StreamType::BiDi).unwrap();
peer_conn.stream_send(stream_id, REQUEST_WITH_BODY).unwrap();
peer_conn.stream_close_send(stream_id).unwrap();

// Client sends STOP_SENDING in the same flight as the request.
peer_conn
.stream_stop_sending(stream_id, Error::HttpNone.code())
.unwrap();
Comment on lines +1164 to +1167
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we change our code such that STOP_SENDING ends up causing other frames to not be sent? I mean, that's something we might want to consider doing, isn't it?

(If we had RESET_STREAM_AT, you could commit(), which would make the data you sent relevant. You wouldn't necessarily guarantee that the fin flag makes it through. The write would, so this is probably OK.)

The alternative approach here is to write the STOP_SENDING onto the wire directly, using the test frame writer.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The frame writer is AFAICT not easily accessible in neqo-http3 without some cargo feature additions. Worth it?

(@mxinden started implementing RESET_STREAM_AT in #3102 - should I leave a TODO` to revisit this when we land this?)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting on RESET_STREAM_AT seems like a good plan.


let out = peer_conn.process_output(now());
hconn.process(out.dgram(), now());

// Server receives the request and responds.
while let Some(event) = hconn.next_event() {
if let Http3ServerEvent::Headers { stream, .. } = event {
stream
.send_headers(&[Header::new(":status", "200")])
.unwrap();
}
}

// Triggers `process_sending` which flushes the buffered response.
// The server must handle the transport error gracefully instead of
// propagating it as a connection error.
_ = hconn.process_output(now());
assert_not_closed(&hconn);
}

#[test]
fn server_request_with_body_server_reset() {
let (mut hconn, mut peer_conn) = connect();
Expand Down Expand Up @@ -1394,4 +1482,111 @@ mod tests {
assert!(client.tls_info().unwrap().resumed());
assert_eq!(client.zero_rtt_state(), ZeroRttState::Rejected);
}

// Client SETTINGS with ENABLE_CONNECT_PROTOCOL=1 appended.
const SETTINGS_WITH_CONNECT: &[u8] =
&[0x0, 0x4, 0x08, 0x1, 0x40, 0x64, 0x7, 0x40, 0x64, 0x08, 0x01];

/// Set up a server and client with an extended CONNECT request using an
/// unknown `:protocol` queued on the stream. Packets have NOT been exchanged.
fn setup_rejected_extended_connect() -> (Http3Server, PeerConnection, StreamId) {
let mut server = create_server(http3params(DEFAULT_SETTINGS).connect(true));
let (mut client, _token, mut encoder) =
connect_to_with_encoder(&mut server, SETTINGS_WITH_CONNECT, None);

let stream_id = client.stream_create(StreamType::BiDi).unwrap();
let headers = vec![
Header::new(":method", "CONNECT"),
Header::new(":protocol", "unknown-protocol"),
Header::new(":authority", "example.com"),
Header::new(":path", "/"),
Header::new(":scheme", "https"),
];
let header_block = encoder.encode_header_block(&mut client.conn, &headers, stream_id);
let hframe = HFrame::Headers {
header_block: header_block.to_vec(),
};
let mut frame_encoder = Encoder::default();
hframe.encode(&mut frame_encoder);
client
.stream_send(stream_id, frame_encoder.as_ref())
.unwrap();

(server, client, stream_id)
}

fn assert_no_request_events(server: &Http3Server) {
let request_event = server.events().any(|e| {
matches!(
&e,
Http3ServerEvent::Headers { .. } | Http3ServerEvent::Data { .. }
)
});
assert!(
!request_event,
"rejected CONNECT should not produce a public event"
);
}

/// Exchange packets between client and server, then assert the server
/// did not emit any request events and the connection is still open.
fn process_rejected_connect(server: &mut Http3Server, client: &mut PeerConnection) {
let out = client.process_output(now());
let out = server.process(out.dgram(), now());
let out = client.process(out.dgram(), now());
let out = server.process(out.dgram(), now());
_ = client.process(out.dgram(), now());

assert_no_request_events(server);
assert_not_closed(server);
}

/// Extended CONNECT with an unknown `:protocol` gets a 501 response
/// and the stream is closed without generating a public `Http3ServerEvent`.
#[test]
fn rejected_extended_connect_unknown_protocol() {
let (mut server, mut client, stream_id) = setup_rejected_extended_connect();
process_rejected_connect(&mut server, &mut client);

// The server should have responded with 501 + FIN on the stream.
let mut got_response = false;
while let Some(e) = client.next_event() {
if let ConnectionEvent::RecvStreamReadable { stream_id: sid } = e
&& sid == stream_id
{
let mut buf = [0_u8; 1024];
let (amount, fin) = client.stream_recv(sid, &mut buf).unwrap();
assert!(amount > 0 || fin, "expected response data or FIN");
got_response = true;
}
}
assert!(got_response, "client did not receive the 501 response");
}

/// When the client sends `STOP_SENDING` before the server processes the
/// rejection, `send_headers` fails and the server falls back to
/// `cancel_fetch`, which sends `RESET_STREAM` to the client.
#[test]
fn rejected_extended_connect_fallback_on_stop_sending() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that there is a missing test: where the server has to reject the stream rather than send a response.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite sure I understand what you mean here. I added a check that the client receives a RESET_STREAM; is this what you meant?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. For that you probably need to configure the client with a very small flow control buffer.

let (mut server, mut client, stream_id) = setup_rejected_extended_connect();

// Client tells the server to stop sending before the CONNECT is processed.
client
.stream_stop_sending(stream_id, Error::HttpRequestRejected.code())
.unwrap();

process_rejected_connect(&mut server, &mut client);

// Verify the client received a RESET_STREAM for the rejected stream.
assert!(client.events().any(|e| {
matches!(
e,
ConnectionEvent::RecvStreamReset {
stream_id: sid,
app_error,
} if sid == stream_id
&& app_error == Error::HttpRequestRejected.code()
)
}));
}
}
Loading
Loading