From df170e3eae2833edf70d765a635cb3773cab8866 Mon Sep 17 00:00:00 2001 From: Vitalii Parfonov Date: Fri, 10 Oct 2025 18:13:29 +0300 Subject: [PATCH 1/4] Expose idle timeout settings for HTTP connection --- src/http.rs | 22 ++++++++++++++- src/sinks/axiom.rs | 1 + src/sinks/http/config.rs | 34 ++++++++++++++++++++++ src/sinks/http/tests.rs | 1 + src/sinks/humio/logs.rs | 1 + src/sinks/opentelemetry/mod.rs | 1 + src/sinks/splunk_hec/common/util.rs | 15 ++++++++++ src/sinks/splunk_hec/logs/config.rs | 44 +++++++++++++++++++++++++++-- src/sinks/splunk_hec/logs/tests.rs | 1 + src/sinks/util/http.rs | 18 ++++++++++++ src/sources/splunk_hec/mod.rs | 1 + 11 files changed, 136 insertions(+), 3 deletions(-) diff --git a/src/http.rs b/src/http.rs index 76aff937631d3..de6a722ea3f0a 100644 --- a/src/http.rs +++ b/src/http.rs @@ -40,6 +40,7 @@ use crate::{ internal_events::{http_client, HttpServerRequestReceived, HttpServerResponseSent}, tls::{tls_connector_builder, MaybeTlsSettings, TlsError}, }; +use crate::sinks::util::http::ConnectionConfig; pub mod status { pub const FORBIDDEN: u16 = 403; @@ -92,7 +93,26 @@ where tls_settings: impl Into, proxy_config: &ProxyConfig, ) -> Result, HttpError> { - HttpClient::new_with_custom_client(tls_settings, proxy_config, &mut Client::builder()) + HttpClient::new_with_connection_config(tls_settings, proxy_config, None) + } + + pub fn new_with_connection_config( + tls_settings: impl Into, + proxy_config: &ProxyConfig, + connection_config: Option, + ) -> Result, HttpError> { + let mut builder = Client::builder(); + + if let Some(config) = connection_config { + if let Some(idle_secs) = config.idle_timeout_secs { + builder.pool_idle_timeout(Duration::from_secs(idle_secs)); + } + if let Some(max_idle) = config.pool_idle_per_host { + builder.pool_max_idle_per_host(max_idle); + } + } + + HttpClient::new_with_custom_client(tls_settings, proxy_config, &mut builder) } pub fn new_with_custom_client( diff --git a/src/sinks/axiom.rs b/src/sinks/axiom.rs index dfc4ab124dcc1..c30d679d78a66 100644 --- a/src/sinks/axiom.rs +++ b/src/sinks/axiom.rs @@ -133,6 +133,7 @@ impl SinkConfig for AxiomConfig { ), payload_prefix: "".into(), // Always newline delimited JSON payload_suffix: "".into(), // Always newline delimited JSON + connection: None, }; http_sink_config.build(cx).await diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs index 4094768c34a43..366c0caf4ce3f 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -34,6 +34,7 @@ use crate::{ }, }, }; +use crate::sinks::util::http::ConnectionConfig; const CONTENT_TYPE_TEXT: &str = "text/plain"; const CONTENT_TYPE_NDJSON: &str = "application/x-ndjson"; @@ -107,6 +108,10 @@ pub struct HttpSinkConfig { skip_serializing_if = "crate::serde::is_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub connection: Option, } /// HTTP method. @@ -392,6 +397,7 @@ mod tests { acknowledgements: AcknowledgementsConfig::default(), payload_prefix: String::new(), payload_suffix: String::new(), + connection: None, }; let external_resource = ExternalResource::new( @@ -413,4 +419,32 @@ mod tests { } register_validatable_component!(HttpSinkConfig); + + #[test] + fn deserialize_connection_config_defaults() { + let cfg: ConnectionConfig = serde_yaml::from_str("{}").unwrap(); + // Defaults should be None + assert!(cfg.idle_timeout_secs.is_none()); + assert!(cfg.pool_idle_per_host.is_none()); + } + + #[test] + fn http_sink_config_with_connection() { + let yaml = r#" +uri: "http://example.com" +encoding: + codec: "json" +connection: + idle_timeout_secs: 120 + pool_idle_per_host: 20 +"#; + + let cfg: HttpSinkConfig = serde_yaml::from_str(yaml).unwrap(); + + assert_eq!(cfg.uri.uri, "http://example.com"); + assert!(cfg.connection.is_some()); + let conn = cfg.connection.unwrap(); + assert_eq!(conn.idle_timeout_secs, Some(120)); + assert_eq!(conn.pool_idle_per_host, Some(20)); + } } diff --git a/src/sinks/http/tests.rs b/src/sinks/http/tests.rs index 363877380c308..dfe6bfb121836 100644 --- a/src/sinks/http/tests.rs +++ b/src/sinks/http/tests.rs @@ -60,6 +60,7 @@ fn default_cfg(encoding: EncodingConfigWithFraming) -> HttpSinkConfig { request: Default::default(), tls: Default::default(), acknowledgements: Default::default(), + connection: Default::default(), } } diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index ba6b00f066407..5566e83847e9f 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -209,6 +209,7 @@ impl HumioLogsConfig { timestamp_key: Some(config_timestamp_key_target_path()), endpoint_target: EndpointTarget::Event, auto_extract_timestamp: None, + connection: None, } } } diff --git a/src/sinks/opentelemetry/mod.rs b/src/sinks/opentelemetry/mod.rs index 90d26da27e202..d572a841f26ff 100644 --- a/src/sinks/opentelemetry/mod.rs +++ b/src/sinks/opentelemetry/mod.rs @@ -48,6 +48,7 @@ impl Default for Protocol { request: Default::default(), tls: Default::default(), acknowledgements: Default::default(), + connection: None, }) } } diff --git a/src/sinks/splunk_hec/common/util.rs b/src/sinks/splunk_hec/common/util.rs index 99bfc1fbd6d16..5a38d91ae6052 100644 --- a/src/sinks/splunk_hec/common/util.rs +++ b/src/sinks/splunk_hec/common/util.rs @@ -24,6 +24,7 @@ use crate::{ template::Template, tls::{TlsConfig, TlsSettings}, }; +use crate::sinks::util::http::ConnectionConfig; #[derive(Clone, Copy, Debug, Default)] pub struct SplunkHecDefaultBatchSettings; @@ -50,6 +51,20 @@ pub fn create_client( Ok(HttpClient::new(tls_settings, proxy_config)?) } +pub fn create_client_with_connection_config( + tls: Option<&TlsConfig>, + proxy_config: &ProxyConfig, + connection: Option, +) -> crate::Result { + let tls_settings = TlsSettings::from_options(tls)?; + + if let Some(conn) = connection { + Ok(HttpClient::new_with_connection_config(tls_settings, proxy_config, Some(conn))?) + } else { + Ok(HttpClient::new(tls_settings, proxy_config)?) + } +} + // TODO: `HttpBatchService` has been deprecated for direct use in sinks. // This sink should undergo a refactor to utilize the `HttpService` // instead, which extracts much of the boilerplate code for `Service`. diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index 7479a58d0b2ba..3f539ceadea8c 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -19,7 +19,8 @@ use crate::{ util::http::HttpRetryLogic, }, }; - +use crate::sinks::splunk_hec::common::create_client_with_connection_config; +use crate::sinks::util::http::ConnectionConfig; use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink}; /// Configuration for the `splunk_hec_logs` sink. @@ -152,6 +153,15 @@ pub struct HecLogsSinkConfig { #[configurable(metadata(docs::advanced))] #[serde(default = "default_endpoint_target")] pub endpoint_target: EndpointTarget, + + /// Connection-level settings for the underlying HTTP client. + /// + /// This allows configuring parameters like connection idle timeout and + /// maximum idle connections per host. Useful when running behind load + /// balancers with strict idle policies. + #[configurable(derived)] + #[serde(default)] + pub connection: Option, } const fn default_endpoint_target() -> EndpointTarget { @@ -178,6 +188,7 @@ impl GenerateConfig for HecLogsSinkConfig { timestamp_key: None, auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, + connection: None, }) .unwrap() } @@ -191,7 +202,14 @@ impl SinkConfig for HecLogsSinkConfig { return Err("`auto_extract_timestamp` cannot be set for the `raw` endpoint.".into()); } - let client = create_client(self.tls.as_ref(), cx.proxy())?; + //let client = create_client(self.tls.as_ref(), cx.proxy())?; + + let client = if let Some(conn) = &self.connection { + create_client_with_connection_config(self.tls.as_ref(), cx.proxy(), Some(conn.clone()))? + } else { + create_client(self.tls.as_ref(), cx.proxy())? + }; + let healthcheck = build_healthcheck( self.endpoint.clone(), self.default_token.inner().to_owned(), @@ -334,6 +352,7 @@ mod tests { timestamp_key: None, auto_extract_timestamp: None, endpoint_target: EndpointTarget::Raw, + connection: None, }; let endpoint = format!("{endpoint}/services/collector/raw"); @@ -360,4 +379,25 @@ mod tests { } register_validatable_component!(HecLogsSinkConfig); + + #[test] + fn splunk_config_with_connection() { + let yaml = r#" +default_token: "test_token" +endpoint: "http://splunk:8088" +encoding: + codec: "json" +connection: + idle_timeout_secs: 120 + pool_idle_per_host: 15 +"#; + + let cfg: HecLogsSinkConfig = serde_yaml::from_str(yaml).unwrap(); + assert_eq!(cfg.default_token.inner(), "test_token"); + assert!(cfg.connection.is_some()); + + let conn = cfg.connection.unwrap(); + assert_eq!(conn.idle_timeout_secs, Some(120)); + assert_eq!(conn.pool_idle_per_host, Some(15)); + } } diff --git a/src/sinks/splunk_hec/logs/tests.rs b/src/sinks/splunk_hec/logs/tests.rs index 794fff1a4b684..f4c08508c0619 100644 --- a/src/sinks/splunk_hec/logs/tests.rs +++ b/src/sinks/splunk_hec/logs/tests.rs @@ -237,6 +237,7 @@ async fn splunk_passthrough_token() { timestamp_key: None, auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, + connection: None, }; let cx = SinkContext::default(); diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index aa8ba4bb3446e..357c179d4b28d 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -633,6 +633,24 @@ impl RequestConfig { } } +#[configurable_component] +#[configurable(title = "Configuration for connection behavior in the HTTP client.")] +#[configurable( + description = "Configuration for connection behavior in the HTTP client." +)] +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub struct ConnectionConfig { + /// Maximum idle time for a connection before it’s closed (seconds) + pub idle_timeout_secs: Option, + + /// Maximum number of idle connections to keep per host + pub pool_idle_per_host: Option, +} + +impl ConnectionConfig { + pub const DEFAULT: Self = Self { idle_timeout_secs: None, pool_idle_per_host: None }; +} + #[derive(Debug, Snafu)] pub enum HeaderValidationError { #[snafu(display("{}: {}", source, name))] diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 0be523bf8fe19..b0733b9b0f879 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -1357,6 +1357,7 @@ mod tests { timestamp_key: None, auto_extract_timestamp: None, endpoint_target: Default::default(), + connection: None, } .build(SinkContext::default()) .await From 27636390ab9fa8957e6f40c5280d28f58be06f71 Mon Sep 17 00:00:00 2001 From: Vitalii Parfonov Date: Fri, 10 Oct 2025 19:17:02 +0300 Subject: [PATCH 2/4] fix clippy warn and formatting --- src/http.rs | 2 +- src/sinks/http/config.rs | 2 +- src/sinks/splunk_hec/common/util.rs | 8 ++++++-- src/sinks/splunk_hec/logs/config.rs | 8 ++++---- src/sinks/util/http.rs | 9 +++++---- 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/http.rs b/src/http.rs index de6a722ea3f0a..550abc4a0e334 100644 --- a/src/http.rs +++ b/src/http.rs @@ -35,12 +35,12 @@ use vector_lib::sensitive_string::SensitiveString; #[cfg(feature = "aws-core")] use crate::aws::AwsAuthentication; +use crate::sinks::util::http::ConnectionConfig; use crate::{ config::ProxyConfig, internal_events::{http_client, HttpServerRequestReceived, HttpServerResponseSent}, tls::{tls_connector_builder, MaybeTlsSettings, TlsError}, }; -use crate::sinks::util::http::ConnectionConfig; pub mod status { pub const FORBIDDEN: u16 = 403; diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs index 366c0caf4ce3f..8063ed247ec27 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -21,6 +21,7 @@ use super::{ }; #[cfg(feature = "aws-core")] use crate::aws::AwsAuthentication; +use crate::sinks::util::http::ConnectionConfig; #[cfg(feature = "aws-core")] use crate::sinks::util::http::SigV4Config; use crate::{ @@ -34,7 +35,6 @@ use crate::{ }, }, }; -use crate::sinks::util::http::ConnectionConfig; const CONTENT_TYPE_TEXT: &str = "text/plain"; const CONTENT_TYPE_NDJSON: &str = "application/x-ndjson"; diff --git a/src/sinks/splunk_hec/common/util.rs b/src/sinks/splunk_hec/common/util.rs index 5a38d91ae6052..ac8a7b98971c6 100644 --- a/src/sinks/splunk_hec/common/util.rs +++ b/src/sinks/splunk_hec/common/util.rs @@ -13,6 +13,7 @@ use super::{ service::{HttpRequestBuilder, MetadataFields}, EndpointTarget, }; +use crate::sinks::util::http::ConnectionConfig; use crate::{ http::HttpClient, internal_events::TemplateRenderingError, @@ -24,7 +25,6 @@ use crate::{ template::Template, tls::{TlsConfig, TlsSettings}, }; -use crate::sinks::util::http::ConnectionConfig; #[derive(Clone, Copy, Debug, Default)] pub struct SplunkHecDefaultBatchSettings; @@ -59,7 +59,11 @@ pub fn create_client_with_connection_config( let tls_settings = TlsSettings::from_options(tls)?; if let Some(conn) = connection { - Ok(HttpClient::new_with_connection_config(tls_settings, proxy_config, Some(conn))?) + Ok(HttpClient::new_with_connection_config( + tls_settings, + proxy_config, + Some(conn), + )?) } else { Ok(HttpClient::new(tls_settings, proxy_config)?) } diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index 3f539ceadea8c..61c33b43d7c68 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -6,6 +6,9 @@ use vector_lib::{ sensitive_string::SensitiveString, }; +use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink}; +use crate::sinks::splunk_hec::common::create_client_with_connection_config; +use crate::sinks::util::http::ConnectionConfig; use crate::{ http::HttpClient, sinks::{ @@ -19,9 +22,6 @@ use crate::{ util::http::HttpRetryLogic, }, }; -use crate::sinks::splunk_hec::common::create_client_with_connection_config; -use crate::sinks::util::http::ConnectionConfig; -use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink}; /// Configuration for the `splunk_hec_logs` sink. #[configurable_component(sink( @@ -205,7 +205,7 @@ impl SinkConfig for HecLogsSinkConfig { //let client = create_client(self.tls.as_ref(), cx.proxy())?; let client = if let Some(conn) = &self.connection { - create_client_with_connection_config(self.tls.as_ref(), cx.proxy(), Some(conn.clone()))? + create_client_with_connection_config(self.tls.as_ref(), cx.proxy(), Some(*conn))? } else { create_client(self.tls.as_ref(), cx.proxy())? }; diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index 357c179d4b28d..248d952bd061b 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -635,9 +635,7 @@ impl RequestConfig { #[configurable_component] #[configurable(title = "Configuration for connection behavior in the HTTP client.")] -#[configurable( - description = "Configuration for connection behavior in the HTTP client." -)] +#[configurable(description = "Configuration for connection behavior in the HTTP client.")] #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] pub struct ConnectionConfig { /// Maximum idle time for a connection before it’s closed (seconds) @@ -648,7 +646,10 @@ pub struct ConnectionConfig { } impl ConnectionConfig { - pub const DEFAULT: Self = Self { idle_timeout_secs: None, pool_idle_per_host: None }; + pub const DEFAULT: Self = Self { + idle_timeout_secs: None, + pool_idle_per_host: None, + }; } #[derive(Debug, Snafu)] From d5327cea59af2574d7383819033fe77f2091bdab Mon Sep 17 00:00:00 2001 From: Vitalii Parfonov Date: Mon, 13 Oct 2025 16:33:36 +0300 Subject: [PATCH 3/4] add connection idle timeout config for Elastcisearch sink --- src/sinks/elasticsearch/common.rs | 5 ++++- src/sinks/elasticsearch/config.rs | 14 +++++++++++++- src/sinks/http/config.rs | 2 +- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/sinks/elasticsearch/common.rs b/src/sinks/elasticsearch/common.rs index 1060ee5c7a22a..52a5130354ce8 100644 --- a/src/sinks/elasticsearch/common.rs +++ b/src/sinks/elasticsearch/common.rs @@ -23,6 +23,7 @@ use crate::{ tls::TlsSettings, transforms::metric_to_log::MetricToLog, }; +use crate::sinks::util::http::ConnectionConfig; #[derive(Debug, Clone)] pub struct ElasticsearchCommon { @@ -190,6 +191,7 @@ impl ElasticsearchCommon { &request, &tls_settings, proxy_config, + config.connection ) .await { @@ -341,6 +343,7 @@ async fn get_version( request: &RequestConfig, tls_settings: &TlsSettings, proxy_config: &ProxyConfig, + connection_config: Option, ) -> crate::Result { #[derive(Deserialize)] struct Version { @@ -351,7 +354,7 @@ async fn get_version( version: Option, } - let client = HttpClient::new(tls_settings.clone(), proxy_config)?; + let client = HttpClient::new_with_connection_config(tls_settings.clone(), proxy_config, connection_config)?; let response = get( base_url, auth, diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index ba639c6ec794b..0752ac81d87da 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -35,6 +35,7 @@ use vector_lib::lookup::event_path; use vector_lib::lookup::lookup_v2::ConfigValuePath; use vector_lib::schema::Requirement; use vrl::value::Kind; +use crate::sinks::util::http::ConnectionConfig; /// The field name for the timestamp required by data stream mode pub const DATA_STREAM_TIMESTAMP_KEY: &str = "@timestamp"; @@ -218,6 +219,16 @@ pub struct ElasticsearchConfig { )] #[configurable(derived)] pub acknowledgements: AcknowledgementsConfig, + + + /// Connection-level settings for the underlying HTTP client. + /// + /// This allows configuring parameters like connection idle timeout and + /// maximum idle connections per host. Useful when running behind load + /// balancers with strict idle policies. + #[configurable(derived)] + #[serde(default)] + pub connection: Option, } fn default_doc_type() -> String { @@ -255,6 +266,7 @@ impl Default for ElasticsearchConfig { data_stream: None, metrics: None, acknowledgements: Default::default(), + connection: None, } } } @@ -541,7 +553,7 @@ impl SinkConfig for ElasticsearchConfig { let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?; let common = commons[0].clone(); - let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?; + let client = HttpClient::new_with_connection_config(common.tls_settings.clone(), cx.proxy(), self.connection)?; let request_limits = self.request.tower.into_settings(); diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs index 8063ed247ec27..5d20fea6e88c1 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -168,7 +168,7 @@ impl From for Method { impl HttpSinkConfig { fn build_http_client(&self, cx: &SinkContext) -> crate::Result { let tls = TlsSettings::from_options(self.tls.as_ref())?; - Ok(HttpClient::new(tls, cx.proxy())?) + Ok(HttpClient::new_with_connection_config(tls, cx.proxy(), self.connection.clone())?) } pub(super) fn build_encoder(&self) -> crate::Result> { From f6841bf3b5e23748fd1b57baad187fad0c0d453b Mon Sep 17 00:00:00 2001 From: Vitalii Parfonov Date: Wed, 22 Oct 2025 12:52:55 +0300 Subject: [PATCH 4/4] fix clippy warn and formatting --- src/sinks/elasticsearch/common.rs | 10 +++++++--- src/sinks/elasticsearch/config.rs | 9 ++++++--- src/sinks/http/config.rs | 6 +++++- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/sinks/elasticsearch/common.rs b/src/sinks/elasticsearch/common.rs index 52a5130354ce8..cb6d498666ed8 100644 --- a/src/sinks/elasticsearch/common.rs +++ b/src/sinks/elasticsearch/common.rs @@ -10,6 +10,7 @@ use super::{ request_builder::ElasticsearchRequestBuilder, ElasticsearchApiVersion, ElasticsearchEncoder, InvalidHostSnafu, Request, VersionType, }; +use crate::sinks::util::http::ConnectionConfig; use crate::{ http::{HttpClient, MaybeAuth, QueryParameterValue, QueryParameters}, sinks::{ @@ -23,7 +24,6 @@ use crate::{ tls::TlsSettings, transforms::metric_to_log::MetricToLog, }; -use crate::sinks::util::http::ConnectionConfig; #[derive(Debug, Clone)] pub struct ElasticsearchCommon { @@ -191,7 +191,7 @@ impl ElasticsearchCommon { &request, &tls_settings, proxy_config, - config.connection + config.connection, ) .await { @@ -354,7 +354,11 @@ async fn get_version( version: Option, } - let client = HttpClient::new_with_connection_config(tls_settings.clone(), proxy_config, connection_config)?; + let client = HttpClient::new_with_connection_config( + tls_settings.clone(), + proxy_config, + connection_config, + )?; let response = get( base_url, auth, diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index 0752ac81d87da..7fe503969b0a4 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -6,6 +6,7 @@ use std::{ use futures::{FutureExt, TryFutureExt}; use vector_lib::configurable::configurable_component; +use crate::sinks::util::http::ConnectionConfig; use crate::{ codecs::Transformer, config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext}, @@ -35,7 +36,6 @@ use vector_lib::lookup::event_path; use vector_lib::lookup::lookup_v2::ConfigValuePath; use vector_lib::schema::Requirement; use vrl::value::Kind; -use crate::sinks::util::http::ConnectionConfig; /// The field name for the timestamp required by data stream mode pub const DATA_STREAM_TIMESTAMP_KEY: &str = "@timestamp"; @@ -220,7 +220,6 @@ pub struct ElasticsearchConfig { #[configurable(derived)] pub acknowledgements: AcknowledgementsConfig, - /// Connection-level settings for the underlying HTTP client. /// /// This allows configuring parameters like connection idle timeout and @@ -553,7 +552,11 @@ impl SinkConfig for ElasticsearchConfig { let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?; let common = commons[0].clone(); - let client = HttpClient::new_with_connection_config(common.tls_settings.clone(), cx.proxy(), self.connection)?; + let client = HttpClient::new_with_connection_config( + common.tls_settings.clone(), + cx.proxy(), + self.connection, + )?; let request_limits = self.request.tower.into_settings(); diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs index 5d20fea6e88c1..560fc6ce419fb 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -168,7 +168,11 @@ impl From for Method { impl HttpSinkConfig { fn build_http_client(&self, cx: &SinkContext) -> crate::Result { let tls = TlsSettings::from_options(self.tls.as_ref())?; - Ok(HttpClient::new_with_connection_config(tls, cx.proxy(), self.connection.clone())?) + Ok(HttpClient::new_with_connection_config( + tls, + cx.proxy(), + self.connection, + )?) } pub(super) fn build_encoder(&self) -> crate::Result> {