From 9297bd86e893801767ff1a3f51a0a1927ac3eb19 Mon Sep 17 00:00:00 2001 From: Filip Pytloun Date: Mon, 22 Jun 2026 12:28:08 +0200 Subject: [PATCH] feat(sources): add gRPC max connection age --- src/components/validation/runner/io.rs | 3 +- src/sources/opentelemetry/config.rs | 8 +- .../opentelemetry/integration_tests.rs | 3 + src/sources/opentelemetry/tests.rs | 30 ++ src/sources/util/grpc/mod.rs | 471 +++++++++++++++++- src/sources/vector/mod.rs | 165 +++++- .../sources/generated/opentelemetry.cue | 34 ++ .../components/sources/generated/vector.cue | 30 ++ 8 files changed, 726 insertions(+), 18 deletions(-) diff --git a/src/components/validation/runner/io.rs b/src/components/validation/runner/io.rs index e8d6be540079e..80125c80d60bf 100644 --- a/src/components/validation/runner/io.rs +++ b/src/components/validation/runner/io.rs @@ -22,7 +22,7 @@ use crate::{ Client as VectorClient, HealthCheckRequest, HealthCheckResponse, PushEventsRequest, PushEventsResponse, Server as VectorServer, Service as VectorService, ServingStatus, }, - sources::util::grpc::run_grpc_server, + sources::util::grpc::{GrpcKeepaliveConfig, run_grpc_server}, }; #[derive(Clone)] @@ -166,6 +166,7 @@ pub fn spawn_grpc_server( listen_addr.as_socket_addr(), tls_settings, service, + GrpcKeepaliveConfig::default(), shutdown_signal, ); pin!(server); diff --git a/src/sources/opentelemetry/config.rs b/src/sources/opentelemetry/config.rs index 5ae97fa491479..8da189ef61d79 100644 --- a/src/sources/opentelemetry/config.rs +++ b/src/sources/opentelemetry/config.rs @@ -14,7 +14,7 @@ use crate::{ grpc::Service, http::{build_warp_filter, run_http_server}, }, - util::grpc::run_grpc_server_with_routes, + util::grpc::{GrpcKeepaliveConfig, run_grpc_server_with_routes}, }, }; use futures::FutureExt; @@ -173,12 +173,17 @@ pub struct GrpcConfig { #[configurable(derived)] #[serde(default, skip_serializing_if = "Option::is_none")] pub tls: Option, + + #[configurable(derived)] + #[serde(default)] + pub keepalive: GrpcKeepaliveConfig, } fn example_grpc_config() -> GrpcConfig { GrpcConfig { address: "0.0.0.0:4317".parse().unwrap(), tls: None, + keepalive: GrpcKeepaliveConfig::default(), } } @@ -325,6 +330,7 @@ impl SourceConfig for OpentelemetryConfig { self.grpc.address, grpc_tls_settings, builder.routes(), + self.grpc.keepalive.clone(), cx.shutdown.clone(), ) .map_err(|error| { diff --git a/src/sources/opentelemetry/integration_tests.rs b/src/sources/opentelemetry/integration_tests.rs index 00d642f41e47e..6e592e28fc03d 100644 --- a/src/sources/opentelemetry/integration_tests.rs +++ b/src/sources/opentelemetry/integration_tests.rs @@ -53,6 +53,7 @@ async fn receive_logs_legacy_namespace() { grpc: GrpcConfig { address: source_grpc_address().parse().unwrap(), tls: Default::default(), + keepalive: Default::default(), }, http: HttpConfig { address: source_http_address().parse().unwrap(), @@ -152,6 +153,7 @@ async fn receive_trace() { grpc: GrpcConfig { address: source_grpc_address().parse().unwrap(), tls: Default::default(), + keepalive: Default::default(), }, http: HttpConfig { address: source_http_address().parse().unwrap(), @@ -257,6 +259,7 @@ async fn receive_metric() { grpc: GrpcConfig { address: source_grpc_address().parse().unwrap(), tls: Default::default(), + keepalive: Default::default(), }, http: HttpConfig { address: source_http_address().parse().unwrap(), diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 3952f78d3f70f..8bdd611caa702 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -206,6 +206,30 @@ fn generate_config() { test_util::test_generate_config::(); } +#[test] +fn config_grpc_keepalive() { + let config: OpentelemetryConfig = toml::from_str( + r#" + [grpc] + address = "0.0.0.0:4317" + + [grpc.keepalive] + max_connection_age_secs = 300 + max_connection_age_grace_secs = 30 + + [http] + address = "0.0.0.0:4318" + "#, + ) + .unwrap(); + + assert_eq!(config.grpc.keepalive.max_connection_age_secs, Some(300)); + assert_eq!( + config.grpc.keepalive.max_connection_age_grace_secs, + Some(30) + ); +} + #[tokio::test] async fn receive_grpc_logs_vector_namespace() { assert_source_compliance(&SOURCE_TAGS, async { @@ -1175,6 +1199,7 @@ fn get_source_config_with_headers( grpc: GrpcConfig { address: grpc_addr, tls: Default::default(), + keepalive: Default::default(), }, http: HttpConfig { address: http_addr, @@ -1510,6 +1535,7 @@ pub async fn build_otlp_test_env( grpc: GrpcConfig { address: grpc_addr, tls: Default::default(), + keepalive: Default::default(), }, http: HttpConfig { address: http_addr, @@ -1589,6 +1615,7 @@ async fn http_logs_use_otlp_decoding_emits_metric() { grpc: GrpcConfig { address: grpc_addr, tls: Default::default(), + keepalive: Default::default(), }, http: HttpConfig { address: http_addr, @@ -1823,6 +1850,7 @@ mod otlp_decoding_config_tests { grpc: GrpcConfig { address: "0.0.0.0:4317".parse().unwrap(), tls: None, + keepalive: Default::default(), }, http: HttpConfig { address: "0.0.0.0:4318".parse().unwrap(), @@ -1863,6 +1891,7 @@ mod otlp_decoding_config_tests { grpc: GrpcConfig { address: "0.0.0.0:4317".parse().unwrap(), tls: None, + keepalive: Default::default(), }, http: HttpConfig { address: "0.0.0.0:4318".parse().unwrap(), @@ -1906,6 +1935,7 @@ mod otlp_decoding_config_tests { grpc: GrpcConfig { address: "0.0.0.0:4317".parse().unwrap(), tls: None, + keepalive: Default::default(), }, http: HttpConfig { address: "0.0.0.0:4318".parse().unwrap(), diff --git a/src/sources/util/grpc/mod.rs b/src/sources/util/grpc/mod.rs index e2b8887cd000c..ebe52532e19df 100644 --- a/src/sources/util/grpc/mod.rs +++ b/src/sources/util/grpc/mod.rs @@ -1,14 +1,30 @@ -use std::{convert::Infallible, net::SocketAddr, time::Duration}; +use std::{ + convert::Infallible, + net::SocketAddr, + pin::Pin, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + task::{Context, Poll}, + time::Duration, +}; -use futures::FutureExt; -use http::{Request, Response}; -use hyper::Body; +use futures::{FutureExt, StreamExt, future::BoxFuture}; +use http::{HeaderMap, Request, Response}; +use hyper::{Body, body::HttpBody}; +use pin_project::pin_project; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + net::TcpStream, + time::{Sleep, sleep}, +}; use tonic::{ body::BoxBody, server::NamedService, - transport::server::{Routes, Server}, + transport::server::{Connected, Routes, Server}, }; -use tower::Service; +use tower::{Layer, Service}; use tower_http::{ classify::{GrpcErrorsAsFailures, SharedClassifier}, trace::TraceLayer, @@ -18,16 +34,327 @@ use tracing::Span; use crate::{ internal_events::{GrpcServerRequestReceived, GrpcServerResponseSent}, shutdown::{ShutdownSignal, ShutdownSignalToken}, - tls::MaybeTlsSettings, + tls::{MaybeTlsIncomingStream, MaybeTlsSettings}, }; +use vector_lib::configurable::configurable_component; mod decompression; pub use self::decompression::{DecompressionAndMetrics, DecompressionAndMetricsLayer}; +#[cfg(test)] +static MAX_CONNECTION_AGE_CONNECTION_OBSERVATIONS: std::sync::Mutex> = + std::sync::Mutex::new(Vec::new()); + +#[cfg(test)] +#[allow(dead_code)] +pub(crate) fn reset_max_connection_age_connection_observations() { + MAX_CONNECTION_AGE_CONNECTION_OBSERVATIONS + .lock() + .unwrap() + .clear(); +} + +#[cfg(test)] +#[allow(dead_code)] +pub(crate) fn max_connection_age_connection_observations() -> Vec { + MAX_CONNECTION_AGE_CONNECTION_OBSERVATIONS + .lock() + .unwrap() + .clone() +} + +/// Configuration of gRPC server keepalive parameters. +#[configurable_component] +#[derive(Clone, Debug, Default, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct GrpcKeepaliveConfig { + /// The maximum amount of time a connection may exist before the server closes it. + /// + /// When unset, connections are not closed based on age. + #[serde(default)] + #[configurable(metadata(docs::examples = 300))] + #[configurable(metadata(docs::type_unit = "seconds"))] + #[configurable(metadata(docs::human_name = "Maximum Connection Age"))] + pub max_connection_age_secs: Option, + + /// The grace period added to `max_connection_age_secs` before the server closes the connection. + /// + /// This setting only applies when `max_connection_age_secs` is set. + #[serde(default)] + #[configurable(metadata(docs::examples = 30))] + #[configurable(metadata(docs::type_unit = "seconds"))] + #[configurable(metadata(docs::human_name = "Maximum Connection Age Grace"))] + pub max_connection_age_grace_secs: Option, +} + +impl GrpcKeepaliveConfig { + fn max_connection_lifetime(&self) -> Option { + self.max_connection_age_secs.map(|max_connection_age_secs| { + let age = Duration::from_secs(max_connection_age_secs); + let grace = self + .max_connection_age_grace_secs + .map(Duration::from_secs) + .unwrap_or_default(); + + age.checked_add(grace).unwrap_or(Duration::MAX) + }) + } +} + +struct MaxConnectionAgeIo { + inner: MaybeTlsIncomingStream, + state: MaxConnectionAgeState, +} + +impl MaxConnectionAgeIo { + fn new(inner: MaybeTlsIncomingStream, lifetime: Option) -> Self { + #[cfg(test)] + if lifetime.is_some() { + MAX_CONNECTION_AGE_CONNECTION_OBSERVATIONS + .lock() + .unwrap() + .push(inner.peer_addr()); + } + + Self { + inner, + state: MaxConnectionAgeState::new(lifetime), + } + } +} + +struct MaxConnectionAgeState { + deadline: Option>>, + read_expired: bool, + active_requests: Arc, +} + +impl MaxConnectionAgeState { + fn new(lifetime: Option) -> Self { + Self { + deadline: lifetime.map(|lifetime| Box::pin(sleep(lifetime))), + read_expired: false, + active_requests: Arc::new(AtomicUsize::new(0)), + } + } + + fn is_read_expired(&mut self, cx: &mut Context<'_>) -> bool { + if self.read_expired { + return true; + } + + self.read_expired = self + .deadline + .as_mut() + .is_some_and(|deadline| deadline.as_mut().poll(cx).is_ready()); + + self.read_expired + } + + fn is_write_expired(&mut self, cx: &mut Context<'_>) -> bool { + self.is_read_expired(cx) && self.active_requests.load(Ordering::Acquire) == 0 + } + + fn active_requests(&self) -> Arc { + Arc::clone(&self.active_requests) + } + + #[cfg(test)] + fn is_read_expired_for_test(&mut self, cx: &mut Context<'_>) -> bool { + self.is_read_expired(cx) + } + + #[cfg(test)] + fn is_write_expired_for_test(&mut self, cx: &mut Context<'_>) -> bool { + self.is_write_expired(cx) + } +} + +impl AsyncRead for MaxConnectionAgeIo { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = self.get_mut(); + if this.state.is_read_expired(cx) { + Poll::Ready(Ok(())) + } else { + Pin::new(&mut this.inner).poll_read(cx, buf) + } + } +} + +impl AsyncWrite for MaxConnectionAgeIo { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.get_mut(); + if this.state.is_write_expired(cx) { + Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())) + } else { + Pin::new(&mut this.inner).poll_write(cx, buf) + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + if this.state.is_write_expired(cx) { + Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())) + } else { + Pin::new(&mut this.inner).poll_flush(cx) + } + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().inner).poll_shutdown(cx) + } +} + +impl Connected for MaxConnectionAgeIo { + type ConnectInfo = MaxConnectionAgeConnectInfo; + + fn connect_info(&self) -> Self::ConnectInfo { + MaxConnectionAgeConnectInfo { + active_requests: self.state.active_requests(), + } + } +} + +#[derive(Clone, Debug)] +struct MaxConnectionAgeConnectInfo { + active_requests: Arc, +} + +#[derive(Clone)] +struct MaxConnectionAgeLayer; + +impl MaxConnectionAgeLayer { + const fn new() -> Self { + Self + } +} + +impl Layer for MaxConnectionAgeLayer { + type Service = MaxConnectionAgeService; + + fn layer(&self, service: S) -> Self::Service { + MaxConnectionAgeService { service } + } +} + +#[derive(Clone)] +struct MaxConnectionAgeService { + service: S, +} + +impl NamedService for MaxConnectionAgeService +where + S: NamedService, +{ + const NAME: &'static str = S::NAME; +} + +struct ActiveRequestGuard { + active_requests: Arc, +} + +impl ActiveRequestGuard { + fn new(active_requests: Arc) -> Self { + active_requests.fetch_add(1, Ordering::AcqRel); + Self { active_requests } + } +} + +impl Drop for ActiveRequestGuard { + fn drop(&mut self) { + self.active_requests.fetch_sub(1, Ordering::AcqRel); + } +} + +#[pin_project] +struct MaxConnectionAgeBody { + #[pin] + inner: B, + _guard: Option, +} + +impl MaxConnectionAgeBody { + const fn new(inner: B, guard: Option) -> Self { + Self { + inner, + _guard: guard, + } + } +} + +impl HttpBody for MaxConnectionAgeBody +where + B: HttpBody, +{ + type Data = B::Data; + type Error = B::Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + self.project().inner.poll_data(cx) + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + self.project().inner.poll_trailers(cx) + } + + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } + + fn size_hint(&self) -> hyper::body::SizeHint { + self.inner.size_hint() + } +} + +impl Service> for MaxConnectionAgeService +where + S: Service, Response = Response> + Clone + Send + 'static, + S::Future: Send + 'static, + B: HttpBody + Send + 'static, +{ + type Response = Response>; + type Error = S::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let guard = req + .extensions() + .get::() + .map(|connect_info| ActiveRequestGuard::new(Arc::clone(&connect_info.active_requests))); + let future = self.service.call(req); + + async move { + future + .await + .map(|response| response.map(|body| MaxConnectionAgeBody::new(body, guard))) + } + .boxed() + } +} + pub async fn run_grpc_server( address: SocketAddr, tls_settings: MaybeTlsSettings, service: S, + keepalive: GrpcKeepaliveConfig, shutdown: ShutdownSignal, ) -> crate::Result<()> where @@ -41,11 +368,15 @@ where let span = Span::current(); let (tx, rx) = tokio::sync::oneshot::channel::(); let listener = tls_settings.bind(&address).await?; - let stream = listener.accept_stream(); + let max_connection_lifetime = keepalive.max_connection_lifetime(); + let stream = listener + .accept_stream() + .map(move |stream| stream.map(|io| MaxConnectionAgeIo::new(io, max_connection_lifetime))); info!(%address, "Building gRPC server."); Server::builder() + .layer(MaxConnectionAgeLayer::new()) .layer(build_grpc_trace_layer(span.clone())) // This layer explicitly decompresses payloads, if compressed, and reports the number of message bytes we've // received if the message is processed successfully, aka `BytesReceived`. We do this because otherwise the only @@ -72,16 +403,21 @@ pub async fn run_grpc_server_with_routes( address: SocketAddr, tls_settings: MaybeTlsSettings, routes: Routes, + keepalive: GrpcKeepaliveConfig, shutdown: ShutdownSignal, ) -> crate::Result<()> { let span = Span::current(); let (tx, rx) = tokio::sync::oneshot::channel::(); let listener = tls_settings.bind(&address).await?; - let stream = listener.accept_stream(); + let max_connection_lifetime = keepalive.max_connection_lifetime(); + let stream = listener + .accept_stream() + .map(move |stream| stream.map(|io| MaxConnectionAgeIo::new(io, max_connection_lifetime))); info!(%address, "Building gRPC server."); Server::builder() + .layer(MaxConnectionAgeLayer::new()) .layer(build_grpc_trace_layer(span.clone())) .layer(DecompressionAndMetricsLayer) .add_routes(routes) @@ -134,3 +470,120 @@ pub fn build_grpc_trace_layer( .on_body_chunk(()) .on_eos(()) } + +#[cfg(test)] +mod tests { + use std::future::{Ready, ready}; + + use super::*; + + #[derive(Clone)] + struct EmptyBodyService; + + impl Service> for EmptyBodyService { + type Response = Response; + type Error = Infallible; + type Future = Ready>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _req: Request) -> Self::Future { + ready(Ok(Response::new(Body::empty()))) + } + } + + #[tokio::test] + async fn max_connection_age_service_tracks_response_body_until_drop() { + let active_requests = Arc::new(AtomicUsize::new(0)); + let mut service = MaxConnectionAgeService { + service: EmptyBodyService, + }; + let mut request = Request::new(Body::empty()); + request + .extensions_mut() + .insert(MaxConnectionAgeConnectInfo { + active_requests: Arc::clone(&active_requests), + }); + + assert_eq!(active_requests.load(Ordering::Acquire), 0); + + let response = service + .call(request) + .await + .expect("service call should succeed"); + + assert_eq!(active_requests.load(Ordering::Acquire), 1); + + drop(response); + + assert_eq!(active_requests.load(Ordering::Acquire), 0); + } + + #[tokio::test] + async fn max_connection_age_service_tracks_active_requests_per_connection() { + let first_connection_active_requests = Arc::new(AtomicUsize::new(0)); + let second_connection_active_requests = Arc::new(AtomicUsize::new(0)); + let mut service = MaxConnectionAgeService { + service: EmptyBodyService, + }; + let mut first_request = Request::new(Body::empty()); + first_request + .extensions_mut() + .insert(MaxConnectionAgeConnectInfo { + active_requests: Arc::clone(&first_connection_active_requests), + }); + let mut second_request = Request::new(Body::empty()); + second_request + .extensions_mut() + .insert(MaxConnectionAgeConnectInfo { + active_requests: Arc::clone(&second_connection_active_requests), + }); + + let first_response = service + .call(first_request) + .await + .expect("first service call should succeed"); + + assert_eq!(first_connection_active_requests.load(Ordering::Acquire), 1); + assert_eq!(second_connection_active_requests.load(Ordering::Acquire), 0); + + let second_response = service + .call(second_request) + .await + .expect("second service call should succeed"); + + assert_eq!(first_connection_active_requests.load(Ordering::Acquire), 1); + assert_eq!(second_connection_active_requests.load(Ordering::Acquire), 1); + + drop(second_response); + + assert_eq!(first_connection_active_requests.load(Ordering::Acquire), 1); + assert_eq!(second_connection_active_requests.load(Ordering::Acquire), 0); + + drop(first_response); + + assert_eq!(first_connection_active_requests.load(Ordering::Acquire), 0); + assert_eq!(second_connection_active_requests.load(Ordering::Acquire), 0); + } + + #[tokio::test] + async fn max_connection_age_state_stops_reads_at_deadline_before_writes() { + let mut state = MaxConnectionAgeState::new(Some(Duration::from_millis(1))); + let active_requests = state.active_requests(); + active_requests.fetch_add(1, Ordering::AcqRel); + + sleep(Duration::from_millis(10)).await; + + let waker = futures::task::noop_waker_ref(); + let mut cx = Context::from_waker(waker); + + assert!(state.is_read_expired_for_test(&mut cx)); + assert!(!state.is_write_expired_for_test(&mut cx)); + + active_requests.fetch_sub(1, Ordering::AcqRel); + + assert!(state.is_write_expired_for_test(&mut cx)); + } +} diff --git a/src/sources/vector/mod.rs b/src/sources/vector/mod.rs index a1e0137258489..52faab935dd64 100644 --- a/src/sources/vector/mod.rs +++ b/src/sources/vector/mod.rs @@ -23,7 +23,10 @@ use crate::{ internal_events::{EventsReceived, StreamClosedError}, proto::vector as proto, serde::bool_or_struct, - sources::{Source, util::grpc::run_grpc_server_with_routes}, + sources::{ + Source, + util::grpc::{GrpcKeepaliveConfig, run_grpc_server_with_routes}, + }, tls::{MaybeTlsSettings, TlsEnableableConfig}, }; @@ -135,6 +138,10 @@ pub struct VectorConfig { #[serde(default, deserialize_with = "bool_or_struct")] acknowledgements: SourceAcknowledgementsConfig, + #[configurable(derived)] + #[serde(default)] + keepalive: GrpcKeepaliveConfig, + /// The namespace to use for logs. This overrides the global setting. #[serde(default)] #[configurable(metadata(docs::hidden))] @@ -158,6 +165,7 @@ impl Default for VectorConfig { address: "0.0.0.0:6000".parse().unwrap(), tls: None, acknowledgements: Default::default(), + keepalive: Default::default(), log_namespace: None, } } @@ -204,11 +212,16 @@ impl SourceConfig for VectorConfig { .add_service(health_service) .add_service(vector_service); - let source = - run_grpc_server_with_routes(self.address, tls_settings, builder.routes(), cx.shutdown) - .map_err(|error| { - error!(message = "Source future failed.", %error); - }); + let source = run_grpc_server_with_routes( + self.address, + tls_settings, + builder.routes(), + self.keepalive.clone(), + cx.shutdown, + ) + .map_err(|error| { + error!(message = "Source future failed.", %error); + }); Ok(Box::pin(source)) } @@ -241,13 +254,78 @@ mod test { use vrl::value::{Kind, kind::Collection}; use super::VectorConfig; - use crate::config::SourceConfig; + use crate::{ + SourceSender, + config::{SourceConfig, SourceContext}, + test_util, + }; #[test] fn generate_config() { crate::test_util::test_generate_config::(); } + #[test] + fn config_keepalive() { + let config: VectorConfig = toml::from_str( + r#" + address = "0.0.0.0:6000" + + [keepalive] + max_connection_age_secs = 300 + max_connection_age_grace_secs = 30 + "#, + ) + .unwrap(); + + assert_eq!(config.keepalive.max_connection_age_secs, Some(300)); + assert_eq!(config.keepalive.max_connection_age_grace_secs, Some(30)); + } + + #[tokio::test] + async fn max_connection_age_closes_idle_connection() { + use tokio::{ + io::AsyncReadExt, + net::TcpStream, + time::{Duration, sleep, timeout}, + }; + + let (_guard, addr) = test_util::addr::next_addr(); + let source_config = format!( + r#" + address = "{addr}" + + [keepalive] + max_connection_age_secs = 1 + "# + ); + let source: VectorConfig = toml::from_str(&source_config).unwrap(); + + let (tx, _rx) = SourceSender::new_test(); + let server = source + .build(SourceContext::new_test(tx, None)) + .await + .unwrap(); + tokio::spawn(server); + test_util::wait_for_tcp(addr).await; + + let mut stream = TcpStream::connect(addr).await.unwrap(); + sleep(Duration::from_millis(1500)).await; + + let mut buf = [0; 32]; + let read = timeout(Duration::from_secs(2), async { + loop { + if stream.read(&mut buf).await.unwrap() == 0 { + break 0; + } + } + }) + .await + .unwrap(); + + assert_eq!(read, 0); + } + #[test] fn output_schema_definition_vector_namespace() { let config = VectorConfig::default(); @@ -402,6 +480,79 @@ mod tests { ); } + #[tokio::test] + async fn max_connection_age_allows_client_reconnect() { + use tokio::time::{Duration, sleep}; + use tonic::transport::Channel; + + use crate::sources::util::grpc::{ + max_connection_age_connection_observations, + reset_max_connection_age_connection_observations, + }; + + let (_guard, addr) = test_util::addr::next_addr(); + + let config = format!( + r#" + address = "{addr}" + + [keepalive] + max_connection_age_secs = 1 + "# + ); + let source: VectorConfig = toml::from_str(&config).unwrap(); + + reset_max_connection_age_connection_observations(); + + let (tx, _rx) = SourceSender::new_test(); + let server = source + .build(SourceContext::new_test(tx, None)) + .await + .unwrap(); + tokio::spawn(server); + test_util::wait_for_tcp(addr).await; + + let endpoint = format!("http://{addr}"); + let channel = Channel::from_shared(endpoint) + .unwrap() + .connect() + .await + .unwrap(); + let mut client = proto::Client::new(channel); + + let response = client + .health_check(proto::HealthCheckRequest {}) + .await + .unwrap(); + assert_eq!( + response.into_inner().status, + proto::ServingStatus::Serving as i32 + ); + let observations_before_expiry = max_connection_age_connection_observations(); + assert!(!observations_before_expiry.is_empty()); + + sleep(Duration::from_millis(1500)).await; + + let response = client + .health_check(proto::HealthCheckRequest {}) + .await + .unwrap(); + assert_eq!( + response.into_inner().status, + proto::ServingStatus::Serving as i32 + ); + let observations = max_connection_age_connection_observations(); + assert!( + observations.len() > observations_before_expiry.len(), + "expected second RPC to reconnect after max connection age elapsed, got observations: {observations:?}", + ); + assert!(observations.iter().any(|peer_addr| { + !observations_before_expiry + .iter() + .any(|observed| observed == peer_addr) + })); + } + #[tokio::test] async fn standard_grpc_health_check_works() { use tonic::transport::Channel; diff --git a/website/cue/reference/components/sources/generated/opentelemetry.cue b/website/cue/reference/components/sources/generated/opentelemetry.cue index ca4150550c771..82859ea7dfb19 100644 --- a/website/cue/reference/components/sources/generated/opentelemetry.cue +++ b/website/cue/reference/components/sources/generated/opentelemetry.cue @@ -28,6 +28,10 @@ generated: components: sources: opentelemetry: configuration: { type: object: { examples: [{ address: "0.0.0.0:4317" + keepalive: { + max_connection_age_grace_secs: null + max_connection_age_secs: null + } }] options: { address: { @@ -39,6 +43,36 @@ generated: components: sources: opentelemetry: configuration: { required: true type: string: examples: ["0.0.0.0:4317", "localhost:4317"] } + keepalive: { + description: "Configuration of gRPC server keepalive parameters." + required: false + type: object: options: { + max_connection_age_grace_secs: { + description: """ + The grace period added to `max_connection_age_secs` before the server closes the connection. + + This setting only applies when `max_connection_age_secs` is set. + """ + required: false + type: uint: { + examples: [30] + unit: "seconds" + } + } + max_connection_age_secs: { + description: """ + The maximum amount of time a connection may exist before the server closes it. + + When unset, connections are not closed based on age. + """ + required: false + type: uint: { + examples: [300] + unit: "seconds" + } + } + } + } tls: { description: "Configures the TLS options for incoming/outgoing connections." required: false diff --git a/website/cue/reference/components/sources/generated/vector.cue b/website/cue/reference/components/sources/generated/vector.cue index 3c2fe06b852e7..3f50fa7b51538 100644 --- a/website/cue/reference/components/sources/generated/vector.cue +++ b/website/cue/reference/components/sources/generated/vector.cue @@ -31,6 +31,36 @@ generated: components: sources: vector: configuration: { required: true type: string: {} } + keepalive: { + description: "Configuration of gRPC server keepalive parameters." + required: false + type: object: options: { + max_connection_age_grace_secs: { + description: """ + The grace period added to `max_connection_age_secs` before the server closes the connection. + + This setting only applies when `max_connection_age_secs` is set. + """ + required: false + type: uint: { + examples: [30] + unit: "seconds" + } + } + max_connection_age_secs: { + description: """ + The maximum amount of time a connection may exist before the server closes it. + + When unset, connections are not closed based on age. + """ + required: false + type: uint: { + examples: [300] + unit: "seconds" + } + } + } + } tls: { description: "Configures the TLS options for incoming/outgoing connections." required: false