Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 25 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ aws-sdk-s3 = "1"
aws-config = "1"
urlencoding = "2"
url = "2"
httpdate = "1"
semver = "1"
prometheus = { version = "0.14", features = ["process"] }

Expand Down
58 changes: 51 additions & 7 deletions src/modules/cache/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ use tracing::debug;
struct Meta {
content_type: String,
created_at: u64,
/// Per-entry TTL in seconds. None means use the DiskCache instance default.
#[serde(default)]
ttl_secs: Option<u64>,
}

/// Persistent on-disk cache stored as sharded flat files.
Expand Down Expand Up @@ -72,10 +75,11 @@ impl DiskCache {
let meta: Meta = serde_json::from_slice(&meta_bytes)?;
let now = now_unix();
let age = now.saturating_sub(meta.created_at);
let expired = if self.ttl_secs == 0 {
let effective_ttl = meta.ttl_secs.unwrap_or(self.ttl_secs);
let expired = if effective_ttl == 0 {
true
} else {
age >= self.ttl_secs
age >= effective_ttl
};
if expired {
debug!(key = %key, age_secs = age, ttl_secs = self.ttl_secs, path = %meta_path.display(), "disk cache TTL expired - removing entry");
Expand All @@ -93,12 +97,13 @@ impl DiskCache {
}

#[tracing::instrument(skip(self, entry), fields(key = %key, bytes = entry.bytes.len()))]
pub async fn set(&self, key: &str, entry: CacheEntry) -> Result<()> {
pub async fn set(&self, key: &str, entry: CacheEntry, ttl_override: Option<u64>) -> Result<()> {
let shard = self.shard_dir(key);
fs::create_dir_all(&shard).await?;
let meta = Meta {
content_type: entry.content_type,
created_at: now_unix(),
ttl_secs: ttl_override,
};
let bin_path = self.bin_path(key);
debug!(key = %key, bytes = entry.bytes.len(), path = %bin_path.display(), "writing entry to disk cache");
Expand Down Expand Up @@ -138,10 +143,11 @@ impl DiskCache {
};
if let Ok(meta) = serde_json::from_slice::<Meta>(&meta_bytes) {
let age = now.saturating_sub(meta.created_at);
let stale = if self.ttl_secs == 0 {
let effective_ttl = meta.ttl_secs.unwrap_or(self.ttl_secs);
let stale = if effective_ttl == 0 {
true
} else {
age >= self.ttl_secs
age >= effective_ttl
};
if stale {
debug!(key = %key, age_secs = age, ttl_secs = self.ttl_secs, "disk cleanup: evicting TTL-expired entry");
Expand Down Expand Up @@ -200,7 +206,7 @@ mod tests {
bytes: vec![1, 2, 3],
content_type: "image/png".to_string(),
};
disk.set("abc123def456", entry.clone()).await.unwrap();
disk.set("abc123def456", entry.clone(), None).await.unwrap();
let result = disk.get("abc123def456").await.unwrap();
assert!(result.is_some());
assert_eq!(result.unwrap().bytes, vec![1, 2, 3]);
Expand All @@ -214,7 +220,7 @@ mod tests {
bytes: vec![1],
content_type: "image/png".to_string(),
};
disk.set("stalekey0011", entry).await.unwrap();
disk.set("stalekey0011", entry, None).await.unwrap();
// Even with 0s TTL, entry is stale immediately
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let result = disk.get("stalekey0011").await.unwrap();
Expand All @@ -227,4 +233,42 @@ mod tests {
let disk = DiskCache::new(dir.path().to_str().unwrap().to_string(), 86400, None);
assert!(disk.get("nonexistent00").await.unwrap().is_none());
}

#[tokio::test]
async fn test_per_entry_ttl_override() {
let dir = TempDir::new().unwrap();
// DiskCache default TTL = 1s, but entry stored with TTL = 86400s
let disk = DiskCache::new(dir.path().to_str().unwrap().to_string(), 1, None);
let entry = CacheEntry {
bytes: vec![9, 8, 7],
content_type: "image/png".to_string(),
};
disk.set("aabbccdd0011", entry, Some(86400)).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
// Should NOT be expired because per-entry TTL = 86400s
let result = disk.get("aabbccdd0011").await.unwrap();
assert!(
result.is_some(),
"entry with long per-entry TTL should still be live"
);
}

#[tokio::test]
async fn test_per_entry_ttl_short() {
let dir = TempDir::new().unwrap();
// DiskCache default TTL = 86400s, but entry stored with TTL = 0s
let disk = DiskCache::new(dir.path().to_str().unwrap().to_string(), 86400, None);
let entry = CacheEntry {
bytes: vec![1],
content_type: "image/png".to_string(),
};
disk.set("aabbccdd0022", entry, Some(0)).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
// Should be expired because per-entry TTL = 0s
let result = disk.get("aabbccdd0022").await.unwrap();
assert!(
result.is_none(),
"entry with TTL=0 should be expired immediately"
);
}
}
15 changes: 14 additions & 1 deletion src/modules/cache/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::modules::cache::{
disk::DiskCache,
inflight::InflightMap,
memory::{CacheEntry, MemoryCache},
origin::OriginCache,
};
use crate::modules::metrics::Metrics;
use sha2::{Digest, Sha256};
Expand All @@ -27,6 +28,7 @@ pub enum CacheHit {
pub struct CacheManager {
l1: MemoryCache,
pub l2: Arc<DiskCache>,
origin_l2: OriginCache,
inflight: InflightMap,
metrics: Arc<Metrics>,
}
Expand All @@ -42,9 +44,11 @@ impl CacheManager {
cfg.cache_disk_ttl_secs,
cfg.cache_disk_max_mb,
));
let origin_l2 = OriginCache::new(&cfg.cache_dir, cfg.cache_disk_ttl_secs);
Arc::new(Self {
l1,
l2,
origin_l2,
inflight: InflightMap::new(),
metrics,
})
Expand Down Expand Up @@ -97,7 +101,7 @@ impl CacheManager {
#[tracing::instrument(skip(self, entry), fields(key = %final_key, bytes = entry.bytes.len()))]
pub async fn set(&self, final_key: &str, entry: CacheEntry) {
debug!(key = %final_key, bytes = entry.bytes.len(), "storing entry to L1 and L2");
let _ = self.l2.set(final_key, entry.clone()).await;
let _ = self.l2.set(final_key, entry.clone(), None).await;
self.l1.set(final_key.to_string(), entry).await;
self.record_cache_sizes();
}
Expand All @@ -119,6 +123,14 @@ impl CacheManager {
// disk entry count is not tracked; DiskCache does not expose it
}

pub async fn get_origin(&self, url: &str) -> Option<CacheEntry> {
self.origin_l2.get(url).await
}

pub async fn set_origin(&self, url: &str, entry: CacheEntry, ttl_override: Option<u64>) {
self.origin_l2.set(url, entry, ttl_override).await;
}

pub fn inflight(&self) -> &InflightMap {
&self.inflight
}
Expand All @@ -144,5 +156,6 @@ impl CacheManager {

pub async fn run_cleanup(&self) {
let _ = self.l2.cleanup().await;
let _ = self.origin_l2.cleanup().await;
}
}
1 change: 1 addition & 0 deletions src/modules/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ pub mod disk;
pub mod inflight;
pub mod manager;
pub mod memory;
pub mod origin;

pub use memory::CacheEntry;
Loading