Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@
# Max seconds to wait for an upstream HTTP response before aborting.
# PP_FETCH_TIMEOUT_SECS=10

# Number of times to retry a fetch on transient connection errors (connect failure, reset, broken pipe).
# Fetches that fail with upstream_timeout are not retried. Set to 0 to disable retries.
# PP_FETCH_RETRY_COUNT=3

# Delay in milliseconds between retry attempts. Set to 0 for immediate retry.
# PP_FETCH_RETRY_DELAY_MS=0

# Max size (bytes) of a source file accepted for processing. Default: 20 MB.
# PP_MAX_SOURCE_BYTES=20971520

Expand Down
11 changes: 9 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use crate::modules::cache::manager::CacheManager;
use crate::modules::proxy::fallback::FallbackImage;
use crate::modules::proxy::fetchable::Fetchable;
use crate::modules::proxy::sources::http::HttpFetcher;
use crate::modules::proxy::sources::{AliasSource, LocalSource, S3Source, SourceRouter};
use crate::modules::proxy::sources::{
AliasSource, LocalSource, RetryFetcher, S3Source, SourceRouter,
};
use crate::modules::security::allowlist::Allowlist;
use axum::Router;
use std::sync::Arc;
Expand Down Expand Up @@ -63,7 +65,12 @@ pub async fn router(
Arc::new(AliasSource::new(aliases.clone(), alias_http))
});

let fetcher: Arc<dyn Fetchable> = Arc::new(SourceRouter::new(http, s3, local, alias));
let fetcher: Arc<dyn Fetchable> = Arc::new(RetryFetcher::new(
Arc::new(SourceRouter::new(http, s3, local, alias)),
cfg.fetch_retry_count,
cfg.fetch_retry_delay_ms,
metrics.clone(),
));

let cors_layer = middlewares::cors_layer(&cfg.cors_allow_origin, cfg.cors_max_age_secs);

