From 4761a408a83566e0448d5939aded9b06a9c39911 Mon Sep 17 00:00:00 2001 From: erfrimod Date: Thu, 21 May 2026 14:33:11 -0700 Subject: [PATCH 1/2] handle guest-triggered workererrors --- vm/devices/net/netvsp/src/lib.rs | 172 ++++++++++++++++++++++++------- 1 file changed, 136 insertions(+), 36 deletions(-) diff --git a/vm/devices/net/netvsp/src/lib.rs b/vm/devices/net/netvsp/src/lib.rs index d6fc9e62a2..07ea5c163b 100644 --- a/vm/devices/net/netvsp/src/lib.rs +++ b/vm/devices/net/netvsp/src/lib.rs @@ -2018,8 +2018,6 @@ enum WorkerError { Queue(#[from] queue::Error), #[error("too many control messages")] TooManyControlMessages, - #[error("invalid rndis packet completion")] - InvalidRndisPacketCompletion, #[error("missing transaction id")] MissingTransactionId, #[error("invalid gpadl")] @@ -2052,6 +2050,8 @@ enum WorkerError { CoordinatorMessageSendFailed(#[source] TrySendError), #[error("invalid rx buffer configuration: {0}")] InvalidRxBufferConfig(#[source] RxBufferConfigError), + #[error("attempt to notify guest of VTL0 VF failed")] + GuestVfNotificationFailed, } impl From for WorkerError { @@ -2096,8 +2096,6 @@ enum PacketOrderError { SendReceiveBufferMissingMTU, #[error("SendSendBuffer already exists")] SendSendBufferExists, - #[error("SwitchDataPathCompletion during PrimaryChannelState")] - SwitchDataPathCompletionPrimaryChannelState, } #[derive(Debug)] @@ -2996,28 +2994,46 @@ impl NetChannel { )?; self.send_rndis_control_message(buffers, id, message_length)?; if let PrimaryChannelGuestVfState::Available { vfid } = primary.guest_vf_state { - if self.guest_vf_is_available( + // Init complete has been sent to the guest. + // Make a best-effort attempt to notify the guest the VF is available. + match self.guest_vf_is_available( Some(vfid), buffers.version, buffers.ndis_config, - )? { - // Ideally the VF would not be presented to the guest - // until the completion packet has arrived, so that the - // guest is prepared. This is most interesting for the - // case of a VF associated with multiple guest - // adapters, using a concept like vports. In this - // scenario it would be better if all of the adapters - // were aware a VF was available before the device - // arrived. This is not currently possible because the - // Linux netvsc driver ignores the completion requested - // flag on inband packets and won't send a completion - // packet. - primary.guest_vf_state = PrimaryChannelGuestVfState::AvailableAdvertised; - self.send_coordinator_update_vf(); - } else if let Some(true) = primary.is_data_path_switched { - tracing::error!( - "Data path switched, but current guest negotiation does not support VTL0 VF" - ); + ) { + Ok(true) => { + // Ideally the VF would not be presented to the guest + // until the completion packet has arrived, so that the + // guest is prepared. This is most interesting for the + // case of a VF associated with multiple guest + // adapters, using a concept like vports. In this + // scenario it would be better if all of the adapters + // were aware a VF was available before the device + // arrived. This is not currently possible because the + // Linux netvsc driver ignores the completion requested + // flag on inband packets and won't send a completion + // packet. + primary.guest_vf_state = + PrimaryChannelGuestVfState::AvailableAdvertised; + self.send_coordinator_update_vf(); + } + Ok(false) => { + if let Some(true) = primary.is_data_path_switched { + tracing::error!( + "Data path switched, but current guest negotiation does not support VTL0 VF" + ); + } + } + Err(err) => { + tracelimit::error_ratelimited!( + error = &err as &dyn std::error::Error, + vfid, + buffers_version = ?buffers.version, + buffers_ndis_config = ?buffers.ndis_config, + "failed to notify guest about VF availability post initialization" + ); + return Err(WorkerError::GuestVfNotificationFailed); + } } } } @@ -3236,13 +3252,35 @@ impl NetChannel { if let Some(message) = primary.control_messages.pop_front() { primary.control_messages_len -= message.data.len(); - self.handle_rndis_control_message( + if let Err(err) = self.handle_rndis_control_message( primary, buffers, message.message_type, message.data.as_ref(), id.0, - )?; + ) { + // For most message types, a failure indicates that no completion + // response has been sent to the guest. Either the error occurred + // before the response was sent, or the send itself failed. + // The guest will have no indication the message was processed, + // so we free the buffers here. + // For INITIALIZE messages, after sending a successful completion, + // it may send the guest a VF availability notification, which can + // fail. The guest is still expected to send a packet completion, + // which will free the buffers. If the failure came from the VF + // notification, it has already been traced. + let response_sent = matches!(&err, WorkerError::GuestVfNotificationFailed); + if !response_sent { + tracelimit::error_ratelimited!( + error = &err as &dyn std::error::Error, + message_type = message.message_type, + "failed to handle RNDIS control message, skipping" + ); + + drop(state.rx_bufs.free(id.0)); + primary.free_control_buffers.push(id); + } + } } else if primary.pending_offload_change && primary.rndis_state == RndisState::Operational { @@ -4882,8 +4920,16 @@ impl NetChannel { ) -> Result>, WorkerError> { let (mut read, _) = self.queue.split(); let packet = match read.try_read() { - Ok(packet) => parse_packet(&packet, send_buffer, version, external_data) - .map_err(WorkerError::Packet)?, + Ok(packet) => match parse_packet(&packet, send_buffer, version, external_data) { + Ok(p) => p, + Err(err) => { + tracelimit::error_ratelimited!( + error = &err as &dyn std::error::Error, + "failed to parse packet, skipping" + ); + return Ok(None); + } + }, Err(queue::TryReadError::Empty) => return Ok(None), Err(queue::TryReadError::Queue(err)) => return Err(err.into()), }; @@ -5024,12 +5070,37 @@ impl NetChannel { let sub_allocation_size = sub_allocation_size_for_mtu(mtu); - let recv_buffer = ReceiveBuffer::new( + let recv_buffer = match ReceiveBuffer::new( &self.gpadl_map, message.gpadl_handle, message.id, sub_allocation_size, - )?; + ) { + Ok(buf) => buf, + Err(err) => { + tracelimit::error_ratelimited!( + error = &err as &dyn std::error::Error, + "failed to set up receive buffer" + ); + self.send_completion( + packet.transaction_id, + Some(&self.message( + protocol::MESSAGE1_TYPE_SEND_RECEIVE_BUFFER_COMPLETE, + protocol::Message1SendReceiveBufferComplete { + status: protocol::Status::FAILURE, + num_sections: 0, + sections: [protocol::ReceiveBufferSection { + offset: 0, + sub_allocation_size: 0, + num_sub_allocations: 0, + end_offset: 0, + }], + }, + )), + )?; + continue; + } + }; self.send_completion( packet.transaction_id, @@ -5057,7 +5128,27 @@ impl NetChannel { )); } - let send_buffer = SendBuffer::new(&self.gpadl_map, message.gpadl_handle)?; + let send_buffer = + match SendBuffer::new(&self.gpadl_map, message.gpadl_handle) { + Ok(buf) => buf, + Err(err) => { + tracelimit::error_ratelimited!( + error = &err as &dyn std::error::Error, + "failed to set up send buffer" + ); + self.send_completion( + packet.transaction_id, + Some(&self.message( + protocol::MESSAGE1_TYPE_SEND_SEND_BUFFER_COMPLETE, + protocol::Message1SendSendBufferComplete { + status: protocol::Status::FAILURE, + section_size: 0, + }, + )), + )?; + continue; + } + }; self.send_completion( packet.transaction_id, Some(&self.message( @@ -5582,7 +5673,7 @@ impl NetChannel { } PacketData::RndisPacketComplete(_completion) => { data.rx_done.clear(); - state + if state .release_recv_buffers( packet .transaction_id @@ -5590,7 +5681,19 @@ impl NetChannel { &queue_state.rx_buffer_range, &mut data.rx_done, ) - .ok_or(WorkerError::InvalidRndisPacketCompletion)?; + .is_none() + { + // Transaction ID did not map to a valid receive + // buffer, so nothing was freed. Duplicate or invalid + // completions are traced. If a guest fails to send + // valid completions, it may eventually starve itself + // of available receive buffers. + tracelimit::error_ratelimited!( + transaction_id = packet.transaction_id, + "invalid RNDIS packet completion from guest, skipping" + ); + continue; + } queue_state .queue .rx_avail(&mut queue_state.pool, &data.rx_done); @@ -5674,10 +5777,7 @@ impl NetChannel { )?; } p => { - tracing::warn!(request = ?p, "unexpected packet"); - return Err(WorkerError::UnexpectedPacketOrder( - PacketOrderError::SwitchDataPathCompletionPrimaryChannelState, - )); + tracelimit::warn_ratelimited!(request = ?p, "unexpected packet, skipping"); } } } From dca3f071fcb754e537aa5bf72698880450063947 Mon Sep 17 00:00:00 2001 From: erfrimod Date: Thu, 21 May 2026 16:32:43 -0700 Subject: [PATCH 2/2] pr feedback --- vm/devices/net/netvsp/src/lib.rs | 34 +++++++++++++++----- vm/devices/net/netvsp/src/test.rs | 52 +++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 8 deletions(-) diff --git a/vm/devices/net/netvsp/src/lib.rs b/vm/devices/net/netvsp/src/lib.rs index 07ea5c163b..7153875787 100644 --- a/vm/devices/net/netvsp/src/lib.rs +++ b/vm/devices/net/netvsp/src/lib.rs @@ -2123,6 +2123,16 @@ struct Packet<'a> { external_data: &'a MultiPagedRangeBuf, } +/// Outcome of [`NetChannel::try_next_packet`]. +enum TryNextPacket<'a> { + /// A packet was read and parsed successfully. + Packet(Packet<'a>), + /// A packet was read but parsing failed. + Skipped, + /// The ring buffer is empty. + Empty, +} + type PacketReader<'a> = PagedRangesReader<'a, MultiPagedRangeIter<'a>>; impl Packet<'_> { @@ -4912,12 +4922,15 @@ impl Worker { } impl NetChannel { + /// Attempts to read the next packet from the ring buffer. + /// Possible returns are valid packets, malformed packets + /// that were skipped, empty ring buffers, or WorkerErrors. fn try_next_packet<'a>( &mut self, send_buffer: Option<&SendBuffer>, version: Option, external_data: &'a mut MultiPagedRangeBuf, - ) -> Result>, WorkerError> { + ) -> Result, WorkerError> { let (mut read, _) = self.queue.split(); let packet = match read.try_read() { Ok(packet) => match parse_packet(&packet, send_buffer, version, external_data) { @@ -4927,15 +4940,15 @@ impl NetChannel { error = &err as &dyn std::error::Error, "failed to parse packet, skipping" ); - return Ok(None); + return Ok(TryNextPacket::Skipped); } }, - Err(queue::TryReadError::Empty) => return Ok(None), + Err(queue::TryReadError::Empty) => return Ok(TryNextPacket::Empty), Err(queue::TryReadError::Queue(err)) => return Err(err.into()), }; tracing::trace!(target: "netvsp/vmbus", data = ?packet.data, "incoming vmbus packet"); - Ok(Some(packet)) + Ok(TryNextPacket::Packet(packet)) } async fn next_packet<'a>( @@ -5639,14 +5652,19 @@ impl NetChannel { if state.free_tx_packets.is_empty() { break; } - let packet = if let Some(packet) = self.try_next_packet( + let packet = match self.try_next_packet( buffers.send_buffer.as_ref(), Some(buffers.version), &mut data.external_data, )? { - packet - } else { - break; + TryNextPacket::Packet(packet) => packet, + TryNextPacket::Skipped => { + // A malformed packet was consumed; keep processing and + // count the wake as having done work. + did_some_work = true; + continue; + } + TryNextPacket::Empty => break, }; did_some_work = true; diff --git a/vm/devices/net/netvsp/src/test.rs b/vm/devices/net/netvsp/src/test.rs index ad0f1a2902..030230aebe 100644 --- a/vm/devices/net/netvsp/src/test.rs +++ b/vm/devices/net/netvsp/src/test.rs @@ -6700,6 +6700,58 @@ async fn rndis_rx_vlan_preserves_packet_data(driver: DefaultDriver) { ); } +#[async_test] +async fn rndis_invalid_packet_complete(driver: DefaultDriver) { + let endpoint_state = TestNicEndpointState::new(); + let endpoint = TestNicEndpoint::new(Some(endpoint_state.clone())); + let nic = Nic::builder().build( + &VmTaskDriverSource::new(SingleDriverBackend::new(driver.clone())), + Guid::new_random(), + Box::new(endpoint), + [1, 2, 3, 4, 5, 6].into(), + 0, + ); + + let mut nic_dev = TestNicDevice::new_with_nic(&driver, nic).await; + nic_dev.start_vmbus_channel(); + let mut channel = nic_dev.connect_vmbus_channel().await; + channel + .initialize(0, protocol::NdisConfigCapabilities::new()) + .await; + initialize_rndis_for_rx(&mut channel).await; + + // Send a completion for a transaction_id that was never allocated. + channel + .write(OutgoingPacket { + transaction_id: u32::MAX as u64, + packet_type: OutgoingPacketType::Completion, + payload: &NvspMessage { + header: protocol::MessageHeader { + message_type: protocol::MESSAGE1_TYPE_SEND_RNDIS_PACKET_COMPLETE, + }, + data: protocol::Message1SendRndisPacketComplete { + status: protocol::Status::SUCCESS, + }, + padding: &[], + } + .payload(), + }) + .await; + + // Inject an RX packet. Should be able to read the packet from the guest channel. + { + let locked_state = endpoint_state.lock(); + locked_state.send_rx(0, vec![0xAA; 60]); + } + channel + .read_with(|packet| match packet { + IncomingPacket::Data(_) => {} + _ => panic!("Unexpected packet type on RX"), + }) + .await + .expect("worker should keep processing after invalid completion"); +} + /// Helper: builds an RSS-enable parameter block that the set_rss_parameter /// OID path accepts. fn build_rss_enable_params() -> Vec {