diff --git a/CHANGELOG.md b/CHANGELOG.md index a8f6592bf..999ff521f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file. ### Changes - CLI + - Allow incremental multicast group addition without disconnecting - Reset SIGPIPE to SIG_DFL at the start of main() in all 3 CLI binaries (doublezero, doublezero-geolocation, doublezero-admin) so the process exits silently like standard CLI tools - SDK - Add Go SDK for shred subscription program with read-only account deserialization (epoch state, seat assignments, pricing, settlement, validator client rewards), PDA derivation helpers, RPC fetchers, compatibility tests, and a fetch example CLI diff --git a/client/doublezero/src/command/connect.rs b/client/doublezero/src/command/connect.rs index b866a4943..cb5ea306b 100644 --- a/client/doublezero/src/command/connect.rs +++ b/client/doublezero/src/command/connect.rs @@ -204,23 +204,6 @@ impl ProvisioningCliCommand { client_ip: Ipv4Addr, spinner: &ProgressBar, ) -> eyre::Result<()> { - // Check if the daemon already has a multicast service running. The daemon - // does not support updating an existing service — both publisher and subscriber - // roles must be specified in a single connect command. - if let Ok(statuses) = controller.status().await { - if statuses.iter().any(|s| { - s.user_type - .as_ref() - .is_some_and(|t| t.eq_ignore_ascii_case("multicast")) - }) { - eyre::bail!( - "A multicast service is already running. Disconnect first with \ - `doublezero disconnect multicast`, then reconnect with all desired \ - groups in a single command (e.g. --publish and --subscribe)." - ); - } - } - let mcast_groups = client.list_multicastgroup(ListMulticastGroupCommand)?; // Resolve pub group codes to pubkeys diff --git a/e2e/main_test.go b/e2e/main_test.go index 288e42540..b32e3b4c7 100644 --- a/e2e/main_test.go +++ b/e2e/main_test.go @@ -534,6 +534,30 @@ func (dn *TestDevnet) ConnectMulticastSubscriberSkipAccessPass(t *testing.T, cli dn.log.Debug("--> Multicast subscriber connected") } +// AddMulticastPublisherGroupSkipAccessPass incrementally adds publish groups to +// an existing multicast service without disconnecting. +func (dn *TestDevnet) AddMulticastPublisherGroupSkipAccessPass(t *testing.T, client *devnet.Client, multicastGroupCodes ...string) { + dn.log.Debug("==> Adding multicast publisher groups incrementally", "clientIP", client.CYOANetworkIP, "groups", multicastGroupCodes) + + groupArgs := strings.Join(multicastGroupCodes, " ") + _, err := client.Exec(t.Context(), []string{"bash", "-c", "doublezero connect multicast --publish " + groupArgs}) + require.NoError(t, err) + + dn.log.Debug("--> Multicast publisher groups added incrementally") +} + +// AddMulticastSubscriberGroupSkipAccessPass incrementally adds subscribe groups to +// an existing multicast service without disconnecting. +func (dn *TestDevnet) AddMulticastSubscriberGroupSkipAccessPass(t *testing.T, client *devnet.Client, multicastGroupCodes ...string) { + dn.log.Debug("==> Adding multicast subscriber groups incrementally", "clientIP", client.CYOANetworkIP, "groups", multicastGroupCodes) + + groupArgs := strings.Join(multicastGroupCodes, " ") + _, err := client.Exec(t.Context(), []string{"bash", "-c", "doublezero connect multicast --subscribe " + groupArgs}) + require.NoError(t, err) + + dn.log.Debug("--> Multicast subscriber groups added incrementally") +} + func (dn *TestDevnet) DisconnectMulticastSubscriber(t *testing.T, client *devnet.Client) { dn.log.Debug("==> Disconnecting multicast subscriber", "clientIP", client.CYOANetworkIP) diff --git a/e2e/multicast_test.go b/e2e/multicast_test.go index aadb0b8d8..d54d226a2 100644 --- a/e2e/multicast_test.go +++ b/e2e/multicast_test.go @@ -192,13 +192,23 @@ func TestE2E_Multicast(t *testing.T) { createMulticastGroupForBothClients(t, tdn, publisherClient, subscriberClient, "mg02") if !t.Run("connect", func(t *testing.T) { - // Connect publisher. - tdn.ConnectMulticastPublisherSkipAccessPass(t, publisherClient, "mg01", "mg02") + // Connect publisher to first group only. + tdn.ConnectMulticastPublisherSkipAccessPass(t, publisherClient, "mg01") err = publisherClient.WaitForTunnelUp(t.Context(), 90*time.Second) require.NoError(t, err) - // Connect subscriber. - tdn.ConnectMulticastSubscriberSkipAccessPass(t, subscriberClient, "mg01", "mg02") + // Incrementally add second publish group without disconnecting. + tdn.AddMulticastPublisherGroupSkipAccessPass(t, publisherClient, "mg02") + err = publisherClient.WaitForTunnelUp(t.Context(), 90*time.Second) + require.NoError(t, err) + + // Connect subscriber to first group only. + tdn.ConnectMulticastSubscriberSkipAccessPass(t, subscriberClient, "mg01") + err = subscriberClient.WaitForTunnelUp(t.Context(), 90*time.Second) + require.NoError(t, err) + + // Incrementally add second subscribe group without disconnecting. + tdn.AddMulticastSubscriberGroupSkipAccessPass(t, subscriberClient, "mg02") err = subscriberClient.WaitForTunnelUp(t.Context(), 90*time.Second) require.NoError(t, err) diff --git a/e2e/qa_multicast_test.go b/e2e/qa_multicast_test.go index 1618e813e..f3b77bee9 100644 --- a/e2e/qa_multicast_test.go +++ b/e2e/qa_multicast_test.go @@ -347,17 +347,17 @@ func TestQA_MulticastPublisherMultipleGroups(t *testing.T) { require.Greater(t, reportB.PacketCount, uint64(0), "subscriberB received no packets from group B") log.Info("Received multicast packets", "subscriber", subscriberB.Host, "group", groupB.Code, "packetCount", reportB.PacketCount) - // --- Phase 2: Dynamic subscription --- - // SubA disconnects and reconnects with both groups A+B — verify identity preserved and receives from both. - log.Debug("Phase 2: dynamic subscription") + // --- Phase 2: Incremental subscription --- + // SubA adds group B without disconnecting — verify tunnel stays up, identity preserved, and receives from both. + log.Debug("Phase 2: incremental subscription (no disconnect)") statusBefore, err := subscriberA.GetUserStatus(ctx) require.NoError(t, err, "failed to get subscriberA status before adding group B") log.Debug("SubscriberA status before", "status", statusBefore) - log.Debug("SubscriberA reconnecting with both groups", "codes", []string{groupA.Code, groupB.Code}) - err = subscriberA.ConnectUserMulticast_Subscriber_Wait(ctx, groupA.Code, groupB.Code) - require.NoError(t, err, "failed to reconnect subscriberA with both groups") + log.Debug("SubscriberA adding group B incrementally (no disconnect)", "code", groupB.Code) + err = subscriberA.ConnectUserMulticast_Subscriber_AddTunnel(ctx, groupB.Code) + require.NoError(t, err, "failed to incrementally add group B to subscriberA") err = subscriberA.WaitForStatusUp(ctx) require.NoError(t, err, "failed to wait for subscriberA status up after adding group B") @@ -389,15 +389,18 @@ func TestQA_MulticastPublisherMultipleGroups(t *testing.T) { require.NotNil(t, reportB, "no report for group B") require.Greater(t, reportB.PacketCount, uint64(0), "no packets from group B") - log.Debug("Phase 2 passed: dynamic subscription verified", + log.Debug("Phase 2 passed: incremental subscription verified", "groupA_packets", reportA.PacketCount, "groupB_packets", reportB.PacketCount) - // --- Phase 3: Simultaneous pub+sub --- - // SubA reconnects as both publisher and subscriber on group A, sends to itself. - log.Debug("Phase 3: simultaneous pub+sub") + // --- Phase 3: Incremental publish after subscribe (cross-role) --- + // SubA adds a publisher role on group A without disconnecting. Both roles use + // UserTypeMulticast so InfraEqual returns true and the incremental UpdateGroups + // path is taken (no full reprovision). + log.Debug("Phase 3: incremental publish after subscribe (cross-role)") - err = subscriberA.ConnectUserMulticast_PubAndSub_Wait(ctx, []string{groupA.Code}, []string{groupA.Code}) - require.NoError(t, err, "failed to connect subscriberA as pub+sub") + log.Debug("SubscriberA adding publisher role on group A incrementally", "code", groupA.Code) + err = subscriberA.ConnectUserMulticast_Publisher_AddTunnel(ctx, groupA.Code) + require.NoError(t, err, "failed to incrementally add publisher role to subscriberA") err = subscriberA.WaitForStatusUp(ctx) require.NoError(t, err, "failed to wait for subscriberA status up as pub+sub") @@ -419,5 +422,5 @@ func TestQA_MulticastPublisherMultipleGroups(t *testing.T) { reportPubSub, err := subscriberA.WaitForMulticastReport(ctx, groupA) require.NoError(t, err, "failed to get report for group A as pub+sub") require.Greater(t, reportPubSub.PacketCount, uint64(0), "pub+sub client received no packets") - log.Debug("Phase 3 passed: pub+sub verified", "group", groupA.Code, "packetCount", reportPubSub.PacketCount) + log.Debug("Phase 3 passed: incremental publish after subscribe verified", "group", groupA.Code, "packetCount", reportPubSub.PacketCount) }