Expand Down
6 changes: 6 additions & 0 deletions src/common/config/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub struct Configuration {
pub source_url_encryption_key: Option<Vec<u8>>,
// Fetching
pub fetch_timeout_secs: u64,
pub fetch_retry_count: u32,
pub fetch_retry_delay_ms: u64,
pub max_source_bytes: u64,
// Cache
pub cache_memory_max_mb: u64,
Expand Down Expand Up @@ -273,6 +275,8 @@ impl Configuration {
.map(|s| parse_hex_key("PP_SOURCE_URL_ENCRYPTION_KEY", &s)),
allowed_hosts,
fetch_timeout_secs: env_var_u64("PP_FETCH_TIMEOUT_SECS", 10),
fetch_retry_count: env_var_u64("PP_FETCH_RETRY_COUNT", 3) as u32,
fetch_retry_delay_ms: env_var_u64("PP_FETCH_RETRY_DELAY_MS", 0),
max_source_bytes: env_var_u64("PP_MAX_SOURCE_BYTES", 20_971_520),
cache_memory_max_mb: env_var_u64("PP_CACHE_MEMORY_MAX_MB", 256),
cache_memory_ttl_secs: env_var_u64("PP_CACHE_MEMORY_TTL_SECS", 3600),
Expand Down Expand Up @@ -406,6 +410,8 @@ impl std::fmt::Debug for Configuration {
)
.field("allowed_hosts", &self.allowed_hosts)
.field("fetch_timeout_secs", &self.fetch_timeout_secs)
.field("fetch_retry_count", &self.fetch_retry_count)
.field("fetch_retry_delay_ms", &self.fetch_retry_delay_ms)
.field("max_source_bytes", &self.max_source_bytes)
.field("cache_memory_max_mb", &self.cache_memory_max_mb)
.field("cache_memory_ttl_secs", &self.cache_memory_ttl_secs)
Expand Down
11 changes: 11 additions & 0 deletions src/common/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ pub enum ProxyError {
UpstreamNotFound,
#[error("upstream_timeout")]
UpstreamTimeout,
#[error("upstream_connection_error")]
UpstreamConnectionError,
#[error("too_many_redirects")]
TooManyRedirects,
#[error("not_an_image")]
Expand Down Expand Up @@ -41,6 +43,13 @@ pub enum ProxyError {
InternalError(String),
}

impl ProxyError {
/// Returns true for transient connection-level failures that are safe to retry.
pub fn is_connection_error(&self) -> bool {
matches!(self, ProxyError::UpstreamConnectionError)
}
}

impl From<anyhow::Error> for ProxyError {
fn from(e: anyhow::Error) -> Self {
ProxyError::InternalError(format!("{:#}", e))
Expand All @@ -60,6 +69,7 @@ impl IntoResponse for ProxyError {
match &self {
ProxyError::InternalError(detail) => error!("internal_error: {}", detail),
ProxyError::UpstreamTimeout
| ProxyError::UpstreamConnectionError
| ProxyError::TooManyRedirects
| ProxyError::WatermarkFetchFailed
| ProxyError::HeicDecodeError
Expand All @@ -70,6 +80,7 @@ impl IntoResponse for ProxyError {
let status = match &self {
ProxyError::UpstreamNotFound => StatusCode::NOT_FOUND,
ProxyError::UpstreamTimeout
| ProxyError::UpstreamConnectionError
| ProxyError::TooManyRedirects
| ProxyError::WatermarkFetchFailed => StatusCode::BAD_GATEWAY,
ProxyError::NotAnImage => StatusCode::UNPROCESSABLE_ENTITY,
Expand Down
14 changes: 14 additions & 0 deletions src/modules/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct Metrics {
pub workers_utilization: Gauge,
pub buffer_default_size_bytes: Gauge,
pub buffer_max_size_bytes: Gauge,
// Retry tracking
pub fetch_retries_total: IntCounter,
// Per-workflow breakdowns
pub source_fetch_duration_seconds: HistogramVec,
pub transform_duration_seconds: HistogramVec,
Expand Down Expand Up @@ -183,6 +185,17 @@ impl Metrics {
.unwrap()
);

let fetch_retries_total = register!(
IntCounter::with_opts(
Opts::new(
"fetch_retries_total",
"Total fetch retry attempts due to connection errors"
)
.namespace(ns)
)
.unwrap()
);

let source_fetch_duration_seconds = register!(
HistogramVec::new(
HistogramOpts::new(
Expand Down Expand Up @@ -269,6 +282,7 @@ impl Metrics {
workers_utilization,
buffer_default_size_bytes,
buffer_max_size_bytes,
fetch_retries_total,
source_fetch_duration_seconds,
transform_duration_seconds,
cache_hits_total,
Expand Down
2 changes: 2 additions & 0 deletions src/modules/proxy/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ mod concurrency_tests {
source_url_encryption_key: None,
allowed_hosts: vec![],
fetch_timeout_secs: 10,
fetch_retry_count: 0,
fetch_retry_delay_ms: 0,
max_source_bytes: 1_000_000,
cache_memory_max_mb: 16,
cache_memory_ttl_secs: 60,
Expand Down
2 changes: 2 additions & 0 deletions src/modules/proxy/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ mod tests {
source_url_encryption_key: None,
allowed_hosts: vec![],
fetch_timeout_secs: 10,
fetch_retry_count: 0,
fetch_retry_delay_ms: 0,
max_source_bytes: 1_000_000,
cache_memory_max_mb: 16,
cache_memory_ttl_secs: 60,
Expand Down
4 changes: 4 additions & 0 deletions src/modules/proxy/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,8 @@ mod tests {
source_url_encryption_key: None,
allowed_hosts: vec![],
fetch_timeout_secs: 10,
fetch_retry_count: 0,
fetch_retry_delay_ms: 0,
max_source_bytes: 1_000_000,
cache_memory_max_mb: 16,
cache_memory_ttl_secs: 60,
Expand Down Expand Up @@ -834,6 +836,8 @@ mod streaming_tests {
source_url_encryption_key: None,
allowed_hosts: vec![],
fetch_timeout_secs: 10,
fetch_retry_count: 0,
fetch_retry_delay_ms: 0,
max_source_bytes: max_bytes,
cache_memory_max_mb: 16,
cache_memory_ttl_secs: 60,
Expand Down
12 changes: 11 additions & 1 deletion src/modules/proxy/sources/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ impl HttpFetcher {
let resp = self.client.get(url).send().await.map_err(|e| {
if e.is_timeout() {
ProxyError::UpstreamTimeout
} else if e.is_connect() {
ProxyError::UpstreamConnectionError
} else if e.is_redirect() {
ProxyError::TooManyRedirects
} else {
Expand Down Expand Up @@ -128,7 +130,13 @@ impl HttpFetcher {
let mut stream = resp.bytes_stream();
let mut buf: Vec<u8> = Vec::new();
while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(|e| ProxyError::InternalError(e.to_string()))?;
let chunk = chunk.map_err(|e| {
if e.is_body() || e.is_connect() {
ProxyError::UpstreamConnectionError
} else {
ProxyError::InternalError(e.to_string())
}
})?;
buf.extend_from_slice(&chunk);
if buf.len() as u64 > self.max_bytes {
return Err(ProxyError::SourceTooLarge);
Expand Down Expand Up @@ -157,6 +165,8 @@ impl HttpFetcher {
let resp = self.client.get(url).send().await.map_err(|e| {
if e.is_timeout() {
ProxyError::UpstreamTimeout
} else if e.is_connect() {
ProxyError::UpstreamConnectionError
} else if e.is_redirect() {
ProxyError::TooManyRedirects
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/modules/proxy/sources/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
pub mod alias;
pub mod http;
pub mod local;
pub mod retry;
pub mod router;
pub mod s3;
pub mod video;

pub use alias::AliasSource;
pub use http::HttpFetcher;
pub use local::LocalSource;
pub use retry::RetryFetcher;
pub use router::SourceRouter;
pub use s3::S3Source;
160 changes: 160 additions & 0 deletions src/modules/proxy/sources/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
use crate::common::errors::ProxyError;
use crate::modules::metrics::Metrics;
use crate::modules::proxy::fetchable::Fetchable;
use std::sync::Arc;
use std::time::Duration;

/// A `Fetchable` decorator that re-issues a fetch when the wrapped fetcher
/// fails with a transient connection error.
///
/// A failure is retried only when `ProxyError::is_connection_error()` returns
/// true for it. Every other error is returned to the caller immediately
/// without a retry. Note that, as that predicate is currently defined, this
/// includes `ProxyError::UpstreamTimeout`: timeouts surfaced under that
/// variant are NOT retried.
pub struct RetryFetcher {
/// The wrapped fetcher every attempt is delegated to.
inner: Arc<dyn Fetchable>,
/// Number of additional attempts allowed after the first one; 0 means a
/// single attempt with no retry.
max_retries: u32,
/// Pause between consecutive attempts; a zero duration skips the sleep.
delay: Duration,
/// Metrics handle; `fetch` increments `fetch_retries_total` through it.
metrics: Arc<Metrics>,
}

impl RetryFetcher {
pub fn new(
inner: Arc<dyn Fetchable>,
max_retries: u32,
delay_ms: u64,
metrics: Arc<Metrics>,
) -> Self {
Self {
inner,
max_retries,
delay: Duration::from_millis(delay_ms),
metrics,
}
}
}

#[async_trait::async_trait]
impl Fetchable for RetryFetcher {
    /// Fetches `url` through the wrapped fetcher, retrying on connection errors.
    ///
    /// The wrapped fetcher is called once, and then up to `max_retries` more
    /// times as long as each failure satisfies
    /// `ProxyError::is_connection_error()`. Between attempts the configured
    /// delay is awaited (skipped when it is zero).
    ///
    /// # Errors
    ///
    /// * Any error that is not a connection error is returned immediately.
    /// * When the retry budget is exhausted, the connection error from the
    ///   final attempt is returned.
    async fn fetch(&self, url: &str) -> Result<(Vec<u8>, Option<String>), ProxyError> {
        // Number of retries already performed; never exceeds `max_retries`,
        // so the increment below cannot overflow.
        let mut attempt: u32 = 0;
        loop {
            match self.inner.fetch(url).await {
                Ok(result) => return Ok(result),
                // Only log "retrying" and bump the retry counter when another
                // attempt will actually follow. A failure on the final attempt
                // (or with retries disabled) falls through to the arm below so
                // the metric counts real retries only.
                Err(e) if e.is_connection_error() && attempt < self.max_retries => {
                    tracing::warn!(
                        url = url,
                        attempt = attempt,
                        error = %e,
                        "fetch failed with connection error, retrying"
                    );
                    self.metrics.fetch_retries_total.inc();
                    if !self.delay.is_zero() {
                        tokio::time::sleep(self.delay).await;
                    }
                    attempt += 1;
                }
                // Non-retryable error, or retry budget exhausted.
                Err(e) => return Err(e),
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::errors::ProxyError;
    use crate::modules::metrics::Metrics;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Builds a metrics handle for the retry tests.
    fn test_metrics() -> Arc<Metrics> {
        Metrics::new("test_retry")
    }

    /// Scripted `Fetchable` double that records how often it was called.
    ///
    /// Call `i` (0-based) fails with `errors[i]` while `i < errors.len()`.
    /// Later calls return a clone of `success`, or `UpstreamNotFound` when
    /// `success` is `None`.
    struct CountingFetcher {
        calls: Arc<AtomicU32>,
        errors: Vec<ProxyError>,
        success: Option<(Vec<u8>, Option<String>)>,
    }

    impl CountingFetcher {
        /// Creates a fetcher with the given error script and final outcome,
        /// starting with a call count of zero.
        fn new(errors: Vec<ProxyError>, success: Option<(Vec<u8>, Option<String>)>) -> Arc<Self> {
            Arc::new(Self {
                calls: Arc::new(AtomicU32::new(0)),
                errors,
                success,
            })
        }
    }

    #[async_trait::async_trait]
    impl Fetchable for CountingFetcher {
        async fn fetch(&self, _url: &str) -> Result<(Vec<u8>, Option<String>), ProxyError> {
            // `fetch_add` returns the previous value, i.e. this call's 0-based index.
            let i = self.calls.fetch_add(1, Ordering::SeqCst) as usize;
            if i < self.errors.len() {
                return Err(self.errors[i].clone());
            }
            match &self.success {
                Some(v) => Ok(v.clone()),
                None => Err(ProxyError::UpstreamNotFound),
            }
        }
    }

    /// Two connection errors followed by a success: the fetch succeeds on the
    /// third call. (Previously named `retries_on_timeout_then_succeeds`, but
    /// the scripted failures are connection errors, not timeouts.)
    #[tokio::test]
    async fn retries_on_connection_error_then_succeeds() {
        let inner = CountingFetcher::new(
            vec![
                ProxyError::UpstreamConnectionError,
                ProxyError::UpstreamConnectionError,
            ],
            Some((b"ok".to_vec(), None)),
        );
        let calls = inner.calls.clone();
        let fetcher = RetryFetcher::new(inner, 3, 0, test_metrics());
        let result = fetcher.fetch("http://example.com").await;
        assert!(result.is_ok());
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    /// A non-connection error is returned after a single call, even though a
    /// later call would have succeeded.
    #[tokio::test]
    async fn does_not_retry_non_connection_errors() {
        let inner = CountingFetcher::new(
            vec![ProxyError::UpstreamNotFound],
            Some((b"ok".to_vec(), None)),
        );
        let calls = inner.calls.clone();
        let fetcher = RetryFetcher::new(inner, 3, 0, test_metrics());
        let result = fetcher.fetch("http://example.com").await;
        assert!(matches!(result, Err(ProxyError::UpstreamNotFound)));
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    /// With every attempt failing, the wrapper gives up after the retry
    /// budget and returns the connection error.
    #[tokio::test]
    async fn exhausts_retries_and_returns_last_error() {
        let inner = CountingFetcher::new(
            vec![
                ProxyError::UpstreamConnectionError,
                ProxyError::UpstreamConnectionError,
                ProxyError::UpstreamConnectionError,
                ProxyError::UpstreamConnectionError,
            ],
            None,
        );
        let calls = inner.calls.clone();
        let fetcher = RetryFetcher::new(inner, 3, 0, test_metrics());
        let result = fetcher.fetch("http://example.com").await;
        assert!(matches!(result, Err(ProxyError::UpstreamConnectionError)));
        assert_eq!(calls.load(Ordering::SeqCst), 4); // 1 initial + 3 retries
    }

    /// A retry budget of zero means exactly one call to the wrapped fetcher.
    #[tokio::test]
    async fn zero_retries_makes_single_attempt() {
        let inner = CountingFetcher::new(vec![ProxyError::UpstreamConnectionError], None);
        let calls = inner.calls.clone();
        let fetcher = RetryFetcher::new(inner, 0, 0, test_metrics());
        let result = fetcher.fetch("http://example.com").await;
        assert!(matches!(result, Err(ProxyError::UpstreamConnectionError)));
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }
}
Loading