From 109c6f48f1db0926f4783548250f3e31680ff98c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 4 Feb 2026 16:25:10 +0800 Subject: [PATCH 1/4] probe: exit when balance channel closed --- ginepro/src/service_probe.rs | 75 ++++++++++++++++++++++++++++++++++-- 1 file changed, 71 insertions(+), 4 deletions(-) diff --git a/ginepro/src/service_probe.rs b/ginepro/src/service_probe.rs index 535d2fa..6636d4a 100644 --- a/ginepro/src/service_probe.rs +++ b/ginepro/src/service_probe.rs @@ -91,11 +91,18 @@ impl GrpcServiceProbe { } /// Start probing the provided `hostname` for IP address changes. - /// The function will error if the receiving end of the tonic balance channel - /// is closed, e.g, the client has been deconstructed. + /// The probe loop exits once the receiving end of the tonic balance channel is closed, + /// e.g. when the client has been dropped. /// Any other errors are seen as transient, and therefore retried after `self.probe_interval`. pub async fn probe(mut self) -> Result<(), anyhow::Error> { loop { + // If the receiver is already gone (e.g. client dropped), exit promptly. + // Note: we must not rely solely on `ChangesetSenderClosed`, since when there is no + // endpoint change, we do not send anything and thus will not observe a closed channel. + if self.endpoint_reporter.is_closed() { + return Ok(()); + } + self.probe_once().await.or_else(|err| { // Only terminate if the changeset channel has been closed. if let ProbeError::ChangesetSenderClosed(_) = err { @@ -105,10 +112,12 @@ impl GrpcServiceProbe { } })?; - tokio::time::sleep(self.probe_interval).await; + tokio::select! { + _ = self.endpoint_reporter.closed() => return Ok(()), + _ = tokio::time::sleep(self.probe_interval) => {}, + } } } - /// Update tonic with a set of IPs that are retrieved by querying `hostname`. pub async fn probe_once(&mut self) -> Result<(), ProbeError> { match self @@ -235,3 +244,61 @@ impl GrpcServiceProbe { Some(endpoint) } } + + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashSet; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use tokio::sync::mpsc; + use tonic::transport::Endpoint; + use tower::discover::Change; + + struct StaticLookup { + endpoints: HashSet, + } + + #[async_trait::async_trait] + impl LookupService for StaticLookup { + async fn resolve_service_endpoints( + &self, + _definition: &ServiceDefinition, + ) -> Result, anyhow::Error> { + Ok(self.endpoints.clone()) + } + } + + #[tokio::test] + async fn probe_exits_promptly_when_receiver_dropped_even_without_changes() { + let service_definition = ServiceDefinition::from_parts("localhost", 5000).unwrap(); + let endpoints = HashSet::from([SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000)]); + let dns_lookup = StaticLookup { endpoints }; + + let config = GrpcServiceProbeConfig { + service_definition, + dns_lookup, + probe_interval: tokio::time::Duration::from_secs(3600), + endpoint_timeout: None, + endpoint_connect_timeout: None, + }; + + let (tx, rx): ( + mpsc::Sender>, + mpsc::Receiver>, + ) = mpsc::channel(8); + + let mut probe = GrpcServiceProbe::new_with_reporter(config, tx); + + // First probe commits the initial endpoint set. + probe.probe_once().await.unwrap(); + + // Drop receiver so subsequent iterations should terminate immediately. + drop(rx); + + tokio::time::timeout(tokio::time::Duration::from_secs(2), probe.probe()) + .await + .expect("probe should exit promptly after receiver is dropped") + .unwrap(); + } +} From e2946ce9de65138231fb29b8ac15d0d38d510024 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 4 Feb 2026 16:48:52 +0800 Subject: [PATCH 2/4] remove unnecessary Signed-off-by: Bugen Zhao --- ginepro/src/service_probe.rs | 60 ------------------------------------ 1 file changed, 60 deletions(-) diff --git a/ginepro/src/service_probe.rs b/ginepro/src/service_probe.rs index 6636d4a..596314e 100644 --- a/ginepro/src/service_probe.rs +++ b/ginepro/src/service_probe.rs @@ -97,8 +97,6 @@ impl GrpcServiceProbe { pub async fn probe(mut self) -> Result<(), anyhow::Error> { loop { // If the receiver is already gone (e.g. client dropped), exit promptly. - // Note: we must not rely solely on `ChangesetSenderClosed`, since when there is no - // endpoint change, we do not send anything and thus will not observe a closed channel. if self.endpoint_reporter.is_closed() { return Ok(()); } @@ -244,61 +242,3 @@ impl GrpcServiceProbe { Some(endpoint) } } - - -#[cfg(test)] -mod tests { - use super::*; - use std::collections::HashSet; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use tokio::sync::mpsc; - use tonic::transport::Endpoint; - use tower::discover::Change; - - struct StaticLookup { - endpoints: HashSet, - } - - #[async_trait::async_trait] - impl LookupService for StaticLookup { - async fn resolve_service_endpoints( - &self, - _definition: &ServiceDefinition, - ) -> Result, anyhow::Error> { - Ok(self.endpoints.clone()) - } - } - - #[tokio::test] - async fn probe_exits_promptly_when_receiver_dropped_even_without_changes() { - let service_definition = ServiceDefinition::from_parts("localhost", 5000).unwrap(); - let endpoints = HashSet::from([SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000)]); - let dns_lookup = StaticLookup { endpoints }; - - let config = GrpcServiceProbeConfig { - service_definition, - dns_lookup, - probe_interval: tokio::time::Duration::from_secs(3600), - endpoint_timeout: None, - endpoint_connect_timeout: None, - }; - - let (tx, rx): ( - mpsc::Sender>, - mpsc::Receiver>, - ) = mpsc::channel(8); - - let mut probe = GrpcServiceProbe::new_with_reporter(config, tx); - - // First probe commits the initial endpoint set. - probe.probe_once().await.unwrap(); - - // Drop receiver so subsequent iterations should terminate immediately. - drop(rx); - - tokio::time::timeout(tokio::time::Duration::from_secs(2), probe.probe()) - .await - .expect("probe should exit promptly after receiver is dropped") - .unwrap(); - } -} From fa49ef683968c123b2c819ba345386264ab4b04e Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 4 Feb 2026 16:55:58 +0800 Subject: [PATCH 3/4] return error instead Signed-off-by: Bugen Zhao --- ginepro/src/service_probe.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/ginepro/src/service_probe.rs b/ginepro/src/service_probe.rs index 4729d5e..5f7c4de 100644 --- a/ginepro/src/service_probe.rs +++ b/ginepro/src/service_probe.rs @@ -97,10 +97,16 @@ impl GrpcServiceProbe { /// e.g. when the client has been dropped. /// Any other errors are seen as transient, and therefore retried after `self.probe_interval`. pub async fn probe(mut self) -> Result<(), anyhow::Error> { + let client_closed_err = || { + anyhow::anyhow!( + "The gRPC client has closed the channel therefore the DNS probe loop will exit." + ) + }; + loop { // If the receiver is already gone (e.g. client dropped), exit promptly. if self.endpoint_reporter.is_closed() { - return Ok(()); + return Err(client_closed_err()); } self.probe_once().await.or_else(|err| { @@ -113,11 +119,12 @@ impl GrpcServiceProbe { })?; tokio::select! { - _ = self.endpoint_reporter.closed() => return Ok(()), + _ = self.endpoint_reporter.closed() => return Err(client_closed_err()), _ = tokio::time::sleep(self.probe_interval) => {}, } } } + /// Update tonic with a set of IPs that are retrieved by querying `hostname`. pub async fn probe_once(&mut self) -> Result<(), ProbeError> { match self From 4fb890ab5badbc92a79fb7435f47d79099cd90d3 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 22 Apr 2026 20:36:46 -0500 Subject: [PATCH 4/4] chore: bump tonic to 0.14 Prost was extracted into its own crates in tonic 0.14, so shared_proto now depends on tonic-prost for the generated codec and uses tonic-prost-build in the build script. Co-Authored-By: Claude Opus 4.7 (1M context) --- ginepro/Cargo.toml | 2 +- shared_proto/Cargo.toml | 7 ++++--- shared_proto/build.rs | 2 +- tests/Cargo.toml | 4 ++-- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/ginepro/Cargo.toml b/ginepro/Cargo.toml index 0e5dfd7..f8aea41 100644 --- a/ginepro/Cargo.toml +++ b/ginepro/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1" http = "1" thiserror = "2" tokio = { version = "1", features = ["full"] } -tonic = { version = "0.13", features = ["tls-ring"] } +tonic = { version = "0.14", features = ["tls-ring"] } tower = { version = "0.5", default-features = false, features = ["discover"] } tracing = "0.1" hickory-resolver = { version = "0.25", features = ["tokio"] } diff --git a/shared_proto/Cargo.toml b/shared_proto/Cargo.toml index 3f3dde8..43cb180 100644 --- a/shared_proto/Cargo.toml +++ b/shared_proto/Cargo.toml @@ -6,8 +6,9 @@ edition = "2021" publish = false [dependencies] -prost = "0.13" -tonic = "0.13" +prost = "0.14" +tonic = "0.14" +tonic-prost = "0.14" [build-dependencies] -tonic-build = "0.13" +tonic-prost-build = "0.14" diff --git a/shared_proto/build.rs b/shared_proto/build.rs index 59fdd38..f8b5d70 100644 --- a/shared_proto/build.rs +++ b/shared_proto/build.rs @@ -3,7 +3,7 @@ //! tonic functionality. fn main() -> Result<(), Box> { - tonic_build::configure() + tonic_prost_build::configure() .build_server(true) .build_client(true) .compile_protos(&["proto/test.proto", "proto/echo.proto"], &["proto/"])?; diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 1391a8e..6342ca4 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -13,7 +13,7 @@ hyper = "1" openssl = "0.10" tokio = { version = "1", features = ["full"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = { version = "0.13", features = ["tls-ring"] } +tonic = { version = "0.14", features = ["tls-ring"] } tower-layer = "0.3" tower-service = "0.3" tracing = { version = "0.1", features = ["attributes", "log"] } @@ -22,4 +22,4 @@ tracing = { version = "0.1", features = ["attributes", "log"] } anyhow = "1" async-trait = "0.1" shared-proto = { path = "../shared_proto" } -tonic-health = "0.13" +tonic-health = "0.14"