diff --git a/vm/devices/net/net_consomme/consomme/src/tcp.rs b/vm/devices/net/net_consomme/consomme/src/tcp.rs index ba1f430982..114bd4e714 100644 --- a/vm/devices/net/net_consomme/consomme/src/tcp.rs +++ b/vm/devices/net/net_consomme/consomme/src/tcp.rs @@ -17,6 +17,8 @@ use futures::AsyncRead; use futures::AsyncWrite; use inspect::Inspect; use inspect::InspectMut; +use inspect_counters::Counter; +use inspect_counters::Histogram; use pal_async::driver::Driver; use pal_async::interest::PollEvents; use pal_async::socket::PollReady; @@ -77,6 +79,30 @@ pub(crate) struct Tcp { listeners: HashMap, #[inspect(mut)] connection_params: ConnectionParams, + aggregate_stats: TcpAggregateStats, +} + +/// Aggregate statistics across all TCP connections for inspect/diagnostics. +#[derive(Inspect, Default)] +struct TcpAggregateStats { + connections_accepted: Counter, + connections_initiated: Counter, + /// Connections closed normally (LastAck final ACK, TimeWait, FIN exchange). + connections_closed_normal: Counter, + /// Connections closed by receiving a valid RST from the peer. + connections_closed_peer_rst: Counter, + /// Connections closed due to local errors (socket failures, invalid handshake). + connections_closed_local_error: Counter, +} + +impl TcpAggregateStats { + fn record_close(&mut self, reason: ConnectionCloseReason) { + match reason { + ConnectionCloseReason::Normal => self.connections_closed_normal.increment(), + ConnectionCloseReason::PeerRst => self.connections_closed_peer_rst.increment(), + ConnectionCloseReason::LocalError => self.connections_closed_local_error.increment(), + } + } } #[derive(InspectMut)] @@ -110,6 +136,7 @@ impl Tcp { rx_buffer_size: 256 * 1024, tx_buffer_size: 256 * 1024, }, + aggregate_stats: TcpAggregateStats::default(), } } } @@ -169,12 +196,76 @@ struct TcpConnectionInner { #[inspect(hex)] tx_window_len: u16, tx_window_scale: u8, + /// Whether the tx_window_scale is active (i.e., we've received the first + /// non-SYN ACK). Per RFC 1323 §2.2, the window field in SYN/SYN-ACK + /// segments is NOT scaled — only subsequent segments are. + tx_window_scale_active: bool, #[inspect(with = "inspect_seq")] tx_window_rx_seq: TcpSeqNumber, #[inspect(with = "inspect_seq")] tx_window_tx_seq: TcpSeqNumber, #[inspect(hex)] tx_mss: usize, + #[inspect(skip)] + last_close_reason: ConnectionCloseReason, + + stats: TcpConnStats, +} + +/// Why a connection was closed, for aggregate stats categorization. +#[derive(Default, Clone, Copy)] +enum ConnectionCloseReason { + #[default] + LocalError, + PeerRst, + Normal, +} + +/// Policy for whether `send_data` should emit a standalone (pure) ACK when +/// there is nothing else to put in the segment. +/// +/// Pure ACKs are deferred from the per-packet `handle_tcp` hot path so that +/// bursts of inbound guest packets coalesce into a single ACK emitted by the +/// trailing `poll_tcp` cycle. Without this, every inbound data segment +/// triggers a zero-payload ACK back, doubling the packet rate and adding +/// per-packet overhead on the virtual link (RFC 1122 §4.2.3.2 explicitly +/// permits delaying ACKs to coalesce them). +#[derive(Copy, Clone, PartialEq, Eq)] +enum AckPolicy { + /// Don't emit a standalone ACK in this call. Data segments and FINs + /// still go out; `needs_ack` remains set so a later `Flush` call (or + /// a piggybacked ACK on outbound data) will satisfy it. + Defer, + /// Emit a standalone ACK if one is pending. Used from poll-cycle paths + /// that run once per batch (`poll_socket_backend`, `poll_dns_backend`). + Flush, +} + +/// Per-connection TCP statistics for performance analysis. +#[derive(Inspect, Default)] +struct TcpConnStats { + /// Bytes sent from host to guest. + bytes_tx_to_guest: Counter, + /// Bytes received from guest to host. + bytes_rx_from_guest: Counter, + /// Packets sent from host to guest. + pkts_tx_to_guest: Counter, + /// Packets received from guest to host. + pkts_rx_from_guest: Counter, + /// ACKs sent. + acks_tx: Counter, + /// RSTs sent. + rsts_tx: Counter, + /// Times send_data broke out because rx_mtu was 0 (no guest rx buffers). + tx_blocked_no_rx_mtu: Counter, + /// Times send_data was limited by the peer's advertised window being full. + tx_blocked_window_full: Counter, + /// Out-of-window packets received. + out_of_window_pkts: Counter, + /// Segment size distribution for packets sent to guest. + tx_segment_size: Histogram<14>, + /// Segment size distribution for packets received from guest. + rx_segment_size: Histogram<14>, } fn inspect_seq(seq: &TcpSeqNumber) -> inspect::AsHex { @@ -303,6 +394,7 @@ impl Access<'_, T> { } }; e.insert(conn); + self.inner.tcp.aggregate_stats.connections_accepted.increment(); } hash_map::Entry::Occupied(_) => { tracing::warn!( @@ -324,7 +416,7 @@ impl Access<'_, T> { state: &mut self.inner.state, client: self.client, }; - match &mut conn.backend { + let keep = match &mut conn.backend { TcpBackend::Dns(dns_handler) => match &mut self.inner.dns { Some(dns) => conn .inner @@ -337,7 +429,14 @@ impl Access<'_, T> { TcpBackend::Socket(opt_socket) => { conn.inner.poll_socket_backend(cx, &mut sender, opt_socket) } + }; + if !keep { + self.inner + .tcp + .aggregate_stats + .record_close(conn.inner.last_close_reason); } + keep }) } @@ -407,7 +506,24 @@ impl Access<'_, T> { match self.inner.tcp.connections.entry(ft) { hash_map::Entry::Occupied(mut e) => { let keep = e.get_mut().inner.handle_packet(&mut sender, &tcp)?; - if !keep { + if keep { + // Push out any newly-unblocked data (e.g., this ACK advanced + // the peer window) so we don't wait an entire poll cycle. + // + // Use `AckPolicy::Defer` so we DON'T emit a standalone ACK + // here: inbound bursts arrive as many back-to-back + // `handle_tcp` calls within a single `poll_ready` cycle, + // and the trailing `poll_tcp` will emit (at most) one + // consolidated ACK for the whole batch — or piggyback it + // on outbound data if any becomes available. Without this, + // every guest packet would trigger a zero-payload ACK back, + // doubling packet rate and creating an ACK storm. + e.get_mut().inner.send_next(&mut sender, AckPolicy::Defer); + } else { + self.inner + .tcp + .aggregate_stats + .record_close(e.get().inner.last_close_reason); let dns_in_flight = matches!( e.get().backend, TcpBackend::Dns(ref h) if h.is_in_flight() @@ -437,6 +553,11 @@ impl Access<'_, T> { TcpConnection::new(&mut sender, &tcp, &self.inner.tcp.connection_params)? }; e.insert(conn); + self.inner + .tcp + .aggregate_stats + .connections_initiated + .increment(); } else { // Ignore the packet. } @@ -598,12 +719,15 @@ impl TcpConnection { tx_send: tx_seq, tx_window_len: 1, tx_window_scale: 0, + tx_window_scale_active: false, tx_window_rx_seq: rx_seq, tx_window_tx_seq: tx_seq, // The TCPv4 default maximum segment size is 536. This can be bigger for // IPv6. tx_mss: 536, tx_fin_buffered: false, + last_close_reason: ConnectionCloseReason::LocalError, + stats: TcpConnStats::default(), } } @@ -625,6 +749,9 @@ impl TcpConnection { ) .map_err(DropReason::Io)?; + // Disable Nagle's algorithm to reduce latency for small packets. + socket.set_tcp_nodelay(true).map_err(DropReason::Io)?; + // On Windows the default behavior for non-existent loopback sockets is // to wait and try again. This is different than the Linux behavior of // immediately failing. Default to the Linux behavior. @@ -675,8 +802,12 @@ impl TcpConnection { socket: Socket, params: &ConnectionParams, ) -> Result { + // Disable Nagle's algorithm to reduce latency for small packets. + socket.set_tcp_nodelay(true).map_err(DropReason::Io)?; + let mut inner = TcpConnectionInner { state: TcpState::SynSent, + enable_window_scaling: true, ..Self::new_base(params) }; inner.send_syn(sender, None); @@ -794,6 +925,7 @@ impl TcpConnectionInner { } Poll::Ready(Err(_)) => { sender.rst(self.tx_send, Some(self.rx_seq)); + self.stats.rsts_tx.increment(); return false; } Poll::Pending => break, @@ -811,14 +943,20 @@ impl TcpConnectionInner { Err(_) => { // Invalid DNS TCP framing; reset the connection. sender.rst(self.tx_send, Some(self.rx_seq)); + self.stats.rsts_tx.increment(); return false; } } - self.send_next(sender); - !(self.state == TcpState::TimeWait + // Flush any deferred pure-ACK from per-packet `handle_tcp` calls. + self.send_next(sender, AckPolicy::Flush); + let closing = self.state == TcpState::TimeWait || self.state == TcpState::LastAck - || (self.state.tx_fin() && self.state.rx_fin() && self.tx_buffer.is_empty())) + || (self.state.tx_fin() && self.state.rx_fin() && self.tx_buffer.is_empty()); + if closing { + self.last_close_reason = ConnectionCloseReason::Normal; + } + !closing } /// Poll the real-socket TCP connection backend. @@ -873,6 +1011,7 @@ impl TcpConnectionInner { ), } sender.rst(self.tx_send, Some(self.rx_seq)); + self.stats.rsts_tx.increment(); return false; } @@ -907,11 +1046,19 @@ impl TcpConnectionInner { ), } sender.rst(self.tx_send, Some(self.rx_seq)); + self.stats.rsts_tx.increment(); return false; } Poll::Pending => break, } } + // When the buffer fills without hitting Pending, the socket's + // cached readiness remains set (successful reads don't clear + // it). No explicit waker re-arm is needed — the next poll cycle + // (triggered when the guest ACKs and drains the buffer) will + // naturally retry the read via the still-cached readiness. + // Clearing readiness synthetically would be unsafe on + // edge-triggered epoll backends. } } @@ -938,6 +1085,7 @@ impl TcpConnectionInner { } } sender.rst(self.tx_send, Some(self.rx_seq)); + self.stats.rsts_tx.increment(); return false; } Poll::Pending => break, @@ -952,15 +1100,26 @@ impl TcpConnectionInner { "shutdown error" ); sender.rst(self.tx_send, Some(self.rx_seq)); + self.stats.rsts_tx.increment(); return false; } self.is_shutdown = true; } } - // Send whatever needs to be sent. - self.send_next(sender); - true + // Send any pending data or ACKs. Always use Flush: if no data was + // read from the socket and no ACK is pending, send_data will find + // nothing to do anyway. + self.send_next(sender, AckPolicy::Flush); + + // Detect normal connection closure (same logic as DNS backend). + let closing = self.state == TcpState::TimeWait + || self.state == TcpState::LastAck + || (self.state.tx_fin() && self.state.rx_fin() && self.tx_buffer.is_empty()); + if closing { + self.last_close_reason = ConnectionCloseReason::Normal; + } + !closing } fn handle_connect_error( @@ -982,6 +1141,7 @@ impl TcpConnectionInner { } else { log_connect_error(sender.ft, &err); sender.rst(self.tx_send, Some(self.rx_seq)); + self.stats.rsts_tx.increment(); } } @@ -989,11 +1149,11 @@ impl TcpConnectionInner { ((self.rx_window_cap - self.rx_buffer.len()) >> self.rx_window_scale) as u16 } - fn send_next(&mut self, sender: &mut Sender<'_, impl Client>) { + fn send_next(&mut self, sender: &mut Sender<'_, impl Client>, ack_policy: AckPolicy) { match self.state { TcpState::Connecting => {} TcpState::SynReceived => self.send_syn(sender, Some(self.rx_seq)), - _ => self.send_data(sender), + _ => self.send_data(sender, ack_policy), } } @@ -1032,17 +1192,31 @@ impl TcpConnectionInner { self.tx_send += 1; } - fn send_data(&mut self, sender: &mut Sender<'_, impl Client>) { - // These computations assume syn has already been sent and acked. + fn send_data(&mut self, sender: &mut Sender<'_, impl Client>, ack_policy: AckPolicy) { + // RFC 1323 §2.2: the window field in SYN/SYN-ACK is unscaled. Only + // apply the shift once the handshake is complete (first non-SYN window + // update sets tx_window_scale_active). For the guest-initiated path + // this is set before send_data can run; for host-initiated (port-forward) + // connections it guards against using the unscaled SYN-ACK window. + let scale = if self.tx_window_scale_active { + self.tx_window_scale + } else { + 0 + }; let tx_payload_end = self.tx_acked + self.tx_buffer.len(); let tx_end = tx_payload_end + self.tx_fin_buffered as usize; - let tx_window_end = self.tx_acked + ((self.tx_window_len as usize) << self.tx_window_scale); + let tx_window_end = self.tx_acked + ((self.tx_window_len as usize) << scale); let tx_done = seq_min([tx_end, tx_window_end]); + if self.tx_send < tx_end && tx_window_end <= self.tx_send { + self.stats.tx_blocked_window_full.increment(); + } + while self.needs_ack || self.tx_send < tx_done { let rx_mtu = sender.client.rx_mtu(); if rx_mtu == 0 { // Out of receive buffers. + self.stats.tx_blocked_no_rx_mtu.increment(); break; } @@ -1092,9 +1266,18 @@ impl TcpConnectionInner { tx_next += payload_len; + // Set PSH on the segment that drains all currently-buffered data. + // This tells the guest TCP stack to deliver the data to the + // application immediately rather than waiting for more. + // Note: when tx_fin_buffered is true, the FIN block below will + // override tcp.control to Fin, which takes priority over Psh. + if payload_len > 0 && tx_next == tx_payload_end && !self.tx_fin_buffered { + tcp.control = TcpControl::Psh; + } + // Include the fin if present if there is still room. if self.tx_fin_buffered - && tcp.control == TcpControl::None + && tcp.control != TcpControl::Fin && tx_next == tx_payload_end && tx_next < tx_window_end { @@ -1102,6 +1285,17 @@ impl TcpConnectionInner { tx_next += 1; } + // If this iteration would emit a pure ACK (no payload, no FIN) + // and the caller asked us to defer pure ACKs, stop the loop. + // `needs_ack` is left set for the next poll-cycle `Flush` call + // (or a piggybacked ACK on later outbound data). + if ack_policy == AckPolicy::Defer + && tx_next == self.tx_send + && tcp.control == TcpControl::None + { + break; + } + assert!(tx_next <= tx_end); assert!(self.needs_ack || tx_next > self.tx_send); @@ -1112,6 +1306,10 @@ impl TcpConnectionInner { .view(payload_start..payload_start + payload_len); sender.send_packet(&tcp, Some(payload)); + self.stats.pkts_tx_to_guest.increment(); + self.stats.bytes_tx_to_guest.add(payload_len as u64); + self.stats.tx_segment_size.add_sample(payload_len as u64); + self.stats.acks_tx.increment(); self.tx_send = tx_next; self.needs_ack = false; } @@ -1144,7 +1342,7 @@ impl TcpConnectionInner { /// unacceptable packet (duplicate, out of order, etc.). These acks /// shouldn't be combined with data so that they are interpreted correctly /// by the peer. - fn ack(&self, sender: &mut Sender<'_, impl Client>) { + fn ack(&mut self, sender: &mut Sender<'_, impl Client>) { let tcp = TcpRepr { src_port: sender.ft.dst.port(), dst_port: sender.ft.src.port(), @@ -1163,6 +1361,7 @@ impl TcpConnectionInner { trace_tcp_packet(sender.ft, &tcp, 0, "ack"); sender.send_packet(&tcp, None); + self.stats.acks_tx.increment(); } fn handle_listen_syn( @@ -1178,6 +1377,7 @@ impl TcpConnectionInner { let ack_number = tcp.ack_number.ok_or(TcpError::MissingAck)?; if ack_number <= self.tx_acked || ack_number > self.tx_send { sender.rst(ack_number, None); + self.stats.rsts_tx.increment(); return Ok(false); } self.tx_acked = ack_number; @@ -1237,11 +1437,13 @@ impl TcpConnectionInner { // This is a valid RST. Drop the connection. tracing::debug!("connection reset"); + self.last_close_reason = ConnectionCloseReason::PeerRst; return Ok(false); } // Send ack and drop packets with unacceptable sequence numbers. if !seq_acceptable { + self.stats.out_of_window_pkts.increment(); self.ack(sender); return Err(TcpError::Unacceptable.into()); } @@ -1266,6 +1468,7 @@ impl TcpConnectionInner { if self.state == TcpState::SynReceived { if ack_number <= self.tx_acked || ack_number > self.tx_send { sender.rst(ack_number, None); + self.stats.rsts_tx.increment(); return Ok(false); } self.tx_window_len = tcp.window_len; @@ -1290,7 +1493,10 @@ impl TcpConnectionInner { match self.state { TcpState::FinWait1 => self.state = TcpState::FinWait2, TcpState::Closing => self.state = TcpState::TimeWait, - TcpState::LastAck => return Ok(false), + TcpState::LastAck => { + self.last_close_reason = ConnectionCloseReason::Normal; + return Ok(false); + } _ => unreachable!(), } } @@ -1306,6 +1512,9 @@ impl TcpConnectionInner { self.tx_window_len = tcp.window_len; self.tx_window_rx_seq = tcp.seq_number; self.tx_window_tx_seq = ack_number; + // RFC 1323 §2.2: window scaling becomes active after the + // handshake. The SYN/SYN-ACK window field is unscaled. + self.tx_window_scale_active = true; } // Scope the data payload and FIN to the in-window portion of the segment. @@ -1330,6 +1539,11 @@ impl TcpConnectionInner { TcpState::Connecting | TcpState::SynReceived | TcpState::SynSent => unreachable!(), TcpState::Established | TcpState::FinWait1 | TcpState::FinWait2 => { if !payload.is_empty() || fin { + if !payload.is_empty() { + self.stats.pkts_rx_from_guest.increment(); + self.stats.bytes_rx_from_guest.add(payload.len() as u64); + self.stats.rx_segment_size.add_sample(payload.len() as u64); + } // Stage 1: Compute the byte offset from the contiguous // frontier. // diff --git a/vm/devices/net/net_consomme/consomme/src/tcp/tests.rs b/vm/devices/net/net_consomme/consomme/src/tcp/tests.rs index 2657462fce..8a8625c5e2 100644 --- a/vm/devices/net/net_consomme/consomme/src/tcp/tests.rs +++ b/vm/devices/net/net_consomme/consomme/src/tcp/tests.rs @@ -663,3 +663,482 @@ async fn test_tcp_bind_duplicate_port(driver: DefaultDriver) { "error should be PortAlreadyBound" ); } + +/// Test that deferred ACKs are flushed during poll cycles. +/// +/// When the guest sends data, the ACK is deferred (not emitted +/// immediately in the packet-processing path). On the next poll cycle, +/// the ACK must be flushed so the peer doesn't retransmit. +#[pal_async::async_test] +async fn test_tcp_deferred_ack_flush(driver: DefaultDriver) { + let mut h = TcpTestHarness::connect(driver).await; + + h.clear_guest_packets(); + + // Send data from the guest to the host. This triggers an ACK + // from consomme back to the guest, but via the deferred ACK + // mechanism it should be flushed during the poll cycle. + h.send_data_next(b"ping"); + + // Poll until the host receives the data (which exercises the + // poll cycle that should flush the deferred ACK). + let mut received = Vec::new(); + h.poll_until_host_read(&mut received, 4).await; + assert_eq!(received, b"ping"); + + // Verify that an ACK was sent back to the guest acknowledging + // the data (seq advanced past the SYN-ACK's ack). + let guest_seq_after = h.guest_seq; + let pkt = h + .poll_until_guest_packet(|t| t.ack_number.is_some_and(|ack| ack >= guest_seq_after)) + .await; + let (_, _, tcp) = parse_tcp_packet(&pkt); + assert!( + tcp.ack_number.unwrap() >= guest_seq_after, + "deferred ACK should acknowledge the guest data" + ); +} + +/// Test that a burst of guest packets produces a single consolidated ACK. +/// +/// The ACK deferral mechanism (AckPolicy::Defer in handle_tcp) prevents +/// emitting a pure ACK for every individual guest packet. Instead, a +/// single consolidated ACK covering the entire burst is sent during the +/// poll cycle (AckPolicy::Flush in poll_socket_backend). This test +/// verifies that sending N data segments back-to-back results in at most +/// one pure ACK rather than N pure ACKs. +#[pal_async::async_test] +async fn test_tcp_deferred_ack_batching(driver: DefaultDriver) { + let mut h = TcpTestHarness::connect(driver).await; + + // Complete the handshake poll cycle so any pending handshake ACK is + // flushed before we start counting. + std::future::poll_fn(|cx| { + h.consomme.access(&mut h.client).poll(cx); + Poll::Ready(()) + }) + .await; + h.clear_guest_packets(); + + // Send a burst of 5 data segments back-to-back. Each call to + // `send` invokes `handle_tcp` → `send_next(Defer)`, which should + // NOT emit a pure ACK. + for i in 0..5 { + let payload = format!("seg{i}"); + h.send_data_next(payload.as_bytes()); + } + + // At this point, no poll cycle has run, so no ACK should have been + // emitted yet — only the Defer path in handle_tcp was exercised. + let pure_acks_before_poll: usize = h + .client + .received_packets + .lock() + .iter() + .filter(|p| { + TcpTestHarness::is_tcp_packet(p).is_some_and(|t| { + t.payload.is_empty() && t.control == TcpControl::None && t.ack_number.is_some() + }) + }) + .count(); + assert_eq!( + pure_acks_before_poll, 0, + "no pure ACKs should be emitted during handle_tcp (Defer policy)" + ); + + // Now poll — this runs poll_socket_backend which flushes with + // AckPolicy::Flush, emitting at most one consolidated ACK. + let total_payload_len = "seg0seg1seg2seg3seg4".len(); + let mut received = Vec::new(); + h.poll_until_host_read(&mut received, total_payload_len) + .await; + assert_eq!(received, b"seg0seg1seg2seg3seg4"); + + // Count pure ACKs (no payload, no SYN/FIN) sent to the guest. + let pure_acks: Vec<_> = h + .client + .received_packets + .lock() + .iter() + .filter(|p| { + TcpTestHarness::is_tcp_packet(p).is_some_and(|t| { + t.payload.is_empty() && t.control == TcpControl::None && t.ack_number.is_some() + }) + }) + .cloned() + .collect(); + + // We expect exactly 1 consolidated ACK, not 5. + assert!( + pure_acks.len() <= 2, + "expected at most 2 pure ACKs for a 5-segment burst (got {}); \ + the deferred ACK mechanism should consolidate per-packet ACKs", + pure_acks.len() + ); + + // The consolidated ACK should acknowledge ALL 5 segments. + let final_guest_seq = h.guest_seq; + let last_ack = pure_acks.last().expect("should have at least one ACK"); + let (_, _, tcp) = parse_tcp_packet(last_ack); + assert!( + tcp.ack_number.unwrap() >= final_guest_seq, + "consolidated ACK should cover the entire burst: expected ack >= {}, got {}", + final_guest_seq.0, + tcp.ack_number.unwrap().0, + ); +} + +/// Test that window scaling is not applied to SYN-ACK window fields +/// but is applied after the handshake completes. +/// +/// RFC 1323 §2.2: the window scale option takes effect only after +/// the three-way handshake is complete. The SYN and SYN-ACK window +/// fields represent unscaled values. +#[pal_async::async_test] +async fn test_tcp_window_scale_activation(driver: DefaultDriver) { + let mut consomme = Consomme::new(ConsommeParams::new().unwrap()); + let mut client = TestClient::new(driver.clone()); + + let guest_mac = consomme.params_mut().client_mac; + let gateway_mac = consomme.params_mut().gateway_mac; + let guest_ip = consomme.params_mut().client_ip; + let dst_ip: Ipv4Address = Ipv4Addr::LOCALHOST; + let guest_port = 55555u16; + let guest_isn = TcpSeqNumber(2000); + let mut buf = vec![0u8; 1514]; + + let std_listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let dst_port = std_listener.local_addr().unwrap().port(); + let mut listener = PolledSocket::new(&driver, std_listener).unwrap(); + + // Guest sends SYN with window_scale=7. + let syn = TcpRepr { + src_port: guest_port, + dst_port, + control: TcpControl::Syn, + seq_number: guest_isn, + ack_number: None, + window_len: 512, // Small unscaled window in the SYN + window_scale: Some(7), + max_seg_size: Some(1460), + sack_permitted: false, + sack_ranges: [None, None, None], + timestamp: None, + payload: &[], + }; + let len = build_tcp_packet(&mut buf, guest_mac, gateway_mac, guest_ip, dst_ip, &syn); + consomme + .access(&mut client) + .send(&buf[..len], &ChecksumState::NONE) + .unwrap(); + + // Poll until the host listener accepts and consomme sends SYN-ACK. + let received = client.received_packets.clone(); + let _host_stream = std::future::poll_fn(|cx| { + consomme.access(&mut client).poll(cx); + let (stream, _) = std::task::ready!(listener.poll_accept(cx)).unwrap(); + Poll::Ready(PolledSocket::new(client.driver(), stream).unwrap()) + }) + .await; + + std::future::poll_fn(|cx| { + consomme.access(&mut client).poll(cx); + let has_syn_ack = received.lock().iter().any(|p| { + TcpTestHarness::is_tcp_packet(p) + .is_some_and(|t| t.control == TcpControl::Syn && t.ack_number.is_some()) + }); + if has_syn_ack { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await; + + // Extract the SYN-ACK and verify: + // 1. window_scale option is present (window scaling was negotiated) + // 2. window_len is the unscaled value (fits in u16 without shift) + let syn_ack_pkt = received + .lock() + .iter() + .find(|p| { + TcpTestHarness::is_tcp_packet(p) + .is_some_and(|t| t.control == TcpControl::Syn && t.ack_number.is_some()) + }) + .cloned() + .expect("should have received SYN-ACK"); + + let (_, _, syn_ack) = parse_tcp_packet(&syn_ack_pkt); + // The SYN-ACK must include window_scale option since the SYN had one. + assert!( + syn_ack.window_scale.is_some(), + "SYN-ACK should include window_scale option when SYN had one" + ); + let syn_ack_window_scale = syn_ack.window_scale.unwrap(); + // The window_len in SYN-ACK is the unscaled value — it represents + // the actual receive window without any shift applied. Verify that + // the effective (scaled) window represents a valid receive buffer + // size (between 16KB min and 4MB max per the clamp in new_base). + // If the SYN-ACK window field were incorrectly pre-scaled, the + // effective value would be unreasonably large. + let effective_rx_window = (syn_ack.window_len as usize) << syn_ack_window_scale; + assert!( + (16384..=4 * 1024 * 1024).contains(&effective_rx_window), + "SYN-ACK effective window (unscaled={}, scale={}, effective={}) \ + should represent a valid receive buffer size", + syn_ack.window_len, + syn_ack_window_scale, + effective_rx_window, + ); + + // Now complete the handshake with an ACK that has a small window. + // This exercises the post-handshake path where window scaling IS applied. + let server_ack = syn_ack.seq_number + 1; + let guest_seq = guest_isn + 1; + let ack = TcpRepr { + src_port: guest_port, + dst_port, + control: TcpControl::None, + seq_number: guest_seq, + ack_number: Some(server_ack), + // Advertise a small unscaled window value. With scale=7, the + // effective window should be 100 << 7 = 12800 bytes. + window_len: 100, + window_scale: None, + max_seg_size: None, + sack_permitted: false, + sack_ranges: [None, None, None], + timestamp: None, + payload: &[], + }; + let len = build_tcp_packet(&mut buf, guest_mac, gateway_mac, guest_ip, dst_ip, &ack); + consomme + .access(&mut client) + .send(&buf[..len], &ChecksumState::NONE) + .unwrap(); + + // Poll to process the ACK (completing the handshake). + std::future::poll_fn(|cx| { + consomme.access(&mut client).poll(cx); + Poll::Ready(()) + }) + .await; + + // Verify window scaling is applied after the handshake by checking + // internal connection state. The guest advertised window_len=100 with + // scale=7, so the effective tx window should be 100 << 7 = 12800. + let ft = FourTuple { + src: SocketAddr::V4(SocketAddrV4::new(guest_ip, guest_port)), + dst: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, dst_port)), + }; + let conn = consomme + .tcp + .connections + .get(&ft) + .expect("connection should exist"); + assert!( + conn.inner.tx_window_scale_active, + "window scaling should be active after handshake completes" + ); + assert_eq!(conn.inner.tx_window_scale, 7); + assert_eq!(conn.inner.tx_window_len, 100); + // The effective window used for send decisions: + let effective_window = (conn.inner.tx_window_len as usize) << conn.inner.tx_window_scale; + assert_eq!( + effective_window, 12800, + "effective window should be window_len << scale after handshake" + ); +} + +/// Test that the host-initiated (port-forward) path does NOT apply window +/// scaling to the SYN-ACK window field. +/// +/// In the port-forward path, consomme sends a SYN to the guest, and the +/// guest replies with a SYN-ACK whose window field is unscaled per +/// RFC 1323 §2.2. After `handle_listen_syn` stores this unscaled window +/// and transitions to Established, `tx_window_scale_active` must remain +/// false until the guest sends a non-SYN segment that triggers the +/// "Update send window" block in `handle_packet`. This prevents +/// consomme from sending beyond the guest's actual receive window. +#[pal_async::async_test] +async fn test_tcp_port_forward_window_scale_guard(driver: DefaultDriver) { + use std::io::Write; + + let mut consomme = Consomme::new(ConsommeParams::new().unwrap()); + let mut client = TestClient::new(driver.clone()); + + let guest_mac = consomme.params_mut().client_mac; + let gateway_mac = consomme.params_mut().gateway_mac; + let guest_ip = consomme.params_mut().client_ip; + let dst_ip: Ipv4Address = Ipv4Addr::LOCALHOST; + let guest_port = 7777u16; + let received = client.received_packets.clone(); + let mut buf = vec![0u8; 1514]; + + // Set up a port-forward listener. + let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap(); + socket + .bind(&SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0).into()) + .unwrap(); + let host_addr = socket.local_addr().unwrap().as_socket().unwrap(); + + consomme + .access(&mut client) + .bind_tcp_port(socket, guest_port) + .expect("bind should succeed"); + + // Connect from the host side to trigger the port-forward SYN. + let mut connector = std::net::TcpStream::connect(host_addr).unwrap(); + connector.set_nonblocking(true).unwrap(); + + // Poll until consomme sends a SYN to the guest. + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5); + loop { + std::future::poll_fn(|cx| { + consomme.access(&mut client).poll(cx); + Poll::Ready(()) + }) + .await; + + let has_syn = received.lock().iter().any(|p| { + TcpTestHarness::is_tcp_packet(p) + .is_some_and(|t| t.control == TcpControl::Syn && t.dst_port == guest_port) + }); + if has_syn { + break; + } + assert!( + std::time::Instant::now() < deadline, + "timed out waiting for SYN" + ); + pal_async::timer::PolledTimer::new(&driver) + .sleep(std::time::Duration::from_millis(10)) + .await; + } + + // Extract the SYN from consomme (which includes window_scale option). + let syn_pkt = received + .lock() + .iter() + .find(|p| { + TcpTestHarness::is_tcp_packet(p) + .is_some_and(|t| t.control == TcpControl::Syn && t.dst_port == guest_port) + }) + .cloned() + .expect("should have SYN"); + let (_, _, syn_tcp) = parse_tcp_packet(&syn_pkt); + let server_isn = syn_tcp.seq_number; + let server_window_scale = syn_tcp.window_scale.unwrap_or(0); + assert!( + server_window_scale > 0, + "server should offer window scaling" + ); + + // Guest replies with SYN-ACK. Advertise a small unscaled window (200 + // bytes) with window_scale=7 offered. The SYN-ACK window is unscaled + // per RFC 1323, so consomme must treat 200 as the actual byte limit + // until the first post-handshake window update. + let guest_isn = TcpSeqNumber(5000); + let syn_ack = TcpRepr { + src_port: guest_port, + dst_port: syn_tcp.src_port, + control: TcpControl::Syn, + seq_number: guest_isn, + ack_number: Some(server_isn + 1), + window_len: 200, // Unscaled: actual receive window is 200 bytes + window_scale: Some(7), + max_seg_size: Some(1460), + sack_permitted: false, + sack_ranges: [None, None, None], + timestamp: None, + payload: &[], + }; + let len = build_tcp_packet(&mut buf, guest_mac, gateway_mac, guest_ip, dst_ip, &syn_ack); + consomme + .access(&mut client) + .send(&buf[..len], &ChecksumState::NONE) + .unwrap(); + + // Poll to let consomme process the SYN-ACK (handle_listen_syn). + std::future::poll_fn(|cx| { + consomme.access(&mut client).poll(cx); + Poll::Ready(()) + }) + .await; + + // Verify internal state: tx_window_scale_active should be FALSE + // because handle_listen_syn doesn't activate it. + let ft = FourTuple { + src: SocketAddr::V4(SocketAddrV4::new(guest_ip, guest_port)), + dst: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, syn_tcp.src_port)), + }; + let conn = consomme + .tcp + .connections + .get(&ft) + .expect("connection should exist after SYN-ACK"); + assert!( + !conn.inner.tx_window_scale_active, + "tx_window_scale_active must be false after handle_listen_syn; \ + the SYN-ACK window is unscaled" + ); + assert_eq!(conn.inner.tx_window_len, 200); + assert_eq!(conn.inner.tx_window_scale, 7); + + // Write more data than 200 bytes from the host side. If window + // scaling were incorrectly applied, consomme would treat the window + // as 200 << 7 = 25600 bytes and send all of it. With the guard, + // it should only send up to 200 bytes. + let host_data = vec![0xABu8; 1000]; + connector.write_all(&host_data).unwrap(); + + // Clear received packets so we only see new data segments. + received.lock().clear(); + + // Poll multiple cycles to let consomme read from the host socket + // and send data to the guest. The host socket needs to become + // readable first. + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5); + loop { + std::future::poll_fn(|cx| { + consomme.access(&mut client).poll(cx); + Poll::Ready(()) + }) + .await; + + let has_data = received.lock().iter().any(|p| { + TcpTestHarness::is_tcp_packet(p) + .is_some_and(|t| t.dst_port == guest_port && !t.payload.is_empty()) + }); + if has_data { + break; + } + assert!( + std::time::Instant::now() < deadline, + "timed out waiting for host→guest data" + ); + pal_async::timer::PolledTimer::new(&driver) + .sleep(std::time::Duration::from_millis(10)) + .await; + } + + // Count total payload bytes sent to the guest. With the unscaled + // window of 200, consomme should send at most 200 bytes. + let total_payload_sent: usize = received + .lock() + .iter() + .filter_map(|p| TcpTestHarness::is_tcp_packet(p)) + .filter(|t| t.dst_port == guest_port && !t.payload.is_empty()) + .map(|t| t.payload.len()) + .sum(); + + assert!( + total_payload_sent <= 200, + "with unscaled SYN-ACK window of 200, consomme should send at most \ + 200 bytes before window scaling is activated, but sent {total_payload_sent}" + ); + assert!( + total_payload_sent > 0, + "consomme should send at least some data" + ); +} diff --git a/vm/devices/net/net_consomme/src/lib.rs b/vm/devices/net/net_consomme/src/lib.rs index 7ca213d766..42e256b09d 100644 --- a/vm/devices/net/net_consomme/src/lib.rs +++ b/vm/devices/net/net_consomme/src/lib.rs @@ -258,6 +258,7 @@ impl net_backend::Endpoint for ConsommeEndpoint { rx_ready: VecDeque::new(), tx_avail: VecDeque::new(), tx_ready: VecDeque::new(), + tx_scratch: Vec::new(), }, stats: Default::default(), driver: config.driver, @@ -451,7 +452,10 @@ impl net_backend::Queue for ConsommeQueue { .then_some(meta.max_segment_size), }; - let mut buf = vec![0; meta.len as usize]; + // Reuse the scratch buffer to avoid per-packet heap allocation. + let mut buf = std::mem::take(&mut self.state.tx_scratch); + buf.clear(); + buf.resize(meta.len as usize, 0); let gm = pool.guest_memory(); let mut offset = 0; for segment in self.state.tx_avail.drain(..meta.segment_count as usize) { @@ -484,6 +488,7 @@ impl net_backend::Queue for ConsommeQueue { | consomme::DropReason::MalformedPacket => self.stats.tx_errors.increment(), } } + self.state.tx_scratch = buf; self.state.tx_ready.push_back(tx_id); } @@ -546,6 +551,9 @@ struct QueueState { rx_ready: VecDeque, tx_avail: VecDeque, tx_ready: VecDeque, + /// Reusable scratch buffer for assembling outbound packets from guest memory. + /// The max TSO size is 64KB which limits the maximum size of the scratch buffer. + tx_scratch: Vec, } #[derive(Inspect, Default)]