Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/modules/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub struct Metrics {
pub workers_utilization: Gauge,
pub buffer_default_size_bytes: Gauge,
pub buffer_max_size_bytes: Gauge,
// Per-workflow breakdowns
pub source_fetch_duration_seconds: HistogramVec,
pub transform_duration_seconds: HistogramVec,
// Cache
pub cache_hits_total: IntCounterVec,
pub cache_misses_total: IntCounterVec,
Expand Down Expand Up @@ -180,6 +183,36 @@ impl Metrics {
.unwrap()
);

let source_fetch_duration_seconds = register!(
HistogramVec::new(
HistogramOpts::new(
"source_fetch_duration_seconds",
"Upstream fetch latency by source type (http, s3, local, alias)"
)
.namespace(ns)
.buckets(vec![
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0
]),
&["source"]
)
.unwrap()
);

let transform_duration_seconds = register!(
HistogramVec::new(
HistogramOpts::new(
"transform_duration_seconds",
"Transform pipeline latency by workflow (passthrough, resize, best, video, pdf)"
)
.namespace(ns)
.buckets(vec![
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0
]),
&["transform"]
)
.unwrap()
);

let cache_hits_total = register!(
IntCounterVec::new(
Opts::new("cache_hits_total", "Cache hits by layer").namespace(ns),
Expand Down Expand Up @@ -236,6 +269,8 @@ impl Metrics {
workers_utilization,
buffer_default_size_bytes,
buffer_max_size_bytes,
source_fetch_duration_seconds,
transform_duration_seconds,
cache_hits_total,
cache_misses_total,
cache_memory_size_bytes,
Expand Down
36 changes: 33 additions & 3 deletions src/modules/proxy/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,30 @@ impl ProxyService {
}
// --- End streaming path (video/PDF fell through to here) ---

// 7. Fetch (Issue 4 fix: pass original error to guard)
// 7. Fetch
tracing::info!(url = image_url.as_str(), "fetch start");
let download_start = Instant::now();
let fetch_result = self.fetcher.fetch(&image_url).await;
let fetch_elapsed = download_start.elapsed().as_secs_f64();
let source_label = if image_url.starts_with("http://") || image_url.starts_with("https://") {
"http"
} else if image_url.starts_with("s3:/") {
"s3"
} else if image_url.starts_with("local:/") {
"local"
} else {
"alias"
};
self
.metrics
.request_span_duration_seconds
.with_label_values(&["downloading"])
.observe(download_start.elapsed().as_secs_f64());
.observe(fetch_elapsed);
self
.metrics
.source_fetch_duration_seconds
.with_label_values(&[source_label])
.observe(fetch_elapsed);
let (mut src_bytes, mut src_ct) = match fetch_result {
Ok(v) => {
tracing::info!(
Expand Down Expand Up @@ -432,6 +447,15 @@ impl ProxyService {
let pipeline_result = if run_pipeline_flag {
self.metrics.images_in_progress.inc();
let transform_start = Instant::now();
let transform_label = if is_video {
"video"
} else if is_pdf {
"pdf"
} else if needs_best {
"best"
} else {
"resize"
};
let result = pipeline::run_pipeline(
params,
src_bytes,
Expand All @@ -447,11 +471,17 @@ impl ProxyService {
content_type: ct,
});
self.metrics.images_in_progress.dec();
let pipeline_elapsed = transform_start.elapsed().as_secs_f64();
self
.metrics
.request_span_duration_seconds
.with_label_values(&["processing"])
.observe(transform_start.elapsed().as_secs_f64());
.observe(pipeline_elapsed);
self
.metrics
.transform_duration_seconds
.with_label_values(&[transform_label])
.observe(pipeline_elapsed);
if let Ok(ref entry) = result {
tracing::info!(
url = image_url.as_str(),
Expand Down
4 changes: 3 additions & 1 deletion src/modules/proxy/sources/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ impl HttpFetcher {
if !al.is_allowed(host) {
return attempt.error("host_not_allowed");
}
if let Ok(addrs) = std::net::ToSocketAddrs::to_socket_addrs(&(host, 80u16)) {
if let Ok(addrs) = tokio::task::block_in_place(|| {
std::net::ToSocketAddrs::to_socket_addrs(&(host.to_string(), 80u16))
}) {
for addr in addrs {
if is_private_ip(addr.ip()) {
return attempt.error("host_not_allowed");
Expand Down