diff --git a/tonic-xds/Cargo.toml b/tonic-xds/Cargo.toml index d071f373e..dd27421fb 100644 --- a/tonic-xds/Cargo.toml +++ b/tonic-xds/Cargo.toml @@ -45,7 +45,7 @@ tokio = { version = "1", features = ["sync", "time"] } fastrand = "2" indexmap = "2" tracing = "0.1" -tokio-stream = "0.1" +tokio-stream = { version = "0.1", features = ["sync"] } tokio-util = "0.7" backoff = "0.4" shared_http_body = "0.1" diff --git a/tonic-xds/src/client/endpoint.rs b/tonic-xds/src/client/endpoint.rs index 81767414d..ec23012bb 100644 --- a/tonic-xds/src/client/endpoint.rs +++ b/tonic-xds/src/client/endpoint.rs @@ -5,7 +5,7 @@ use std::task::{Context, Poll}; use tower::{Service, load::Load}; /// Represents the host part of an endpoint address -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] enum EndpointHost { Ipv4(std::net::Ipv4Addr), Ipv6(std::net::Ipv6Addr), @@ -25,7 +25,7 @@ impl From for EndpointHost { } /// Represents a validated endpoint address extracted from xDS -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub(crate) struct EndpointAddress { /// The IP address or hostname host: EndpointHost, diff --git a/tonic-xds/src/client/loadbalance/channel_state.rs b/tonic-xds/src/client/loadbalance/channel_state.rs index 7916c9bb8..159838991 100644 --- a/tonic-xds/src/client/loadbalance/channel_state.rs +++ b/tonic-xds/src/client/loadbalance/channel_state.rs @@ -26,8 +26,9 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; use std::task::{Context, Poll}; -use std::time::Duration; +use std::time::{Duration, Instant}; use pin_project_lite::pin_project; use tower::Service; @@ -36,6 +37,181 @@ use tower::load::Load; use crate::client::endpoint::{Connector, EndpointAddress}; use crate::common::async_util::BoxFuture; +// --------------------------------------------------------------------------- +// EndpointCounters / OutlierChannelState +// --------------------------------------------------------------------------- + +/// Lock-free success/failure counter for one endpoint. Records RPC +/// outcomes from the data path; the outlier-detection actor reads and +/// resets between intervals. +#[derive(Debug, Default)] +pub(crate) struct EndpointCounters { + success: AtomicU64, + failure: AtomicU64, +} + +impl EndpointCounters { + pub(crate) fn record_success(&self) { + self.success.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn record_failure(&self) { + self.failure.fetch_add(1, Ordering::Relaxed); + } + + /// Read and zero both counters. The two swaps are not atomic against + /// each other; bias from in-flight RPCs is bounded and well below + /// the precision of the failure-percentage threshold. + pub(crate) fn snapshot_and_reset(&self) -> (u64, u64) { + let s = self.success.swap(0, Ordering::Relaxed); + let f = self.failure.swap(0, Ordering::Relaxed); + (s, f) + } +} + +/// Per-channel outlier-detection state, shared via `Arc` between the +/// data path (per-RPC outcome recording + threshold-based ejection), +/// the housekeeping actor, and the load balancer. +/// +/// Ejection state is encoded in [`Self::ejected_at_nanos`]: zero means +/// not ejected, non-zero is the nanos-since-epoch of the ejection's +/// start. [`Self::try_eject`] / [`Self::try_uneject`] use CAS so callers +/// can update registry-level counters exactly once per transition. +#[derive(Debug)] +pub(crate) struct OutlierChannelState { + addr: EndpointAddress, + counters: EndpointCounters, + /// `true` while this channel is counted in the registry's + /// `qualifying_count` (i.e. has hit `request_volume` in the + /// current interval). + is_qualifying: AtomicBool, + /// Bumped on each ejection; decremented (saturating) on each + /// healthy interval. + ejection_multiplier: AtomicU32, + /// `0` when not ejected; otherwise nanos since [`Self::epoch`] of + /// the current ejection's start. + ejected_at_nanos: AtomicU64, + /// Origin for `ejected_at_nanos`. Set at construction. + epoch: Instant, +} + +impl OutlierChannelState { + pub(crate) fn new(addr: EndpointAddress) -> Self { + Self { + addr, + counters: EndpointCounters::default(), + is_qualifying: AtomicBool::new(false), + ejection_multiplier: AtomicU32::new(0), + ejected_at_nanos: AtomicU64::new(0), + epoch: Instant::now(), + } + } + + /// Endpoint address this state belongs to. + pub(crate) fn addr(&self) -> &EndpointAddress { + &self.addr + } + + pub(crate) fn record_success(&self) { + self.counters.record_success(); + } + + pub(crate) fn record_failure(&self) { + self.counters.record_failure(); + } + + /// Returns `(success, failure)` without resetting. The two reads + /// are not atomic together; bias is bounded by in-flight RPCs. + pub(crate) fn counters(&self) -> (u64, u64) { + let s = self.counters.success.load(Ordering::Relaxed); + let f = self.counters.failure.load(Ordering::Relaxed); + (s, f) + } + + /// Read and zero the counters. Returns `(success, failure)`. + pub(crate) fn snapshot_and_reset(&self) -> (u64, u64) { + self.counters.snapshot_and_reset() + } + + /// Set `is_qualifying` to `true`. Returns `true` if this call + /// performed the false → true transition (so the caller can bump + /// the registry counter exactly once per crossing). + pub(crate) fn mark_qualifying(&self) -> bool { + !self.is_qualifying.swap(true, Ordering::AcqRel) + } + + /// Clear `is_qualifying`. Returns the previous value. + pub(crate) fn clear_qualifying(&self) -> bool { + self.is_qualifying.swap(false, Ordering::AcqRel) + } + + /// Atomically mark this channel as ejected starting at `now`. + /// Returns `true` on the not-ejected → ejected transition and + /// bumps the multiplier; `false` if already ejected. + pub(crate) fn try_eject(&self, now: Instant) -> bool { + let nanos = now + .saturating_duration_since(self.epoch) + .as_nanos() + .min(u64::MAX as u128) as u64; + // 0 means "not ejected"; use 1 as a sentinel if the channel + // was created at exactly `now`. + let stamp = nanos.max(1); + if self + .ejected_at_nanos + .compare_exchange(0, stamp, Ordering::AcqRel, Ordering::Relaxed) + .is_err() + { + return false; + } + self.ejection_multiplier.fetch_add(1, Ordering::Relaxed); + true + } + + /// Atomically clear the ejection. Returns `true` on the + /// ejected → not-ejected transition. + pub(crate) fn try_uneject(&self) -> bool { + self.ejected_at_nanos.swap(0, Ordering::AcqRel) != 0 + } + + /// Current ejection state. + pub(crate) fn is_ejected(&self) -> bool { + self.ejected_at_nanos.load(Ordering::Acquire) != 0 + } + + /// Returns the elapsed time since this channel was ejected, or + /// `None` if it is not currently ejected. + pub(crate) fn ejected_duration(&self, now: Instant) -> Option { + let nanos = self.ejected_at_nanos.load(Ordering::Relaxed); + if nanos == 0 { + return None; + } + let ejected_at = self.epoch + Duration::from_nanos(nanos); + Some(now.saturating_duration_since(ejected_at)) + } + + /// Current ejection multiplier. + pub(crate) fn ejection_multiplier(&self) -> u32 { + self.ejection_multiplier.load(Ordering::Relaxed) + } + + /// Decrement the multiplier, saturating at zero. Atomic against + /// concurrent `try_eject` and other decrements. + pub(crate) fn decrement_multiplier(&self) { + let _ = self + .ejection_multiplier + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| { + if v > 0 { Some(v - 1) } else { None } + }); + } + + /// Test-only multiplier setter for driving housekeeping without + /// going through `try_eject`. + #[cfg(test)] + pub(crate) fn set_ejection_multiplier(&self, value: u32) { + self.ejection_multiplier.store(value, Ordering::Relaxed); + } +} + /// Configuration for an ejected channel. #[derive(Debug, Clone)] pub(crate) struct EjectionConfig { @@ -47,7 +223,8 @@ pub(crate) struct EjectionConfig { /// Result of an ejection expiring. pub(crate) enum UnejectedChannel { - /// The channel is ready to serve again (ejection expired, no reconnect needed). + /// Cooldown elapsed; the original connection is reused with its + /// outlier state reattached. Ready(ReadyChannel), /// A fresh connection has been started. Connecting(ConnectingChannel), @@ -73,7 +250,7 @@ impl IdleChannel { where C::Service: Send + 'static, { - ConnectingChannel::new(connector.connect(&self.addr), self.addr) + ConnectingChannel::new(connector.connect(&self.addr)) } } @@ -83,29 +260,24 @@ impl IdleChannel { /// A channel that is in the process of connecting. /// -/// Implements [`Future`] -- resolves to [`ReadyChannel`] when connected. -/// Cancellation is handled externally via [`KeyedFutures::cancel`]. +/// `impl Future` — resolves to the connected service when +/// the connection completes. The caller wraps the resolved service +/// into a [`ReadyChannel`]. Cancellation is handled externally via +/// [`KeyedFutures::cancel`]. /// /// [`KeyedFutures::cancel`]: crate::client::loadbalance::keyed_futures::KeyedFutures::cancel pub(crate) struct ConnectingChannel { - inner: Pin> + Send>>, + inner: Pin + Send>>, } impl ConnectingChannel { - pub(crate) fn new(fut: BoxFuture, addr: EndpointAddress) -> Self { - Self { - inner: Box::pin(async move { - ReadyChannel { - addr, - inner: fut.await, - } - }), - } + pub(crate) fn new(fut: BoxFuture) -> Self { + Self { inner: fut } } } impl Future for ConnectingChannel { - type Output = ReadyChannel; + type Output = S; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.get_mut().inner.as_mut().poll(cx) @@ -119,15 +291,32 @@ impl Future for ConnectingChannel { /// A channel that is connected and ready to serve requests. /// /// Holds the raw service `S` and delegates [`Service`] calls directly, -/// preserving `S::Future` and `S::Error` with no wrapping or type erasure. +/// preserving `S::Future` and `S::Error`. Shares +/// [`OutlierChannelState`] with the outlier-detection actor via `Arc`. #[derive(Clone)] pub(crate) struct ReadyChannel { addr: EndpointAddress, inner: S, + outlier: Arc, } impl ReadyChannel { - /// Eject this channel (e.g., due to outlier detection). Consumes self. + pub(crate) fn new(addr: EndpointAddress, inner: S, outlier: Arc) -> Self { + Self { + addr, + inner, + outlier, + } + } + + /// Per-channel outlier-detection state. Cloned cheaply via `Arc`. + pub(crate) fn outlier(&self) -> &Arc { + &self.outlier + } + + /// Eject this channel. Consumes self; the outlier state is moved + /// into the [`EjectedChannel`] so it can be reattached to the + /// [`ReadyChannel`] produced when the cooldown elapses. pub(crate) fn eject(self, config: EjectionConfig, connector: Arc) -> EjectedChannel where C: Connector + Send + Sync + 'static, @@ -136,13 +325,15 @@ impl ReadyChannel { EjectedChannel { addr: self.addr, inner: self.inner, + outlier: self.outlier, config, connector, ejection_timer, } } - /// Start reconnecting. Consumes self, dropping the old connection. + /// Drop the connection and start a fresh connect for the same + /// address. The outlier state remains in the registry. pub(crate) fn reconnect>( self, connector: Arc, @@ -150,7 +341,7 @@ impl ReadyChannel { where S: Send + 'static, { - ConnectingChannel::new(connector.connect(&self.addr), self.addr) + ConnectingChannel::new(connector.connect(&self.addr)) } } @@ -184,15 +375,17 @@ impl Load for ReadyChannel { // --------------------------------------------------------------------------- pin_project! { - /// A channel that has been ejected and is cooling down. + /// A channel that has been ejected and is cooling down. The + /// underlying connection is kept alive but cannot serve requests. /// - /// The underlying connection is kept alive but cannot serve requests. - /// Implements [`Future`] -- resolves once the ejection timer expires to either: - /// - [`UnejectedChannel::Ready`] if no reconnect is needed - /// - [`UnejectedChannel::Connecting`] if a fresh connection is required + /// `impl Future>` — resolves when + /// `config.timeout` elapses, to [`UnejectedChannel::Ready`] if + /// `needs_reconnect` is false, otherwise + /// [`UnejectedChannel::Connecting`]. pub(crate) struct EjectedChannel { addr: EndpointAddress, inner: S, + outlier: Arc, config: EjectionConfig, connector: Arc + Send + Sync>, #[pin] @@ -209,15 +402,14 @@ impl Future for EjectedChannel { Poll::Ready(()) => { if this.config.needs_reconnect { let fut = this.connector.connect(this.addr); - Poll::Ready(UnejectedChannel::Connecting(ConnectingChannel::new( - fut, - this.addr.clone(), - ))) + Poll::Ready(UnejectedChannel::Connecting(ConnectingChannel::new(fut))) } else { - Poll::Ready(UnejectedChannel::Ready(ReadyChannel { - addr: this.addr.clone(), - inner: this.inner.clone(), - })) + let ready = ReadyChannel::new( + this.addr.clone(), + this.inner.clone(), + this.outlier.clone(), + ); + Poll::Ready(UnejectedChannel::Ready(ready)) } } Poll::Pending => Poll::Pending, @@ -286,17 +478,24 @@ mod tests { assert_eq!(connector.connect_count.load(Ordering::SeqCst), 1); } + fn wrap_ready(addr: EndpointAddress, svc: MockService) -> ReadyChannel { + let state = Arc::new(OutlierChannelState::new(addr.clone())); + ReadyChannel::new(addr, svc, state) + } + #[tokio::test] - async fn test_connecting_future_yields_ready() { + async fn test_connecting_future_yields_service() { let connector = MockConnector::new(); - let ready = IdleChannel::new(test_addr()).connect(connector).await; - assert_eq!(ready.addr, test_addr()); + let svc: MockService = IdleChannel::new(test_addr()).connect(connector).await; + // The bare service is what `ConnectingChannel` resolves to. + let _ready = wrap_ready(test_addr(), svc); } #[tokio::test] async fn test_ready_service_delegates() { let connector = MockConnector::new(); - let mut ready = IdleChannel::new(test_addr()).connect(connector).await; + let svc = IdleChannel::new(test_addr()).connect(connector).await; + let mut ready = wrap_ready(test_addr(), svc); let resp: &str = ready.call("hello").await.unwrap(); assert_eq!(resp, "ok"); } @@ -304,9 +503,10 @@ mod tests { #[tokio::test] async fn test_ready_to_connecting_via_reconnect() { let connector = MockConnector::new(); - let ready = IdleChannel::new(test_addr()) + let svc = IdleChannel::new(test_addr()) .connect(connector.clone()) .await; + let ready = wrap_ready(test_addr(), svc); let _reconnecting = ready.reconnect(connector.clone()); assert_eq!(connector.connect_count.load(Ordering::SeqCst), 2); } @@ -316,10 +516,9 @@ mod tests { #[tokio::test] async fn test_connecting_in_keyed_futures() { let (tx, rx) = tokio::sync::oneshot::channel::(); - let connecting = - ConnectingChannel::new(Box::pin(async move { rx.await.unwrap() }), test_addr()); + let connecting = ConnectingChannel::new(Box::pin(async move { rx.await.unwrap() })); - let mut set: KeyedFutures> = KeyedFutures::new(); + let mut set: KeyedFutures = KeyedFutures::new(); set.add(test_addr(), connecting).unwrap(); assert!(matches!(set.poll_next(&mut noop_cx()), Poll::Pending)); @@ -334,10 +533,9 @@ mod tests { #[tokio::test] async fn test_connecting_cancelled_via_keyed_futures() { - let connecting = - ConnectingChannel::new(Box::pin(future::pending::()), test_addr()); + let connecting = ConnectingChannel::new(Box::pin(future::pending::())); - let mut set: KeyedFutures> = KeyedFutures::new(); + let mut set: KeyedFutures = KeyedFutures::new(); set.add(test_addr(), connecting).unwrap(); assert!(matches!(set.poll_next(&mut noop_cx()), Poll::Pending)); @@ -349,9 +547,10 @@ mod tests { #[tokio::test(start_paused = true)] async fn test_ejected_in_keyed_futures_ready() { let connector = MockConnector::new(); - let ready = IdleChannel::new(test_addr()) + let svc = IdleChannel::new(test_addr()) .connect(connector.clone()) .await; + let ready = wrap_ready(test_addr(), svc); let ejected = ready.eject( EjectionConfig { timeout: Duration::from_secs(5), @@ -374,9 +573,10 @@ mod tests { #[tokio::test(start_paused = true)] async fn test_ejected_in_keyed_futures_needs_reconnect() { let connector = MockConnector::new(); - let ready = IdleChannel::new(test_addr()) + let svc = IdleChannel::new(test_addr()) .connect(connector.clone()) .await; + let ready = wrap_ready(test_addr(), svc); let ejected = ready.eject( EjectionConfig { timeout: Duration::from_secs(5), diff --git a/tonic-xds/src/client/loadbalance/keyed_futures.rs b/tonic-xds/src/client/loadbalance/keyed_futures.rs index 74319c6f3..701ff865f 100644 --- a/tonic-xds/src/client/loadbalance/keyed_futures.rs +++ b/tonic-xds/src/client/loadbalance/keyed_futures.rs @@ -89,6 +89,11 @@ where self.futures.len() } + /// True if a live (non-cancelled) future is tracked for `key`. + pub(crate) fn contains_key(&self, key: &K) -> bool { + self.cancellations.contains_key(key) + } + /// Advance the internal futures. Yields `(K, T)` when a future completes, /// skipping cancelled futures silently. /// diff --git a/tonic-xds/src/client/loadbalance/loadbalancer.rs b/tonic-xds/src/client/loadbalance/loadbalancer.rs index 3a1a0171f..ac58b6080 100644 --- a/tonic-xds/src/client/loadbalance/loadbalancer.rs +++ b/tonic-xds/src/client/loadbalance/loadbalancer.rs @@ -1,28 +1,43 @@ //! Load balancer tower service. //! -//! Receives endpoint updates via [`tower::discover::Discover`] (yielding -//! [`IdleChannel`]s), manages the connection lifecycle via the channel state -//! machine, and routes requests to ready endpoints via a [`ChannelPicker`]. +//! Receives endpoint updates via [`tower::discover::Discover`], +//! manages the connection lifecycle via the channel state machine, +//! and routes requests to ready endpoints via a [`ChannelPicker`]. +//! +//! Outlier detection (gRFC A50) is integrated via an optional +//! [`OutlierDetector`]. Eject requests arrive on an mpsc channel from +//! the data path; the LB consumes the matching [`ReadyChannel`] via +//! [`ReadyChannel::eject`] and tracks the resulting +//! [`EjectedChannel`] in [`Self::ejected`]. When the timer fires, the +//! resolved [`UnejectedChannel`] is routed back into `ready` or +//! `connecting`. +//! +//! [`EjectedChannel`]: crate::client::loadbalance::channel_state::EjectedChannel +//! [`UnejectedChannel`]: crate::client::loadbalance::channel_state::UnejectedChannel use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, ready}; +use std::time::{Duration, Instant}; use indexmap::IndexMap; use tower::Service; use tower::discover::{Change, Discover}; use crate::client::endpoint::{Connector, EndpointAddress}; -use crate::client::loadbalance::channel_state::{IdleChannel, ReadyChannel}; +use crate::client::loadbalance::channel_state::{ + EjectionConfig, IdleChannel, OutlierChannelState, ReadyChannel, UnejectedChannel, +}; use crate::client::loadbalance::errors::LbError; use crate::client::loadbalance::keyed_futures::KeyedFutures; +use crate::client::loadbalance::outlier_detection::{ + OutlierDetector, OutlierStatsRegistry, RegistryAlreadyWired, +}; use crate::client::loadbalance::pickers::ChannelPicker; -/// Future returned by [`LoadBalancer::call`]. -/// -/// Either resolves immediately with an [`LbError`], or drives `poll_ready` + -/// `call` on the selected channel asynchronously. +/// Future returned by [`LoadBalancer::call`]. Either resolves +/// immediately with an [`LbError`] or drives the selected channel. pub(crate) enum LbFuture { Error(Option), Pending(Pin> + Send>>), @@ -53,15 +68,18 @@ impl Future for LbFuture { /// `C::Service` is the underlying service type held in ready channels. /// - `Req`: The request type. pub(crate) struct LoadBalancer { - /// Discovery stream providing endpoint additions/removals. discovery: D, - /// Connector for creating connections from idle channels. connector: Arc, - /// In-flight connection attempts, keyed by endpoint address. - connecting: KeyedFutures>, - /// Ready-to-serve channels, keyed by endpoint address. + /// In-flight connection attempts. + connecting: KeyedFutures, + /// Ready-to-serve channels. ready: IndexMap>, - /// Channel picker for load balancing. + /// Currently-ejected channels. Each entry is an + /// [`EjectedChannel`] whose `Sleep` fires when the ejection + /// window expires. + ejected: KeyedFutures>, + /// `None` disables outlier detection. + outlier: Option, picker: Arc, Req> + Send + Sync>, } @@ -70,57 +88,209 @@ where D: Discover + Unpin, D::Error: Into, C: Connector + Send + Sync + 'static, - C::Service: Send + 'static, + C::Service: Clone + Send + 'static, { - /// Create a new load balancer with the given picker. + /// Create a load balancer with no outlier detection. pub(crate) fn new( discovery: D, connector: Arc, picker: Arc, Req> + Send + Sync>, ) -> Self { - Self { + // Infallible: `with_outlier(.., None)` never wires a registry. + Self::with_outlier(discovery, connector, picker, None) + .expect("with_outlier(.., None) is infallible") + } + + /// Create a load balancer, optionally enabling outlier detection. + /// When `outlier` is `Some`, the registry's housekeeping actor is + /// spawned and bound to this LB. Returns + /// [`RegistryAlreadyWired`] if the registry already drives + /// another LB. + pub(crate) fn with_outlier( + discovery: D, + connector: Arc, + picker: Arc, Req> + Send + Sync>, + outlier: Option>, + ) -> Result { + let outlier = outlier.map(OutlierDetector::new).transpose()?; + Ok(Self { discovery, connector, connecting: KeyedFutures::new(), ready: IndexMap::new(), + ejected: KeyedFutures::new(), + outlier, picker, + }) + } + + /// Purge all state for `addr`, including the outlier-detection + /// registry entry. Called on `Change::Remove`. + fn purge_endpoint(&mut self, addr: &EndpointAddress) { + let _ = self.connecting.cancel(addr); + self.ready.swap_remove(addr); + let _ = self.ejected.cancel(addr); + if let Some(o) = self.outlier.as_ref() { + o.registry().remove_channel(addr); } } - /// Drain pending discovery events. Either resolves to an error - /// ([`LbError::DiscoverClosed`] or [`LbError::DiscoverError`]) or stays - /// pending — there is no success outcome since the loop only exits on - /// pending or error. + /// Clear stale connecting/ready/ejected slots for `addr` but + /// preserve the outlier-detection registry entry. Called on + /// `Change::Insert` so transient discovery flaps don't lose + /// counters or ejection state, matching grpc-go and Envoy. + fn reset_active_slots(&mut self, addr: &EndpointAddress) { + let _ = self.connecting.cancel(addr); + self.ready.swap_remove(addr); + let _ = self.ejected.cancel(addr); + } + + /// Drain pending discovery events. Resolves to an error + /// ([`LbError::DiscoverClosed`] or [`LbError::DiscoverError`]) + /// or stays pending — there is no success outcome. fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll { loop { match ready!(Pin::new(&mut self.discovery).poll_discover(cx)) { None => { - // tower::discover::Discover::poll_discover() returns Ready(None) when the - // discover object is closed, as indicated by Stream trait. tracing::error!("discover object is closed"); return Poll::Ready(LbError::DiscoverClosed); } Some(Err(e)) => return Poll::Ready(LbError::DiscoverError(e.into())), Some(Ok(Change::Insert(addr, idle))) => { tracing::trace!("discovery: insert {addr}"); - let _ = self.connecting.cancel(&addr); - self.ready.swap_remove(&addr); + self.reset_active_slots(&addr); let connecting = idle.connect(self.connector.clone()); let _ = self.connecting.add(addr, connecting); } Some(Ok(Change::Remove(addr))) => { tracing::trace!("discovery: remove {addr}"); - let _ = self.connecting.cancel(&addr); - self.ready.swap_remove(&addr); + self.purge_endpoint(&addr); } } } } - /// Drain completed connection futures into the ready set. + /// Drain completed connection futures. If the outlier state for + /// a re-discovered endpoint is still ejected, the new channel is + /// re-ejected for the *remaining* duration; if the deadline has + /// already passed, it is un-ejected and routed to `ready`. fn poll_connecting(&mut self, cx: &mut Context<'_>) { - while let Poll::Ready(Some((addr, ready))) = self.connecting.poll_next(cx) { - self.ready.insert(addr, ready); + while let Poll::Ready(Some((addr, svc))) = self.connecting.poll_next(cx) { + let state = match self.outlier.as_ref() { + Some(o) => o.registry().add_channel(addr.clone()), + None => Arc::new(OutlierChannelState::new(addr.clone())), + }; + let ready = ReadyChannel::new(addr.clone(), svc, state.clone()); + let remaining = self + .outlier + .as_ref() + .and_then(|o| o.registry().remaining_ejection(&state, Instant::now())); + self.place_after_connect(addr, ready, remaining); + } + } + + /// Route a freshly-connected `ReadyChannel` based on its + /// preserved outlier state: `None` → ready; `Some(0)` → un-eject + /// then ready; `Some(d)` → ejected for `d`. + fn place_after_connect( + &mut self, + addr: EndpointAddress, + ready: ReadyChannel, + remaining: Option, + ) { + match remaining { + None => { + self.ready.insert(addr, ready); + } + Some(d) if d.is_zero() => { + if let Some(o) = self.outlier.as_ref() { + o.registry().note_uneject(ready.outlier()); + } + self.ready.insert(addr, ready); + } + Some(d) => { + let ejected = ready.eject( + EjectionConfig { + timeout: d, + needs_reconnect: false, + }, + self.connector.clone(), + ); + tracing::debug!("outlier detection: re-eject {addr} for {d:?}"); + let _ = self.ejected.add(addr, ejected); + } + } + } + + /// Drain eject requests from the outlier detector's mpsc and + /// move each named `ReadyChannel` into [`Self::ejected`]. The + /// per-channel ejection flag has already been set by + /// `record_outcome`. + fn poll_eject_requests(&mut self, cx: &mut Context<'_>) { + loop { + let Some(o) = self.outlier.as_mut() else { + return; + }; + let addr = match o.poll_eject_request(cx) { + Poll::Ready(Some(a)) => a, + _ => return, + }; + let registry = o.registry().clone(); + // Channel may have been removed by discovery in the + // meantime; if so, nothing to eject. + let Some(ch) = self.ready.swap_remove(&addr) else { + continue; + }; + let state = ch.outlier().clone(); + match registry.remaining_ejection(&state, Instant::now()) { + Some(d) if !d.is_zero() => { + let ejected = ch.eject( + EjectionConfig { + timeout: d, + needs_reconnect: false, + }, + self.connector.clone(), + ); + tracing::debug!("outlier detection: eject {addr} for {d:?}"); + let _ = self.ejected.add(addr, ejected); + } + Some(_) => { + // Deadline already past — un-eject. + registry.note_uneject(&state); + self.ready.insert(addr, ch); + } + None => { + // No longer ejected (raced with un-eject). + self.ready.insert(addr, ch); + } + } + } + } + + /// Drain completed `EjectedChannel` timers. Clears the + /// registry-level ejection counter and routes the resolved + /// channel back into `ready` (with its outlier state already + /// reattached) or `connecting`. + fn poll_unejection(&mut self, cx: &mut Context<'_>) { + while let Poll::Ready(Some((addr, unejected))) = self.ejected.poll_next(cx) { + match unejected { + UnejectedChannel::Ready(ready) => { + if let Some(o) = self.outlier.as_ref() { + o.registry().note_uneject(ready.outlier()); + } + tracing::debug!("outlier detection: uneject {addr}"); + self.ready.insert(addr, ready); + } + // `needs_reconnect = false` for A50; this arm is + // reserved for future policies. + UnejectedChannel::Connecting(future) => { + if let Some(o) = self.outlier.as_ref() { + let state = o.registry().add_channel(addr.clone()); + o.registry().note_uneject(&state); + } + let _ = self.connecting.add(addr, future); + } + } } } } @@ -142,21 +312,23 @@ where fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { let discover_result = self.poll_discover(cx); + // Un-ejections before ejections so `ejected_count` is current + // when the next eject is evaluated. + self.poll_unejection(cx); self.poll_connecting(cx); + self.poll_eject_requests(cx); if !self.ready.is_empty() { return Poll::Ready(Ok(())); } - // No ready endpoints. Check if we should fail fast. + // No ready endpoints. Fail fast iff discovery is closed and + // nothing else can produce one. match discover_result { Poll::Ready(LbError::DiscoverClosed) if self.connecting.len() == 0 => { - // Discovery is closed and nothing is connecting — no progress is possible. Poll::Ready(Err(LbError::Stagnation)) } Poll::Ready(e) => { - // Other discovery errors (or DiscoverClosed with connecting in flight) - // are non-fatal — log and stay pending. tracing::warn!("discovery yielded error: {e}"); Poll::Pending } @@ -174,16 +346,20 @@ where let Some(picked) = self.picker.pick(&req, &self.ready) else { return LbFuture::Error(Some(LbError::Unavailable)); }; - // `picked` is a read-only borrow into `self.ready`. Clone to get an - // owned service we can drive in the async block. + // Cheap clones (all Arc-shared internals) so the async block + // can take ownership without holding the picker borrow. let mut svc = picked.clone(); + let outlier_state = picked.outlier().clone(); + let registry = self.outlier.as_ref().map(|o| o.registry().clone()); LbFuture::Pending(Box::pin(async move { tower::ServiceExt::ready(&mut svc) .await .map_err(|e| LbError::LbChannelPollReadyError(e.into()))?; - svc.call(req) - .await - .map_err(|e| LbError::LbChannelCallError(e.into())) + let result = svc.call(req).await; + if let Some(registry) = registry.as_ref() { + registry.record_outcome(&outlier_state, result.is_ok()); + } + result.map_err(|e| LbError::LbChannelCallError(e.into())) })) } } @@ -658,4 +834,335 @@ mod tests { "expected LbChannelCallError, got {result:?}" ); } + + // -- Outlier-detection integration tests -- + + use crate::client::loadbalance::outlier_detection::OutlierStatsRegistry; + use crate::xds::resource::outlier_detection::{ + FailurePercentageConfig, OutlierDetectionConfig, Percentage, + }; + use std::time::Duration; + + fn pct(v: u32) -> Percentage { + Percentage::new(v).unwrap() + } + + fn fp_config( + threshold: u32, + request_volume: u32, + minimum_hosts: u32, + ) -> OutlierDetectionConfig { + OutlierDetectionConfig { + interval: Duration::from_secs(60), + base_ejection_time: Duration::from_secs(30), + max_ejection_time: Duration::from_secs(300), + max_ejection_percent: pct(100), + success_rate: None, + failure_percentage: Some(FailurePercentageConfig { + threshold: pct(threshold), + enforcing_failure_percentage: pct(100), + minimum_hosts, + request_volume, + }), + } + } + + /// Build an LB with outlier detection enabled. + fn make_lb_with_outlier( + discover: MockDiscover, + config: OutlierDetectionConfig, + ) -> (Lb, Arc, Arc) { + let connector = Arc::new(MockConnector::new()); + let picker: Arc, &'static str> + Send + Sync> = + Arc::new(P2cPicker); + let registry = OutlierStatsRegistry::new(config); + let lb = + LoadBalancer::with_outlier(discover, connector.clone(), picker, Some(registry.clone())) + .expect("registry not yet wired"); + (lb, connector, registry) + } + + /// Drive the LB through one call per port. Asserts each succeeds. + async fn call_each(lb: &mut Lb, n: usize) { + for _ in 0..n { + lb.call("hello").await.unwrap(); + } + } + + #[tokio::test] + async fn test_outlier_detection_ejects_failing_endpoint() { + // 5 endpoints, all healthy except 8084. Once 8084's failures + // cross the threshold, it should be moved out of `ready` and + // into `ejected`. + let (tx, discover) = new_discover(); + let (mut lb, connector, registry) = make_lb_with_outlier( + discover, + fp_config( + /*threshold*/ 50, /*request_volume*/ 5, /*minimum_hosts*/ 3, + ), + ); + + for port in 8080..=8084 { + tx.send(Ok(Change::Insert(addr(port), IdleChannel::new(addr(port))))) + .await + .unwrap(); + } + drive_to_ready(&mut lb, &connector).await; + assert_eq!(lb.ready.len(), 5); + + // Configure 8084 to always fail. Other endpoints stay healthy. + connector + .service(&addr(8084)) + .fail_call + .store(true, Ordering::Relaxed); + + // Drive enough calls to ensure 8084 reaches request_volume + // and its failure rate triggers ejection. With 5 endpoints + // and P2C picking, each gets ~k/5 calls; drive 100 to be safe. + for _ in 0..100 { + let _ = lb.call("hello").await; + } + + // poll_ready drains the eject mpsc and transitions 8084 into + // `self.ejected` via `ReadyChannel::eject`. + let _ = poll_ready_now(&mut lb); + assert!( + lb.ejected.contains_key(&addr(8084)), + "8084 should be ejected; ejected.len()={}, ready keys: {:?}", + lb.ejected.len(), + lb.ready.keys().collect::>(), + ); + assert!(!lb.ready.contains_key(&addr(8084))); + // The registry's `ejected_count` should reflect the same. + assert!(registry.len() == 5); + } + + #[tokio::test] + async fn test_outlier_detection_healthy_cluster_no_ejections() { + let (tx, discover) = new_discover(); + let (mut lb, connector, _registry) = make_lb_with_outlier(discover, fp_config(50, 5, 3)); + + for port in 8080..=8084 { + tx.send(Ok(Change::Insert(addr(port), IdleChannel::new(addr(port))))) + .await + .unwrap(); + } + drive_to_ready(&mut lb, &connector).await; + assert_eq!(lb.ready.len(), 5); + + call_each(&mut lb, 50).await; + + let _ = poll_ready_now(&mut lb); + assert_eq!(lb.ejected.len(), 0); + assert_eq!(lb.ready.len(), 5); + } + + #[tokio::test] + async fn test_outlier_detection_endpoint_removal_cleans_registry() { + let (tx, discover) = new_discover(); + let (mut lb, connector, registry) = make_lb_with_outlier(discover, fp_config(50, 5, 3)); + + tx.send(Ok(Change::Insert(addr(8080), IdleChannel::new(addr(8080))))) + .await + .unwrap(); + drive_to_ready(&mut lb, &connector).await; + assert_eq!(registry.len(), 1); + + tx.send(Ok(Change::Remove(addr(8080)))).await.unwrap(); + let _ = poll_ready_now(&mut lb); + assert_eq!(registry.len(), 0); + assert_eq!(lb.ready.len(), 0); + } + + /// Re-discovering an endpoint (Insert for an address the LB + /// already tracks) must preserve its outlier-detection counters + /// and multiplier. Matches grpc-go / Envoy behavior. + #[tokio::test] + async fn test_outlier_detection_reinsert_preserves_state() { + let (tx, discover) = new_discover(); + let (mut lb, connector, registry) = make_lb_with_outlier(discover, fp_config(50, 5, 3)); + + tx.send(Ok(Change::Insert(addr(8080), IdleChannel::new(addr(8080))))) + .await + .unwrap(); + drive_to_ready(&mut lb, &connector).await; + let state = registry.add_channel(addr(8080)); // idempotent — returns the existing state + // Drive some successes through the data path so the channel + // accumulates counter state worth preserving. + for _ in 0..3 { + lb.call("hello").await.unwrap(); + } + let (s_before, f_before) = state.counters(); + assert!( + s_before > 0, + "expected accumulated successes before re-insert" + ); + let registry_before = Arc::as_ptr(&state); + + // Re-insert the same address. State must survive. + tx.send(Ok(Change::Insert(addr(8080), IdleChannel::new(addr(8080))))) + .await + .unwrap(); + drive_to_ready(&mut lb, &connector).await; + + let state_after = registry.add_channel(addr(8080)); + assert_eq!( + Arc::as_ptr(&state_after), + registry_before, + "registry entry should be the same Arc — state continuity preserved", + ); + let (s_after, f_after) = state_after.counters(); + assert_eq!( + (s_after, f_after), + (s_before, f_before), + "counters must survive re-insert", + ); + assert_eq!(registry.len(), 1); + } + + /// A re-discovered endpoint whose preserved state says "ejected" + /// is placed directly into the ejected pool, not the ready set, so + /// no traffic is routed to it until the housekeeping actor + /// un-ejects it. + #[tokio::test] + async fn test_outlier_detection_reinsert_while_ejected_stays_ejected() { + let (tx, discover) = new_discover(); + let (mut lb, connector, registry) = make_lb_with_outlier(discover, fp_config(50, 5, 3)); + + // Bring up 5 endpoints; make 8084 fail enough to be ejected. + for port in 8080..=8084 { + tx.send(Ok(Change::Insert(addr(port), IdleChannel::new(addr(port))))) + .await + .unwrap(); + } + drive_to_ready(&mut lb, &connector).await; + connector + .service(&addr(8084)) + .fail_call + .store(true, Ordering::Relaxed); + for _ in 0..100 { + let _ = lb.call("hello").await; + } + let _ = poll_ready_now(&mut lb); + let state_8084 = registry.add_channel(addr(8084)); + assert!( + state_8084.is_ejected(), + "8084 must be ejected before re-insert" + ); + assert!( + lb.ejected.contains_key(&addr(8084)), + "8084 should be in the ejected pool" + ); + + // Re-insert 8084. The ejected slot's old EjectedChannel is + // cancelled, but the registry entry (is_ejected=true, + // ejected_at_nanos preserved) survives. The new channel + // should be re-ejected with the *remaining* ejection time. + // Drive the steps explicitly because `lb.ready` is non-empty + // throughout (8080..=8083), so `drive_to_ready` may return + // before the new 8084 connect resolves. + tx.send(Ok(Change::Insert(addr(8084), IdleChannel::new(addr(8084))))) + .await + .unwrap(); + // 1. Drain the Insert into `self.connecting`. + let _ = poll_ready_now(&mut lb); + // 2. Synchronously resolve the new connect future. + connector.resolve_all(); + // 3. Drain the now-ready connecting future; `poll_connecting` + // sees `state.is_ejected() == true` and re-ejects. + let _ = poll_ready_now(&mut lb); + + assert!( + !lb.ready.contains_key(&addr(8084)), + "8084 must not be in ready while still logically ejected" + ); + assert!( + lb.ejected.contains_key(&addr(8084)), + "8084 must remain in the ejected pool after re-insert" + ); + assert!(state_8084.is_ejected()); + } + + /// Once `base × multiplier` time elapses on an ejected channel, + /// the [`EjectedChannel`]'s timer fires and the LB's + /// `poll_unejection` should move the channel back to `ready`. + #[tokio::test(start_paused = true)] + async fn test_outlier_detection_timer_driven_unejection() { + let mut config = fp_config(50, 5, 3); + // Short base for fast test; multiplier is 1 on first eject. + config.base_ejection_time = Duration::from_secs(10); + config.max_ejection_time = Duration::from_secs(60); + + let (tx, discover) = new_discover(); + let (mut lb, connector, registry) = make_lb_with_outlier(discover, config); + + for port in 8080..=8084 { + tx.send(Ok(Change::Insert(addr(port), IdleChannel::new(addr(port))))) + .await + .unwrap(); + } + drive_to_ready(&mut lb, &connector).await; + connector + .service(&addr(8084)) + .fail_call + .store(true, Ordering::Relaxed); + for _ in 0..100 { + let _ = lb.call("hello").await; + } + let _ = poll_ready_now(&mut lb); + assert!( + lb.ejected.contains_key(&addr(8084)), + "8084 must be ejected before the timer fires" + ); + assert!(registry.add_channel(addr(8084)).is_ejected()); + + // Stop 8084 from failing so it can serve again, then advance + // past `base × multiplier = 10s`. + connector + .service(&addr(8084)) + .fail_call + .store(false, Ordering::Relaxed); + tokio::time::advance(Duration::from_secs(11)).await; + // Drive poll_ready; `EjectedChannel`'s timer fires and + // `poll_unejection` routes 8084 back to ready. + let _ = poll_ready_now(&mut lb); + + assert!( + !lb.ejected.contains_key(&addr(8084)), + "8084 must leave the ejected pool once the timer fires" + ); + assert!( + lb.ready.contains_key(&addr(8084)), + "8084 must be back in ready after un-ejection" + ); + assert!(!registry.add_channel(addr(8084)).is_ejected()); + } + + /// Sharing one `OutlierStatsRegistry` across two `LoadBalancer`s is + /// not supported — the eject-signal receiver is one-shot. The + /// second `with_outlier` call must return an error rather than + /// panic. + #[tokio::test] + async fn test_outlier_registry_cannot_be_wired_twice() { + let (_tx1, discover1) = new_discover(); + let (_tx2, discover2) = new_discover(); + let connector = Arc::new(MockConnector::new()); + let picker: Arc, &'static str> + Send + Sync> = + Arc::new(P2cPicker); + let registry = OutlierStatsRegistry::new(fp_config(50, 5, 3)); + + // First wiring succeeds. + LoadBalancer::with_outlier( + discover1, + connector.clone(), + picker.clone(), + Some(registry.clone()), + ) + .expect("first wire"); + + // Second wiring of the same registry must error, not panic. + let result = + LoadBalancer::with_outlier(discover2, connector, picker, Some(registry.clone())); + assert!(result.is_err()); + } } diff --git a/tonic-xds/src/client/loadbalance/mod.rs b/tonic-xds/src/client/loadbalance/mod.rs index 66ccb1772..1c4ffa395 100644 --- a/tonic-xds/src/client/loadbalance/mod.rs +++ b/tonic-xds/src/client/loadbalance/mod.rs @@ -3,4 +3,5 @@ pub(crate) mod channel_state; pub(crate) mod errors; pub(crate) mod keyed_futures; pub(crate) mod loadbalancer; +pub(crate) mod outlier_detection; pub(crate) mod pickers; diff --git a/tonic-xds/src/client/loadbalance/outlier_detection.rs b/tonic-xds/src/client/loadbalance/outlier_detection.rs new file mode 100644 index 000000000..8cff6dce9 --- /dev/null +++ b/tonic-xds/src/client/loadbalance/outlier_detection.rs @@ -0,0 +1,736 @@ +//! [gRFC A50] outlier detection. +//! +//! Work is split across three sites: +//! +//! - **Data path** ([`OutlierStatsRegistry::record_outcome`]): runs +//! inline per RPC. Updates per-channel counters, applies the +//! failure-percentage gate, and on transition to ejected sends the +//! address through an mpsc channel. +//! - **Load balancer**: drains the eject mpsc in `poll_ready`, +//! consumes the matching [`ReadyChannel`] via +//! [`ReadyChannel::eject`], and tracks the resulting +//! [`EjectedChannel`] in a `KeyedFutures`. Each ejected channel's +//! sleep fires at `base × multiplier` (capped by +//! `max_ejection_time`); the LB then routes the resolved +//! [`UnejectedChannel`] back into the ready set. +//! - **Housekeeping actor** ([`spawn_actor`]): on each +//! `config.interval` tick, resets counters and decrements +//! multipliers for non-ejected channels. The actor never ejects or +//! un-ejects. +//! +//! Only the failure-percentage algorithm is implemented; success-rate +//! (cross-endpoint mean/stdev) is left to a follow-up. +//! +//! [gRFC A50]: https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md +//! [`ReadyChannel`]: crate::client::loadbalance::channel_state::ReadyChannel +//! [`ReadyChannel::eject`]: crate::client::loadbalance::channel_state::ReadyChannel::eject +//! [`EjectedChannel`]: crate::client::loadbalance::channel_state::EjectedChannel +//! [`UnejectedChannel`]: crate::client::loadbalance::channel_state::UnejectedChannel + +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +use dashmap::DashMap; +use tokio::sync::mpsc; + +use crate::client::endpoint::EndpointAddress; +use crate::client::loadbalance::channel_state::OutlierChannelState; +use crate::common::async_util::AbortOnDrop; +use crate::xds::resource::outlier_detection::OutlierDetectionConfig; + +/// Returned when an [`OutlierStatsRegistry`] is handed to a second +/// load balancer. The eject-signal receiver is one-shot. +#[derive(Debug, thiserror::Error)] +#[error("OutlierStatsRegistry is already wired to a LoadBalancer")] +pub(crate) struct RegistryAlreadyWired; + +/// Shared outlier-detection state, owned by `Arc` and accessed +/// concurrently by the data path ([`Self::record_outcome`]), the +/// housekeeping actor ([`Self::run_housekeeping`]), and the load +/// balancer ([`Self::note_uneject`], [`Self::remaining_ejection`]). +pub(crate) struct OutlierStatsRegistry { + channels: DashMap>, + /// Channels with `total >= request_volume` in the active + /// interval. Drives the `minimum_hosts` gate. + qualifying_count: AtomicU64, + /// Channels currently ejected. Drives the + /// `max_ejection_percent` cap. + ejected_count: AtomicU64, + config: OutlierDetectionConfig, + /// Sender half of the eject signal. The receiver is owned by the + /// LB's [`OutlierDetector`]. + eject_tx: mpsc::UnboundedSender, + /// Receiver moved out exactly once by [`Self::take_eject_rx`]. + eject_rx: Mutex>>, +} + +impl OutlierStatsRegistry { + pub(crate) fn new(config: OutlierDetectionConfig) -> Arc { + let (eject_tx, eject_rx) = mpsc::unbounded_channel(); + Arc::new(Self { + channels: DashMap::new(), + qualifying_count: AtomicU64::new(0), + ejected_count: AtomicU64::new(0), + config, + eject_tx, + eject_rx: Mutex::new(Some(eject_rx)), + }) + } + + /// Take the eject-signal receiver. Returns + /// [`RegistryAlreadyWired`] on a second call — a registry can + /// drive at most one load balancer. + fn take_eject_rx( + &self, + ) -> Result, RegistryAlreadyWired> { + self.eject_rx + .lock() + .expect("eject_rx mutex poisoned") + .take() + .ok_or(RegistryAlreadyWired) + } + + /// Get or create the state for `addr`. Idempotent — existing + /// state is preserved across reconnect. + pub(crate) fn add_channel(&self, addr: EndpointAddress) -> Arc { + self.channels + .entry(addr.clone()) + .or_insert_with(|| Arc::new(OutlierChannelState::new(addr))) + .clone() + } + + /// Drop the state for `addr`, decrementing cluster-wide counters + /// (`qualifying_count`, `ejected_count`) if it was contributing. + pub(crate) fn remove_channel(&self, addr: &EndpointAddress) { + if let Some((_, state)) = self.channels.remove(addr) { + if state.clear_qualifying() { + self.qualifying_count.fetch_sub(1, Ordering::Relaxed); + } + if state.is_ejected() { + self.ejected_count.fetch_sub(1, Ordering::Relaxed); + } + } + } + + /// Number of registered channels. + pub(crate) fn len(&self) -> usize { + self.channels.len() + } + + /// Per-RPC entry point. Records the outcome and, if all gates + /// pass, transitions the channel to ejected and dispatches the + /// address on the eject mpsc. + pub(crate) fn record_outcome(&self, state: &OutlierChannelState, success: bool) { + if success { + state.record_success(); + } else { + state.record_failure(); + } + + let Some(fp) = self.config.failure_percentage.as_ref() else { + return; + }; + + let (s, f) = state.counters(); + let total = s + f; + let request_volume = u64::from(fp.request_volume); + + // Bump `qualifying_count` exactly once per channel per + // interval so the `minimum_hosts` gate is a single atomic load. + if total >= request_volume && state.mark_qualifying() { + self.qualifying_count.fetch_add(1, Ordering::Relaxed); + } + + if state.is_ejected() { + return; + } + if total < request_volume { + return; + } + if self.qualifying_count.load(Ordering::Relaxed) < u64::from(fp.minimum_hosts) { + return; + } + if self.ejected_count.load(Ordering::Relaxed) >= self.max_ejections() { + return; + } + + // failure_pct = 100 * failure / total. A50 uses strict ">". + let failure_pct = 100 * f / total; + if failure_pct <= u64::from(fp.threshold.get()) { + return; + } + if !roll(fp.enforcing_failure_percentage.get()) { + return; + } + + if state.try_eject(Instant::now()) { + self.ejected_count.fetch_add(1, Ordering::Relaxed); + // Send failure (LB receiver dropped during shutdown) is + // ignored; the registry will be torn down momentarily. + let _ = self.eject_tx.send(state.addr().clone()); + } + } + + /// Clear the ejection: flip the state, decrement + /// `ejected_count`, and decrement the multiplier (gRFC A50 + /// step 6.b: same sweep that un-ejects also decrements). Returns + /// `true` on the ejected → not-ejected transition. + pub(crate) fn note_uneject(&self, state: &OutlierChannelState) -> bool { + if state.try_uneject() { + self.ejected_count.fetch_sub(1, Ordering::Relaxed); + state.decrement_multiplier(); + true + } else { + false + } + } + + /// Time remaining on `state`'s ejection (capped by + /// `max_ejection_time`). `None` if not ejected; + /// `Some(Duration::ZERO)` if the deadline has passed (caller + /// should un-eject rather than start a fresh sleep). + pub(crate) fn remaining_ejection( + &self, + state: &OutlierChannelState, + now: Instant, + ) -> Option { + let elapsed = state.ejected_duration(now)?; + let multiplier = state.ejection_multiplier(); + let cap = self + .config + .base_ejection_time + .max(self.config.max_ejection_time); + let target = self + .config + .base_ejection_time + .checked_mul(multiplier) + .unwrap_or(cap) + .min(cap); + Some(target.checked_sub(elapsed).unwrap_or_default()) + } + + /// Interval-boundary housekeeping. Resets counters and + /// decrements multipliers for non-ejected channels. Does not + /// un-eject — that is driven by each `EjectedChannel`'s timer. + pub(crate) fn run_housekeeping(&self) { + for entry in self.channels.iter() { + let state = entry.value(); + state.snapshot_and_reset(); + if state.clear_qualifying() { + self.qualifying_count.fetch_sub(1, Ordering::Relaxed); + } + if !state.is_ejected() { + state.decrement_multiplier(); + } + } + } + + /// Resolve `max_ejection_percent` against the current channel count. + fn max_ejections(&self) -> u64 { + self.channels.len() as u64 * u64::from(self.config.max_ejection_percent.get()) / 100 + } +} + +/// Spawn the housekeeping actor. Ticks every `config.interval` and +/// calls [`OutlierStatsRegistry::run_housekeeping`]. Dropping the +/// returned [`AbortOnDrop`] stops the task. +pub(crate) fn spawn_actor(registry: Arc) -> AbortOnDrop { + let interval = registry.config.interval; + let task = tokio::spawn(async move { + let mut ticker = tokio::time::interval(interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + ticker.tick().await; + registry.run_housekeeping(); + } + }); + AbortOnDrop(task) +} + +/// Per-LB outlier-detection plumbing: shared registry, eject-signal +/// receiver, and the housekeeping actor handle (aborted on drop). The +/// LB holds this as `Option`. +pub(crate) struct OutlierDetector { + registry: Arc, + eject_rx: mpsc::UnboundedReceiver, + _actor: AbortOnDrop, +} + +impl OutlierDetector { + /// Take ownership of the registry's eject-signal receiver and + /// spawn the housekeeping actor. Returns + /// [`RegistryAlreadyWired`] if the registry is already wired to + /// another LB. + pub(crate) fn new(registry: Arc) -> Result { + let eject_rx = registry.take_eject_rx()?; + let _actor = spawn_actor(registry.clone()); + Ok(Self { + registry, + eject_rx, + _actor, + }) + } + + /// Shared registry handle. + pub(crate) fn registry(&self) -> &Arc { + &self.registry + } + + /// Poll for the next address the data path has decided to eject. + pub(crate) fn poll_eject_request( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { + self.eject_rx.poll_recv(cx) + } +} + +/// Return true with probability `pct / 100` (clamped at 100 ⇒ always). +fn roll(pct: u8) -> bool { + if pct >= 100 { + return true; + } + if pct == 0 { + return false; + } + fastrand::u32(0..100) < u32::from(pct) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::xds::resource::outlier_detection::{ + FailurePercentageConfig, OutlierDetectionConfig, Percentage, + }; + use std::sync::atomic::Ordering; + use std::time::Duration; + + fn addr(port: u16) -> EndpointAddress { + EndpointAddress::new("10.0.0.1", port) + } + + fn pct(v: u32) -> Percentage { + Percentage::new(v).unwrap() + } + + fn base_config() -> OutlierDetectionConfig { + OutlierDetectionConfig { + interval: Duration::from_secs(1), + base_ejection_time: Duration::from_secs(30), + max_ejection_time: Duration::from_secs(300), + max_ejection_percent: pct(100), + success_rate: None, + failure_percentage: None, + } + } + + fn fp_config( + threshold: u32, + request_volume: u32, + minimum_hosts: u32, + ) -> OutlierDetectionConfig { + let mut c = base_config(); + c.failure_percentage = Some(FailurePercentageConfig { + threshold: pct(threshold), + enforcing_failure_percentage: pct(100), + minimum_hosts, + request_volume, + }); + c + } + + /// Drive `n` outcomes through `record_outcome` for one channel. + fn drive( + registry: &OutlierStatsRegistry, + state: &OutlierChannelState, + successes: u64, + failures: u64, + ) { + for _ in 0..successes { + registry.record_outcome(state, true); + } + for _ in 0..failures { + registry.record_outcome(state, false); + } + } + + // ----- record_outcome: failure-percentage detection ----- + + #[test] + fn ejects_above_threshold_inline() { + let registry = OutlierStatsRegistry::new(fp_config(50, 10, 3)); + let bad = registry.add_channel(addr(8084)); + for port in 8080..=8083 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 100, 0); + } + drive(®istry, &bad, 10, 90); + assert!(bad.is_ejected()); + assert_eq!(registry.ejected_count.load(Ordering::Relaxed), 1); + } + + #[test] + fn skips_below_threshold() { + let registry = OutlierStatsRegistry::new(fp_config(50, 10, 3)); + let mut all = vec![]; + for port in 8080..=8084 { + let s = registry.add_channel(addr(port)); + // 30% failure → below 50% threshold. + drive(®istry, &s, 70, 30); + all.push(s); + } + for s in &all { + assert!(!s.is_ejected()); + } + } + + #[test] + fn at_threshold_does_not_eject() { + // A50 specifies a strict "greater than" comparison. + let registry = OutlierStatsRegistry::new(fp_config(50, 10, 3)); + let mut all = vec![]; + for port in 8080..=8084 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 50, 50); + all.push(s); + } + for s in &all { + assert!(!s.is_ejected()); + } + } + + #[test] + fn minimum_hosts_gates_ejection() { + let registry = OutlierStatsRegistry::new(fp_config(50, 10, 5)); + // Only 2 hosts have request_volume ≥ 10; minimum_hosts is 5 ⇒ skip. + let mut all = vec![]; + for port in 8080..=8081 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 0, 100); + all.push(s); + } + for s in &all { + assert!(!s.is_ejected()); + } + } + + #[test] + fn request_volume_filters_low_traffic() { + let registry = OutlierStatsRegistry::new(fp_config(50, 100, 3)); + let bad = registry.add_channel(addr(8080)); + drive(®istry, &bad, 0, 5); + for port in 8081..=8084 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 200, 0); + } + assert!(!bad.is_ejected()); + } + + #[test] + fn enforcement_zero_percent_never_ejects() { + let mut config = fp_config(50, 10, 3); + config + .failure_percentage + .as_mut() + .unwrap() + .enforcing_failure_percentage = pct(0); + let registry = OutlierStatsRegistry::new(config); + let mut all = vec![]; + for port in 8080..=8084 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 0, 100); + all.push(s); + } + for s in &all { + assert!(!s.is_ejected()); + } + } + + #[test] + fn max_ejection_percent_caps_concurrent_ejections() { + let mut config = fp_config(50, 10, 3); + config.max_ejection_percent = pct(20); + let registry = OutlierStatsRegistry::new(config); + + let mut all = vec![]; + for port in 8080..=8084 { + let s = registry.add_channel(addr(port)); + all.push(s); + } + // Drive all hosts to bad state in parallel pseudo-order. + for s in &all { + drive(®istry, s, 0, 100); + } + + let ejected = all.iter().filter(|s| s.is_ejected()).count(); + // 5 hosts × 20% = 1 max ejection. + assert_eq!(ejected, 1); + } + + #[test] + fn remove_channel_decrements_counters() { + let registry = OutlierStatsRegistry::new(fp_config(50, 10, 3)); + let mut all = vec![]; + for port in 8080..=8083 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 100, 0); + all.push(s); + } + let bad = registry.add_channel(addr(8084)); + drive(®istry, &bad, 0, 100); + assert!(bad.is_ejected()); + assert_eq!(registry.ejected_count.load(Ordering::Relaxed), 1); + // Each healthy host crossed request_volume; bad too. So + // qualifying_count = 5. + assert_eq!(registry.qualifying_count.load(Ordering::Relaxed), 5); + + registry.remove_channel(&addr(8084)); + assert_eq!(registry.ejected_count.load(Ordering::Relaxed), 0); + assert_eq!(registry.qualifying_count.load(Ordering::Relaxed), 4); + } + + #[test] + fn ejection_dispatches_address_through_mpsc() { + let registry = OutlierStatsRegistry::new(fp_config(50, 10, 3)); + let mut rx = registry.take_eject_rx().expect("receiver available"); + let bad = registry.add_channel(addr(8084)); + for port in 8080..=8083 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 100, 0); + } + drive(®istry, &bad, 10, 90); + + // Eject dispatched exactly once via the mpsc. + assert_eq!(rx.try_recv(), Ok(addr(8084))); + assert!(matches!( + rx.try_recv(), + Err(mpsc::error::TryRecvError::Empty) + )); + } + + // ----- Housekeeping ----- + + #[test] + fn housekeeping_resets_counters_and_qualifying() { + let registry = OutlierStatsRegistry::new(fp_config(50, 10, 3)); + for port in 8080..=8083 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 100, 0); + } + assert_eq!(registry.qualifying_count.load(Ordering::Relaxed), 4); + + registry.run_housekeeping(); + assert_eq!(registry.qualifying_count.load(Ordering::Relaxed), 0); + for port in 8080..=8083 { + let s = registry.channels.get(&addr(port)).unwrap(); + assert_eq!(s.counters(), (0, 0)); + } + } + + #[test] + fn housekeeping_decrements_multiplier_on_healthy_interval() { + let registry = OutlierStatsRegistry::new(base_config()); + let s = registry.add_channel(addr(8080)); + // Force multiplier to 3 directly (no traffic, no eject). + s.set_ejection_multiplier(3); + + registry.run_housekeeping(); + assert_eq!(s.ejection_multiplier(), 2); + } + + #[test] + fn housekeeping_leaves_ejected_multipliers_alone() { + let registry = OutlierStatsRegistry::new(base_config()); + let s = registry.add_channel(addr(8080)); + s.try_eject(Instant::now()); + s.set_ejection_multiplier(3); + + registry.run_housekeeping(); + // Ejected channels keep their multiplier; un-ejection is the + // LB's job (timer-driven via EjectedChannel). + assert_eq!(s.ejection_multiplier(), 3); + assert!(s.is_ejected()); + } + + // ----- remaining_ejection / note_uneject ----- + + #[test] + fn remaining_ejection_returns_full_duration_for_fresh_eject() { + let mut config = fp_config(50, 10, 3); + config.base_ejection_time = Duration::from_secs(10); + config.max_ejection_time = Duration::from_secs(60); + let registry = OutlierStatsRegistry::new(config); + let s = registry.add_channel(addr(8080)); + let t0 = Instant::now(); + s.try_eject(t0); + // Multiplier is 1 after the first eject, so target = 10s. + let remaining = registry.remaining_ejection(&s, t0).unwrap(); + assert_eq!(remaining, Duration::from_secs(10)); + } + + #[test] + fn remaining_ejection_capped_at_max_ejection_time() { + let mut config = fp_config(50, 10, 3); + config.base_ejection_time = Duration::from_secs(10); + config.max_ejection_time = Duration::from_secs(15); + let registry = OutlierStatsRegistry::new(config); + let s = registry.add_channel(addr(8080)); + let t0 = Instant::now(); + s.try_eject(t0); + s.set_ejection_multiplier(10); // base * 10 = 100s, but cap = 15s. + let remaining = registry.remaining_ejection(&s, t0).unwrap(); + assert_eq!(remaining, Duration::from_secs(15)); + } + + #[test] + fn remaining_ejection_subtracts_elapsed_for_re_discovery() { + let mut config = fp_config(50, 10, 3); + config.base_ejection_time = Duration::from_secs(30); + config.max_ejection_time = Duration::from_secs(60); + let registry = OutlierStatsRegistry::new(config); + let s = registry.add_channel(addr(8080)); + let t0 = Instant::now(); + s.try_eject(t0); + // Re-discovered 10s into the ejection — should still have 20s left. + let remaining = registry + .remaining_ejection(&s, t0 + Duration::from_secs(10)) + .unwrap(); + assert_eq!(remaining, Duration::from_secs(20)); + } + + #[test] + fn remaining_ejection_zero_past_deadline() { + let mut config = fp_config(50, 10, 3); + config.base_ejection_time = Duration::from_secs(10); + config.max_ejection_time = Duration::from_secs(60); + let registry = OutlierStatsRegistry::new(config); + let s = registry.add_channel(addr(8080)); + let t0 = Instant::now(); + s.try_eject(t0); + // 60s have passed but target is 10s — caller should un-eject. + let remaining = registry + .remaining_ejection(&s, t0 + Duration::from_secs(60)) + .unwrap(); + assert_eq!(remaining, Duration::ZERO); + } + + #[test] + fn remaining_ejection_none_when_not_ejected() { + let registry = OutlierStatsRegistry::new(base_config()); + let s = registry.add_channel(addr(8080)); + assert!(registry.remaining_ejection(&s, Instant::now()).is_none()); + } + + #[test] + fn note_uneject_clears_state_and_decrements_counter() { + let registry = OutlierStatsRegistry::new(base_config()); + let s = registry.add_channel(addr(8080)); + s.try_eject(Instant::now()); // bumps multiplier 0 → 1 + registry.ejected_count.fetch_add(1, Ordering::Relaxed); + assert!(s.is_ejected()); + assert_eq!(s.ejection_multiplier(), 1); + + assert!(registry.note_uneject(&s)); + assert!(!s.is_ejected()); + assert_eq!(registry.ejected_count.load(Ordering::Relaxed), 0); + // A50 step 6.b: same sweep that un-ejects also decrements + // the multiplier. + assert_eq!(s.ejection_multiplier(), 0); + + // Second call is a no-op. + assert!(!registry.note_uneject(&s)); + assert_eq!(s.ejection_multiplier(), 0); + } + + /// A50 step 6.b: un-eject and multiplier decrement happen at the + /// same sweep. Re-eject right after un-eject must size the + /// backoff with the *decremented* multiplier. + #[test] + fn re_eject_after_uneject_uses_fresh_multiplier() { + let mut config = fp_config(50, 10, 3); + config.base_ejection_time = Duration::from_secs(10); + config.max_ejection_time = Duration::from_secs(300); + let registry = OutlierStatsRegistry::new(config); + let s = registry.add_channel(addr(8080)); + + let t0 = Instant::now(); + s.try_eject(t0); // multiplier 0 → 1 + registry.ejected_count.fetch_add(1, Ordering::Relaxed); + assert_eq!(s.ejection_multiplier(), 1); + + // Backoff elapses; LB calls note_uneject. + registry.note_uneject(&s); + assert_eq!(s.ejection_multiplier(), 0); + + // Channel immediately misbehaves again and gets re-ejected. + let t1 = t0 + Duration::from_secs(11); + s.try_eject(t1); // multiplier 0 → 1, not 1 → 2 + assert_eq!(s.ejection_multiplier(), 1); + // Remaining ejection duration should be `base * 1 = 10s`, + // not `base * 2 = 20s`. + assert_eq!( + registry.remaining_ejection(&s, t1).unwrap(), + Duration::from_secs(10), + ); + } + + // ----- Spawned actor ----- + // + // The actor's algorithmic behavior is fully exercised by the + // synchronous `housekeeping_*` tests above; here we only verify + // that dropping the `AbortOnDrop` handle reliably stops the task. + + #[tokio::test(start_paused = true)] + async fn dropping_abort_stops_actor() { + let mut config = base_config(); + config.interval = Duration::from_millis(50); + let registry = OutlierStatsRegistry::new(config); + let s = registry.add_channel(addr(8080)); + s.set_ejection_multiplier(5); + + let abort = spawn_actor(registry.clone()); + drop(abort); + + // Even with several tick periods elapsed, no housekeeping + // should have run because the task was aborted. + tokio::time::advance(Duration::from_millis(500)).await; + tokio::task::yield_now().await; + + assert_eq!(s.ejection_multiplier(), 5); + } + + // ----- OutlierChannelState sanity (kept in this file as it is the + // primary consumer of the type) ----- + + #[test] + fn channel_state_records_and_resets() { + let s = OutlierChannelState::new(addr(8080)); + s.record_success(); + s.record_success(); + s.record_failure(); + assert_eq!(s.snapshot_and_reset(), (2, 1)); + assert_eq!(s.snapshot_and_reset(), (0, 0)); + } + + #[test] + fn channel_state_try_eject_uneject_transitions_atomically() { + let s = OutlierChannelState::new(addr(8080)); + assert!(!s.is_ejected()); + assert!(s.try_eject(Instant::now())); + assert!(s.is_ejected()); + // Second call is a no-op. + assert!(!s.try_eject(Instant::now())); + assert!(s.try_uneject()); + assert!(!s.is_ejected()); + assert!(!s.try_uneject()); + } + + #[test] + fn channel_state_remembers_its_address() { + let s = OutlierChannelState::new(addr(9090)); + assert_eq!(s.addr(), &addr(9090)); + } +} diff --git a/tonic-xds/src/xds/resource/outlier_detection.rs b/tonic-xds/src/xds/resource/outlier_detection.rs index a31fd6c60..970232bea 100644 --- a/tonic-xds/src/xds/resource/outlier_detection.rs +++ b/tonic-xds/src/xds/resource/outlier_detection.rs @@ -2,27 +2,19 @@ //! //! [`OutlierDetectionConfig`] is the input to the outlier-detection //! algorithm. The two sub-configs gate which ejection algorithms run. -//! -//! Note: A50 specifies outlier detection as a load-balancing policy -//! wrapping a `child_policy`. `tonic-xds` currently runs P2C as its only -//! load balancer and integrates outlier detection as a filter on the -//! `Discover` stream feeding it, so there is no `child_policy` field -//! here yet. It will be added when more balancers are supported. +//! The `child_policy` field from A50 is not modeled — `tonic-xds` +//! currently runs P2C as its only load balancer. //! //! [gRFC A50]: https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md use std::time::Duration; -/// A 0–100 percentage. Construction is fallible; once held, every -/// `Percentage` is guaranteed to be in range, so the algorithm never -/// has to re-validate. +/// A 0–100 percentage, validated at construction. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) struct Percentage(u8); impl Percentage { /// Construct from a raw value, returning `Err` if it exceeds 100. - /// Accepts `u32` to match the proto wire type without forcing callers - /// to cast at every site. pub(crate) fn new(value: u32) -> Result { if value > 100 { Err(PercentageError(value)) @@ -68,7 +60,8 @@ pub(crate) struct SuccessRateConfig { /// An endpoint is a candidate for ejection when its success rate falls /// below `mean - stdev * (stdev_factor / 1000.0)`. pub stdev_factor: u32, - /// Probability that a candidate is actually ejected. + /// Probability that a flagged candidate is actually ejected. + /// Set to 0 to compute statistics without enforcing. pub enforcing_success_rate: Percentage, /// Minimum number of candidate endpoints required to run the algorithm. pub minimum_hosts: u32, @@ -83,7 +76,8 @@ pub(crate) struct FailurePercentageConfig { /// Failure rate at or above which an endpoint is a candidate for /// ejection. pub threshold: Percentage, - /// Probability that a candidate is actually ejected. + /// Probability that a flagged candidate is actually ejected. + /// Set to 0 to compute statistics without enforcing. pub enforcing_failure_percentage: Percentage, /// Minimum number of candidate endpoints required to run the algorithm. pub minimum_hosts: u32, @@ -93,8 +87,7 @@ pub(crate) struct FailurePercentageConfig { } impl OutlierDetectionConfig { - /// True when at least one ejection algorithm is enabled and the detector - /// should do work. If false, the cluster can skip instantiating detection. + /// True when at least one ejection algorithm is enabled. pub(crate) fn is_enabled(&self) -> bool { self.success_rate.is_some() || self.failure_percentage.is_some() }