Skip to content

Commit 30854f1

Browse files
sanityclaude
andauthored
feat: implement proximity-based update forwarding (#2228)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent 52d76c9 commit 30854f1

File tree

12 files changed

+651
-19
lines changed

12 files changed

+651
-19
lines changed

crates/core/src/message.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ use crate::{
1616
},
1717
ring::{Location, PeerKeyLocation},
1818
};
19-
use freenet_stdlib::prelude::{ContractContainer, ContractKey, DelegateKey, WrappedState};
19+
use freenet_stdlib::prelude::{
20+
ContractContainer, ContractInstanceId, ContractKey, DelegateKey, WrappedState,
21+
};
2022
pub(crate) use sealed_msg_type::{TransactionType, TransactionTypeId};
2123
use serde::{Deserialize, Serialize};
2224
use ulid::Ulid;
@@ -333,6 +335,30 @@ pub(crate) enum NetMessageV1 {
333335
},
334336
Update(UpdateMsg),
335337
Aborted(Transaction),
338+
/// Proximity cache protocol message for tracking which neighbors cache which contracts.
339+
ProximityCache {
340+
message: ProximityCacheMessage,
341+
},
342+
}
343+
344+
/// Messages for the proximity cache protocol.
345+
///
346+
/// This protocol allows neighbors to inform each other which contracts they have cached,
347+
/// enabling UPDATE forwarding to seeders who may not be explicitly subscribed.
348+
#[derive(Debug, Clone, Serialize, Deserialize)]
349+
#[allow(clippy::enum_variant_names)]
350+
pub enum ProximityCacheMessage {
351+
/// Announce changes to our cached contracts.
352+
CacheAnnounce {
353+
/// Contracts we've started caching.
354+
added: Vec<ContractInstanceId>,
355+
/// Contracts we've stopped caching.
356+
removed: Vec<ContractInstanceId>,
357+
},
358+
/// Request a neighbor's full cache state (used on new connections).
359+
CacheStateRequest,
360+
/// Response with the neighbor's full cache state.
361+
CacheStateResponse { contracts: Vec<ContractInstanceId> },
336362
}
337363

338364
trait Versioned {
@@ -357,6 +383,7 @@ impl Versioned for NetMessageV1 {
357383
NetMessageV1::Unsubscribed { .. } => semver::Version::new(1, 0, 0),
358384
NetMessageV1::Update(_) => semver::Version::new(1, 0, 0),
359385
NetMessageV1::Aborted(_) => semver::Version::new(1, 0, 0),
386+
NetMessageV1::ProximityCache { .. } => semver::Version::new(1, 0, 0),
360387
}
361388
}
362389
}
@@ -427,6 +454,10 @@ pub(crate) enum NodeEvent {
427454
ExpectPeerConnection {
428455
addr: SocketAddr,
429456
},
457+
/// Broadcast a proximity cache message to all connected peers.
458+
BroadcastProximityCache {
459+
message: ProximityCacheMessage,
460+
},
430461
}
431462

432463
#[derive(Debug, Clone)]
@@ -506,6 +537,9 @@ impl Display for NodeEvent {
506537
NodeEvent::ExpectPeerConnection { addr } => {
507538
write!(f, "ExpectPeerConnection (from {addr})")
508539
}
540+
NodeEvent::BroadcastProximityCache { message } => {
541+
write!(f, "BroadcastProximityCache ({message:?})")
542+
}
509543
}
510544
}
511545
}
@@ -540,6 +574,7 @@ impl MessageStats for NetMessageV1 {
540574
NetMessageV1::Update(op) => op.id(),
541575
NetMessageV1::Aborted(tx) => tx,
542576
NetMessageV1::Unsubscribed { transaction, .. } => transaction,
577+
NetMessageV1::ProximityCache { .. } => Transaction::NULL,
543578
}
544579
}
545580

@@ -552,6 +587,7 @@ impl MessageStats for NetMessageV1 {
552587
NetMessageV1::Update(op) => op.target().as_ref().map(|b| b.borrow().clone()),
553588
NetMessageV1::Aborted(_) => None,
554589
NetMessageV1::Unsubscribed { .. } => None,
590+
NetMessageV1::ProximityCache { .. } => None,
555591
}
556592
}
557593

@@ -564,6 +600,7 @@ impl MessageStats for NetMessageV1 {
564600
NetMessageV1::Update(op) => op.requested_location(),
565601
NetMessageV1::Aborted(_) => None,
566602
NetMessageV1::Unsubscribed { .. } => None,
603+
NetMessageV1::ProximityCache { .. } => None,
567604
}
568605
}
569606
}
@@ -583,6 +620,9 @@ impl Display for NetMessage {
583620
Unsubscribed { key, from, .. } => {
584621
write!(f, "Unsubscribed {{ key: {key}, from: {from} }}")?;
585622
}
623+
ProximityCache { message } => {
624+
write!(f, "ProximityCache {{ {message:?} }}")?;
625+
}
586626
},
587627
};
588628
write!(f, "}}")

crates/core/src/node/mod.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ mod message_processor;
6868
mod network_bridge;
6969
mod op_state_manager;
7070
mod p2p_impl;
71+
pub(crate) mod proximity_cache;
7172
mod request_router;
7273
pub(crate) mod testing_impl;
7374

