diff --git a/tonic-xds/src/client/channel.rs b/tonic-xds/src/client/channel.rs index 3833dcfa4..e666fc438 100644 --- a/tonic-xds/src/client/channel.rs +++ b/tonic-xds/src/client/channel.rs @@ -5,9 +5,9 @@ use crate::client::lb::{ClusterDiscovery, XdsLbService}; use crate::client::route::{Router, XdsRoutingLayer}; use crate::xds::bootstrap::{BootstrapConfig, BootstrapError}; use crate::xds::cache::XdsCache; -use crate::xds::cluster_discovery::{ - EndpointConnector, XdsClusterDiscovery, default_endpoint_connector, -}; +#[cfg(feature = "_tls-any")] +use crate::xds::cert_provider::{CertProviderError, CertProviderRegistry}; +use crate::xds::cluster_discovery::XdsClusterDiscovery; use crate::xds::resource_manager::XdsResourceManager; use crate::xds::routing::XdsRouter; use http::Request; @@ -67,6 +67,10 @@ pub enum BuildError { /// Bootstrap configuration could not be loaded. #[error("bootstrap: {0}")] Bootstrap(#[from] BootstrapError), + /// A `certificate_providers` entry in the bootstrap failed to initialize. + #[cfg(feature = "_tls-any")] + #[error("certificate provider: {0}")] + CertProvider(#[from] CertProviderError), } /// Holds owned resources whose background tasks must live as long as the channel. @@ -187,9 +191,10 @@ impl XdsChannelBuilder { ))); } - // TODO(PR2/A29): Build CertProviderRegistry from bootstrap.certificate_providers - // and pass it to XdsClusterDiscovery so data-plane connections can use - // TLS/mTLS when CDS clusters specify UpstreamTlsContext. + #[cfg(feature = "_tls-any")] + let cert_provider_registry = Arc::new(CertProviderRegistry::from_bootstrap( + &bootstrap.certificate_providers, + )?); let node = Node::try_from(bootstrap.node)?; let client_config = ClientConfig::new(node, &server_uri); @@ -200,7 +205,13 @@ impl XdsChannelBuilder { let resource_manager = XdsResourceManager::new(xds_client.clone(), cache.clone(), listener_name); - Ok(self.build_from_cache(cache, xds_client, resource_manager)) + Ok(self.build_from_cache( + cache, + #[cfg(feature = "_tls-any")] + cert_provider_registry, + xds_client, + resource_manager, + )) } /// Internal builder that wires the service stack from a pre-built cache. @@ -210,13 +221,19 @@ impl XdsChannelBuilder { fn build_from_cache( &self, cache: Arc, + #[cfg(feature = "_tls-any")] cert_provider_registry: Arc, xds_client: XdsClient, resource_manager: XdsResourceManager, ) -> XdsChannelGrpc { let router: Arc = Arc::new(XdsRouter::new(&cache)); - let connector: EndpointConnector = Arc::new(default_endpoint_connector); - let discovery: Arc>> = - Arc::new(XdsClusterDiscovery::new(cache, connector)); + #[cfg(feature = "_tls-any")] + let discovery: Arc< + dyn ClusterDiscovery>, + > = Arc::new(XdsClusterDiscovery::new(cache, cert_provider_registry)); + #[cfg(not(feature = "_tls-any"))] + let discovery: Arc< + dyn ClusterDiscovery>, + > = Arc::new(XdsClusterDiscovery::new(cache)); let retry_policy = GrpcRetryPolicy::new(GrpcRetryPolicyConfig::default()); let resources = Arc::new(XdsChannelResources { @@ -535,6 +552,19 @@ mod tests { assert_eq!(response.into_inner().message, "retry-server: retry-test"); } + /// Helper: creates a minimal plaintext `ClusterResource` for tests that + /// drive `XdsClusterDiscovery`. The cluster watch in `discover_cluster` + /// blocks until a cluster is in the cache. + fn make_test_cluster(cluster_name: &str) -> Arc { + use crate::xds::resource::cluster::{ClusterResource, LbPolicy}; + Arc::new(ClusterResource { + name: cluster_name.to_string(), + eds_service_name: None, + lb_policy: LbPolicy::RoundRobin, + security: None, + }) + } + /// Helper: creates a `RouteConfigResource` that routes all traffic to the given cluster. fn make_test_route_config(cluster_name: &str) -> Arc { use crate::xds::resource::route_config::*; @@ -582,15 +612,24 @@ mod tests { /// Builds an XdsChannelGrpc using real XdsRouter and XdsClusterDiscovery /// backed by the given cache. async fn build_xds_channel_from_cache(cache: Arc) -> XdsChannelGrpc { - use crate::xds::cluster_discovery::{ - EndpointConnector, XdsClusterDiscovery, default_endpoint_connector, - }; + use crate::xds::cluster_discovery::XdsClusterDiscovery; use crate::xds::routing::XdsRouter; let router: Arc = Arc::new(XdsRouter::new(&cache)); - let connector: EndpointConnector = Arc::new(default_endpoint_connector); - let discovery: Arc>> = - Arc::new(XdsClusterDiscovery::new(cache, connector)); + + #[cfg(feature = "_tls-any")] + let discovery: Arc< + dyn ClusterDiscovery>, + > = { + use crate::xds::cert_provider::CertProviderRegistry; + let registry = + Arc::new(CertProviderRegistry::from_bootstrap(&Default::default()).unwrap()); + Arc::new(XdsClusterDiscovery::new(cache, registry)) + }; + #[cfg(not(feature = "_tls-any"))] + let discovery: Arc< + dyn ClusterDiscovery>, + > = Arc::new(XdsClusterDiscovery::new(cache)); let builder = XdsChannelBuilder::new(test_config()); builder.build_grpc_channel_from_parts(router, discovery, GrpcRetryPolicy::default()) @@ -608,6 +647,7 @@ mod tests { let cache = Arc::new(XdsCache::new()); cache.update_route_config(make_test_route_config(cluster_name)); + cache.update_cluster(cluster_name, make_test_cluster(cluster_name)); cache.update_endpoints(cluster_name, make_test_endpoints(cluster_name, &servers)); let channel = build_xds_channel_from_cache(cache).await; @@ -641,6 +681,7 @@ mod tests { let cache = Arc::new(XdsCache::new()); cache.update_route_config(make_test_route_config(cluster_name)); + cache.update_cluster(cluster_name, make_test_cluster(cluster_name)); // Start with only the first server. cache.update_endpoints( cluster_name, @@ -692,6 +733,15 @@ mod tests { XdsResourceManager::new(xds_client.clone(), cache.clone(), "test-listener".into()); let builder = XdsChannelBuilder::new(test_config()); + + #[cfg(feature = "_tls-any")] + let _channel = { + use crate::xds::cert_provider::CertProviderRegistry; + let registry = + Arc::new(CertProviderRegistry::from_bootstrap(&Default::default()).unwrap()); + builder.build_from_cache(cache, registry, xds_client, resource_manager) + }; + #[cfg(not(feature = "_tls-any"))] let _channel = builder.build_from_cache(cache, xds_client, resource_manager); // Construction should succeed without panicking. } diff --git a/tonic-xds/src/client/endpoint.rs b/tonic-xds/src/client/endpoint.rs index 81767414d..9f580dbcd 100644 --- a/tonic-xds/src/client/endpoint.rs +++ b/tonic-xds/src/client/endpoint.rs @@ -100,7 +100,6 @@ pub(crate) struct EndpointChannel { impl EndpointChannel { /// Creates a new `EndpointChannel`. /// This should be used by xDS implementations to construct channels to individual endpoints. - #[allow(dead_code)] pub(crate) fn new(inner: S) -> Self { Self { inner, diff --git a/tonic-xds/src/lib.rs b/tonic-xds/src/lib.rs index ae2856e10..d5be6be53 100644 --- a/tonic-xds/src/lib.rs +++ b/tonic-xds/src/lib.rs @@ -81,6 +81,45 @@ //! // let client = MyServiceClient::new(channel); //! ``` //! +//! ## TLS Security (gRFC A29) +//! +//! Upstream data-plane TLS is enabled when: +//! +//! 1. The crate is built with `tls-ring` *or* `tls-aws-lc` (exactly one). +//! 2. The bootstrap JSON declares `certificate_providers` entries — each +//! referenced by `instance_name` in CDS resources. +//! 3. A CDS `Cluster` carries `transport_socket: UpstreamTlsContext` naming +//! those instances (configured on the xDS control plane). +//! +//! Only the `file_watcher` plugin is built in. It reads PEM files from disk +//! and refreshes them on `refresh_interval` (default 600s) — rotated certs +//! reach existing TLS connectors on the next handshake. +//! +//! ```json +//! { +//! "xds_servers": [{"server_uri": "xds.example.com:443"}], +//! "certificate_providers": { +//! "root_ca": { "plugin_name": "file_watcher", "config": { +//! "ca_certificate_file": "/etc/certs/ca.pem" +//! }}, +//! "identity": { "plugin_name": "file_watcher", "config": { +//! "certificate_file": "/etc/certs/cert.pem", +//! "private_key_file": "/etc/certs/key.pem", +//! "refresh_interval": "60s" +//! }} +//! } +//! } +//! ``` +//! +//! When `match_typed_subject_alt_names` is set on the cluster's validation +//! context, the server cert's SAN list must match one of the configured +//! matchers ("any" semantics). An empty matcher list accepts any cert +//! chained to the configured CA roots. +//! +//! CDS updates that change a cluster's `transport_socket` rebuild that +//! cluster's connector — new endpoint connections pick up the new config; +//! existing TLS sessions continue. +//! //! ## xDS features //! //! | Feature | gRFC | Status | @@ -92,7 +131,7 @@ //! | Weighted cluster traffic splitting | [A28] | Supported | //! | Case-insensitive header matching | [A63] | Supported | //! | Client-side P2C load balancing | | Supported | -//! | TLS endpoint connections | [A29] | Planned | +//! | TLS endpoint connections | [A29] | Supported | //! | Least-request load balancing | [A48] | Planned | //! //! [A27]: https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md diff --git a/tonic-xds/src/xds/bootstrap.rs b/tonic-xds/src/xds/bootstrap.rs index 6701b39ba..913e2a7bb 100644 --- a/tonic-xds/src/xds/bootstrap.rs +++ b/tonic-xds/src/xds/bootstrap.rs @@ -54,7 +54,9 @@ pub struct BootstrapConfig { /// /// [`CertificateProviderPluginInstance`]: https://github.com/envoyproxy/envoy/blob/main/api/envoy/extensions/transport_sockets/tls/v3/common.proto #[serde(default)] - #[allow(dead_code)] // Consumed when CertProviderRegistry is wired in (PR2/A29). + // Consumed by `CertProviderRegistry::from_bootstrap` only under TLS + // features; parsed regardless so non-TLS builds accept the same JSON. + #[cfg_attr(not(feature = "_tls-any"), allow(dead_code))] pub(crate) certificate_providers: HashMap, } @@ -105,6 +107,9 @@ pub(crate) enum ChannelCredentialType { /// /// [gRFC A29]: https://github.com/grpc/proposal/blob/master/A29-xds-tls-security.md #[derive(Debug, Clone, Deserialize)] +// In non-TLS builds `cert_provider` is gated out, so nothing reads these +// fields after serde populates them. +#[cfg_attr(not(feature = "_tls-any"), allow(dead_code))] pub(crate) struct CertProviderPluginConfig { pub plugin_name: String, #[serde(default)] diff --git a/tonic-xds/src/xds/cache.rs b/tonic-xds/src/xds/cache.rs index 6c19d6580..14773324f 100644 --- a/tonic-xds/src/xds/cache.rs +++ b/tonic-xds/src/xds/cache.rs @@ -141,7 +141,6 @@ impl XdsCache { } /// Watches cluster resource changes for a specific cluster. - #[allow(dead_code)] // Will be used when LB policy dispatch is wired (A48). pub(crate) fn watch_cluster(&self, name: &str) -> CacheWatch { self.clusters.watch(name) } diff --git a/tonic-xds/src/xds/cert_provider/file_watcher.rs b/tonic-xds/src/xds/cert_provider/file_watcher.rs index 4aa2ca6dd..7bb0fe97a 100644 --- a/tonic-xds/src/xds/cert_provider/file_watcher.rs +++ b/tonic-xds/src/xds/cert_provider/file_watcher.rs @@ -22,13 +22,21 @@ use std::sync::Arc; use std::time::Duration; use arc_swap::ArcSwap; +use rustls::RootCertStore; +use rustls::pki_types::CertificateDer; use serde::Deserialize; +use crate::common::async_util::AbortOnDrop; + use super::{CertProviderError, CertificateData, CertificateProvider, Identity}; /// Plugin name used in the bootstrap `certificate_providers` JSON. pub(crate) const PLUGIN_NAME: &str = "file_watcher"; +/// Refresh interval used when `FileWatcherConfig::refresh_interval` is unset. +/// Matches grpc-go's `defaultCertRefreshDuration`-equivalent for proxyless gRPC. +const DEFAULT_REFRESH_INTERVAL: Duration = Duration::from_secs(600); + /// Configuration for the `file_watcher` certificate provider. #[derive(Debug, Clone, Default, Deserialize)] pub(crate) struct FileWatcherConfig { @@ -71,38 +79,51 @@ where /// A certificate provider that reads PEM files from disk. /// -/// On construction, reads all configured files and caches the results. -/// The `fetch()` method returns the cached data. -// TODO(PR3/A29): Spawn a background task that calls `refresh()` on a timer -// driven by `config.refresh_interval` (default 600s). The task should be -// started in `new()` and cancelled on drop (e.g., via a JoinHandle + -// AbortHandle or a CancellationToken). +/// On construction, reads all configured files synchronously and spawns a +/// background task that re-reads them on `config.refresh_interval`. +/// Read or parse failures during refresh are logged; +/// the previously cached data is kept. pub(crate) struct FileWatcherProvider { - config: FileWatcherConfig, - cached: ArcSwap, + cached: Arc>, + _refresh_task: AbortOnDrop, } impl FileWatcherProvider { /// Create a new provider from a parsed `FileWatcherConfig`. pub(crate) fn new(config: FileWatcherConfig) -> Result { let data = read_certificate_data(&config)?; - + let cached = Arc::new(ArcSwap::from_pointee(data)); + let task = tokio::spawn(refresh_loop(config, Arc::clone(&cached))); Ok(Self { - config, - cached: ArcSwap::from_pointee(data), + cached, + _refresh_task: AbortOnDrop(task), }) } +} + +/// Background task: periodically re-read the configured files and update +/// the shared cache. +async fn refresh_loop(config: FileWatcherConfig, cached: Arc>) { + let period = config.refresh_interval.unwrap_or(DEFAULT_REFRESH_INTERVAL); + let mut ticker = tokio::time::interval(period); + // `interval` fires immediately on the first `tick()`. The initial data was + // already loaded synchronously in `new()`, so discard that first tick. + ticker.tick().await; + loop { + ticker.tick().await; + refresh_once(&config, &cached); + } +} - /// Re-read files from disk and update the cache. - /// - /// Returns `Ok(())` if the files were successfully read, or an error - /// if any configured file could not be read. On error the cache retains - /// the previous good data. - #[allow(dead_code)] // Used when background refresh is added. - pub(crate) fn refresh(&self) -> Result<(), CertProviderError> { - let data = read_certificate_data(&self.config)?; - self.cached.store(Arc::new(data)); - Ok(()) +/// Re-read the configured files once and update the cache. On failure, +/// log and leave the cache unchanged. +fn refresh_once(config: &FileWatcherConfig, cached: &ArcSwap) { + match read_certificate_data(config) { + Ok(data) => cached.store(Arc::new(data)), + Err(e) => tracing::warn!( + error = ?e, + "file_watcher cert refresh failed; keeping last good data", + ), } } @@ -114,6 +135,11 @@ impl CertificateProvider for FileWatcherProvider { /// Read certificate data from the files specified in the config. /// +/// CA roots are parsed into [`Arc`] in this function — once per +/// refresh — so the verifier can use them directly on every TLS handshake +/// without re-parsing. Identity bytes are kept as PEM because +/// [`tonic::transport::Identity::from_pem`] is bytes-only on the consumer side. +/// /// This function is the single validation boundary between the permissive /// JSON-parsed [`FileWatcherConfig`] and the invariant-enforcing /// [`CertificateData`]. It checks both A65 rules: @@ -123,7 +149,7 @@ fn read_certificate_data(config: &FileWatcherConfig) -> Result Result, CertProviderError> { }) } +fn read_and_parse_roots(path: &Path) -> Result, CertProviderError> { + let pem = read_file(path)?; + let mut reader = std::io::Cursor::new(&pem); + let certs: Vec> = rustls_pemfile::certs(&mut reader) + .collect::>() + .map_err(|e| CertProviderError::PemParse { + path: path.display().to_string(), + reason: e.to_string(), + })?; + let mut store = RootCertStore::empty(); + let (added, _) = store.add_parsable_certificates(certs); + if added == 0 { + return Err(CertProviderError::PemParse { + path: path.display().to_string(), + reason: "no usable certificates in PEM".into(), + }); + } + Ok(Arc::new(store)) +} + #[cfg(test)] mod tests { use super::*; use std::io::Write; use tempfile::NamedTempFile; + /// Generate a self-signed CA cert in PEM form, suitable for parsing into + /// a [`RootCertStore`]. Returns the raw PEM bytes. + fn gen_ca_pem() -> Vec { + let mut params = rcgen::CertificateParams::new(vec!["test-ca".into()]).unwrap(); + params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained); + let key = rcgen::KeyPair::generate().unwrap(); + let cert = params.self_signed(&key).unwrap(); + cert.pem().into_bytes() + } + fn write_temp_file(content: &[u8]) -> NamedTempFile { let mut f = NamedTempFile::new().unwrap(); f.write_all(content).unwrap(); @@ -171,26 +227,22 @@ mod tests { } } - #[test] - fn reads_ca_certificate() { - let ca_file = - write_temp_file(b"-----BEGIN CERTIFICATE-----\ntest-ca\n-----END CERTIFICATE-----\n"); + #[tokio::test] + async fn reads_ca_certificate() { + let ca_file = write_temp_file(&gen_ca_pem()); let provider = FileWatcherProvider::new(make_config(ca_file.path().to_str(), None, None)).unwrap(); let data = provider.fetch().unwrap(); assert!(matches!(*data, CertificateData::RootsOnly { .. })); - assert!( - data.roots() - .unwrap() - .starts_with(b"-----BEGIN CERTIFICATE-----") - ); + let roots = data.roots().unwrap(); + assert_eq!(roots.len(), 1); assert!(data.identity().is_none()); } - #[test] - fn reads_identity_cert_and_key() { + #[tokio::test] + async fn reads_identity_cert_and_key() { let cert_file = write_temp_file(b"cert-chain-pem"); let key_file = write_temp_file(b"private-key-pem"); @@ -209,9 +261,9 @@ mod tests { assert!(data.roots().is_none()); } - #[test] - fn reads_all_files() { - let ca_file = write_temp_file(b"ca-pem"); + #[tokio::test] + async fn reads_all_files() { + let ca_file = write_temp_file(&gen_ca_pem()); let cert_file = write_temp_file(b"cert-pem"); let key_file = write_temp_file(b"key-pem"); @@ -224,7 +276,7 @@ mod tests { let data = provider.fetch().unwrap(); assert!(matches!(*data, CertificateData::Both { .. })); - assert_eq!(data.roots(), Some(b"ca-pem".as_slice())); + assert_eq!(data.roots().unwrap().len(), 1); let identity = data.identity().unwrap(); assert_eq!(identity.cert_chain.as_slice(), b"cert-pem"); assert_eq!(identity.key.as_slice(), b"key-pem"); @@ -271,44 +323,36 @@ mod tests { } #[test] - fn refresh_updates_cached_data() { - let mut ca_file = NamedTempFile::new().unwrap(); - ca_file.write_all(b"old-ca").unwrap(); + fn refresh_once_updates_cache() { + let ca_file = write_temp_file(&gen_ca_pem()); + let config = make_config(ca_file.path().to_str(), None, None); + let cached = ArcSwap::from_pointee(read_certificate_data(&config).unwrap()); + let initial = cached.load_full(); - let provider = - FileWatcherProvider::new(make_config(ca_file.path().to_str(), None, None)).unwrap(); - assert_eq!( - provider.fetch().unwrap().roots(), - Some(b"old-ca".as_slice()) - ); + std::fs::write(ca_file.path(), gen_ca_pem()).unwrap(); + refresh_once(&config, &cached); - std::fs::write(ca_file.path(), b"new-ca").unwrap(); - provider.refresh().unwrap(); - assert_eq!( - provider.fetch().unwrap().roots(), - Some(b"new-ca".as_slice()) + let after = cached.load_full(); + assert!( + !Arc::ptr_eq(&initial, &after), + "expected refresh_once to swap cached Arc", ); } #[test] - fn refresh_keeps_old_data_on_failure() { - let ca_file = write_temp_file(b"good-ca"); - let path = ca_file.path().to_str().unwrap().to_string(); - - let provider = FileWatcherProvider::new(make_config(Some(&path), None, None)).unwrap(); - assert_eq!( - provider.fetch().unwrap().roots(), - Some(b"good-ca".as_slice()) - ); + fn refresh_once_keeps_old_data_on_failure() { + let ca_file = write_temp_file(&gen_ca_pem()); + let config = make_config(ca_file.path().to_str(), None, None); + let cached = ArcSwap::from_pointee(read_certificate_data(&config).unwrap()); + let initial = cached.load_full(); - // Delete the file — refresh should fail. drop(ca_file); - assert!(provider.refresh().is_err()); + refresh_once(&config, &cached); - // Cached data should still be the old value. - assert_eq!( - provider.fetch().unwrap().roots(), - Some(b"good-ca".as_slice()) + let after = cached.load_full(); + assert!( + Arc::ptr_eq(&initial, &after), + "expected cache to keep last good data on failure", ); } @@ -367,13 +411,13 @@ mod tests { ); } - #[test] - fn registry_integration() { + #[tokio::test] + async fn registry_integration() { use crate::xds::bootstrap::CertProviderPluginConfig; use crate::xds::cert_provider::CertProviderRegistry; use std::collections::HashMap; - let ca_file = write_temp_file(b"ca-data"); + let ca_file = write_temp_file(&gen_ca_pem()); let mut configs = HashMap::new(); configs.insert( @@ -387,11 +431,11 @@ mod tests { ); let registry = CertProviderRegistry::from_bootstrap(&configs).unwrap(); - assert!(registry.contains("my_certs")); - assert!(!registry.contains("other")); + assert!(registry.get("my_certs").is_some()); + assert!(registry.get("other").is_none()); let provider = registry.get("my_certs").unwrap(); let data = provider.fetch().unwrap(); - assert_eq!(data.roots(), Some(b"ca-data".as_slice())); + assert_eq!(data.roots().unwrap().len(), 1); } } diff --git a/tonic-xds/src/xds/cert_provider/mod.rs b/tonic-xds/src/xds/cert_provider/mod.rs index ddab6aca1..a5bd3f7b4 100644 --- a/tonic-xds/src/xds/cert_provider/mod.rs +++ b/tonic-xds/src/xds/cert_provider/mod.rs @@ -1,5 +1,3 @@ -// TODO: remove once A29 data plane TLS consumes all types. -#![allow(dead_code)] //! Certificate provider plugin framework for gRFC A29. //! //! The xDS control plane references certificate providers by instance name @@ -11,12 +9,12 @@ //! [`CertificateProviderPluginInstance`]: https://github.com/envoyproxy/envoy/blob/main/api/envoy/extensions/transport_sockets/tls/v3/common.proto pub(crate) mod file_watcher; -#[cfg(any(feature = "tls-ring", feature = "tls-aws-lc"))] pub(crate) mod verifier; use std::collections::HashMap; use std::sync::Arc; +use rustls::RootCertStore; use serde::Deserialize; use crate::xds::bootstrap::CertProviderPluginConfig; @@ -30,6 +28,11 @@ pub(crate) struct Identity { /// Certificate material returned by a [`CertificateProvider`] plugin. /// +/// CA roots are pre-parsed into an [`Arc`] so consumers reach +/// for it on every TLS handshake without paying parse cost. Identity material +/// stays as PEM bytes since [`tonic::transport::Identity::from_pem`] is +/// itself bytes-only. +/// /// The variants encode two invariants from gRFC A29 and A65 at the type level: /// /// 1. **Cert/key pairing** (A65): identity cert and private key are paired or @@ -47,18 +50,21 @@ pub(crate) struct Identity { pub(crate) enum CertificateData { /// CA trust bundle only — used by TLS clients that don't present an /// identity. - RootsOnly { roots: Vec }, + RootsOnly { roots: Arc }, /// Identity only — used by TLS servers that don't validate peers /// (non-mTLS). Peer validation falls back to system roots at the /// consumer layer if needed. IdentityOnly { identity: Identity }, /// Both roots and identity — used for mTLS on either end. - Both { roots: Vec, identity: Identity }, + Both { + roots: Arc, + identity: Identity, + }, } impl CertificateData { - /// PEM-encoded CA trust bundle, if present. - pub(crate) fn roots(&self) -> Option<&[u8]> { + /// Parsed CA trust bundle, if present. + pub(crate) fn roots(&self) -> Option<&Arc> { match self { Self::RootsOnly { roots } | Self::Both { roots, .. } => Some(roots), Self::IdentityOnly { .. } => None, @@ -76,12 +82,14 @@ impl CertificateData { /// Errors from certificate provider operations. #[derive(Debug, thiserror::Error)] -pub(crate) enum CertProviderError { +pub enum CertProviderError { #[error("failed to read certificate file '{path}': {source}")] FileRead { path: String, source: std::io::Error, }, + #[error("failed to parse PEM in '{path}': {reason}")] + PemParse { path: String, reason: String }, #[error("unknown certificate provider plugin: {0}")] UnknownPlugin(String), #[error("invalid config for plugin '{plugin}': {source}")] @@ -174,11 +182,6 @@ impl CertProviderRegistry { pub(crate) fn get(&self, instance_name: &str) -> Option<&Arc> { self.providers.get(instance_name) } - - /// Returns `true` if the given instance name is configured. - pub(crate) fn contains(&self, instance_name: &str) -> bool { - self.providers.contains_key(instance_name) - } } #[cfg(test)] @@ -209,8 +212,8 @@ mod tests { } #[test] - fn contains_returns_false_for_missing_instance() { + fn get_returns_none_for_missing_instance() { let registry = CertProviderRegistry::from_bootstrap(&HashMap::new()).unwrap(); - assert!(!registry.contains("nonexistent")); + assert!(registry.get("nonexistent").is_none()); } } diff --git a/tonic-xds/src/xds/cert_provider/verifier.rs b/tonic-xds/src/xds/cert_provider/verifier.rs index df590f99f..ce46e9968 100644 --- a/tonic-xds/src/xds/cert_provider/verifier.rs +++ b/tonic-xds/src/xds/cert_provider/verifier.rs @@ -15,34 +15,41 @@ use std::sync::Arc; use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}; use rustls::client::verify_server_cert_signed_by_trust_anchor; use rustls::crypto::{WebPkiSupportedAlgorithms, verify_tls12_signature, verify_tls13_signature}; -use rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName, UnixTime}; +use rustls::pki_types::{CertificateDer, ServerName, UnixTime}; use rustls::server::ParsedCertificate; -use rustls::{ - CertificateError, ClientConfig, DigitallySignedStruct, Error as RustlsError, RootCertStore, - SignatureScheme, -}; +use rustls::{CertificateError, DigitallySignedStruct, Error as RustlsError, SignatureScheme}; use x509_parser::extensions::{GeneralName, ParsedExtension}; use x509_parser::oid_registry::OID_X509_EXT_SUBJECT_ALT_NAME; use x509_parser::prelude::FromDer; -use crate::xds::cert_provider::{CertProviderError, CertProviderRegistry, CertificateData}; +use crate::xds::cert_provider::CertificateProvider; use crate::xds::resource::san_matcher::{SanEntry, SanMatcher}; -use crate::xds::resource::security::ClusterSecurityConfig; /// Verifier that chain-validates the peer cert and enforces gRFC A29 SAN -/// matching. -#[derive(Debug)] +/// matching. Sources CA roots from a [`CertificateProvider`] per handshake +/// so cert rotation in the provider is picked up automatically. pub(crate) struct XdsServerCertVerifier { - roots: Arc, + ca_provider: Arc, supported_algs: WebPkiSupportedAlgorithms, san_matchers: Vec, } +impl std::fmt::Debug for XdsServerCertVerifier { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("XdsServerCertVerifier") + .field("san_matchers", &self.san_matchers) + .finish_non_exhaustive() + } +} + impl XdsServerCertVerifier { - pub(crate) fn new(roots: RootCertStore, san_matchers: Vec) -> Self { + pub(crate) fn new( + ca_provider: Arc, + san_matchers: Vec, + ) -> Self { let provider = default_crypto_provider(); Self { - roots: Arc::new(roots), + ca_provider, supported_algs: provider.signature_verification_algorithms, san_matchers, } @@ -74,10 +81,18 @@ impl ServerCertVerifier for XdsServerCertVerifier { ) -> Result { // `server_name` is intentionally unused — gRFC A29 replaces stdlib // hostname verification with the SAN matcher list below. + let data = self + .ca_provider + .fetch() + .map_err(|e| RustlsError::General(format!("CA provider fetch failed: {e}")))?; + let roots = data + .roots() + .ok_or_else(|| RustlsError::General("CA provider has no roots".into()))?; + let cert = ParsedCertificate::try_from(end_entity)?; verify_server_cert_signed_by_trust_anchor( &cert, - &self.roots, + roots, intermediates, now, self.supported_algs.all, @@ -176,113 +191,12 @@ fn parse_ip_san(bytes: &[u8]) -> Option { } } -/// Errors building a [`rustls::ClientConfig`] from a cluster's security config. -#[derive(Debug, thiserror::Error)] -pub(crate) enum ClientConfigError { - /// Provider lookup or contents (missing roots/identity, unknown instance). - #[error("provider: {0}")] - Provider(String), - /// Provider failed to fetch certificate material. - #[error("provider fetch: {0}")] - Fetch(#[from] CertProviderError), - /// PEM parsing failed or yielded no usable cert/key. - #[error("pem: {0}")] - Pem(String), - /// rustls rejected the supplied verifier or client-auth identity. - #[error("rustls: {0}")] - Rustls(String), -} - -/// Build a [`rustls::ClientConfig`] for a cluster from its [`ClusterSecurityConfig`] -/// and the bootstrap [`CertProviderRegistry`]. -/// -/// Resolves the CA + (optional) identity provider instances by name, fetches -/// the certificate material, builds an [`XdsServerCertVerifier`] with the -/// cluster's SAN matchers, and assembles a `ClientConfig` with the custom -/// verifier installed via `.dangerous().with_custom_certificate_verifier(...)`. -/// -/// The returned config is ready to be plugged into a TLS connector once an -/// upstream API exists for that — see the TODO in -/// [`crate::xds::cluster_discovery::build_connector`]. -pub(crate) fn build_client_config( - registry: &CertProviderRegistry, - security: &ClusterSecurityConfig, -) -> Result { - let ca_data = fetch_provider_data(registry, &security.ca_instance_name, "CA")?; - let ca_pem = ca_data.roots().ok_or_else(|| { - ClientConfigError::Provider(format!( - "CA instance '{}' has no roots", - security.ca_instance_name - )) - })?; - let root_store = build_root_store(ca_pem)?; - - let verifier = Arc::new(XdsServerCertVerifier::new( - root_store, - security.san_matchers.clone(), - )); - - let builder = ClientConfig::builder() - .dangerous() - .with_custom_certificate_verifier(verifier); - - let config = match &security.identity_instance_name { - Some(name) => { - let id_data = fetch_provider_data(registry, name, "identity")?; - let identity = id_data.identity().ok_or_else(|| { - ClientConfigError::Provider(format!("identity instance '{name}' has no identity")) - })?; - let cert_chain = parse_pem_certs(&identity.cert_chain)?; - let key = parse_pem_key(&identity.key)?; - builder - .with_client_auth_cert(cert_chain, key) - .map_err(|e| ClientConfigError::Rustls(e.to_string()))? - } - None => builder.with_no_client_auth(), - }; - - Ok(config) -} - -fn fetch_provider_data( - registry: &CertProviderRegistry, - name: &str, - role: &str, -) -> Result, ClientConfigError> { - let provider = registry - .get(name) - .ok_or_else(|| ClientConfigError::Provider(format!("unknown {role} instance '{name}'")))?; - Ok(provider.fetch()?) -} - -fn build_root_store(pem: &[u8]) -> Result { - let certs = parse_pem_certs(pem)?; - let mut store = RootCertStore::empty(); - let (added, _) = store.add_parsable_certificates(certs); - if added == 0 { - return Err(ClientConfigError::Pem("no certificates in PEM".into())); - } - Ok(store) -} - -fn parse_pem_certs(pem: &[u8]) -> Result>, ClientConfigError> { - let mut reader = std::io::Cursor::new(pem); - rustls_pemfile::certs(&mut reader) - .collect::>() - .map_err(|e| ClientConfigError::Pem(e.to_string())) -} - -fn parse_pem_key(pem: &[u8]) -> Result, ClientConfigError> { - let mut reader = std::io::Cursor::new(pem); - rustls_pemfile::private_key(&mut reader) - .map_err(|e| ClientConfigError::Pem(e.to_string()))? - .ok_or_else(|| ClientConfigError::Pem("no private key in PEM".into())) -} - #[cfg(test)] mod tests { use super::*; + use crate::xds::cert_provider::{CertProviderError, CertificateData, Identity}; use rcgen::{CertificateParams, SanType as RcgenSanType}; + use rustls::RootCertStore; /// Generate a self-signed DER cert carrying the given SANs. fn gen_cert_with_sans(sans: Vec) -> CertificateDer<'static> { @@ -405,6 +319,21 @@ mod tests { store } + /// Test shim: a [`CertificateProvider`] that returns a fixed snapshot. + struct StaticProvider(Arc); + + impl CertificateProvider for StaticProvider { + fn fetch(&self) -> Result, CertProviderError> { + Ok(self.0.clone()) + } + } + + fn provider_with_roots(store: RootCertStore) -> Arc { + Arc::new(StaticProvider(Arc::new(CertificateData::RootsOnly { + roots: Arc::new(store), + }))) + } + fn uri_matcher(spiffe_uri: &str) -> SanMatcher { use envoy_types::pb::envoy::extensions::transport_sockets::tls::v3::{ SubjectAltNameMatcher, subject_alt_name_matcher::SanType, @@ -426,7 +355,7 @@ mod tests { fn spiffe_uri_only_cert_with_matching_uri_matcher_passes() { let (ca_der, leaf_der) = build_chain_with_spiffe_leaf("spiffe://td/ns/prod/sa/api"); let verifier = XdsServerCertVerifier::new( - root_store_with(ca_der), + provider_with_roots(root_store_with(ca_der)), vec![uri_matcher("spiffe://td/ns/prod/sa/api")], ); @@ -440,7 +369,7 @@ mod tests { fn spiffe_uri_only_cert_with_non_matching_matcher_fails() { let (ca_der, leaf_der) = build_chain_with_spiffe_leaf("spiffe://td/ns/prod/sa/api"); let verifier = XdsServerCertVerifier::new( - root_store_with(ca_der), + provider_with_roots(root_store_with(ca_der)), vec![uri_matcher("spiffe://td/ns/prod/sa/other")], ); @@ -458,11 +387,34 @@ mod tests { fn spiffe_uri_only_cert_with_empty_matchers_passes_ca_only() { // per gRFC A29 §'Server Authorization': an empty matcher list passes let (ca_der, leaf_der) = build_chain_with_spiffe_leaf("spiffe://td/ns/prod/sa/api"); - let verifier = XdsServerCertVerifier::new(root_store_with(ca_der), vec![]); + let verifier = + XdsServerCertVerifier::new(provider_with_roots(root_store_with(ca_der)), vec![]); let server_name = ServerName::try_from("any.connect.hostname").unwrap(); let result = verifier.verify_server_cert(&leaf_der, &[], &server_name, &[], UnixTime::now()); assert!(result.is_ok(), "expected Ok, got {result:?}"); } + + #[test] + fn verify_fails_when_provider_has_no_roots() { + let (_ca_der, leaf_der) = build_chain_with_spiffe_leaf("spiffe://td/ns/prod/sa/api"); + let provider: Arc = + Arc::new(StaticProvider(Arc::new(CertificateData::IdentityOnly { + identity: Identity { + cert_chain: b"chain".to_vec(), + key: b"key".to_vec(), + }, + }))); + let verifier = XdsServerCertVerifier::new(provider, vec![]); + + let server_name = ServerName::try_from("any.connect.hostname").unwrap(); + let err = verifier + .verify_server_cert(&leaf_der, &[], &server_name, &[], UnixTime::now()) + .unwrap_err(); + assert!( + matches!(err, RustlsError::General(ref msg) if msg.contains("no roots")), + "expected General(\"...no roots...\"), got {err:?}", + ); + } } diff --git a/tonic-xds/src/xds/cluster_discovery.rs b/tonic-xds/src/xds/cluster_discovery.rs index cec412c6d..4c9e232a0 100644 --- a/tonic-xds/src/xds/cluster_discovery.rs +++ b/tonic-xds/src/xds/cluster_discovery.rs @@ -1,39 +1,68 @@ //! xDS-backed [`ClusterDiscovery`] implementation. //! -//! Bridges [`XdsCache`] endpoint watches and [`EndpointManager`] diffing -//! to provide the [`ClusterDiscovery`] trait required by [`XdsLbService`]. +//! Per cluster, [`XdsClusterDiscovery::discover_cluster`] spawns a task that +//! drives two concurrent watches: +//! +//! 1. The cluster resource watch — produces a fresh [`Connector`] on each +//! CDS update (e.g. when `transport_socket` changes). The connector is +//! held inside a [`ConnectorSwap`] so the diff loop reads the latest +//! snapshot per endpoint connection. +//! 2. The endpoint watch — produces `Change::Insert` / `Change::Remove` +//! events forwarded to the LB layer. +//! +//! On a CDS update whose security config fails validation, the previous +//! connector is kept and a warning is logged. use std::sync::Arc; +use arc_swap::ArcSwap; +use tokio::sync::mpsc; +use tokio_stream::StreamExt as _; +use tokio_stream::wrappers::ReceiverStream; use tonic::transport::{Channel, Endpoint}; -use crate::client::endpoint::{EndpointAddress, EndpointChannel}; +use crate::client::endpoint::{Connector, EndpointAddress, EndpointChannel}; use crate::client::lb::{BoxDiscover, ClusterDiscovery}; +use crate::common::async_util::BoxFuture; use crate::xds::cache::XdsCache; -use crate::xds::endpoint_manager::EndpointManager; +#[cfg(feature = "_tls-any")] +use crate::xds::cert_provider::verifier::XdsServerCertVerifier; +#[cfg(feature = "_tls-any")] +use crate::xds::cert_provider::{CertProviderRegistry, CertificateProvider}; +use crate::xds::endpoint_manager::{ConnectorSwap, EndpointManager}; +use crate::xds::resource::ClusterResource; +#[cfg(feature = "_tls-any")] +use crate::xds::resource::security::ClusterSecurityConfig; -/// Shared connector function that creates endpoint services from addresses. -// TODO: Refactor to a trait when adding TLS support (A29). A trait can carry -// configuration (TLS settings, timeouts) and be shared across EndpointManager, -// ClusterDiscovery, and LB reconnect logic. -pub(crate) type EndpointConnector = - Arc EndpointChannel + Send + Sync>; +/// Buffer capacity for the discovery channel between the spawned task and +/// Tower's LB layer. +const DISCOVER_CHANNEL_CAPACITY: usize = 64; -/// xDS-backed cluster discovery that resolves cluster names into endpoint -/// change streams by watching the [`XdsCache`]. +/// xDS-backed cluster discovery. +/// +/// Resolves cluster names into endpoint change streams by watching the +/// [`XdsCache`]. Builds per-cluster [`Connector`]s based on the cluster's +/// [`ClusterSecurityConfig`] (if any) and the bootstrap-built +/// [`CertProviderRegistry`]. pub(crate) struct XdsClusterDiscovery { cache: Arc, - endpoint_manager: EndpointManager>, + #[cfg(feature = "_tls-any")] + cert_provider_registry: Arc, } impl XdsClusterDiscovery { - /// Creates a new `XdsClusterDiscovery`. - pub(crate) fn new(cache: Arc, connector: EndpointConnector) -> Self { + #[cfg(feature = "_tls-any")] + pub(crate) fn new(cache: Arc, registry: Arc) -> Self { Self { cache, - endpoint_manager: EndpointManager::new(connector), + cert_provider_registry: registry, } } + + #[cfg(not(feature = "_tls-any"))] + pub(crate) fn new(cache: Arc) -> Self { + Self { cache } + } } impl ClusterDiscovery> for XdsClusterDiscovery { @@ -41,29 +70,361 @@ impl ClusterDiscovery> for XdsClusterD &self, cluster_name: &str, ) -> BoxDiscover> { - let watch = self.cache.watch_endpoints(cluster_name); - self.endpoint_manager.discover_endpoints(watch) + let cache = self.cache.clone(); + let cluster_name = cluster_name.to_string(); + #[cfg(feature = "_tls-any")] + let registry = self.cert_provider_registry.clone(); + + let (tx, rx) = mpsc::channel(DISCOVER_CHANNEL_CAPACITY); + + tokio::spawn(async move { + let mut cluster_watch = cache.watch_cluster(&cluster_name); + + let connector_swap: ConnectorSwap> = loop { + let Some(cluster) = cluster_watch.next().await else { + return; + }; + let result = build_connector( + &cluster, + #[cfg(feature = "_tls-any")] + ®istry, + ); + match result { + Ok(c) => break Arc::new(ArcSwap::from_pointee(c)), + Err(e) => tracing::warn!( + cluster = %cluster_name, + error = %e, + "initial CDS update rejected; awaiting next update", + ), + } + }; + + let manager = EndpointManager::new(Arc::clone(&connector_swap)); + let mut endpoints = manager.discover_endpoints(cache.watch_endpoints(&cluster_name)); + + loop { + tokio::select! { + Some(change) = endpoints.next() => { + if tx.send(change).await.is_err() { + return; + } + } + Some(cluster) = cluster_watch.next() => { + let result = build_connector( + &cluster, + #[cfg(feature = "_tls-any")] + ®istry, + ); + match result { + Ok(new) => connector_swap.store(Arc::new(new)), + Err(e) => tracing::warn!( + cluster = %cluster_name, + error = %e, + "CDS update rejected; keeping previous connector", + ), + } + } + else => return, + } + } + }); + + Box::pin(ReceiverStream::new(rx)) } } -/// Default connector that creates a lazily-connected [`EndpointChannel`] for -/// each endpoint address. +/// Build a [`Connector`] for the given cluster. /// -/// Uses insecure (plaintext) connections. -// TODO(PR2/A29): Replace this with a TLS-aware connector that receives the -// CertProviderRegistry and per-cluster UpstreamTlsContext (from ClusterResource). -// When a cluster has transport_socket configured, the connector should: -// 1. Look up root + identity cert provider instances from the registry -// 2. Build ClientTlsConfig with the fetched CertificateData -// 3. Apply SAN matching for server authorization -// 4. Use connect() instead of connect_lazy() for TLS handshake -pub(crate) fn default_endpoint_connector(addr: &EndpointAddress) -> EndpointChannel { - let uri = format!("http://{addr}"); - // Safety: EndpointAddress only holds validated Ipv4/Ipv6/Hostname + u16 port, - // and its Display impl produces "ip:port" or "hostname:port". Prefixing with - // "http://" always yields a valid URI, so from_shared cannot fail here. - let channel = Endpoint::from_shared(uri) - .expect("EndpointAddress Display guarantees valid URI") - .connect_lazy(); - EndpointChannel::new(channel) +/// - `cluster.security == None` → [`PlaintextConnector`]. +/// - `cluster.security == Some(_)` under a TLS feature → [`TlsConnector`]. +/// - `cluster.security == Some(_)` without a TLS feature → error. +fn build_connector( + cluster: &ClusterResource, + #[cfg(feature = "_tls-any")] registry: &CertProviderRegistry, +) -> Result> + Send + Sync>, ConnectorBuildError> +{ + match &cluster.security { + None => Ok(Arc::new(PlaintextConnector)), + #[cfg(feature = "_tls-any")] + Some(sec) => Ok(Arc::new(TlsConnector::new(registry, sec)?)), + #[cfg(not(feature = "_tls-any"))] + Some(_) => Err(ConnectorBuildError::TlsFeatureMissing), + } +} + +/// Errors building a per-cluster [`Connector`] from a [`ClusterResource`]. +#[derive(Debug, thiserror::Error)] +pub(crate) enum ConnectorBuildError { + /// TLS connector build failed (unknown provider instance, etc.). + #[cfg(feature = "_tls-any")] + #[error("build TLS connector: {0}")] + Tls(#[from] TlsConnectorBuildError), + /// The cluster requires TLS but the binary was built without a TLS + /// crypto backend feature. + #[cfg(not(feature = "_tls-any"))] + #[error("cluster requires TLS but no TLS feature enabled (build with tls-ring or tls-aws-lc)")] + TlsFeatureMissing, +} + +/// Errors constructing a [`TlsConnector`]. +#[cfg(feature = "_tls-any")] +#[derive(Debug, thiserror::Error)] +pub(crate) enum TlsConnectorBuildError { + #[error("CA provider instance '{0}' is not configured in bootstrap.certificate_providers")] + UnknownCaInstance(String), + #[error( + "identity provider instance '{0}' is not configured in bootstrap.certificate_providers" + )] + UnknownIdentityInstance(String), +} + +/// Plaintext (non-TLS) [`Connector`] that produces a lazily-connected +/// `tonic::Channel` for each endpoint. +pub(crate) struct PlaintextConnector; + +impl Connector for PlaintextConnector { + type Service = EndpointChannel; + + fn connect(&self, addr: &EndpointAddress) -> BoxFuture { + // EndpointAddress only holds validated Ipv4/Ipv6/Hostname + u16 port, + // and its Display impl produces "ip:port" or "hostname:port". Prefixing + // with "http://" always yields a valid URI, so from_shared cannot fail. + let channel = Endpoint::from_shared(format!("http://{addr}")) + .expect("EndpointAddress Display guarantees valid URI") + .connect_lazy(); + let svc = EndpointChannel::new(channel); + Box::pin(async move { svc }) + } +} + +/// TLS [`Connector`] for clusters whose CDS resource carries an +/// `UpstreamTlsContext`. Holds: +/// +/// - a verifier that reads CA roots from its [`CertificateProvider`] on +/// each handshake (so `file_watcher`-driven CA rotation is picked up +/// automatically), and +/// - an optional identity provider for mTLS — fetched per [`connect`] call +/// so identity rotation is picked up on each new connection. +/// +/// The connector is rebuilt by [`build_connector`] on every CDS update, so +/// changes to `ca_instance_name` / `identity_instance_name` / SAN matchers +/// also propagate as the cluster watch swaps the connector. +#[cfg(feature = "_tls-any")] +pub(crate) struct TlsConnector { + verifier: Arc, + identity_provider: Option>, +} + +#[cfg(feature = "_tls-any")] +impl TlsConnector { + pub(crate) fn new( + registry: &CertProviderRegistry, + security: &ClusterSecurityConfig, + ) -> Result { + let ca_provider = registry + .get(&security.ca_instance_name) + .ok_or_else(|| { + TlsConnectorBuildError::UnknownCaInstance(security.ca_instance_name.clone()) + })? + .clone(); + let verifier = Arc::new(XdsServerCertVerifier::new( + ca_provider, + security.san_matchers.clone(), + )); + + let identity_provider = security + .identity_instance_name + .as_ref() + .map(|name| { + registry + .get(name) + .cloned() + .ok_or_else(|| TlsConnectorBuildError::UnknownIdentityInstance(name.clone())) + }) + .transpose()?; + + Ok(Self { + verifier, + identity_provider, + }) + } +} + +#[cfg(feature = "_tls-any")] +impl Connector for TlsConnector { + type Service = EndpointChannel; + + fn connect(&self, addr: &EndpointAddress) -> BoxFuture { + use rustls::client::danger::ServerCertVerifier; + + let verifier: Arc = self.verifier.clone(); + + // Identity is fetched per `connect` so file_watcher-driven identity + // rotation reaches each new connection. `Identity::from_pem` is + // bytes-only; the rustls parse happens inside `tls_config_with_verifier`. + let identity = self + .identity_provider + .as_ref() + .and_then(|p| match p.fetch() { + Ok(data) => data + .identity() + .map(|id| tonic::transport::Identity::from_pem(&id.cert_chain, &id.key)), + Err(e) => { + tracing::error!( + error = %e, + "identity provider fetch failed; falling back to TLS-only", + ); + None + } + }); + + let mut tls_config = tonic::transport::ClientTlsConfig::new(); + if let Some(id) = identity { + tls_config = tls_config.identity(id); + } + + let uri = format!("https://{addr}"); + let endpoint = Endpoint::from_shared(uri.clone()) + .expect("EndpointAddress Display guarantees valid URI"); + + let channel = match endpoint.tls_config_with_verifier(tls_config, verifier) { + Ok(ep) => ep.connect_lazy(), + Err(e) => { + // tls_config_with_verifier only errors on UDS endpoints + // (see tonic's endpoint.rs), which we never construct. The + // defensive fallback returns a non-TLS lazy channel — the + // request will fail at the wire, surfacing the misconfig. + tracing::error!( + error = %e, address = %addr, + "tls_config_with_verifier failed; non-TLS lazy fallback", + ); + Endpoint::from_shared(uri) + .expect("EndpointAddress Display guarantees valid URI") + .connect_lazy() + } + }; + let svc = EndpointChannel::new(channel); + Box::pin(async move { svc }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::xds::resource::cluster::{ClusterResource, LbPolicy}; + + fn plaintext_cluster() -> ClusterResource { + ClusterResource { + name: "c".into(), + eds_service_name: None, + lb_policy: LbPolicy::RoundRobin, + security: None, + } + } + + #[cfg(feature = "_tls-any")] + fn empty_registry() -> CertProviderRegistry { + use std::collections::HashMap; + CertProviderRegistry::from_bootstrap(&HashMap::new()).unwrap() + } + + /// Plaintext dispatch under TLS feature. + #[cfg(feature = "_tls-any")] + #[test] + fn build_connector_plaintext_tls_feature_on() { + assert!(build_connector(&plaintext_cluster(), &empty_registry()).is_ok()); + } + + /// Plaintext dispatch without any TLS feature. + #[cfg(not(feature = "_tls-any"))] + #[test] + fn build_connector_plaintext_no_tls() { + assert!(build_connector(&plaintext_cluster()).is_ok()); + } + + /// Cluster with TLS pointing at an instance not in the registry surfaces + /// a clear error — useful for misconfig diagnostics. + #[cfg(feature = "_tls-any")] + #[test] + fn build_connector_tls_unknown_ca() { + use crate::xds::resource::security::ClusterSecurityConfig; + + let mut cluster = plaintext_cluster(); + cluster.security = Some(ClusterSecurityConfig { + ca_instance_name: "missing-ca".into(), + identity_instance_name: None, + san_matchers: vec![], + }); + let Err(err) = build_connector(&cluster, &empty_registry()) else { + panic!("expected UnknownCaInstance error"); + }; + assert!(matches!( + err, + ConnectorBuildError::Tls(TlsConnectorBuildError::UnknownCaInstance(ref name)) + if name == "missing-ca" + )); + } + + /// `TlsConnector::connect` fetches the identity provider on every call, + /// which is what gives us identity rotation between CDS updates. Counter + /// shim verifies the call count without standing up a TLS handshake. + #[cfg(feature = "_tls-any")] + #[tokio::test] + async fn tls_connector_fetches_identity_per_connect() { + use crate::xds::cert_provider::{ + CertProviderError, CertificateData, CertificateProvider, Identity, + }; + use rustls::RootCertStore; + use std::sync::atomic::{AtomicUsize, Ordering}; + + struct CountingIdentity { + count: AtomicUsize, + data: Arc, + } + impl CertificateProvider for CountingIdentity { + fn fetch(&self) -> Result, CertProviderError> { + self.count.fetch_add(1, Ordering::Relaxed); + Ok(self.data.clone()) + } + } + + struct StaticCa(Arc); + impl CertificateProvider for StaticCa { + fn fetch(&self) -> Result, CertProviderError> { + Ok(self.0.clone()) + } + } + + let ca_provider: Arc = + Arc::new(StaticCa(Arc::new(CertificateData::RootsOnly { + roots: Arc::new(RootCertStore::empty()), + }))); + let verifier = Arc::new(XdsServerCertVerifier::new(ca_provider, vec![])); + + let identity_data = Arc::new(CertificateData::IdentityOnly { + identity: Identity { + cert_chain: b"cert".to_vec(), + key: b"key".to_vec(), + }, + }); + let counter = Arc::new(CountingIdentity { + count: AtomicUsize::new(0), + data: identity_data, + }); + let identity_provider: Arc = counter.clone(); + let connector = TlsConnector { + verifier, + identity_provider: Some(identity_provider), + }; + + let addr = EndpointAddress::from("1.2.3.4:443".parse::().unwrap()); + let _ = connector.connect(&addr).await; + let _ = connector.connect(&addr).await; + + assert_eq!( + counter.count.load(Ordering::Relaxed), + 2, + "TlsConnector should fetch identity provider on every connect call", + ); + } } diff --git a/tonic-xds/src/xds/endpoint_manager.rs b/tonic-xds/src/xds/endpoint_manager.rs index 59de34532..c21e75c46 100644 --- a/tonic-xds/src/xds/endpoint_manager.rs +++ b/tonic-xds/src/xds/endpoint_manager.rs @@ -9,12 +9,13 @@ use std::collections::HashSet; use std::sync::Arc; +use arc_swap::ArcSwap; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tower::BoxError; use tower::discover::Change; -use crate::client::endpoint::EndpointAddress; +use crate::client::endpoint::{Connector, EndpointAddress}; use crate::client::lb::BoxDiscover; use crate::xds::cache::CacheWatch; use crate::xds::resource::EndpointsResource; @@ -23,19 +24,27 @@ use crate::xds::resource::EndpointsResource; /// and Tower's load balancer. const ENDPOINT_CHANNEL_CAPACITY: usize = 64; +/// An atomically-swappable [`Connector`] held by an [`EndpointManager`]. +/// +/// `XdsClusterDiscovery` stores a snapshot of the cluster's per-CDS-update +/// connector here. The diff loop calls `load_full()` on every new endpoint +/// so each connection picks up the latest snapshot. Existing endpoint +/// channels keep their `EndpointChannel` instance (and any in-flight TLS +/// session) — only freshly-discovered endpoints see the swapped value. +pub(crate) type ConnectorSwap = Arc + Send + Sync>>>; + /// Converts endpoint cache watches into incremental [`Change`] streams. /// /// `EndpointManager` is a pure diff-and-connect component: the caller -/// (typically `XdsResourceManager`) obtains a [`CacheWatch`] from the -/// [`XdsCache`](crate::xds::cache::XdsCache) and passes it here. +/// (typically `XdsClusterDiscovery`) obtains a [`CacheWatch`] from the +/// [`XdsCache`](crate::xds::cache::XdsCache) and passes it here, plus a +/// [`ConnectorSwap`] that the caller may swap on CDS updates. pub(crate) struct EndpointManager { - /// Creates a service for each new endpoint address (e.g., wrapping a - /// lazily-connected `tonic::transport::Channel` in an `EndpointChannel`). - connector: Arc S + Send + Sync>, + connector: ConnectorSwap, } impl EndpointManager { - pub(crate) fn new(connector: Arc S + Send + Sync>) -> Self { + pub(crate) fn new(connector: ConnectorSwap) -> Self { Self { connector } } @@ -67,7 +76,7 @@ impl EndpointManager { /// new endpoints followed by `Remove` for gone ones. async fn diff_loop( mut watch: CacheWatch, - connector: Arc S + Send + Sync>, + connector: ConnectorSwap, tx: mpsc::Sender, BoxError>>, ) { let mut active: HashSet = HashSet::new(); @@ -79,7 +88,7 @@ async fn diff_loop( .collect(); for added in new_set.difference(&active) { - let svc = connector(added); + let svc = connector.load_full().connect(added).await; if tx .send(Ok(Change::Insert(added.clone(), svc))) .await @@ -102,12 +111,26 @@ async fn diff_loop( #[cfg(test)] mod tests { use super::*; + use crate::common::async_util::BoxFuture; use crate::xds::cache::XdsCache; use crate::xds::resource::endpoints::{HealthStatus, LocalityEndpoints, ResolvedEndpoint}; use tokio_stream::StreamExt; - fn test_connector() -> Arc String + Send + Sync> { - Arc::new(|addr: &EndpointAddress| addr.to_string()) + /// Test [`Connector`] that returns the address as its `Service` (just a + /// `String`). + struct StringConnector; + + impl Connector for StringConnector { + type Service = String; + fn connect(&self, addr: &EndpointAddress) -> BoxFuture { + let s = addr.to_string(); + Box::pin(async move { s }) + } + } + + fn test_swap() -> ConnectorSwap { + let conn: Arc + Send + Sync> = Arc::new(StringConnector); + Arc::new(ArcSwap::from_pointee(conn)) } fn make_endpoints(cluster: &str, addrs: &[(&str, u16)]) -> Arc { @@ -132,7 +155,7 @@ mod tests { #[tokio::test] async fn initial_endpoints_emitted_as_inserts() { let cache = XdsCache::new(); - let manager = EndpointManager::new(test_connector()); + let manager = EndpointManager::new(test_swap()); cache.update_endpoints( "c1", @@ -155,7 +178,7 @@ mod tests { #[tokio::test] async fn added_endpoint_emits_insert() { let cache = XdsCache::new(); - let manager = EndpointManager::new(test_connector()); + let manager = EndpointManager::new(test_swap()); cache.update_endpoints("c1", make_endpoints("c1", &[("10.0.0.1", 8080)])); @@ -176,7 +199,7 @@ mod tests { #[tokio::test] async fn removed_endpoint_emits_remove() { let cache = XdsCache::new(); - let manager = EndpointManager::new(test_connector()); + let manager = EndpointManager::new(test_swap()); cache.update_endpoints( "c1", @@ -200,7 +223,7 @@ mod tests { #[tokio::test] async fn unhealthy_endpoint_removed() { let cache = XdsCache::new(); - let manager = EndpointManager::new(test_connector()); + let manager = EndpointManager::new(test_swap()); cache.update_endpoints("c1", make_endpoints("c1", &[("10.0.0.1", 8080)])); @@ -231,7 +254,7 @@ mod tests { #[tokio::test] async fn cache_removal_closes_stream() { let cache = XdsCache::new(); - let manager = EndpointManager::new(test_connector()); + let manager = EndpointManager::new(test_swap()); cache.update_endpoints("c1", make_endpoints("c1", &[("10.0.0.1", 8080)])); @@ -246,7 +269,7 @@ mod tests { #[tokio::test] async fn multiple_clusters_independent() { let cache = XdsCache::new(); - let manager = EndpointManager::new(test_connector()); + let manager = EndpointManager::new(test_swap()); cache.update_endpoints("c1", make_endpoints("c1", &[("10.0.0.1", 8080)])); cache.update_endpoints("c2", make_endpoints("c2", &[("10.0.0.2", 9090)])); @@ -267,7 +290,7 @@ mod tests { #[tokio::test] async fn endpoint_swap_emits_insert_then_remove() { let cache = XdsCache::new(); - let manager = EndpointManager::new(test_connector()); + let manager = EndpointManager::new(test_swap()); cache.update_endpoints("c1", make_endpoints("c1", &[("10.0.0.1", 8080)])); diff --git a/tonic-xds/src/xds/mod.rs b/tonic-xds/src/xds/mod.rs index a2416a6e4..479258a30 100644 --- a/tonic-xds/src/xds/mod.rs +++ b/tonic-xds/src/xds/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod bootstrap; pub(crate) mod cache; +#[cfg(feature = "_tls-any")] pub(crate) mod cert_provider; pub(crate) mod cluster_discovery; pub(crate) mod endpoint_manager; diff --git a/tonic-xds/src/xds/resource/san_matcher.rs b/tonic-xds/src/xds/resource/san_matcher.rs index f1c71b2a9..34079d7fe 100644 --- a/tonic-xds/src/xds/resource/san_matcher.rs +++ b/tonic-xds/src/xds/resource/san_matcher.rs @@ -27,6 +27,13 @@ pub(crate) enum SanMatcher { Uri(StringMatcher), Email(StringMatcher), IpAddress(StringMatcher), + /// Type-agnostic matcher synthesized from the deprecated + /// `CertificateValidationContext.match_subject_alt_names` field, which + /// carries plain `StringMatcher`s without an explicit SAN type. Matches + /// against any SAN entry in the peer cert regardless of type — required + /// for interop with control planes (notably Istio) that still emit the + /// deprecated field with SPIFFE URI content. + AnyType(StringMatcher), } /// A SAN entry extracted from a peer X.509 certificate. @@ -81,6 +88,14 @@ impl SanMatcher { // (RFC 5952). `IpAddr`'s `Display` implementation already produces // that form (lowercase, zero-compressed IPv6). (Self::IpAddress(m), SanEntry::IpAddress(ip)) => m.is_match(&ip.to_string()), + // AnyType: apply the string matcher to whatever the SAN entry + // carries, regardless of type. DNS entries don't get wildcard + // semantics here — the deprecated field predates typed wildcard + // handling. + (Self::AnyType(m), SanEntry::Dns(v)) => m.is_match(v), + (Self::AnyType(m), SanEntry::Uri(v)) => m.is_match(v), + (Self::AnyType(m), SanEntry::Email(v)) => m.is_match(v), + (Self::AnyType(m), SanEntry::IpAddress(ip)) => m.is_match(&ip.to_string()), _ => false, // type mismatch } } @@ -300,4 +315,39 @@ mod tests { ]; assert!(m.matches_any(&sans)); } + + #[test] + fn any_type_matches_uri_san() { + let m = SanMatcher::AnyType( + StringMatcher::from_proto(exact("spiffe://td/ns/prod/sa/x")).unwrap(), + ); + assert!(m.matches_any(&[SanEntry::Uri("spiffe://td/ns/prod/sa/x".into())])); + } + + #[test] + fn any_type_matches_dns_san() { + let m = SanMatcher::AnyType(StringMatcher::from_proto(exact("api.example.com")).unwrap()); + assert!(m.matches_any(&[SanEntry::Dns("api.example.com".into())])); + } + + #[test] + fn any_type_matches_email_san() { + let m = SanMatcher::AnyType(StringMatcher::from_proto(exact("svc@corp.test")).unwrap()); + assert!(m.matches_any(&[SanEntry::Email("svc@corp.test".into())])); + } + + #[test] + fn any_type_matches_ip_san_canonical_form() { + let m = SanMatcher::AnyType(StringMatcher::from_proto(exact("10.0.0.1")).unwrap()); + assert!(m.matches_any(&[SanEntry::IpAddress("10.0.0.1".parse().unwrap())])); + } + + #[test] + fn any_type_does_not_apply_dns_wildcard_semantics() { + // Wildcards in the *cert* are honored only when the matcher type is + // `Dns` (typed) — the deprecated `AnyType` path predates that and + // compares as literal strings. + let m = SanMatcher::AnyType(StringMatcher::from_proto(exact("foo.example.com")).unwrap()); + assert!(!m.matches_any(&[SanEntry::Dns("*.example.com".into())])); + } } diff --git a/tonic-xds/src/xds/resource/security.rs b/tonic-xds/src/xds/resource/security.rs index 05a326580..9e5a60f88 100644 --- a/tonic-xds/src/xds/resource/security.rs +++ b/tonic-xds/src/xds/resource/security.rs @@ -163,11 +163,16 @@ fn parse_san_matchers(ctx: &CertificateValidationContext) -> xds_client::Result< .map(SanMatcher::from_proto) .collect(); } + // Deprecated `match_subject_alt_names` field carries untyped `StringMatcher`s. + // Per grpc-go behavior, each one applies against any SAN entry type in the + // peer cert. This matters for Istio interop: istiod emits SPIFFE URIs + // (`spiffe://...`) in this deprecated field, and the cert's matching SAN + // is a URI entry. #[allow(deprecated)] ctx.match_subject_alt_names .iter() .cloned() - .map(|m| StringMatcher::from_proto(m).map(SanMatcher::Dns)) + .map(|m| StringMatcher::from_proto(m).map(SanMatcher::AnyType)) .collect() } @@ -179,17 +184,6 @@ fn reject_unsupported_common_fields(ctx: &CommonTlsContext) -> xds_client::Resul ctx.custom_handshaker.is_some(), "CommonTlsContext.custom_handshaker", )?; - #[allow(deprecated)] - { - reject( - ctx.tls_certificate_certificate_provider.is_some(), - "CommonTlsContext.tls_certificate_certificate_provider", - )?; - reject( - ctx.tls_certificate_certificate_provider_instance.is_some(), - "CommonTlsContext.tls_certificate_certificate_provider_instance", - )?; - } Ok(()) } @@ -390,8 +384,7 @@ mod tests { .unwrap() .unwrap(); assert_eq!(cfg.san_matchers.len(), 1); - // Legacy field treats entries as DNS SAN matchers. - assert!(matches!(cfg.san_matchers[0], SanMatcher::Dns(_))); + assert!(matches!(cfg.san_matchers[0], SanMatcher::AnyType(_))); } #[test] @@ -568,4 +561,66 @@ mod tests { let err = parse_transport_socket(Some(wrap_upstream(common_ctx(cvc)))).unwrap_err(); assert!(err.to_string().contains("custom_validator_config")); } + + /// Real-world control planes (Istio in particular) emit both the current + /// `tls_certificate_provider_instance` field and the deprecated + /// `tls_certificate_certificate_provider_instance` for backward compat. + /// Our parser must accept this shape and read identity from the current + /// field. + #[test] + fn deprecated_and_current_identity_fields_coexist() { + use envoy_types::pb::envoy::extensions::transport_sockets::tls::v3::common_tls_context::CertificateProviderInstance; + + #[allow(deprecated)] + let common = CommonTlsContext { + tls_certificate_provider_instance: Some(provider_instance("identity")), + // Istio also emits this older field with the same instance name. + tls_certificate_certificate_provider_instance: Some(CertificateProviderInstance { + instance_name: "identity".into(), + certificate_name: String::new(), + }), + validation_context_type: Some( + common_tls_context::ValidationContextType::ValidationContext(ca_validation_ctx( + "root_ca", + )), + ), + ..Default::default() + }; + let cfg = parse_transport_socket(Some(wrap_upstream(common))) + .unwrap() + .unwrap(); + assert_eq!(cfg.ca_instance_name, "root_ca"); + assert_eq!(cfg.identity_instance_name.as_deref(), Some("identity")); + } + + /// Deprecated `match_subject_alt_names` (untyped string matchers) must + /// produce `SanMatcher::AnyType` so that SPIFFE URI content emitted by + /// Istio matches against the cert's URI SAN entry. Without this, the + /// matchers would only match DNS SANs and fail handshake against + /// URI-only certs. + #[test] + fn deprecated_match_subject_alt_names_produces_any_type() { + use crate::xds::resource::san_matcher::{SanEntry, SanMatcher}; + + let cvc = CertificateValidationContext { + ca_certificate_provider_instance: Some(provider_instance("root_ca")), + #[allow(deprecated)] + match_subject_alt_names: vec![StringMatcherProto { + match_pattern: Some(MatchPattern::Exact( + "spiffe://cluster.local/ns/xds-test/sa/greeter".into(), + )), + ignore_case: false, + }], + ..Default::default() + }; + let cfg = parse_transport_socket(Some(wrap_upstream(common_ctx(cvc)))) + .unwrap() + .unwrap(); + assert_eq!(cfg.san_matchers.len(), 1); + assert!(matches!(cfg.san_matchers[0], SanMatcher::AnyType(_))); + // The matcher must match a URI-typed SAN entry, not just DNS. + assert!(cfg.san_matchers[0].matches_any(&[SanEntry::Uri( + "spiffe://cluster.local/ns/xds-test/sa/greeter".into(), + )])); + } }