diff --git a/nexus/db-queries/src/db/datastore/multicast/members.rs b/nexus/db-queries/src/db/datastore/multicast/members.rs index 949c4703405..a9ecce508e4 100644 --- a/nexus/db-queries/src/db/datastore/multicast/members.rs +++ b/nexus/db-queries/src/db/datastore/multicast/members.rs @@ -39,9 +39,8 @@ use crate::db::pagination::paginated; /// - **SSM (232.0.0.0/8, ff3x::/32)**: Always use `specific_sources` per RFC 4607. /// The `has_any_source_member` flag is ignored because API validation /// prevents SSM joins without sources. -/// - **ASM**: Currently always passes `None` to DPD (Dendrite doesn't support -/// ASM filtering yet). TODO: if `has_any_source_member` is true, skip -/// switch-level filtering; otherwise use `specific_sources`. +/// - **ASM**: If `has_any_source_member` is true, passes `None` to DPD +/// (no switch-level filtering). Otherwise uses `specific_sources`. /// - **OPTE**: Always uses per-member source lists for fine-grained filtering, /// regardless of switch-level behavior. /// @@ -57,8 +56,8 @@ pub struct SourceFilterState { /// True if any member has empty `source_ips` (wants any source). /// - /// For ASM groups: currently unused (Dendrite doesn't support ASM filtering). - /// TODO: when true, switch-level filtering will be disabled. + /// For ASM groups: when true, switch-level source filtering is disabled + /// (sources passed as `None` to Dendrite). /// For SSM groups: ignored per RFC 4607 (API validation prevents SSM joins /// without sources). pub has_any_source_member: bool, diff --git a/nexus/src/app/background/tasks/multicast/groups.rs b/nexus/src/app/background/tasks/multicast/groups.rs index 1e30caf3394..933acc91242 100644 --- a/nexus/src/app/background/tasks/multicast/groups.rs +++ b/nexus/src/app/background/tasks/multicast/groups.rs @@ -109,21 +109,19 @@ fn dpd_state_matches_tag( dpd_group: &dpd_client::types::MulticastGroupExternalResponse, db_group: &MulticastGroup, ) -> bool { - match (&dpd_group.tag, &db_group.tag) { - (dpd_tag, Some(db_tag)) => dpd_tag == db_tag, - _ => false, + match &db_group.tag { + Some(db_tag) => dpd_group.tag.as_str() == db_tag, + None => false, } } /// Check if DPD sources match the expected state based on source filter. /// /// Source filtering logic per RFC 4607 (mirrors dataplane code): -/// - SSM (232/8, ff3x::/32): MUST have specific sources. `has_any_source_member` -/// is ignored because API validation prevents SSM joins without sources. -/// - ASM: Currently expects `None` (Dendrite doesn't support ASM filtering yet). -/// -/// TODO: Once Dendrite accepts ASM source filtering, enable it for ASM groups -/// where `has_any_source_member=false`. +/// - SSM (232/8, ff3x::/32): always expect specific sources. API validation +/// prevents SSM joins without sources. +/// - ASM: expect specific sources when all members specify sources, +/// otherwise `None` to allow any source at the switch level. fn dpd_state_matches_sources( dpd_group: &dpd_client::types::MulticastGroupExternalResponse, source_filter: &SourceFilterState, @@ -134,12 +132,11 @@ fn dpd_state_matches_sources( // Expected DPD state based on source filter logic (RFC 4607) let expected_sources = if is_ssm_address(group_ip) { - // SSM: always expect specific sources Some(&source_filter.specific_sources) - } else { - // ASM: Dendrite doesn't support ASM filtering yet - // TODO: check `has_any_source_member` to enable/disable filtering + } else if source_filter.has_any_source_member { None + } else { + Some(&source_filter.specific_sources) }; match (dpd_sources, expected_sources) { @@ -152,7 +149,7 @@ fn dpd_state_matches_sources( .into_iter() .filter_map(|src| match src { dpd_client::types::IpSrc::Exact(ip) => Some(ip), - _ => None, // Subnet matching removed in follow-up Dendrite TODO + _ => None, }) .collect(); dpd_ips.sort(); @@ -1051,8 +1048,8 @@ mod tests { #[test] fn test_dpd_state_matches_sources_asm_address() { - // ASM address (not 232.x.x.x) - should always expect None from DPD - // regardless of specific_sources (Dendrite limitation, see TODO) + // ASM address with all members specifying sources: expect those + // sources in DPD. let source_filter = SourceFilterState { specific_sources: BTreeSet::from(["10.0.0.1" .parse::() @@ -1062,15 +1059,15 @@ mod tests { let group = create_group("224.1.1.1"); // ASM address (not 232.x.x.x) - // DPD has None (correct for ASM) - let dpd_group = create_dpd_group(None); - assert!(dpd_state_matches_sources(&dpd_group, &source_filter, &group)); - - // DPD has sources (mismatch: ASM should have none) + // DPD has matching sources (correct) let dpd_group = create_dpd_group(Some(vec![dpd_client::types::IpSrc::Exact( "10.0.0.1".parse().unwrap(), )])); + assert!(dpd_state_matches_sources(&dpd_group, &source_filter, &group)); + + // DPD has None (mismatch: ASM with all-specific should have sources) + let dpd_group = create_dpd_group(None); assert!(!dpd_state_matches_sources(&dpd_group, &source_filter, &group)); } diff --git a/nexus/src/app/multicast/dataplane.rs b/nexus/src/app/multicast/dataplane.rs index 834b480e524..5a51320084a 100644 --- a/nexus/src/app/multicast/dataplane.rs +++ b/nexus/src/app/multicast/dataplane.rs @@ -101,19 +101,18 @@ impl IntoExternalResponse for MulticastGroupResponse { } } -/// Trait for converting database IPv6 types into DPD's -/// [`UnderlayMulticastIpv6`] type. -trait IntoUnderlayMulticastIpv6 { +/// Convert an [`IpAddr`] into a DPD [`UnderlayMulticastIpv6`], +/// rejecting IPv4. +/// +/// Note: named without the `Ipv6` suffix because the input type is the general +/// `IpAddr`. +trait IntoUnderlayMulticast { /// Convert to [`UnderlayMulticastIpv6`], rejecting IPv4 addresses. - fn into_underlay_multicast_ipv6( - self, - ) -> Result; + fn into_underlay_multicast(self) -> Result; } -impl IntoUnderlayMulticastIpv6 for IpAddr { - fn into_underlay_multicast_ipv6( - self, - ) -> Result { +impl IntoUnderlayMulticast for IpAddr { + fn into_underlay_multicast(self) -> Result { match self { IpAddr::V6(ipv6) => Ok(UnderlayMulticastIpv6(ipv6)), IpAddr::V4(_) => Err(Error::invalid_request( @@ -295,13 +294,13 @@ impl MulticastDataplaneClient { &self, client: &dpd_client::Client, group_ip: IpAddr, + tag: &MulticastTag, update: &MulticastGroupUpdateExternalEntry, create: &MulticastGroupCreateExternalEntry, switch: &SwitchLocation, ) -> MulticastDataplaneResult { - let tag: MulticastTag = "nexus".parse().unwrap(); match client - .multicast_group_update_external(&group_ip, &tag, update) + .multicast_group_update_external(&group_ip, tag, update) .await { Ok(r) => Ok(r.into_inner()), @@ -409,7 +408,7 @@ impl MulticastDataplaneClient { })? .map(u16::from); let underlay_ip_admin = - underlay_group.multicast_ip.ip().into_underlay_multicast_ipv6()?; + underlay_group.multicast_ip.ip().into_underlay_multicast()?; let underlay_ipv6 = match underlay_group.multicast_ip.ip() { IpAddr::V6(ipv6) => ipv6, IpAddr::V4(_) => { @@ -427,17 +426,12 @@ impl MulticastDataplaneClient { let external_group_ip = external_group.multicast_ip.ip(); - // Source filtering logic per RFC 4607: - // - SSM (232/8, ff3x::/32): MUST use specific sources. SSM semantically - // requires source specification; `has_any_source_member` is ignored - // because API validation prevents SSM joins without sources. - // - ASM: Use `has_any_source_member` to decide filtering behavior. - // - // TODO: Once Dendrite accepts ASM source filtering, enable it for ASM - // groups where `has_any_source_member=false`. Currently ASM always gets - // `None` because Dendrite only supports SSM filtering. + // Source filtering per RFC 4607: + // - SSM (232/8, ff3x::/32): always use specific sources. API + // validation prevents SSM joins without sources. + // - ASM: use specific sources when all members specify sources, + // otherwise None to allow any source at the switch level. let sources_dpd = if is_ssm_address(external_group_ip) { - // SSM: always use specific sources (RFC 4607 compliance) Some( source_filter .specific_sources @@ -445,10 +439,16 @@ impl MulticastDataplaneClient { .map(|ip| IpSrc::Exact(*ip)) .collect::>(), ) - } else { - // ASM: Dendrite doesn't support ASM filtering yet - // TODO: check `has_any_source_member` to enable/disable filtering + } else if source_filter.has_any_source_member { None + } else { + Some( + source_filter + .specific_sources + .iter() + .map(|ip| IpSrc::Exact(*ip)) + .collect::>(), + ) }; let create_operations = @@ -570,7 +570,7 @@ impl MulticastDataplaneClient { .underlay_group .multicast_ip .ip() - .into_underlay_multicast_ipv6()?; + .into_underlay_multicast()?; let underlay_ipv6 = match params.underlay_group.multicast_ip.ip() { IpAddr::V6(ipv6) => ipv6, IpAddr::V4(_) => { @@ -589,17 +589,12 @@ impl MulticastDataplaneClient { let new_name_str = params.new_name.to_string(); let external_group_ip = params.external_group.multicast_ip.ip(); - // Source filtering logic per RFC 4607: - // - SSM (232/8, ff3x::/32): MUST use specific sources. SSM semantically - // requires source specification; `has_any_source_member` is ignored - // because API validation prevents SSM joins without sources. - // - ASM: Use `has_any_source_member` to decide filtering behavior. - // - // TODO: Once Dendrite accepts ASM source filtering, enable it for ASM - // groups where `has_any_source_member=false`. Currently ASM always gets - // `None` because Dendrite only supports SSM filtering. + // Source filtering per RFC 4607: + // - SSM (232/8, ff3x::/32): always use specific sources. API + // validation prevents SSM joins without sources. + // - ASM: use specific sources when all members specify sources, + // otherwise None to allow any source at the switch level. let sources_dpd = if is_ssm_address(external_group_ip) { - // SSM: always use specific sources (RFC 4607 compliance) Some( params .source_filter @@ -608,10 +603,17 @@ impl MulticastDataplaneClient { .map(|ip| IpSrc::Exact(*ip)) .collect::>(), ) - } else { - // ASM: Dendrite doesn't support ASM filtering yet - // TODO: check `has_any_source_member` to enable/disable filtering + } else if params.source_filter.has_any_source_member { None + } else { + Some( + params + .source_filter + .specific_sources + .iter() + .map(|ip| IpSrc::Exact(*ip)) + .collect::>(), + ) }; let update_operations = @@ -622,25 +624,37 @@ impl MulticastDataplaneClient { let underlay_ip_admin = underlay_ip_admin.clone(); async move { // Ensure/get underlay members, create if missing - let members = match client + let (members, existing_tag) = match client .multicast_group_get_underlay(&underlay_ip_admin) .await { - Ok(r) => r.into_inner().members, + Ok(r) => { + let inner = r.into_inner(); + (inner.members, inner.tag) + } Err(DpdError::ErrorResponse(resp)) if resp.status() == reqwest::StatusCode::NOT_FOUND => { - // Create missing underlay group with new tag and empty members + // Create missing underlay group with DB tag and empty members + let db_tag = params + .underlay_group + .tag + .as_deref() + .ok_or_else(|| { + Error::internal_error( + "underlay multicast group missing tag", + ) + })?; let created = self .dpd_ensure_underlay_created( client, underlay_ip_admin.clone(), - &new_name, + db_tag, switch_location, ) .await?; - created.members + (created.members, created.tag) } Err(e) => { error!( @@ -656,10 +670,16 @@ impl MulticastDataplaneClient { } }; - // Update underlay tag preserving members + // Update underlay preserving members, using existing + // tag for authorization let underlay_entry = MulticastGroupUpdateUnderlayEntry { members }; - let tag: MulticastTag = "nexus".parse().unwrap(); + let tag: MulticastTag = + existing_tag.try_into().map_err(|e| { + Error::internal_error(&format!( + "invalid multicast tag: {e}" + )) + })?; let underlay_response = client .multicast_group_update_underlay( &underlay_ip_admin, @@ -700,6 +720,7 @@ impl MulticastDataplaneClient { .dpd_update_external_or_create( client, external_group_ip, + &tag, &update_entry, &create_entry, switch_location, @@ -776,14 +797,14 @@ impl MulticastDataplaneClient { let operation_name = operation_name.clone(); async move { - let underlay_ip_admin = underlay_ip.into_underlay_multicast_ipv6()?; + let underlay_ip_admin = underlay_ip.into_underlay_multicast()?; // Get current underlay group state, create if missing let current_group_res = client .multicast_group_get_underlay(&underlay_ip_admin) .await; - let (current_members, _current_tag) = match current_group_res { + let (current_members, current_tag) = match current_group_res { Ok(response) => { let inner = response.into_inner(); (inner.members, inner.tag) @@ -889,15 +910,17 @@ impl MulticastDataplaneClient { let dpd_operation_done = format!("{operation_name}_member_in_underlay_group"); - // Apply the modification function (consumes member) + // Apply the modification function let updated_members = modify_fn(current_members, member); - // Try to update the underlay group (move updated_members) let update_entry = MulticastGroupUpdateUnderlayEntry { members: updated_members, }; - let tag: MulticastTag = "nexus".parse().unwrap(); + let tag: MulticastTag = current_tag.clone().try_into() + .map_err(|e| Error::internal_error( + &format!("invalid multicast tag: {e}") + ))?; let update_res = client .multicast_group_update_underlay( @@ -925,22 +948,57 @@ impl MulticastDataplaneClient { "dpd_operation" => "modify_group_membership_recreate" ); - // Delete the stale underlay group - if let Err(e) = client + // TODO: this `reset_by_tag` fallback can be removed + // once DPD's `modify_group_internal` calls + // `process_membership_changes` in the + // empty-transition arm, preventing the 500 that + // triggers this recovery path. + // See https://github.com/oxidecomputer/dendrite/pull/232 + // + // Try to delete the stale underlay group. If this + // fails because the underlay group is still + // referenced by an external group via NAT target, + // fall back to `reset_by_tag`, which deletes + // external groups first so the ASIC state is clean + // for the next reconciler pass. + if let Err(del_err) = client .multicast_group_delete(&underlay_ip, &tag) .await { warn!( log, - "underlay delete failed during recovery (continuing)"; + "underlay delete failed, resetting all \ + groups by tag for clean ASIC state"; "underlay_ip" => %underlay_ip, "switch" => %location, - "error" => %e + "delete_error" => %del_err, + "dpd_operation" => "modify_group_membership_recreate" ); + + if let Err(reset_err) = client + .multicast_reset_by_tag(&tag) + .await + { + error!( + log, + "tag reset also failed during recovery"; + "underlay_ip" => %underlay_ip, + "switch" => %location, + "error" => %reset_err, + "dpd_operation" => "modify_group_membership_recreate" + ); + } + + // Return error so the reconciler retries. + // Drift correction will recreate the groups + // with clean ASIC state on the next pass. + return Err(Error::internal_error(&format!( + "underlay group recovery on {location}: \ + reset by tag after delete failed ({del_err})" + ))); } - // Recreate with the updated members (reuse from update_entry) - // Use authoritative tag from underlay_group, not stale current_tag + // Recreate with the updated members let create_entry = MulticastGroupCreateUnderlayEntry { group_ip: underlay_ip_admin.clone(), members: update_entry.members, @@ -1321,7 +1379,7 @@ impl MulticastDataplaneClient { match client .multicast_group_get_underlay( - &underlay_ip.into_underlay_multicast_ipv6()?, + &underlay_ip.into_underlay_multicast()?, ) .await { diff --git a/nexus/tests/integration_tests/multicast/failures.rs b/nexus/tests/integration_tests/multicast/failures.rs index 96735ce9684..2bb5c5f2134 100644 --- a/nexus/tests/integration_tests/multicast/failures.rs +++ b/nexus/tests/integration_tests/multicast/failures.rs @@ -45,10 +45,9 @@ use nexus_types::external_api::multicast::{ InstanceMulticastGroupJoin, MulticastGroup, MulticastGroupJoinSpec, MulticastGroupMember, }; -use omicron_common::api::external::{ - IdentityMetadataCreateParams, InstanceState, NameOrId, SwitchLocation, -}; +use omicron_common::api::external::InstanceState; use omicron_uuid_kinds::{InstanceUuid, MulticastGroupUuid}; +use sled_agent_types::early_networking::SwitchLocation; use super::*; use crate::integration_tests::instances as instance_helpers; diff --git a/nexus/tests/integration_tests/multicast/groups.rs b/nexus/tests/integration_tests/multicast/groups.rs index fffe0a3f19f..9ae620b7dc2 100644 --- a/nexus/tests/integration_tests/multicast/groups.rs +++ b/nexus/tests/integration_tests/multicast/groups.rs @@ -1057,7 +1057,7 @@ async fn test_ssm_source_ip_behavior(cptestctx: &ControlPlaneTestContext) { .flatten() .filter_map(|src| match src { dpd_types::IpSrc::Exact(ip) => Some(*ip), - dpd_types::IpSrc::Subnet(_) => None, // SSM uses exact sources + dpd_types::IpSrc::Any => None, // Any-source (ASM) wildcard }) .collect(); dpd_source_ips.sort(); diff --git a/nexus/tests/integration_tests/multicast/networking_integration.rs b/nexus/tests/integration_tests/multicast/networking_integration.rs index a68b2badf44..3b28892ef82 100644 --- a/nexus/tests/integration_tests/multicast/networking_integration.rs +++ b/nexus/tests/integration_tests/multicast/networking_integration.rs @@ -26,9 +26,6 @@ use nexus_types::external_api::instance::{ InstanceNetworkInterfaceAttachment, }; use nexus_types::external_api::ip_pool::{IpVersion, PoolSelector}; -use nexus_types::external_api::multicast::{ - MulticastGroup, MulticastGroupMember, -}; use omicron_common::api::external::{ ByteCount, IdentityMetadataCreateParams, Instance, InstanceCpuCount, NameOrId,