@@ -864,6 +865,13 @@ async fn process_message_v1<CB>(
864865
op_manager.ring.remove_subscriber(key, &peer_id);
865866
break;
866867
}
868+
NetMessageV1::ProximityCache { .. } => {
869+
// Legacy path doesn't have source_addr - ProximityCache requires connection-based routing
870+
tracing::warn!(
871+
"ProximityCache message received via legacy path (no source address)"
872+
);
873+
break;
874+
}
867875
_ => break, // Exit the loop if no applicable message type is found
868876
}
869877
}
@@ -1105,6 +1113,30 @@ where
11051113
op_manager.ring.remove_subscriber(key, &peer_id);
11061114
break;
11071115
}
1116+
NetMessageV1::ProximityCache { ref message } => {
1117+
let Some(source) = source_addr else {
1118+
tracing::warn!(
1119+
"Received ProximityCache message without source address (pure network)"
1120+
);
1121+
break;
1122+
};
1123+
tracing::debug!(
1124+
from = %source,
1125+
"Processing ProximityCache message (pure network)"
1126+
);
1127+
if let Some(response) = op_manager
1128+
.proximity_cache
1129+
.handle_message(source, message.clone())
1130+
{
1131+
// Send response back to sender
1132+
let response_msg =
1133+
NetMessage::V1(NetMessageV1::ProximityCache { message: response });
1134+
if let Err(err) = conn_manager.send(source, response_msg).await {
1135+
tracing::error!(%err, %source, "Failed to send ProximityCache response");
1136+
}
1137+
}
1138+
break;
1139+
}
11081140
_ => break, // Exit the loop if no applicable message type is found
11091141
}
11101142
}

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,12 @@ impl P2pConnManager {
799799
peer.pub_key().clone(),
800800
))
801801
.await;
802+
803+
// Clean up proximity cache for disconnected peer
804+
ctx.bridge
805+
.op_manager
806+
.proximity_cache
807+
.on_peer_disconnected(&peer_addr);
802808
if let Some(conn) = ctx.connections.remove(&peer_addr) {
803809
// Also remove from reverse lookup
804810
if let Some(pub_key) = pub_key_to_remove {
@@ -1271,6 +1277,27 @@ impl P2pConnManager {
12711277
}
12721278
}
12731279
}
1280+
NodeEvent::BroadcastProximityCache { message } => {
1281+
// Broadcast ProximityCache message to all connected peers
1282+
tracing::debug!(
1283+
?message,
1284+
peer_count = ctx.connections.len(),
1285+
"Broadcasting ProximityCache message to connected peers"
1286+
);
1287+
1288+
let msg = crate::message::NetMessage::V1(
1289+
crate::message::NetMessageV1::ProximityCache {
1290+
message: message.clone(),
1291+
},
1292+
);
1293+
1294+
for peer_addr in ctx.connections.keys() {
1295+
tracing::debug!(%peer_addr, "Sending ProximityCache to peer");
1296+
if let Err(e) = ctx.bridge.send(*peer_addr, msg.clone()).await {
1297+
tracing::warn!(%peer_addr, "Failed to send ProximityCache: {e}");
1298+
}
1299+
}
1300+
}
12741301
NodeEvent::Disconnect { cause } => {
12751302
tracing::info!(
12761303
"Disconnecting from network{}",

crates/core/src/node/op_state_manager.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ use crate::{
3232
ring::{ConnectionManager, LiveTransactionTracker, PeerKeyLocation, Ring},
3333
};
3434

35-
use super::{network_bridge::EventLoopNotificationsSender, NetEventRegister, NodeConfig};
35+
use super::{
36+
network_bridge::EventLoopNotificationsSender, proximity_cache::ProximityCacheManager,
37+
NetEventRegister, NodeConfig,
38+
};
3639

3740
#[cfg(debug_assertions)]
3841
macro_rules! check_id_op {
@@ -213,6 +216,8 @@ pub(crate) struct OpManager {
213216
pub is_gateway: bool,
214217
/// Sub-operation tracking for atomic operation execution
215218
sub_op_tracker: SubOperationTracker,
219+
/// Proximity cache manager for tracking neighbor contract caches
220+
pub proximity_cache: Arc<ProximityCacheManager>,
216221
}
217222

218223
impl Clone for OpManager {
@@ -228,6 +233,7 @@ impl Clone for OpManager {
228233
peer_ready: self.peer_ready.clone(),
229234
is_gateway: self.is_gateway,
230235
sub_op_tracker: self.sub_op_tracker.clone(),
236+
proximity_cache: self.proximity_cache.clone(),
231237
}
232238
}
233239
}
@@ -284,6 +290,8 @@ impl OpManager {
284290
tracing::debug!("Regular peer node: peer_ready will be set after first handshake");
285291
}
286292

293+
let proximity_cache = Arc::new(ProximityCacheManager::new());
294+
287295
Ok(Self {
288296
ring,
289297
ops,
@@ -295,6 +303,7 @@ impl OpManager {
295303
peer_ready,
296304
is_gateway,
297305
sub_op_tracker,
306+
proximity_cache,
298307
})
299308
}
300309

0 commit comments

Comments
 (0)