Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
3084e89
build: bump linkerd2-proxy-api to 0.19.0
unleashed May 28, 2026
326ba73
feat(client-policy): add config types for load biasing, retry-after, …
unleashed May 28, 2026
9c59e42
feat(client-policy): add load_bias and retry_after fields to protocol…
unleashed May 28, 2026
58cf9da
test(client-policy): add unit tests for config types and proto conver…
unleashed May 28, 2026
8675c7f
refactor(outbound): hand-write PartialEq, Eq, and Hash on Concrete
unleashed May 8, 2026
8e3af34
feat(classify): log dropped classifications on full channel
unleashed May 8, 2026
e832a38
refactor(client-policy): make FailureAccrual a struct
unleashed May 28, 2026
992a06f
test(client-policy): cover success-rate proto conversion
unleashed May 29, 2026
24c6735
feat(outbound): add duration hint stores and retry-after classification
unleashed May 28, 2026
cfcda2f
feat(outbound): add unified breaker three-state machine
unleashed May 28, 2026
7961afe
feat(outbound): gate each endpoint on its own breaker
unleashed May 8, 2026
547490e
feat(outbound): rewrite breaker onto unified state machine
unleashed May 8, 2026
0a9d701
test(outbound): verify backoff resets to base after recovery
unleashed May 28, 2026
efb1e9f
test(outbound): verify both policies disabled produces no trip
unleashed May 28, 2026
99af31e
test(outbound): cover breaker behavior end-to-end
unleashed May 28, 2026
b272d35
test(outbound): cover grpc hint extending backoff
unleashed May 28, 2026
072bbdc
test(outbound): cover combined HTTP and gRPC hint max-wins
unleashed May 28, 2026
2022895
test(outbound): verify cold-start restart after idle
unleashed May 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1557,13 +1557,15 @@ dependencies = [
"linkerd-app-core",
"linkerd-app-test",
"linkerd-distribute",
"linkerd-ewma",
"linkerd-http-box",
"linkerd-http-classify",
"linkerd-http-prom",
"linkerd-http-retry",
"linkerd-http-route",
"linkerd-identity",
"linkerd-io",
"linkerd-load-biaser",
"linkerd-meshtls",
"linkerd-mock-http-body",
"linkerd-opaq-route",
Expand Down Expand Up @@ -2693,9 +2695,9 @@ dependencies = [

[[package]]
name = "linkerd2-proxy-api"
version = "0.18.0"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba9e3b341ca4992feaf43a4d2bdbfe2081aa3e2b9a503753544ce55242af6342"
checksum = "e0cd682795e8f91ea36e2131a2f7021da136c74f6d3cd3d9eabbf7842511042d"
dependencies = [
"h2",
"http",
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ default-features = false
features = ["tokio", "tracing"]

[workspace.dependencies.linkerd2-proxy-api]
version = "0.18.0"
version = "0.19.0"

[workspace.dependencies.rand]
version = "0.9"
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/core/src/classify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl classify::ClassifyEos for Eos {
}
}

fn grpc_code(hdrs: &http::HeaderMap) -> Option<grpc::Code> {
pub fn grpc_code(hdrs: &http::HeaderMap) -> Option<grpc::Code> {
hdrs.get("grpc-status")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u16>().ok())
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/integration/src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ pub fn backend(dst: impl ToString) -> outbound::Backend {
default_rtt: Some(Duration::from_millis(30).try_into().unwrap()),
decay: Some(Duration::from_secs(10).try_into().unwrap()),
})),
ejection: None,
})),
}
}
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ tower = { workspace = true, features = ["util"] }
tracing = { workspace = true }

