diff --git a/src/modules/metrics/mod.rs b/src/modules/metrics/mod.rs index c5be818..8b2470e 100644 --- a/src/modules/metrics/mod.rs +++ b/src/modules/metrics/mod.rs @@ -23,6 +23,9 @@ pub struct Metrics { pub workers_utilization: Gauge, pub buffer_default_size_bytes: Gauge, pub buffer_max_size_bytes: Gauge, + // Per-workflow breakdowns + pub source_fetch_duration_seconds: HistogramVec, + pub transform_duration_seconds: HistogramVec, // Cache pub cache_hits_total: IntCounterVec, pub cache_misses_total: IntCounterVec, @@ -180,6 +183,36 @@ impl Metrics { .unwrap() ); + let source_fetch_duration_seconds = register!( + HistogramVec::new( + HistogramOpts::new( + "source_fetch_duration_seconds", + "Upstream fetch latency by source type (http, s3, local, alias)" + ) + .namespace(ns) + .buckets(vec![ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0 + ]), + &["source"] + ) + .unwrap() + ); + + let transform_duration_seconds = register!( + HistogramVec::new( + HistogramOpts::new( + "transform_duration_seconds", + "Transform pipeline latency by workflow (passthrough, resize, best, video, pdf)" + ) + .namespace(ns) + .buckets(vec![ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0 + ]), + &["transform"] + ) + .unwrap() + ); + let cache_hits_total = register!( IntCounterVec::new( Opts::new("cache_hits_total", "Cache hits by layer").namespace(ns), @@ -236,6 +269,8 @@ impl Metrics { workers_utilization, buffer_default_size_bytes, buffer_max_size_bytes, + source_fetch_duration_seconds, + transform_duration_seconds, cache_hits_total, cache_misses_total, cache_memory_size_bytes, diff --git a/src/modules/proxy/service.rs b/src/modules/proxy/service.rs index dded1d8..00b1600 100644 --- a/src/modules/proxy/service.rs +++ b/src/modules/proxy/service.rs @@ -299,15 +299,30 @@ impl ProxyService { } // --- End streaming path (video/PDF fell through to here) --- - // 7. Fetch (Issue 4 fix: pass original error to guard) + // 7. Fetch tracing::info!(url = image_url.as_str(), "fetch start"); let download_start = Instant::now(); let fetch_result = self.fetcher.fetch(&image_url).await; + let fetch_elapsed = download_start.elapsed().as_secs_f64(); + let source_label = if image_url.starts_with("http://") || image_url.starts_with("https://") { + "http" + } else if image_url.starts_with("s3:/") { + "s3" + } else if image_url.starts_with("local:/") { + "local" + } else { + "alias" + }; self .metrics .request_span_duration_seconds .with_label_values(&["downloading"]) - .observe(download_start.elapsed().as_secs_f64()); + .observe(fetch_elapsed); + self + .metrics + .source_fetch_duration_seconds + .with_label_values(&[source_label]) + .observe(fetch_elapsed); let (mut src_bytes, mut src_ct) = match fetch_result { Ok(v) => { tracing::info!( @@ -432,6 +447,15 @@ impl ProxyService { let pipeline_result = if run_pipeline_flag { self.metrics.images_in_progress.inc(); let transform_start = Instant::now(); + let transform_label = if is_video { + "video" + } else if is_pdf { + "pdf" + } else if needs_best { + "best" + } else { + "resize" + }; let result = pipeline::run_pipeline( params, src_bytes, @@ -447,11 +471,17 @@ impl ProxyService { content_type: ct, }); self.metrics.images_in_progress.dec(); + let pipeline_elapsed = transform_start.elapsed().as_secs_f64(); self .metrics .request_span_duration_seconds .with_label_values(&["processing"]) - .observe(transform_start.elapsed().as_secs_f64()); + .observe(pipeline_elapsed); + self + .metrics + .transform_duration_seconds + .with_label_values(&[transform_label]) + .observe(pipeline_elapsed); if let Ok(ref entry) = result { tracing::info!( url = image_url.as_str(), diff --git a/src/modules/proxy/sources/http.rs b/src/modules/proxy/sources/http.rs index 8a70fa3..97d3af1 100644 --- a/src/modules/proxy/sources/http.rs +++ b/src/modules/proxy/sources/http.rs @@ -40,7 +40,9 @@ impl HttpFetcher { if !al.is_allowed(host) { return attempt.error("host_not_allowed"); } - if let Ok(addrs) = std::net::ToSocketAddrs::to_socket_addrs(&(host, 80u16)) { + if let Ok(addrs) = tokio::task::block_in_place(|| { + std::net::ToSocketAddrs::to_socket_addrs(&(host.to_string(), 80u16)) + }) { for addr in addrs { if is_private_ip(addr.ip()) { return attempt.error("host_not_allowed");