From 64b3f8c6daa50daeadd4a226fb433968cdfc84a5 Mon Sep 17 00:00:00 2001 From: Simon Dugas Date: Fri, 19 Jun 2026 13:21:25 -0400 Subject: [PATCH] feat(nats source): expose NATS message headers via headers_key Add an optional `headers_key` configuration field to the `nats` source. When set, NATS message headers are inserted into each event under the configured key as an object mapping each header name to an array of its string values (NATS headers can be multi-valued). The field is unset by default, so no headers are exposed unless explicitly configured, preserving backwards compatibility. Closes #24937 Co-Authored-By: Claude Opus 4.8 --- .../24937_nats_source_headers.feature.md | 3 + src/sources/nats/config.rs | 101 +++++++++++++++++- src/sources/nats/integration_tests.rs | 92 ++++++++++++++++ src/sources/nats/source.rs | 82 ++++++++++++++ .../components/sources/generated/nats.cue | 14 +++ 5 files changed, 290 insertions(+), 2 deletions(-) create mode 100644 changelog.d/24937_nats_source_headers.feature.md diff --git a/changelog.d/24937_nats_source_headers.feature.md b/changelog.d/24937_nats_source_headers.feature.md new file mode 100644 index 0000000000000..c77e112a02954 --- /dev/null +++ b/changelog.d/24937_nats_source_headers.feature.md @@ -0,0 +1,3 @@ +The `nats` source can now expose NATS message headers on each event. Set the new `headers_key` option to the field where headers should be written (for example `headers_key: headers`). Each header name maps to an array of its string values, since NATS headers can be multi-valued. By default `headers_key` is unset and no headers are exposed, preserving backwards compatibility. + +authors: Simon Dugas diff --git a/src/sources/nats/config.rs b/src/sources/nats/config.rs index b4fce37c38a7e..84cdb4542f138 100644 --- a/src/sources/nats/config.rs +++ b/src/sources/nats/config.rs @@ -9,7 +9,7 @@ use vector_lib::{ configurable::configurable_component, lookup::{lookup_v2::OptionalValuePath, owned_value_path}, }; -use vrl::value::Kind; +use vrl::value::{Kind, kind::Collection}; use crate::{ codecs::DecodingConfig, @@ -156,6 +156,15 @@ pub struct NatsSourceConfig { #[serde(default = "default_subject_key_field")] pub subject_key_field: OptionalValuePath, + /// Enables exposing NATS message headers on each event under the named key. + /// + /// By default this is unset and no headers are exposed, preserving backwards + /// compatibility. When set, the message headers are inserted as an object + /// mapping each header name to an array of its string values. + #[serde(default)] + #[configurable(metadata(docs::examples = "headers"))] + pub headers_key: OptionalValuePath, + /// The buffer capacity of the underlying NATS subscriber. /// /// This value determines how many messages the NATS subscriber buffers @@ -258,7 +267,7 @@ impl SourceConfig for NatsSourceConfig { .clone() .path .map(LegacyKey::InsertIfEmpty); - let schema_definition = self + let mut schema_definition = self .decoding .schema_definition(log_namespace) .with_standard_vector_source_metadata() @@ -270,6 +279,19 @@ impl SourceConfig for NatsSourceConfig { None, ); + if let Some(headers_key) = self.headers_key.path.clone() { + schema_definition = schema_definition.with_source_metadata( + NatsSourceConfig::NAME, + Some(LegacyKey::Overwrite(headers_key)), + &owned_value_path!("headers"), + Kind::object( + Collection::empty() + .with_unknown(Kind::array(Collection::empty().with_unknown(Kind::bytes()))), + ), + None, + ); + } + vec![SourceOutput::new_maybe_logs( self.decoding.output_type(), schema_definition, @@ -373,6 +395,81 @@ mod tests { assert_eq!(definitions, Some(expected_definition)); } + #[test] + fn output_schema_definition_vector_namespace_with_headers() { + let config = NatsSourceConfig { + log_namespace: Some(true), + subject_key_field: default_subject_key_field(), + headers_key: OptionalValuePath::new("headers"), + ..Default::default() + }; + + let definitions = config + .outputs(LogNamespace::Vector) + .remove(0) + .schema_definition(true); + + let expected_definition = + Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector]) + .with_meaning(OwnedTargetPath::event_root(), "message") + .with_metadata_field( + &owned_value_path!("vector", "source_type"), + Kind::bytes(), + None, + ) + .with_metadata_field( + &owned_value_path!("vector", "ingest_timestamp"), + Kind::timestamp(), + None, + ) + .with_metadata_field(&owned_value_path!("nats", "subject"), Kind::bytes(), None) + .with_metadata_field( + &owned_value_path!("nats", "headers"), + Kind::object(Collection::empty().with_unknown(Kind::array( + Collection::empty().with_unknown(Kind::bytes()), + ))), + None, + ); + + assert_eq!(definitions, Some(expected_definition)); + } + + #[test] + fn output_schema_definition_legacy_namespace_with_headers() { + let config = NatsSourceConfig { + subject_key_field: default_subject_key_field(), + headers_key: OptionalValuePath::new("headers"), + ..Default::default() + }; + let definitions = config + .outputs(LogNamespace::Legacy) + .remove(0) + .schema_definition(true); + + let expected_definition = Definition::new_with_default_metadata( + Kind::object(Collection::empty()), + [LogNamespace::Legacy], + ) + .with_event_field( + &owned_value_path!("message"), + Kind::bytes(), + Some("message"), + ) + .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None) + .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None) + .with_event_field(&owned_value_path!("subject"), Kind::bytes(), None) + .with_event_field( + &owned_value_path!("headers"), + Kind::object( + Collection::empty() + .with_unknown(Kind::array(Collection::empty().with_unknown(Kind::bytes()))), + ), + None, + ); + + assert_eq!(definitions, Some(expected_definition)); + } + #[test] fn output_schema_definition_legacy_namespace() { let config = NatsSourceConfig { diff --git a/src/sources/nats/integration_tests.rs b/src/sources/nats/integration_tests.rs index a3caa7e766c07..7e0fd834c887c 100644 --- a/src/sources/nats/integration_tests.rs +++ b/src/sources/nats/integration_tests.rs @@ -131,6 +131,98 @@ async fn publish_and_check(conf: NatsSourceConfig) -> Result<(), BuildError> { Ok(()) } +/// Publishes a message with NATS headers and returns the resulting event's log. +async fn publish_with_headers_and_collect( + conf: NatsSourceConfig, + headers: async_nats::HeaderMap, +) -> Result { + let subject = conf.subject.clone(); + let (nc, sub) = create_subscription(&conf).await?; + let nc_pub = nc.clone(); + let msg = "my message with headers"; + + let events = assert_source_compliance(&SOURCE_TAGS, async move { + let (tx, rx) = SourceSender::new_test(); + let decoder = DecodingConfig::new( + conf.framing.clone(), + conf.decoding.clone(), + LogNamespace::Legacy, + ) + .build() + .unwrap(); + tokio::spawn(run_nats_core( + conf.clone(), + nc, + sub, + decoder, + LogNamespace::Legacy, + ShutdownSignal::noop(), + tx, + )); + nc_pub + .publish_with_headers(subject, headers, Bytes::from_static(msg.as_bytes())) + .await + .unwrap(); + + collect_n(rx, 1).await + }) + .await; + + Ok(events.into_iter().next().unwrap().into_log()) +} + +#[tokio::test] +async fn nats_headers_enabled() { + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let mut conf = generate_source_config(&url, &subject); + conf.headers_key = vector_lib::lookup::lookup_v2::OptionalValuePath::new("headers"); + + let mut headers = async_nats::HeaderMap::new(); + headers.insert("X-My-Custom-Header", "A"); + headers.insert("X-My-Custom-Timestamp", "1771517040123456000"); + + let log = publish_with_headers_and_collect(conf, headers) + .await + .expect("publish_with_headers_and_collect failed"); + + let mut expected = vrl::value::ObjectMap::new(); + expected.insert( + "X-My-Custom-Header".into(), + vec![vector_lib::event::Value::from("A")].into(), + ); + expected.insert( + "X-My-Custom-Timestamp".into(), + vec![vector_lib::event::Value::from("1771517040123456000")].into(), + ); + + assert_eq!(log["headers"], vector_lib::event::Value::Object(expected)); +} + +#[tokio::test] +async fn nats_headers_disabled_by_default() { + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + // No headers_key set: headers must not be exposed. + let conf = generate_source_config(&url, &subject); + + let mut headers = async_nats::HeaderMap::new(); + headers.insert("X-My-Custom-Header", "A"); + + let log = publish_with_headers_and_collect(conf, headers) + .await + .expect("publish_with_headers_and_collect failed"); + + assert!( + log.get("headers").is_none(), + "headers should not be present when headers_key is unset, got: {log:?}" + ); +} + #[tokio::test] async fn nats_no_auth() { let subject = format!("test-{}", random_string(10)); diff --git a/src/sources/nats/source.rs b/src/sources/nats/source.rs index 19352e7beec21..0ee2de3161a47 100644 --- a/src/sources/nats/source.rs +++ b/src/sources/nats/source.rs @@ -1,4 +1,5 @@ use async_nats::jetstream::consumer::pull::Stream as PullConsumerStream; +use bytes::Bytes; use chrono::Utc; use futures::StreamExt; use snafu::ResultExt; @@ -12,6 +13,7 @@ use vector_lib::{ }, lookup::owned_value_path, }; +use vrl::value::{ObjectMap, Value}; use crate::{ SourceSender, @@ -22,6 +24,22 @@ use crate::{ sources::nats::config::{BuildError, NatsSourceConfig, SubscribeSnafu}, }; +/// Converts a NATS [`HeaderMap`] into a [`Value`] suitable for insertion into an event. +/// +/// Each header name maps to an array of its string values, since NATS headers +/// can be multi-valued. +fn headers_to_value(headers: &async_nats::HeaderMap) -> Value { + let mut map = ObjectMap::new(); + for (name, values) in headers.iter() { + let values = values + .iter() + .map(|value| Value::from(Bytes::copy_from_slice(value.as_str().as_bytes()))) + .collect::>(); + map.insert(name.to_string().into(), Value::Array(values)); + } + Value::Object(map) +} + /// The outcome of processing a single NATS message. pub enum ProcessingStatus { /// The message payload was fully decoded and sent downstream. @@ -76,6 +94,18 @@ pub async fn process_message( &owned_value_path!("subject"), msg.subject.as_str(), ); + + if let Some(headers_key) = config.headers_key.path.as_ref() + && let Some(headers) = msg.headers.as_ref() + { + log_namespace.insert_source_metadata( + NatsSourceConfig::NAME, + log, + Some(LegacyKey::Overwrite(headers_key)), + &owned_value_path!("headers"), + headers_to_value(headers), + ); + } } event }); @@ -220,3 +250,55 @@ pub async fn create_subscription( Ok((nc, subscription)) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn headers_to_value_maps_names_to_arrays_of_values() { + let mut headers = async_nats::HeaderMap::new(); + headers.insert("X-My-Custom-Header", "A"); + headers.insert("X-My-Custom-Timestamp", "1771517040123456000"); + + let value = headers_to_value(&headers); + + let mut expected = ObjectMap::new(); + expected.insert( + "X-My-Custom-Header".into(), + Value::Array(vec![Value::from("A")]), + ); + expected.insert( + "X-My-Custom-Timestamp".into(), + Value::Array(vec![Value::from("1771517040123456000")]), + ); + + assert_eq!(value, Value::Object(expected)); + } + + #[test] + fn headers_to_value_collects_multiple_values_per_name() { + let mut headers = async_nats::HeaderMap::new(); + headers.append("X-Multi", "one"); + headers.append("X-Multi", "two"); + + let value = headers_to_value(&headers); + + let object = value.as_object().expect("headers should be an object"); + let values = object + .get("X-Multi") + .and_then(|v| v.as_array()) + .expect("X-Multi should be an array"); + + assert_eq!(values.len(), 2); + assert!(values.contains(&Value::from("one"))); + assert!(values.contains(&Value::from("two"))); + } + + #[test] + fn headers_to_value_empty_map_is_empty_object() { + let headers = async_nats::HeaderMap::new(); + let value = headers_to_value(&headers); + assert_eq!(value, Value::Object(ObjectMap::new())); + } +} diff --git a/website/cue/reference/components/sources/generated/nats.cue b/website/cue/reference/components/sources/generated/nats.cue index 16a024742ac3a..85361ba19c2e1 100644 --- a/website/cue/reference/components/sources/generated/nats.cue +++ b/website/cue/reference/components/sources/generated/nats.cue @@ -598,6 +598,20 @@ generated: components: sources: nats: configuration: { } } } + headers_key: { + description: """ + Enables exposing NATS message headers on each event under the named key. + + By default this is unset and no headers are exposed, preserving backwards + compatibility. When set, the message headers are inserted as an object + mapping each header name to an array of its string values. + """ + required: false + type: string: { + default: "" + examples: ["headers"] + } + } jetstream: { description: "Configuration for NATS JetStream." required: false