From 148617b06a8b77e9728d7e848811619c5a62b7dc Mon Sep 17 00:00:00 2001 From: ThinhHV Date: Thu, 9 Apr 2026 08:26:59 +0000 Subject: [PATCH] feat(cache): add L2 disk cache for original fetched files (HTTP/S3) --- Cargo.lock | 55 +++++----- Cargo.toml | 1 + src/modules/cache/disk.rs | 58 ++++++++-- src/modules/cache/manager.rs | 15 ++- src/modules/cache/mod.rs | 1 + src/modules/cache/origin.rs | 198 +++++++++++++++++++++++++++++++++++ src/modules/proxy/service.rs | 130 +++++++++++++++-------- 7 files changed, 376 insertions(+), 82 deletions(-) create mode 100644 src/modules/cache/origin.rs diff --git a/Cargo.lock b/Cargo.lock index d5972e8..7a32172 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -367,9 +367,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.128.0" +version = "1.129.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99304b64672e0d81a3c100a589b93d9ef5e9c0ce12e21c848fd39e50f493c2a1" +checksum = "6d4e8410fadbc0ee453145dd77a4958227b18b05bf67c2795d0a8b8596c9aa0f" dependencies = [ "aws-credential-types", "aws-runtime", @@ -940,9 +940,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.58" +version = "1.2.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1e928d4b69e3077709075a938a05ffbedfa53a84c8f766efbf8220bb1ff60e1" +checksum = "b7a4d3ec6524d28a329fc53654bbadc9bdd7b0431f5d65f1a56ffb28a1ee5283" dependencies = [ "find-msvc-tools", "jobserver", @@ -1526,9 +1526,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.3.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" [[package]] name = "fax" @@ -2269,9 +2269,9 @@ checksum = "e7c5cedc30da3a610cac6b4ba17597bdf7152cf974e8aab3afb3d54455e371c8" [[package]] name = "indexmap" -version = "2.13.0" +version = "2.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" +checksum = "45a8a2b9cb3e0b0c1803dbb0758ffac5de2f425b23c28f518faabd9d805342ff" dependencies = [ "equivalent", "hashbrown 0.16.1", @@ -3218,6 +3218,7 @@ dependencies = [ "hex", "hmac", "http-body-util", + "httpdate", "hyper 1.9.0", "image", "infer", @@ -3944,9 +3945,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" [[package]] name = "serde" @@ -4237,9 +4238,9 @@ dependencies = [ [[package]] name = "system-deps" -version = "7.0.7" +version = "7.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48c8f33736f986f16d69b6cb8b03f55ddcad5c41acc4ccc39dd88e84aa805e7f" +checksum = "396a35feb67335377e0251fcbc1092fc85c484bd4e3a7a54319399da127796e7" dependencies = [ "cfg-expr", "heck", @@ -4419,9 +4420,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.50.0" +version = "1.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" +checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c" dependencies = [ "bytes", "libc", @@ -4436,9 +4437,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.6.1" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" +checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496" dependencies = [ "proc-macro2", "quote", @@ -4480,9 +4481,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.9.12+spec-1.1.0" +version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf92845e79fc2e2def6a5d828f0801e29a2f8acc037becc5ab08595c7d5e9863" +checksum = "81f3d15e84cbcd896376e6730314d59fb5a87f31e4b038454184435cd57defee" dependencies = [ "indexmap", "serde_core", @@ -4490,14 +4491,14 @@ dependencies = [ "toml_datetime", "toml_parser", "toml_writer", - "winnow 0.7.15", + "winnow", ] [[package]] name = "toml_datetime" -version = "0.7.5+spec-1.1.0" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347" +checksum = "3165f65f62e28e0115a00b2ebdd37eb6f3b641855f9d636d3cd4103767159ad7" dependencies = [ "serde_core", ] @@ -4508,7 +4509,7 @@ version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526" dependencies = [ - "winnow 1.0.1", + "winnow", ] [[package]] @@ -5324,12 +5325,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" -[[package]] -name = "winnow" -version = "0.7.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945" - [[package]] name = "winnow" version = "1.0.1" @@ -5449,9 +5444,9 @@ dependencies = [ [[package]] name = "writeable" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" [[package]] name = "xmlparser" diff --git a/Cargo.toml b/Cargo.toml index 3ca514f..fe36ca4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,7 @@ aws-sdk-s3 = "1" aws-config = "1" urlencoding = "2" url = "2" +httpdate = "1" semver = "1" prometheus = { version = "0.14", features = ["process"] } diff --git a/src/modules/cache/disk.rs b/src/modules/cache/disk.rs index 27f6793..2a26678 100644 --- a/src/modules/cache/disk.rs +++ b/src/modules/cache/disk.rs @@ -15,6 +15,9 @@ use tracing::debug; struct Meta { content_type: String, created_at: u64, + /// Per-entry TTL in seconds. None means use the DiskCache instance default. + #[serde(default)] + ttl_secs: Option, } /// Persistent on-disk cache stored as sharded flat files. @@ -72,10 +75,11 @@ impl DiskCache { let meta: Meta = serde_json::from_slice(&meta_bytes)?; let now = now_unix(); let age = now.saturating_sub(meta.created_at); - let expired = if self.ttl_secs == 0 { + let effective_ttl = meta.ttl_secs.unwrap_or(self.ttl_secs); + let expired = if effective_ttl == 0 { true } else { - age >= self.ttl_secs + age >= effective_ttl }; if expired { debug!(key = %key, age_secs = age, ttl_secs = self.ttl_secs, path = %meta_path.display(), "disk cache TTL expired - removing entry"); @@ -93,12 +97,13 @@ impl DiskCache { } #[tracing::instrument(skip(self, entry), fields(key = %key, bytes = entry.bytes.len()))] - pub async fn set(&self, key: &str, entry: CacheEntry) -> Result<()> { + pub async fn set(&self, key: &str, entry: CacheEntry, ttl_override: Option) -> Result<()> { let shard = self.shard_dir(key); fs::create_dir_all(&shard).await?; let meta = Meta { content_type: entry.content_type, created_at: now_unix(), + ttl_secs: ttl_override, }; let bin_path = self.bin_path(key); debug!(key = %key, bytes = entry.bytes.len(), path = %bin_path.display(), "writing entry to disk cache"); @@ -138,10 +143,11 @@ impl DiskCache { }; if let Ok(meta) = serde_json::from_slice::(&meta_bytes) { let age = now.saturating_sub(meta.created_at); - let stale = if self.ttl_secs == 0 { + let effective_ttl = meta.ttl_secs.unwrap_or(self.ttl_secs); + let stale = if effective_ttl == 0 { true } else { - age >= self.ttl_secs + age >= effective_ttl }; if stale { debug!(key = %key, age_secs = age, ttl_secs = self.ttl_secs, "disk cleanup: evicting TTL-expired entry"); @@ -200,7 +206,7 @@ mod tests { bytes: vec![1, 2, 3], content_type: "image/png".to_string(), }; - disk.set("abc123def456", entry.clone()).await.unwrap(); + disk.set("abc123def456", entry.clone(), None).await.unwrap(); let result = disk.get("abc123def456").await.unwrap(); assert!(result.is_some()); assert_eq!(result.unwrap().bytes, vec![1, 2, 3]); @@ -214,7 +220,7 @@ mod tests { bytes: vec![1], content_type: "image/png".to_string(), }; - disk.set("stalekey0011", entry).await.unwrap(); + disk.set("stalekey0011", entry, None).await.unwrap(); // Even with 0s TTL, entry is stale immediately tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; let result = disk.get("stalekey0011").await.unwrap(); @@ -227,4 +233,42 @@ mod tests { let disk = DiskCache::new(dir.path().to_str().unwrap().to_string(), 86400, None); assert!(disk.get("nonexistent00").await.unwrap().is_none()); } + + #[tokio::test] + async fn test_per_entry_ttl_override() { + let dir = TempDir::new().unwrap(); + // DiskCache default TTL = 1s, but entry stored with TTL = 86400s + let disk = DiskCache::new(dir.path().to_str().unwrap().to_string(), 1, None); + let entry = CacheEntry { + bytes: vec![9, 8, 7], + content_type: "image/png".to_string(), + }; + disk.set("aabbccdd0011", entry, Some(86400)).await.unwrap(); + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + // Should NOT be expired because per-entry TTL = 86400s + let result = disk.get("aabbccdd0011").await.unwrap(); + assert!( + result.is_some(), + "entry with long per-entry TTL should still be live" + ); + } + + #[tokio::test] + async fn test_per_entry_ttl_short() { + let dir = TempDir::new().unwrap(); + // DiskCache default TTL = 86400s, but entry stored with TTL = 0s + let disk = DiskCache::new(dir.path().to_str().unwrap().to_string(), 86400, None); + let entry = CacheEntry { + bytes: vec![1], + content_type: "image/png".to_string(), + }; + disk.set("aabbccdd0022", entry, Some(0)).await.unwrap(); + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + // Should be expired because per-entry TTL = 0s + let result = disk.get("aabbccdd0022").await.unwrap(); + assert!( + result.is_none(), + "entry with TTL=0 should be expired immediately" + ); + } } diff --git a/src/modules/cache/manager.rs b/src/modules/cache/manager.rs index 6ea2e81..3082a7e 100644 --- a/src/modules/cache/manager.rs +++ b/src/modules/cache/manager.rs @@ -3,6 +3,7 @@ use crate::modules::cache::{ disk::DiskCache, inflight::InflightMap, memory::{CacheEntry, MemoryCache}, + origin::OriginCache, }; use crate::modules::metrics::Metrics; use sha2::{Digest, Sha256}; @@ -27,6 +28,7 @@ pub enum CacheHit { pub struct CacheManager { l1: MemoryCache, pub l2: Arc, + origin_l2: OriginCache, inflight: InflightMap, metrics: Arc, } @@ -42,9 +44,11 @@ impl CacheManager { cfg.cache_disk_ttl_secs, cfg.cache_disk_max_mb, )); + let origin_l2 = OriginCache::new(&cfg.cache_dir, cfg.cache_disk_ttl_secs); Arc::new(Self { l1, l2, + origin_l2, inflight: InflightMap::new(), metrics, }) @@ -97,7 +101,7 @@ impl CacheManager { #[tracing::instrument(skip(self, entry), fields(key = %final_key, bytes = entry.bytes.len()))] pub async fn set(&self, final_key: &str, entry: CacheEntry) { debug!(key = %final_key, bytes = entry.bytes.len(), "storing entry to L1 and L2"); - let _ = self.l2.set(final_key, entry.clone()).await; + let _ = self.l2.set(final_key, entry.clone(), None).await; self.l1.set(final_key.to_string(), entry).await; self.record_cache_sizes(); } @@ -119,6 +123,14 @@ impl CacheManager { // disk entry count is not tracked; DiskCache does not expose it } + pub async fn get_origin(&self, url: &str) -> Option { + self.origin_l2.get(url).await + } + + pub async fn set_origin(&self, url: &str, entry: CacheEntry, ttl_override: Option) { + self.origin_l2.set(url, entry, ttl_override).await; + } + pub fn inflight(&self) -> &InflightMap { &self.inflight } @@ -144,5 +156,6 @@ impl CacheManager { pub async fn run_cleanup(&self) { let _ = self.l2.cleanup().await; + let _ = self.origin_l2.cleanup().await; } } diff --git a/src/modules/cache/mod.rs b/src/modules/cache/mod.rs index 16165ff..6f40953 100644 --- a/src/modules/cache/mod.rs +++ b/src/modules/cache/mod.rs @@ -2,5 +2,6 @@ pub mod disk; pub mod inflight; pub mod manager; pub mod memory; +pub mod origin; pub use memory::CacheEntry; diff --git a/src/modules/cache/origin.rs b/src/modules/cache/origin.rs new file mode 100644 index 0000000..730d68f --- /dev/null +++ b/src/modules/cache/origin.rs @@ -0,0 +1,198 @@ +use crate::modules::cache::{disk::DiskCache, memory::CacheEntry}; +use reqwest::header::{CACHE_CONTROL, EXPIRES, HeaderMap}; +use sha2::{Digest, Sha256}; +use tracing::warn; + +pub struct OriginCache { + disk: DiskCache, +} + +impl OriginCache { + pub fn new(cache_dir: &str, default_ttl_secs: u64) -> Self { + let origin_dir = format!("{}/origin", cache_dir); + Self { + disk: DiskCache::new(origin_dir, default_ttl_secs, None), + } + } + + fn url_key(url: &str) -> String { + format!("{:x}", Sha256::digest(url.as_bytes())) + } + + pub async fn get(&self, url: &str) -> Option { + match self.disk.get(&Self::url_key(url)).await { + Ok(Some(entry)) => Some(entry), + Ok(None) => None, + Err(e) => { + warn!(url = url, error = %e, "origin cache read error - treating as miss"); + None + } + } + } + + pub async fn set(&self, url: &str, entry: CacheEntry, ttl_override: Option) { + // Skip writing if TTL override is explicitly 0 (e.g. Expires in the past) + if ttl_override == Some(0) { + return; + } + let key = Self::url_key(url); + if let Err(e) = self.disk.set(&key, entry, ttl_override).await { + warn!(url = url, error = %e, "origin cache write error - ignoring"); + } + } + + pub fn extract_ttl(headers: &HeaderMap) -> Option { + // 1. Cache-Control: max-age=N takes priority + if let Some(val) = headers.get(CACHE_CONTROL) + && let Ok(s) = val.to_str() + { + for directive in s.split(',') { + let d = directive.trim(); + if let Some(rest) = d.strip_prefix("max-age=") + && let Ok(n) = rest.trim().parse::() + { + return Some(n); + } + } + } + // 2. Expires header + if let Some(val) = headers.get(EXPIRES) + && let Ok(s) = val.to_str() + && let Ok(expires) = httpdate::parse_http_date(s) + { + let now = std::time::SystemTime::now(); + return Some(match expires.duration_since(now) { + Ok(d) => d.as_secs(), + Err(_) => 0, // past = already expired + }); + } + None + } + + pub async fn cleanup(&self) -> anyhow::Result { + self.disk.cleanup().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use reqwest::header::HeaderValue; + use tempfile::TempDir; + + fn make_entry(bytes: Vec) -> CacheEntry { + CacheEntry { + bytes, + content_type: "image/jpeg".to_string(), + } + } + + #[tokio::test] + async fn test_get_set() { + let dir = TempDir::new().unwrap(); + let cache = OriginCache::new(dir.path().to_str().unwrap(), 86400); + let entry = make_entry(vec![1, 2, 3]); + cache + .set("https://example.com/img.jpg", entry.clone(), None) + .await; + let result = cache.get("https://example.com/img.jpg").await; + assert!(result.is_some()); + assert_eq!(result.unwrap().bytes, vec![1, 2, 3]); + } + + #[tokio::test] + async fn test_different_urls_different_keys() { + let dir = TempDir::new().unwrap(); + let cache = OriginCache::new(dir.path().to_str().unwrap(), 86400); + cache + .set("https://example.com/a.jpg", make_entry(vec![1]), None) + .await; + cache + .set("https://example.com/b.jpg", make_entry(vec![2]), None) + .await; + assert_eq!( + cache.get("https://example.com/a.jpg").await.unwrap().bytes, + vec![1] + ); + assert_eq!( + cache.get("https://example.com/b.jpg").await.unwrap().bytes, + vec![2] + ); + } + + #[tokio::test] + async fn test_ttl_expired() { + let dir = TempDir::new().unwrap(); + // TTL=0 = immediately expired + let cache = OriginCache::new(dir.path().to_str().unwrap(), 0); + cache + .set("https://example.com/img.jpg", make_entry(vec![1]), None) + .await; + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + assert!(cache.get("https://example.com/img.jpg").await.is_none()); + } + + #[tokio::test] + async fn test_miss_returns_none() { + let dir = TempDir::new().unwrap(); + let cache = OriginCache::new(dir.path().to_str().unwrap(), 86400); + assert!(cache.get("https://example.com/missing.jpg").await.is_none()); + } + + #[tokio::test] + async fn test_ttl_from_cache_control() { + let mut headers = HeaderMap::new(); + headers.insert( + CACHE_CONTROL, + HeaderValue::from_static("public, max-age=300"), + ); + assert_eq!(OriginCache::extract_ttl(&headers), Some(300)); + } + + #[tokio::test] + async fn test_ttl_from_cache_control_no_store() { + let mut headers = HeaderMap::new(); + headers.insert(CACHE_CONTROL, HeaderValue::from_static("no-store")); + assert_eq!(OriginCache::extract_ttl(&headers), None); + } + + #[tokio::test] + async fn test_ttl_from_expires_future() { + let mut headers = HeaderMap::new(); + let future = std::time::SystemTime::now() + std::time::Duration::from_secs(600); + let expires_str = httpdate::fmt_http_date(future); + headers.insert(EXPIRES, HeaderValue::from_str(&expires_str).unwrap()); + let ttl = OriginCache::extract_ttl(&headers); + assert!(ttl.is_some()); + let t = ttl.unwrap(); + assert!(t > 590 && t <= 600, "expected ~600s, got {t}"); + } + + #[tokio::test] + async fn test_ttl_from_expires_past() { + let mut headers = HeaderMap::new(); + // Already expired + headers.insert( + EXPIRES, + HeaderValue::from_static("Thu, 01 Jan 1970 00:00:00 GMT"), + ); + assert_eq!(OriginCache::extract_ttl(&headers), Some(0)); + } + + #[tokio::test] + async fn test_ttl_fallback_no_headers() { + let headers = HeaderMap::new(); + assert_eq!(OriginCache::extract_ttl(&headers), None); + } + + #[tokio::test] + async fn test_skip_set_when_ttl_zero() { + let dir = TempDir::new().unwrap(); + let cache = OriginCache::new(dir.path().to_str().unwrap(), 86400); + // Explicit TTL override of 0 - should not write entry + cache + .set("https://example.com/img.jpg", make_entry(vec![1]), Some(0)) + .await; + assert!(cache.get("https://example.com/img.jpg").await.is_none()); + } +} diff --git a/src/modules/proxy/service.rs b/src/modules/proxy/service.rs index d65c4c2..4f98870 100644 --- a/src/modules/proxy/service.rs +++ b/src/modules/proxy/service.rs @@ -219,6 +219,8 @@ impl ProxyService { drop(resp); // fall through to buffered path } else if content_type.starts_with("image/") { + let origin_ttl = crate::modules::cache::origin::OriginCache::extract_ttl(resp.headers()); + let image_url_bg = image_url.clone(); let (client_tx, client_rx) = mpsc::channel::(8); let (cache_tx, mut cache_rx) = mpsc::channel::>(8); @@ -275,6 +277,10 @@ impl ProxyService { }; let key = CacheManager::preliminary_key(&canonical_bg); cache.set(&key, entry.clone()).await; + // Also write to origin cache with TTL from response headers + cache + .set_origin(&image_url_bg, entry.clone(), origin_ttl) + .await; guard.complete(Ok(entry)); return; } @@ -299,55 +305,91 @@ impl ProxyService { } // --- End streaming path (video/PDF fell through to here) --- - // 7. Fetch + // Check origin cache for HTTP/S3 sources before fetching + let is_s3 = image_url.starts_with("s3:/"); + let mut origin_hit = false; + let origin_entry_opt = if is_http || is_s3 { + self.cache.get_origin(&image_url).await + } else { + None + }; + + // 7. Fetch (or restore from origin cache) tracing::info!(url = image_url.as_str(), "fetch start"); let download_start = Instant::now(); - let fetch_result = self.fetcher.fetch(&image_url).await; - let fetch_elapsed = download_start.elapsed().as_secs_f64(); - let source_label = if image_url.starts_with("http://") || image_url.starts_with("https://") { - "http" - } else if image_url.starts_with("s3:/") { - "s3" - } else if image_url.starts_with("local:/") { - "local" + + let (mut src_bytes, mut src_ct) = if let Some(origin_entry) = origin_entry_opt { + origin_hit = true; + tracing::info!( + url = image_url.as_str(), + bytes = origin_entry.bytes.len(), + "origin cache hit - skipping fetch" + ); + self + .metrics + .cache_hits_total + .with_label_values(&["origin"]) + .inc(); + (origin_entry.bytes, Some(origin_entry.content_type)) } else { - "alias" - }; - self - .metrics - .request_span_duration_seconds - .with_label_values(&["downloading"]) - .observe(fetch_elapsed); - self - .metrics - .source_fetch_duration_seconds - .with_label_values(&[source_label]) - .observe(fetch_elapsed); - let (mut src_bytes, mut src_ct) = match fetch_result { - Ok(v) => { - tracing::info!( - url = image_url.as_str(), - bytes = v.0.len(), - content_type = v.1.as_deref().unwrap_or(""), - "fetch complete" - ); - v - } - Err(e) => { - guard.complete(Err(e.clone())); - let error_type = if matches!(e, ProxyError::UpstreamTimeout) { - "timeout" - } else { - "downloading" - }; - self - .metrics - .errors_total - .with_label_values(&[error_type]) - .inc(); - return Err(e); + let fetch_result = self.fetcher.fetch(&image_url).await; + let fetch_elapsed = download_start.elapsed().as_secs_f64(); + let source_label = if image_url.starts_with("http://") || image_url.starts_with("https://") { + "http" + } else if image_url.starts_with("s3:/") { + "s3" + } else if image_url.starts_with("local:/") { + "local" + } else { + "alias" + }; + self + .metrics + .request_span_duration_seconds + .with_label_values(&["downloading"]) + .observe(fetch_elapsed); + self + .metrics + .source_fetch_duration_seconds + .with_label_values(&[source_label]) + .observe(fetch_elapsed); + match fetch_result { + Ok(v) => { + tracing::info!( + url = image_url.as_str(), + bytes = v.0.len(), + content_type = v.1.as_deref().unwrap_or(""), + "fetch complete" + ); + v + } + Err(e) => { + guard.complete(Err(e.clone())); + let error_type = if matches!(e, ProxyError::UpstreamTimeout) { + "timeout" + } else { + "downloading" + }; + self + .metrics + .errors_total + .with_label_values(&[error_type]) + .inc(); + return Err(e); + } } }; + // Write raw bytes to origin cache for HTTP/S3 on fetch miss (best-effort, default TTL) + if (is_http || is_s3) + && !origin_hit + && let Some(ref ct) = src_ct + { + let origin_entry = crate::modules::cache::memory::CacheEntry { + bytes: src_bytes.clone(), + content_type: ct.clone(), + }; + self.cache.set_origin(&image_url, origin_entry, None).await; + } self .metrics .buffer_size_bytes