diff --git a/.env.sample b/.env.sample index 567f869..d7e4445 100644 --- a/.env.sample +++ b/.env.sample @@ -45,6 +45,13 @@ # Max seconds to wait for an upstream HTTP response before aborting. # PP_FETCH_TIMEOUT_SECS=10 +# Number of times to retry a fetch on connection errors (timeout, reset, broken pipe). +# Set to 0 to disable retries. +# PP_FETCH_RETRY_COUNT=3 + +# Delay in milliseconds between retry attempts. Set to 0 for immediate retry. +# PP_FETCH_RETRY_DELAY_MS=0 + # Max size (bytes) of a source file accepted for processing. Default: 20 MB. # PP_MAX_SOURCE_BYTES=20971520 diff --git a/src/app.rs b/src/app.rs index d4d589b..74cc642 100644 --- a/src/app.rs +++ b/src/app.rs @@ -4,7 +4,9 @@ use crate::modules::cache::manager::CacheManager; use crate::modules::proxy::fallback::FallbackImage; use crate::modules::proxy::fetchable::Fetchable; use crate::modules::proxy::sources::http::HttpFetcher; -use crate::modules::proxy::sources::{AliasSource, LocalSource, S3Source, SourceRouter}; +use crate::modules::proxy::sources::{ + AliasSource, LocalSource, RetryFetcher, S3Source, SourceRouter, +}; use crate::modules::security::allowlist::Allowlist; use axum::Router; use std::sync::Arc; @@ -63,7 +65,12 @@ pub async fn router( Arc::new(AliasSource::new(aliases.clone(), alias_http)) }); - let fetcher: Arc = Arc::new(SourceRouter::new(http, s3, local, alias)); + let fetcher: Arc = Arc::new(RetryFetcher::new( + Arc::new(SourceRouter::new(http, s3, local, alias)), + cfg.fetch_retry_count, + cfg.fetch_retry_delay_ms, + metrics.clone(), + )); let cors_layer = middlewares::cors_layer(&cfg.cors_allow_origin, cfg.cors_max_age_secs); diff --git a/src/common/config/loader.rs b/src/common/config/loader.rs index 5055ff7..ca6cb13 100644 --- a/src/common/config/loader.rs +++ b/src/common/config/loader.rs @@ -22,6 +22,8 @@ pub struct Configuration { pub source_url_encryption_key: Option>, // Fetching pub fetch_timeout_secs: u64, + pub fetch_retry_count: u32, + pub fetch_retry_delay_ms: u64, pub max_source_bytes: u64, // Cache pub cache_memory_max_mb: u64, @@ -273,6 +275,8 @@ impl Configuration { .map(|s| parse_hex_key("PP_SOURCE_URL_ENCRYPTION_KEY", &s)), allowed_hosts, fetch_timeout_secs: env_var_u64("PP_FETCH_TIMEOUT_SECS", 10), + fetch_retry_count: env_var_u64("PP_FETCH_RETRY_COUNT", 3) as u32, + fetch_retry_delay_ms: env_var_u64("PP_FETCH_RETRY_DELAY_MS", 0), max_source_bytes: env_var_u64("PP_MAX_SOURCE_BYTES", 20_971_520), cache_memory_max_mb: env_var_u64("PP_CACHE_MEMORY_MAX_MB", 256), cache_memory_ttl_secs: env_var_u64("PP_CACHE_MEMORY_TTL_SECS", 3600), @@ -406,6 +410,8 @@ impl std::fmt::Debug for Configuration { ) .field("allowed_hosts", &self.allowed_hosts) .field("fetch_timeout_secs", &self.fetch_timeout_secs) + .field("fetch_retry_count", &self.fetch_retry_count) + .field("fetch_retry_delay_ms", &self.fetch_retry_delay_ms) .field("max_source_bytes", &self.max_source_bytes) .field("cache_memory_max_mb", &self.cache_memory_max_mb) .field("cache_memory_ttl_secs", &self.cache_memory_ttl_secs) diff --git a/src/common/errors/mod.rs b/src/common/errors/mod.rs index 7492fc1..a946ccf 100644 --- a/src/common/errors/mod.rs +++ b/src/common/errors/mod.rs @@ -13,6 +13,8 @@ pub enum ProxyError { UpstreamNotFound, #[error("upstream_timeout")] UpstreamTimeout, + #[error("upstream_connection_error")] + UpstreamConnectionError, #[error("too_many_redirects")] TooManyRedirects, #[error("not_an_image")] @@ -41,6 +43,13 @@ pub enum ProxyError { InternalError(String), } +impl ProxyError { + /// Returns true for transient connection-level failures that are safe to retry. + pub fn is_connection_error(&self) -> bool { + matches!(self, ProxyError::UpstreamConnectionError) + } +} + impl From for ProxyError { fn from(e: anyhow::Error) -> Self { ProxyError::InternalError(format!("{:#}", e)) @@ -60,6 +69,7 @@ impl IntoResponse for ProxyError { match &self { ProxyError::InternalError(detail) => error!("internal_error: {}", detail), ProxyError::UpstreamTimeout + | ProxyError::UpstreamConnectionError | ProxyError::TooManyRedirects | ProxyError::WatermarkFetchFailed | ProxyError::HeicDecodeError @@ -70,6 +80,7 @@ impl IntoResponse for ProxyError { let status = match &self { ProxyError::UpstreamNotFound => StatusCode::NOT_FOUND, ProxyError::UpstreamTimeout + | ProxyError::UpstreamConnectionError | ProxyError::TooManyRedirects | ProxyError::WatermarkFetchFailed => StatusCode::BAD_GATEWAY, ProxyError::NotAnImage => StatusCode::UNPROCESSABLE_ENTITY, diff --git a/src/modules/metrics/mod.rs b/src/modules/metrics/mod.rs index 8b2470e..8415853 100644 --- a/src/modules/metrics/mod.rs +++ b/src/modules/metrics/mod.rs @@ -23,6 +23,8 @@ pub struct Metrics { pub workers_utilization: Gauge, pub buffer_default_size_bytes: Gauge, pub buffer_max_size_bytes: Gauge, + // Retry tracking + pub fetch_retries_total: IntCounter, // Per-workflow breakdowns pub source_fetch_duration_seconds: HistogramVec, pub transform_duration_seconds: HistogramVec, @@ -183,6 +185,17 @@ impl Metrics { .unwrap() ); + let fetch_retries_total = register!( + IntCounter::with_opts( + Opts::new( + "fetch_retries_total", + "Total fetch retry attempts due to connection errors" + ) + .namespace(ns) + ) + .unwrap() + ); + let source_fetch_duration_seconds = register!( HistogramVec::new( HistogramOpts::new( @@ -269,6 +282,7 @@ impl Metrics { workers_utilization, buffer_default_size_bytes, buffer_max_size_bytes, + fetch_retries_total, source_fetch_duration_seconds, transform_duration_seconds, cache_hits_total, diff --git a/src/modules/proxy/controller.rs b/src/modules/proxy/controller.rs index 4704305..2de08df 100644 --- a/src/modules/proxy/controller.rs +++ b/src/modules/proxy/controller.rs @@ -312,6 +312,8 @@ mod concurrency_tests { source_url_encryption_key: None, allowed_hosts: vec![], fetch_timeout_secs: 10, + fetch_retry_count: 0, + fetch_retry_delay_ms: 0, max_source_bytes: 1_000_000, cache_memory_max_mb: 16, cache_memory_ttl_secs: 60, diff --git a/src/modules/proxy/fallback.rs b/src/modules/proxy/fallback.rs index e06f6e2..3943533 100644 --- a/src/modules/proxy/fallback.rs +++ b/src/modules/proxy/fallback.rs @@ -92,6 +92,8 @@ mod tests { source_url_encryption_key: None, allowed_hosts: vec![], fetch_timeout_secs: 10, + fetch_retry_count: 0, + fetch_retry_delay_ms: 0, max_source_bytes: 1_000_000, cache_memory_max_mb: 16, cache_memory_ttl_secs: 60, diff --git a/src/modules/proxy/service.rs b/src/modules/proxy/service.rs index 00b1600..1b2439c 100644 --- a/src/modules/proxy/service.rs +++ b/src/modules/proxy/service.rs @@ -568,6 +568,8 @@ mod tests { source_url_encryption_key: None, allowed_hosts: vec![], fetch_timeout_secs: 10, + fetch_retry_count: 0, + fetch_retry_delay_ms: 0, max_source_bytes: 1_000_000, cache_memory_max_mb: 16, cache_memory_ttl_secs: 60, @@ -834,6 +836,8 @@ mod streaming_tests { source_url_encryption_key: None, allowed_hosts: vec![], fetch_timeout_secs: 10, + fetch_retry_count: 0, + fetch_retry_delay_ms: 0, max_source_bytes: max_bytes, cache_memory_max_mb: 16, cache_memory_ttl_secs: 60, diff --git a/src/modules/proxy/sources/http.rs b/src/modules/proxy/sources/http.rs index 97d3af1..db58e24 100644 --- a/src/modules/proxy/sources/http.rs +++ b/src/modules/proxy/sources/http.rs @@ -84,6 +84,8 @@ impl HttpFetcher { let resp = self.client.get(url).send().await.map_err(|e| { if e.is_timeout() { ProxyError::UpstreamTimeout + } else if e.is_connect() { + ProxyError::UpstreamConnectionError } else if e.is_redirect() { ProxyError::TooManyRedirects } else { @@ -128,7 +130,13 @@ impl HttpFetcher { let mut stream = resp.bytes_stream(); let mut buf: Vec = Vec::new(); while let Some(chunk) = stream.next().await { - let chunk = chunk.map_err(|e| ProxyError::InternalError(e.to_string()))?; + let chunk = chunk.map_err(|e| { + if e.is_body() || e.is_connect() { + ProxyError::UpstreamConnectionError + } else { + ProxyError::InternalError(e.to_string()) + } + })?; buf.extend_from_slice(&chunk); if buf.len() as u64 > self.max_bytes { return Err(ProxyError::SourceTooLarge); @@ -157,6 +165,8 @@ impl HttpFetcher { let resp = self.client.get(url).send().await.map_err(|e| { if e.is_timeout() { ProxyError::UpstreamTimeout + } else if e.is_connect() { + ProxyError::UpstreamConnectionError } else if e.is_redirect() { ProxyError::TooManyRedirects } else { diff --git a/src/modules/proxy/sources/mod.rs b/src/modules/proxy/sources/mod.rs index 7eaca5a..d901b3e 100644 --- a/src/modules/proxy/sources/mod.rs +++ b/src/modules/proxy/sources/mod.rs @@ -1,6 +1,7 @@ pub mod alias; pub mod http; pub mod local; +pub mod retry; pub mod router; pub mod s3; pub mod video; @@ -8,5 +9,6 @@ pub mod video; pub use alias::AliasSource; pub use http::HttpFetcher; pub use local::LocalSource; +pub use retry::RetryFetcher; pub use router::SourceRouter; pub use s3::S3Source; diff --git a/src/modules/proxy/sources/retry.rs b/src/modules/proxy/sources/retry.rs new file mode 100644 index 0000000..bd4cd2a --- /dev/null +++ b/src/modules/proxy/sources/retry.rs @@ -0,0 +1,160 @@ +use crate::common::errors::ProxyError; +use crate::modules::metrics::Metrics; +use crate::modules::proxy::fetchable::Fetchable; +use std::sync::Arc; +use std::time::Duration; + +/// Wraps any `Fetchable` and retries on transient connection errors. +/// +/// Only retries when `ProxyError::is_connection_error()` returns true +/// (timeout, connection reset, broken pipe, etc.). All other errors +/// are returned immediately without retry. +pub struct RetryFetcher { + inner: Arc, + max_retries: u32, + delay: Duration, + metrics: Arc, +} + +impl RetryFetcher { + pub fn new( + inner: Arc, + max_retries: u32, + delay_ms: u64, + metrics: Arc, + ) -> Self { + Self { + inner, + max_retries, + delay: Duration::from_millis(delay_ms), + metrics, + } + } +} + +#[async_trait::async_trait] +impl Fetchable for RetryFetcher { + async fn fetch(&self, url: &str) -> Result<(Vec, Option), ProxyError> { + let mut last_err = ProxyError::InternalError("no attempts made".to_string()); + for attempt in 0..=self.max_retries { + match self.inner.fetch(url).await { + Ok(result) => return Ok(result), + Err(e) if e.is_connection_error() => { + tracing::warn!( + url = url, + attempt = attempt, + error = %e, + "fetch failed with connection error, retrying" + ); + self.metrics.fetch_retries_total.inc(); + last_err = e; + if attempt < self.max_retries && !self.delay.is_zero() { + tokio::time::sleep(self.delay).await; + } + } + Err(e) => return Err(e), + } + } + Err(last_err) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::common::errors::ProxyError; + use crate::modules::metrics::Metrics; + use std::sync::Arc; + use std::sync::atomic::{AtomicU32, Ordering}; + + fn test_metrics() -> Arc { + Metrics::new("test_retry") + } + + struct CountingFetcher { + calls: Arc, + errors: Vec, + success: Option<(Vec, Option)>, + } + + impl CountingFetcher { + fn new(errors: Vec, success: Option<(Vec, Option)>) -> Arc { + Arc::new(Self { + calls: Arc::new(AtomicU32::new(0)), + errors, + success, + }) + } + } + + #[async_trait::async_trait] + impl Fetchable for CountingFetcher { + async fn fetch(&self, _url: &str) -> Result<(Vec, Option), ProxyError> { + let i = self.calls.fetch_add(1, Ordering::SeqCst) as usize; + if i < self.errors.len() { + return Err(self.errors[i].clone()); + } + match &self.success { + Some(v) => Ok(v.clone()), + None => Err(ProxyError::UpstreamNotFound), + } + } + } + + #[tokio::test] + async fn retries_on_timeout_then_succeeds() { + let inner = CountingFetcher::new( + vec![ + ProxyError::UpstreamConnectionError, + ProxyError::UpstreamConnectionError, + ], + Some((b"ok".to_vec(), None)), + ); + let calls = inner.calls.clone(); + let fetcher = RetryFetcher::new(inner, 3, 0, test_metrics()); + let result = fetcher.fetch("http://example.com").await; + assert!(result.is_ok()); + assert_eq!(calls.load(Ordering::SeqCst), 3); + } + + #[tokio::test] + async fn does_not_retry_non_connection_errors() { + let inner = CountingFetcher::new( + vec![ProxyError::UpstreamNotFound], + Some((b"ok".to_vec(), None)), + ); + let calls = inner.calls.clone(); + let fetcher = RetryFetcher::new(inner, 3, 0, test_metrics()); + let result = fetcher.fetch("http://example.com").await; + assert!(matches!(result, Err(ProxyError::UpstreamNotFound))); + assert_eq!(calls.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn exhausts_retries_and_returns_last_error() { + let inner = CountingFetcher::new( + vec![ + ProxyError::UpstreamConnectionError, + ProxyError::UpstreamConnectionError, + ProxyError::UpstreamConnectionError, + ProxyError::UpstreamConnectionError, + ], + None, + ); + let calls = inner.calls.clone(); + let fetcher = RetryFetcher::new(inner, 3, 0, test_metrics()); + let result = fetcher.fetch("http://example.com").await; + assert!(matches!(result, Err(ProxyError::UpstreamConnectionError))); + assert_eq!(calls.load(Ordering::SeqCst), 4); // 1 initial + 3 retries + } + + #[tokio::test] + async fn zero_retries_makes_single_attempt() { + let inner = CountingFetcher::new(vec![ProxyError::UpstreamConnectionError], None); + let calls = inner.calls.clone(); + let fetcher = RetryFetcher::new(inner, 0, 0, test_metrics()); + let result = fetcher.fetch("http://example.com").await; + assert!(matches!(result, Err(ProxyError::UpstreamConnectionError))); + assert_eq!(calls.load(Ordering::SeqCst), 1); + } +} diff --git a/src/modules/proxy/sources/s3.rs b/src/modules/proxy/sources/s3.rs index faf4dc1..1f7d4be 100644 --- a/src/modules/proxy/sources/s3.rs +++ b/src/modules/proxy/sources/s3.rs @@ -60,6 +60,10 @@ impl Fetchable for S3Source { { return ProxyError::UpstreamNotFound; } + // DispatchFailure means the request never reached S3 - treat as connection error + if matches!(e, aws_sdk_s3::error::SdkError::DispatchFailure(_)) { + return ProxyError::UpstreamConnectionError; + } ProxyError::InternalError(e.to_string()) })?; @@ -72,11 +76,17 @@ impl Fetchable for S3Source { let content_type = resp.content_type().map(|s| s.to_string()); - let collected = resp - .body - .collect() - .await - .map_err(|e| ProxyError::InternalError(e.to_string()))?; + let collected = resp.body.collect().await.map_err(|e| { + let msg = e.to_string().to_lowercase(); + if msg.contains("connection reset") + || msg.contains("broken pipe") + || msg.contains("connection closed") + { + ProxyError::UpstreamConnectionError + } else { + ProxyError::InternalError(e.to_string()) + } + })?; let bytes = collected.into_bytes();