linkerd-app-core = { path = "../core" }
linkerd-ewma = { path = "../../ewma" }
linkerd-load-biaser = { path = "../../load-biaser" }
linkerd-app-test = { path = "../test", optional = true }
linkerd-distribute = { path = "../../distribute" }
linkerd-http-classify = { path = "../../http/classify" }
Expand Down
8 changes: 6 additions & 2 deletions linkerd/app/outbound/src/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,15 @@ fn policy_for_backend(
timeout,
http1: policy::http::Http1 {
routes: routes.clone(),
failure_accrual: Default::default(),
failure_accrual: None,
load_bias: None,
retry_after: None,
},
http2: policy::http::Http2 {
routes,
failure_accrual: Default::default(),
failure_accrual: None,
load_bias: None,
retry_after: None,
},
opaque,
};
Expand Down
147 changes: 125 additions & 22 deletions linkerd/app/outbound/src/http/breaker.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,154 @@
//! Circuit breaker infrastructure for HTTP endpoints.
//!
//! This module implements a unified circuit breaker with two complementary failure
//! tracking mechanisms: consecutive failures and success rate (EWMA-based).
//!
//! The first trips the breaker after N consecutive failures (typically 5xx statuses).
//! It uses exponential backoff before allowing probe requests.
//!
//! The second trips when the success rate drops below a threshold. It treats both 5xx
//! and 429 as failures to enable rate limiting awareness.
//!
//! Both mechanisms are tracked by a single [`UnifiedBreaker`] policy, in which either
//! condition can trip the circuit.
//!
//! ## Recovery
//!
//! During probation, the response to a probe request must be _non-5xx AND non-429_ for the
//! probe to succeed. This ensures the endpoint is no longer rate limiting before the gate reopens.
//!
//! ## Retry-After Integration
//!
//! When a 429 response includes a `Retry-After` header, the duration is captured and used
//! to extend the circuit breaker's backoff period.
//!
//! ## Cold-start
//!
//! Cold-start protection applies to the success rate mechanism.
//!
//! ## Usage
//!
//! The breaker is integrated into the load balancer's endpoint stack via [`NewRetryAfterGateSet`],
//! which replaces the standard `NewClassifyGateSet` to add Retry-After extraction.

use linkerd_app_core::{classify, proxy::http::classify::gate, svc};
use linkerd_proxy_client_policy::FailureAccrual;

use tokio::time::Duration;
use tracing::{trace_span, Instrument};

mod consecutive_failures;
pub mod retry_after;
mod unified;
pub mod wrap_classify;

#[cfg(test)]
mod integration_tests;

pub use self::retry_after::{GrpcRetryPushbackStore, RetryAfterStore};
use self::unified::{UnifiedBreaker, UnifiedBreakerConfig};
pub use self::wrap_classify::{HasFailureAccrual, NewRetryAfterGateSet, RetryAfterGateParams};

use self::consecutive_failures::ConsecutiveFailures;
/// Reason why the circuit breaker tripped.
///
/// Each variant corresponds to one of the two failure-tracking mechanisms
/// described in this module's documentation.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TripReason {
/// Tripped because the configured number of consecutive failures
/// (typically 5xx responses) was reached.
ConsecutiveFailures,
/// Tripped because the EWMA success rate dropped below the configured
/// threshold. This mechanism counts both 5xx and 429 responses as failures.
LowSuccessRate,
}

/// Default EWMA decay window.
///
/// In the `extract_param` impl for `Params`, this is the decay handed to the
/// breaker when `accrual.success_rate` is `None` (consecutive-only mode, where
/// success-rate tripping is disabled). A configured success-rate policy
/// supplies its own `decay` instead.
// NOTE(review): other uses may exist outside the visible part of this file — confirm.
const DEFAULT_SUCCESS_RATE_DECAY: Duration = Duration::from_secs(10);

