Skip to content

Commit f76de10

Browse files
iduartgomezclaudegithub-actions[bot]
authored
fix: separate network from local subscriptions (#2229)
Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com> Co-authored-by: nacho.d.g <iduartgomez@users.noreply.github.com>
1 parent 50f94db commit f76de10

File tree

7 files changed

+197
-54
lines changed

7 files changed

+197
-54
lines changed

Cargo.lock

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/src/contract/executor/runtime.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,6 +1186,28 @@ impl Executor<Runtime> {
11861186
Ok(())
11871187
}
11881188

1189+
/// Delivers update notifications to LOCAL client subscriptions via websocket channels.
1190+
///
1191+
/// # Architecture: Local vs Network Subscriptions
1192+
///
1193+
/// Freenet uses two distinct subscription delivery mechanisms:
1194+
///
1195+
/// 1. **Local subscriptions** (handled here): Clients connected to this node's websocket
1196+
/// API register via `register_contract_notifier()`. Updates are delivered directly
1197+
/// through executor notification channels stored in `self.update_notifications`.
1198+
/// This path is triggered by `LocalSubscribeComplete` events (see `message.rs`).
1199+
///
1200+
/// 2. **Network subscriptions** (handled in `operations/update.rs`): Remote peers
1201+
/// subscribe via `ring.seeding_manager.subscribers`. Updates propagate through
1202+
/// the network via `get_broadcast_targets_update()` which filters network peers
1203+
/// to receive UPDATE messages.
1204+
///
1205+
/// This separation allows local clients to receive updates immediately without
1206+
/// network round-trips, while network subscriptions follow the peer-to-peer protocol.
1207+
///
1208+
/// See also:
1209+
/// - `operations/subscribe.rs::complete_local_subscription()` - triggers LocalSubscribeComplete
1210+
/// - `operations/update.rs::get_broadcast_targets_update()` - network subscription routing
11891211
async fn send_update_notification(
11901212
&mut self,
11911213
key: &ContractKey,

crates/core/src/message.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,18 @@ pub(crate) enum NodeEvent {
406406
TransactionTimedOut(Transaction),
407407
/// Transaction completed successfully - cleanup client subscription
408408
TransactionCompleted(Transaction),
409-
/// Local subscription completed - deliver SubscribeResponse to client via result router
409+
/// Local subscription completed - deliver SubscribeResponse to client via result router.
410+
///
411+
/// **Architecture Note (Issue #2075):**
412+
/// This event is part of the decoupled subscription architecture. Local client subscriptions
413+
/// are handled separately from network peer subscriptions:
414+
/// - This event notifies the client layer that a subscription request has been processed
415+
/// - Subsequent contract updates are delivered via the executor's `update_notifications`
416+
/// channels (see `send_update_notification` in runtime.rs)
417+
/// - Network peer subscriptions use the `seeding_manager.subscribers` for UPDATE propagation
418+
///
419+
/// This separation keeps the ops/ module (network operations) independent from the
420+
/// client_events/ module (local WebSocket client handling).
410421
LocalSubscribeComplete {
411422
tx: Transaction,
412423
key: ContractKey,

crates/core/src/operations/subscribe.rs

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -277,36 +277,32 @@ pub(crate) async fn request_subscribe(
277277
Ok(())
278278
}
279279

280+
/// Complete a local subscription by notifying the client layer.
281+
///
282+
/// **Architecture Note (Issue #2075):**
283+
/// Local client subscriptions are deliberately kept separate from network subscriptions:
284+
/// - **Network subscriptions** are stored in `ring.seeding_manager.subscribers` and are used
285+
/// for peer-to-peer UPDATE propagation between nodes
286+
/// - **Local subscriptions** are managed by the contract executor via `update_notifications`
287+
/// channels, which deliver `UpdateNotification` directly to WebSocket clients
288+
///
289+
/// This separation eliminates the need for workarounds like the previous `allow_self` hack
290+
/// in `get_broadcast_targets_update()`, and ensures clean architectural boundaries between
291+
/// the network layer (ops/) and the client layer (client_events/).
280292
async fn complete_local_subscription(
281293
op_manager: &OpManager,
282294
id: Transaction,
283295
key: ContractKey,
284296
) -> Result<(), OpError> {
285-
let subscriber = op_manager.ring.connection_manager.own_location();
286-
let subscriber_addr = subscriber
287-
.socket_addr()
288-
.expect("own location must have socket address");
289-
// Local subscription - no upstream NAT address
290-
if let Err(err) = op_manager
291-
.ring
292-
.add_subscriber(&key, subscriber.clone(), None)
293-
{
294-
tracing::warn!(
295-
%key,
296-
tx = %id,
297-
subscriber = %subscriber_addr,
298-
error = ?err,
299-
"Failed to register local subscriber"
300-
);
301-
} else {
302-
tracing::debug!(
303-
%key,
304-
tx = %id,
305-
subscriber = %subscriber_addr,
306-
"Registered local subscriber"
307-
);
308-
}
309-
297+
tracing::debug!(
298+
%key,
299+
tx = %id,
300+
"Local subscription completed - client will receive updates via executor notification channel"
301+
);
302+
303+
// Notify client layer that subscription is complete.
304+
// The actual update delivery happens through the executor's update_notifications
305+
// when contract state changes, not through network broadcast targets.
310306
op_manager
311307
.notify_node_event(crate::message::NodeEvent::LocalSubscribeComplete {
312308
tx: id,

crates/core/src/operations/update.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,15 @@ async fn try_to_broadcast(
741741
}
742742

743743
impl OpManager {
744+
/// Get the list of network subscribers to broadcast an UPDATE to.
745+
///
746+
/// **Architecture Note (Issue #2075):**
747+
/// This function returns only **network peer** subscribers, not local client subscriptions.
748+
/// Local clients receive updates through a separate path via the contract executor's
749+
/// `update_notifications` channels (see `send_update_notification` in runtime.rs).
750+
///
751+
/// This clean separation eliminates the previous `allow_self` workaround that was needed
752+
/// when local subscriptions were mixed with network subscriptions.
744753
pub(crate) fn get_broadcast_targets_update(
745754
&self,
746755
key: &ContractKey,
@@ -750,21 +759,10 @@ impl OpManager {
750759
.ring
751760
.subscribers_of(key)
752761
.map(|subs| {
753-
let self_addr = self.ring.connection_manager.get_own_addr();
754-
let allow_self = self_addr.as_ref().map(|me| me == sender).unwrap_or(false);
755762
subs.value()
756763
.iter()
757-
.filter(|pk| {
758-
// Allow the sender (or ourselves) to stay in the broadcast list when we're
759-
// originating the UPDATE so local auto-subscribes still receive events.
760-
let is_sender = pk.socket_addr().as_ref() == Some(sender);
761-
let is_self = self_addr.as_ref() == pk.socket_addr().as_ref();
762-
if is_sender || is_self {
763-
allow_self
764-
} else {
765-
true
766-
}
767-
})
764+
// Filter out the sender to avoid sending the update back to where it came from
765+
.filter(|pk| pk.socket_addr().as_ref() != Some(sender))
768766
.cloned()
769767
.collect::<Vec<_>>()
770768
})

crates/core/src/ring/seeding.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,4 +572,78 @@ mod tests {
572572

573573
assert!(seeding_manager.is_seeding_contract(&key));
574574
}
575+
576+
/// Test that validates the broadcast target filtering logic used by
577+
/// `get_broadcast_targets_update` in update.rs.
578+
///
579+
/// **Architecture Note (Issue #2075):**
580+
/// After decoupling local from network subscriptions, `get_broadcast_targets_update`
581+
/// simply filters out the sender from the subscriber list. This test validates
582+
/// that the seeding_manager correctly stores and retrieves network subscribers,
583+
/// which is the foundation for UPDATE broadcast targeting.
584+
#[test]
585+
fn test_subscribers_for_broadcast_targeting() {
586+
let seeding_manager = SeedingManager::new();
587+
let contract_key = ContractKey::from(ContractInstanceId::new([3u8; 32]));
588+
589+
// Create network peers (not local clients)
590+
let peer1 = test_peer_id(1);
591+
let peer2 = test_peer_id(2);
592+
let peer3 = test_peer_id(3);
593+
594+
let peer_loc1 = PeerKeyLocation::new(peer1.pub_key.clone(), peer1.addr);
595+
let peer_loc2 = PeerKeyLocation::new(peer2.pub_key.clone(), peer2.addr);
596+
let peer_loc3 = PeerKeyLocation::new(peer3.pub_key.clone(), peer3.addr);
597+
598+
// Register network subscribers
599+
seeding_manager
600+
.add_subscriber(&contract_key, peer_loc1.clone(), None)
601+
.expect("should add peer1");
602+
seeding_manager
603+
.add_subscriber(&contract_key, peer_loc2.clone(), None)
604+
.expect("should add peer2");
605+
seeding_manager
606+
.add_subscriber(&contract_key, peer_loc3.clone(), None)
607+
.expect("should add peer3");
608+
609+
// Retrieve subscribers (as get_broadcast_targets_update would)
610+
let subs = seeding_manager.subscribers_of(&contract_key).unwrap();
611+
612+
// All network peers should be in the list
613+
assert_eq!(subs.len(), 3, "Should have 3 network subscribers");
614+
615+
// Simulate filtering out the sender (as get_broadcast_targets_update does)
616+
// If peer1 is the sender of an UPDATE, it should be filtered out
617+
let sender_addr = peer1.addr;
618+
let broadcast_targets: Vec<_> = subs
619+
.iter()
620+
.filter(|pk| pk.socket_addr().as_ref() != Some(&sender_addr))
621+
.cloned()
622+
.collect();
623+
624+
// Only peer2 and peer3 should receive the broadcast
625+
assert_eq!(
626+
broadcast_targets.len(),
627+
2,
628+
"Should exclude sender from broadcast targets"
629+
);
630+
assert!(
631+
broadcast_targets
632+
.iter()
633+
.any(|p| p.socket_addr() == Some(peer2.addr)),
634+
"peer2 should be in broadcast targets"
635+
);
636+
assert!(
637+
broadcast_targets
638+
.iter()
639+
.any(|p| p.socket_addr() == Some(peer3.addr)),
640+
"peer3 should be in broadcast targets"
641+
);
642+
assert!(
643+
!broadcast_targets
644+
.iter()
645+
.any(|p| p.socket_addr() == Some(peer1.addr)),
646+
"sender (peer1) should NOT be in broadcast targets"
647+
);
648+
}
575649
}

crates/core/tests/operations.rs

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -633,12 +633,22 @@ async fn test_put_merge_persists_state(ctx: &mut TestContext) -> TestResult {
633633
Ok(())
634634
}
635635

636-
// This test is disabled due to race conditions in subscription propagation logic.
636+
// This test validates the REMOTE subscription path (local subscribe → remote update):
637+
//
638+
// Architecture being tested:
639+
// - Client on Node A subscribes to a contract
640+
// - Contract is updated on Node B (or gateway)
641+
// - UPDATE propagates through the network to Node A
642+
// - Node A's upsert_contract_state() is called with the new state
643+
// - This triggers send_update_notification() which delivers to the local subscriber
644+
//
645+
// Combined with test_get_with_subscribe_flag (which tests local subscribe → local update),
646+
// this provides full coverage of the executor notification delivery paths.
647+
//
648+
// NOTE: This test is disabled due to race conditions in subscription propagation logic.
637649
// The test expects multiple clients across different nodes to receive subscription updates,
638650
// but the PUT caching refactor (commits 2cd337b5-0d432347) changed the subscription semantics.
639-
// Re-enabled after recent fixes to subscription logic - previously exhibited race conditions.
640-
// If this test becomes flaky again, see issue #1798 for historical context.
641-
// Ignored again due to recurring flakiness - fails intermittently with timeout waiting for
651+
// Ignored due to recurring flakiness - fails intermittently with timeout waiting for
642652
// cross-node subscription notifications (Client 3 timeout). See issue #1798.
643653
#[ignore]
644654
#[freenet_test(
@@ -1222,6 +1232,19 @@ async fn test_multiple_clients_subscription(ctx: &mut TestContext) -> TestResult
12221232
tokio_worker_threads = 4
12231233
)]
12241234
async fn test_get_with_subscribe_flag(ctx: &mut TestContext) -> TestResult {
1235+
// This test validates the LOCAL subscription path (Issue #2075 decoupling):
1236+
//
1237+
// Architecture being tested:
1238+
// - Both clients connect to the SAME node (node-a) via websocket
1239+
// - Client 2 subscribes via GET with subscribe=true
1240+
// - The subscription is registered LOCALLY via register_contract_notifier()
1241+
// in the executor, NOT via network peer registration in ring.seeding_manager
1242+
// - When Client 1 updates the contract, the update notification is delivered
1243+
// directly through the executor's notification channels (send_update_notification)
1244+
//
1245+
// This test confirms that local subscriptions work independently of network
1246+
// subscription propagation - no remote peer registration is required.
1247+
12251248
// Load test contract
12261249
const TEST_CONTRACT: &str = "test-contract-integration";
12271250
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
@@ -1306,6 +1329,15 @@ async fn test_get_with_subscribe_flag(ctx: &mut TestContext) -> TestResult {
13061329
}
13071330
}
13081331

1332+
// At this point, Client 2's subscription is registered LOCALLY in the executor
1333+
// via register_contract_notifier(). The subscription is NOT registered in the
1334+
// network's ring.seeding_manager.subscribers - that's only for remote peer subscriptions.
1335+
// This validates the decoupled architecture from Issue #2075.
1336+
tracing::info!(
1337+
"Client 2: Local subscription registered via GET with subscribe=true - \
1338+
notification delivery will use executor channels, not network broadcast"
1339+
);
1340+
13091341
// Create a new to-do list by deserializing the current state, adding a task, and serializing it back
13101342
let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref())
13111343
.unwrap_or_else(|_| test_utils::TodoList {
@@ -1431,11 +1463,21 @@ async fn test_get_with_subscribe_flag(ctx: &mut TestContext) -> TestResult {
14311463
tokio::time::sleep(Duration::from_millis(100)).await;
14321464
}
14331465

1434-
// Assert that client 1 received the notification (proving auto-subscribe worked)
1466+
// Assert that Client 2 received the notification, proving that:
1467+
// 1. Local subscription via GET with subscribe=true works correctly
1468+
// 2. The executor's send_update_notification() delivers to local subscribers
1469+
// 3. Network peer registration is NOT required for same-node subscriptions
14351470
assert!(
1436-
client2_node_a_received_notification,
1437-
"Client 2 did not receive update notification within timeout period (auto-subscribe via GET failed)"
1438-
);
1471+
client2_node_a_received_notification,
1472+
"Client 2 did not receive update notification - local subscription path failed. \
1473+
This validates that executor notification channels work independently of network \
1474+
subscription propagation (Issue #2075 decoupling)."
1475+
);
1476+
1477+
tracing::info!(
1478+
"SUCCESS: Local subscription delivered update via executor channels - \
1479+
no network registration was required"
1480+
);
14391481

14401482
Ok(())
14411483
}

0 commit comments

Comments
 (0)