From 75fcbe3a1ad640a7b7ef38e2e3bb67e8734f870b Mon Sep 17 00:00:00 2001 From: Filip Pytloun Date: Mon, 22 Jun 2026 16:46:30 +0200 Subject: [PATCH] feat(vector sink): add multiple endpoint strategies --- .../vector_sink_multiple_backends.feature.md | 3 + src/sinks/util/service/health.rs | 19 +- src/sinks/vector/config.rs | 403 ++++++++++++++++-- src/sinks/vector/mod.rs | 388 ++++++++++++++++- .../components/sinks/generated/vector.cue | 62 ++- 5 files changed, 846 insertions(+), 29 deletions(-) create mode 100644 changelog.d/vector_sink_multiple_backends.feature.md diff --git a/changelog.d/vector_sink_multiple_backends.feature.md b/changelog.d/vector_sink_multiple_backends.feature.md new file mode 100644 index 0000000000000..848b16bbc7451 --- /dev/null +++ b/changelog.d/vector_sink_multiple_backends.feature.md @@ -0,0 +1,3 @@ +Add support for configuring multiple endpoints in the `vector` sink via the new `addresses` option, enabling built-in load balancing and failover across downstream Vector instances. + +authors: fpytloun diff --git a/src/sinks/util/service/health.rs b/src/sinks/util/service/health.rs index b764ebcad64b5..f87b73f90d9d4 100644 --- a/src/sinks/util/service/health.rs +++ b/src/sinks/util/service/health.rs @@ -29,7 +29,7 @@ const UNHEALTHY_AMOUNT_OF_ERRORS: usize = 5; /// Options for determining the health of an endpoint. #[serde_as] #[configurable_component] -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] #[serde(rename_all = "snake_case")] pub struct HealthConfig { /// Initial delay between attempts to reactivate endpoints once they become unhealthy. @@ -46,6 +46,15 @@ pub struct HealthConfig { pub retry_max_duration_secs: Duration, } +impl Default for HealthConfig { + fn default() -> Self { + Self { + retry_initial_backoff_secs: default_retry_initial_backoff_secs(), + retry_max_duration_secs: default_retry_max_duration_secs(), + } + } +} + const fn default_retry_initial_backoff_secs() -> u64 { RETRY_INITIAL_BACKOFF_SECONDS_DEFAULT } @@ -329,4 +338,12 @@ mod tests { counters.inc_healthy(); assert!(counters.healthy(snapshot).is_ok()); } + + #[test] + fn default_health_config_matches_documented_defaults() { + let config = HealthConfig::default(); + + assert_eq!(config.retry_initial_backoff_secs, 1); + assert_eq!(config.retry_max_duration_secs, Duration::from_secs(3_600)); + } } diff --git a/src/sinks/vector/config.rs b/src/sinks/vector/config.rs index ae2847073a649..be51d5b6d6724 100644 --- a/src/sinks/vector/config.rs +++ b/src/sinks/vector/config.rs @@ -1,9 +1,18 @@ +use std::{ + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + task::{Context, Poll}, +}; + +use futures::{FutureExt, TryFutureExt, future::BoxFuture}; use http::Uri; use hyper::client::HttpConnector; use hyper_openssl::HttpsConnector; use hyper_proxy::ProxyConnector; use tonic::body::BoxBody; -use tower::ServiceBuilder; +use tower::{Service, ServiceBuilder}; use vector_lib::configurable::configurable_component; use super::{ @@ -22,8 +31,9 @@ use crate::{ sinks::{ Healthcheck, VectorSink as VectorSinkType, util::{ - BatchConfig, RealtimeEventBasedDefaultBatchSettings, ServiceBuilderExt, - TowerRequestConfig, retries::RetryLogic, + BatchConfig, RealtimeEventBasedDefaultBatchSettings, TowerRequestConfig, + retries::RetryLogic, + service::{HealthConfig, HealthLogic, ServiceBuilderExt}, }, }, tls::{MaybeTlsSettings, TlsEnableableConfig}, @@ -45,10 +55,28 @@ pub struct VectorConfig { /// Both IP address and hostname are accepted formats. /// /// The address _must_ include a port. + /// + /// This option is mutually exclusive with `addresses`. Set exactly one of + /// `address` or `addresses`. #[configurable(validation(format = "uri"))] #[configurable(metadata(docs::examples = "92.12.333.224:6000"))] #[configurable(metadata(docs::examples = "https://somehost:6000"))] - address: String, + #[serde(default)] + address: Option, + + /// The downstream Vector addresses to which to connect. + /// + /// Both IP addresses and hostnames are accepted formats. + /// + /// Each address _must_ include a port. + /// + /// This option is mutually exclusive with `address`. Set exactly one of + /// `address` or `addresses`. + #[configurable(validation(format = "uri"))] + #[configurable(metadata(docs::examples = "92.12.333.224:6000"))] + #[configurable(metadata(docs::examples = "https://somehost:6000"))] + #[serde(default)] + addresses: Vec, /// Compression algorithm for requests. /// @@ -72,6 +100,17 @@ pub struct VectorConfig { #[serde(default)] pub request: TowerRequestConfig, + /// Options for determining the health of Vector endpoints. + #[serde(default)] + #[configurable(derived)] + pub endpoint_health: Option, + + /// Strategy for routing requests across multiple configured addresses. + /// + /// This option is only used when `addresses` is configured. + #[serde(default)] + pub endpoint_strategy: EndpointStrategy, + #[configurable(derived)] #[serde(default)] tls: Option, @@ -102,10 +141,13 @@ impl GenerateConfig for VectorConfig { fn default_config(address: &str) -> VectorConfig { VectorConfig { version: None, - address: address.to_owned(), + address: Some(address.to_owned()), + addresses: Vec::new(), compression: VectorCompression::None, batch: BatchConfig::default(), request: TowerRequestConfig::default(), + endpoint_health: None, + endpoint_strategy: EndpointStrategy::default(), tls: None, acknowledgements: Default::default(), } @@ -116,36 +158,73 @@ fn default_config(address: &str) -> VectorConfig { impl SinkConfig for VectorConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSinkType, Healthcheck)> { let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?; - let uri = with_default_scheme(&self.address, tls.is_tls())?; + let uris = self.uris(tls.is_tls())?; let client = new_client(&tls, cx.proxy())?; - let healthcheck_uri = cx - .healthcheck - .uri - .clone() - .map(|uri| uri.uri) - .unwrap_or_else(|| uri.clone()); - let healthcheck_client = - VectorService::new(client.clone(), healthcheck_uri, VectorCompression::None); - let healthcheck = healthcheck(healthcheck_client, cx.healthcheck); - let service = VectorService::new(client, uri, self.compression); + let healthcheck = healthchecks(client.clone(), &uris, cx.healthcheck); let request_settings = self.request.into_settings(); let batch_settings = self.batch.into_batcher_settings()?; - let service = ServiceBuilder::new() - .settings(request_settings, VectorGrpcRetryLogic) - .service(service); + let services = uris + .into_iter() + .map(|uri| { + let endpoint = uri.to_string(); + let service = VectorService::new(client.clone(), uri, self.compression); + (endpoint, service) + }) + .collect::>(); - let sink = VectorSink { - batch_settings, - service, + let sink = match self.endpoint_strategy { + _ if services.len() == 1 => { + let service = ServiceBuilder::new() + .settings(request_settings, VectorGrpcRetryLogic) + .service(services.into_iter().next().expect("one service").1); + + VectorSinkType::from_event_streamsink(VectorSink { + batch_settings, + service, + }) + } + EndpointStrategy::LoadBalance => { + let service = request_settings.distributed_service( + VectorGrpcRetryLogic, + services, + self.endpoint_health.clone().unwrap_or_default(), + VectorGrpcHealthLogic, + 1, + ); + + VectorSinkType::from_event_streamsink(VectorSink { + batch_settings, + service, + }) + } + EndpointStrategy::Failover => { + let endpoint_timeout = request_settings.timeout; + let mut failover_request_settings = request_settings; + failover_request_settings.timeout = endpoint_timeout + .checked_mul((services.len() + 1) as u32) + .unwrap_or(endpoint_timeout); + + let service = ServiceBuilder::new() + .settings(failover_request_settings, VectorGrpcRetryLogic) + .service(FailoverVectorService::new( + services + .into_iter() + .map(|(_endpoint, service)| service) + .collect(), + endpoint_timeout, + )); + + VectorSinkType::from_event_streamsink(VectorSink { + batch_settings, + service, + }) + } }; - Ok(( - VectorSinkType::from_event_streamsink(sink), - Box::pin(healthcheck), - )) + Ok((sink, Box::pin(healthcheck))) } fn input(&self) -> Input { @@ -157,6 +236,167 @@ impl SinkConfig for VectorConfig { } } +/// Strategy for routing requests across multiple Vector endpoints. +#[configurable_component] +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum EndpointStrategy { + /// Distribute requests across healthy endpoints. + #[default] + LoadBalance, + /// Use endpoints in configured order, moving to the next endpoint only when + /// the current endpoint fails. + Failover, +} + +#[derive(Clone)] +struct FailoverVectorService { + services: Vec, + state: Arc, + endpoint_timeout: std::time::Duration, +} + +impl FailoverVectorService { + fn new(services: Vec, endpoint_timeout: std::time::Duration) -> Self { + Self { + services, + state: Arc::new(AtomicUsize::new(0)), + endpoint_timeout, + } + } +} + +impl Service for FailoverVectorService { + type Response = VectorResponse; + type Error = crate::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: VectorRequest) -> Self::Future { + let services = self.services.clone(); + let state = Arc::clone(&self.state); + let endpoint_timeout = self.endpoint_timeout; + + Box::pin(async move { + let start_state = state.load(Ordering::Acquire); + let start = failover_state_index(start_state, services.len()); + let mut expected_state = start_state; + let mut last_error = None; + + for offset in 0..services.len() { + let index = (start + offset) % services.len(); + let next_index = (index + 1) % services.len(); + let mut service = services[index].clone(); + + match tokio::time::timeout(endpoint_timeout, service.call(request.clone())).await { + Ok(Ok(response)) => { + return Ok(response); + } + Ok(Err(error)) => { + if !is_retriable_vector_error(&error) { + return Err(error); + } + + expected_state = failover_advance_if_current( + &state, + expected_state, + index, + next_index, + services.len(), + ); + last_error = Some(error); + } + Err(_elapsed) => { + expected_state = failover_advance_if_current( + &state, + expected_state, + index, + next_index, + services.len(), + ); + last_error = Some(Box::new(VectorSinkError::Request { + source: tonic::Status::deadline_exceeded( + "vector endpoint request timed out", + ), + }) as crate::Error); + } + } + } + + Err(last_error.expect("failover service should have at least one endpoint")) + }) + } +} + +const fn failover_state_index(state: usize, endpoints: usize) -> usize { + state % endpoints +} + +const fn failover_next_state(state: usize, next_index: usize, endpoints: usize) -> usize { + let generation = state / endpoints; + (generation + 1) * endpoints + next_index +} + +fn failover_advance_if_current( + state: &AtomicUsize, + expected_state: usize, + index: usize, + next_index: usize, + endpoints: usize, +) -> usize { + let mut current_state = expected_state; + + loop { + if failover_state_index(current_state, endpoints) != index { + let actual_state = state.load(Ordering::Acquire); + if actual_state == current_state + || failover_state_index(actual_state, endpoints) != index + { + return actual_state; + } + current_state = actual_state; + continue; + } + + let next_state = failover_next_state(current_state, next_index, endpoints); + match state.compare_exchange( + current_state, + next_state, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => return next_state, + Err(actual) => current_state = actual, + } + } +} + +fn is_retriable_vector_error(error: &crate::Error) -> bool { + error + .downcast_ref::() + .is_none_or(|error| VectorGrpcRetryLogic.is_retriable_error(error)) +} + +impl VectorConfig { + fn uris(&self, tls: bool) -> crate::Result> { + match (self.address.as_ref(), self.addresses.as_slice()) { + (Some(_), [_first, ..]) => Err( + "`address` and `addresses` options are mutually exclusive. Please use `addresses` for multiple Vector endpoints." + .into(), + ), + (None, []) => Err("No Vector endpoint configured. Please set `address` or `addresses`.".into()), + (Some(address), []) => Ok(vec![with_default_scheme(address, tls)?]), + (None, addresses) => addresses + .iter() + .map(|address| with_default_scheme(address, tls)) + .collect(), + } + } +} + /// Check to see if the remote service accepts new events. async fn healthcheck( mut service: VectorService, @@ -183,6 +423,39 @@ async fn healthcheck( } } +fn healthchecks( + client: hyper::Client>, BoxBody>, + uris: &[Uri], + options: SinkHealthcheckOptions, +) -> Healthcheck { + if !options.enabled { + return Box::pin(futures::future::ok(())); + } + + let healthcheck_uris = options + .uri + .clone() + .map(|uri| vec![uri.uri]) + .unwrap_or_else(|| uris.to_vec()); + + Box::pin( + futures::future::select_ok(healthcheck_uris.into_iter().map(move |uri| { + let service = VectorService::new(client.clone(), uri, VectorCompression::None); + let timeout = options.timeout; + healthcheck( + service, + SinkHealthcheckOptions { + enabled: true, + uri: None, + timeout, + }, + ) + .boxed() + })) + .map_ok(|((), _)| ()), + ) +} + /// grpc doesn't like an address without a scheme, so we default to http or https if one isn't /// specified in the address. pub fn with_default_scheme(address: &str, tls: bool) -> crate::Result { @@ -256,3 +529,81 @@ impl RetryLogic for VectorGrpcRetryLogic { } } } + +#[derive(Debug, Clone)] +struct VectorGrpcHealthLogic; + +impl HealthLogic for VectorGrpcHealthLogic { + type Error = crate::Error; + type Response = VectorResponse; + + fn is_healthy(&self, response: &Result) -> Option { + match response { + Ok(_) => Some(true), + Err(error) if is_retriable_vector_error(error) => Some(false), + Err(_) => None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn health_logic_ignores_non_retriable_vector_errors() { + let response = Err(Box::new(VectorSinkError::Request { + source: tonic::Status::data_loss("batch rejected"), + }) as crate::Error); + + assert_eq!(VectorGrpcHealthLogic.is_healthy(&response), None); + } + + #[test] + fn health_logic_marks_retriable_vector_errors_unhealthy() { + let response = Err(Box::new(VectorSinkError::Request { + source: tonic::Status::unavailable("endpoint unavailable"), + }) as crate::Error); + + assert_eq!(VectorGrpcHealthLogic.is_healthy(&response), Some(false)); + } + + #[test] + fn failover_advance_reloads_stale_generation() { + let endpoints = 2; + let state = AtomicUsize::new(failover_next_state( + failover_next_state(0, 1, endpoints), + 0, + endpoints, + )); + + let observed = failover_advance_if_current(&state, 0, 0, 1, endpoints); + + assert_eq!(observed, 7); + assert_eq!(state.load(Ordering::Acquire), 7); + } + + #[test] + fn failover_advance_reloads_stale_mismatched_state() { + let endpoints = 3; + let shared_state = failover_next_state(failover_next_state(0, 1, endpoints), 0, endpoints); + let stale_state = 1; + let state = AtomicUsize::new(shared_state); + + let observed = failover_advance_if_current(&state, stale_state, 0, 1, endpoints); + + assert_eq!(observed, 10); + assert_eq!(state.load(Ordering::Acquire), 10); + } + + #[test] + fn failover_advance_ignores_current_non_matching_endpoint() { + let endpoints = 3; + let state = AtomicUsize::new(5); + + let observed = failover_advance_if_current(&state, 0, 0, 1, endpoints); + + assert_eq!(observed, 5); + assert_eq!(state.load(Ordering::Acquire), 5); + } +} diff --git a/src/sinks/vector/mod.rs b/src/sinks/vector/mod.rs index c3abd90d6b629..cd7cb6a95531c 100644 --- a/src/sinks/vector/mod.rs +++ b/src/sinks/vector/mod.rs @@ -43,8 +43,15 @@ mod tests { use bytes::{BufMut, Bytes, BytesMut}; use futures::{StreamExt, channel::mpsc}; use http::request::Parts; - use hyper::Method; + use hyper::{ + Method, Response, Server, + service::{make_service_fn, service_fn}, + }; use prost::Message; + use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }; use vector_lib::{ config::{Tags, Telemetry, init_telemetry}, event::{BatchNotifier, BatchStatus}, @@ -74,6 +81,67 @@ mod tests { crate::test_util::test_generate_config::(); } + #[tokio::test] + async fn build_rejects_missing_address() { + let config: VectorConfig = toml::from_str("").unwrap(); + + let err = match config.build(SinkContext::default()).await { + Ok(_) => panic!("missing address should fail"), + Err(err) => err, + }; + + assert!( + err.to_string() + .contains("No Vector endpoint configured. Please set `address` or `addresses`."), + "{err}" + ); + } + + #[tokio::test] + async fn build_rejects_address_and_addresses() { + let config: VectorConfig = toml::from_str( + r#" + address = "http://127.0.0.1:6000" + addresses = ["http://127.0.0.1:6001"] + "#, + ) + .unwrap(); + + let err = match config.build(SinkContext::default()).await { + Ok(_) => panic!("address and addresses should be mutually exclusive"), + Err(err) => err, + }; + + assert!( + err.to_string() + .contains("`address` and `addresses` options are mutually exclusive"), + "{err}" + ); + } + + #[test] + fn parse_addresses_config() { + let config: Result = toml::from_str( + r#" + addresses = ["http://127.0.0.1:6000", "http://127.0.0.1:6001"] + "#, + ); + + assert!(config.is_ok()); + } + + #[test] + fn parse_failover_endpoint_strategy() { + let config: Result = toml::from_str( + r#" + addresses = ["http://127.0.0.1:6000", "http://127.0.0.1:6001"] + endpoint_strategy = "failover" + "#, + ); + + assert!(config.is_ok()); + } + enum TestType { Normal, DataVolume, @@ -163,6 +231,324 @@ mod tests { run_sink_test(TestType::Normal).await; } + #[tokio::test] + async fn deliver_message_to_multiple_addresses() { + let num_lines = 10; + + let (_guard1, addr1) = next_addr(); + let (_guard2, addr2) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr1}/", "http://{addr2}/"] + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let cx = SinkContext::default(); + + let (sink, _) = config.build(cx).await.unwrap(); + let (rx1, trigger1, server1) = build_test_server_generic(addr1, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + let (rx2, trigger2, server2) = build_test_server_generic(addr2, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + + tokio::spawn(server1); + tokio::spawn(server2); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (mut input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + + drop(trigger1); + drop(trigger2); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let (mut output_lines, mut output_lines2) = + futures::future::join(get_received(rx1, |_| {}), get_received(rx2, |_| {})).await; + + output_lines.append(&mut output_lines2); + + input_lines.sort(); + output_lines.sort(); + + assert_eq!(num_lines, output_lines.len()); + assert_eq!(input_lines, output_lines); + } + + #[tokio::test] + async fn failover_strategy_prefers_first_address() { + let num_lines = 10; + + let (_guard1, addr1) = next_addr(); + let (_guard2, addr2) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr1}/", "http://{addr2}/"] + endpoint_strategy = "failover" + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + let (rx1, trigger1, server1) = build_test_server_generic(addr1, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + let (rx2, trigger2, server2) = build_test_server_generic(addr2, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + + tokio::spawn(server1); + tokio::spawn(server2); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + + drop(trigger1); + drop(trigger2); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let (output_lines, output_lines2) = + futures::future::join(get_received(rx1, |_| {}), get_received(rx2, |_| {})).await; + + assert_eq!(num_lines, output_lines.len()); + assert_eq!(input_lines, output_lines); + assert!(output_lines2.is_empty()); + } + + #[tokio::test] + async fn failover_strategy_uses_next_address_when_first_fails() { + let num_lines = 10; + + let (_guard1, addr1) = next_addr(); + let (_guard2, addr2) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr1}/", "http://{addr2}/"] + endpoint_strategy = "failover" + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + let (rx2, trigger2, server2) = build_test_server_generic(addr2, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + + tokio::spawn(server2); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + + drop(trigger2); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let output_lines = get_received(rx2, |_| {}).await; + + assert_eq!(num_lines, output_lines.len()); + assert_eq!(input_lines, output_lines); + } + + #[tokio::test] + async fn failover_strategy_does_not_resend_non_retriable_errors() { + let num_lines = 10; + + let (_guard1, addr1) = next_addr(); + let (_guard2, addr2) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr1}/", "http://{addr2}/"] + endpoint_strategy = "failover" + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + let (_rx1, trigger1, server1) = build_test_server_generic(addr1, move || { + hyper::Response::builder() + .header("grpc-status", "15") // data loss + .header("content-type", "application/grpc") + .body(tonic::body::empty_body()) + .unwrap() + }); + let (rx2, trigger2, server2) = build_test_server_generic(addr2, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + + tokio::spawn(server1); + tokio::spawn(server2); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (_, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + sink.run(events).await.expect("Running sink failed"); + + drop(trigger1); + drop(trigger2); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected)); + assert!( + get_received(rx2, |_| {}).await.is_empty(), + "non-retriable primary rejection must not be resent to secondary endpoint" + ); + } + + #[tokio::test] + async fn failover_strategy_uses_next_address_when_first_times_out() { + let num_lines = 10; + + let (_guard1, addr1) = next_addr(); + let (_guard2, addr2) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr1}/", "http://{addr2}/"] + endpoint_strategy = "failover" + + [request] + timeout_secs = 1 + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let hanging_service = make_service_fn(|_| async { + Ok::<_, crate::Error>(service_fn(|_req| async { + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + Ok::<_, crate::Error>( + Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap(), + ) + })) + }); + let hanging_server = tokio::spawn(Server::bind(&addr1).serve(hanging_service)); + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + let (rx2, trigger2, server2) = build_test_server_generic(addr2, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + + tokio::spawn(server2); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + + hanging_server.abort(); + drop(trigger2); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let output_lines = get_received(rx2, |_| {}).await; + + assert_eq!(num_lines, output_lines.len()); + assert_eq!(input_lines, output_lines); + } + + #[tokio::test] + async fn failover_strategy_retries_after_all_endpoints_time_out() { + let num_lines = 10; + + let (_guard, addr) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr}/"] + endpoint_strategy = "failover" + + [request] + timeout_secs = 1 + retry_initial_backoff_secs = 1 + retry_max_duration_secs = 5 + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let attempts = Arc::new(AtomicUsize::new(0)); + let service_attempts = Arc::clone(&attempts); + let service = make_service_fn(move |_| { + let service_attempts = Arc::clone(&service_attempts); + async move { + Ok::<_, crate::Error>(service_fn(move |_req| { + let attempt = service_attempts.fetch_add(1, Ordering::AcqRel); + async move { + if attempt == 0 { + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + } + + Ok::<_, crate::Error>( + Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap(), + ) + } + })) + } + }); + let server = tokio::spawn(Server::bind(&addr).serve(service)); + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (_input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + + server.abort(); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + assert!( + attempts.load(Ordering::Acquire) > 1, + "sink should retry after endpoint timeout" + ); + } + #[tokio::test] async fn deliver_message_gzip() { run_sink_test_with_compression(TestType::Normal, Some("gzip")).await; diff --git a/website/cue/reference/components/sinks/generated/vector.cue b/website/cue/reference/components/sinks/generated/vector.cue index c1bc55fcfe0fe..04cf5ca109dcd 100644 --- a/website/cue/reference/components/sinks/generated/vector.cue +++ b/website/cue/reference/components/sinks/generated/vector.cue @@ -34,10 +34,30 @@ generated: components: sinks: vector: configuration: { Both IP address and hostname are accepted formats. The address _must_ include a port. + + This option is mutually exclusive with `addresses`. Set exactly one of + `address` or `addresses`. """ - required: true + required: false type: string: examples: ["92.12.333.224:6000", "https://somehost:6000"] } + addresses: { + description: """ + The downstream Vector addresses to which to connect. + + Both IP addresses and hostnames are accepted formats. + + Each address _must_ include a port. + + This option is mutually exclusive with `address`. Set exactly one of + `address` or `addresses`. + """ + required: false + type: array: { + default: [] + items: type: string: examples: ["92.12.333.224:6000", "https://somehost:6000"] + } + } batch: { description: "Event batching behavior." required: false @@ -98,6 +118,46 @@ generated: components: sinks: vector: configuration: { } } } + endpoint_health: { + description: "Options for determining the health of Vector endpoints." + required: false + type: object: options: { + retry_initial_backoff_secs: { + description: "Initial delay between attempts to reactivate endpoints once they become unhealthy." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + retry_max_duration_secs: { + description: "Maximum delay between attempts to reactivate endpoints once they become unhealthy." + required: false + type: uint: { + default: 3600 + unit: "seconds" + } + } + } + } + endpoint_strategy: { + description: """ + Strategy for routing requests across multiple configured addresses. + + This option is only used when `addresses` is configured. + """ + required: false + type: string: { + default: "load_balance" + enum: { + failover: """ + Use endpoints in configured order, moving to the next endpoint only when + the current endpoint fails. + """ + load_balance: "Distribute requests across healthy endpoints." + } + } + } request: { description: """ Middleware settings for outbound requests.