Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl Client {
Ok(Self {
inner,
rpc,
cache: QueryCache::new(),
cache: QueryCache::with_network(url),
dry_run: false,
url: url.to_string(),
})
Expand All @@ -133,7 +133,7 @@ impl Client {
let fresh = Self::connect_once(&self.url).await?;
self.inner = fresh.inner;
self.rpc = fresh.rpc;
self.cache = QueryCache::new();
self.cache = fresh.cache;
Ok(())
}

Expand Down
58 changes: 50 additions & 8 deletions src/queries/query_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub struct QueryCache {
neurons_lite: Cache<u16, Arc<Vec<NeuronInfoLite>>>,
/// Whether to use the disk cache layer. Disabled for tests with custom TTLs.
use_disk: bool,
/// Optional network identifier for scoping disk cache keys.
/// Prevents cache contamination when switching between networks (finney vs testnet).
network_id: Option<String>,
}

impl QueryCache {
Expand All @@ -56,13 +59,48 @@ impl QueryCache {

/// Internal constructor with explicit disk cache control. Also used in tests.
pub(crate) fn with_ttl_and_disk(ttl: Duration, use_disk: bool) -> Self {
Self::with_ttl_disk_and_network(ttl, use_disk, None)
}

/// Constructor with network scoping for disk cache keys.
/// Prevents cross-network cache contamination (e.g., finney vs testnet).
pub fn with_network(network_id: impl Into<String>) -> Self {
let network_id: String = network_id.into();
let sanitized: String = network_id
.replace("/", "_")
.replace(":", "_")
.replace("?", "_")
.replace("&", "_");
Self::with_ttl_disk_and_network(
Duration::from_secs(DEFAULT_TTL_SECS),
true,
Some(sanitized),
)
}

/// Most flexible constructor: TTL, disk cache, and network scoping.
fn with_ttl_disk_and_network(
ttl: Duration,
use_disk: bool,
network_id: Option<String>,
) -> Self {
Self {
subnets: Cache::builder().time_to_live(ttl).max_capacity(1).build(),
all_dynamic: Cache::builder().time_to_live(ttl).max_capacity(1).build(),
dynamic_by_netuid: Cache::builder().time_to_live(ttl).max_capacity(100).build(),
delegates: Cache::builder().time_to_live(ttl).max_capacity(1).build(),
neurons_lite: Cache::builder().time_to_live(ttl).max_capacity(100).build(),
use_disk,
network_id,
}
}

/// Build disk cache key with optional network scoping.
/// e.g., "all_subnets" -> "wss___entrypoint_finney_all_subnets"
fn scoped_disk_key(&self, base_key: &str) -> String {
match &self.network_id {
Some(id) => format!("{}_{}", id, base_key),
None => base_key.to_string(),
}
}

Expand All @@ -75,11 +113,12 @@ impl QueryCache {
Fut: std::future::Future<Output = anyhow::Result<Vec<SubnetInfo>>>,
{
let disk = self.use_disk;
let disk_key = self.scoped_disk_key("all_subnets");
self.subnets
.try_get_with((), async {
// Check disk cache before hitting chain
if disk {
if let Some(cached) = super::disk_cache::get::<Vec<SubnetInfo>>("all_subnets", DISK_TTL) {
if let Some(cached) = super::disk_cache::get::<Vec<SubnetInfo>>(&disk_key, DISK_TTL) {
tracing::debug!(count = cached.len(), "cache hit: all_subnets (disk)");
return Ok(Arc::new(cached)) as anyhow::Result<_>;
}
Expand All @@ -91,7 +130,7 @@ impl QueryCache {
tracing::debug!(elapsed_ms = start.elapsed().as_millis() as u64, count = data.len(), "fetched all_subnets");
// Write through to disk cache (best-effort)
if disk {
if let Err(e) = super::disk_cache::put("all_subnets", &data) {
if let Err(e) = super::disk_cache::put(&disk_key, &data) {
tracing::warn!(error = %e, "failed to write all_subnets to disk cache");
}
}
Expand All @@ -100,7 +139,7 @@ impl QueryCache {
Err(e) => {
// Stale-while-error: serve expired disk cache on chain failure
if disk {
if let Some(stale) = super::disk_cache::get_stale::<Vec<SubnetInfo>>("all_subnets") {
if let Some(stale) = super::disk_cache::get_stale::<Vec<SubnetInfo>>(&disk_key) {
tracing::warn!(count = stale.len(), error = %e, "chain fetch failed, serving stale all_subnets from disk cache");
return Ok(Arc::new(stale));
}
Expand All @@ -126,11 +165,12 @@ impl QueryCache {
{
let per_netuid = self.dynamic_by_netuid.clone();
let disk = self.use_disk;
let disk_key = self.scoped_disk_key("all_dynamic_info");
self.all_dynamic
.try_get_with((), async {
// Check disk cache before hitting chain
if disk {
if let Some(cached) = super::disk_cache::get::<Vec<DynamicInfo>>("all_dynamic_info", DISK_TTL) {
if let Some(cached) = super::disk_cache::get::<Vec<DynamicInfo>>(&disk_key, DISK_TTL) {
tracing::debug!(count = cached.len(), "cache hit: all_dynamic_info (disk)");
let data = Arc::new(cached);
// Also populate per-netuid in-memory cache
Expand All @@ -147,7 +187,7 @@ impl QueryCache {
tracing::debug!(elapsed_ms = start.elapsed().as_millis() as u64, count = data.len(), "fetched all_dynamic_info");
// Write through to disk cache (best-effort)
if disk {
if let Err(e) = super::disk_cache::put("all_dynamic_info", &data) {
if let Err(e) = super::disk_cache::put(&disk_key, &data) {
tracing::warn!(error = %e, "failed to write all_dynamic_info to disk cache");
}
}
Expand All @@ -163,7 +203,7 @@ impl QueryCache {
Err(e) => {
// Stale-while-error: serve expired disk cache on chain failure
if disk {
if let Some(stale) = super::disk_cache::get_stale::<Vec<DynamicInfo>>("all_dynamic_info") {
if let Some(stale) = super::disk_cache::get_stale::<Vec<DynamicInfo>>(&disk_key) {
tracing::warn!(count = stale.len(), error = %e, "chain fetch failed, serving stale all_dynamic_info from disk cache");
let data = Arc::new(stale);
for d in data.iter() {
Expand Down Expand Up @@ -278,8 +318,10 @@ impl QueryCache {
self.delegates.invalidate_all();
self.neurons_lite.invalidate_all();
if self.use_disk {
super::disk_cache::remove("all_subnets");
super::disk_cache::remove("all_dynamic_info");
let subnets_key = self.scoped_disk_key("all_subnets");
let dynamic_key = self.scoped_disk_key("all_dynamic_info");
super::disk_cache::remove(&subnets_key);
super::disk_cache::remove(&dynamic_key);
}
}
}
Expand Down