From dc591a390d29b15bda8085ad6450e7074824caca Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Mon, 30 Mar 2026 14:40:18 -0400 Subject: [PATCH 1/2] client: support incremental multicast group subscription Remove the guard that prevented adding multicast groups to an already running service, allowing users to incrementally add publish/subscribe groups without disconnecting first. --- CHANGELOG.md | 1 + client/doublezero/src/command/connect.rs | 17 -------------- e2e/main_test.go | 24 ++++++++++++++++++++ e2e/multicast_test.go | 18 +++++++++++---- e2e/qa_multicast_test.go | 29 +++++++++++++----------- 5 files changed, 55 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8f6592bf9..999ff521f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file. ### Changes - CLI + - Allow incremental multicast group addition without disconnecting - Reset SIGPIPE to SIG_DFL at the start of main() in all 3 CLI binaries (doublezero, doublezero-geolocation, doublezero-admin) so the process exits silently like standard CLI tools - SDK - Add Go SDK for shred subscription program with read-only account deserialization (epoch state, seat assignments, pricing, settlement, validator client rewards), PDA derivation helpers, RPC fetchers, compatibility tests, and a fetch example CLI diff --git a/client/doublezero/src/command/connect.rs b/client/doublezero/src/command/connect.rs index b866a49435..cb5ea306bf 100644 --- a/client/doublezero/src/command/connect.rs +++ b/client/doublezero/src/command/connect.rs @@ -204,23 +204,6 @@ impl ProvisioningCliCommand { client_ip: Ipv4Addr, spinner: &ProgressBar, ) -> eyre::Result<()> { - // Check if the daemon already has a multicast service running. The daemon - // does not support updating an existing service — both publisher and subscriber - // roles must be specified in a single connect command. - if let Ok(statuses) = controller.status().await { - if statuses.iter().any(|s| { - s.user_type - .as_ref() - .is_some_and(|t| t.eq_ignore_ascii_case("multicast")) - }) { - eyre::bail!( - "A multicast service is already running. Disconnect first with \ - `doublezero disconnect multicast`, then reconnect with all desired \ - groups in a single command (e.g. --publish and --subscribe)." - ); - } - } - let mcast_groups = client.list_multicastgroup(ListMulticastGroupCommand)?; // Resolve pub group codes to pubkeys diff --git a/e2e/main_test.go b/e2e/main_test.go index 288e425408..b32e3b4c75 100644 --- a/e2e/main_test.go +++ b/e2e/main_test.go @@ -534,6 +534,30 @@ func (dn *TestDevnet) ConnectMulticastSubscriberSkipAccessPass(t *testing.T, cli dn.log.Debug("--> Multicast subscriber connected") } +// AddMulticastPublisherGroupSkipAccessPass incrementally adds publish groups to +// an existing multicast service without disconnecting. +func (dn *TestDevnet) AddMulticastPublisherGroupSkipAccessPass(t *testing.T, client *devnet.Client, multicastGroupCodes ...string) { + dn.log.Debug("==> Adding multicast publisher groups incrementally", "clientIP", client.CYOANetworkIP, "groups", multicastGroupCodes) + + groupArgs := strings.Join(multicastGroupCodes, " ") + _, err := client.Exec(t.Context(), []string{"bash", "-c", "doublezero connect multicast --publish " + groupArgs}) + require.NoError(t, err) + + dn.log.Debug("--> Multicast publisher groups added incrementally") +} + +// AddMulticastSubscriberGroupSkipAccessPass incrementally adds subscribe groups to +// an existing multicast service without disconnecting. +func (dn *TestDevnet) AddMulticastSubscriberGroupSkipAccessPass(t *testing.T, client *devnet.Client, multicastGroupCodes ...string) { + dn.log.Debug("==> Adding multicast subscriber groups incrementally", "clientIP", client.CYOANetworkIP, "groups", multicastGroupCodes) + + groupArgs := strings.Join(multicastGroupCodes, " ") + _, err := client.Exec(t.Context(), []string{"bash", "-c", "doublezero connect multicast --subscribe " + groupArgs}) + require.NoError(t, err) + + dn.log.Debug("--> Multicast subscriber groups added incrementally") +} + func (dn *TestDevnet) DisconnectMulticastSubscriber(t *testing.T, client *devnet.Client) { dn.log.Debug("==> Disconnecting multicast subscriber", "clientIP", client.CYOANetworkIP) diff --git a/e2e/multicast_test.go b/e2e/multicast_test.go index aadb0b8d86..d54d226a2e 100644 --- a/e2e/multicast_test.go +++ b/e2e/multicast_test.go @@ -192,13 +192,23 @@ func TestE2E_Multicast(t *testing.T) { createMulticastGroupForBothClients(t, tdn, publisherClient, subscriberClient, "mg02") if !t.Run("connect", func(t *testing.T) { - // Connect publisher. - tdn.ConnectMulticastPublisherSkipAccessPass(t, publisherClient, "mg01", "mg02") + // Connect publisher to first group only. + tdn.ConnectMulticastPublisherSkipAccessPass(t, publisherClient, "mg01") err = publisherClient.WaitForTunnelUp(t.Context(), 90*time.Second) require.NoError(t, err) - // Connect subscriber. - tdn.ConnectMulticastSubscriberSkipAccessPass(t, subscriberClient, "mg01", "mg02") + // Incrementally add second publish group without disconnecting. + tdn.AddMulticastPublisherGroupSkipAccessPass(t, publisherClient, "mg02") + err = publisherClient.WaitForTunnelUp(t.Context(), 90*time.Second) + require.NoError(t, err) + + // Connect subscriber to first group only. + tdn.ConnectMulticastSubscriberSkipAccessPass(t, subscriberClient, "mg01") + err = subscriberClient.WaitForTunnelUp(t.Context(), 90*time.Second) + require.NoError(t, err) + + // Incrementally add second subscribe group without disconnecting. + tdn.AddMulticastSubscriberGroupSkipAccessPass(t, subscriberClient, "mg02") err = subscriberClient.WaitForTunnelUp(t.Context(), 90*time.Second) require.NoError(t, err) diff --git a/e2e/qa_multicast_test.go b/e2e/qa_multicast_test.go index 1618e813e0..f2f654cb40 100644 --- a/e2e/qa_multicast_test.go +++ b/e2e/qa_multicast_test.go @@ -347,17 +347,17 @@ func TestQA_MulticastPublisherMultipleGroups(t *testing.T) { require.Greater(t, reportB.PacketCount, uint64(0), "subscriberB received no packets from group B") log.Info("Received multicast packets", "subscriber", subscriberB.Host, "group", groupB.Code, "packetCount", reportB.PacketCount) - // --- Phase 2: Dynamic subscription --- - // SubA disconnects and reconnects with both groups A+B — verify identity preserved and receives from both. - log.Debug("Phase 2: dynamic subscription") + // --- Phase 2: Incremental subscription --- + // SubA adds group B without disconnecting — verify tunnel stays up, identity preserved, and receives from both. + log.Debug("Phase 2: incremental subscription (no disconnect)") statusBefore, err := subscriberA.GetUserStatus(ctx) require.NoError(t, err, "failed to get subscriberA status before adding group B") log.Debug("SubscriberA status before", "status", statusBefore) - log.Debug("SubscriberA reconnecting with both groups", "codes", []string{groupA.Code, groupB.Code}) - err = subscriberA.ConnectUserMulticast_Subscriber_Wait(ctx, groupA.Code, groupB.Code) - require.NoError(t, err, "failed to reconnect subscriberA with both groups") + log.Debug("SubscriberA adding group B incrementally (no disconnect)", "code", groupB.Code) + err = subscriberA.ConnectUserMulticast_Subscriber_AddTunnel(ctx, groupB.Code) + require.NoError(t, err, "failed to incrementally add group B to subscriberA") err = subscriberA.WaitForStatusUp(ctx) require.NoError(t, err, "failed to wait for subscriberA status up after adding group B") @@ -389,15 +389,18 @@ func TestQA_MulticastPublisherMultipleGroups(t *testing.T) { require.NotNil(t, reportB, "no report for group B") require.Greater(t, reportB.PacketCount, uint64(0), "no packets from group B") - log.Debug("Phase 2 passed: dynamic subscription verified", + log.Debug("Phase 2 passed: incremental subscription verified", "groupA_packets", reportA.PacketCount, "groupB_packets", reportB.PacketCount) - // --- Phase 3: Simultaneous pub+sub --- - // SubA reconnects as both publisher and subscriber on group A, sends to itself. - log.Debug("Phase 3: simultaneous pub+sub") + // --- Phase 3: Incremental publish after subscribe (cross-role) --- + // SubA adds a publisher role on group A without disconnecting. This triggers a + // full reprovision in the daemon (publisher role transition) but should still + // converge to a working pub+sub state. + log.Debug("Phase 3: incremental publish after subscribe (cross-role)") - err = subscriberA.ConnectUserMulticast_PubAndSub_Wait(ctx, []string{groupA.Code}, []string{groupA.Code}) - require.NoError(t, err, "failed to connect subscriberA as pub+sub") + log.Debug("SubscriberA adding publisher role on group A incrementally", "code", groupA.Code) + err = subscriberA.ConnectUserMulticast_Publisher_AddTunnel(ctx, groupA.Code) + require.NoError(t, err, "failed to incrementally add publisher role to subscriberA") err = subscriberA.WaitForStatusUp(ctx) require.NoError(t, err, "failed to wait for subscriberA status up as pub+sub") @@ -419,5 +422,5 @@ func TestQA_MulticastPublisherMultipleGroups(t *testing.T) { reportPubSub, err := subscriberA.WaitForMulticastReport(ctx, groupA) require.NoError(t, err, "failed to get report for group A as pub+sub") require.Greater(t, reportPubSub.PacketCount, uint64(0), "pub+sub client received no packets") - log.Debug("Phase 3 passed: pub+sub verified", "group", groupA.Code, "packetCount", reportPubSub.PacketCount) + log.Debug("Phase 3 passed: incremental publish after subscribe verified", "group", groupA.Code, "packetCount", reportPubSub.PacketCount) } From 0fe87dd7ee453ddd92d532f9692cd008514f10f1 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Mon, 30 Mar 2026 15:26:03 -0400 Subject: [PATCH 2/2] e2e: fix misleading comment about cross-role reprovision The comment incorrectly stated that adding a publisher role to a subscriber triggers a full reprovision. Both roles use UserTypeMulticast so InfraEqual returns true and the incremental UpdateGroups path is taken instead. --- e2e/qa_multicast_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/e2e/qa_multicast_test.go b/e2e/qa_multicast_test.go index f2f654cb40..f3b77bee9d 100644 --- a/e2e/qa_multicast_test.go +++ b/e2e/qa_multicast_test.go @@ -393,9 +393,9 @@ func TestQA_MulticastPublisherMultipleGroups(t *testing.T) { "groupA_packets", reportA.PacketCount, "groupB_packets", reportB.PacketCount) // --- Phase 3: Incremental publish after subscribe (cross-role) --- - // SubA adds a publisher role on group A without disconnecting. This triggers a - // full reprovision in the daemon (publisher role transition) but should still - // converge to a working pub+sub state. + // SubA adds a publisher role on group A without disconnecting. Both roles use + // UserTypeMulticast so InfraEqual returns true and the incremental UpdateGroups + // path is taken (no full reprovision). log.Debug("Phase 3: incremental publish after subscribe (cross-role)") log.Debug("SubscriberA adding publisher role on group A incrementally", "code", groupA.Code)