Skip to content

Commit 7f82c45

Browse files
sanityclaude
andauthored
feat(ring): redesign seeding logic with LRU byte-budget cache (#2232)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 9f99030 commit 7f82c45

File tree

7 files changed

+687
-209
lines changed

7 files changed

+687
-209
lines changed

crates/core/src/node/testing_impl/in_memory.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ where
104104
use crate::contract::ContractHandlerEvent;
105105
for (contract, state, subscription) in contracts {
106106
let key: ContractKey = contract.key();
107+
let state_size = state.size() as u64;
107108
self.op_manager
108109
.notify_contract_handler(ContractHandlerEvent::PutQuery {
109110
key,
@@ -122,7 +123,7 @@ where
122123
.unwrap()
123124
);
124125
if subscription {
125-
self.op_manager.ring.seed_contract(key);
126+
self.op_manager.ring.seed_contract(key, state_size);
126127
}
127128
if let Some(subscribers) = contract_subscribers.get(&key) {
128129
// add contract subscribers (test setup - no upstream_addr)

crates/core/src/operations/get.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,12 +1235,8 @@ impl Operation for GetOp {
12351235
false
12361236
};
12371237

1238-
// Determine if we should put the contract locally
1239-
let should_put = if is_original_requester && subscribe_requested {
1240-
true
1241-
} else {
1242-
op_manager.ring.should_seed(&key)
1243-
};
1238+
// Always cache contracts we encounter - LRU will handle eviction
1239+
let should_put = true;
12441240

12451241
// Put contract locally if needed
12461242
if should_put {
@@ -1277,7 +1273,7 @@ impl Operation for GetOp {
12771273
// State already cached and identical, mark as seeded if needed
12781274
if !op_manager.ring.is_seeding_contract(&key) {
12791275
tracing::debug!(tx = %id, %key, "Marking contract as seeded");
1280-
op_manager.ring.seed_contract(key);
1276+
op_manager.ring.record_get_access(key, value.size() as u64);
12811277
super::announce_contract_cached(op_manager, &key).await;
12821278
let child_tx =
12831279
super::start_subscription_request(op_manager, id, key);
@@ -1303,7 +1299,7 @@ impl Operation for GetOp {
13031299
// Start subscription if not already seeding
13041300
if !is_subscribed_contract {
13051301
tracing::debug!(tx = %id, %key, peer = ?op_manager.ring.connection_manager.get_own_addr(), "Contract not cached @ peer, caching");
1306-
op_manager.ring.seed_contract(key);
1302+
op_manager.ring.record_get_access(key, value.size() as u64);
13071303
super::announce_contract_cached(op_manager, &key).await;
13081304

13091305
let child_tx =

crates/core/src/operations/put.rs

Lines changed: 19 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::node::IsOperationCompleted;
1717
use crate::{
1818
client_events::HostResult,
1919
contract::ContractHandlerEvent,
20-
message::{InnerMessage, NetMessage, NetMessageV1, Transaction},
20+
message::{InnerMessage, NetMessage, Transaction},
2121
node::{NetworkBridge, OpManager},
2222
ring::{Location, PeerKeyLocation},
2323
};
@@ -220,15 +220,8 @@ impl Operation for PutOp {
220220
_ => false,
221221
};
222222

223-
// Check if we're the initiator of this PUT operation
224-
// We only cache locally when either WE initiate the PUT, or when forwarding just of the peer should be seeding
225-
let should_seed = match &self.state {
226-
Some(PutState::PrepareRequest { .. }) => true,
227-
Some(PutState::AwaitingResponse { upstream, .. }) => {
228-
upstream.is_none() || op_manager.ring.should_seed(&key)
229-
}
230-
_ => op_manager.ring.should_seed(&key),
231-
};
223+
// Always cache contracts we encounter - LRU will handle eviction
224+
let should_seed = true;
232225

233226
let modified_value = if should_seed {
234227
// Cache locally when initiating a PUT. This ensures:
@@ -270,7 +263,7 @@ impl Operation for PutOp {
270263

271264
// Mark as seeded locally if not already
272265
if !is_already_seeding {
273-
op_manager.ring.seed_contract(key);
266+
op_manager.ring.seed_contract(key, value.size() as u64);
274267
super::announce_contract_cached(op_manager, &key).await;
275268
tracing::debug!(
276269
tx = %id,
@@ -415,8 +408,8 @@ impl Operation for PutOp {
415408
// Get the contract key and check if we should handle it
416409
let key = contract.key();
417410
let is_subscribed_contract = op_manager.ring.is_seeding_contract(&key);
418-
let should_seed = op_manager.ring.should_seed(&key);
419-
let should_handle_locally = !is_subscribed_contract && should_seed;
411+
// Always cache contracts - LRU handles eviction
412+
let should_handle_locally = !is_subscribed_contract;
420413

421414
tracing::debug!(
422415
tx = %id,
@@ -481,7 +474,7 @@ impl Operation for PutOp {
481474

482475
let child_tx = super::start_subscription_request(op_manager, *id, key);
483476
tracing::debug!(tx = %id, %child_tx, "started subscription as child operation");
484-
op_manager.ring.seed_contract(key);
477+
op_manager.ring.seed_contract(key, value.size() as u64);
485478
super::announce_contract_cached(op_manager, &key).await;
486479

487480
true
@@ -729,7 +722,7 @@ impl Operation for PutOp {
729722
peer = %op_manager.ring.connection_manager.own_location(),
730723
"Adding contract to local seed list"
731724
);
732-
op_manager.ring.seed_contract(key);
725+
op_manager.ring.seed_contract(key, state.size() as u64);
733726
super::announce_contract_cached(op_manager, &key).await;
734727
} else {
735728
tracing::debug!(
@@ -836,8 +829,8 @@ impl Operation for PutOp {
836829
let key = contract.key();
837830
let peer_loc = op_manager.ring.connection_manager.own_location();
838831
let is_seeding_contract = op_manager.ring.is_seeding_contract(&key);
839-
let should_seed = op_manager.ring.should_seed(&key);
840-
let should_handle_locally = should_seed && !is_seeding_contract;
832+
// Always cache contracts - LRU handles eviction
833+
let should_handle_locally = !is_seeding_contract;
841834

842835
tracing::debug!(
843836
tx = %id,
@@ -903,35 +896,12 @@ impl Operation for PutOp {
903896
.await?;
904897
}
905898

906-
// Start subscription and handle dropped contracts
907-
let (dropped_contract, old_subscribers) = {
908-
let child_tx = super::start_subscription_request(op_manager, *id, key);
909-
tracing::debug!(tx = %id, %child_tx, "started subscription as child operation");
910-
let result = op_manager.ring.seed_contract(key);
911-
super::announce_contract_cached(op_manager, &key).await;
912-
result
913-
};
914-
915-
// Notify subscribers of dropped contracts
916-
if let Some(dropped_key) = dropped_contract {
917-
for subscriber in old_subscribers {
918-
if let Some(addr) = subscriber.socket_addr() {
919-
conn_manager
920-
.send(
921-
addr,
922-
NetMessage::V1(NetMessageV1::Unsubscribed {
923-
transaction: Transaction::new::<PutMsg>(),
924-
key: dropped_key,
925-
from: op_manager
926-
.ring
927-
.connection_manager
928-
.own_location(),
929-
}),
930-
)
931-
.await?;
932-
}
933-
}
934-
}
899+
// Start subscription and record cache access
900+
let child_tx = super::start_subscription_request(op_manager, *id, key);
901+
tracing::debug!(tx = %id, %child_tx, "started subscription as child operation");
902+
let _evicted = op_manager.ring.seed_contract(key, new_value.size() as u64);
903+
super::announce_contract_cached(op_manager, &key).await;
904+
// Note: Evicted contracts are handled by SeedingManager (subscribers cleaned up internally)
935905
} else if last_hop && !already_put {
936906
// Last hop but not handling locally, still need to put
937907
put_contract(
@@ -1277,7 +1247,9 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
12771247
peer = %op_manager.ring.connection_manager.own_location(),
12781248
"Adding contract to local seed list"
12791249
);
1280-
op_manager.ring.seed_contract(key);
1250+
op_manager
1251+
.ring
1252+
.seed_contract(key, updated_value.size() as u64);
12811253
super::announce_contract_cached(op_manager, &key).await;
12821254

12831255
// Determine which peers need to be notified and broadcast the update

crates/core/src/operations/test_utils.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,22 +136,25 @@ impl MockRing {
136136
&self.own_location
137137
}
138138

139-
pub fn should_seed(&self, _key: &ContractKey) -> bool {
140-
// In tests, always willing to seed
141-
true
142-
}
143-
144139
pub fn is_seeding_contract(&self, key: &ContractKey) -> bool {
145140
self.seeding_contracts.lock().unwrap().contains(key)
146141
}
147142

148-
pub fn seed_contract(&self, key: ContractKey) {
143+
pub fn seed_contract(&self, key: ContractKey, _size_bytes: u64) {
149144
let mut seeding = self.seeding_contracts.lock().unwrap();
150145
if !seeding.contains(&key) {
151146
seeding.push(key);
152147
}
153148
}
154149

150+
pub fn record_get_access(&self, key: ContractKey, size_bytes: u64) {
151+
self.seed_contract(key, size_bytes);
152+
}
153+
154+
pub fn record_subscribe_access(&self, key: ContractKey, size_bytes: u64) {
155+
self.seed_contract(key, size_bytes);
156+
}
157+
155158
/// Simulates k_closest_potentially_caching
156159
pub fn k_closest_potentially_caching(
157160
&self,
@@ -274,7 +277,7 @@ mod tests {
274277
let key = make_contract_key(1);
275278
assert!(!ring.is_seeding_contract(&key));
276279

277-
ring.seed_contract(key);
280+
ring.seed_contract(key, 100);
278281
assert!(ring.is_seeding_contract(&key));
279282
}
280283

crates/core/src/ring/mod.rs

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,8 @@ mod connection;
3737
mod live_tx;
3838
mod location;
3939
mod peer_key_location;
40-
mod score;
4140
mod seeding;
42-
43-
use self::score::Score;
41+
mod seeding_cache;
4442

4543
pub use self::live_tx::LiveTransactionTracker;
4644
pub use connection::Connection;
@@ -182,30 +180,33 @@ impl Ring {
182180
}
183181
}
184182

185-
/// Return if a contract is within appropiate seeding distance.
186-
pub fn should_seed(&self, key: &ContractKey) -> bool {
187-
match self.connection_manager.own_location().location() {
188-
Some(own_loc) => self.seeding_manager.should_seed(key, own_loc),
189-
None => {
190-
tracing::debug!(
191-
"should_seed: own location not yet available; deferring seeding decision"
192-
);
193-
false
194-
}
195-
}
183+
/// Record an access to a contract (GET, PUT, or SUBSCRIBE).
184+
///
185+
/// This adds the contract to the seeding cache if not present, or refreshes
186+
/// its LRU position if already cached. Returns the list of evicted contracts
187+
/// that need cleanup (unsubscription, state removal, etc.).
188+
///
189+
/// The `size_bytes` should be the size of the contract state.
190+
pub fn seed_contract(&self, key: ContractKey, size_bytes: u64) -> Vec<ContractKey> {
191+
use seeding_cache::AccessType;
192+
self.seeding_manager
193+
.record_contract_access(key, size_bytes, AccessType::Put)
196194
}
197195

198-
/// Add a new subscription for this peer.
199-
pub fn seed_contract(&self, key: ContractKey) -> (Option<ContractKey>, Vec<PeerKeyLocation>) {
200-
match self.connection_manager.own_location().location() {
201-
Some(own_loc) => self.seeding_manager.seed_contract(key, own_loc),
202-
None => {
203-
tracing::debug!(
204-
"seed_contract: own location not yet available; skipping seeding for now"
205-
);
206-
(None, Vec::new())
207-
}
208-
}
196+
/// Record a GET access to a contract.
197+
pub fn record_get_access(&self, key: ContractKey, size_bytes: u64) -> Vec<ContractKey> {
198+
use seeding_cache::AccessType;
199+
self.seeding_manager
200+
.record_contract_access(key, size_bytes, AccessType::Get)
201+
}
202+
203+
/// Record a subscribe access for a contract (for future use when subscribe
204+
/// operations directly record access rather than delegating to GET).
205+
#[allow(dead_code)]
206+
pub fn record_subscribe_access(&self, key: ContractKey, size_bytes: u64) -> Vec<ContractKey> {
207+
use seeding_cache::AccessType;
208+
self.seeding_manager
209+
.record_contract_access(key, size_bytes, AccessType::Subscribe)
209210
}
210211

211212
/// Whether this node already is seeding to this contract or not.
@@ -214,6 +215,12 @@ impl Ring {
214215
self.seeding_manager.is_seeding_contract(key)
215216
}
216217

218+
/// Remove a contract from the seeding cache (for future use in cleanup paths).
219+
#[allow(dead_code)]
220+
pub fn remove_seeded_contract(&self, key: &ContractKey) -> bool {
221+
self.seeding_manager.remove_seeded_contract(key)
222+
}
223+
217224
pub fn record_request(
218225
&self,
219226
recipient: PeerKeyLocation,

0 commit comments

Comments
 (0)