Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.
### Changes

- CLI
- Allow incremental multicast group addition without disconnecting
- Reset SIGPIPE to SIG_DFL at the start of main() in all 3 CLI binaries (doublezero, doublezero-geolocation, doublezero-admin) so the process exits silently like standard CLI tools
- SDK
- Add Go SDK for shred subscription program with read-only account deserialization (epoch state, seat assignments, pricing, settlement, validator client rewards), PDA derivation helpers, RPC fetchers, compatibility tests, and a fetch example CLI
Expand Down
17 changes: 0 additions & 17 deletions client/doublezero/src/command/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,23 +204,6 @@ impl ProvisioningCliCommand {
client_ip: Ipv4Addr,
spinner: &ProgressBar,
) -> eyre::Result<()> {
// Check if the daemon already has a multicast service running. The daemon
// does not support updating an existing service — both publisher and subscriber
// roles must be specified in a single connect command.
if let Ok(statuses) = controller.status().await {
if statuses.iter().any(|s| {
s.user_type
.as_ref()
.is_some_and(|t| t.eq_ignore_ascii_case("multicast"))
}) {
eyre::bail!(
"A multicast service is already running. Disconnect first with \
`doublezero disconnect multicast`, then reconnect with all desired \
groups in a single command (e.g. --publish and --subscribe)."
);
}
}

let mcast_groups = client.list_multicastgroup(ListMulticastGroupCommand)?;

// Resolve pub group codes to pubkeys
Expand Down
24 changes: 24 additions & 0 deletions e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,30 @@ func (dn *TestDevnet) ConnectMulticastSubscriberSkipAccessPass(t *testing.T, cli
dn.log.Debug("--> Multicast subscriber connected")
}

// AddMulticastPublisherGroupSkipAccessPass incrementally adds publish groups to
// an existing multicast service without disconnecting.
func (dn *TestDevnet) AddMulticastPublisherGroupSkipAccessPass(t *testing.T, client *devnet.Client, multicastGroupCodes ...string) {
dn.log.Debug("==> Adding multicast publisher groups incrementally", "clientIP", client.CYOANetworkIP, "groups", multicastGroupCodes)

groupArgs := strings.Join(multicastGroupCodes, " ")
_, err := client.Exec(t.Context(), []string{"bash", "-c", "doublezero connect multicast --publish " + groupArgs})
require.NoError(t, err)

dn.log.Debug("--> Multicast publisher groups added incrementally")
}

// AddMulticastSubscriberGroupSkipAccessPass incrementally adds subscribe groups to
// an existing multicast service without disconnecting.
func (dn *TestDevnet) AddMulticastSubscriberGroupSkipAccessPass(t *testing.T, client *devnet.Client, multicastGroupCodes ...string) {
dn.log.Debug("==> Adding multicast subscriber groups incrementally", "clientIP", client.CYOANetworkIP, "groups", multicastGroupCodes)

groupArgs := strings.Join(multicastGroupCodes, " ")
_, err := client.Exec(t.Context(), []string{"bash", "-c", "doublezero connect multicast --subscribe " + groupArgs})
require.NoError(t, err)

dn.log.Debug("--> Multicast subscriber groups added incrementally")
}

func (dn *TestDevnet) DisconnectMulticastSubscriber(t *testing.T, client *devnet.Client) {
dn.log.Debug("==> Disconnecting multicast subscriber", "clientIP", client.CYOANetworkIP)

Expand Down
18 changes: 14 additions & 4 deletions e2e/multicast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,23 @@ func TestE2E_Multicast(t *testing.T) {
createMulticastGroupForBothClients(t, tdn, publisherClient, subscriberClient, "mg02")

if !t.Run("connect", func(t *testing.T) {
// Connect publisher.
tdn.ConnectMulticastPublisherSkipAccessPass(t, publisherClient, "mg01", "mg02")
// Connect publisher to first group only.
tdn.ConnectMulticastPublisherSkipAccessPass(t, publisherClient, "mg01")
err = publisherClient.WaitForTunnelUp(t.Context(), 90*time.Second)
require.NoError(t, err)

// Connect subscriber.
tdn.ConnectMulticastSubscriberSkipAccessPass(t, subscriberClient, "mg01", "mg02")
// Incrementally add second publish group without disconnecting.
tdn.AddMulticastPublisherGroupSkipAccessPass(t, publisherClient, "mg02")
err = publisherClient.WaitForTunnelUp(t.Context(), 90*time.Second)
require.NoError(t, err)

// Connect subscriber to first group only.
tdn.ConnectMulticastSubscriberSkipAccessPass(t, subscriberClient, "mg01")
err = subscriberClient.WaitForTunnelUp(t.Context(), 90*time.Second)
require.NoError(t, err)

// Incrementally add second subscribe group without disconnecting.
tdn.AddMulticastSubscriberGroupSkipAccessPass(t, subscriberClient, "mg02")
err = subscriberClient.WaitForTunnelUp(t.Context(), 90*time.Second)
require.NoError(t, err)

Expand Down
29 changes: 16 additions & 13 deletions e2e/qa_multicast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,17 +347,17 @@ func TestQA_MulticastPublisherMultipleGroups(t *testing.T) {
require.Greater(t, reportB.PacketCount, uint64(0), "subscriberB received no packets from group B")
log.Info("Received multicast packets", "subscriber", subscriberB.Host, "group", groupB.Code, "packetCount", reportB.PacketCount)

// --- Phase 2: Dynamic subscription ---
// SubA disconnects and reconnects with both groups A+B — verify identity preserved and receives from both.
log.Debug("Phase 2: dynamic subscription")
// --- Phase 2: Incremental subscription ---
// SubA adds group B without disconnecting — verify tunnel stays up, identity preserved, and receives from both.
log.Debug("Phase 2: incremental subscription (no disconnect)")

statusBefore, err := subscriberA.GetUserStatus(ctx)
require.NoError(t, err, "failed to get subscriberA status before adding group B")
log.Debug("SubscriberA status before", "status", statusBefore)

log.Debug("SubscriberA reconnecting with both groups", "codes", []string{groupA.Code, groupB.Code})
err = subscriberA.ConnectUserMulticast_Subscriber_Wait(ctx, groupA.Code, groupB.Code)
require.NoError(t, err, "failed to reconnect subscriberA with both groups")
log.Debug("SubscriberA adding group B incrementally (no disconnect)", "code", groupB.Code)
err = subscriberA.ConnectUserMulticast_Subscriber_AddTunnel(ctx, groupB.Code)
require.NoError(t, err, "failed to incrementally add group B to subscriberA")

err = subscriberA.WaitForStatusUp(ctx)
require.NoError(t, err, "failed to wait for subscriberA status up after adding group B")
Expand Down Expand Up @@ -389,15 +389,18 @@ func TestQA_MulticastPublisherMultipleGroups(t *testing.T) {
require.NotNil(t, reportB, "no report for group B")
require.Greater(t, reportB.PacketCount, uint64(0), "no packets from group B")

log.Debug("Phase 2 passed: dynamic subscription verified",
log.Debug("Phase 2 passed: incremental subscription verified",
"groupA_packets", reportA.PacketCount, "groupB_packets", reportB.PacketCount)

// --- Phase 3: Simultaneous pub+sub ---
// SubA reconnects as both publisher and subscriber on group A, sends to itself.
log.Debug("Phase 3: simultaneous pub+sub")
// --- Phase 3: Incremental publish after subscribe (cross-role) ---
// SubA adds a publisher role on group A without disconnecting. Both roles use
// UserTypeMulticast so InfraEqual returns true and the incremental UpdateGroups
// path is taken (no full reprovision).
log.Debug("Phase 3: incremental publish after subscribe (cross-role)")

err = subscriberA.ConnectUserMulticast_PubAndSub_Wait(ctx, []string{groupA.Code}, []string{groupA.Code})
require.NoError(t, err, "failed to connect subscriberA as pub+sub")
log.Debug("SubscriberA adding publisher role on group A incrementally", "code", groupA.Code)
err = subscriberA.ConnectUserMulticast_Publisher_AddTunnel(ctx, groupA.Code)
require.NoError(t, err, "failed to incrementally add publisher role to subscriberA")

err = subscriberA.WaitForStatusUp(ctx)
require.NoError(t, err, "failed to wait for subscriberA status up as pub+sub")
Expand All @@ -419,5 +422,5 @@ func TestQA_MulticastPublisherMultipleGroups(t *testing.T) {
reportPubSub, err := subscriberA.WaitForMulticastReport(ctx, groupA)
require.NoError(t, err, "failed to get report for group A as pub+sub")
require.Greater(t, reportPubSub.PacketCount, uint64(0), "pub+sub client received no packets")
log.Debug("Phase 3 passed: pub+sub verified", "group", groupA.Code, "packetCount", reportPubSub.PacketCount)
log.Debug("Phase 3 passed: incremental publish after subscribe verified", "group", groupA.Code, "packetCount", reportPubSub.PacketCount)
}
Loading