/// Params configuring a circuit breaker stack.
#[derive(Copy, Clone, Debug)]
///
/// Retry-After stores move hints from response classification to the
/// circuit breaker backoff logic. A new pair is built for each endpoint and
/// shared between that endpoint's classifier and its [`UnifiedBreaker`], so a
/// hint seen on one endpoint never extends the backoff of another.
#[derive(Clone, Debug)]
pub(crate) struct Params {
pub(crate) accrual: FailureAccrual,
pub(crate) accrual: Option<FailureAccrual>,
pub(crate) channel_capacity: usize,
/// Shared store for HTTP Retry-After hints.
pub(crate) retry_after_store: RetryAfterStore,
/// Shared store for gRPC retry pushback hints.
pub(crate) grpc_retry_pushback_store: GrpcRetryPushbackStore,
/// Maximum Retry-After duration the proxy will honor (clamping cap).
pub(crate) max_duration: Duration,
}

impl<T> svc::ExtractParam<gate::Params<classify::Class>, T> for Params {
fn extract_param(&self, _: &T) -> gate::Params<classify::Class> {
// Create a channel so that we can receive response summaries and
// control the gate.
let (prms, gate, rsps) = gate::Params::channel(self.channel_capacity);
let (prms, gate_tx, rsps) = gate::Params::channel(self.channel_capacity);

match self.accrual {
FailureAccrual::None => {
None => {
// No failure accrual for this target; construct a gate
// that will never close.
tracing::trace!("No failure accrual policy enabled.");
prms
}
FailureAccrual::ConsecutiveFailures {
max_failures,
backoff,
} => {
tracing::trace!(
max_failures,
backoff = ?backoff,
"Using consecutive-failures failure accrual policy.",
);
Some(ref accrual) => {
// When success_rate is Some, use configured EWMA policy values.
// When None (consecutive-only mode), disable EWMA by using
// threshold 0.0 (rate >= 0 always, so never trips) and
// min_requests usize::MAX (cold-start never passes).
let (success_rate_threshold, success_rate_decay, min_request_threshold) =
if let Some(sr) = accrual.success_rate {
if accrual.consecutive.max_failures == 0 {
tracing::trace!(
success_rate_threshold = %sr.threshold,
min_requests = sr.min_requests,
backoff = ?accrual.consecutive.backoff,
"Using circuit breaker with consecutive failures disabled and success rate tracking.",
);
} else {
tracing::trace!(
max_failures = accrual.consecutive.max_failures,
backoff = ?accrual.consecutive.backoff,
success_rate_threshold = %sr.threshold,
min_requests = sr.min_requests,
"Using unified circuit breaker with consecutive failures and success rate tracking.",
);
}
(sr.threshold, sr.decay, sr.min_requests as usize)
} else {
if accrual.consecutive.max_failures == 0 {
tracing::trace!(
backoff = ?accrual.consecutive.backoff,
"Using circuit breaker with consecutive failures disabled.",
);
} else {
tracing::trace!(
max_failures = accrual.consecutive.max_failures,
backoff = ?accrual.consecutive.backoff,
"Using circuit breaker with consecutive failures only.",
);
}
(0.0_f64, DEFAULT_SUCCESS_RATE_DECAY, usize::MAX)
};

// Spawn the unified breaker that handles both policies
let breaker = UnifiedBreaker::new(UnifiedBreakerConfig {
max_failures: accrual.consecutive.max_failures,
threshold: success_rate_threshold,
decay: success_rate_decay,
backoff: accrual.consecutive.backoff,
min_requests: min_request_threshold,
gate: gate_tx,
rsps,
retry_after_store: self.retry_after_store.clone(),
grpc_retry_pushback_store: self.grpc_retry_pushback_store.clone(),
max_duration: self.max_duration,
});

// 1. If the configured number of consecutive failures are encountered,
// shut the gate.
// 2. After an ejection timeout, open the gate so that 1 request can be processed.
// 3. If that request succeeds, open the gate. If it fails, increase the
// ejection timeout and repeat.
let breaker = ConsecutiveFailures::new(max_failures, backoff, gate, rsps);
tokio::spawn(
breaker
.run()
.instrument(trace_span!("consecutive_failures").or_current()),
.instrument(trace_span!("unified_breaker").or_current()),
);

prms
Expand Down
Loading
Loading