Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 101 additions & 41 deletions vm/devices/net/netvsp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ enum CoordinatorMessage {
Update(CoordinatorMessageUpdateType),
/// Restart endpoints and resume processing. This will also attempt to set VF and data path state to match current
/// expectations.
Restart,
/// Identifies the channel that requested the restart. 0 = primary; >0 = sub-channel
Restart { channel_idx: u16 },
/// Start a timer.
StartTimer(Instant),
}
Expand Down Expand Up @@ -1547,7 +1548,9 @@ impl Nic {
driver_builder.run_on_target(!self.adapter.tx_fast_completions);

#[expect(clippy::disallowed_methods)] // TODO
let (send, recv) = mpsc::channel(1);
// Capacity equals the number of workers.
// Each channel (primary or subchannel) can send at most one message.
let (send, recv) = mpsc::channel(self.adapter.max_queues as usize);
self.coordinator_send = Some(send);
self.coordinator.insert(
&self.adapter.driver,
Expand Down Expand Up @@ -3065,7 +3068,7 @@ impl<T: RingMem> NetChannel<T> {
// Restart the endpoint if the OID changed some critical
// endpoint property.
if restart_endpoint {
self.restart = Some(CoordinatorMessage::Restart);
self.restart = Some(CoordinatorMessage::Restart { channel_idx: 0 });
}
if let Some(filter) = packet_filter {
if self.packet_filter != filter {
Expand Down Expand Up @@ -3267,7 +3270,7 @@ impl<T: RingMem> NetChannel<T> {
guest_vf_state: guest_vf,
filter_state: packet_filter,
}));
} else if let Some(CoordinatorMessage::Restart) = self.restart {
} else if let Some(CoordinatorMessage::Restart { .. }) = self.restart {
// If a restart message is pending, do nothing.
// A restart will try to switch the data path based on primary.guest_vf_state.
// A restart will apply packet filter changes.
Expand Down Expand Up @@ -4051,30 +4054,14 @@ impl Coordinator {
state: &mut CoordinatorState,
) -> Result<(), task_control::Cancelled> {
loop {
// Drain any messages already queued on `recv` to decide
// whether to run the restart cycle.
self.drain_pending_messages(state).await;

// If anything (primary, sub, or endpoint) requested a restart, do
// it. Then loop back to re-drain in case more messages arrived.
if self.restart {
stop.until_stopped(self.stop_workers()).await?;
// The queue restart operation is not restartable, so do not
// poll on `stop` here.
if let Err(err) = self
.restart_queues(state)
.instrument(tracing::info_span!("netvsp_restart_queues"))
.await
{
tracing::error!(
error = &err as &dyn std::error::Error,
"failed to restart queues"
);
}
if let Some(primary) = self.primary_mut() {
primary.is_data_path_switched =
state.endpoint.get_data_path_to_guest_vf().await.ok();
tracing::info!(
is_data_path_switched = primary.is_data_path_switched,
"Query data path state"
);
}
self.restore_guest_vf_state(state).await;
self.restart = false;
self.run_restart_cycle(stop, state).await?;
}

// Ensure that all workers except the primary are started. The
Expand Down Expand Up @@ -4228,10 +4215,70 @@ impl Coordinator {
Ok(())
}

/// Called at the top of each iteration of [`Self::process`] loop.
/// Coalesce Sub-channel restart requests into `self.restart = true`.
/// Any Primary message landing concurrently with a `Restart` is
/// handled prior to running the restart cycle.
async fn drain_pending_messages(&mut self, state: &mut CoordinatorState) {
while let Ok(Some(msg)) = self.recv.try_next() {
self.handle_coordinator_message(msg, state).await;
}
Comment on lines +4218 to +4225
}

/// Called from the [`Self::process`] loop when a `Restart` message has
/// been observed from any channel.
async fn run_restart_cycle(
&mut self,
stop: &mut StopTask<'_>,
state: &mut CoordinatorState,
) -> Result<(), task_control::Cancelled> {
stop.until_stopped(self.stop_workers()).await?;

// All workers are stopped and cannot push new messages.
// Drain any messages that arrived prior to the stop.
self.drain_pending_messages(state).await;

// The queue restart operation is not restartable; do not poll on stop here.
if let Err(err) = self
.restart_queues(state)
.instrument(tracing::info_span!("netvsp_restart_queues"))
.await
{
tracing::error!(
error = &err as &dyn std::error::Error,
"failed to restart queues"
);
}
if let Some(primary) = self.primary_mut() {
primary.is_data_path_switched = state.endpoint.get_data_path_to_guest_vf().await.ok();
tracing::info!(
is_data_path_switched = primary.is_data_path_switched,
"Query data path state"
);
}
self.restore_guest_vf_state(state).await;
self.restart = false;
Ok(())
}

async fn handle_coordinator_message(
&mut self,
msg: CoordinatorMessage,
state: &mut CoordinatorState,
) {
match msg {
CoordinatorMessage::Restart { channel_idx } if channel_idx != 0 => {
tracing::info!(channel_idx, "sub-channel triggered restart");
self.restart = true;
}
_ => self.handle_primary_message(msg, state).await,
}
}

async fn handle_primary_message(
&mut self,
msg: CoordinatorMessage,
state: &mut CoordinatorState,
) {
self.workers[0].stop().await;
if let Some(worker) = self.workers[0].state_mut() {
Expand Down Expand Up @@ -4269,7 +4316,11 @@ impl Coordinator {
CoordinatorMessage::StartTimer(deadline) => {
self.sleep_deadline = Some(deadline);
}
CoordinatorMessage::Restart => self.restart = true,
CoordinatorMessage::Restart { channel_idx } => {
assert_eq!(channel_idx, 0);
tracing::info!(channel_idx, "primary-channel triggered restart");
self.restart = true;
}
}
}

Expand Down Expand Up @@ -4806,7 +4857,9 @@ impl<T: RingMem + 'static> Worker<T> {
};

// Wake up the coordinator task to start the queues.
let _ = self.coordinator_send.try_send(CoordinatorMessage::Restart);
let _ = self
.coordinator_send
.try_send(CoordinatorMessage::Restart { channel_idx: 0 });

tracelimit::info_ratelimited!("network initialized");
self.state = WorkerState::WaitingForCoordinator(Some(state));
Expand Down Expand Up @@ -4846,23 +4899,30 @@ impl<T: RingMem + 'static> Worker<T> {
Err(WorkerError::EndpointRequiresQueueRestart(err)) => {
tracelimit::warn_ratelimited!(
err = err.as_ref() as &dyn std::error::Error,
channel_idx = self.channel_idx,
"Endpoint requires queues to restart",
);
CoordinatorMessage::Restart
CoordinatorMessage::Restart {
channel_idx: self.channel_idx,
}
}
Err(err) => return Err(err),
};

let WorkerState::Ready(ready) = std::mem::replace(
&mut self.state,
WorkerState::WaitingForCoordinator(None),
) else {
unreachable!("must be running in ready state")
};
let _ = std::mem::replace(
&mut self.state,
WorkerState::WaitingForCoordinator(Some(ready)),
);
// Only the Primary channel transitions to `WaitingForCoordinator`.
// Sub-channels stay in `Ready(_)`.
if self.channel_idx == 0 {
let WorkerState::Ready(ready) = std::mem::replace(
&mut self.state,
WorkerState::WaitingForCoordinator(None),
) else {
unreachable!("must be running in ready state")
};
let _ = std::mem::replace(
&mut self.state,
WorkerState::WaitingForCoordinator(Some(ready)),
);
}
self.coordinator_send
.try_send(msg)
.map_err(WorkerError::CoordinatorMessageSendFailed)?;
Expand Down Expand Up @@ -5637,7 +5697,7 @@ impl<T: 'static + RingMem> NetChannel<T> {
let primary = state.primary.as_mut().unwrap();
primary.requested_num_queues = subchannel_count as u16 + 1;
primary.tx_spread_sent = false;
self.restart = Some(CoordinatorMessage::Restart);
self.restart = Some(CoordinatorMessage::Restart { channel_idx: 0 });
}
}
PacketData::RevokeReceiveBuffer(protocol::Message1RevokeReceiveBuffer { id })
Expand Down
Loading
Loading